[ https://issues.apache.org/jira/browse/DRILL-6123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354231#comment-16354231 ]
ASF GitHub Bot commented on DRILL-6123:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1107#discussion_r166387073

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---
    @@ -102,20 +105,78 @@
       private final List<Comparator> comparators;
       private final JoinRelType joinType;
       private JoinWorker worker;
    +  private final long outputBatchSize;

       private static final String LEFT_INPUT = "LEFT INPUT";
       private static final String RIGHT_INPUT = "RIGHT INPUT";

    +  private class MergeJoinMemoryManager extends AbstractRecordBatchMemoryManager {
    +    private int leftRowWidth;
    +    private int rightRowWidth;
    +
    +    /**
    +     * mergejoin operates on one record at a time from the left and right batches
    +     * using RecordIterator abstraction. We have a callback mechanism to get notified
    +     * when new batch is loaded in record iterator.
    +     * This can get called in the middle of current output batch we are building.
    +     * when this gets called, adjust number of output rows for the current batch and
    +     * update the value to be used for subsequent batches.
    +     */
    +    @Override
    +    public void update(int inputIndex) {
    +      switch(inputIndex) {
    +        case 0:
    +          final RecordBatchSizer leftSizer = new RecordBatchSizer(left);
    +          leftRowWidth = leftSizer.netRowWidth();
    +          break;
    +        case 1:
    +          final RecordBatchSizer rightSizer = new RecordBatchSizer(right);
    +          rightRowWidth = rightSizer.netRowWidth();
    +        default:
    +          break;
    +      }
    +
    +      final int newOutgoingRowWidth = leftRowWidth + rightRowWidth;
    +
    +      // If outgoing row width is 0, just return. This is possible for empty batches or
    +      // when first set of batches come with OK_NEW_SCHEMA and no data.
    +      if (newOutgoingRowWidth == 0) {
    +        return;
    +      }
    +
    +      // update the value to be used for next batch(es)
    +      setOutputRowCount(Math.min(ValueVector.MAX_ROW_COUNT,
    +        Math.max(RecordBatchSizer.safeDivide(outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR, newOutgoingRowWidth), MIN_NUM_ROWS)));
    --- End diff --

    Maybe wrap this in a method since it is used multiple times.


> Limit batch size for Merge Join based on memory
> -----------------------------------------------
>
>                 Key: DRILL-6123
>                 URL: https://issues.apache.org/jira/browse/DRILL-6123
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Flow
>    Affects Versions: 1.12.0
>            Reporter: Padma Penumarthy
>            Assignee: Padma Penumarthy
>            Priority: Major
>             Fix For: 1.13.0
>
>
> Merge join limits output batch size to 32K rows irrespective of row size.
> This can create very large or very small batches (in terms of memory),
> depending upon average row width. Change this to figure out output row count
> based on memory specified with the new outputBatchSize option and average row
> width of incoming left and right batches. Output row count will be minimum of
> 1 and max of 64k.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
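
For illustration only, the suggestion to wrap the row-count computation in a method could look roughly like the standalone sketch below. It does not use Drill classes. The class name, the method name computeOutputRowCount, the constant values (65536, 1, 2) and the local safeDivide stand-in are all assumptions made for this example; only the shape of the min/max clamp is taken from the quoted diff.

```java
// Hypothetical, self-contained sketch; not the actual Drill implementation.
public class OutputRowCountSketch {
    // Assumed values for illustration; the real constants live in Drill.
    static final int MAX_ROW_COUNT = 65536;               // "max of 64k" per the issue text
    static final int MIN_NUM_ROWS = 1;                    // "minimum of 1" per the issue text
    static final int WORST_CASE_FRAGMENTATION_FACTOR = 2; // assumed value

    // Stand-in for RecordBatchSizer.safeDivide; returning 0 for a zero
    // divisor and using floor division are assumptions of this sketch.
    static long safeDivide(long num, long denom) {
        return denom == 0 ? 0 : num / denom;
    }

    // One place for the clamp that the diff repeats inline:
    // rows = (batchSize / fragmentationFactor) / rowWidth, held within [MIN, MAX].
    static int computeOutputRowCount(long outputBatchSize, int rowWidth) {
        long rows = safeDivide(outputBatchSize / WORST_CASE_FRAGMENTATION_FACTOR, rowWidth);
        return (int) Math.min(MAX_ROW_COUNT, Math.max(rows, MIN_NUM_ROWS));
    }

    public static void main(String[] args) {
        // Example call: 16 MB output batch, 256-byte combined row width.
        System.out.println(computeOutputRowCount(16L * 1024 * 1024, 256));
    }
}
```

With a helper of this kind, each call site would reduce to something like setOutputRowCount(computeOutputRowCount(outputBatchSize, newOutgoingRowWidth)), and the bounds would be defined in one place. Note that the diff returns early when the row width is 0, whereas this sketch falls back to the minimum row count in that case.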