This is an automated email from the ASF dual-hosted git repository.

agozhiy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit a81c49653503b7c2f8707a769095e50838c8f8fa
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Mon Feb 17 20:32:32 2020 +0200

    DRILL-7589: Set temporary tests folder for UDF_DIRECTORY_LOCAL, fix 
allocators closing in BloomFilterTest and TestWriteToDisk, fix permissions 
issue for TestGracefulShutdown tests
    
    closes #1987
---
 .../apache/drill/exec/cache/TestWriteToDisk.java   | 106 +++-----
 .../exec/udf/dynamic/TestDynamicUDFSupport.java    |  11 +-
 .../drill/exec/work/filter/BloomFilterTest.java    | 274 ++++++---------------
 .../org/apache/drill/test/BaseDirTestWatcher.java  |  11 +
 .../java/org/apache/drill/test/ClusterFixture.java |   1 +
 .../org/apache/drill/test/OperatorFixture.java     |   1 +
 .../apache/drill/test/TestGracefulShutdown.java    | 100 ++++----
 7 files changed, 186 insertions(+), 318 deletions(-)

diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java 
b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
index db7f743..c6a68b6 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
@@ -18,95 +18,61 @@
 package org.apache.drill.exec.cache;
 
 import java.io.File;
-import java.util.List;
 
-import org.apache.drill.shaded.guava.com.google.common.io.Files;
-import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSets;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.test.TestTools;
+import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.exec.ExecTest;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.RemoteServiceSet;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.IntVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.junit.Rule;
 import org.junit.Test;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.junit.rules.TestRule;
