Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1107#discussion_r166387073
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
 ---
    @@ -102,20 +105,78 @@
       private final List<Comparator> comparators;
       private final JoinRelType joinType;
       private JoinWorker worker;
    +  private final long outputBatchSize;
     
       private static final String LEFT_INPUT = "LEFT INPUT";
       private static final String RIGHT_INPUT = "RIGHT INPUT";
     
    +  private class MergeJoinMemoryManager extends 
AbstractRecordBatchMemoryManager {
    +    private int leftRowWidth;
    +    private int rightRowWidth;
    +
    +    /**
    +     * mergejoin operates on one record at a time from the left and right 
batches
    +     * using RecordIterator abstraction. We have a callback mechanism to 
get notified
    +     * when new batch is loaded in record iterator.
    +     * This can get called in the middle of current output batch we are 
building.
    +     * when this gets called, adjust number of output rows for the current 
batch and
    +     * update the value to be used for subsequent batches.
    +     */
    +    @Override
    +    public void update(int inputIndex) {
    +      switch(inputIndex) {
    +        case 0:
    +          final RecordBatchSizer leftSizer = new RecordBatchSizer(left);
    +          leftRowWidth = leftSizer.netRowWidth();
    +          break;
    +        case 1:
    +          final RecordBatchSizer rightSizer = new RecordBatchSizer(right);
    +          rightRowWidth = rightSizer.netRowWidth();
    +        default:
    +          break;
    +      }
    +
    +      final int newOutgoingRowWidth = leftRowWidth + rightRowWidth;
    +
    +      // If outgoing row width is 0, just return. This is possible for 
empty batches or
    +      // when first set of batches come with OK_NEW_SCHEMA and no data.
    +      if (newOutgoingRowWidth == 0) {
    +        return;
    +      }
    +
    +      // update the value to be used for next batch(es)
    +      setOutputRowCount(Math.min(ValueVector.MAX_ROW_COUNT,
    +        
Math.max(RecordBatchSizer.safeDivide(outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR,
 newOutgoingRowWidth), MIN_NUM_ROWS)));
    --- End diff --
    
    Maybe wrap this in a method since it is used multiple times.


---

Reply via email to