DRILL-6296: Add operator metrics for batch sizing for merge join

closes #1181


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/da241134
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/da241134
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/da241134

Branch: refs/heads/master
Commit: da241134fb88464139437b05b1feaafbb3014bb0
Parents: 77f5e90
Author: Padma Penumarthy <ppenuma...@yahoo.com>
Authored: Thu Mar 29 19:05:07 2018 -0700
Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com>
Committed: Fri Apr 6 12:07:30 2018 +0300

----------------------------------------------------------------------
 .../drill/exec/ops/OperatorMetricRegistry.java  |   2 +
 .../impl/flatten/FlattenRecordBatch.java        |  28 ++-
 .../exec/physical/impl/join/MergeJoinBatch.java |  90 +++++++-
 .../AbstractRecordBatchMemoryManager.java       | 134 -----------
 .../exec/record/RecordBatchMemoryManager.java   | 228 +++++++++++++++++++
 .../drill/exec/record/RecordIterator.java       |   6 +-
 6 files changed, 330 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/da241134/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
index b029154..0b9aeb6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
@@ -23,6 +23,7 @@ import 
org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
 import 
org.apache.drill.exec.physical.impl.broadcastsender.BroadcastSenderRootExec;
 import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch;
 import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
+import org.apache.drill.exec.physical.impl.join.MergeJoinBatch;
 import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
 import 
org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
 import 
org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
@@ -51,6 +52,7 @@ public class OperatorMetricRegistry {
     register(CoreOperatorType.EXTERNAL_SORT_VALUE, 
ExternalSortBatch.Metric.class);
     register(CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE, 
ParquetRecordReader.Metric.class);
     register(CoreOperatorType.FLATTEN_VALUE, FlattenRecordBatch.Metric.class);
+    register(CoreOperatorType.MERGE_JOIN_VALUE, MergeJoinBatch.Metric.class);
   }
 
   private static void register(final int operatorType, final Class<? extends 
MetricDef> metricDef) {

http://git-wip-us.apache.org/repos/asf/drill/blob/da241134/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index a1f783f..aea415b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -45,7 +45,7 @@ import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.AbstractRecordBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchMemoryManager;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
@@ -104,11 +104,11 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
     INPUT_BATCH_COUNT,
     AVG_INPUT_BATCH_BYTES,
     AVG_INPUT_ROW_BYTES,
-    TOTAL_INPUT_RECORDS,
+    INPUT_RECORD_COUNT,
     OUTPUT_BATCH_COUNT,
     AVG_OUTPUT_BATCH_BYTES,
     AVG_OUTPUT_ROW_BYTES,
-    TOTAL_OUTPUT_RECORDS;
+    OUTPUT_RECORD_COUNT;
 
     @Override
     public int metricId() {
@@ -116,7 +116,7 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
     }
   }
 
