Github user sachouche commented on a diff in the pull request:
https://github.com/apache/drill/pull/1107#discussion_r166450263
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
---
@@ -102,20 +105,78 @@
private final List<Comparator> comparators;
private final JoinRelType joinType;
private JoinWorker worker;
+ private final long outputBatchSize;
private static final String LEFT_INPUT = "LEFT INPUT";
private static final String RIGHT_INPUT = "RIGHT INPUT";
+ private class MergeJoinMemoryManager extends
AbstractRecordBatchMemoryManager {
+ private int leftRowWidth;
+ private int rightRowWidth;
+
+ /**
+ * mergejoin operates on one record at a time from the left and right
batches
+ * using RecordIterator abstraction. We have a callback mechanism to
get notified
+ * when new batch is loaded in record iterator.
+ * This can get called in the middle of current output batch we are
building.
+ * when this gets called, adjust number of output rows for the current
batch and
+ * update the value to be used for subsequent batches.
+ */
+ @Override
+ public void update(int inputIndex) {
+ switch(inputIndex) {
+ case 0:
+ final RecordBatchSizer leftSizer = new RecordBatchSizer(left);
+ leftRowWidth = leftSizer.netRowWidth();
+ break;
+ case 1:
+ final RecordBatchSizer rightSizer = new RecordBatchSizer(right);
+ rightRowWidth = rightSizer.netRowWidth();
+ default:
+ break;
+ }
+
+ final int newOutgoingRowWidth = leftRowWidth + rightRowWidth;
+
+ // If outgoing row width is 0, just return. This is possible for
empty batches or
+ // when first set of batches come with OK_NEW_SCHEMA and no data.
+ if (newOutgoingRowWidth == 0) {
+ return;
+ }
+
+ // update the value to be used for next batch(es)
+ setOutputRowCount(Math.min(ValueVector.MAX_ROW_COUNT,
+
Math.max(RecordBatchSizer.safeDivide(outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR,
newOutgoingRowWidth), MIN_NUM_ROWS)));
--- End diff --
Our goal is to eventually use Paul's framework for controlling batch sizes.
I feel the BatchRecordMemoryManager terminology is a bit ambitious (kind of
misleading).
---