DRILL-6177: Merge Join - Allocate memory for outgoing value vectors based on sizes of incoming batches.

closes #1125


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/766315ea
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/766315ea
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/766315ea

Branch: refs/heads/master
Commit: 766315ea17377199897d685ab801edd38394fe01
Parents: 31e0f29
Author: Padma Penumarthy <ppenuma...@yahoo.com>
Authored: Tue Mar 6 16:09:43 2018 -0800
Committer: Ben-Zvi <bben-...@mapr.com>
Committed: Wed Mar 7 15:42:11 2018 -0800

----------------------------------------------------------------------
 .../exec/physical/impl/join/MergeJoinBatch.java | 23 ++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/766315ea/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index f612ae2..2155f0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -57,7 +57,6 @@ import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.AbstractRecordBatchMemoryManager;
-import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 
@@ -114,6 +113,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     private int leftRowWidth;
     private int rightRowWidth;
 
+    private RecordBatchSizer leftSizer;
+    private RecordBatchSizer rightSizer;
+
     /**
      * mergejoin operates on one record at a time from the left and right 
batches
      * using RecordIterator abstraction. We have a callback mechanism to get 
notified
@@ -126,11 +128,11 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     public void update(int inputIndex) {
       switch(inputIndex) {
         case 0:
-          final RecordBatchSizer leftSizer = new RecordBatchSizer(left);
+          leftSizer = new RecordBatchSizer(left);
           leftRowWidth = leftSizer.netRowWidth();
           break;
         case 1:
-          final RecordBatchSizer rightSizer = new RecordBatchSizer(right);
+          rightSizer = new RecordBatchSizer(right);
           rightRowWidth = rightSizer.netRowWidth();
         default:
           break;
@@ -158,6 +160,14 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       status.setTargetOutputRowCount(status.getOutPosition() + 
numOutputRowsRemaining);
       setOutgoingRowWidth(newOutgoingRowWidth);
     }
+
+    @Override
+    public RecordBatchSizer.ColumnSize getColumnSize(String name) {
+      if (leftSizer != null && leftSizer.getColumn(name) != null) {
+        return leftSizer.getColumn(name);
+      }
+      return rightSizer == null ? null : rightSizer.getColumn(name);
+    }
   }
 
   private final MergeJoinMemoryManager mergeJoinMemoryManager = new 
MergeJoinMemoryManager();
@@ -492,8 +502,13 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     } else {
       container.zeroVectors();
     }
+
+    // Allocate memory for the vectors.
+    // This will iteratively allocate memory for all nested columns underneath.
+    int outputRowCount = mergeJoinMemoryManager.getOutputRowCount();
     for (VectorWrapper w : container) {
-      AllocationHelper.allocateNew(w.getValueVector(), Character.MAX_VALUE);
+      RecordBatchSizer.ColumnSize colSize = 
mergeJoinMemoryManager.getColumnSize(w.getField().getName());
+      colSize.allocateVector(w.getValueVector(), outputRowCount);
     }
 
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);

Reply via email to