[
https://issues.apache.org/jira/browse/DRILL-6356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495591#comment-16495591
]
ASF GitHub Bot commented on DRILL-6356:
---------------------------------------
Ben-Zvi closed pull request #1255: DRILL-6356: batch sizing for union all
URL: https://github.com/apache/drill/pull/1255
This is a pull request merged from a forked repository. Because GitHub
hides the original diff once a foreign pull request (from a fork) is
merged, the diff is reproduced below for the sake of provenance:
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
index d731ca4c24..dcb944512c 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
@@ -29,7 +29,7 @@
import
org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
-import org.apache.drill.exec.record.JoinBatchMemoryManager;
+import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
/**
@@ -53,9 +53,10 @@
register(CoreOperatorType.EXTERNAL_SORT_VALUE,
ExternalSortBatch.Metric.class);
register(CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE,
ParquetRecordReader.Metric.class);
register(CoreOperatorType.FLATTEN_VALUE, FlattenRecordBatch.Metric.class);
- register(CoreOperatorType.MERGE_JOIN_VALUE,
JoinBatchMemoryManager.Metric.class);
- register(CoreOperatorType.LATERAL_JOIN_VALUE,
JoinBatchMemoryManager.Metric.class);
+ register(CoreOperatorType.MERGE_JOIN_VALUE,
AbstractBinaryRecordBatch.Metric.class);
+ register(CoreOperatorType.LATERAL_JOIN_VALUE,
AbstractBinaryRecordBatch.Metric.class);
register(CoreOperatorType.UNNEST_VALUE, UnnestRecordBatch.Metric.class);
+ register(CoreOperatorType.UNION_VALUE,
AbstractBinaryRecordBatch.Metric.class);
}
private static void register(final int operatorType, final Class<? extends
MetricDef> metricDef) {
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index b4d0e7726d..f4c1900a5f 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -27,6 +27,7 @@
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
@@ -40,6 +41,7 @@
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchMemoryManager;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
@@ -68,6 +70,10 @@
public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children,
FragmentContext context) throws OutOfMemoryException {
super(config, context, true, children.get(0), children.get(1));
+
+ // get the output batch size from config.
+ int configuredBatchSize = (int)
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+ batchMemoryManager = new RecordBatchMemoryManager(numInputs,
configuredBatchSize);
}
@Override
@@ -106,9 +112,9 @@ public IterOutcome innerNext() {
return IterOutcome.NONE;
}
- Pair<IterOutcome, RecordBatch> nextBatch = unionInputIterator.next();
+ Pair<IterOutcome, BatchStatusWrappper> nextBatch =
unionInputIterator.next();
IterOutcome upstream = nextBatch.left;
- RecordBatch incoming = nextBatch.right;
+ BatchStatusWrappper batchStatus = nextBatch.right;
switch (upstream) {
case NONE:
@@ -116,14 +122,14 @@ public IterOutcome innerNext() {
case STOP:
return upstream;
case OK_NEW_SCHEMA:
- return doWork(nextBatch.right, true);
+ return doWork(batchStatus, true);
case OK:
// skip batches with same schema as the previous one yet having 0
row.
- if (incoming.getRecordCount() == 0) {
- VectorAccessibleUtilities.clear(incoming);
+ if (batchStatus.batch.getRecordCount() == 0) {
+ VectorAccessibleUtilities.clear(batchStatus.batch);
continue;
}
- return doWork(nextBatch.right, false);
+ return doWork(batchStatus, false);
default:
throw new IllegalStateException(String.format("Unknown state %s.",
upstream));
}
@@ -142,19 +148,26 @@ public int getRecordCount() {
@SuppressWarnings("resource")
- private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) throws
ClassTransformationException, IOException, SchemaChangeException {
- Preconditions.checkArgument(inputBatch.getSchema().getFieldCount() ==
container.getSchema().getFieldCount(),
+ private IterOutcome doWork(BatchStatusWrappper batchStatus, boolean
newSchema) throws ClassTransformationException, IOException,
SchemaChangeException {
+ Preconditions.checkArgument(batchStatus.batch.getSchema().getFieldCount()
== container.getSchema().getFieldCount(),
"Input batch and output batch have different field counthas!");
if (newSchema) {
- createUnionAller(inputBatch);
+ createUnionAller(batchStatus.batch);
}
+ // Get number of records to include in the batch.
+ final int recordsToProcess =
Math.min(batchMemoryManager.getOutputRowCount(),
batchStatus.getRemainingRecords());
+
container.zeroVectors();
- VectorUtil.allocateVectors(allocationVectors, inputBatch.getRecordCount());
- recordCount = unionall.unionRecords(0, inputBatch.getRecordCount(), 0);
+ batchMemoryManager.allocateVectors(allocationVectors, recordsToProcess);
+ recordCount = unionall.unionRecords(batchStatus.recordsProcessed,
recordsToProcess, 0);
VectorUtil.setValueCount(allocationVectors, recordCount);
+ // save number of records processed so far in batch status.
+ batchStatus.recordsProcessed += recordCount;
+ batchMemoryManager.updateOutgoingStats(recordCount);
+
if (callBack.getSchemaChangedAndReset()) {
return IterOutcome.OK_NEW_SCHEMA;
} else {
@@ -168,6 +181,7 @@ private void createUnionAller(RecordBatch inputBatch)
throws ClassTransformation
final ClassGenerator<UnionAller> cg =
CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getOptions());
cg.getCodeGenerator().plainJavaCapable(true);
+ // cg.getCodeGenerator().saveCodeForDebugging(true);
int index = 0;
for(VectorWrapper<?> vw : inputBatch) {
@@ -303,16 +317,25 @@ private static boolean
hasSameTypeAndMode(MaterializedField leftField, Materiali
final RecordBatch batch;
final int inputIndex;
final IterOutcome outcome;
+ int recordsProcessed;
+ int totalRecordsToProcess;
BatchStatusWrappper(boolean prefetched, IterOutcome outcome, RecordBatch
batch, int inputIndex) {
this.prefetched = prefetched;
this.outcome = outcome;
this.batch = batch;
this.inputIndex = inputIndex;
+ this.totalRecordsToProcess = batch.getRecordCount();
+ this.recordsProcessed = 0;
}
+
+ public int getRemainingRecords() {
+ return (totalRecordsToProcess - recordsProcessed);
+ }
+
}
- private class UnionInputIterator implements Iterator<Pair<IterOutcome,
RecordBatch>> {
+ private class UnionInputIterator implements Iterator<Pair<IterOutcome,
BatchStatusWrappper>> {
private Stack<BatchStatusWrappper> batchStatusStack = new Stack<>();
UnionInputIterator(IterOutcome leftOutCome, RecordBatch left, IterOutcome
rightOutCome, RecordBatch right) {
@@ -331,23 +354,35 @@ public boolean hasNext() {
}
@Override
- public Pair<IterOutcome, RecordBatch> next() {
+ public Pair<IterOutcome, BatchStatusWrappper> next() {
while (!batchStatusStack.isEmpty()) {
BatchStatusWrappper topStatus = batchStatusStack.peek();
if (topStatus.prefetched) {
topStatus.prefetched = false;
- return Pair.of(topStatus.outcome, topStatus.batch);
+ batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
+ return Pair.of(topStatus.outcome, topStatus);
} else {
+
+ // If we have more records to process, just return the top batch.
+ if (topStatus.getRemainingRecords() > 0) {
+ return Pair.of(IterOutcome.OK, topStatus);
+ }
+
IterOutcome outcome =
UnionAllRecordBatch.this.next(topStatus.inputIndex, topStatus.batch);
+
switch (outcome) {
case OK:
case OK_NEW_SCHEMA:
- return Pair.of(outcome, topStatus.batch);
+ // since we just read a new batch, update memory manager and
initialize batch stats.
+ topStatus.recordsProcessed = 0;
+ topStatus.totalRecordsToProcess = topStatus.batch.getRecordCount();
+ batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
+ return Pair.of(outcome, topStatus);
case OUT_OF_MEMORY:
case STOP:
batchStatusStack.pop();
- return Pair.of(outcome, topStatus.batch);
+ return Pair.of(outcome, topStatus);
case NONE:
batchStatusStack.pop();
if (batchStatusStack.isEmpty()) {
@@ -367,6 +402,13 @@ public boolean hasNext() {
public void remove() {
throw new UnsupportedOperationException();
}
+
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ updateBatchMemoryManagerStats();
}
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllerTemplate.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllerTemplate.java
index f6a8c154bc..b26553c442 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllerTemplate.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllerTemplate.java
@@ -42,7 +42,7 @@ public final int unionRecords(int startIndex, final int
recordCount, int firstOu
}
for (TransferPair t : transfers) {
- t.transfer();
+ t.splitAndTransfer(startIndex, recordCount);
}
return recordCount;
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
index 6bd3969a99..90528366fe 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -19,6 +19,7 @@
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.base.PhysicalOperator;
public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator>
extends AbstractRecordBatch<T> {
@@ -34,9 +35,32 @@
// state (IterOutcome) of the right input
protected IterOutcome rightUpstream = IterOutcome.NONE;
- // For now only used by Lateral and Merge Join
protected RecordBatchMemoryManager batchMemoryManager;
+ public final int numInputs = 2;
+ public static final int LEFT_INDEX = 0;
+ public static final int RIGHT_INDEX = 1;
+
+ public enum Metric implements MetricDef {
+ LEFT_INPUT_BATCH_COUNT,
+ LEFT_AVG_INPUT_BATCH_BYTES,
+ LEFT_AVG_INPUT_ROW_BYTES,
+ LEFT_INPUT_RECORD_COUNT,
+ RIGHT_INPUT_BATCH_COUNT,
+ RIGHT_AVG_INPUT_BATCH_BYTES,
+ RIGHT_AVG_INPUT_ROW_BYTES,
+ RIGHT_INPUT_RECORD_COUNT,
+ OUTPUT_BATCH_COUNT,
+ AVG_OUTPUT_BATCH_BYTES,
+ AVG_OUTPUT_ROW_BYTES,
+ OUTPUT_RECORD_COUNT;
+
+ @Override
+ public int metricId() {
+ return ordinal();
+ }
+ }
+
protected AbstractBinaryRecordBatch(final T popConfig, final FragmentContext
context, RecordBatch left,
RecordBatch right) throws OutOfMemoryException {
super(popConfig, context, true, context.newOperatorContext(popConfig));
@@ -98,32 +122,19 @@ protected boolean checkForEarlyFinish(IterOutcome
leftOutcome, IterOutcome right
}
protected void updateBatchMemoryManagerStats() {
- stats.setLongStat(JoinBatchMemoryManager.Metric.LEFT_INPUT_BATCH_COUNT,
-
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX));
- stats.setLongStat(JoinBatchMemoryManager.Metric.LEFT_AVG_INPUT_BATCH_BYTES,
-
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX));
- stats.setLongStat(JoinBatchMemoryManager.Metric.LEFT_AVG_INPUT_ROW_BYTES,
-
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX));
- stats.setLongStat(JoinBatchMemoryManager.Metric.LEFT_INPUT_RECORD_COUNT,
-
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
- stats.setLongStat(JoinBatchMemoryManager.Metric.RIGHT_INPUT_BATCH_COUNT,
-
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX));
-
stats.setLongStat(JoinBatchMemoryManager.Metric.RIGHT_AVG_INPUT_BATCH_BYTES,
-
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX));
- stats.setLongStat(JoinBatchMemoryManager.Metric.RIGHT_AVG_INPUT_ROW_BYTES,
-
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX));
- stats.setLongStat(JoinBatchMemoryManager.Metric.RIGHT_INPUT_RECORD_COUNT,
-
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
- stats.setLongStat(JoinBatchMemoryManager.Metric.OUTPUT_BATCH_COUNT,
- batchMemoryManager.getNumOutgoingBatches());
- stats.setLongStat(JoinBatchMemoryManager.Metric.AVG_OUTPUT_BATCH_BYTES,
- batchMemoryManager.getAvgOutputBatchSize());
- stats.setLongStat(JoinBatchMemoryManager.Metric.AVG_OUTPUT_ROW_BYTES,
- batchMemoryManager.getAvgOutputRowWidth());
- stats.setLongStat(JoinBatchMemoryManager.Metric.OUTPUT_RECORD_COUNT,
- batchMemoryManager.getTotalOutputRecords());
-
+ stats.setLongStat(Metric.LEFT_INPUT_BATCH_COUNT,
batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
+ stats.setLongStat(Metric.LEFT_AVG_INPUT_BATCH_BYTES,
batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
+ stats.setLongStat(Metric.LEFT_AVG_INPUT_ROW_BYTES,
batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
+ stats.setLongStat(Metric.LEFT_INPUT_RECORD_COUNT,
batchMemoryManager.getTotalInputRecords(LEFT_INDEX));
+
+ stats.setLongStat(Metric.RIGHT_INPUT_BATCH_COUNT,
batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
+ stats.setLongStat(Metric.RIGHT_AVG_INPUT_BATCH_BYTES,
batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
+ stats.setLongStat(Metric.RIGHT_AVG_INPUT_ROW_BYTES,
batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
+ stats.setLongStat(Metric.RIGHT_INPUT_RECORD_COUNT,
batchMemoryManager.getTotalInputRecords(RIGHT_INDEX));
+
+ stats.setLongStat(Metric.OUTPUT_BATCH_COUNT,
batchMemoryManager.getNumOutgoingBatches());
+ stats.setLongStat(Metric.AVG_OUTPUT_BATCH_BYTES,
batchMemoryManager.getAvgOutputBatchSize());
+ stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES,
batchMemoryManager.getAvgOutputRowWidth());
+ stats.setLongStat(Metric.OUTPUT_RECORD_COUNT,
batchMemoryManager.getTotalOutputRecords());
}
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
index c162bbfc5f..c147cf7ccc 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.record;
-import org.apache.drill.exec.ops.MetricDef;
-
public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(JoinBatchMemoryManager.class);
@@ -96,24 +94,4 @@ public int update(int inputIndex, int outputPosition) {
}
return rightSizer == null ? null : rightSizer.getColumn(name);
}
-
- public enum Metric implements MetricDef {
- LEFT_INPUT_BATCH_COUNT,
- LEFT_AVG_INPUT_BATCH_BYTES,
- LEFT_AVG_INPUT_ROW_BYTES,
- LEFT_INPUT_RECORD_COUNT,
- RIGHT_INPUT_BATCH_COUNT,
- RIGHT_AVG_INPUT_BATCH_BYTES,
- RIGHT_AVG_INPUT_ROW_BYTES,
- RIGHT_INPUT_RECORD_COUNT,
- OUTPUT_BATCH_COUNT,
- AVG_OUTPUT_BATCH_BYTES,
- AVG_OUTPUT_ROW_BYTES,
- OUTPUT_RECORD_COUNT;
-
- @Override
- public int metricId() {
- return ordinal();
- }
- }
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
index 2100ae1238..759e597d8f 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
@@ -21,6 +21,8 @@
import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.ValueVector;
+import java.util.List;
+
public class RecordBatchMemoryManager {
protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT;
protected static final int MIN_NUM_ROWS = 1;
@@ -152,6 +154,15 @@ public void update(int inputIndex) {
public void update() {};
+ public void update(RecordBatch recordBatch, int index) {
+ // Get sizing information for the batch.
+ setRecordBatchSizer(index, new RecordBatchSizer(recordBatch));
+ setOutgoingRowWidth(getRecordBatchSizer(index).netRowWidth());
+ // Number of rows in outgoing batch
+ setOutputRowCount(getOutputBatchSize(),
getRecordBatchSizer(index).netRowWidth());
+ updateIncomingStats(index);
+ }
+
public int getOutputRowCount() {
return outputRowCount;
}
@@ -188,7 +199,9 @@ public int getOutgoingRowWidth() {
public void setRecordBatchSizer(int index, RecordBatchSizer sizer) {
Preconditions.checkArgument(index >= 0 && index < numInputs);
this.sizer[index] = sizer;
- inputBatchStats[index] = new BatchStats();
+ if (inputBatchStats[index] == null) {
+ inputBatchStats[index] = new BatchStats();
+ }
}
public void setRecordBatchSizer(RecordBatchSizer sizer) {
@@ -211,7 +224,13 @@ public RecordBatchSizer getRecordBatchSizer() {
}
public RecordBatchSizer.ColumnSize getColumnSize(String name) {
- return sizer[DEFAULT_INPUT_INDEX].getColumn(name);
+ for (int index = 0; index < numInputs; index++) {
+ if (sizer[index] == null || sizer[index].getColumn(name) == null) {
+ continue;
+ }
+ return sizer[index].getColumn(name);
+ }
+ return null;
}
public void updateIncomingStats(int index) {
@@ -241,4 +260,31 @@ public int getOutputBatchSize() {
public int getOffsetVectorWidth() {
return UInt4Vector.VALUE_WIDTH;
}
+
+
+ public void allocateVectors(VectorContainer container, int recordCount) {
+ // Allocate memory for the vectors.
+ // This will iteratively allocate memory for all nested columns underneath.
+ for (VectorWrapper w : container) {
+ RecordBatchSizer.ColumnSize colSize =
getColumnSize(w.getField().getName());
+ colSize.allocateVector(w.getValueVector(), recordCount);
+ }
+ }
+
+ public void allocateVectors(VectorContainer container) {
+ allocateVectors(container, outputRowCount);
+ }
+
+ public void allocateVectors(List<ValueVector> valueVectors, int recordCount)
{
+ // Allocate memory for the vectors.
+ // This will iteratively allocate memory for all nested columns underneath.
+ for (ValueVector v : valueVectors) {
+ RecordBatchSizer.ColumnSize colSize =
getColumnSize(v.getField().getName());
+ colSize.allocateVector(v, recordCount);
+ }
+ }
+
+ public void allocateVectors(List<ValueVector> valueVectors) {
+ allocateVectors(valueVectors, outputRowCount);
+ }
}
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
index f4b91091c6..98386702ec 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -26,6 +26,7 @@
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.FlattenPOP;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.config.UnionAll;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.RecordBatch;
@@ -42,6 +43,7 @@
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -1134,6 +1136,222 @@ public void testMergeJoinLowerLimit() throws Exception {
opTestBuilder.go();
}
+ @Test
+ public void testUnionOutputBatch() throws Exception {
+ UnionAll unionAll = new UnionAll(Collections.<PhysicalOperator>
emptyList());
+ mockOpContext(unionAll, initReservation, maxAllocation);
+
+ // create batches from both sides.
+ numRows = 4000;
+
+ // create left input rows like this.
+ // "a1" : 5, "b1" : wideString, "c1" : <id>
+ List<String> leftJsonBatches = Lists.newArrayList();
+ StringBuilder leftBatchString = new StringBuilder();
+ leftBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString
+ "\"," + "\"c1\" : " + i + "},");
+ }
+ leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString +
"\"," + "\"c1\" : " + numRows + "}");
+ leftBatchString.append("]");
+
+ leftJsonBatches.add(leftBatchString.toString());
+
+ // create right input rows like this.
+ // "a2" : 6, "b2" : wideString, "c2" : <id>
+ List<String> rightJsonBatches = Lists.newArrayList();
+ StringBuilder rightBatchString = new StringBuilder();
+ rightBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ rightBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString
+ "\"," + "\"c1\" : " + i + "},");
+ }
+ rightBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString +
"\"," + "\"c1\" : " + numRows + "}");
+ rightBatchString.append("]");
+ rightJsonBatches.add(rightBatchString.toString());
+
+ // output rows will be like this.
+ // "a1" : 5, "b1" : wideString, "c1" : 1
+ // "a1" : 5, "b1" : wideString, "c1" : 2
+ // "a1" : 5, "b1" : wideString, "c1" : 3
+ List<String> expectedJsonBatches = Lists.newArrayList();
+ StringBuilder expectedBatchString = new StringBuilder();
+ expectedBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" +
wideString + "\"," + "\"c1\" : " + i);
+ expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" +
wideString + "\"," + "\"c2\" : " + i + "},");
+ }
+ expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" +
wideString + "\"," + "\"c1\" : " + numRows);
+ expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" +
wideString + "\"," + "\"c2\" : " + numRows + "}");
+ expectedBatchString.append("]");
+ expectedJsonBatches.add(expectedBatchString.toString());
+
+ long totalSize = getExpectedSize(expectedJsonBatches);
+
+ // set the output batch size to twice of total size expected.
+ // We should get 2 batches, one for the left and one for the right.
+
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size",
totalSize*2);
+
+ OperatorTestBuilder opTestBuilder = opTestBuilder()
+ .physicalOperator(unionAll)
+ .baselineColumns("a1", "b1", "c1")
+ .expectedNumBatches(2) // verify number of batches
+ .expectedBatchSize(totalSize) // verify batch size
+ .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches,
rightJsonBatches));
+
+ for (long i = 0; i < numRows + 1; i++) {
+ opTestBuilder.baselineValues(5l, wideString, i);
+ }
+
+ for (long i = 0; i < numRows + 1; i++) {
+ opTestBuilder.baselineValues(5l, wideString, i);
+ }
+
+ opTestBuilder.go();
+ }
+
+ @Test
+ public void testUnionMultipleOutputBatches() throws Exception {
+ UnionAll unionAll = new UnionAll(Collections.<PhysicalOperator>
emptyList());
+ mockOpContext(unionAll, initReservation, maxAllocation);
+
+ // create multiple batches from both sides.
+ numRows = 8000;
+
+ // create left input rows like this.
+ // "a1" : 5, "b1" : wideString, "c1" : <id>
+ List<String> leftJsonBatches = Lists.newArrayList();
+ StringBuilder leftBatchString = new StringBuilder();
+ leftBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString
+ "\"," + "\"c1\" : " + i + "},");
+ }
+ leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString +
"\"," + "\"c1\" : " + numRows + "}");
+ leftBatchString.append("]");
+
+ leftJsonBatches.add(leftBatchString.toString());
+
+ // create right input rows like this.
+ // "a1" : 5, "b1" : wideString, "c1" : <id>
+ List<String> rightJsonBatches = Lists.newArrayList();
+ StringBuilder rightBatchString = new StringBuilder();
+ rightBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ rightBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString
+ "\"," + "\"c1\" : " + i + "},");
+ }
+ rightBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString +
"\"," + "\"c1\" : " + numRows + "}");
+ rightBatchString.append("]");
+ rightJsonBatches.add(rightBatchString.toString());
+
+ // output rows will be like this.
+ // "a1" : 5, "b1" : wideString, "c1" : 1
+ // "a1" : 5, "b1" : wideString, "c1" : 2
+ // "a1" : 5, "b1" : wideString, "c1" : 3
+ List<String> expectedJsonBatches = Lists.newArrayList();
+ StringBuilder expectedBatchString = new StringBuilder();
+ expectedBatchString.append("[");
+ for (int i = 0; i < numRows*2; i++) {
+ expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" +
wideString + "\"," + "\"c1\" : " + i + "},");
+ }
+ expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" +
wideString + "\"," + "\"c1\" : " + numRows + "}");
+ expectedBatchString.append("]");
+ expectedJsonBatches.add(expectedBatchString.toString());
+
+ long totalSize = getExpectedSize(expectedJsonBatches);
+
+ // set the output batch size to half of total size expected.
+ // We should get 4 batches, 2 for the left and 2 for the right.
+
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size",
totalSize/2);
+
+ OperatorTestBuilder opTestBuilder = opTestBuilder()
+ .physicalOperator(unionAll)
+ .baselineColumns("a1", "b1", "c1")
+ .expectedNumBatches(4) // verify number of batches
+ .expectedBatchSize(totalSize) // verify batch size
+ .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches,
rightJsonBatches));
+
+ for (long i = 0; i < numRows + 1; i++) {
+ opTestBuilder.baselineValues(5l, wideString, i);
+ }
+
+ for (long i = 0; i < numRows + 1; i++) {
+ opTestBuilder.baselineValues(5l, wideString, i);
+ }
+
+ opTestBuilder.go();
+ }
+
+ @Test
+ public void testUnionLowerLimit() throws Exception {
+ UnionAll unionAll = new UnionAll(Collections.<PhysicalOperator>
emptyList());
+ mockOpContext(unionAll, initReservation, maxAllocation);
+
+ // create multiple batches from both sides.
+ numRows = 10;
+
+ // create left input rows like this.
+ // "a1" : 5, "b1" : wideString, "c1" : <id>
+ List<String> leftJsonBatches = Lists.newArrayList();
+ StringBuilder leftBatchString = new StringBuilder();
+
+ leftBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString
+ "\"," + "\"c1\" : " + i + "},");
+ }
+ leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString +
"\"," + "\"c1\" : " + numRows + "}");
+ leftBatchString.append("]");
+
+ leftJsonBatches.add(leftBatchString.toString());
+
+ // create right input rows like this.
+ // "a1" : 5, "b1" : wideString, "c1" : <id>
+ List<String> rightJsonBatches = Lists.newArrayList();
+ StringBuilder rightBatchString = new StringBuilder();
+ rightBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ rightBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString
+ "\"," + "\"c1\" : " + i + "},");
+ }
+ rightBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString +
"\"," + "\"c1\" : " + numRows + "}");
+ rightBatchString.append("]");
+ rightJsonBatches.add(rightBatchString.toString());
+
+ // output rows will be like this.
+ // "a1" : 5, "b1" : wideString, "c1" : 1
+ // "a1" : 5, "b1" : wideString, "c1" : 2
+ // "a1" : 5, "b1" : wideString, "c1" : 3
+ List<String> expectedJsonBatches = Lists.newArrayList();
+ StringBuilder expectedBatchString = new StringBuilder();
+ expectedBatchString.append("[");
+ for (int i = 0; i < numRows*2; i++) {
+ expectedBatchString.append("{\"a1\": 5, " + "\"c1\" : " + i + "},");
+ }
+ expectedBatchString.append("{\"a1\": 5, " + "\"c1\" : " + numRows + "}");
+ expectedBatchString.append("]");
+ expectedJsonBatches.add(expectedBatchString.toString());
+
+ long totalSize = getExpectedSize(expectedJsonBatches);
+
+ // set the output batch size very low so we get only one row per batch.
+ // We should get 22 batches for 22 rows.
+
fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size",
128);
+
+ OperatorTestBuilder opTestBuilder = opTestBuilder()
+ .physicalOperator(unionAll)
+ .baselineColumns("a1","b1", "c1")
+ .expectedNumBatches(22) // verify number of batches
+ .expectedBatchSize(totalSize) // verify batch size
+ .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches,
rightJsonBatches));
+
+ for (long i = 0; i < numRows + 1; i++) {
+ opTestBuilder.baselineValues(5l, wideString, i);
+ }
+
+ for (long i = 0; i < numRows + 1; i++) {
+ opTestBuilder.baselineValues(5l, wideString, i);
+ }
+
+ opTestBuilder.go();
+ }
+
@Test
public void testSizerRepeatedList() throws Exception {
List<String> inputJsonBatches = Lists.newArrayList();
diff --git a/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
b/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
index e9884208fe..4e6edb580d 100644
--- a/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
@@ -172,6 +172,7 @@ public void copyFrom(int inIndex, int outIndex,
Repeated${minor.class}Vector v)
for (int i = 0; i < count; i++) {
mutator.add(outIndex, vAccessor.get(inIndex, i));
}
+ mutator.setValueCount(outIndex+1);
}
public void copyFromSafe(int inIndex, int outIndex,
Repeated${minor.class}Vector v) {
@@ -181,6 +182,7 @@ public void copyFromSafe(int inIndex, int outIndex,
Repeated${minor.class}Vector
for (int i = 0; i < count; i++) {
mutator.addSafe(outIndex, vAccessor.get(inIndex, i));
}
+ mutator.setValueCount(outIndex+1);
}
@Override
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> batch sizing for union all
> --------------------------
>
> Key: DRILL-6356
> URL: https://issues.apache.org/jira/browse/DRILL-6356
> Project: Apache Drill
> Issue Type: Improvement
> Components: Execution - Relational Operators
> Affects Versions: 1.13.0
> Reporter: Padma Penumarthy
> Assignee: Padma Penumarthy
> Priority: Major
> Fix For: 1.14.0
>
>
> batch sizing changes for union all operator
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)