Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1107#discussion_r166387630
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
 ---
    @@ -102,20 +105,78 @@
       private final List<Comparator> comparators;
       private final JoinRelType joinType;
       private JoinWorker worker;
    +  private final long outputBatchSize;
     
       private static final String LEFT_INPUT = "LEFT INPUT";
       private static final String RIGHT_INPUT = "RIGHT INPUT";
     
    +  private class MergeJoinMemoryManager extends 
AbstractRecordBatchMemoryManager {
    +    private int leftRowWidth;
    +    private int rightRowWidth;
    +
    +    /**
    +     * mergejoin operates on one record at a time from the left and right 
batches
    +     * using RecordIterator abstraction. We have a callback mechanism to 
get notified
    +     * when new batch is loaded in record iterator.
    +     * This can get called in the middle of current output batch we are 
building.
    +     * when this gets called, adjust number of output rows for the current 
batch and
    +     * update the value to be used for subsequent batches.
    +     */
    +    @Override
    +    public void update(int inputIndex) {
    +      switch(inputIndex) {
    +        case 0:
    +          final RecordBatchSizer leftSizer = new RecordBatchSizer(left);
    +          leftRowWidth = leftSizer.netRowWidth();
    +          break;
    +        case 1:
    +          final RecordBatchSizer rightSizer = new RecordBatchSizer(right);
    +          rightRowWidth = rightSizer.netRowWidth();
    +        default:
    +          break;
    +      }
    +
    +      final int newOutgoingRowWidth = leftRowWidth + rightRowWidth;
    +
    +      // If outgoing row width is 0, just return. This is possible for 
empty batches or
    +      // when first set of batches come with OK_NEW_SCHEMA and no data.
    +      if (newOutgoingRowWidth == 0) {
    +        return;
    +      }
    +
    +      // update the value to be used for next batch(es)
    +      setOutputRowCount(Math.min(ValueVector.MAX_ROW_COUNT,
    +        
Math.max(RecordBatchSizer.safeDivide(outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR,
 newOutgoingRowWidth), MIN_NUM_ROWS)));
    +
    +      // Adjust for the current batch.
    +      // calculate memory used so far based on previous outgoing row width 
and how many rows we already processed.
    +      final long memoryUsed = status.getOutPosition() * 
getOutgoingRowWidth();
    +      // This is the remaining memory.
    +      final long remainingMemory = 
Math.max(outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR - memoryUsed, 0);
    +      // These are number of rows we can fit in remaining memory based on 
new outgoing row width.
    +      final int numOutputRowsRemaining = 
RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth);
    +
    +      final int adjustedOutputRowCount = Math.min(MAX_NUM_ROWS, 
Math.max(status.getOutPosition() + numOutputRowsRemaining, MIN_NUM_ROWS));
    +      status.setOutputRowCount(adjustedOutputRowCount);
    +      setOutgoingRowWidth(newOutgoingRowWidth);
    --- End diff --
    
    This number is valid only for this one batch. The next batch doesn't have 
the "legacy" rows. Do we recompute the number at the start of the next output 
batch?


---

Reply via email to