-
-public class TestWriteToDisk extends ExecTest {
-  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(90000); // 
90secs
+public class TestWriteToDisk extends SubOperatorTest {
 
   @Test
-  @SuppressWarnings("static-method")
   public void test() throws Exception {
-    final List<ValueVector> vectorList = Lists.newArrayList();
-    final DrillConfig config = DrillConfig.create();
-    try (final RemoteServiceSet serviceSet = RemoteServiceSet
-        .getLocalServiceSet();
-        final Drillbit bit = new Drillbit(config, serviceSet)) {
-      bit.run();
-      final DrillbitContext context = bit.getContext();
-
-      final MaterializedField intField = MaterializedField.create("int", 
Types.required(TypeProtos.MinorType.INT));
-      final MaterializedField binField = MaterializedField.create("binary", 
Types.required(TypeProtos.MinorType.VARBINARY));
-      try (final IntVector intVector = (IntVector) 
TypeHelper.getNewVector(intField, context.getAllocator());
-          final VarBinaryVector binVector =
-              (VarBinaryVector) TypeHelper.getNewVector(binField, 
context.getAllocator())) {
-        AllocationHelper.allocate(intVector, 4, 4);
-        AllocationHelper.allocate(binVector, 4, 5);
-        vectorList.add(intVector);
-        vectorList.add(binVector);
+    VectorContainer container = expectedRowSet().container();
 
-        intVector.getMutator().setSafe(0, 0);
-        binVector.getMutator().setSafe(0, "ZERO".getBytes());
-        intVector.getMutator().setSafe(1, 1);
-        binVector.getMutator().setSafe(1, "ONE".getBytes());
-        intVector.getMutator().setSafe(2, 2);
-        binVector.getMutator().setSafe(2, "TWO".getBytes());
-        intVector.getMutator().setSafe(3, 3);
-        binVector.getMutator().setSafe(3, "THREE".getBytes());
-        intVector.getMutator().setValueCount(4);
-        binVector.getMutator().setValueCount(4);
+    WritableBatch batch = 
WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false);
 
-        VectorContainer container = new VectorContainer();
-        container.addCollection(vectorList);
-        container.setRecordCount(4);
-        WritableBatch batch = WritableBatch.getBatchNoHVWrap(
-            container.getRecordCount(), container, false);
-        VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(
-            batch, context.getAllocator());
+    VectorAccessibleSerializable wrap = new 
VectorAccessibleSerializable(batch, fixture.allocator());
 
-        final VectorAccessibleSerializable newWrap = new 
VectorAccessibleSerializable(
-            context.getAllocator());
-        try (final FileSystem fs = getLocalFileSystem()) {
-          final File tempDir = Files.createTempDir();
-          tempDir.deleteOnExit();
-          final Path path = new Path(tempDir.getAbsolutePath(), 
"drillSerializable");
-          try (final FSDataOutputStream out = fs.create(path)) {
-            wrap.writeToStream(out);
-          }
-
-          try (final FSDataInputStream in = fs.open(path)) {
-            newWrap.readFromStream(in);
-          }
-        }
+    VectorAccessibleSerializable newWrap = new 
VectorAccessibleSerializable(fixture.allocator());
+    try (FileSystem fs = ExecTest.getLocalFileSystem()) {
+      File tempDir = dirTestWatcher.getTmpDir();
+      tempDir.deleteOnExit();
+      Path path = new Path(tempDir.getAbsolutePath(), "drillSerializable");
+      try (FSDataOutputStream out = fs.create(path)) {
+        wrap.writeToStream(out);
+      }
 
-        newWrap.get();
+      try (FSDataInputStream in = fs.open(path)) {
+        newWrap.readFromStream(in);
       }
     }
+
+    RowSetUtilities.verify(expectedRowSet(), RowSets.wrap(newWrap.get()));
+  }
+
+  private RowSet expectedRowSet() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("int", TypeProtos.MinorType.INT)
+        .add("binary", TypeProtos.MinorType.VARBINARY)
+        .build();
+
+    return fixture.rowSetBuilder(schema)
+        .addRow(0, "ZERO".getBytes())
+        .addRow(1, "ONE".getBytes())
+        .addRow(2, "TWO".getBytes())
+        .addRow(3, "THREE".getBytes())
+        .build();
   }
 }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java
index dd9da22..c83d24a 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java
@@ -78,7 +78,6 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
 
   private static final String DEFAULT_JAR_NAME = "drill-custom-lower";
   private static URI fsUri;
-  private static File udfDir;
   private static File jarsDir;
   private static File buildDirectory;
   private static JarBuilder jarBuilder;
@@ -103,9 +102,10 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
 
   @Before
   public void setupNewDrillbit() throws Exception {
-    udfDir = dirTestWatcher.makeSubDir(Paths.get("udf"));
+    File udfLocalDir = new File(dirTestWatcher.getUdfDir(), "local");
     Properties overrideProps = new Properties();
-    overrideProps.setProperty(ExecConstants.UDF_DIRECTORY_ROOT, 
udfDir.getAbsolutePath());
+    overrideProps.setProperty(ExecConstants.UDF_DIRECTORY_ROOT, 
dirTestWatcher.getUdfDir().getAbsolutePath());
+    overrideProps.setProperty(ExecConstants.UDF_DIRECTORY_LOCAL, 
udfLocalDir.getAbsolutePath());
     overrideProps.setProperty(ExecConstants.UDF_DIRECTORY_FS, 
FileSystem.DEFAULT_FS);
     updateTestCluster(1, DrillConfig.create(overrideProps));
 
@@ -115,7 +115,6 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
   @After
   public void cleanup() throws Exception {
     closeClient();
-    FileUtils.cleanDirectory(udfDir);
     dirTestWatcher.clear();
   }
 
@@ -957,7 +956,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
     return spy;
   }
 
-  private class SimpleQueryRunner implements Runnable {
+  private static class SimpleQueryRunner implements Runnable {
 
     private final String query;
 
@@ -975,7 +974,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
     }
   }
 
-  private class TestBuilderRunner implements Runnable {
+  private static class TestBuilderRunner implements Runnable {
 
     private final TestBuilder testBuilder;
 
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
index 761a2cc..5875a40 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
@@ -17,18 +17,16 @@
  */
 package org.apache.drill.exec.work.filter;
 
-import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.scanner.ClassPathScanner;
 import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.util.function.CheckedFunction;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ValueVectorReadExpression;
-import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.expr.fn.impl.ValueVectorHashHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.FragmentContextImpl;
-import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
@@ -36,26 +34,23 @@ import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.RemoteServiceSet;
-import org.apache.drill.exec.vector.VarCharVector;
-import org.apache.drill.test.BaseTest;
+import org.apache.drill.test.SubOperatorTest;
 import org.junit.Assert;
 import org.junit.Test;
+
+import java.io.IOException;
 import java.util.Iterator;
 
-public class BloomFilterTest extends BaseTest {
-  public static DrillConfig c = DrillConfig.create();
+public class BloomFilterTest extends SubOperatorTest {
 
-  class TestRecordBatch implements RecordBatch {
+  private static class TestRecordBatch implements RecordBatch {
     private final VectorContainer container;
 
     public TestRecordBatch(VectorContainer container) {
       this.container = container;
-
     }
 
     @Override
@@ -85,7 +80,6 @@ public class BloomFilterTest extends BaseTest {
 
     @Override
     public void kill(boolean sendUpstream) {
-
     }
 
     @Override
@@ -133,214 +127,110 @@ public class BloomFilterTest extends BaseTest {
     }
   }
 
-
   @Test
   public void testNotExist() throws Exception {
-    Drillbit bit = new Drillbit(c, RemoteServiceSet.getLocalServiceSet(), 
ClassPathScanner.fromPrescan(c));
-    bit.run();
-    DrillbitContext bitContext = bit.getContext();
-    FunctionImplementationRegistry registry = 
bitContext.getFunctionImplementationRegistry();
-    FragmentContextImpl context = new FragmentContextImpl(bitContext, 
BitControl.PlanFragment.getDefaultInstance(), null, registry);
-    BufferAllocator bufferAllocator = bitContext.getAllocator();
-    //create RecordBatch
-    VarCharVector vector = new VarCharVector(SchemaBuilder.columnSchema("a", 
TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED), bufferAllocator);
-    vector.allocateNew();
-    int valueCount = 3;
-    VarCharVector.Mutator mutator = vector.getMutator();
-    mutator.setSafe(0, "a".getBytes());
-    mutator.setSafe(1, "b".getBytes());
-    mutator.setSafe(2, "c".getBytes());
-    mutator.setValueCount(valueCount);
-    VectorContainer vectorContainer = new VectorContainer();
-    TypedFieldId fieldId = vectorContainer.add(vector);
-    RecordBatch recordBatch = new TestRecordBatch(vectorContainer);
-    //construct hash64
-    ValueVectorReadExpression exp = new ValueVectorReadExpression(fieldId);
-    LogicalExpression[] expressions = new LogicalExpression[1];
-    expressions[0] = exp;
-    TypedFieldId[] fieldIds = new TypedFieldId[1];
-    fieldIds[0] = fieldId;
-    ValueVectorHashHelper valueVectorHashHelper = new 
ValueVectorHashHelper(recordBatch, context);
-    ValueVectorHashHelper.Hash64 hash64 = 
valueVectorHashHelper.getHash64(expressions, fieldIds);
-
-    //construct BloomFilter
-    int numBytes = BloomFilter.optimalNumOfBytes(3, 0.03);
-
-    BloomFilter bloomFilter = new BloomFilter(numBytes, bufferAllocator);
-    for (int i = 0; i < valueCount; i++) {
-      long hashCode = hash64.hash64Code(i, 0, 0);
-      bloomFilter.insert(hashCode);
-    }
+    RowSet.SingleRowSet probeRowSet = fixture.rowSetBuilder(getTestSchema())
+        .addRow("f")
+        .build();
 
-    //-----------------create probe side RecordBatch---------------------
-    VarCharVector probeVector = new 
VarCharVector(SchemaBuilder.columnSchema("a", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.REQUIRED), bufferAllocator);
-    probeVector.allocateNew();
-    int probeValueCount = 1;
-    VarCharVector.Mutator mutator1 = probeVector.getMutator();
-    mutator1.setSafe(0, "f".getBytes());
-    mutator1.setValueCount(probeValueCount);
-    VectorContainer probeVectorContainer = new VectorContainer();
-    TypedFieldId probeFieldId = probeVectorContainer.add(probeVector);
-    RecordBatch probeRecordBatch = new TestRecordBatch(probeVectorContainer);
-    ValueVectorReadExpression probExp = new 
ValueVectorReadExpression(probeFieldId);
-    LogicalExpression[] probExpressions = new LogicalExpression[1];
-    probExpressions[0] = probExp;
-    TypedFieldId[] probeFieldIds = new TypedFieldId[1];
-    probeFieldIds[0] = probeFieldId;
-    ValueVectorHashHelper probeValueVectorHashHelper = new 
ValueVectorHashHelper(probeRecordBatch, context);
-    ValueVectorHashHelper.Hash64 probeHash64 = 
probeValueVectorHashHelper.getHash64(probExpressions, probeFieldIds);
-    long hashCode = probeHash64.hash64Code(0, 0, 0);
-    boolean contain = bloomFilter.find(hashCode);
-    Assert.assertFalse(contain);
-    bloomFilter.getContent().close();
-    vectorContainer.clear();
-    probeVectorContainer.clear();
-    context.close();
-    bitContext.close();
-    bit.close();
+    checkBloomFilterResult(probeRowSet, BloomFilterTest::getSimpleBloomFilter, 
false);
   }
 
-
   @Test
   public void testExist() throws Exception {
+    RowSet.SingleRowSet probeRowSet = fixture.rowSetBuilder(getTestSchema())
+        .addRow("a")
+        .build();
 
-    Drillbit bit = new Drillbit(c, RemoteServiceSet.getLocalServiceSet(), 
ClassPathScanner.fromPrescan(c));
-    bit.run();
-    DrillbitContext bitContext = bit.getContext();
-    FunctionImplementationRegistry registry = 
bitContext.getFunctionImplementationRegistry();
-    FragmentContextImpl context = new FragmentContextImpl(bitContext, 
BitControl.PlanFragment.getDefaultInstance(), null, registry);
-    BufferAllocator bufferAllocator = bitContext.getAllocator();
-    //create RecordBatch
-    VarCharVector vector = new VarCharVector(SchemaBuilder.columnSchema("a", 
TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED), bufferAllocator);
-    vector.allocateNew();
-    int valueCount = 3;
-    VarCharVector.Mutator mutator = vector.getMutator();
-    mutator.setSafe(0, "a".getBytes());
-    mutator.setSafe(1, "b".getBytes());
-    mutator.setSafe(2, "c".getBytes());
-    mutator.setValueCount(valueCount);
-    VectorContainer vectorContainer = new VectorContainer();
-    TypedFieldId fieldId = vectorContainer.add(vector);
-    RecordBatch recordBatch = new TestRecordBatch(vectorContainer);
-    //construct hash64
-    ValueVectorReadExpression exp = new ValueVectorReadExpression(fieldId);
-    LogicalExpression[] expressions = new LogicalExpression[1];
-    expressions[0] = exp;
-    TypedFieldId[] fieldIds = new TypedFieldId[1];
-    fieldIds[0] = fieldId;
-    ValueVectorHashHelper valueVectorHashHelper = new 
ValueVectorHashHelper(recordBatch, context);
-    ValueVectorHashHelper.Hash64 hash64 = 
valueVectorHashHelper.getHash64(expressions, fieldIds);
-
-    //construct BloomFilter
-    int numBytes = BloomFilter.optimalNumOfBytes(3, 0.03);
+    checkBloomFilterResult(probeRowSet, BloomFilterTest::getSimpleBloomFilter, 
true);
+  }
+
+  @Test
+  public void testMerged() throws Exception {
+    RowSet.SingleRowSet probeRowSet = fixture.rowSetBuilder(getTestSchema())
+        .addRow("a")
+        .build();
 
-    BloomFilter bloomFilter = new BloomFilter(numBytes, bufferAllocator);
+    checkBloomFilterResult(probeRowSet, this::getDisjunctionBloomFilter, true);
+  }
+
+  private BloomFilter getDisjunctionBloomFilter(ValueVectorHashHelper.Hash64 
hash64) throws SchemaChangeException {
+    int numBytes = BloomFilter.optimalNumOfBytes(3, 0.03);
+    BloomFilter bloomFilter = new BloomFilter(numBytes, fixture.allocator());
+    int valueCount = 3;
     for (int i = 0; i < valueCount; i++) {
       long hashCode = hash64.hash64Code(i, 0, 0);
       bloomFilter.insert(hashCode);
     }
 
-    //-----------------create probe side RecordBatch---------------------
-    VarCharVector probeVector = new 
VarCharVector(SchemaBuilder.columnSchema("a", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.REQUIRED), bufferAllocator);
-    probeVector.allocateNew();
-    int probeValueCount = 1;
-    VarCharVector.Mutator mutator1 = probeVector.getMutator();
-    mutator1.setSafe(0, "a".getBytes());
-    mutator1.setValueCount(probeValueCount);
-    VectorContainer probeVectorContainer = new VectorContainer();
-    TypedFieldId probeFieldId = probeVectorContainer.add(probeVector);
-    RecordBatch probeRecordBatch = new TestRecordBatch(probeVectorContainer);
-    ValueVectorReadExpression probExp = new 
ValueVectorReadExpression(probeFieldId);
-    LogicalExpression[] probExpressions = new LogicalExpression[1];
-    probExpressions[0] = probExp;
-    TypedFieldId[] probeFieldIds = new TypedFieldId[1];
-    probeFieldIds[0] = probeFieldId;
-    ValueVectorHashHelper probeValueVectorHashHelper = new 
ValueVectorHashHelper(probeRecordBatch, context);
-    ValueVectorHashHelper.Hash64 probeHash64 = 
probeValueVectorHashHelper.getHash64(probExpressions, probeFieldIds);
-    long hashCode = probeHash64.hash64Code(0, 0, 0);
-    boolean contain = bloomFilter.find(hashCode);
-    Assert.assertTrue(contain);
+    BloomFilter disjunctionBloomFilter = getSimpleBloomFilter(hash64);
+    disjunctionBloomFilter.or(bloomFilter);
+
     bloomFilter.getContent().close();
-    vectorContainer.clear();
-    probeVectorContainer.clear();
-    context.close();
-    bitContext.close();
-    bit.close();
+
+    return disjunctionBloomFilter;
   }
 
+  private static BloomFilter getSimpleBloomFilter(ValueVectorHashHelper.Hash64 
hash64) throws SchemaChangeException {
+    int numBytes = BloomFilter.optimalNumOfBytes(3, 0.03);
 
-  @Test
-  public void testMerged() throws Exception {
+    BloomFilter bloomFilter = new BloomFilter(numBytes, fixture.allocator());
 
-    Drillbit bit = new Drillbit(c, RemoteServiceSet.getLocalServiceSet(), 
ClassPathScanner.fromPrescan(c));
-    bit.run();
-    DrillbitContext bitContext = bit.getContext();
-    FunctionImplementationRegistry registry = 
bitContext.getFunctionImplementationRegistry();
-    FragmentContextImpl context = new FragmentContextImpl(bitContext, 
BitControl.PlanFragment.getDefaultInstance(), null, registry);
-    BufferAllocator bufferAllocator = bitContext.getAllocator();
-    //create RecordBatch
-    VarCharVector vector = new VarCharVector(SchemaBuilder.columnSchema("a", 
TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED), bufferAllocator);
-    vector.allocateNew();
     int valueCount = 3;
-    VarCharVector.Mutator mutator = vector.getMutator();
-    mutator.setSafe(0, "a".getBytes());
-    mutator.setSafe(1, "b".getBytes());
-    mutator.setSafe(2, "c".getBytes());
-    mutator.setValueCount(valueCount);
-    VectorContainer vectorContainer = new VectorContainer();
-    TypedFieldId fieldId = vectorContainer.add(vector);
-    RecordBatch recordBatch = new TestRecordBatch(vectorContainer);
-    //construct hash64
-    ValueVectorReadExpression exp = new ValueVectorReadExpression(fieldId);
-    LogicalExpression[] expressions = new LogicalExpression[1];
-    expressions[0] = exp;
-    TypedFieldId[] fieldIds = new TypedFieldId[1];
-    fieldIds[0] = fieldId;
-    ValueVectorHashHelper valueVectorHashHelper = new 
ValueVectorHashHelper(recordBatch, context);
-    ValueVectorHashHelper.Hash64 hash64 = 
valueVectorHashHelper.getHash64(expressions, fieldIds);
-
-    //construct BloomFilter
-    int numBytes = BloomFilter.optimalNumOfBytes(3, 0.03);
-
-    BloomFilter bloomFilter = new BloomFilter(numBytes, bufferAllocator);
     for (int i = 0; i < valueCount; i++) {
       long hashCode = hash64.hash64Code(i, 0, 0);
       bloomFilter.insert(hashCode);
     }
+    return bloomFilter;
+  }
 
-    BloomFilter bloomFilter1 = new BloomFilter(numBytes, bufferAllocator);
-    for (int i = 0; i < valueCount; i++) {
-      long hashCode = hash64.hash64Code(i, 0, 0);
-      bloomFilter1.insert(hashCode);
+  private void checkBloomFilterResult(RowSet.SingleRowSet probeRowSet,
+      CheckedFunction<ValueVectorHashHelper.Hash64, BloomFilter, 
SchemaChangeException> bloomFilterProvider,
+      boolean matches) throws ClassTransformationException, IOException, 
SchemaChangeException {
+    try (FragmentContext context = fixture.getFragmentContext()) {
+      // create build side batch
+      RowSet.SingleRowSet batchRowSet = fixture.rowSetBuilder(getTestSchema())
+          .addRow("a")
+          .addRow("b")
+          .addRow("c")
+          .build();
+
+      // create build side Hash64
+      ValueVectorHashHelper.Hash64 hash64 = getHash64(context, batchRowSet);
+
+      // construct BloomFilter
+      BloomFilter bloomFilter = bloomFilterProvider.apply(hash64);
+
+      // create probe side Hash64
+      ValueVectorHashHelper.Hash64 probeHash64 = getHash64(context, 
probeRowSet);
+
+      long hashCode = probeHash64.hash64Code(0, 0, 0);
+
+      Assert.assertEquals(matches, bloomFilter.find(hashCode));
+
+      bloomFilter.getContent().close();
+      batchRowSet.clear();
+      probeRowSet.clear();
     }
+  }
 
-    bloomFilter.or(bloomFilter1);
-
-    //-----------------create probe side RecordBatch---------------------
-    VarCharVector probeVector = new 
VarCharVector(SchemaBuilder.columnSchema("a", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.REQUIRED), bufferAllocator);
-    probeVector.allocateNew();
-    int probeValueCount = 1;
-    VarCharVector.Mutator mutator1 = probeVector.getMutator();
-    mutator1.setSafe(0, "a".getBytes());
-    mutator1.setValueCount(probeValueCount);
-    VectorContainer probeVectorContainer = new VectorContainer();
-    TypedFieldId probeFieldId = probeVectorContainer.add(probeVector);
-    RecordBatch probeRecordBatch = new TestRecordBatch(probeVectorContainer);
+  private static TupleMetadata getTestSchema() {
+    return new SchemaBuilder()
+        .add("a", TypeProtos.MinorType.VARCHAR)
+        .build();
+  }
+
+  private static ValueVectorHashHelper.Hash64 getHash64(FragmentContext 
context,
+      RowSet.SingleRowSet probeRowSet) throws ClassTransformationException, 
IOException, SchemaChangeException {
+
+    RecordBatch probeRecordBatch = new 
TestRecordBatch(probeRowSet.container());
+    TypedFieldId probeFieldId = 
probeRecordBatch.getValueVectorId(SchemaPath.getSimplePath("a"));
     ValueVectorReadExpression probExp = new 
ValueVectorReadExpression(probeFieldId);
     LogicalExpression[] probExpressions = new LogicalExpression[1];
     probExpressions[0] = probExp;
     TypedFieldId[] probeFieldIds = new TypedFieldId[1];
     probeFieldIds[0] = probeFieldId;
     ValueVectorHashHelper probeValueVectorHashHelper = new 
ValueVectorHashHelper(probeRecordBatch, context);
-    ValueVectorHashHelper.Hash64 probeHash64 = 
probeValueVectorHashHelper.getHash64(probExpressions, probeFieldIds);
-    long hashCode = probeHash64.hash64Code(0, 0, 0);
-    boolean contain = bloomFilter.find(hashCode);
-    Assert.assertTrue(contain);
-    bloomFilter.getContent().close();
-    vectorContainer.clear();
-    probeVectorContainer.clear();
-    context.close();
-    bitContext.close();
-    bit.close();
+    return probeValueVectorHashHelper.getHash64(probExpressions, 
probeFieldIds);
   }
 }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java
index 9a538d8..7f65750 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java
@@ -73,6 +73,7 @@ public class BaseDirTestWatcher extends DirTestWatcher {
   private File dfsTestTmpParentDir;
   private File dfsTestTmpDir;
   private File rootDir;
+  private File udfDir;
 
   /**
   * Creates a {@link BaseDirTestWatcher} which does not delete its temp 
directories at the end of tests.
@@ -103,6 +104,7 @@ public class BaseDirTestWatcher extends DirTestWatcher {
     tmpDir = makeSubDir(Paths.get("tmp"));
     storeDir = makeSubDir(Paths.get("store"));
     dfsTestTmpParentDir = makeSubDir(Paths.get("dfsTestTmp"));
+    udfDir = makeSubDir(Paths.get("udf"));
 
     newDfsTestTmpDir();
   }
@@ -118,6 +120,7 @@ public class BaseDirTestWatcher extends DirTestWatcher {
       FileUtils.cleanDirectory(tmpDir);
       FileUtils.cleanDirectory(storeDir);
       FileUtils.cleanDirectory(dfsTestTmpDir);
+      FileUtils.cleanDirectory(udfDir);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -168,6 +171,14 @@ public class BaseDirTestWatcher extends DirTestWatcher {
   }
 
   /**
+   * Gets the temp directory that should be used as base directory for dynamic 
UDFs.
+   * @return The temp directory that should be used as base directory for 
dynamic UDFs.
+   */
+  public File getUdfDir() {
+    return udfDir;
+  }
+
+  /**
   * This method creates a new directory which can be mapped to 
<b>dfs.tmp</b>.
    */
   public void newDfsTestTmpDir() {
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index 735c97a..a7e8ef7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -584,6 +584,7 @@ public class ClusterFixture extends BaseFixture implements 
AutoCloseable {
     Properties props = new Properties();
     props.putAll(ClusterFixture.TEST_CONFIGURATIONS);
     props.setProperty(ExecConstants.DRILL_TMP_DIR, 
dirTestWatcher.getTmpDir().getAbsolutePath());
+    props.setProperty(ExecConstants.UDF_DIRECTORY_ROOT, 
dirTestWatcher.getUdfDir().getAbsolutePath());
     props.setProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, 
dirTestWatcher.getStoreDir().getAbsolutePath());
 
     builder.configBuilder.configProps(props);
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index b9f4b43..3ded2f8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -127,6 +127,7 @@ public class OperatorFixture extends BaseFixture implements 
AutoCloseable {
         configBuilder.put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, 
dirTestWatcher.getStoreDir().getAbsolutePath());
         configBuilder.put(ExecConstants.SPILL_DIRS, 
Lists.newArrayList(dirTestWatcher.getSpillDir().getAbsolutePath()));
         configBuilder.put(ExecConstants.HASHJOIN_SPILL_DIRS, 
Lists.newArrayList(dirTestWatcher.getSpillDir().getAbsolutePath()));
+        configBuilder.put(ExecConstants.UDF_DIRECTORY_ROOT, 
dirTestWatcher.getUdfDir().getAbsolutePath());
       }
     }
 
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java
index c5d508b..5e27f75 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java
@@ -31,10 +31,11 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
 
+import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
-import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.lang.reflect.Field;
 import java.net.HttpURLConnection;
@@ -50,29 +51,30 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @Category({SlowTest.class})
-public class TestGracefulShutdown extends BaseTestQuery {
+public class TestGracefulShutdown extends ClusterTest {
 
   @Rule
   public final TestRule TIMEOUT = TestTools.getTimeoutRule(120_000);
 
   @BeforeClass
   public static void setUpTestData() throws Exception {
-    for( int i = 0; i < 300; i++) {
+    for (int i = 0; i < 300; i++) {
       setupFile(i);
     }
   }
 
-  private static void enableWebServer(ClusterFixtureBuilder builder) {
-    enableDrillPortHunting(builder);
-    builder.configBuilder.put(ExecConstants.HTTP_ENABLE, true);
-    builder.configBuilder.put(ExecConstants.HTTP_PORT_HUNT, true);
-    builder.sessionOption(ExecConstants.SLICE_TARGET, 10);
+  private static ClusterFixtureBuilder builderWithEnabledWebServer() {
+    return builderWithEnabledPortHunting()
+        .configProperty(ExecConstants.HTTP_ENABLE, true)
+        .configProperty(ExecConstants.HTTP_PORT_HUNT, true)
+        .configProperty(ExecConstants.SLICE_TARGET, 10);
   }
 
-  private static void enableDrillPortHunting(ClusterFixtureBuilder builder) {
-    builder.configBuilder.put(ExecConstants.DRILL_PORT_HUNT, true);
-    builder.configBuilder.put(ExecConstants.GRACE_PERIOD, 500);
-    builder.configBuilder.put(ExecConstants.ALLOW_LOOPBACK_ADDRESS_BINDING, 
true);
+  private static ClusterFixtureBuilder builderWithEnabledPortHunting() {
+    return ClusterFixture.builder(dirTestWatcher)
+        .configProperty(ExecConstants.DRILL_PORT_HUNT, true)
+        .configProperty(ExecConstants.GRACE_PERIOD, 500)
+        .configProperty(ExecConstants.ALLOW_LOOPBACK_ADDRESS_BINDING, true);
   }
 
   /*
@@ -80,24 +82,25 @@ public class TestGracefulShutdown extends BaseTestQuery {
   endpoints and check if the drillbit still exists.
    */
   @Test
-  public void testOnlineEndPoints() throws  Exception {
+  public void testOnlineEndPoints() throws Exception {
 
     String[] drillbits = {"db1", "db2", "db3"};
-    ClusterFixtureBuilder builder = 
ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk().withBits(drillbits);
-    enableDrillPortHunting(builder);
+    ClusterFixtureBuilder builder = builderWithEnabledPortHunting()
+        .withLocalZk()
+        .withBits(drillbits);
 
-    try ( ClusterFixture cluster = builder.build()) {
+    try (ClusterFixture cluster = builder.build()) {
 
       Drillbit drillbit = cluster.drillbit("db2");
       int zkRefresh = 
drillbit.getContext().getConfig().getInt(ExecConstants.ZK_REFRESH);
-      DrillbitEndpoint drillbitEndpoint =  
drillbit.getRegistrationHandle().getEndPoint();
+      DrillbitEndpoint drillbitEndpoint = 
drillbit.getRegistrationHandle().getEndPoint();
       cluster.closeDrillbit("db2");
 
       while (true) {
         Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit()
-                .getContext()
-                .getClusterCoordinator()
-                .getOnlineEndPoints();
+            .getContext()
+            .getClusterCoordinator()
+            .getOnlineEndPoints();
 
         if (!drillbitEndpoints.contains(drillbitEndpoint)) {
           // Success
@@ -116,12 +119,14 @@ public class TestGracefulShutdown extends BaseTestQuery {
   public void testRestApi() throws Exception {
 
     String[] drillbits = {"db1", "db2", "db3"};
-    ClusterFixtureBuilder builder = 
ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk().withBits(drillbits);
-    enableWebServer(builder);
+    ClusterFixtureBuilder builder = builderWithEnabledWebServer()
+        .withLocalZk()
+        .withBits(drillbits);
+
     QueryBuilder.QuerySummaryFuture listener;
     final String sql = "select * from dfs.root.`.`";
     try (ClusterFixture cluster = builder.build();
-         final ClientFixture client = cluster.clientFixture()) {
+         ClientFixture client = cluster.clientFixture()) {
       Drillbit drillbit = cluster.drillbit("db1");
       int port = drillbit.getWebServerPort();
       int zkRefresh = 
drillbit.getContext().getConfig().getInt(ExecConstants.ZK_REFRESH);
@@ -133,10 +138,7 @@ public class TestGracefulShutdown extends BaseTestQuery {
         throw new RuntimeException("Failed : HTTP error code : "
                 + conn.getResponseCode());
       }
-      while (true) {
-        if (listener.isDone()) {
-          break;
-        }
+      while (!listener.isDone()) {
         Thread.sleep(100L);
       }
 
@@ -154,8 +156,8 @@ public class TestGracefulShutdown extends BaseTestQuery {
   public void testRestApiShutdown() throws Exception {
 
     String[] drillbits = {"db1", "db2", "db3"};
-    ClusterFixtureBuilder builder = 
ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk().withBits(drillbits);
-    enableWebServer(builder);
+    ClusterFixtureBuilder builder = 
builderWithEnabledWebServer().withLocalZk().withBits(drillbits);
+
     QueryBuilder.QuerySummaryFuture listener;
     final String sql = "select * from dfs.root.`.`";
     try (ClusterFixture cluster = builder.build();
@@ -163,12 +165,8 @@ public class TestGracefulShutdown extends BaseTestQuery {
       Drillbit drillbit = cluster.drillbit("db1");
       int port = drillbit.getWebServerPort();
       int zkRefresh = 
drillbit.getContext().getConfig().getInt(ExecConstants.ZK_REFRESH);
-      listener =  client.queryBuilder().sql(sql).futureSummary();
-      while (true) {
-        if (listener.isDone()) {
-          break;
-        }
-
+      listener = client.queryBuilder().sql(sql).futureSummary();
+      while (!listener.isDone()) {
         Thread.sleep(100L);
       }
       URL url = new URL("http://localhost:"; + port + "/shutdown");
@@ -187,7 +185,8 @@ public class TestGracefulShutdown extends BaseTestQuery {
 
   @Test // DRILL-6912
   public void testDrillbitWithSamePortContainsShutdownThread() throws 
Exception {
-    ClusterFixtureBuilder fixtureBuilder = 
ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk()
+    ClusterFixtureBuilder fixtureBuilder = 
ClusterFixture.builder(dirTestWatcher)
+        .withLocalZk()
         .configProperty(ExecConstants.ALLOW_LOOPBACK_ADDRESS_BINDING, true)
         .configProperty(ExecConstants.INITIAL_USER_PORT, 
QueryTestUtil.getFreePortNumber(31170, 300))
         .configProperty(ExecConstants.INITIAL_BIT_PORT, 
QueryTestUtil.getFreePortNumber(31180, 300));
@@ -213,14 +212,14 @@ public class TestGracefulShutdown extends BaseTestQuery {
 
   @Test // DRILL-7056
   public void testDrillbitTempDir() throws Exception {
-    File originalDrillbitTempDir = null;
-    ClusterFixtureBuilder fixtureBuilder = 
ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk()
+    File originalDrillbitTempDir;
+    ClusterFixtureBuilder fixtureBuilder = 
ClusterFixture.builder(dirTestWatcher).withLocalZk()
         .configProperty(ExecConstants.ALLOW_LOOPBACK_ADDRESS_BINDING, true)
         .configProperty(ExecConstants.INITIAL_USER_PORT, 
QueryTestUtil.getFreePortNumber(31170, 300))
         .configProperty(ExecConstants.INITIAL_BIT_PORT, 
QueryTestUtil.getFreePortNumber(31180, 300));
 
     try (ClusterFixture fixture = fixtureBuilder.build();
-        Drillbit twinDrillbitOnSamePort = new Drillbit(fixture.config(),
+         Drillbit twinDrillbitOnSamePort = new Drillbit(fixture.config(),
             fixtureBuilder.configBuilder().getDefinitions(), 
fixture.serviceSet())) {
       // Assert preconditions :
       //      1. First drillbit instance should be started normally
@@ -262,17 +261,18 @@ public class TestGracefulShutdown extends BaseTestQuery {
   }
 
   private static void setupFile(int file_num) throws Exception {
-    final String file = "employee"+file_num+".json";
-    final Path path = dirTestWatcher.getRootDir().toPath().resolve(file);
-    try(PrintWriter out = new PrintWriter(new BufferedWriter(new 
FileWriter(path.toFile(), true)))) {
-      out.println("{\"employee_id\":1,\"full_name\":\"Sheri 
Nowmer\",\"first_name\":\"Sheri\",\"last_name\":\"Nowmer\",\"position_id\":1,\"position_title\":\"President\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1961-08-26\",\"hire_date\":\"1994-12-01
 
00:00:00.0\",\"end_date\":null,\"salary\":80000.0000,\"supervisor_id\":0,\"education_level\":\"Graduate
 
Degree\",\"marital_status\":\"S\",\"gender\":\"F\",\"management_role\":\"Senior 
Management\"}\n" +
-              "{\"employee_id\":2,\"full_name\":\"Derrick 
Whelply\",\"first_name\":\"Derrick\",\"last_name\":\"Whelply\",\"position_id\":2,\"position_title\":\"VP
 Country 
Manager\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1915-07-03\",\"hire_date\":\"1994-12-01
 
00:00:00.0\",\"end_date\":null,\"salary\":40000.0000,\"supervisor_id\":1,\"education_level\":\"Graduate
 
Degree\",\"marital_status\":\"M\",\"gender\":\"M\",\"management_role\":\"Senior 
Management\"}\n" +
-              "{\"employee_id\":4,\"full_name\":\"Michael 
Spence\",\"first_name\":\"Michael\",\"last_name\":\"Spence\",\"position_id\":2,\"position_title\":\"VP
 Country 
Manager\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1969-06-20\",\"hire_date\":\"1998-01-01
 
00:00:00.0\",\"end_date\":null,\"salary\":40000.0000,\"supervisor_id\":1,\"education_level\":\"Graduate
 
Degree\",\"marital_status\":\"S\",\"gender\":\"M\",\"management_role\":\"Senior 
Management\"}\n" +
-              "{\"employee_id\":5,\"full_name\":\"Maya 
Gutierrez\",\"first_name\":\"Maya\",\"last_name\":\"Gutierrez\",\"position_id\":2,\"position_title\":\"VP
 Country 
Manager\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1951-05-10\",\"hire_date\":\"1998-01-01
 
00:00:00.0\",\"end_date\":null,\"salary\":35000.0000,\"supervisor_id\":1,\"education_level\":\"Bachelors
 
Degree\",\"marital_status\":\"M\",\"gender\":\"F\",\"management_role\":\"Senior 
Management\"}\n" +
-              "{\"employee_id\":6,\"full_name\":\"Roberta 
Damstra\",\"first_name\":\"Roberta\",\"last_name\":\"Damstra\",\"position_id\":3,\"position_title\":\"VP
 Information 
Systems\",\"store_id\":0,\"department_id\":2,\"birth_date\":\"1942-10-08\",\"hire_date\":\"1994-12-01
 
00:00:00.0\",\"end_date\":null,\"salary\":25000.0000,\"supervisor_id\":1,\"education_level\":\"Bachelors
 
Degree\",\"marital_status\":\"M\",\"gender\":\"F\",\"management_role\":\"Senior 
Management\"}\n" +
-              "{\"employee_id\":7,\"full_name\":\"Rebecca 
Kanagaki\",\"first_name\":\"Rebecca\",\"last_name\":\"Kanagaki\",\"position_id\":4,\"position_title\":\"VP
 Human 
Resources\",\"store_id\":0,\"department_id\":3,\"birth_date\":\"1949-03-27\",\"hire_date\":\"1994-12-01
 
00:00:00.0\",\"end_date\":null,\"salary\":15000.0000,\"supervisor_id\":1,\"education_level\":\"Bachelors
 
Degree\",\"marital_status\":\"M\",\"gender\":\"F\",\"management_role\":\"Senior 
Management\"}\n");
-    } catch (IOException e) {
-      fail(e.getMessage());
+    String file = "employee" + file_num + ".json";
+    Path path = dirTestWatcher.getRootDir().toPath().resolve(file);
+    StringBuilder stringBuilder = new StringBuilder();
+    int rowsCount = 7;
+    try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(TestGracefulShutdown.class.getResourceAsStream("/employee.json"))))
 {
+      for (int i = 0; i < rowsCount; i++) {
+        stringBuilder.append(reader.readLine());
+      }
+    }
+    String content = stringBuilder.toString();
+    try (PrintWriter out = new PrintWriter(new BufferedWriter(new 
FileWriter(path.toFile(), true)))) {
+      out.println(content);
     }
   }
 

Reply via email to