-  private class FlattenMemoryManager extends AbstractRecordBatchMemoryManager {
+  private class FlattenMemoryManager extends RecordBatchMemoryManager {
 
     @Override
     public void update() {
@@ -152,9 +152,10 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
       // i.e. all rows fit within memory budget.
       setOutputRowCount(Math.min(columnSize.getElementCount(), 
getOutputRowCount()));
 
-      logger.debug("flatten incoming batch sizer : {}, outputBatchSize : {}," +
-        "avgOutgoingRowWidth : {}, outputRowCount : {}", 
getRecordBatchSizer(), outputBatchSize,
-        avgOutgoingRowWidth, getOutputRowCount());
+      logger.debug("incoming batch size : {}", getRecordBatchSizer());
+
+      logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output 
rowCount : {}",
+        outputBatchSize, avgOutgoingRowWidth, getOutputRowCount());
 
       updateIncomingStats();
     }
@@ -496,11 +497,20 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
     stats.setLongStat(Metric.INPUT_BATCH_COUNT, 
flattenMemoryManager.getNumIncomingBatches());
     stats.setLongStat(Metric.AVG_INPUT_BATCH_BYTES, 
flattenMemoryManager.getAvgInputBatchSize());
     stats.setLongStat(Metric.AVG_INPUT_ROW_BYTES, 
flattenMemoryManager.getAvgInputRowWidth());
-    stats.setLongStat(Metric.TOTAL_INPUT_RECORDS, 
flattenMemoryManager.getTotalInputRecords());
+    stats.setLongStat(Metric.INPUT_RECORD_COUNT, 
flattenMemoryManager.getTotalInputRecords());
     stats.setLongStat(Metric.OUTPUT_BATCH_COUNT, 
flattenMemoryManager.getNumOutgoingBatches());
     stats.setLongStat(Metric.AVG_OUTPUT_BATCH_BYTES, 
flattenMemoryManager.getAvgOutputBatchSize());
     stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, 
flattenMemoryManager.getAvgOutputRowWidth());
-    stats.setLongStat(Metric.TOTAL_OUTPUT_RECORDS, 
flattenMemoryManager.getTotalOutputRecords());
+    stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, 
flattenMemoryManager.getTotalOutputRecords());
+
+    logger.debug("input: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
+      flattenMemoryManager.getNumIncomingBatches(), 
flattenMemoryManager.getAvgInputBatchSize(),
+      flattenMemoryManager.getAvgInputRowWidth(), 
flattenMemoryManager.getTotalInputRecords());
+
+    logger.debug("output: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
+      flattenMemoryManager.getNumOutgoingBatches(), 
flattenMemoryManager.getAvgOutputBatchSize(),
+      flattenMemoryManager.getAvgOutputRowWidth(), 
flattenMemoryManager.getTotalOutputRecords());
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/da241134/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 2155f0a..ab50b22 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -44,6 +44,7 @@ import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.impl.common.Comparator;
 import org.apache.drill.exec.record.RecordBatchSizer;
@@ -56,7 +57,7 @@ import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.AbstractRecordBatch;
-import org.apache.drill.exec.record.AbstractRecordBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchMemoryManager;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 
@@ -109,12 +110,37 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
   private static final String LEFT_INPUT = "LEFT INPUT";
   private static final String RIGHT_INPUT = "RIGHT INPUT";
 
