DRILL-4437: Operator unit test framework Closes #394
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d93a3633 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d93a3633 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d93a3633 Branch: refs/heads/master Commit: d93a3633815ed1c7efd6660eae62b7351a2c9739 Parents: 01e04cd Author: Jason Altekruse <[email protected]> Authored: Fri Feb 26 14:55:30 2016 -0800 Committer: Jason Altekruse <[email protected]> Committed: Wed Apr 20 09:07:13 2016 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/ExecConstants.java | 6 +- .../drill/exec/compile/ClassTransformer.java | 4 +- .../drill/exec/physical/impl/ImplCreator.java | 4 +- .../exec/physical/impl/join/HashJoinBatch.java | 2 +- .../physical/impl/xsort/ExternalSortBatch.java | 4 + .../exec/store/easy/json/JSONRecordReader.java | 4 +- .../java/org/apache/drill/DrillTestWrapper.java | 215 ++++++++---- .../test/java/org/apache/drill/TestBuilder.java | 4 - .../physical/unit/BasicPhysicalOpUnitTest.java | 322 +++++++++++++++++ .../physical/unit/PhysicalOpUnitTestBase.java | 341 +++++++++++++++++++ .../apache/drill/jdbc/test/JdbcDataTest.java | 6 +- 11 files changed, 834 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index a490116..6a0889d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -149,7 +149,7 @@ public interface ExecConstants { OptionValidator FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR = new StringValidator(FILESYSTEM_PARTITION_COLUMN_LABEL, "dir"); String JSON_READ_NUMBERS_AS_DOUBLE = "store.json.read_numbers_as_double"; - OptionValidator JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR = new BooleanValidator(JSON_READ_NUMBERS_AS_DOUBLE, false); + BooleanValidator JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR = new BooleanValidator(JSON_READ_NUMBERS_AS_DOUBLE, false); String MONGO_ALL_TEXT_MODE = "store.mongo.all_text_mode"; OptionValidator MONGO_READER_ALL_TEXT_MODE_VALIDATOR = new BooleanValidator(MONGO_ALL_TEXT_MODE, false); @@ -178,9 +178,9 @@ public interface ExecConstants { * HashTable runtime settings */ String MIN_HASH_TABLE_SIZE_KEY = "exec.min_hash_table_size"; - OptionValidator MIN_HASH_TABLE_SIZE = new PositiveLongValidator(MIN_HASH_TABLE_SIZE_KEY, HashTable.MAXIMUM_CAPACITY, HashTable.DEFAULT_INITIAL_CAPACITY); + PositiveLongValidator MIN_HASH_TABLE_SIZE = new PositiveLongValidator(MIN_HASH_TABLE_SIZE_KEY, HashTable.MAXIMUM_CAPACITY, HashTable.DEFAULT_INITIAL_CAPACITY); String MAX_HASH_TABLE_SIZE_KEY = "exec.max_hash_table_size"; - OptionValidator MAX_HASH_TABLE_SIZE = new PositiveLongValidator(MAX_HASH_TABLE_SIZE_KEY, HashTable.MAXIMUM_CAPACITY, HashTable.MAXIMUM_CAPACITY); + PositiveLongValidator MAX_HASH_TABLE_SIZE = new PositiveLongValidator(MAX_HASH_TABLE_SIZE_KEY, HashTable.MAXIMUM_CAPACITY, HashTable.MAXIMUM_CAPACITY); /** * Limits the maximum level of parallelization to this factor time the number of Drillbits http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java index 3c93599..02323a9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java @@ -216,9 +216,7 @@ public class ClassTransformer { final String entireClass, final String materializedClassName) throws ClassTransformationException { // unfortunately, this hasn't been set up at construction time, so we have to do it here - final OptionValue optionValue = optionManager.getOption(SCALAR_REPLACEMENT_OPTION); - final ScalarReplacementOption scalarReplacementOption = - ScalarReplacementOption.fromString((String) optionValue.getValue()); // TODO(DRILL-2474) + final ScalarReplacementOption scalarReplacementOption = ScalarReplacementOption.fromString(optionManager.getOption(SCALAR_REPLACEMENT_VALIDATOR)); try { final long t1 = System.nanoTime(); http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java index 8a8a1ae..5872ef1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java @@ -23,6 +23,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; @@ -120,7 +121,8 @@ public class ImplCreator { /** Create a RecordBatch and its children for given PhysicalOperator */ - private RecordBatch getRecordBatch(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException { + @VisibleForTesting + public RecordBatch getRecordBatch(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException { Preconditions.checkNotNull(op); final List<RecordBatch> childRecordBatches = getChildren(op, context); http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 2ba54dd..2ace69e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -308,7 +308,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { } final HashTableConfig htConfig = - new HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(), + new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE), HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr); // Create the chained hash table http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 7797339..0ee518e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -374,6 +374,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { (spillCount == 0 && !hasMemoryForInMemorySort(totalCount)) || // If we haven't spilled so far, make sure we don't exceed the maximum number of batches SV4 can address (spillCount == 0 && totalBatches > Character.MAX_VALUE) || + // TODO(DRILL-4438) - consider setting this threshold more intelligently, + // lowering caused a failing low memory condition (test in BasicPhysicalOpUnitTest) + // to complete successfully (although it caused perf decrease as there was more spilling) + // current memory used is more than 95% of memory usage limit of this operator (oAllocator.getAllocatedMemory() > .95 * oAllocator.getLimit()) || // Number of incoming batches (BatchGroups) exceed the limit and number of incoming batches accumulated http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java index e943401..dbbe6b0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java @@ -112,8 +112,8 @@ public class JSONRecordReader extends AbstractRecordReader { // only enable all text mode if we aren't using embedded content mode. this.enableAllTextMode = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR); - this.readNumbersAsDouble = fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE).bool_val; - this.unionEnabled = fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE); + this.readNumbersAsDouble = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR); + this.unionEnabled = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE); setColumns(columns); } http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java index f853414..2a9c03d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java +++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java @@ -24,8 +24,10 @@ import java.io.UnsupportedEncodingException; import java.lang.reflect.Array; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -44,7 +46,10 @@ import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.HyperVectorWrapper; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.rpc.user.QueryDataBatch; import org.apache.drill.exec.util.Text; import org.apache.drill.exec.vector.ValueVector; @@ -147,19 +152,19 @@ public class DrillTestWrapper { i++; } } - for (HyperVectorValueIterator hvi : expectedRecords.values()) { - for (ValueVector vv : hvi.getHyperVector().getValueVectors()) { - vv.clear(); - } - } - for (HyperVectorValueIterator hvi : actualRecords.values()) { + cleanupHyperValueIterators(expectedRecords.values()); + cleanupHyperValueIterators(actualRecords.values()); + } + + private void cleanupHyperValueIterators(Collection<HyperVectorValueIterator> hyperBatches) { + for (HyperVectorValueIterator hvi : hyperBatches) { for (ValueVector vv : hvi.getHyperVector().getValueVectors()) { vv.clear(); } } } - private void compareMergedVectors(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords) throws Exception { + public static void compareMergedVectors(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords) throws Exception { for (String s : actualRecords.keySet()) { assertNotNull("Unexpected extra column " + s + " returned by query.", expectedRecords.get(s)); assertEquals("Incorrect number of rows returned by query.", expectedRecords.get(s).size(), actualRecords.get(s).size()); @@ -180,7 +185,7 @@ public class DrillTestWrapper { } } - private String printNearbyRecords(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords, int offset) { + private static String printNearbyRecords(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords, int offset) { StringBuilder expected = new StringBuilder(); StringBuilder actual = new StringBuilder(); expected.append("Expected Records near verification failure:\n"); @@ -208,8 +213,9 @@ public class DrillTestWrapper { } - private Map<String, HyperVectorValueIterator> addToHyperVectorMap(List<QueryDataBatch> records, RecordBatchLoader loader, - BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException { + private Map<String, HyperVectorValueIterator> addToHyperVectorMap(final List<QueryDataBatch> records, + final RecordBatchLoader loader) + throws SchemaChangeException, UnsupportedEncodingException { // TODO - this does not handle schema changes Map<String, HyperVectorValueIterator> combinedVectors = new TreeMap<>(); @@ -218,7 +224,6 @@ public class DrillTestWrapper { int size = records.size(); for (int i = 0; i < size; i++) { batch = records.get(i); - loader = new RecordBatchLoader(getAllocator()); loader.load(batch.getHeader().getDef(), batch.getData()); logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords); totalRecords += loader.getRecordCount(); @@ -241,30 +246,70 @@ public class DrillTestWrapper { return combinedVectors; } + private static class BatchIterator implements Iterable<VectorAccessible>, AutoCloseable { + private final List<QueryDataBatch> dataBatches; + private final RecordBatchLoader batchLoader; + + public BatchIterator(List<QueryDataBatch> dataBatches, RecordBatchLoader batchLoader) { + this.dataBatches = dataBatches; + this.batchLoader = batchLoader; + } + + @Override + public Iterator<VectorAccessible> iterator() { + return new Iterator<VectorAccessible>() { + + int index = -1; + + @Override + public boolean hasNext() { + return index < dataBatches.size() - 1; + } + + @Override + public VectorAccessible next() { + index++; + if (index == dataBatches.size()) { + throw new RuntimeException("Tried to call next when iterator had no more items."); + } + batchLoader.clear(); + QueryDataBatch batch = dataBatches.get(index); + try { + batchLoader.load(batch.getHeader().getDef(), batch.getData()); + } catch (SchemaChangeException e) { + throw new RuntimeException(e); + } + return batchLoader; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Removing is not supported"); + } + }; + } + + @Override + public void close() throws Exception { + batchLoader.clear(); + } + + } + /** - * Only use this method if absolutely needed. There are utility methods to compare results of single queries. - * The current use case for exposing this is setting session or system options between the test and verification - * queries. - * - * TODO - evaluate adding an interface to allow setting session and system options before running queries - * @param records - * @param loader - * @param schema + * @param batches * @return * @throws SchemaChangeException * @throws UnsupportedEncodingException */ - private Map<String, List<Object>> addToCombinedVectorResults(List<QueryDataBatch> records, RecordBatchLoader loader, - BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException { + public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches) + throws SchemaChangeException, UnsupportedEncodingException { // TODO - this does not handle schema changes Map<String, List<Object>> combinedVectors = new TreeMap<>(); long totalRecords = 0; - QueryDataBatch batch; - int size = records.size(); - for (int i = 0; i < size; i++) { - batch = records.get(0); - loader.load(batch.getHeader().getDef(), batch.getData()); + BatchSchema schema = null; + for (VectorAccessible loader : batches) { // TODO: Clean: DRILL-2933: That load(...) no longer throws // SchemaChangeException, so check/clean throws clause above. if (schema == null) { @@ -272,24 +317,66 @@ public class DrillTestWrapper { for (MaterializedField mf : schema) { combinedVectors.put(SchemaPath.getSimplePath(mf.getPath()).toExpr(), new ArrayList<Object>()); } + } else { + // TODO - actually handle schema changes, this is just to get access to the SelectionVectorMode + // of the current batch, the check for a null schema is used to only mutate the schema once + // need to add new vectors and null fill for previous batches? distinction between null and non-existence important? + schema = loader.getSchema(); } logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords); totalRecords += loader.getRecordCount(); for (VectorWrapper<?> w : loader) { String field = SchemaPath.getSimplePath(w.getField().getPath()).toExpr(); - for (int j = 0; j < loader.getRecordCount(); j++) { - Object obj = w.getValueVector().getAccessor().getObject(j); - if (obj != null) { - if (obj instanceof Text) { - obj = obj.toString(); + ValueVector[] vectors; + if (w.isHyper()) { + vectors = w.getValueVectors(); + } else { + vectors = new ValueVector[] {w.getValueVector()}; + } + SelectionVector2 sv2 = null; + SelectionVector4 sv4 = null; + switch(schema.getSelectionVectorMode()) { + case TWO_BYTE: + sv2 = loader.getSelectionVector2(); + break; + case FOUR_BYTE: + sv4 = loader.getSelectionVector4(); + break; + } + if (sv4 != null) { + for (int j = 0; j < sv4.getCount(); j++) { + int complexIndex = sv4.get(j); + int batchIndex = complexIndex >> 16; + int recordIndexInBatch = complexIndex & 65535; + Object obj = vectors[batchIndex].getAccessor().getObject(recordIndexInBatch); + if (obj != null) { + if (obj instanceof Text) { + obj = obj.toString(); + } + } + combinedVectors.get(field).add(obj); + } + } + else { + for (ValueVector vv : vectors) { + for (int j = 0; j < loader.getRecordCount(); j++) { + int index; + if (sv2 != null) { + index = sv2.getIndex(j); + } else { + index = j; + } + Object obj = vv.getAccessor().getObject(index); + if (obj != null) { + if (obj instanceof Text) { + obj = obj.toString(); + } + } + combinedVectors.get(field).add(obj); } } - combinedVectors.get(field).add(obj); } } - records.remove(0); - batch.release(); - loader.clear(); } return combinedVectors; } @@ -342,7 +429,6 @@ public class DrillTestWrapper { */ protected void compareUnorderedResults() throws Exception { RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); - BatchSchema schema = null; List<QueryDataBatch> actual = Collections.emptyList(); List<QueryDataBatch> expected = Collections.emptyList(); @@ -356,14 +442,14 @@ public class DrillTestWrapper { checkNumBatches(actual); addTypeInfoIfMissing(actual.get(0), testBuilder); - addToMaterializedResults(actualRecords, actual, loader, schema); + addToMaterializedResults(actualRecords, actual, loader); // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes // the cases where the baseline is stored in a file. if (baselineRecords == null) { BaseTestQuery.test(baselineOptionSettingQueries); expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery()); - addToMaterializedResults(expectedRecords, expected, loader, schema); + addToMaterializedResults(expectedRecords, expected, loader); } else { expectedRecords = baselineRecords; } @@ -409,28 +495,24 @@ public class DrillTestWrapper { // To avoid extra work for test writers, types can optionally be inferred from the test query addTypeInfoIfMissing(actual.get(0), testBuilder); - actualSuperVectors = addToCombinedVectorResults(actual, loader, schema); + BatchIterator batchIter = new BatchIterator(actual, loader); + actualSuperVectors = addToCombinedVectorResults(batchIter); + batchIter.close(); // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes // the cases where the baseline is stored in a file. if (baselineRecords == null) { BaseTestQuery.test(baselineOptionSettingQueries); expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery()); - expectedSuperVectors = addToCombinedVectorResults(expected, loader, schema); + BatchIterator exBatchIter = new BatchIterator(expected, loader); + expectedSuperVectors = addToCombinedVectorResults(exBatchIter); + exBatchIter.close(); } else { // data is built in the TestBuilder in a row major format as it is provided by the user // translate it here to vectorized, the representation expected by the ordered comparison - expectedSuperVectors = new TreeMap<>(); - expected = new ArrayList<>(); - for (String s : baselineRecords.get(0).keySet()) { - expectedSuperVectors.put(s, new ArrayList<>()); - } - for (Map<String, Object> m : baselineRecords) { - for (String s : m.keySet()) { - expectedSuperVectors.get(s).add(m.get(s)); - } - } + expectedSuperVectors = translateRecordListToHeapVectors(baselineRecords); } + compareMergedVectors(expectedSuperVectors, actualSuperVectors); } catch (Exception e) { throw new Exception(e.getMessage() + "\nFor query: " + query , e); @@ -439,9 +521,21 @@ public class DrillTestWrapper { } } + public static Map<String, List<Object>> translateRecordListToHeapVectors(List<Map<String, Object>> records) { + Map<String, List<Object>> ret = new TreeMap<>(); + for (String s : records.get(0).keySet()) { + ret.put(s, new ArrayList<>()); + } + for (Map<String, Object> m : records) { + for (String s : m.keySet()) { + ret.get(s).add(m.get(s)); + } + } + return ret; + } + public void compareResultsHyperVector() throws Exception { RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); - BatchSchema schema = null; BaseTestQuery.test(testOptionSettingQueries); List<QueryDataBatch> results = BaseTestQuery.testRunAndReturn(queryType, query); @@ -451,12 +545,12 @@ public class DrillTestWrapper { // To avoid extra work for test writers, types can optionally be inferred from the test query addTypeInfoIfMissing(results.get(0), testBuilder); - Map<String, HyperVectorValueIterator> actualSuperVectors = addToHyperVectorMap(results, loader, schema); + Map<String, HyperVectorValueIterator> actualSuperVectors = addToHyperVectorMap(results, loader); BaseTestQuery.test(baselineOptionSettingQueries); List<QueryDataBatch> expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery()); - Map<String, HyperVectorValueIterator> expectedSuperVectors = addToHyperVectorMap(expected, loader, schema); + Map<String, HyperVectorValueIterator> expectedSuperVectors = addToHyperVectorMap(expected, loader); compareHyperVectors(expectedSuperVectors, actualSuperVectors); cleanupBatches(results, expected); @@ -496,8 +590,10 @@ public class DrillTestWrapper { } } - protected void addToMaterializedResults(List<Map<String, Object>> materializedRecords, List<QueryDataBatch> records, RecordBatchLoader loader, - BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException { + public static void addToMaterializedResults(List<Map<String, Object>> materializedRecords, + List<QueryDataBatch> records, + RecordBatchLoader loader) + throws SchemaChangeException, UnsupportedEncodingException { long totalRecords = 0; QueryDataBatch batch; int size = records.size(); @@ -506,9 +602,6 @@ public class DrillTestWrapper { loader.load(batch.getHeader().getDef(), batch.getData()); // TODO: Clean: DRILL-2933: That load(...) no longer throws // SchemaChangeException, so check/clean throws clause above. - if (schema == null) { - schema = loader.getSchema(); - } logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords); totalRecords += loader.getRecordCount(); for (int j = 0; j < loader.getRecordCount(); j++) { @@ -531,7 +624,7 @@ public class DrillTestWrapper { } } - public boolean compareValuesErrorOnMismatch(Object expected, Object actual, int counter, String column) throws Exception { + public static boolean compareValuesErrorOnMismatch(Object expected, Object actual, int counter, String column) throws Exception { if (compareValues(expected, actual, counter, column)) { return true; @@ -554,7 +647,7 @@ public class DrillTestWrapper { return true; } - public boolean compareValues(Object expected, Object actual, int counter, String column) throws Exception { + public static boolean compareValues(Object expected, Object actual, int counter, String column) throws Exception { if (expected == null) { if (actual == null) { if (VERBOSE_DEBUG) { @@ -648,7 +741,7 @@ public class DrillTestWrapper { assertEquals(0, actualRecords.size()); } - private String findMissingColumns(Set<String> expected, Set<String> actual) { + private static String findMissingColumns(Set<String> expected, Set<String> actual) { String missingCols = ""; for (String colName : expected) { if (!actual.contains(colName)) { http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java index 8702eb5..b073371 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java @@ -194,16 +194,12 @@ public class TestBuilder { // modified code from SchemaPath.De class. This should be used sparingly and only in tests if absolutely needed. public static SchemaPath parsePath(String path) { try { - // logger.debug("Parsing expression string '{}'", expr); ExprLexer lexer = new ExprLexer(new ANTLRStringStream(path)); CommonTokenStream tokens = new CommonTokenStream(lexer); ExprParser parser = new ExprParser(tokens); - //TODO: move functionregistry and error collector to injectables. - //ctxt.findInjectableValue(valueId, forProperty, beanInstance) ExprParser.parse_return ret = parser.parse(); - // ret.e.resolveAndValidate(expr, errorCollector); if (ret.e instanceof SchemaPath) { return (SchemaPath) ret.e; } else { http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java new file mode 100644 index 0000000..6f2f160 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.unit; + +import com.google.common.collect.Lists; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.physical.config.ComplexToJson; +import org.apache.drill.exec.physical.config.ExternalSort; +import org.apache.drill.exec.physical.config.Filter; +import org.apache.drill.exec.physical.config.HashAggregate; +import org.apache.drill.exec.physical.config.HashJoinPOP; +import org.apache.drill.exec.physical.config.MergeJoinPOP; +import org.apache.drill.exec.physical.config.MergingReceiverPOP; +import org.apache.drill.exec.physical.config.Project; +import org.apache.drill.exec.physical.config.StreamingAggregate; +import org.apache.drill.exec.physical.config.TopN; +import org.junit.Ignore; +import org.junit.Test; + +import java.lang.reflect.Constructor; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Set; + +import static org.apache.drill.TestBuilder.mapOf; + +public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase { + + @Test + public void testSimpleProject() { + Project projectConf = new Project(parseExprs("x+5", "x"), null); + List<String> jsonBatches = Lists.newArrayList( + "[{\"x\": 5 },{\"x\": 10 }]", + "[{\"x\": 20 },{\"x\": 30 },{\"x\": 40 }]"); + opTestBuilder() + .physicalOperator(projectConf) + .inputDataStreamJson(jsonBatches) + .baselineColumns("x") + .baselineValues(10l) + .baselineValues(15l) + .baselineValues(25l) + .baselineValues(35l) + .baselineValues(45l) + .go(); + } + + @Test + public void testProjectComplexOutput() { + Project projectConf = new Project(parseExprs("convert_from(json_col, 'JSON')", "complex_col"), null); + List<String> jsonBatches = Lists.newArrayList( + "[{\"json_col\": \"{ \\\"a\\\" : 1 }\"}]", + "[{\"json_col\": \"{ \\\"a\\\" : 5 }\"}]"); + opTestBuilder() + .physicalOperator(projectConf) + .inputDataStreamJson(jsonBatches) + .baselineColumns("complex_col") + .baselineValues(mapOf("a", 1l)) + .baselineValues(mapOf("a", 5l)) + .go(); + } + + @Test + public void testSimpleHashJoin() { + HashJoinPOP joinConf = new HashJoinPOP(null, null, Lists.newArrayList(joinCond("x", "EQUALS", "x1")), JoinRelType.LEFT); + // TODO - figure out where to add validation, column names must be unique, even between the two batches, + // for all columns, not just the one in the join condition + // TODO - if any are common between the two, it is failing in the generated setup method in HashJoinProbeGen + List<String> leftJsonBatches = Lists.newArrayList( + "[{\"x\": 5, \"a\" : \"a string\"}]", + "[{\"x\": 5, \"a\" : \"a different string\"},{\"x\": 5, \"a\" : \"meh\"}]"); + List<String> rightJsonBatches = Lists.newArrayList( + "[{\"x1\": 5, \"a2\" : \"asdf\"}]", + "[{\"x1\": 6, \"a2\" : \"qwerty\"},{\"x1\": 5, \"a2\" : \"12345\"}]"); + opTestBuilder() + .physicalOperator(joinConf) + .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)) + .baselineColumns("x", "a", "a2", "x1") + .baselineValues(5l, "a string", "asdf", 5l) + .baselineValues(5l, "a string", "12345", 5l) + .baselineValues(5l, "a different string", "asdf", 5l) + .baselineValues(5l, "a different string", "12345", 5l) + .baselineValues(5l, "meh", "asdf", 5l) + .baselineValues(5l, "meh", "12345", 5l) + .go(); + } + + @Test + public void testSimpleMergeJoin() { + MergeJoinPOP joinConf = new MergeJoinPOP(null, null, Lists.newArrayList(joinCond("x", "EQUALS", "x1")), JoinRelType.LEFT); + // TODO - figure out where to add validation, column names must be unique, even between the two batches, + // for all columns, not just the one in the join condition + List<String> leftJsonBatches = Lists.newArrayList( + "[{\"x\": 5, \"a\" : \"a string\"}]", + "[{\"x\": 5, \"a\" : \"a different string\"},{\"x\": 5, \"a\" : \"meh\"}]"); + List<String> rightJsonBatches = Lists.newArrayList( + "[{\"x1\": 5, \"a2\" : \"asdf\"}]", + "[{\"x1\": 5, \"a2\" : \"12345\"}, {\"x1\": 6, \"a2\" : \"qwerty\"}]"); + opTestBuilder() + .physicalOperator(joinConf) + .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)) + .baselineColumns("x", "a", "a2", "x1") + .baselineValues(5l, "a string", "asdf", 5l) + .baselineValues(5l, "a string", "12345", 5l) + .baselineValues(5l, "a different string", "asdf", 5l) + .baselineValues(5l, "a different string", "12345", 5l) + .baselineValues(5l, "meh", "asdf", 5l) + .baselineValues(5l, "meh", "12345", 5l) + .go(); + } + + @Test + public void testSimpleHashAgg() { + HashAggregate aggConf = new HashAggregate(null, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f); + List<String> inputJsonBatches = Lists.newArrayList( + "[{\"a\": 5, \"b\" : 1 }]", + "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]"); + opTestBuilder() + .physicalOperator(aggConf) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("b_sum", "a") + .baselineValues(6l, 5l) + .baselineValues(8l, 3l) + .go(); + } + + @Test + public void testSimpleStreamAgg() { + StreamingAggregate aggConf = new StreamingAggregate(null, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f); + List<String> inputJsonBatches = Lists.newArrayList( + "[{\"a\": 5, \"b\" : 1 }]", + "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]"); + opTestBuilder() + .physicalOperator(aggConf) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("b_sum", "a") + .baselineValues(6l, 5l) + .baselineValues(8l, 3l) + .go(); + } + + @Test + public void testComplexToJson() { + ComplexToJson complexToJson = new ComplexToJson(null); + List<String> inputJsonBatches = Lists.newArrayList( + "[{\"a\": {\"b\" : 1 }}]", + "[{\"a\": {\"b\" : 5}},{\"a\": {\"b\" : 8}}]"); + opTestBuilder() + .physicalOperator(complexToJson) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("a") + .baselineValues("{\n \"b\" : 1\n}") + .baselineValues("{\n \"b\" : 5\n}") + .baselineValues("{\n \"b\" : 8\n}") + .go(); + } + + @Test + public void testFilter() { + Filter filterConf = new Filter(null, parseExpr("a=5"), 1.0f); + List<String> inputJsonBatches = Lists.newArrayList( + "[{\"a\": 5, \"b\" : 1 }]", + "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]", + "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]"); + opTestBuilder() + .physicalOperator(filterConf) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("a", "b") + .baselineValues(5l, 1l) + .baselineValues(5l, 5l) + .go(); + } + + @Test + public void testExternalSort() { + ExternalSort sortConf = new ExternalSort(null, + Lists.newArrayList(ordering("b", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false); + List<String> inputJsonBatches = Lists.newArrayList( + "[{\"a\": 5, \"b\" : 1 }]", + "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]", + "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]"); + opTestBuilder() + .physicalOperator(sortConf) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("a", "b") + .baselineValues(5l, 1l) + .baselineValues(40l, 3l) + .baselineValues(5l, 5l) + .baselineValues(3l, 8l) + .baselineValues(13l, 100l) + .go(); + } + + private void externalSortLowMemoryHelper(int batchSize, int numberOfBatches, long initReservation, long maxAllocation) { + ExternalSort sortConf = new ExternalSort(null, + Lists.newArrayList(ordering("b", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false); + List<String> inputJsonBatches = Lists.newArrayList(); + StringBuilder batchString = new StringBuilder(); + for (int j = 0; j < numberOfBatches; j++) { + batchString.append("["); + for (int i = 0; i < batchSize; i++) { + batchString.append("{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8},"); + } + batchString.append("{\"a\": 5, \"b\" : 1 }"); + batchString.append("]"); + inputJsonBatches.add(batchString.toString()); + } + + OperatorTestBuilder opTestBuilder = + opTestBuilder() + .initReservation(initReservation) + .maxAllocation(maxAllocation) + .physicalOperator(sortConf) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("a", "b"); + for (int i = 0; i < numberOfBatches; i++) { + opTestBuilder.baselineValues(5l, 1l); + } + for (int i = 0; i < batchSize * numberOfBatches; i++) { + opTestBuilder.baselineValues(5l, 5l); + } + for (int i = 0; i < batchSize * numberOfBatches; i++) { + opTestBuilder.baselineValues(3l, 8l); + } + opTestBuilder.go(); + } + + // TODO - Failing with - org.apache.drill.exec.exception.OutOfMemoryException: Unable to allocate buffer of size 262144 (rounded from 147456) due to memory limit. Current allocation: 16422656 + // look in ExternalSortBatch for this JIRA number, changing this percentage of the allocator limit that is + // the threshold for spilling (it worked with 0.65 for me) "fixed" the problem but hurt perf, will want + // to find a better solutions to this problem. When it is fixed this threshold will likely become unnecessary + @Test + @Ignore("DRILL-4438") + public void testExternalSortLowMemory1() { + externalSortLowMemoryHelper(4960, 100, 10000000, 16500000); + } + + // TODO- believe this was failing in the scan not the sort, may not require a fix + @Test + @Ignore("DRILL-4438") + public void testExternalSortLowMemory2() { + externalSortLowMemoryHelper(4960, 100, 10000000, 15000000); + } + + // TODO - believe this was failing in the scan not the sort, may not require a fix + @Test + @Ignore("DRILL-4438") + public void testExternalSortLowMemory3() { + externalSortLowMemoryHelper(40960, 10, 10000000, 10000000); + } + + // TODO - Failing with - org.apache.drill.exec.exception.OutOfMemoryException: Unable to allocate sv2 buffer after repeated attempts + // see comment above testExternalSortLowMemory1 about TODO left in ExternalSortBatch + @Test + @Ignore("DRILL-4438") + public void testExternalSortLowMemory4() { + externalSortLowMemoryHelper(15960, 30, 10000000, 14500000); + } + + @Test + public void testTopN() { + TopN sortConf = new TopN(null, + Lists.newArrayList(ordering("b", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false, 3); + List<String> inputJsonBatches = Lists.newArrayList( + "[{\"a\": 5, \"b\" : 1 }]", + "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]", + "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]"); + opTestBuilder() + .physicalOperator(sortConf) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("a", "b") + .baselineValues(5l, 1l) + .baselineValues(40l, 3l) + .baselineValues(5l, 5l) + .go(); + } + + // TODO(DRILL-4439) - doesn't expect incoming batches, uses instead RawFragmentBatch + // need to figure out how to mock these + @Ignore + @Test + public void testSimpleMergingReceiver() { + MergingReceiverPOP mergeConf = new MergingReceiverPOP(-1, Lists.<MinorFragmentEndpoint>newArrayList(), + Lists.newArrayList(ordering("x", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false); + List<String> leftJsonBatches = Lists.newArrayList( + "[{\"x\": 5, \"a\" : \"a string\"}]", + "[{\"x\": 5, \"a\" : \"a different string\"},{\"x\": 5, \"a\" : \"meh\"}]"); + List<String> rightJsonBatches = Lists.newArrayList( + "[{\"x\": 5, \"a\" : \"asdf\"}]", + "[{\"x\": 5, \"a\" : \"12345\"}, {\"x\": 6, \"a\" : \"qwerty\"}]"); + opTestBuilder() + .physicalOperator(mergeConf) + .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)) + .baselineColumns("x", "a") + .baselineValues(5l, "a string") + .baselineValues(5l, "a different string") + .baselineValues(5l, "meh") + .baselineValues(5l, "asdf") + .baselineValues(5l, "12345") + .baselineValues(6l, "qwerty") + .go(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java new file mode 100644 index 0000000..245e5bb --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.unit; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import mockit.Delegate; +import mockit.Injectable; +import mockit.NonStrictExpectations; +import org.antlr.runtime.ANTLRStringStream; +import org.antlr.runtime.CommonTokenStream; +import org.antlr.runtime.RecognitionException; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.drill.DrillTestWrapper; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.expression.parser.ExprLexer; +import org.apache.drill.common.expression.parser.ExprParser; +import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.common.logical.data.Order; +import org.apache.drill.common.scanner.ClassPathScanner; +import org.apache.drill.common.scanner.persistence.ScanResult; +import org.apache.drill.exec.compile.CodeCompiler; +import org.apache.drill.exec.compile.TemplateClassDefinition; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.CodeGenerator; +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.RootAllocatorFactory; +import org.apache.drill.exec.ops.BufferManagerImpl; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry; +import org.apache.drill.exec.physical.impl.ScanBatch; +import org.apache.drill.exec.physical.impl.project.Projector; +import org.apache.drill.exec.physical.impl.project.ProjectorTemplate; +import org.apache.drill.exec.proto.ExecProtos; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.server.options.TypeValidators; +import org.apache.drill.exec.store.RecordReader; +import org.apache.drill.exec.store.easy.json.JSONRecordReader; +import org.apache.drill.exec.testing.ExecutionControls; +import org.apache.drill.test.DrillTest; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Look! Doesn't extend BaseTestQuery!! + */ +public class PhysicalOpUnitTestBase extends DrillTest { + + @Injectable FragmentContext fragContext; + @Injectable OperatorContext opContext; + @Injectable OperatorStats opStats; + @Injectable OptionManager optManager; + @Injectable PhysicalOperator popConf; + @Injectable ExecutionControls executionControls; + + private final DrillConfig drillConf = DrillConfig.create(); + private final BufferAllocator allocator = RootAllocatorFactory.newRoot(drillConf); + private final BufferManagerImpl bufManager = new BufferManagerImpl(allocator); + private final ScanResult classpathScan = ClassPathScanner.fromPrescan(drillConf); + private final FunctionImplementationRegistry funcReg = new FunctionImplementationRegistry(drillConf, classpathScan); + private final TemplateClassDefinition templateClassDefinition = new TemplateClassDefinition<>(Projector.class, ProjectorTemplate.class); + private final OperatorCreatorRegistry opCreatorReg = new OperatorCreatorRegistry(classpathScan); + + protected LogicalExpression parseExpr(String expr) { + ExprLexer lexer = new ExprLexer(new ANTLRStringStream(expr)); + CommonTokenStream tokens = new CommonTokenStream(lexer); + ExprParser parser = new ExprParser(tokens); + try { + return parser.parse().e; + } catch (RecognitionException e) { + throw new RuntimeException("Error parsing expression: " + expr); + } + } + + protected Order.Ordering ordering(String expression, RelFieldCollation.Direction direction, RelFieldCollation.NullDirection nullDirection) { + return new Order.Ordering(direction, parseExpr(expression), nullDirection); + } + + protected JoinCondition joinCond(String leftExpr, String relationship, String rightExpr) { + return new JoinCondition(relationship, parseExpr(leftExpr), parseExpr(rightExpr)); + } + + protected List<NamedExpression> parseExprs(String... expressionsAndOutputNames) { + Preconditions.checkArgument(expressionsAndOutputNames.length %2 ==0, "List of expressions and output field names" + + " is not complete, each expression must explicitly give and output name,"); + List<NamedExpression> ret = new ArrayList<>(); + for (int i = 0; i < expressionsAndOutputNames.length; i += 2) { + ret.add(new NamedExpression(parseExpr(expressionsAndOutputNames[i]), + new FieldReference(new SchemaPath(new PathSegment.NameSegment(expressionsAndOutputNames[i+1]))))); + } + return ret; + } + + + void runTest(OperatorTestBuilder testBuilder) { + BatchCreator<PhysicalOperator> opCreator; + RecordBatch testOperator; + try { + mockFragmentContext(testBuilder.initReservation, testBuilder.maxAllocation); + opCreator = (BatchCreator<PhysicalOperator>) + opCreatorReg.getOperatorCreator(testBuilder.popConfig.getClass()); + List<RecordBatch> incomingStreams = Lists.newArrayList(); + for (List<String> batchesJson : testBuilder.inputStreamsJSON) { + incomingStreams.add(new ScanBatch(null, fragContext, + getRecordReadersForJsonBatches(batchesJson, fragContext))); + } + testOperator = opCreator.getBatch(fragContext, testBuilder.popConfig, incomingStreams); + + Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator)); + Map<String, List<Object>> expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(testBuilder.baselineRecords); + DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors); + + } catch (ExecutionSetupException e) { + throw new RuntimeException(e); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } catch (SchemaChangeException e) { + throw new RuntimeException(e); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static class BatchIterator implements Iterable<VectorAccessible> { + + private RecordBatch operator; + public BatchIterator(RecordBatch operator) { + this.operator = operator; + } + + @Override + public Iterator<VectorAccessible> iterator() { + return new Iterator<VectorAccessible>() { + boolean needToGrabNext = true; + RecordBatch.IterOutcome lastResultOutcome; + @Override + public boolean hasNext() { + if (needToGrabNext) { + lastResultOutcome = operator.next(); + needToGrabNext = false; + } + if (lastResultOutcome == RecordBatch.IterOutcome.NONE + || lastResultOutcome == RecordBatch.IterOutcome.STOP) { + return false; + } else if (lastResultOutcome == RecordBatch.IterOutcome.OUT_OF_MEMORY) { + throw new RuntimeException("Operator ran out of memory"); + } else { + return true; + } + } + + @Override + public VectorAccessible next() { + if (needToGrabNext) { + lastResultOutcome = operator.next(); + } + needToGrabNext = true; + return operator; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove is not supported."); + } + }; + } + } + + protected OperatorTestBuilder opTestBuilder() { + return new OperatorTestBuilder(); + } + + protected class OperatorTestBuilder { + + private PhysicalOperator popConfig; + private String[] baselineColumns; + private List<Map<String, Object>> baselineRecords; + private List<List<String>> inputStreamsJSON; + private long initReservation = 10000000; + private long maxAllocation = 15000000; + + public void go() { + runTest(this); + } + + public OperatorTestBuilder physicalOperator(PhysicalOperator batch) { + this.popConfig = batch; + return this; + } + + public OperatorTestBuilder initReservation(long initReservation) { + this.initReservation = initReservation; + return this; + } + + public OperatorTestBuilder maxAllocation(long maxAllocation) { + this.maxAllocation = maxAllocation; + return this; + } + + public OperatorTestBuilder inputDataStreamJson(List<String> jsonBatches) { + this.inputStreamsJSON = new ArrayList<>(); + this.inputStreamsJSON.add(jsonBatches); + return this; + } + + public OperatorTestBuilder inputDataStreamsJson(List<List<String>> childStreams) { + this.inputStreamsJSON = childStreams; + return this; + } + + public OperatorTestBuilder baselineColumns(String... columns) { + for (int i = 0; i < columns.length; i++) { + LogicalExpression ex = parseExpr(columns[i]); + if (ex instanceof SchemaPath) { + columns[i] = ((SchemaPath)ex).toExpr(); + } else { + throw new IllegalStateException("Schema path is not a valid format."); + } + } + this.baselineColumns = columns; + return this; + } + + public OperatorTestBuilder baselineValues(Object ... baselineValues) { + if (baselineRecords == null) { + baselineRecords = new ArrayList(); + } + Map<String, Object> ret = new HashMap(); + int i = 0; + Preconditions.checkArgument(baselineValues.length == baselineColumns.length, + "Must supply the same number of baseline values as columns."); + for (String s : baselineColumns) { + ret.put(s, baselineValues[i]); + i++; + } + this.baselineRecords.add(ret); + return this; + } + } + + private void mockFragmentContext(long initReservation, long maxAllocation) { + final CodeCompiler compiler = new CodeCompiler(drillConf, optManager); + final BufferAllocator allocator = this.allocator.newChildAllocator("allocator_for_operator_test", initReservation, maxAllocation); + new NonStrictExpectations() { + { + optManager.getOption(withAny(new TypeValidators.BooleanValidator("", false))); result = false; + // TODO(DRILL-4450) - Probably want to just create a default option manager, this is a hack to prevent + // the code compilation from failing when trying to decide of scalar replacement is turned on + // this will cause other code paths to fail because this return value won't be valid for most + // string options + optManager.getOption(withAny(new TypeValidators.StringValidator("", "try"))); result = "try"; + optManager.getOption(withAny(new TypeValidators.PositiveLongValidator("", 1l, 1l))); result = 10; + fragContext.getOptions(); result = optManager; + fragContext.getManagedBuffer(); result = bufManager.getManagedBuffer(); + fragContext.shouldContinue(); result = true; + fragContext.getExecutionControls(); result = executionControls; + fragContext.getFunctionRegistry(); result = funcReg; + fragContext.getConfig(); result = drillConf; + fragContext.getHandle(); result = ExecProtos.FragmentHandle.getDefaultInstance(); + try { + fragContext.getImplementationClass(withAny(CodeGenerator.get(templateClassDefinition, funcReg))); + result = new Delegate() + { + Object getImplementationClass(CodeGenerator gen) throws IOException, ClassTransformationException { + return compiler.getImplementationClass(gen); + } + }; + fragContext.getImplementationClass(withAny(CodeGenerator.get(templateClassDefinition, funcReg).getRoot())); + result = new Delegate() + { + Object getImplementationClass(ClassGenerator gen) throws IOException, ClassTransformationException { + return compiler.getImplementationClass(gen.getCodeGenerator()); + } + }; + } catch (ClassTransformationException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } + opContext.getStats();result = opStats; + opContext.getAllocator(); result = allocator; + fragContext.newOperatorContext(withAny(popConf));result = opContext; + } + }; + } + + private Iterator<RecordReader> getRecordReadersForJsonBatches(List<String> jsonBatches, FragmentContext fragContext) { + ObjectMapper mapper = new ObjectMapper(); + List<RecordReader> readers = new ArrayList<>(); + for (String batchJason : jsonBatches) { + JsonNode records; + try { + records = mapper.readTree(batchJason); + } catch (IOException e) { + throw new RuntimeException(e); + } + readers.add(new JSONRecordReader(fragContext, records, null, Collections.singletonList(SchemaPath.getSimplePath("*")))); + } + return readers.iterator(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/d93a3633/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcDataTest.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcDataTest.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcDataTest.java index 56e58dc..fd5d4f0 100644 --- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcDataTest.java +++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcDataTest.java @@ -186,7 +186,7 @@ public class JdbcDataTest extends JdbcTestBase { Scan scan = findOnlyOperator(plan, Scan.class); Assert.assertEquals("donuts-json", scan.getStorageEngine()); Project project = findOnlyOperator(plan, Project.class); - Assert.assertEquals(1, project.getSelections().length); + Assert.assertEquals(1, project.getSelections().size()); Assert.assertEquals(Scan.class, project.getInput().getClass()); Store store = findOnlyOperator(plan, Store.class); Assert.assertEquals("queue", store.getStorageEngine()); @@ -244,9 +244,9 @@ public class JdbcDataTest extends JdbcTestBase { Assert.assertTrue(filter.getInput() instanceof Scan); Project[] projects = Iterables.toArray(findOperator(plan, Project.class), Project.class); Assert.assertEquals(2, projects.length); - Assert.assertEquals(1, projects[0].getSelections().length); + Assert.assertEquals(1, projects[0].getSelections().size()); Assert.assertEquals(Filter.class, projects[0].getInput().getClass()); - Assert.assertEquals(2, projects[1].getSelections().length); + Assert.assertEquals(2, projects[1].getSelections().size()); Assert.assertEquals(Project.class, projects[1].getInput().getClass()); Store store = findOnlyOperator(plan, Store.class); Assert.assertEquals("queue", store.getStorageEngine());
