Repository: drill Updated Branches: refs/heads/master a9ea4ec1c -> 039530a41
DRILL-6071: Limit batch size for flatten operator closes #1091 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/039530a4 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/039530a4 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/039530a4 Branch: refs/heads/master Commit: 039530a4195ba8fa4532b9ca92980206fa66c181 Parents: a9ea4ec Author: Padma Penumarthy <[email protected]> Authored: Wed Jan 10 05:06:58 2018 -0800 Committer: Boaz Ben-Zvi <[email protected]> Committed: Tue Jan 30 12:00:47 2018 -0800 ---------------------------------------------------------------------- .../org/apache/drill/exec/ExecConstants.java | 3 + .../impl/flatten/FlattenRecordBatch.java | 55 ++ .../physical/impl/flatten/FlattenTemplate.java | 142 +-- .../exec/physical/impl/flatten/Flattener.java | 1 + .../physical/impl/spill/RecordBatchSizer.java | 50 +- .../server/options/SystemOptionManager.java | 3 +- .../src/main/resources/drill-module.conf | 1 + .../physical/unit/BasicPhysicalOpUnitTest.java | 29 + .../physical/unit/MiniPlanUnitTestBase.java | 2 +- .../physical/unit/PhysicalOpUnitTestBase.java | 45 +- .../exec/physical/unit/TestOutputBatchSize.java | 882 +++++++++++++++++++ .../org/apache/drill/test/DrillTestWrapper.java | 39 +- 12 files changed, 1095 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 25f6135..c949e51 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -76,6 +76,9 @@ public final 
class ExecConstants { public static final String SPILL_FILESYSTEM = "drill.exec.spill.fs"; public static final String SPILL_DIRS = "drill.exec.spill.directories"; + public static final String OUTPUT_BATCH_SIZE = "drill.exec.memory.operator.output_batch_size"; + public static final LongValidator OUTPUT_BATCH_SIZE_VALIDATOR = new RangeLongValidator(OUTPUT_BATCH_SIZE, 1024, 512 * 1024 * 1024); + // External Sort Boot configuration public static final String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size"; http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index 8be16ad..9483f29 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -28,6 +28,7 @@ import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.common.types.Types; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; @@ -40,6 +41,7 @@ import org.apache.drill.exec.expr.ValueVectorReadExpression; import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.FlattenPOP; +import 
org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; @@ -68,6 +70,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { private boolean hasRemainder = false; private int remainderIndex = 0; private int recordCount; + private long outputBatchSize; private final Flattener.Monitor monitor = new Flattener.Monitor() { @Override @@ -94,8 +97,57 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { } } + private class FlattenMemoryManager { + private final int outputRowCount; + private static final int OFFSET_VECTOR_WIDTH = 4; + private static final int WORST_CASE_FRAGMENTATION_FACTOR = 2; + private static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT; + private static final int MIN_NUM_ROWS = 1; + + private FlattenMemoryManager(RecordBatch incoming, long outputBatchSize, SchemaPath flattenColumn) { + // Get sizing information for the batch. + RecordBatchSizer sizer = new RecordBatchSizer(incoming); + + final TypedFieldId typedFieldId = incoming.getValueVectorId(flattenColumn); + final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]); + + // Get column size of flatten column. + RecordBatchSizer.ColumnSize columnSize = RecordBatchSizer.getColumn(incoming.getValueAccessorById(field.getValueClass(), + typedFieldId.getFieldIds()).getValueVector(), field.getName()); + + // Average rowWidth of flatten column + final int avgRowWidthFlattenColumn = RecordBatchSizer.safeDivide(columnSize.netSize, incoming.getRecordCount()); + + // Average rowWidth excluding the flatten column. + final int avgRowWidthWithOutFlattenColumn = sizer.netRowWidth() - avgRowWidthFlattenColumn; + + // Average rowWidth of single element in the flatten list. 
+ // subtract the offset vector size from column data size. + final int avgRowWidthSingleFlattenEntry = + RecordBatchSizer.safeDivide(columnSize.netSize - (OFFSET_VECTOR_WIDTH * columnSize.valueCount), columnSize.elementCount); + + // Average rowWidth of outgoing batch. + final int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + avgRowWidthSingleFlattenEntry; + + // Number of rows in outgoing batch + outputRowCount = Math.max(MIN_NUM_ROWS, Math.min(MAX_NUM_ROWS, + RecordBatchSizer.safeDivide((outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR), avgOutgoingRowWidth))); + + logger.debug("flatten incoming batch sizer : {}, outputBatchSize : {}," + + "avgOutgoingRowWidth : {}, outputRowCount : {}", sizer, outputBatchSize, avgOutgoingRowWidth, outputRowCount); + } + + public int getOutputRowCount() { + return outputRowCount; + } + } + + public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException { super(pop, context, incoming); + + // get the output batch size from config. 
+ outputBatchSize = context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); } @Override @@ -148,6 +200,9 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { @Override protected IterOutcome doWork() { + FlattenMemoryManager flattenMemoryManager = new FlattenMemoryManager(incoming, outputBatchSize, popConfig.getColumn()); + flattener.setOutputCount(flattenMemoryManager.getOutputRowCount()); + int incomingRecordCount = incoming.getRecordCount(); if (!doAlloc()) { http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java index ed20429..cd58bfd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java @@ -31,6 +31,7 @@ import org.apache.drill.exec.record.TransferPair; import com.google.common.collect.ImmutableList; +import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.RepeatedValueVector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,8 +39,7 @@ import org.slf4j.LoggerFactory; public abstract class FlattenTemplate implements Flattener { private static final Logger logger = LoggerFactory.getLogger(FlattenTemplate.class); - private static final int OUTPUT_BATCH_SIZE = 4*1024; - private static final int OUTPUT_MEMORY_LIMIT = 512 * 1024 * 1024; + private static final int OUTPUT_ROW_COUNT = ValueVector.MAX_ROW_COUNT; private ImmutableList<TransferPair> transfers; private BufferAllocator outputAllocator; @@ -47,14 +47,12 @@ public abstract class FlattenTemplate 
implements Flattener { private RepeatedValueVector fieldToFlatten; private RepeatedValueVector.RepeatedAccessor accessor; private int valueIndex; - private boolean bigRecords = false; - private int bigRecordsBufferSize; /** - * The output batch limit starts at OUTPUT_BATCH_SIZE, but may be decreased + * The output batch limit starts at OUTPUT_ROW_COUNT, but may be decreased * if records are found to be large. */ - private int outputLimit = OUTPUT_BATCH_SIZE; + private int outputLimit = OUTPUT_ROW_COUNT; // this allows for groups to be written between batches if we run out of space, for cases where we have finished // a batch on the boundary it will be set to 0 @@ -73,6 +71,11 @@ public abstract class FlattenTemplate implements Flattener { } @Override + public void setOutputCount(int outputCount) { + outputLimit = outputCount; + } + + @Override public final int flattenRecords(final int recordCount, final int firstOutputIndex, final Flattener.Monitor monitor) { switch (svMode) { @@ -101,75 +104,10 @@ public abstract class FlattenTemplate implements Flattener { for ( ; innerValueIndexLocal < innerValueCount; innerValueIndexLocal++) { // If we've hit the batch size limit, stop and flush what we've got so far. if (recordsThisCall == outputLimit) { - if (bigRecords) { - /* - * We got to the limit we used before, but did we go over - * the bigRecordsBufferSize in the second half of the batch? If - * so, we'll need to adjust the batch limits. - */ - adjustBatchLimits(1, monitor, recordsThisCall); - } - // Flush this batch. break outer; } - /* - * At the moment, the output record includes the input record, so for very - * large records that we're flattening, we're carrying forward the original - * record as well as the flattened element. We've seen a case where flattening a 4MB - * record with a 20,000 element array causing memory usage to explode. 
To avoid - * that until we can push down the selected fields to operators like this, we - * also limit the amount of memory in use at one time. - * - * We have to have written at least one record to be able to get a buffer that will - * have a real allocator, so we have to do this lazily. We won't check the limit - * for the first two records, but that keeps this simple. - */ - if (bigRecords) { - /* - * If we're halfway through the outputLimit, check on our memory - * usage so far. - */ - if (recordsThisCall == outputLimit / 2) { - /* - * If we've used more than half the space we've used for big records - * in the past, we've seen even bigger records than before, so stop and - * see if we need to flush here before we go over bigRecordsBufferSize - * memory usage, and reduce the outputLimit further before we continue - * with the next batch. - */ - if (adjustBatchLimits(2, monitor, recordsThisCall)) { - break outer; - } - } - } else { - if (outputAllocator.getAllocatedMemory() > OUTPUT_MEMORY_LIMIT) { - /* - * We're dealing with big records. Reduce the outputLimit to - * the current record count, and take note of how much space the - * vectors report using for that. We'll use those numbers as limits - * going forward in order to avoid allocating more memory. - */ - bigRecords = true; - outputLimit = Math.min(recordsThisCall, outputLimit); - if (outputLimit < 1) { - throw new IllegalStateException("flatten outputLimit (" + outputLimit - + ") won't make progress"); - } - - /* - * This will differ from what the allocator reports because of - * overhead. But the allocator check is much cheaper to do, so we - * only compute this at selected times. - */ - bigRecordsBufferSize = monitor.getBufferSizeFor(recordsThisCall); - - // Stop and flush. 
- break outer; - } - } - try { doEval(valueIndexLocal, outputIndex); } catch (OversizedAllocationException ex) { @@ -211,68 +149,6 @@ public abstract class FlattenTemplate implements Flattener { } } - /** - * Determine if the current batch record limit needs to be adjusted (when handling - * bigRecord mode). If so, adjust the limit, and return true, otherwise return false. - * - * <p>If the limit is adjusted, it will always be adjusted down, because we need to operate - * based on the largest sized record we've ever seen.</p> - * - * <p>If the limit is adjusted, then the current batch should be flushed, because - * continuing would lead to going over the large memory limit that has already been - * established.</p> - * - * @param multiplier Multiply currently used memory (according to the monitor) before - * checking against past memory limits. This allows for checking the currently used - * memory after processing a fraction of the expected batch limit, but using that as - * a predictor of the full batch's size. For example, if this is checked after half - * the batch size limit's records are processed, then using a multiplier of two will - * do the check under the assumption that processing the full batch limit will use - * twice as much memory. - * @param monitor the Flattener.Monitor instance to use for the current memory usage check - * @param recordsThisCall the number of records processed so far during this call to - * flattenRecords(). - * @return true if the batch size limit was adjusted, false otherwise - */ - private boolean adjustBatchLimits(final int multiplier, final Flattener.Monitor monitor, - final int recordsThisCall) { - assert bigRecords : "adjusting batch limits when no big records"; - final int bufferSize = multiplier * monitor.getBufferSizeFor(recordsThisCall); - - /* - * If the amount of space we've used so far is below the amount that triggered - * the bigRecords mode, then no adjustment is needed. 
- */ - if (bufferSize <= bigRecordsBufferSize) { - return false; - } - - /* - * We've used more space than we've used for big records in the past, we've seen - * even bigger records, so we need to adjust our limits, and flush what we've got so far. - * - * We should reduce the outputLimit proportionately to get the predicted - * amount of memory used back down to bigRecordsBufferSize. - * - * The number of records to limit is therefore - * outputLimit * - * (1 - (bufferSize - bigRecordsBufferSize) / bigRecordsBufferSize) - * - * Doing some algebra on the multiplier: - * (bigRecordsBufferSize - (bufferSize - bigRecordsBufferSize)) / bigRecordsBufferSize - * (bigRecordsBufferSize - bufferSize + bigRecordsBufferSize) / bigRecordsBufferSize - * (2 * bigRecordsBufferSize - bufferSize) / bigRecordsBufferSize - * - * If bufferSize has gotten so big that this would be negative, we'll - * just go down to one record per batch. We need to check for that on - * outputLimit anyway, in order to make sure that we make progress. 
- */ - final int newLimit = (int) - (outputLimit * (2.0 * ((double) bigRecordsBufferSize) - bufferSize) / bigRecordsBufferSize); - outputLimit = Math.max(1, newLimit); - return true; - } - @Override public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException{ http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java index 392757e..b1d93c1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java @@ -45,6 +45,7 @@ public interface Flattener { int flattenRecords(int recordCount, int firstOutputIndex, Monitor monitor); void setFlattenField(RepeatedValueVector repeatedColumn); + void setOutputCount(int outputCount); RepeatedValueVector getFlattenField(); http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java index 5808f45..0fe67d5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java @@ -93,9 +93,19 @@ public class RecordBatchSizer { public final int elementCount; + /** + * Size of the top 
level value vector. For map and repeated list, + * this is just size of offset vector. + */ public int dataSize; /** + * Total size of the column includes the sum total of memory for all + * value vectors representing the column. + */ + public int netSize; + + /** * The estimated, average number of elements per parent value. * Always 1 for a non-repeated type. For a repeated type, * this is the average entries per array (per repeated element). @@ -131,9 +141,15 @@ public class RecordBatchSizer { break; default: dataSize = v.getPayloadByteCount(valueCount); - stdSize = TypeHelper.getSize(metadata.getType()) * elementCount; + try { + stdSize = TypeHelper.getSize(metadata.getType()) * elementCount; + } catch (Exception e) { + // For unsupported types, just set stdSize to 0. + stdSize = 0; + } } estSize = safeDivide(dataSize, valueCount); + netSize = v.getPayloadByteCount(valueCount); } @SuppressWarnings("resource") @@ -154,8 +170,14 @@ public class RecordBatchSizer { return childCount; } + @SuppressWarnings("resource") private void buildList(ValueVector v) { - @SuppressWarnings("resource") + // complex ListVector cannot be casted to RepeatedListVector. + // check the mode. + if (v.getField().getDataMode() != DataMode.REPEATED) { + dataSize = v.getPayloadByteCount(valueCount); + return; + } UInt4Vector offsetVector = ((RepeatedListVector) v).getOffsetVector(); dataSize = offsetVector.getPayloadByteCount(valueCount); } @@ -232,6 +254,10 @@ public class RecordBatchSizer { } } + public static ColumnSize getColumn(ValueVector v, String prefix) { + return new ColumnSize(v, prefix); + } + public static final int MAX_VECTOR_SIZE = ValueVector.MAX_BUFFER_SIZE; // 16 MiB private List<ColumnSize> columnSizes = new ArrayList<>(); @@ -380,14 +406,18 @@ public class RecordBatchSizer { // vectors do consume space, so visit columns recursively. 
switch (v.getField().getType().getMinorType()) { - case MAP: - expandMap((AbstractMapVector) v, prefix + v.getField().getName() + "."); - break; - case LIST: - expandList((RepeatedListVector) v, prefix + v.getField().getName() + "."); - break; - default: - v.collectLedgers(ledgers); + case MAP: + expandMap((AbstractMapVector) v, prefix + v.getField().getName() + "."); + break; + case LIST: + // complex ListVector cannot be casted to RepeatedListVector. + // do not expand the list if it is not repeated mode. + if (v.getField().getDataMode() == DataMode.REPEATED) { + expandList((RepeatedListVector) v, prefix + v.getField().getName() + "."); + } + break; + default: + v.collectLedgers(ledgers); } netRowWidth += colSize.estSize; http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 369f3bc..4dba96d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -211,7 +211,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new OptionDefinition(ExecConstants.PERSISTENT_TABLE_UMASK_VALIDATOR), new OptionDefinition(ExecConstants.CPU_LOAD_AVERAGE), new OptionDefinition(ExecConstants.ENABLE_VECTOR_VALIDATOR), - new OptionDefinition(ExecConstants.ENABLE_ITERATOR_VALIDATOR) + new OptionDefinition(ExecConstants.ENABLE_ITERATOR_VALIDATOR), + new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)) }; final CaseInsensitiveMap<OptionDefinition> map = 
CaseInsensitiveMap.newHashMap(); http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 28b7975..5659c82 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -420,6 +420,7 @@ drill.exec.options: { drill.exec.storage.implicit.fqn.column.label: "fqn", drill.exec.storage.implicit.suffix.column.label: "suffix", drill.exec.testing.controls: "{}", + drill.exec.memory.operator.output_batch_size : 33554432, # 32 MB exec.bulk_load_table_list.bulk_size: 1000, exec.compile.scalar_replacement: false, exec.enable_bulk_load_table_list: false, http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java index ec85f21..fb8160a 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java @@ -23,7 +23,9 @@ import java.util.List; import org.apache.calcite.rel.RelFieldCollation; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.MinorFragmentEndpoint; +import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.ComplexToJson; import org.apache.drill.exec.physical.config.ExternalSort; import 
org.apache.drill.exec.physical.config.Filter; @@ -34,6 +36,7 @@ import org.apache.drill.exec.physical.config.MergingReceiverPOP; import org.apache.drill.exec.physical.config.Project; import org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.physical.config.TopN; +import org.apache.drill.exec.physical.config.FlattenPOP; import org.apache.drill.exec.planner.physical.AggPrelBase; import org.junit.Ignore; import org.junit.Test; @@ -189,6 +192,32 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase { } @Test + public void testFlatten() { + final PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("b")); + List<String> inputJsonBatches = Lists.newArrayList(); + StringBuilder batchString = new StringBuilder(); + + for (int j = 0; j < 1; j++) { + batchString.append("["); + for (int i = 0; i < 1; i++) { + batchString.append("{\"a\": 5, \"b\" : [5, 6, 7]}"); + } + batchString.append("]"); + inputJsonBatches.add(batchString.toString()); + } + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(flatten) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("a", "b") + .baselineValues(5l, 5l) + .baselineValues(5l, 6l) + .baselineValues(5l, 7l); + + opTestBuilder.go(); + } + + @Test public void testExternalSort() { ExternalSort sortConf = new ExternalSort(null, Lists.newArrayList(ordering("b", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false); http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java index e6e72e7..7eafb86 100644 --- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java @@ -152,7 +152,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase { } Map<String, List<Object>> actualSuperVectors = new TreeMap<String, List<Object>>(); - int actualBatchNum = DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectSchema, actualSuperVectors); + int actualBatchNum = DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectSchema, null, null, actualSuperVectors); if (expectBatchNum != null) { if (expectBatchNum != actualBatchNum) { throw new AssertionError(String.format("Expected %s batches from operator tree. But operators return %s batch!", expectBatchNum, actualBatchNum)); http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java index b01dd3e..d1ad990 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java @@ -74,6 +74,7 @@ import org.mockito.Mockito; import java.io.IOException; import java.io.UnsupportedEncodingException; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -84,6 +85,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.TreeMap; public class PhysicalOpUnitTestBase extends ExecTest { protected MockExecutorFragmentContext fragContext; @@ -209,16 +211,19 @@ public class 
PhysicalOpUnitTestBase extends ExecTest { private List<List<String>> inputStreamsJSON; private long initReservation = AbstractBase.INIT_ALLOCATION; private long maxAllocation = AbstractBase.MAX_ALLOCATION; + private boolean checkBatchMemory; + private boolean expectNoRows; + private Long expectedBatchSize; + private Integer expectedNumBatches; - @SuppressWarnings({ "unchecked", "resource" }) + @SuppressWarnings({"unchecked", "resource"}) public void go() { BatchCreator<PhysicalOperator> opCreator; RecordBatch testOperator; try { mockOpContext(popConfig, initReservation, maxAllocation); - opCreator = (BatchCreator<PhysicalOperator>) - opCreatorReg.getOperatorCreator(popConfig.getClass()); + opCreator = (BatchCreator<PhysicalOperator>) opCreatorReg.getOperatorCreator(popConfig.getClass()); List<RecordBatch> incomingStreams = Lists.newArrayList(); if (inputStreamsJSON != null) { for (List<String> batchesJson : inputStreamsJSON) { @@ -229,8 +234,19 @@ public class PhysicalOpUnitTestBase extends ExecTest { testOperator = opCreator.getBatch(fragContext, popConfig, incomingStreams); - Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator)); - Map<String, List<Object>> expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords); + Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator), expectedBatchSize, expectedNumBatches); + + Map<String, List<Object>> expectedSuperVectors; + + if (expectNoRows) { + expectedSuperVectors = new TreeMap<>(); + for (String column : baselineColumns) { + expectedSuperVectors.put(column, new ArrayList<>()); + } + } else { + expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords); + } + DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors); } catch (ExecutionSetupException e) { @@ -281,7 +297,7 @@ public class 
PhysicalOpUnitTestBase extends ExecTest { return this; } - public OperatorTestBuilder baselineValues(Object ... baselineValues) { + public OperatorTestBuilder baselineValues(Object... baselineValues) { if (baselineRecords == null) { baselineRecords = new ArrayList<>(); } @@ -296,6 +312,21 @@ public class PhysicalOpUnitTestBase extends ExecTest { this.baselineRecords.add(ret); return this; } + + public OperatorTestBuilder expectZeroRows() { + this.expectNoRows = true; + return this; + } + + public OperatorTestBuilder expectedNumBatches(Integer expectedNumBatches) { + this.expectedNumBatches = expectedNumBatches; + return this; + } + + public OperatorTestBuilder expectedBatchSize(Long batchSize) { + this.expectedBatchSize = batchSize; + return this; + } } /** @@ -448,7 +479,7 @@ public class PhysicalOpUnitTestBase extends ExecTest { return getJsonReadersFromBatchString(jsonBatches, fragContext, Collections.singletonList(SchemaPath.getSimplePath("*"))); } - private List<RecordReader> getReaderListForJsonBatches(List<String> jsonBatches, FragmentContext fragContext) { + public List<RecordReader> getReaderListForJsonBatches(List<String> jsonBatches, FragmentContext fragContext) { Iterator<RecordReader> readers = getRecordReadersForJsonBatches(jsonBatches, fragContext); List<RecordReader> readerList = new ArrayList<>(); while(readers.hasNext()) { http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java new file mode 100644 index 0000000..4a1dc8f --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java @@ -0,0 +1,882 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.physical.unit; + +import com.google.common.collect.Lists; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; + +import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.FlattenPOP; +import org.apache.drill.exec.physical.impl.ScanBatch; +import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.util.JsonStringArrayList; +import org.apache.drill.exec.util.JsonStringHashMap; +import org.apache.drill.exec.util.Text; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class TestOutputBatchSize extends PhysicalOpUnitTestBase { + private static final long initReservation = AbstractBase.INIT_ALLOCATION; + private static final long maxAllocation = AbstractBase.MAX_ALLOCATION; + // Keeping row count below 4096 so we do not produce more than one batch. 
+ // scanBatch with json reader produces batches of 4k. + private int numRows = 4000; + private static final String wideString = + "b00dUrA0oa2i4ZEHg6zvPXPXlVQYB2BXe8T5gIEtvUDzcN6yUkIqyS07gaAy8k4ac6Bn1cxblsXFnkp8g8hiQkUMJPyl6" + + "l0jTdsIzQ4PkVCURGGyF0aduGqCXUaKp91gqkRMvLhHhmrHdEb22QN20dXEHSygR7vrb2zZhhfWeJbXRsesuYDqdGig801IAS6VWRIdQtJ6gaRhCdNz"; + + /** + * Figures out what will be total size of the batches for a given Json input batch. + */ + private long getExpectedSize(List<String> expectedJsonBatches) throws ExecutionSetupException { + // Create a dummy scanBatch to figure out the size. + RecordBatch scanBatch = new ScanBatch(new MockPhysicalOperator(), fragContext, getReaderListForJsonBatches(expectedJsonBatches, fragContext)); + Iterable<VectorAccessible> batches = new BatchIterator(scanBatch); + + long totalSize = 0; + for (VectorAccessible batch : batches) { + RecordBatchSizer sizer = new RecordBatchSizer(batch); + totalSize += sizer.netSize(); + } + return totalSize; + } + + @Test + public void testFlattenFixedWidth() throws Exception { + PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); + mockOpContext(flatten, initReservation, maxAllocation); + + // create input rows like this. + // "a" : 5, "b" : wideString, "c" : [6,7,8,9] + List<String> inputJsonBatches = Lists.newArrayList(); + StringBuilder batchString = new StringBuilder(); + batchString.append("["); + for (int i = 0; i < numRows; i++) { + batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [6, 7, 8, 9]},"); + } + batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [6, 7, 8, 9]}"); + batchString.append("]"); + inputJsonBatches.add(batchString.toString()); + + // Figure out what will be approximate total output size out of flatten for input above + // We will use this sizing information to set output batch size so we can produce desired + // number of batches that can be verified. 
+ + // output rows will be like this. + // "a" : 5, "b" : wideString, "c" : 6 + // "a" : 5, "b" : wideString, "c" : 7 + // "a" : 5, "b" : wideString, "c" : 8 + // "a" : 5, "b" : wideString, "c" : 9 + List<String> expectedJsonBatches = Lists.newArrayList(); + StringBuilder expectedBatchString = new StringBuilder(); + expectedBatchString.append("["); + for (int i = 0; i < numRows; i++) { + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 6},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 7},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 8},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 9},"); + } + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 6},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 7},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 8},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 9}"); + expectedBatchString.append("]"); + expectedJsonBatches.add(expectedBatchString.toString()); + + long totalSize = getExpectedSize(expectedJsonBatches); + + // set the output batch size to 1/2 of total size expected. + // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten. + fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(flatten) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("a", "b", "c") + .expectedNumBatches(4) // verify number of batches + .expectedBatchSize(totalSize / 2); // verify batch size. 
+ + for (int i = 0; i < numRows + 1; i++) { + opTestBuilder.baselineValues(5l, wideString, 6l); + opTestBuilder.baselineValues(5l, wideString, 7l); + opTestBuilder.baselineValues(5l, wideString, 8l); + opTestBuilder.baselineValues(5l, wideString, 9l); + } + + opTestBuilder.go(); + } + + @Test + public void testFlattenVariableWidth() throws Exception { + PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); + mockOpContext(flatten, initReservation, maxAllocation); + + // create input rows like this. + // "a" : 5, "b" : wideString, "c" : ["parrot", "hummingbird", "owl", "woodpecker", "peacock"] + List<String> inputJsonBatches = Lists.newArrayList(); + StringBuilder batchString = new StringBuilder(); + batchString.append("["); + for (int i = 0; i < numRows; i++) { + batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\",\"c\" : [\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\", \"peacock\"]},"); + } + batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\",\"c\" : [\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\", \"peacock\"]}"); + batchString.append("]"); + inputJsonBatches.add(batchString.toString()); + + // Figure out what will be approximate total output size out of flatten for input above + // We will use this sizing information to set output batch size so we can produce desired + // number of batches that can be verified. + + // output rows will be like this. 
+ // "a" : 5, "b" : wideString, "c" : parrot + // "a" : 5, "b" : wideString, "c" : hummingbird + // "a" : 5, "b" : wideString, "c" : owl + // "a" : 5, "b" : wideString, "c" : woodpecker + // "a" : 5, "b" : wideString, "c" : peacock + List<String> expectedJsonBatches = Lists.newArrayList(); + StringBuilder expectedBatchString = new StringBuilder(); + expectedBatchString.append("["); + for (int i = 0; i < numRows; i++) { + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"parrot\"},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"hummingbird\"},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"owl\"},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"woodpecker\"},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"peacock\"},"); + } + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"parrot\"},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"hummingbird\"},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"owl\"},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"woodpecker\"},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"peacock\"}"); + expectedBatchString.append("]"); + expectedJsonBatches.add(expectedBatchString.toString()); + + long totalSize = getExpectedSize(expectedJsonBatches); + + // set the output batch size to 1/2 of total size expected. + // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten. 
+ fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(flatten) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("a", "b", "c") + .expectedNumBatches(4) // verify number of batches + .expectedBatchSize(totalSize / 2); // verify batch size. + + for (int i = 0; i < numRows + 1; i++) { + opTestBuilder.baselineValues(5l, wideString, "parrot"); + opTestBuilder.baselineValues(5l, wideString, "hummingbird"); + opTestBuilder.baselineValues(5l, wideString, "owl"); + opTestBuilder.baselineValues(5l, wideString, "woodpecker"); + opTestBuilder.baselineValues(5l, wideString, "peacock"); + } + + opTestBuilder.go(); + } + + @Test + public void testFlattenFixedWidthList() throws Exception { + PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); + mockOpContext(flatten, initReservation, maxAllocation); + + // create input rows like this. + // "a" : 5, "b" : wideString, "c" : [[1,2,3,4], [5,6,7,8]] + List<String> inputJsonBatches = Lists.newArrayList(); + StringBuilder batchString = new StringBuilder(); + batchString.append("["); + for (int i = 0; i < numRows; i++) { + batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" + "[1,2,3,4]," + "[5,6,7,8]" + "]"); + batchString.append("},"); + } + batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" + "[1,2,3,4]," + "[5,6,7,8]" + "]"); + batchString.append("}]"); + inputJsonBatches.add(batchString.toString()); + + // Figure out what will be approximate total output size out of flatten for input above + // We will use this sizing information to set output batch size so we can produce desired + // number of batches that can be verified. + + // output rows will be like this. 
+ // "a" : 5, "b" : wideString, "c" : [1,2,3,4] + // "a" : 5, "b" : wideString, "c" : [5,6,7,8] + List<String> expectedJsonBatches = Lists.newArrayList(); + StringBuilder expectedBatchString = new StringBuilder(); + expectedBatchString.append("["); + for (int i = 0; i < numRows; i++) { + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"[1,2,3,4]\"},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"[5,6,7,8]\"},"); + } + + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"[1,2,3,4]\"},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"[5,6,7,8]\"}"); + + expectedBatchString.append("]"); + expectedJsonBatches.add(expectedBatchString.toString()); + + long totalSize = getExpectedSize(expectedJsonBatches); + // set the output batch size to 1/2 of total size expected. + // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten. + fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(flatten) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("a", "b", "c") + .expectedNumBatches(4) // verify number of batches + .expectedBatchSize(totalSize); // verify batch size. 
+ + for (int i = 0; i < numRows + 1; i++) { + opTestBuilder.baselineValues(5l, wideString, new ArrayList<Long>(Arrays.asList(1L, 2L, 3L, 4L))); + opTestBuilder.baselineValues(5l, wideString, new ArrayList<Long>(Arrays.asList(5L, 6L, 7L, 8L))); + } + + opTestBuilder.go(); + } + + @Test + public void testFlattenVariableWidthList() throws Exception { + PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); + mockOpContext(flatten, initReservation, maxAllocation); + List<String> inputJsonBatches = Lists.newArrayList(); + StringBuilder batchString = new StringBuilder(); + + // create input rows like this. + // "a" : 5, "b" : wideString, "c" : [["parrot", "hummingbird", "owl", "woodpecker"], ["hawk", "nightingale", "swallow", "peacock"]] + batchString.append("["); + for (int i = 0; i < numRows; i++) { + batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + + "\"c\" : [" + "[\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\"]," + "[\"hawk\",\"nightingale\",\"swallow\",\"peacock\"]" + "]"); + batchString.append("},"); + } + batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + + "\"c\" : [" + "[\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\"]," + "[\"hawk\",\"nightingale\",\"swallow\",\"peacock\"]" + "]"); + batchString.append("}]"); + inputJsonBatches.add(batchString.toString()); + + // Figure out what will be approximate total output size out of flatten for input above + // We will use this sizing information to set output batch size so we can produce desired + // number of batches that can be verified. + + // output rows will be like this. 
+ // "a" : 5, "b" : wideString, "c" : ["parrot", "hummingbird", "owl", "woodpecker"] + // "a" : 5, "b" : wideString, "c" : ["hawk", "nightingale", "swallow", "peacock"] + List<String> expectedJsonBatches = Lists.newArrayList(); + StringBuilder expectedBatchString = new StringBuilder(); + + expectedBatchString.append("["); + for (int i = 0; i < numRows; i++) { + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\"]},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [\"hawk\", \"nightingale\", \"swallow\", \"peacock\"]},"); + } + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\"]},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [\"hawk\", \"nightingale\", \"swallow\", \"peacock\"]}"); + + expectedBatchString.append("]"); + expectedJsonBatches.add(expectedBatchString.toString()); + + long totalSize = getExpectedSize(expectedJsonBatches); + // set the output batch size to 1/2 of total size expected. + // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten. + fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(flatten) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("a", "b", "c") + .expectedNumBatches(4) // verify number of batches + .expectedBatchSize(totalSize); // verify batch size. 
+ + final JsonStringArrayList<Text> birds1 = new JsonStringArrayList<Text>() {{ + add(new Text("parrot")); + add(new Text("hummingbird")); + add(new Text("owl")); + add(new Text("woodpecker")); + }}; + + final JsonStringArrayList<Text> birds2 = new JsonStringArrayList<Text>() {{ + add(new Text("hawk")); + add(new Text("nightingale")); + add(new Text("swallow")); + add(new Text("peacock")); + }}; + + for (int i = 0; i < numRows + 1; i++) { + opTestBuilder.baselineValues(5l, wideString, birds1); + opTestBuilder.baselineValues(5l, wideString, birds2); + } + + opTestBuilder.go(); + } + + @Test + public void testFlattenMap() throws Exception { + PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); + mockOpContext(flatten, initReservation, maxAllocation); + + // create input rows like this. + // "a" : 5, "b" : wideString, "c" : [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t1", amount:100, trans_time:8888888, type:groceries}] + + List<String> inputJsonBatches = Lists.newArrayList(); + StringBuilder batchString = new StringBuilder(); + + batchString.append("["); + for (int i = 0; i < numRows; i++) { + batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + + "\"c\" : [" + " { \"trans_id\":\"t1\", \"amount\":100, " + + "\"trans_time\":7777777, \"type\":\"sports\"}," + + " { \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}"); + batchString.append("]},"); + } + batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + + "\"c\" : [" + " { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777," + + " \"type\":\"sports\"}," + + " { \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}"); + batchString.append("]}]"); + inputJsonBatches.add(batchString.toString()); + + // Figure out what will be approximate total output size out of flatten for input above + // We will use this sizing information to set output 
batch size so we can produce desired + // number of batches that can be verified. + + // output rows will be like this. + // "a" : 5, "b" : wideString, "c" : {"trans_id":"t1", amount:100, trans_time:7777777, type:sports} + // "a" : 5, "b" : wideString, "c" : {"trans_id":"t1", amount:100, trans_time:8888888, type:groceries} + List<String> expectedJsonBatches = Lists.newArrayList(); + StringBuilder expectedBatchString = new StringBuilder(); + + expectedBatchString.append("["); + for (int i = 0; i < numRows; i++) { + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + + "{\"trans_id\":\"t1\", \"amount\":100, " + + "\"trans_time\":7777777, \"type\":\"sports\"}},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + + "{\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}},"); + } + + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + + "{\"trans_id\":\"t1\", \"amount\":100, " + + "\"trans_time\":7777777, \"type\":\"sports\"}},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + + "{\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}}"); + + expectedBatchString.append("]"); + expectedJsonBatches.add(expectedBatchString.toString()); + + long totalSize = getExpectedSize(expectedJsonBatches); + // set the output batch size to 1/2 of total size expected. + // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten. 
+ fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(flatten) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("a", "b", "c") + .expectedNumBatches(4) // verify number of batches + .expectedBatchSize(totalSize / 2); // verify batch size. + + JsonStringHashMap<String, Object> resultExpected1 = new JsonStringHashMap<>(); + resultExpected1.put("trans_id", new Text("t1")); + resultExpected1.put("amount", new Long(100)); + resultExpected1.put("trans_time", new Long(7777777)); + resultExpected1.put("type", new Text("sports")); + + JsonStringHashMap<String, Object> resultExpected2 = new JsonStringHashMap<>(); + resultExpected2.put("trans_id", new Text("t2")); + resultExpected2.put("amount", new Long(1000)); + resultExpected2.put("trans_time", new Long(8888888)); + resultExpected2.put("type", new Text("groceries")); + + for (int i = 0; i < numRows + 1; i++) { + opTestBuilder.baselineValues(5l, wideString, resultExpected1); + opTestBuilder.baselineValues(5l, wideString, resultExpected2); + } + + opTestBuilder.go(); + } + + @Test + public void testFlattenListOfMaps() throws Exception { + PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); + mockOpContext(flatten, initReservation, maxAllocation); + + // create input rows like this. 
+ // "a" : 5, "b" : wideString, + // "c" : [ [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t1", amount:100, trans_time:8888888, type:groceries}], + // [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t1", amount:100, trans_time:8888888, type:groceries}], + // [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t1", amount:100, trans_time:8888888, type:groceries}] ] + + List<String> inputJsonBatches = Lists.newArrayList(); + StringBuilder batchString = new StringBuilder(); + + batchString.append("["); + for (int i = 0; i < numRows; i++) { + batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" + + "[ { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," + " { " + + "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ], " + + "[ { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," + " { " + + "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ], " + + "[ { \"trans_id\":\"t1\", \"amount\":100, " + + "\"trans_time\":7777777, \"type\":\"sports\"}," + " { " + + "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ]"); + batchString.append("]},"); + } + batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" + + "[ { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," + " { " + + "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ], " + + "[ { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," + " { " + + "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ], " + + "[ { \"trans_id\":\"t1\", \"amount\":100, " + + "\"trans_time\":7777777, \"type\":\"sports\"}," + " { " + + "\"trans_id\":\"t2\", \"amount\":1000, 
\"trans_time\":8888888, \"type\":\"groceries\"} ]"); + batchString.append("]}]"); + inputJsonBatches.add(batchString.toString()); + + // Figure out what will be approximate total output size out of flatten for input above + // We will use this sizing information to set output batch size so we can produce desired + // number of batches that can be verified. + + // output rows will be like this. + // "a" : 5, "b" : wideString, "c" : [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t1", amount:100, trans_time:8888888, type:groceries}] + // "a" : 5, "b" : wideString, "c" : [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t1", amount:100, trans_time:8888888, type:groceries}] + List<String> expectedJsonBatches = Lists.newArrayList(); + StringBuilder expectedBatchString = new StringBuilder(); + + expectedBatchString.append("["); + for (int i = 0; i < numRows; i++) { + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + + "[ { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," + " { " + + "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ]},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + + "[ { \"trans_id\":\"t1\", \"amount\":100, " + + "\"trans_time\":7777777, \"type\":\"sports\"}," + " { " + + "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}]},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + + "[ { \"trans_id\":\"t1\", \"amount\":100, " + + "\"trans_time\":7777777, \"type\":\"sports\"}," + " { " + + "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}]},"); + } + + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + + "[ { \"trans_id\":\"t1\", \"amount\":100, 
\"trans_time\":7777777, \"type\":\"sports\"}," + " { " + + "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ]},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + + "[ { \"trans_id\":\"t1\", \"amount\":100, " + + "\"trans_time\":7777777, \"type\":\"sports\"}," + " { " + + "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}]},"); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + + "[ { \"trans_id\":\"t1\", \"amount\":100, " + + "\"trans_time\":7777777, \"type\":\"sports\"}," + " { " + + "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}]}"); + + expectedBatchString.append("]"); + expectedJsonBatches.add(expectedBatchString.toString()); + + long totalSize = getExpectedSize(expectedJsonBatches); + // set the output batch size to 1/2 of total size expected. + // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten. + fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(flatten) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("a", "b", "c") + .expectedNumBatches(4) // verify number of batches + .expectedBatchSize(totalSize / 2); // verify batch size. 
+ + final JsonStringHashMap<String, Object> resultExpected1 = new JsonStringHashMap<>(); + resultExpected1.put("trans_id", new Text("t1")); + resultExpected1.put("amount", new Long(100)); + resultExpected1.put("trans_time", new Long(7777777)); + resultExpected1.put("type", new Text("sports")); + + final JsonStringHashMap<String, Object> resultExpected2 = new JsonStringHashMap<>(); + resultExpected2.put("trans_id", new Text("t2")); + resultExpected2.put("amount", new Long(1000)); + resultExpected2.put("trans_time", new Long(8888888)); + resultExpected2.put("type", new Text("groceries")); + + final JsonStringArrayList<JsonStringHashMap<String, Object>> results = new JsonStringArrayList<JsonStringHashMap<String, Object>>() {{ + add(resultExpected1); + add(resultExpected2); + }}; + + for (int i = 0; i < numRows + 1; i++) { + opTestBuilder.baselineValues(5l, wideString, results); + opTestBuilder.baselineValues(5l, wideString, results); + opTestBuilder.baselineValues(5l, wideString, results); + } + + opTestBuilder.go(); + } + + @Test + public void testFlattenNestedMap() throws Exception { + PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); + mockOpContext(flatten, initReservation, maxAllocation); + + // create input rows like this. 
+ // "a" : 5, "b" : wideString, + // "c" : [ {innerMap: {"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, "trans_id":"t1", amount:100, trans_time:8888888, type:groceries}, + // {innerMap: {"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, "trans_id":"t1", amount:100, trans_time:8888888, type:groceries} ] + + List<String> inputJsonBatches = Lists.newArrayList(); + StringBuilder batchString = new StringBuilder(); + + StringBuilder innerMap = new StringBuilder(); + innerMap.append("{ \"trans_id\":\"inner_trans_t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}"); + + batchString.append("["); + for (int i = 0; i < numRows; i++) { + batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" + + " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " + + "\"type\":\"sports\"}," + " { \"innerMap\": " + innerMap + + ", \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}]"); + batchString.append("},"); + } + batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" + + " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " + + "\"type\":\"sports\"}," + " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}"); + batchString.append("]}]"); + inputJsonBatches.add(batchString.toString()); + + // Figure out what will be approximate total output size out of flatten for input above + // We will use this sizing information to set output batch size so we can produce desired + // number of batches that can be verified. + + // output rows will be like this. 
+ // "a" : 5, "b" : wideString, "c" : {innerMap: {"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, "trans_id":"t1", amount:100, trans_time:8888888, type:groceries} + // "a" : 5, "b" : wideString, "c" : {innerMap: {"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, "trans_id":"t1", amount:100, trans_time:8888888, type:groceries} + List<String> expectedJsonBatches = Lists.newArrayList(); + StringBuilder expectedBatchString = new StringBuilder(); + + expectedBatchString.append("["); + for (int i = 0; i < numRows; i++) { + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + + " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " + + "\"type\":\"sports\"} }, "); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + + " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " + + "\"type\":\"sports\"} }, "); + } + + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + + " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " + + "\"type\":\"sports\"} }, "); + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + + " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " + + "\"type\":\"sports\"} }"); + + expectedBatchString.append("]"); + expectedJsonBatches.add(expectedBatchString.toString()); + + long totalSize = getExpectedSize(expectedJsonBatches); + // set the output batch size to 1/2 of total size expected. + // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten. 
+ fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(flatten) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("a", "b", "c") + .expectedNumBatches(4) // verify number of batches + .expectedBatchSize(totalSize / 2); // verify batch size. + + JsonStringHashMap<String, Object> innerMapResult = new JsonStringHashMap<>(); + innerMapResult.put("trans_id", new Text("inner_trans_t1")); + innerMapResult.put("amount", new Long(100)); + innerMapResult.put("trans_time", new Long(7777777)); + innerMapResult.put("type", new Text("sports")); + + JsonStringHashMap<String, Object> resultExpected1 = new JsonStringHashMap<>(); + resultExpected1.put("trans_id", new Text("t1")); + resultExpected1.put("amount", new Long(100)); + resultExpected1.put("trans_time", new Long(7777777)); + resultExpected1.put("type", new Text("sports")); + resultExpected1.put("innerMap", innerMapResult); + + JsonStringHashMap<String, Object> resultExpected2 = new JsonStringHashMap<>(); + resultExpected2.put("trans_id", new Text("t2")); + resultExpected2.put("amount", new Long(1000)); + resultExpected2.put("trans_time", new Long(8888888)); + resultExpected2.put("type", new Text("groceries")); + resultExpected2.put("innerMap", innerMapResult); + + for (int i = 0; i < numRows + 1; i++) { + opTestBuilder.baselineValues(5l, wideString, resultExpected1); + opTestBuilder.baselineValues(5l, wideString, resultExpected2); + } + + opTestBuilder.go(); + } + + @Test + public void testFlattenUpperLimit() throws Exception { + // test the upper limit of 65535 records per batch. 
+ PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); + mockOpContext(flatten, initReservation, maxAllocation); + List<String> inputJsonBatches = Lists.newArrayList(); + StringBuilder batchString = new StringBuilder(); + + StringBuilder flattenElement = new StringBuilder(); + + // Create list of 1001 elements (0 through 1000) + flattenElement.append("["); + for (int i = 0; i < 1000; i++) { + flattenElement.append(i); + flattenElement.append(","); + } + flattenElement.append(1000); + flattenElement.append("]"); + + batchString.append("["); + + numRows = 1000; + + for (int i = 0; i < numRows; i++) { + batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\":" + flattenElement + "},"); + } + batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\":" + flattenElement + "}"); + batchString.append("]"); + inputJsonBatches.add(batchString.toString()); + + // Figure out what will be approximate total output size out of flatten for input above + // We will use this sizing information to set output batch size so we can produce desired + // number of batches that can be verified. 
+ List<String> expectedJsonBatches = Lists.newArrayList(); + StringBuilder expectedBatchString = new StringBuilder(); + + expectedBatchString.append("["); + for (int i = 0; i < numRows; i++) { + for (int j = 0; j < 1000; j++) { + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" :"); + expectedBatchString.append(j); + expectedBatchString.append("},"); + } + } + for (int j = 0; j < 999; j++) { + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" :"); + expectedBatchString.append(j); + expectedBatchString.append("},"); + } + + expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" :"); + expectedBatchString.append(1000); + expectedBatchString.append("}"); + + expectedBatchString.append("]"); + expectedJsonBatches.add(expectedBatchString.toString()); + + long totalSize = getExpectedSize(expectedJsonBatches); + // set the output batch size to 1/2 of total size expected. + // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten. + fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); + + // Here we expect 16 batches because each batch will be limited by upper limit of 65535 records. + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(flatten) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("a", "b", "c") + .expectedNumBatches(16) // verify number of batches + .expectedBatchSize(totalSize / 2); // verify batch size. 
+ + for (long i = 0; i < numRows + 1; i++) { + for (long j = 0; j < 1001; j++) { + opTestBuilder.baselineValues(5l, wideString, j); + } + } + + opTestBuilder.go(); + } + + @Test + public void testFlattenLowerLimit() throws Exception { + // test the lower limit of at least one batch + PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); + mockOpContext(flatten, initReservation, maxAllocation); + + List<String> inputJsonBatches = Lists.newArrayList(); + StringBuilder batchString = new StringBuilder(); + StringBuilder flattenElement = new StringBuilder(); + + // Create list of 11 elements (0 through 10) + flattenElement.append("["); + for (int i = 0; i < 10; i++) { + flattenElement.append(i); + flattenElement.append(","); + } + flattenElement.append(10); + flattenElement.append("]"); + + // create list of wideStrings + final StringBuilder wideStrings = new StringBuilder(); + wideStrings.append("["); + for (int i = 0; i < 10; i++) { + wideStrings.append("\"" + wideString + "\","); + } + wideStrings.append("\"" + wideString + "\""); + wideStrings.append("]"); + + batchString.append("["); + batchString.append("{\"a\": " + wideStrings + "," + "\"c\":" + flattenElement); + batchString.append("}]"); + inputJsonBatches.add(batchString.toString()); + + // Figure out what will be approximate total output size out of flatten for input above + // We will use this sizing information to set output batch size so we can produce desired + // number of batches that can be verified. + + // set very low value of batch size for a large record size. + // This is to test we at least get one record per batch. + fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 1024); + + // Here we expect 10 batches because each batch will be bounded by lower limit of at least 1 record. + // do not check the output batch size as it will be more than configured value of 1024, so we get + // at least one record out. 
+ OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(flatten) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("a", "c") + .expectedNumBatches(10); // verify number of batches + + final JsonStringArrayList<Text> results = new JsonStringArrayList<Text>() {{ + add(new Text(wideString)); + add(new Text(wideString)); + add(new Text(wideString)); + add(new Text(wideString)); + add(new Text(wideString)); + add(new Text(wideString)); + add(new Text(wideString)); + add(new Text(wideString)); + add(new Text(wideString)); + add(new Text(wideString)); + add(new Text(wideString)); + }}; + + for (long j = 0; j < 11; j++) { + opTestBuilder.baselineValues(results, j); + } + + opTestBuilder.go(); + } + + @Test + public void testFlattenEmptyList() throws Exception { + final PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("b")); + + List<String> inputJsonBatches = Lists.newArrayList(); + StringBuilder batchString = new StringBuilder(); + + StringBuilder flattenElement = new StringBuilder(); + + flattenElement.append("["); + flattenElement.append("]"); + + batchString.append("["); + for (int i = 0; i < numRows; i++) { + batchString.append("{\"a\": 5, " + "\"b\" : " + flattenElement + "},"); + } + batchString.append("{\"a\": 5, " + "\"b\" : " + flattenElement + "}"); + batchString.append("]"); + inputJsonBatches.add(batchString.toString()); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(flatten) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("a", "b") + .expectZeroRows(); + + opTestBuilder.go(); + } + + @Test + public void testFlattenLargeRecords() throws Exception { + PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); + mockOpContext(flatten, initReservation, maxAllocation); + + // create input rows like this. 
+ // "a" : <id1>, "b" : wideString, "c" : [ 11 wideStrings ] + List<String> inputJsonBatches = Lists.newArrayList(); + StringBuilder batchString = new StringBuilder(); + int arrayLength = 10; + StringBuilder test = new StringBuilder(); + test.append("[ \""); + for (int i = 0; i < arrayLength; i++) { + test.append(wideString); + test.append("\",\""); + } + test.append(wideString); + test.append("\"]"); + + batchString.append("["); + for (int i = 0; i < numRows; i++) { + batchString.append("{" + "\"a\" :" + (new StringBuilder().append(i)) + ",\"b\": \"" + wideString + "\"," + + "\"c\": " + test + "},"); + } + batchString.append("{" + "\"a\" :" + (new StringBuilder().append(numRows)) + ",\"b\": \"" + wideString + "\"," + + "\"c\": " + test + "}"); + batchString.append("]"); + inputJsonBatches.add(batchString.toString()); + + // output rows will be like this. + // "a" : <id1>, "b" : wideString, "c" : wideString + + // Figure out what will be approximate total output size out of flatten for input above + // We will use this sizing information to set output batch size so we can produce desired + // number of batches that can be verified. + List<String> expectedJsonBatches = Lists.newArrayList(); + StringBuilder expectedBatchString = new StringBuilder(); + expectedBatchString.append("["); + for (int k = 0; k < (numRows) * 11; k++) { + expectedBatchString.append("{" + "\"a\" :" + (new StringBuilder().append(k)) + ",\"b\": \"" + wideString + "\","); + expectedBatchString.append("\"c\": \"" + wideString + "\"},"); + } + expectedBatchString.append("{" + "\"a\" :" + (new StringBuilder().append(numRows)) + ",\"b\": \"" + wideString + "\","); + expectedBatchString.append("\"c\": \"" + wideString + "\"}"); + expectedBatchString.append("]"); + expectedJsonBatches.add(expectedBatchString.toString()); + + long totalSize = getExpectedSize(expectedJsonBatches); + // set the output batch size to 1/2 of total size expected. 
+ // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten. + fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); + + OperatorTestBuilder opTestBuilder = opTestBuilder() + .physicalOperator(flatten) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("a", "b", "c") + .expectedNumBatches(4) // verify number of batches + .expectedBatchSize(totalSize / 2); // verify batch size. + + for (long k = 0; k < ((numRows + 1)); k++) { + for (int j = 0; j < arrayLength + 1; j++) { + opTestBuilder.baselineValues(k, wideString, wideString); + } + } + + opTestBuilder.go(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java index cd68bf3..c470b0d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java @@ -40,6 +40,8 @@ import org.apache.drill.common.types.Types; import org.apache.drill.exec.HyperVectorValueIterator; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; +import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.QueryType; import org.apache.drill.exec.record.BatchSchema; @@ -318,10 +320,11 @@ public class DrillTestWrapper { * @throws SchemaChangeException * @throws UnsupportedEncodingException */ - public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches) + public 
static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches, + Long expectedBatchSize, Integer expectedNumBatches) throws SchemaChangeException, UnsupportedEncodingException { Map<String, List<Object>> combinedVectors = new TreeMap<>(); - addToCombinedVectorResults(batches, null, combinedVectors); + addToCombinedVectorResults(batches, null, expectedBatchSize, expectedNumBatches, combinedVectors); return combinedVectors; } @@ -336,12 +339,15 @@ public class DrillTestWrapper { * @throws SchemaChangeException * @throws UnsupportedEncodingException */ - public static int addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema, Map<String, List<Object>> combinedVectors) + public static int addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema, + Long expectedBatchSize, Integer expectedNumBatches, + Map<String, List<Object>> combinedVectors) throws SchemaChangeException, UnsupportedEncodingException { // TODO - this does not handle schema changes int numBatch = 0; long totalRecords = 0; BatchSchema schema = null; + for (VectorAccessible loader : batches) { numBatch++; if (expectedSchema != null) { @@ -352,6 +358,13 @@ public class DrillTestWrapper { } } + if (expectedBatchSize != null) { + RecordBatchSizer sizer = new RecordBatchSizer(loader); + // Not checking actualSize as accounting is not correct when we do + // split and transfer ownership across operators. + Assert.assertTrue(sizer.netSize() <= expectedBatchSize); + } + // TODO: Clean: DRILL-2933: That load(...) no longer throws // SchemaChangeException, so check/clean throws clause above. 
if (schema == null) { @@ -367,6 +380,7 @@ public class DrillTestWrapper { } logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords); totalRecords += loader.getRecordCount(); + for (VectorWrapper<?> w : loader) { String field = SchemaPath.getSimplePath(w.getField().getName()).toExpr(); ValueVector[] vectors; @@ -420,6 +434,16 @@ public class DrillTestWrapper { } } } + + if (expectedNumBatches != null) { + // Based on how much memory is actually taken by value vectors (because of doubling stuff), + // we have to do complex math for predicting exact number of batches. + // Instead, check that number of batches is at least the minimum that is expected + // and no more than twice of that. + Assert.assertTrue(numBatch >= expectedNumBatches); + Assert.assertTrue(numBatch <= (2*expectedNumBatches)); + } + return numBatch; } @@ -539,7 +563,7 @@ public class DrillTestWrapper { addTypeInfoIfMissing(actual.get(0), testBuilder); BatchIterator batchIter = new BatchIterator(actual, loader); - actualSuperVectors = addToCombinedVectorResults(batchIter); + actualSuperVectors = addToCombinedVectorResults(batchIter, null, null); batchIter.close(); // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes @@ -552,7 +576,7 @@ public class DrillTestWrapper { test(baselineOptionSettingQueries); expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery()); BatchIterator exBatchIter = new BatchIterator(expected, loader); - expectedSuperVectors = addToCombinedVectorResults(exBatchIter); + expectedSuperVectors = addToCombinedVectorResults(exBatchIter, null, null); exBatchIter.close(); } } else { @@ -587,6 +611,11 @@ public class DrillTestWrapper { public static Map<String, List<Object>> translateRecordListToHeapVectors(List<Map<String, Object>> records) { Map<String, List<Object>> ret = new TreeMap<>(); + + if (records == null) { + return ret; + } + for (String s 
: records.get(0).keySet()) { ret.put(s, new ArrayList<>()); }
