Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1107#discussion_r166387630 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java --- @@ -102,20 +105,78 @@ private final List<Comparator> comparators; private final JoinRelType joinType; private JoinWorker worker; + private final long outputBatchSize; private static final String LEFT_INPUT = "LEFT INPUT"; private static final String RIGHT_INPUT = "RIGHT INPUT"; + private class MergeJoinMemoryManager extends AbstractRecordBatchMemoryManager { + private int leftRowWidth; + private int rightRowWidth; + + /** + * mergejoin operates on one record at a time from the left and right batches + * using RecordIterator abstraction. We have a callback mechanism to get notified + * when new batch is loaded in record iterator. + * This can get called in the middle of current output batch we are building. + * when this gets called, adjust number of output rows for the current batch and + * update the value to be used for subsequent batches. + */ + @Override + public void update(int inputIndex) { + switch(inputIndex) { + case 0: + final RecordBatchSizer leftSizer = new RecordBatchSizer(left); + leftRowWidth = leftSizer.netRowWidth(); + break; + case 1: + final RecordBatchSizer rightSizer = new RecordBatchSizer(right); + rightRowWidth = rightSizer.netRowWidth(); + default: + break; + } + + final int newOutgoingRowWidth = leftRowWidth + rightRowWidth; + + // If outgoing row width is 0, just return. This is possible for empty batches or + // when first set of batches come with OK_NEW_SCHEMA and no data. + if (newOutgoingRowWidth == 0) { + return; + } + + // update the value to be used for next batch(es) + setOutputRowCount(Math.min(ValueVector.MAX_ROW_COUNT, + Math.max(RecordBatchSizer.safeDivide(outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR, newOutgoingRowWidth), MIN_NUM_ROWS))); + + // Adjust for the current batch. 
+ // calculate memory used so far based on previous outgoing row width and how many rows we already processed. + final long memoryUsed = status.getOutPosition() * getOutgoingRowWidth(); + // This is the remaining memory. + final long remainingMemory = Math.max(outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR - memoryUsed, 0); + // These are number of rows we can fit in remaining memory based on new outgoing row width. + final int numOutputRowsRemaining = RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth); + + final int adjustedOutputRowCount = Math.min(MAX_NUM_ROWS, Math.max(status.getOutPosition() + numOutputRowsRemaining, MIN_NUM_ROWS)); + status.setOutputRowCount(adjustedOutputRowCount); + setOutgoingRowWidth(newOutgoingRowWidth); --- End diff -- This adjusted output row count is valid only for the current output batch; the next batch won't contain these "legacy" rows. Do we recompute the row count at the start of the next output batch?
---