-  private class MergeJoinMemoryManager extends 
AbstractRecordBatchMemoryManager {
+  private static final int numInputs = 2;
+  private static final int LEFT_INDEX = 0;
+  private static final int RIGHT_INDEX = 1;
+
+  /**
+   * Batch sizing metrics reported by the merge join operator: batch count,
+   * average batch bytes, average row bytes and record count, each tracked
+   * separately for the left input, the right input and the output.
+   */
+  public enum Metric implements MetricDef {
+    LEFT_INPUT_BATCH_COUNT,
+    LEFT_AVG_INPUT_BATCH_BYTES,
+    LEFT_AVG_INPUT_ROW_BYTES,
+    LEFT_INPUT_RECORD_COUNT,
+    RIGHT_INPUT_BATCH_COUNT,
+    RIGHT_AVG_INPUT_BATCH_BYTES,
+    RIGHT_AVG_INPUT_ROW_BYTES,
+    RIGHT_INPUT_RECORD_COUNT,
+    OUTPUT_BATCH_COUNT,
+    AVG_OUTPUT_BATCH_BYTES,
+    AVG_OUTPUT_ROW_BYTES,
+    OUTPUT_RECORD_COUNT;
+
+    // The metric id is the constant's declaration position, so reordering or
+    // inserting constants changes the ids of every constant that follows.
+    @Override
+    public int metricId() {
+      return ordinal();
+    }
+  }
+
+  private class MergeJoinMemoryManager extends RecordBatchMemoryManager {
     private int leftRowWidth;
     private int rightRowWidth;
 
-    private RecordBatchSizer leftSizer;
-    private RecordBatchSizer rightSizer;
+    public MergeJoinMemoryManager() {
+      super(numInputs);
+    }
 
     /**
      * mergejoin operates on one record at a time from the left and right 
batches
@@ -127,17 +153,20 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
     @Override
     public void update(int inputIndex) {
       switch(inputIndex) {
-        case 0:
-          leftSizer = new RecordBatchSizer(left);
-          leftRowWidth = leftSizer.netRowWidth();
+        case LEFT_INDEX:
+          setRecordBatchSizer(inputIndex, new RecordBatchSizer(left));
+          leftRowWidth = getRecordBatchSizer(inputIndex).netRowWidth();
+          logger.debug("left incoming batch size : {}", 
getRecordBatchSizer(inputIndex));
           break;
-        case 1:
-          rightSizer = new RecordBatchSizer(right);
-          rightRowWidth = rightSizer.netRowWidth();
+        case RIGHT_INDEX:
+          setRecordBatchSizer(inputIndex, new RecordBatchSizer(right));
+          rightRowWidth = getRecordBatchSizer(inputIndex).netRowWidth();
+          logger.debug("right incoming batch size : {}", 
getRecordBatchSizer(inputIndex));
         default:
           break;
       }
 
+      updateIncomingStats(inputIndex);
       final int newOutgoingRowWidth = leftRowWidth + rightRowWidth;
 
       // If outgoing row width is 0, just return. This is possible for empty 
batches or
@@ -153,16 +182,22 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
       // calculate memory used so far based on previous outgoing row width and 
how many rows we already processed.
       final long memoryUsed = status.getOutPosition() * getOutgoingRowWidth();
       // This is the remaining memory.
-      final long remainingMemory = 
Math.max(outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR - memoryUsed, 0);
+      final long remainingMemory = Math.max(outputBatchSize - memoryUsed, 0);
       // These are number of rows we can fit in remaining memory based on new 
outgoing row width.
       final int numOutputRowsRemaining = 
RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth);
 
-      status.setTargetOutputRowCount(status.getOutPosition() + 
numOutputRowsRemaining);
+      
status.setTargetOutputRowCount(adjustOutputRowCount(status.getOutPosition() + 
numOutputRowsRemaining));
       setOutgoingRowWidth(newOutgoingRowWidth);
+
+      logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output 
rowCount : {}",
+        outputBatchSize, getOutgoingRowWidth(), getOutputRowCount());
     }
 
     @Override
     public RecordBatchSizer.ColumnSize getColumnSize(String name) {
+      RecordBatchSizer leftSizer = getRecordBatchSizer(LEFT_INDEX);
+      RecordBatchSizer rightSizer = getRecordBatchSizer(RIGHT_INDEX);
+
       if (leftSizer != null && leftSizer.getColumn(name) != null) {
         return leftSizer.getColumn(name);
       }
@@ -324,10 +359,12 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
       Preconditions.checkArgument(!vw.isHyper());
       vw.getValueVector().getMutator().setValueCount(getRecordCount());
     }
+    mergeJoinMemoryManager.updateOutgoingStats(getRecordCount());
   }
 
   @Override
   public void close() {
+    updateStats();
     super.close();
     leftIterator.close();
     rightIterator.close();
@@ -573,4 +610,33 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
     }
     return materializedExpr;
   }
+
+  private void updateStats() {
+    stats.setLongStat(MergeJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT, 
mergeJoinMemoryManager.getNumIncomingBatches(LEFT_INDEX));
+    stats.setLongStat(MergeJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES, 
mergeJoinMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
+    stats.setLongStat(MergeJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES, 
mergeJoinMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
+    stats.setLongStat(Metric.LEFT_INPUT_RECORD_COUNT, 
mergeJoinMemoryManager.getTotalInputRecords(LEFT_INDEX));
+
+    stats.setLongStat(MergeJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT, 
mergeJoinMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
+    stats.setLongStat(MergeJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES, 
mergeJoinMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
+    stats.setLongStat(MergeJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES, 
mergeJoinMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
+    stats.setLongStat(Metric.RIGHT_INPUT_RECORD_COUNT, 
mergeJoinMemoryManager.getTotalInputRecords(RIGHT_INDEX));
+
+    stats.setLongStat(MergeJoinBatch.Metric.OUTPUT_BATCH_COUNT, 
mergeJoinMemoryManager.getNumOutgoingBatches());
+    stats.setLongStat(MergeJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES, 
mergeJoinMemoryManager.getAvgOutputBatchSize());
+    stats.setLongStat(MergeJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES, 
mergeJoinMemoryManager.getAvgOutputRowWidth());
+    stats.setLongStat(MergeJoinBatch.Metric.OUTPUT_RECORD_COUNT, 
mergeJoinMemoryManager.getTotalOutputRecords());
+
+    logger.debug("left input: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
+      mergeJoinMemoryManager.getNumIncomingBatches(LEFT_INDEX), 
mergeJoinMemoryManager.getAvgInputBatchSize(LEFT_INDEX),
+      mergeJoinMemoryManager.getAvgInputRowWidth(LEFT_INDEX), 
mergeJoinMemoryManager.getTotalInputRecords(LEFT_INDEX));
+
+    logger.debug("right input: batch count : {}, avg batch bytes : {},  avg 
row bytes : {}, record count : {}",
+      mergeJoinMemoryManager.getNumIncomingBatches(RIGHT_INDEX), 
mergeJoinMemoryManager.getAvgInputBatchSize(RIGHT_INDEX),
+      mergeJoinMemoryManager.getAvgInputRowWidth(RIGHT_INDEX), 
mergeJoinMemoryManager.getTotalInputRecords(RIGHT_INDEX));
+
+    logger.debug("output: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
+      mergeJoinMemoryManager.getNumOutgoingBatches(), 
mergeJoinMemoryManager.getAvgOutputBatchSize(),
+      mergeJoinMemoryManager.getAvgOutputRowWidth(), 
mergeJoinMemoryManager.getTotalOutputRecords());
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/da241134/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
deleted file mode 100644
index 67c9cee..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.record;
-
-import org.apache.drill.exec.vector.ValueVector;
-
-public abstract class AbstractRecordBatchMemoryManager {
-  protected static final int OFFSET_VECTOR_WIDTH = 4;
-  protected static final int WORST_CASE_FRAGMENTATION_FACTOR = 2;
-  protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT;
-  protected static final int MIN_NUM_ROWS = 1;
-  private int outputRowCount = MAX_NUM_ROWS;
-  private int outgoingRowWidth;
-  private RecordBatchSizer sizer;
-
-  /**
-   * operator metric stats
-   */
-  private long numIncomingBatches;
-  private long sumInputBatchSizes;
-  private long totalInputRecords;
-  private long numOutgoingBatches;
-  private long sumOutputBatchSizes;
-  private long totalOutputRecords;
-
-  public long getNumIncomingBatches() {
-    return numIncomingBatches;
-  }
-
-  public long getTotalInputRecords() {
-    return totalInputRecords;
-  }
-
-  public long getNumOutgoingBatches() {
-    return numOutgoingBatches;
-  }
-
-  public long getTotalOutputRecords() {
-    return totalOutputRecords;
-  }
-
-  public long getAvgInputBatchSize() {
-    return RecordBatchSizer.safeDivide(sumInputBatchSizes, numIncomingBatches);
-  }
-
-  public long getAvgInputRowWidth() {
-    return RecordBatchSizer.safeDivide(sumInputBatchSizes, totalInputRecords);
-  }
-
-  public long getAvgOutputBatchSize() {
-    return RecordBatchSizer.safeDivide(sumOutputBatchSizes, 
numOutgoingBatches);
-  }
-
-  public long getAvgOutputRowWidth() {
-    return RecordBatchSizer.safeDivide(sumOutputBatchSizes, 
totalOutputRecords);
-  }
-
-  public void update(int inputIndex) {};
-
-  public void update() {};
-
-  public int getOutputRowCount() {
-    return outputRowCount;
-  }
-
-  /**
-   * Given batchSize and rowWidth, this will set output rowCount taking into 
account
-   * the min and max that is allowed.
-   */
-  public void setOutputRowCount(int targetBatchSize, int rowWidth) {
-    this.outputRowCount = 
adjustOutputRowCount(RecordBatchSizer.safeDivide(targetBatchSize, rowWidth));
-  }
-
-  public void setOutputRowCount(int outputRowCount) {
-    this.outputRowCount = outputRowCount;
-  }
-
-  /**
-   * This will adjust rowCount taking into account the min and max that is 
allowed.
-   * We will round down to nearest power of two - 1 for better memory 
utilization.
-   * -1 is done for adjusting accounting for offset vectors.
-   */
-  public static int adjustOutputRowCount(int rowCount) {
-    return (Math.min(MAX_NUM_ROWS, Math.max(Integer.highestOneBit(rowCount) - 
1, MIN_NUM_ROWS)));
-  }
-
-  public void setOutgoingRowWidth(int outgoingRowWidth) {
-    this.outgoingRowWidth = outgoingRowWidth;
-  }
-
-  public int getOutgoingRowWidth() {
-    return outgoingRowWidth;
-  }
-
-  public void setRecordBatchSizer(RecordBatchSizer sizer) {
-    this.sizer = sizer;
-  }
-
-  public RecordBatchSizer getRecordBatchSizer() {
-    return sizer;
-  }
-
-  public RecordBatchSizer.ColumnSize getColumnSize(String name) {
-    return sizer.getColumn(name);
-  }
-
-  public void updateIncomingStats() {
-    numIncomingBatches++;
-    sumInputBatchSizes += sizer.netSize();
-    totalInputRecords += sizer.rowCount();
-  }
-
-  public void updateOutgoingStats(int outputRecords) {
-    numOutgoingBatches++;
-    totalOutputRecords += outputRecords;
-    sumOutputBatchSizes += outgoingRowWidth * outputRecords;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/da241134/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
new file mode 100644
index 0000000..a8bb259
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.record;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Bookkeeping shared by operators that size their output batches.
+ * It holds the current output row count and outgoing row width, the most
+ * recent {@link RecordBatchSizer} of each input, and running statistics
+ * (batch count, summed batch bytes, record count) per input and for the
+ * output. Statistics accumulate for as long as the manager lives; setting a
+ * new sizer for an input does not reset that input's statistics.
+ */
+public class RecordBatchMemoryManager {
+  protected static final int OFFSET_VECTOR_WIDTH = 4;
+  protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT;
+  protected static final int MIN_NUM_ROWS = 1;
+  protected static final int DEFAULT_INPUT_INDEX = 0;
+  private int outputRowCount = MAX_NUM_ROWS;
+  private int outgoingRowWidth;
+  // Most recent sizer of each input, indexed by input index.
+  private final RecordBatchSizer[] sizer;
+  // Running stats of each input; an entry stays null until the first sizer
+  // is set for that input.
+  private final BatchStats[] inputBatchStats;
+  private final BatchStats outputBatchStats;
+
+  // By default, we expect one input batch stream and one output batch stream.
+  // Some operators can get multiple input batch streams, for example
+  // joins get 2 batches (left and right). Merge Receiver can get more than 2.
+  private final int numInputs;
+
+  /**
+   * Running totals for one batch stream. Static because it never touches
+   * the enclosing manager.
+   */
+  private static class BatchStats {
+    /**
+     * operator metric stats
+     */
+    private long numBatches;
+    private long sumBatchSizes;
+    private long totalRecords;
+
+    public long getNumBatches() {
+      return numBatches;
+    }
+
+    public long getTotalRecords() {
+      return totalRecords;
+    }
+
+    public long getAvgBatchSize() {
+      return RecordBatchSizer.safeDivide(sumBatchSizes, numBatches);
+    }
+
+    public long getAvgRowWidth() {
+      return RecordBatchSizer.safeDivide(sumBatchSizes, totalRecords);
+    }
+
+    public void incNumBatches() {
+      ++numBatches;
+    }
+
+    public void incSumBatchSizes(long batchSize) {
+      sumBatchSizes += batchSize;
+    }
+
+    public void incTotalRecords(long numRecords) {
+      totalRecords += numRecords;
+    }
+
+  }
+
+  public long getNumOutgoingBatches() {
+    return outputBatchStats.getNumBatches();
+  }
+
+  public long getTotalOutputRecords() {
+    return outputBatchStats.getTotalRecords();
+  }
+
+  public long getAvgOutputBatchSize() {
+    return outputBatchStats.getAvgBatchSize();
+  }
+
+  public long getAvgOutputRowWidth() {
+    return outputBatchStats.getAvgRowWidth();
+  }
+
+  // The no-argument input getters report on the default (first) input. They
+  // go through the indexed getters so that they return 0, instead of failing
+  // on a null entry, when no batch has been recorded for that input yet.
+
+  public long getNumIncomingBatches() {
+    return getNumIncomingBatches(DEFAULT_INPUT_INDEX);
+  }
+
+  public long getAvgInputBatchSize() {
+    return getAvgInputBatchSize(DEFAULT_INPUT_INDEX);
+  }
+
+  public long getAvgInputRowWidth() {
+    return getAvgInputRowWidth(DEFAULT_INPUT_INDEX);
+  }
+
+  public long getTotalInputRecords() {
+    return getTotalInputRecords(DEFAULT_INPUT_INDEX);
+  }
+
+  /**
+   * @param index input index, 0 <= index < number of inputs
+   * @return number of batches recorded for that input, 0 if none yet
+   */
+  public long getNumIncomingBatches(int index) {
+    Preconditions.checkArgument(index >= 0 && index < numInputs);
+    return inputBatchStats[index] == null ? 0 : inputBatchStats[index].getNumBatches();
+  }
+
+  /**
+   * @param index input index, 0 <= index < number of inputs
+   * @return summed batch sizes divided by batch count for that input, 0 if none yet
+   */
+  public long getAvgInputBatchSize(int index) {
+    Preconditions.checkArgument(index >= 0 && index < numInputs);
+    return inputBatchStats[index] == null ? 0 : inputBatchStats[index].getAvgBatchSize();
+  }
+
+  /**
+   * @param index input index, 0 <= index < number of inputs
+   * @return summed batch sizes divided by record count for that input, 0 if none yet
+   */
+  public long getAvgInputRowWidth(int index) {
+    Preconditions.checkArgument(index >= 0 && index < numInputs);
+    return inputBatchStats[index] == null ? 0 : inputBatchStats[index].getAvgRowWidth();
+  }
+
+  /**
+   * @param index input index, 0 <= index < number of inputs
+   * @return number of records recorded for that input, 0 if none yet
+   */
+  public long getTotalInputRecords(int index) {
+    Preconditions.checkArgument(index >= 0 && index < numInputs);
+    return inputBatchStats[index] == null ? 0 : inputBatchStats[index].getTotalRecords();
+  }
+
+  /**
+   * Creates a manager for an operator with the given number of input streams.
+   *
+   * @param numInputs number of input batch streams
+   */
+  public RecordBatchMemoryManager(int numInputs) {
+    this.numInputs = numInputs;
+    sizer = new RecordBatchSizer[numInputs];
+    inputBatchStats = new BatchStats[numInputs];
+    outputBatchStats = new BatchStats();
+  }
+
+  /**
+   * Creates a manager for an operator with a single input stream.
+   */
+  public RecordBatchMemoryManager() {
+    this(1);
+  }
+
+  // Hooks for subclasses; the base implementations do nothing.
+  public void update(int inputIndex) {}
+
+  public void update() {}
+
+  public int getOutputRowCount() {
+    return outputRowCount;
+  }
+
+  /**
+   * Given batchSize and rowWidth, this will set output rowCount taking into account
+   * the min and max that is allowed.
+   */
+  public void setOutputRowCount(int targetBatchSize, int rowWidth) {
+    this.outputRowCount = adjustOutputRowCount(RecordBatchSizer.safeDivide(targetBatchSize, rowWidth));
+  }
+
+  public void setOutputRowCount(int outputRowCount) {
+    this.outputRowCount = outputRowCount;
+  }
+
+  /**
+   * This will adjust rowCount taking into account the min and max that is allowed.
+   * We will round down to nearest power of two - 1 for better memory utilization.
+   * -1 is done for adjusting accounting for offset vectors.
+   */
+  public static int adjustOutputRowCount(int rowCount) {
+    return (Math.min(MAX_NUM_ROWS, Math.max(Integer.highestOneBit(rowCount) - 1, MIN_NUM_ROWS)));
+  }
+
+  public void setOutgoingRowWidth(int outgoingRowWidth) {
+    this.outgoingRowWidth = outgoingRowWidth;
+  }
+
+  public int getOutgoingRowWidth() {
+    return outgoingRowWidth;
+  }
+
+  /**
+   * Records the sizer of the latest batch of the given input.
+   * The input's running statistics are created on the first call only, so
+   * that counts keep accumulating across batches.
+   *
+   * @param index input index, 0 <= index < number of inputs
+   * @param sizer sizer built from the input's current batch
+   */
+  public void setRecordBatchSizer(int index, RecordBatchSizer sizer) {
+    Preconditions.checkArgument(index >= 0 && index < numInputs);
+    this.sizer[index] = sizer;
+    if (inputBatchStats[index] == null) {
+      inputBatchStats[index] = new BatchStats();
+    }
+  }
+
+  /**
+   * Records the sizer of the latest batch of the default (first) input.
+   */
+  public void setRecordBatchSizer(RecordBatchSizer sizer) {
+    setRecordBatchSizer(DEFAULT_INPUT_INDEX, sizer);
+  }
+
+  public RecordBatchSizer getRecordBatchSizer(int index) {
+    Preconditions.checkArgument(index >= 0 && index < numInputs);
+    return sizer[index];
+  }
+
+  public RecordBatchSizer getRecordBatchSizer() {
+    return sizer[DEFAULT_INPUT_INDEX];
+  }
+
+  public RecordBatchSizer.ColumnSize getColumnSize(int index, String name) {
+    Preconditions.checkArgument(index >= 0 && index < numInputs);
+    return sizer[index].getColumn(name);
+  }
+
+  public RecordBatchSizer.ColumnSize getColumnSize(String name) {
+    return sizer[DEFAULT_INPUT_INDEX].getColumn(name);
+  }
+
+  /**
+   * Adds the batch described by the input's current sizer to that input's
+   * running statistics. A sizer must have been set for the input first.
+   *
+   * @param index input index, 0 <= index < number of inputs
+   */
+  public void updateIncomingStats(int index) {
+    Preconditions.checkArgument(index >= 0 && index < numInputs);
+    Preconditions.checkArgument(inputBatchStats[index] != null);
+    final BatchStats inputStats = inputBatchStats[index];
+    inputStats.incNumBatches();
+    inputStats.incSumBatchSizes(sizer[index].netSize());
+    inputStats.incTotalRecords(sizer[index].rowCount());
+  }
+
+  /**
+   * Same as {@link #updateIncomingStats(int)} for the default (first) input.
+   */
+  public void updateIncomingStats() {
+    updateIncomingStats(DEFAULT_INPUT_INDEX);
+  }
+
+  /**
+   * Adds one outgoing batch of the given record count to the output
+   * statistics, sized as outgoing row width times record count.
+   *
+   * @param outputRecords number of records in the outgoing batch
+   */
+  public void updateOutgoingStats(int outputRecords) {
+    outputBatchStats.incNumBatches();
+    outputBatchStats.incTotalRecords(outputRecords);
+    // Widen before multiplying so the product cannot overflow int.
+    outputBatchStats.incSumBatchSizes((long) outgoingRowWidth * outputRecords);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/da241134/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
index 32c69ce..072a5cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -57,12 +57,12 @@ public class RecordIterator implements VectorAccessible {
   private final VectorContainer container; // Holds VectorContainer of current 
record batch
   private final TreeRangeMap<Long, RecordBatchData> batches = 
TreeRangeMap.create();
 
-  private final AbstractRecordBatchMemoryManager newBatchCallBack;
+  private final RecordBatchMemoryManager newBatchCallBack;
 
   public RecordIterator(RecordBatch incoming,
                         AbstractRecordBatch<?> outgoing,
                         OperatorContext oContext,
-                        int inputIndex, AbstractRecordBatchMemoryManager 
callBack) {
+                        int inputIndex, RecordBatchMemoryManager callBack) {
     this(incoming, outgoing, oContext, inputIndex, true, callBack);
   }
 
@@ -71,7 +71,7 @@ public class RecordIterator implements VectorAccessible {
                         OperatorContext oContext,
                         int inputIndex,
                         boolean enableMarkAndReset,
-                        AbstractRecordBatchMemoryManager callBack) {
+                        RecordBatchMemoryManager callBack) {
     this.incoming = incoming;
     this.outgoing = outgoing;
     this.inputIndex = inputIndex;

Reply via email to