This is an automated email from the ASF dual-hosted git repository. agozhiy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit a81c49653503b7c2f8707a769095e50838c8f8fa Author: Volodymyr Vysotskyi <[email protected]> AuthorDate: Mon Feb 17 20:32:32 2020 +0200 DRILL-7589: Set temporary tests folder for UDF_DIRECTORY_LOCAL, fix allocators closing in BloomFilterTest and TestWriteToDisk, fix permissions issue for TestGracefulShutdown tests closes #1987 --- .../apache/drill/exec/cache/TestWriteToDisk.java | 106 +++----- .../exec/udf/dynamic/TestDynamicUDFSupport.java | 11 +- .../drill/exec/work/filter/BloomFilterTest.java | 274 ++++++--------------- .../org/apache/drill/test/BaseDirTestWatcher.java | 11 + .../java/org/apache/drill/test/ClusterFixture.java | 1 + .../org/apache/drill/test/OperatorFixture.java | 1 + .../apache/drill/test/TestGracefulShutdown.java | 100 ++++---- 7 files changed, 186 insertions(+), 318 deletions(-) diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java index db7f743..c6a68b6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java @@ -18,95 +18,61 @@ package org.apache.drill.exec.cache; import java.io.File; -import java.util.List; -import org.apache.drill.shaded.guava.com.google.common.io.Files; -import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.physical.rowSet.RowSet; +import org.apache.drill.exec.physical.rowSet.RowSets; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.common.types.TypeProtos; -import org.apache.drill.common.types.Types; -import org.apache.drill.test.TestTools; +import org.apache.drill.test.SubOperatorTest; import org.apache.drill.exec.ExecTest; -import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.WritableBatch; -import org.apache.drill.exec.server.Drillbit; -import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.server.RemoteServiceSet; -import org.apache.drill.exec.vector.AllocationHelper; -import org.apache.drill.exec.vector.IntVector; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.VarBinaryVector; +import org.apache.drill.test.rowSet.RowSetUtilities; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.junit.Rule; import org.junit.Test; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; -import org.junit.rules.TestRule; - -public class TestWriteToDisk extends ExecTest { - @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(90000); // 90secs +public class TestWriteToDisk extends SubOperatorTest { @Test - @SuppressWarnings("static-method") public void test() throws Exception { - final List<ValueVector> vectorList = Lists.newArrayList(); - final DrillConfig config = DrillConfig.create(); - try (final RemoteServiceSet serviceSet = RemoteServiceSet - .getLocalServiceSet(); - final Drillbit bit = new Drillbit(config, serviceSet)) { - bit.run(); - final DrillbitContext context = bit.getContext(); - - final MaterializedField intField = MaterializedField.create("int", Types.required(TypeProtos.MinorType.INT)); - final MaterializedField binField = MaterializedField.create("binary", Types.required(TypeProtos.MinorType.VARBINARY)); - try (final IntVector intVector = (IntVector) TypeHelper.getNewVector(intField, context.getAllocator()); - final VarBinaryVector binVector = - (VarBinaryVector) TypeHelper.getNewVector(binField, context.getAllocator())) { - AllocationHelper.allocate(intVector, 4, 4); - AllocationHelper.allocate(binVector, 4, 5); - vectorList.add(intVector); - vectorList.add(binVector); + VectorContainer container = expectedRowSet().container(); - intVector.getMutator().setSafe(0, 0); - binVector.getMutator().setSafe(0, "ZERO".getBytes()); - intVector.getMutator().setSafe(1, 1); - binVector.getMutator().setSafe(1, "ONE".getBytes()); - intVector.getMutator().setSafe(2, 2); - binVector.getMutator().setSafe(2, "TWO".getBytes()); - intVector.getMutator().setSafe(3, 3); - binVector.getMutator().setSafe(3, "THREE".getBytes()); - intVector.getMutator().setValueCount(4); - binVector.getMutator().setValueCount(4); + WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false); - VectorContainer container = new VectorContainer(); - container.addCollection(vectorList); - container.setRecordCount(4); - WritableBatch batch = WritableBatch.getBatchNoHVWrap( - container.getRecordCount(), container, false); - VectorAccessibleSerializable wrap = new VectorAccessibleSerializable( - batch, context.getAllocator()); + VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, fixture.allocator()); - final VectorAccessibleSerializable newWrap = new VectorAccessibleSerializable( - context.getAllocator()); - try (final FileSystem fs = getLocalFileSystem()) { - final File tempDir = Files.createTempDir(); - tempDir.deleteOnExit(); - final Path path = new Path(tempDir.getAbsolutePath(), "drillSerializable"); - try (final FSDataOutputStream out = fs.create(path)) { - wrap.writeToStream(out); - } - - try (final FSDataInputStream in = fs.open(path)) { - newWrap.readFromStream(in); - } - } + VectorAccessibleSerializable newWrap = new VectorAccessibleSerializable(fixture.allocator()); + try (FileSystem fs = ExecTest.getLocalFileSystem()) { + File tempDir = dirTestWatcher.getTmpDir(); + tempDir.deleteOnExit(); + Path path = new Path(tempDir.getAbsolutePath(), "drillSerializable"); + try (FSDataOutputStream out = fs.create(path)) { + wrap.writeToStream(out); + } - newWrap.get(); + try (FSDataInputStream in = fs.open(path)) { + newWrap.readFromStream(in); } } + + RowSetUtilities.verify(expectedRowSet(), RowSets.wrap(newWrap.get())); + } + + private RowSet expectedRowSet() { + TupleMetadata schema = new SchemaBuilder() + .add("int", TypeProtos.MinorType.INT) + .add("binary", TypeProtos.MinorType.VARBINARY) + .build(); + + return fixture.rowSetBuilder(schema) + .addRow(0, "ZERO".getBytes()) + .addRow(1, "ONE".getBytes()) + .addRow(2, "TWO".getBytes()) + .addRow(3, "THREE".getBytes()) + .build(); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java b/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java index dd9da22..c83d24a 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java @@ -78,7 +78,6 @@ public class TestDynamicUDFSupport extends BaseTestQuery { private static final String DEFAULT_JAR_NAME = "drill-custom-lower"; private static URI fsUri; - private static File udfDir; private static File jarsDir; private static File buildDirectory; private static JarBuilder jarBuilder; @@ -103,9 +102,10 @@ public class TestDynamicUDFSupport extends BaseTestQuery { @Before public void setupNewDrillbit() throws Exception { - udfDir = dirTestWatcher.makeSubDir(Paths.get("udf")); + File udfLocalDir = new File(dirTestWatcher.getUdfDir(), "local"); Properties overrideProps = new Properties(); - overrideProps.setProperty(ExecConstants.UDF_DIRECTORY_ROOT, udfDir.getAbsolutePath()); + overrideProps.setProperty(ExecConstants.UDF_DIRECTORY_ROOT, dirTestWatcher.getUdfDir().getAbsolutePath()); + overrideProps.setProperty(ExecConstants.UDF_DIRECTORY_LOCAL, udfLocalDir.getAbsolutePath()); overrideProps.setProperty(ExecConstants.UDF_DIRECTORY_FS, FileSystem.DEFAULT_FS); updateTestCluster(1, DrillConfig.create(overrideProps)); @@ -115,7 +115,6 @@ public class TestDynamicUDFSupport extends BaseTestQuery { @After public void cleanup() throws Exception { closeClient(); - FileUtils.cleanDirectory(udfDir); dirTestWatcher.clear(); } @@ -957,7 +956,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery { return spy; } - private class SimpleQueryRunner implements Runnable { + private static class SimpleQueryRunner implements Runnable { private final String query; @@ -975,7 +974,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery { } } - private class TestBuilderRunner implements Runnable { + private static class TestBuilderRunner implements Runnable { private final TestBuilder testBuilder; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java index 761a2cc..5875a40 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java @@ -17,18 +17,16 @@ */ package org.apache.drill.exec.work.filter; -import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.scanner.ClassPathScanner; import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.util.function.CheckedFunction; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ValueVectorReadExpression; -import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.expr.fn.impl.ValueVectorHashHelper; -import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.ops.FragmentContextImpl; -import org.apache.drill.exec.proto.BitControl; +import org.apache.drill.exec.physical.rowSet.RowSet; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; @@ -36,26 +34,23 @@ import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; -import org.apache.drill.exec.server.Drillbit; -import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.server.RemoteServiceSet; -import org.apache.drill.exec.vector.VarCharVector; -import org.apache.drill.test.BaseTest; +import org.apache.drill.test.SubOperatorTest; import org.junit.Assert; import org.junit.Test; + +import java.io.IOException; import java.util.Iterator; -public class BloomFilterTest extends BaseTest { - public static DrillConfig c = DrillConfig.create(); +public class BloomFilterTest extends SubOperatorTest { - class TestRecordBatch implements RecordBatch { + private static class TestRecordBatch implements RecordBatch { private final VectorContainer container; public TestRecordBatch(VectorContainer container) { this.container = container; - } @Override @@ -85,7 +80,6 @@ public class BloomFilterTest extends BaseTest { @Override public void kill(boolean sendUpstream) { - } @Override @@ -133,214 +127,110 @@ public class BloomFilterTest extends BaseTest { } } - @Test public void testNotExist() throws Exception { - Drillbit bit = new Drillbit(c, RemoteServiceSet.getLocalServiceSet(), ClassPathScanner.fromPrescan(c)); - bit.run(); - DrillbitContext bitContext = bit.getContext(); - FunctionImplementationRegistry registry = bitContext.getFunctionImplementationRegistry(); - FragmentContextImpl context = new FragmentContextImpl(bitContext, BitControl.PlanFragment.getDefaultInstance(), null, registry); - BufferAllocator bufferAllocator = bitContext.getAllocator(); - //create RecordBatch - VarCharVector vector = new VarCharVector(SchemaBuilder.columnSchema("a", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED), bufferAllocator); - vector.allocateNew(); - int valueCount = 3; - VarCharVector.Mutator mutator = vector.getMutator(); - mutator.setSafe(0, "a".getBytes()); - mutator.setSafe(1, "b".getBytes()); - mutator.setSafe(2, "c".getBytes()); - mutator.setValueCount(valueCount); - VectorContainer vectorContainer = new VectorContainer(); - TypedFieldId fieldId = vectorContainer.add(vector); - RecordBatch recordBatch = new TestRecordBatch(vectorContainer); - //construct hash64 - ValueVectorReadExpression exp = new ValueVectorReadExpression(fieldId); - LogicalExpression[] expressions = new LogicalExpression[1]; - expressions[0] = exp; - TypedFieldId[] fieldIds = new TypedFieldId[1]; - fieldIds[0] = fieldId; - ValueVectorHashHelper valueVectorHashHelper = new ValueVectorHashHelper(recordBatch, context); - ValueVectorHashHelper.Hash64 hash64 = valueVectorHashHelper.getHash64(expressions, fieldIds); - - //construct BloomFilter - int numBytes = BloomFilter.optimalNumOfBytes(3, 0.03); - - BloomFilter bloomFilter = new BloomFilter(numBytes, bufferAllocator); - for (int i = 0; i < valueCount; i++) { - long hashCode = hash64.hash64Code(i, 0, 0); - bloomFilter.insert(hashCode); - } + RowSet.SingleRowSet probeRowSet = fixture.rowSetBuilder(getTestSchema()) + .addRow("f") + .build(); - //-----------------create probe side RecordBatch--------------------- - VarCharVector probeVector = new VarCharVector(SchemaBuilder.columnSchema("a", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED), bufferAllocator); - probeVector.allocateNew(); - int probeValueCount = 1; - VarCharVector.Mutator mutator1 = probeVector.getMutator(); - mutator1.setSafe(0, "f".getBytes()); - mutator1.setValueCount(probeValueCount); - VectorContainer probeVectorContainer = new VectorContainer(); - TypedFieldId probeFieldId = probeVectorContainer.add(probeVector); - RecordBatch probeRecordBatch = new TestRecordBatch(probeVectorContainer); - ValueVectorReadExpression probExp = new ValueVectorReadExpression(probeFieldId); - LogicalExpression[] probExpressions = new LogicalExpression[1]; - probExpressions[0] = probExp; - TypedFieldId[] probeFieldIds = new TypedFieldId[1]; - probeFieldIds[0] = probeFieldId; - ValueVectorHashHelper probeValueVectorHashHelper = new ValueVectorHashHelper(probeRecordBatch, context); - ValueVectorHashHelper.Hash64 probeHash64 = probeValueVectorHashHelper.getHash64(probExpressions, probeFieldIds); - long hashCode = probeHash64.hash64Code(0, 0, 0); - boolean contain = bloomFilter.find(hashCode); - Assert.assertFalse(contain); - bloomFilter.getContent().close(); - vectorContainer.clear(); - probeVectorContainer.clear(); - context.close(); - bitContext.close(); - bit.close(); + checkBloomFilterResult(probeRowSet, BloomFilterTest::getSimpleBloomFilter, false); } - @Test public void testExist() throws Exception { + RowSet.SingleRowSet probeRowSet = fixture.rowSetBuilder(getTestSchema()) + .addRow("a") + .build(); - Drillbit bit = new Drillbit(c, RemoteServiceSet.getLocalServiceSet(), ClassPathScanner.fromPrescan(c)); - bit.run(); - DrillbitContext bitContext = bit.getContext(); - FunctionImplementationRegistry registry = bitContext.getFunctionImplementationRegistry(); - FragmentContextImpl context = new FragmentContextImpl(bitContext, BitControl.PlanFragment.getDefaultInstance(), null, registry); - BufferAllocator bufferAllocator = bitContext.getAllocator(); - //create RecordBatch - VarCharVector vector = new VarCharVector(SchemaBuilder.columnSchema("a", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED), bufferAllocator); - vector.allocateNew(); - int valueCount = 3; - VarCharVector.Mutator mutator = vector.getMutator(); - mutator.setSafe(0, "a".getBytes()); - mutator.setSafe(1, "b".getBytes()); - mutator.setSafe(2, "c".getBytes()); - mutator.setValueCount(valueCount); - VectorContainer vectorContainer = new VectorContainer(); - TypedFieldId fieldId = vectorContainer.add(vector); - RecordBatch recordBatch = new TestRecordBatch(vectorContainer); - //construct hash64 - ValueVectorReadExpression exp = new ValueVectorReadExpression(fieldId); - LogicalExpression[] expressions = new LogicalExpression[1]; - expressions[0] = exp; - TypedFieldId[] fieldIds = new TypedFieldId[1]; - fieldIds[0] = fieldId; - ValueVectorHashHelper valueVectorHashHelper = new ValueVectorHashHelper(recordBatch, context); - ValueVectorHashHelper.Hash64 hash64 = valueVectorHashHelper.getHash64(expressions, fieldIds); - - //construct BloomFilter - int numBytes = BloomFilter.optimalNumOfBytes(3, 0.03); + checkBloomFilterResult(probeRowSet, BloomFilterTest::getSimpleBloomFilter, true); + } + + @Test + public void testMerged() throws Exception { + RowSet.SingleRowSet probeRowSet = fixture.rowSetBuilder(getTestSchema()) + .addRow("a") + .build(); - BloomFilter bloomFilter = new BloomFilter(numBytes, bufferAllocator); + checkBloomFilterResult(probeRowSet, this::getDisjunctionBloomFilter, true); + } + + private BloomFilter getDisjunctionBloomFilter(ValueVectorHashHelper.Hash64 hash64) throws SchemaChangeException { + int numBytes = BloomFilter.optimalNumOfBytes(3, 0.03); + BloomFilter bloomFilter = new BloomFilter(numBytes, fixture.allocator()); + int valueCount = 3; for (int i = 0; i < valueCount; i++) { long hashCode = hash64.hash64Code(i, 0, 0); bloomFilter.insert(hashCode); } - //-----------------create probe side RecordBatch--------------------- - VarCharVector probeVector = new VarCharVector(SchemaBuilder.columnSchema("a", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED), bufferAllocator); - probeVector.allocateNew(); - int probeValueCount = 1; - VarCharVector.Mutator mutator1 = probeVector.getMutator(); - mutator1.setSafe(0, "a".getBytes()); - mutator1.setValueCount(probeValueCount); - VectorContainer probeVectorContainer = new VectorContainer(); - TypedFieldId probeFieldId = probeVectorContainer.add(probeVector); - RecordBatch probeRecordBatch = new TestRecordBatch(probeVectorContainer); - ValueVectorReadExpression probExp = new ValueVectorReadExpression(probeFieldId); - LogicalExpression[] probExpressions = new LogicalExpression[1]; - probExpressions[0] = probExp; - TypedFieldId[] probeFieldIds = new TypedFieldId[1]; - probeFieldIds[0] = probeFieldId; - ValueVectorHashHelper probeValueVectorHashHelper = new ValueVectorHashHelper(probeRecordBatch, context); - ValueVectorHashHelper.Hash64 probeHash64 = probeValueVectorHashHelper.getHash64(probExpressions, probeFieldIds); - long hashCode = probeHash64.hash64Code(0, 0, 0); - boolean contain = bloomFilter.find(hashCode); - Assert.assertTrue(contain); + BloomFilter disjunctionBloomFilter = getSimpleBloomFilter(hash64); + disjunctionBloomFilter.or(bloomFilter); + bloomFilter.getContent().close(); - vectorContainer.clear(); - probeVectorContainer.clear(); - context.close(); - bitContext.close(); - bit.close(); + + return disjunctionBloomFilter; } + private static BloomFilter getSimpleBloomFilter(ValueVectorHashHelper.Hash64 hash64) throws SchemaChangeException { + int numBytes = BloomFilter.optimalNumOfBytes(3, 0.03); - @Test - public void testMerged() throws Exception { + BloomFilter bloomFilter = new BloomFilter(numBytes, fixture.allocator()); - Drillbit bit = new Drillbit(c, RemoteServiceSet.getLocalServiceSet(), ClassPathScanner.fromPrescan(c)); - bit.run(); - DrillbitContext bitContext = bit.getContext(); - FunctionImplementationRegistry registry = bitContext.getFunctionImplementationRegistry(); - FragmentContextImpl context = new FragmentContextImpl(bitContext, BitControl.PlanFragment.getDefaultInstance(), null, registry); - BufferAllocator bufferAllocator = bitContext.getAllocator(); - //create RecordBatch - VarCharVector vector = new VarCharVector(SchemaBuilder.columnSchema("a", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED), bufferAllocator); - vector.allocateNew(); int valueCount = 3; - VarCharVector.Mutator mutator = vector.getMutator(); - mutator.setSafe(0, "a".getBytes()); - mutator.setSafe(1, "b".getBytes()); - mutator.setSafe(2, "c".getBytes()); - mutator.setValueCount(valueCount); - VectorContainer vectorContainer = new VectorContainer(); - TypedFieldId fieldId = vectorContainer.add(vector); - RecordBatch recordBatch = new TestRecordBatch(vectorContainer); - //construct hash64 - ValueVectorReadExpression exp = new ValueVectorReadExpression(fieldId); - LogicalExpression[] expressions = new LogicalExpression[1]; - expressions[0] = exp; - TypedFieldId[] fieldIds = new TypedFieldId[1]; - fieldIds[0] = fieldId; - ValueVectorHashHelper valueVectorHashHelper = new ValueVectorHashHelper(recordBatch, context); - ValueVectorHashHelper.Hash64 hash64 = valueVectorHashHelper.getHash64(expressions, fieldIds); - - //construct BloomFilter - int numBytes = BloomFilter.optimalNumOfBytes(3, 0.03); - - BloomFilter bloomFilter = new BloomFilter(numBytes, bufferAllocator); for (int i = 0; i < valueCount; i++) { long hashCode = hash64.hash64Code(i, 0, 0); bloomFilter.insert(hashCode); } + return bloomFilter; + } - BloomFilter bloomFilter1 = new BloomFilter(numBytes, bufferAllocator); - for (int i = 0; i < valueCount; i++) { - long hashCode = hash64.hash64Code(i, 0, 0); - bloomFilter1.insert(hashCode); + private void checkBloomFilterResult(RowSet.SingleRowSet probeRowSet, + CheckedFunction<ValueVectorHashHelper.Hash64, BloomFilter, SchemaChangeException> bloomFilterProvider, + boolean matches) throws ClassTransformationException, IOException, SchemaChangeException { + try (FragmentContext context = fixture.getFragmentContext()) { + // create build side batch + RowSet.SingleRowSet batchRowSet = fixture.rowSetBuilder(getTestSchema()) + .addRow("a") + .addRow("b") + .addRow("c") + .build(); + + // create build side Hash64 + ValueVectorHashHelper.Hash64 hash64 = getHash64(context, batchRowSet); + + // construct BloomFilter + BloomFilter bloomFilter = bloomFilterProvider.apply(hash64); + + // create probe side Hash64 + ValueVectorHashHelper.Hash64 probeHash64 = getHash64(context, probeRowSet); + + long hashCode = probeHash64.hash64Code(0, 0, 0); + + Assert.assertEquals(matches, bloomFilter.find(hashCode)); + + bloomFilter.getContent().close(); + batchRowSet.clear(); + probeRowSet.clear(); } + } - bloomFilter.or(bloomFilter1); - - //-----------------create probe side RecordBatch--------------------- - VarCharVector probeVector = new VarCharVector(SchemaBuilder.columnSchema("a", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED), bufferAllocator); - probeVector.allocateNew(); - int probeValueCount = 1; - VarCharVector.Mutator mutator1 = probeVector.getMutator(); - mutator1.setSafe(0, "a".getBytes()); - mutator1.setValueCount(probeValueCount); - VectorContainer probeVectorContainer = new VectorContainer(); - TypedFieldId probeFieldId = probeVectorContainer.add(probeVector); - RecordBatch probeRecordBatch = new TestRecordBatch(probeVectorContainer); + private static TupleMetadata getTestSchema() { + return new SchemaBuilder() + .add("a", TypeProtos.MinorType.VARCHAR) + .build(); + } + + private static ValueVectorHashHelper.Hash64 getHash64(FragmentContext context, + RowSet.SingleRowSet probeRowSet) throws ClassTransformationException, IOException, SchemaChangeException { + + RecordBatch probeRecordBatch = new TestRecordBatch(probeRowSet.container()); + TypedFieldId probeFieldId = probeRecordBatch.getValueVectorId(SchemaPath.getSimplePath("a")); ValueVectorReadExpression probExp = new ValueVectorReadExpression(probeFieldId); LogicalExpression[] probExpressions = new LogicalExpression[1]; probExpressions[0] = probExp; TypedFieldId[] probeFieldIds = new TypedFieldId[1]; probeFieldIds[0] = probeFieldId; ValueVectorHashHelper probeValueVectorHashHelper = new ValueVectorHashHelper(probeRecordBatch, context); - ValueVectorHashHelper.Hash64 probeHash64 = probeValueVectorHashHelper.getHash64(probExpressions, probeFieldIds); - long hashCode = probeHash64.hash64Code(0, 0, 0); - boolean contain = bloomFilter.find(hashCode); - Assert.assertTrue(contain); - bloomFilter.getContent().close(); - vectorContainer.clear(); - probeVectorContainer.clear(); - context.close(); - bitContext.close(); - bit.close(); + return probeValueVectorHashHelper.getHash64(probExpressions, probeFieldIds); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java b/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java index 9a538d8..7f65750 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/BaseDirTestWatcher.java @@ -73,6 +73,7 @@ public class BaseDirTestWatcher extends DirTestWatcher { private File dfsTestTmpParentDir; private File dfsTestTmpDir; private File rootDir; + private File udfDir; /** * Creates a {@link BaseDirTestWatcher} which does not delete it's temp directories at the end of tests. @@ -103,6 +104,7 @@ public class BaseDirTestWatcher extends DirTestWatcher { tmpDir = makeSubDir(Paths.get("tmp")); storeDir = makeSubDir(Paths.get("store")); dfsTestTmpParentDir = makeSubDir(Paths.get("dfsTestTmp")); + udfDir = makeSubDir(Paths.get("udf")); newDfsTestTmpDir(); } @@ -118,6 +120,7 @@ public class BaseDirTestWatcher extends DirTestWatcher { FileUtils.cleanDirectory(tmpDir); FileUtils.cleanDirectory(storeDir); FileUtils.cleanDirectory(dfsTestTmpDir); + FileUtils.cleanDirectory(udfDir); } catch (IOException e) { throw new RuntimeException(e); } @@ -168,6 +171,14 @@ public class BaseDirTestWatcher extends DirTestWatcher { } /** + * Gets the temp directory that should be used as base directory for dynamic UDFs. + * @return The temp directory that should be used as base directory for dynamic UDFs. + */ + public File getUdfDir() { + return udfDir; + } + + /** * This methods creates a new directory which can be mapped to <b>dfs.tmp</b>. */ public void newDfsTestTmpDir() { diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java index 735c97a..a7e8ef7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java @@ -584,6 +584,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { Properties props = new Properties(); props.putAll(ClusterFixture.TEST_CONFIGURATIONS); props.setProperty(ExecConstants.DRILL_TMP_DIR, dirTestWatcher.getTmpDir().getAbsolutePath()); + props.setProperty(ExecConstants.UDF_DIRECTORY_ROOT, dirTestWatcher.getUdfDir().getAbsolutePath()); props.setProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, dirTestWatcher.getStoreDir().getAbsolutePath()); builder.configBuilder.configProps(props); diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java index b9f4b43..3ded2f8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java @@ -127,6 +127,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable { configBuilder.put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, dirTestWatcher.getStoreDir().getAbsolutePath()); configBuilder.put(ExecConstants.SPILL_DIRS, Lists.newArrayList(dirTestWatcher.getSpillDir().getAbsolutePath())); configBuilder.put(ExecConstants.HASHJOIN_SPILL_DIRS, Lists.newArrayList(dirTestWatcher.getSpillDir().getAbsolutePath())); + configBuilder.put(ExecConstants.UDF_DIRECTORY_ROOT, dirTestWatcher.getUdfDir().getAbsolutePath()); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java b/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java index c5d508b..5e27f75 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java @@ -31,10 +31,11 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; +import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; -import java.io.IOException; +import java.io.InputStreamReader; import java.io.PrintWriter; import java.lang.reflect.Field; import java.net.HttpURLConnection; @@ -50,29 +51,30 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @Category({SlowTest.class}) -public class TestGracefulShutdown extends BaseTestQuery { +public class TestGracefulShutdown extends ClusterTest { @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(120_000); @BeforeClass public static void setUpTestData() throws Exception { - for( int i = 0; i < 300; i++) { + for (int i = 0; i < 300; i++) { setupFile(i); } } - private static void enableWebServer(ClusterFixtureBuilder builder) { - enableDrillPortHunting(builder); - builder.configBuilder.put(ExecConstants.HTTP_ENABLE, true); - builder.configBuilder.put(ExecConstants.HTTP_PORT_HUNT, true); - builder.sessionOption(ExecConstants.SLICE_TARGET, 10); + private static ClusterFixtureBuilder builderWithEnabledWebServer() { + return builderWithEnabledPortHunting() + .configProperty(ExecConstants.HTTP_ENABLE, true) + .configProperty(ExecConstants.HTTP_PORT_HUNT, true) + .configProperty(ExecConstants.SLICE_TARGET, 10); } - private static void enableDrillPortHunting(ClusterFixtureBuilder builder) { - builder.configBuilder.put(ExecConstants.DRILL_PORT_HUNT, true); - builder.configBuilder.put(ExecConstants.GRACE_PERIOD, 500); - builder.configBuilder.put(ExecConstants.ALLOW_LOOPBACK_ADDRESS_BINDING, true); + private static ClusterFixtureBuilder builderWithEnabledPortHunting() { + return ClusterFixture.builder(dirTestWatcher) + .configProperty(ExecConstants.DRILL_PORT_HUNT, true) + .configProperty(ExecConstants.GRACE_PERIOD, 500) + .configProperty(ExecConstants.ALLOW_LOOPBACK_ADDRESS_BINDING, true); } /* @@ -80,24 +82,25 @@ public class TestGracefulShutdown extends BaseTestQuery { endpoints and check if the drillbit still exists. */ @Test - public void testOnlineEndPoints() throws Exception { + public void testOnlineEndPoints() throws Exception { String[] drillbits = {"db1", "db2", "db3"}; - ClusterFixtureBuilder builder = ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk().withBits(drillbits); - enableDrillPortHunting(builder); + ClusterFixtureBuilder builder = builderWithEnabledPortHunting() + .withLocalZk() + .withBits(drillbits); - try ( ClusterFixture cluster = builder.build()) { + try (ClusterFixture cluster = builder.build()) { Drillbit drillbit = cluster.drillbit("db2"); int zkRefresh = drillbit.getContext().getConfig().getInt(ExecConstants.ZK_REFRESH); - DrillbitEndpoint drillbitEndpoint = drillbit.getRegistrationHandle().getEndPoint(); + DrillbitEndpoint drillbitEndpoint = drillbit.getRegistrationHandle().getEndPoint(); cluster.closeDrillbit("db2"); while (true) { Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit() - .getContext() - .getClusterCoordinator() - .getOnlineEndPoints(); + .getContext() + .getClusterCoordinator() + .getOnlineEndPoints(); if (!drillbitEndpoints.contains(drillbitEndpoint)) { // Success @@ -116,12 +119,14 @@ public class TestGracefulShutdown extends BaseTestQuery { public void testRestApi() throws Exception { String[] drillbits = {"db1", "db2", "db3"}; - ClusterFixtureBuilder builder = ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk().withBits(drillbits); - enableWebServer(builder); + ClusterFixtureBuilder builder = builderWithEnabledWebServer() + .withLocalZk() + .withBits(drillbits); + QueryBuilder.QuerySummaryFuture listener; final String sql = "select * from dfs.root.`.`"; try (ClusterFixture cluster = builder.build(); - final ClientFixture client = cluster.clientFixture()) { + ClientFixture client = cluster.clientFixture()) { Drillbit drillbit = cluster.drillbit("db1"); int port = drillbit.getWebServerPort(); int zkRefresh = drillbit.getContext().getConfig().getInt(ExecConstants.ZK_REFRESH); @@ -133,10 +138,7 @@ public class TestGracefulShutdown extends BaseTestQuery { throw new RuntimeException("Failed : HTTP error code : " + conn.getResponseCode()); } - while (true) { - if (listener.isDone()) { - break; - } + while (!listener.isDone()) { Thread.sleep(100L); } @@ -154,8 +156,8 @@ public class TestGracefulShutdown extends BaseTestQuery { public void testRestApiShutdown() throws Exception { String[] drillbits = {"db1", "db2", "db3"}; - ClusterFixtureBuilder builder = ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk().withBits(drillbits); - enableWebServer(builder); + ClusterFixtureBuilder builder = builderWithEnabledWebServer().withLocalZk().withBits(drillbits); + QueryBuilder.QuerySummaryFuture listener; final String sql = "select * from dfs.root.`.`"; try (ClusterFixture cluster = builder.build(); @@ -163,12 +165,8 @@ public class TestGracefulShutdown extends BaseTestQuery { Drillbit drillbit = cluster.drillbit("db1"); int port = drillbit.getWebServerPort(); int zkRefresh = drillbit.getContext().getConfig().getInt(ExecConstants.ZK_REFRESH); - listener = client.queryBuilder().sql(sql).futureSummary(); - while (true) { - if (listener.isDone()) { - break; - } - + listener = client.queryBuilder().sql(sql).futureSummary(); + while (!listener.isDone()) { Thread.sleep(100L); } URL url = new URL("http://localhost:" + port + "/shutdown"); @@ -187,7 +185,8 @@ public class TestGracefulShutdown extends BaseTestQuery { @Test // DRILL-6912 public void testDrillbitWithSamePortContainsShutdownThread() throws Exception { - ClusterFixtureBuilder fixtureBuilder = ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk() + ClusterFixtureBuilder fixtureBuilder = ClusterFixture.builder(dirTestWatcher) + .withLocalZk() .configProperty(ExecConstants.ALLOW_LOOPBACK_ADDRESS_BINDING, true) .configProperty(ExecConstants.INITIAL_USER_PORT, QueryTestUtil.getFreePortNumber(31170, 300)) .configProperty(ExecConstants.INITIAL_BIT_PORT, QueryTestUtil.getFreePortNumber(31180, 300)); @@ -213,14 +212,14 @@ public class TestGracefulShutdown extends BaseTestQuery { @Test // DRILL-7056 public void testDrillbitTempDir() throws Exception { - File originalDrillbitTempDir = null; - ClusterFixtureBuilder fixtureBuilder = ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk() + File originalDrillbitTempDir; + ClusterFixtureBuilder fixtureBuilder = ClusterFixture.builder(dirTestWatcher).withLocalZk() .configProperty(ExecConstants.ALLOW_LOOPBACK_ADDRESS_BINDING, true) .configProperty(ExecConstants.INITIAL_USER_PORT, QueryTestUtil.getFreePortNumber(31170, 300)) .configProperty(ExecConstants.INITIAL_BIT_PORT, QueryTestUtil.getFreePortNumber(31180, 300)); try (ClusterFixture fixture = fixtureBuilder.build(); - Drillbit twinDrillbitOnSamePort = new Drillbit(fixture.config(), + Drillbit twinDrillbitOnSamePort = new Drillbit(fixture.config(), fixtureBuilder.configBuilder().getDefinitions(), fixture.serviceSet())) { // Assert preconditions : // 1. First drillbit instance should be started normally @@ -262,17 +261,18 @@ public class TestGracefulShutdown extends BaseTestQuery { } private static void setupFile(int file_num) throws Exception { - final String file = "employee"+file_num+".json"; - final Path path = dirTestWatcher.getRootDir().toPath().resolve(file); - try(PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(path.toFile(), true)))) { - out.println("{\"employee_id\":1,\"full_name\":\"Sheri Nowmer\",\"first_name\":\"Sheri\",\"last_name\":\"Nowmer\",\"position_id\":1,\"position_title\":\"President\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1961-08-26\",\"hire_date\":\"1994-12-01 00:00:00.0\",\"end_date\":null,\"salary\":80000.0000,\"supervisor_id\":0,\"education_level\":\"Graduate Degree\",\"marital_status\":\"S\",\"gender\":\"F\",\"management_role\":\"Senior Management\"}\n" + - "{\"employee_id\":2,\"full_name\":\"Derrick Whelply\",\"first_name\":\"Derrick\",\"last_name\":\"Whelply\",\"position_id\":2,\"position_title\":\"VP Country Manager\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1915-07-03\",\"hire_date\":\"1994-12-01 00:00:00.0\",\"end_date\":null,\"salary\":40000.0000,\"supervisor_id\":1,\"education_level\":\"Graduate Degree\",\"marital_status\":\"M\",\"gender\":\"M\",\"management_role\":\"Senior Management\"}\n" + - "{\"employee_id\":4,\"full_name\":\"Michael Spence\",\"first_name\":\"Michael\",\"last_name\":\"Spence\",\"position_id\":2,\"position_title\":\"VP Country Manager\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1969-06-20\",\"hire_date\":\"1998-01-01 00:00:00.0\",\"end_date\":null,\"salary\":40000.0000,\"supervisor_id\":1,\"education_level\":\"Graduate Degree\",\"marital_status\":\"S\",\"gender\":\"M\",\"management_role\":\"Senior Management\"}\n" + - "{\"employee_id\":5,\"full_name\":\"Maya Gutierrez\",\"first_name\":\"Maya\",\"last_name\":\"Gutierrez\",\"position_id\":2,\"position_title\":\"VP Country Manager\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1951-05-10\",\"hire_date\":\"1998-01-01 00:00:00.0\",\"end_date\":null,\"salary\":35000.0000,\"supervisor_id\":1,\"education_level\":\"Bachelors Degree\",\"marital_status\":\"M\",\"gender\":\"F\",\"management_role\":\"Senior Management\"}\n" + - "{\"employee_id\":6,\"full_name\":\"Roberta Damstra\",\"first_name\":\"Roberta\",\"last_name\":\"Damstra\",\"position_id\":3,\"position_title\":\"VP Information Systems\",\"store_id\":0,\"department_id\":2,\"birth_date\":\"1942-10-08\",\"hire_date\":\"1994-12-01 00:00:00.0\",\"end_date\":null,\"salary\":25000.0000,\"supervisor_id\":1,\"education_level\":\"Bachelors Degree\",\"marital_status\":\"M\",\"gender\":\"F\",\"management_role\":\"Senior Management\"}\n" + - "{\"employee_id\":7,\"full_name\":\"Rebecca Kanagaki\",\"first_name\":\"Rebecca\",\"last_name\":\"Kanagaki\",\"position_id\":4,\"position_title\":\"VP Human Resources\",\"store_id\":0,\"department_id\":3,\"birth_date\":\"1949-03-27\",\"hire_date\":\"1994-12-01 00:00:00.0\",\"end_date\":null,\"salary\":15000.0000,\"supervisor_id\":1,\"education_level\":\"Bachelors Degree\",\"marital_status\":\"M\",\"gender\":\"F\",\"management_role\":\"Senior Management\"}\n"); - } catch (IOException e) { - fail(e.getMessage()); + String file = "employee" + file_num + ".json"; + Path path = dirTestWatcher.getRootDir().toPath().resolve(file); + StringBuilder stringBuilder = new StringBuilder(); + int rowsCount = 7; + try (BufferedReader reader = new BufferedReader(new InputStreamReader(TestGracefulShutdown.class.getResourceAsStream("/employee.json")))) { + for (int i = 0; i < rowsCount; i++) { + stringBuilder.append(reader.readLine()); + } + } + String content = stringBuilder.toString(); + try (PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(path.toFile(), true)))) { + out.println(content); } }
