sohami closed pull request #1375: DRILL-6594: Data batches for Project operator are not being split properly and exceed the maximum specified
URL: https://github.com/apache/drill/pull/1375
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
index b9240d68be0..84a3f46f886 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
@@ -95,30 +95,31 @@ public OutputWidthCalculator getCalculator() {
     }
 
     /**
-     * VarLenReadExpr captures the name of a variable length column that is 
used (read) in an expression.
-     * The captured name will be used to lookup the average entry size for the 
column in the corresponding
+     * VarLenReadExpr captures the inputColumnName and the readExpression used 
to read a variable length column.
+     * The captured inputColumnName will be used to lookup the average entry 
size for the column in the corresponding.
+     * If inputColumnName is null then the readExpression is used to get the 
name of the column.
      * {@link org.apache.drill.exec.record.RecordBatchSizer}
      */
     public static class VarLenReadExpr extends OutputWidthExpression  {
         ValueVectorReadExpression readExpression;
-        String name;
+        String inputColumnName;
 
         public VarLenReadExpr(ValueVectorReadExpression readExpression) {
             this.readExpression = readExpression;
-            this.name = null;
+            this.inputColumnName = null;
         }
 
-        public VarLenReadExpr(String name) {
+        public VarLenReadExpr(String inputColumnName) {
             this.readExpression = null;
-            this.name = name;
+            this.inputColumnName = inputColumnName;
         }
 
         public ValueVectorReadExpression getReadExpression() {
             return readExpression;
         }
 
-        public String getName() {
-            return name;
+        public String getInputColumnName() {
+            return inputColumnName;
         }
 
         @Override
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
index cb587952304..70908bf2fcb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
@@ -205,7 +205,7 @@ public OutputWidthExpression visitFixedLenExpr(FixedLenExpr 
fixedLenExpr, Output
     @Override
     public OutputWidthExpression visitVarLenReadExpr(VarLenReadExpr 
varLenReadExpr, OutputWidthVisitorState state)
                                                         throws 
RuntimeException {
-        String columnName = varLenReadExpr.getName();
+        String columnName = varLenReadExpr.getInputColumnName();
         if (columnName == null) {
             TypedFieldId fieldId = 
varLenReadExpr.getReadExpression().getTypedFieldId();
             columnName =  TypedFieldId.getPath(fieldId, 
state.manager.getIncomingBatch());
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
index c0e0cb1c9e6..e18c827128b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
@@ -20,18 +20,13 @@
 public class OutputWidthVisitorState {
 
     ProjectMemoryManager manager;
-    ProjectMemoryManager.OutputColumnType outputColumnType;
 
-    public OutputWidthVisitorState(ProjectMemoryManager manager, 
ProjectMemoryManager.OutputColumnType outputColumnType) {
+    public OutputWidthVisitorState(ProjectMemoryManager manager) {
         this.manager = manager;
-        this.outputColumnType = outputColumnType;
     }
 
     public ProjectMemoryManager getManager() {
         return manager;
     }
 
-    public ProjectMemoryManager.OutputColumnType getOutputColumnType() {
-        return outputColumnType;
-    }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
index f461b092281..03c849cfef6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -88,15 +89,12 @@ public RecordBatch getIncomingBatch() {
     }
 
     class ColumnWidthInfo {
-        //MaterializedField materializedField;
         OutputWidthExpression outputExpression;
         int width;
         WidthType widthType;
         OutputColumnType outputColumnType;
-        String name;
 
-        ColumnWidthInfo(ValueVector vv,
-                        OutputWidthExpression outputWidthExpression,
+        ColumnWidthInfo(OutputWidthExpression outputWidthExpression,
                         OutputColumnType outputColumnType,
                         WidthType widthType,
                         int fieldWidth) {
@@ -104,8 +102,6 @@ public RecordBatch getIncomingBatch() {
             this.width = fieldWidth;
             this.outputColumnType = outputColumnType;
             this.widthType = widthType;
-            String columnName = vv.getField().getName();
-            this.name = columnName;
         }
 
         public OutputWidthExpression getOutputExpression() { return 
outputExpression; }
@@ -116,7 +112,6 @@ public RecordBatch getIncomingBatch() {
 
         public int getWidth() { return width; }
 
-        public String getName() { return name; }
     }
 
     void ShouldNotReachHere() {
@@ -180,43 +175,44 @@ public static int 
getWidthOfFixedWidthType(TypeProtos.MajorType majorType) {
     }
 
 
-    void addTransferField(ValueVector vvOut, String path) {
-        addField(vvOut, null, OutputColumnType.TRANSFER, path);
+    void addTransferField(ValueVector vvIn, String inputColumnName, String 
outputColumnName) {
+        addField(vvIn, null, OutputColumnType.TRANSFER, inputColumnName, 
outputColumnName);
     }
 
-    void addNewField(ValueVector vv, LogicalExpression logicalExpression) {
-        addField(vv, logicalExpression, OutputColumnType.NEW, null);
+    void addNewField(ValueVector vvOut, LogicalExpression logicalExpression) {
+        addField(vvOut, logicalExpression, OutputColumnType.NEW, null, 
vvOut.getField().getName());
     }
 
-    void addField(ValueVector vv, LogicalExpression logicalExpression, 
OutputColumnType outputColumnType, String path) {
+    void addField(ValueVector vv, LogicalExpression logicalExpression, 
OutputColumnType outputColumnType,
+                  String inputColumnName, String outputColumnName) {
         if(isFixedWidth(vv)) {
             addFixedWidthField(vv);
         } else {
-            addVariableWidthField(vv, logicalExpression, outputColumnType, 
path);
+            addVariableWidthField(vv, logicalExpression, outputColumnType, 
inputColumnName, outputColumnName);
         }
     }
 
     private void addVariableWidthField(ValueVector vv, LogicalExpression 
logicalExpression,
-                                       OutputColumnType outputColumnType, 
String path) {
+                                       OutputColumnType outputColumnType, 
String inputColumnName, String outputColumnName) {
         variableWidthColumnCount++;
         ColumnWidthInfo columnWidthInfo;
         //Variable width transfers
         if(outputColumnType == OutputColumnType.TRANSFER) {
-            String columnName = path;
-            VarLenReadExpr readExpr = new VarLenReadExpr(columnName);
-            columnWidthInfo = new ColumnWidthInfo(vv, readExpr, 
outputColumnType,
+            VarLenReadExpr readExpr = new VarLenReadExpr(inputColumnName);
+            columnWidthInfo = new ColumnWidthInfo(readExpr, outputColumnType,
                     WidthType.VARIABLE, -1); //fieldWidth has to be obtained 
from the RecordBatchSizer
         } else if (isComplex(vv.getField().getType())) {
             addComplexField(vv);
             return;
         } else {
             // Walk the tree of LogicalExpressions to get a tree of 
OutputWidthExpressions
-            OutputWidthVisitorState state = new OutputWidthVisitorState(this, 
outputColumnType);
+            OutputWidthVisitorState state = new OutputWidthVisitorState(this);
             OutputWidthExpression outputWidthExpression = 
logicalExpression.accept(new OutputWidthVisitor(), state);
-            columnWidthInfo = new ColumnWidthInfo(vv, outputWidthExpression, 
outputColumnType,
+            columnWidthInfo = new ColumnWidthInfo(outputWidthExpression, 
outputColumnType,
                     WidthType.VARIABLE, -1); //fieldWidth has to be obtained 
from the OutputWidthExpression
         }
-        outputColumnSizes.put(columnWidthInfo.getName(), columnWidthInfo);
+        ColumnWidthInfo existingInfo = outputColumnSizes.put(outputColumnName, 
columnWidthInfo);
+        Preconditions.checkState(existingInfo == null);
     }
 
     void addComplexField(ValueVector vv) {
@@ -258,8 +254,8 @@ public void update() {
         setRecordBatchSizer(batchSizer);
         rowWidth = 0;
         int totalVariableColumnWidth = 0;
-        for (String expr : outputColumnSizes.keySet()) {
-            ColumnWidthInfo columnWidthInfo = outputColumnSizes.get(expr);
+        for (String outputColumnName : outputColumnSizes.keySet()) {
+            ColumnWidthInfo columnWidthInfo = 
outputColumnSizes.get(outputColumnName);
             int width = -1;
             if (columnWidthInfo.isFixedWidth()) {
                 // fixed width columns are accumulated in 
totalFixedWidthColumnWidth
@@ -269,12 +265,10 @@ public void update() {
                 //As the tree is walked, the RecordBatchSizer and function 
annotations
                 //are looked-up to come up with the final FixedLenExpr
                 OutputWidthExpression savedWidthExpr = 
columnWidthInfo.getOutputExpression();
-                OutputColumnType columnType = 
columnWidthInfo.getOutputColumnType();
-                OutputWidthVisitorState state = new 
OutputWidthVisitorState(this, columnType);
+                OutputWidthVisitorState state = new 
OutputWidthVisitorState(this);
                 OutputWidthExpression reducedExpr = savedWidthExpr.accept(new 
OutputWidthVisitor(), state);
-                assert reducedExpr instanceof FixedLenExpr;
                 width = ((FixedLenExpr)reducedExpr).getWidth();
-                assert width >= 0;
+                Preconditions.checkState(width >= 0);
             }
             totalVariableColumnWidth += width;
         }
@@ -301,7 +295,7 @@ public void update() {
         logger.trace("update() : Output RC {}, BatchSizer RC {}, incoming RC 
{}, width {}, total fixed width {}"
                     + ", total variable width {}, total complex width {}, 
batchSizer time {} ms, update time {}  ms"
                     + ", manager {}, incoming {}",outPutRowCount, 
batchSizer.rowCount(), incomingBatch.getRecordCount(),
-                    totalFixedWidthColumnWidth, totalVariableColumnWidth, 
totalComplexColumnWidth,
+                    rowWidth, totalFixedWidthColumnWidth, 
totalVariableColumnWidth, totalComplexColumnWidth,
                     (batchSizerEndTime - updateStartTime),(updateEndTime - 
updateStartTime), this, incomingBatch);
 
         logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4bc63c0b1b7..dd933250a2b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -113,11 +113,6 @@ private void clear() {
 
   public ProjectRecordBatch(final Project pop, final RecordBatch incoming, 
final FragmentContext context) throws OutOfMemoryException {
     super(pop, context, incoming);
-
-    // get the output batch size from config.
-    int configuredBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-
-    memoryManager = new ProjectMemoryManager(configuredBatchSize);
   }
 
   @Override
@@ -367,6 +362,9 @@ private boolean isWildcard(final NamedExpression ex) {
 
   private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws 
SchemaChangeException {
     long setupNewSchemaStartTime = System.currentTimeMillis();
+    // get the output batch size from config.
+    int configuredBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    memoryManager = new ProjectMemoryManager(configuredBatchSize);
     memoryManager.init(incomingBatch, this);
     if (allocationVectors != null) {
       for (final ValueVector v : allocationVectors) {
@@ -431,7 +429,7 @@ private void setupNewSchemaFromInput(RecordBatch 
incomingBatch) throws SchemaCha
               final ValueVector vvOut = 
container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(),
                 vvIn.getField().getType()), callBack);
               final TransferPair tp = vvIn.makeTransferPair(vvOut);
-              memoryManager.addTransferField(vvIn, vvIn.getField().getName());
+              memoryManager.addTransferField(vvIn, vvIn.getField().getName(), 
vvOut.getField().getName());
               transfers.add(tp);
             }
           } else if (value != null && value > 1) { // subsequent wildcards 
should do a copy of incoming valuevectors
@@ -513,7 +511,7 @@ private void setupNewSchemaFromInput(RecordBatch 
incomingBatch) throws SchemaCha
           
container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(),
             vectorRead.getMajorType()), callBack);
         final TransferPair tp = vvIn.makeTransferPair(vvOut);
-        memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id, 
incomingBatch));
+        memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id, 
incomingBatch), vvOut.getField().getName());
         transfers.add(tp);
         transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
       } else if (expr instanceof DrillFuncHolderExpr &&
@@ -540,13 +538,13 @@ private void setupNewSchemaFromInput(RecordBatch 
incomingBatch) throws SchemaCha
         memoryManager.addComplexField(null); // this will just add an estimate 
to the row width
       } else {
         // need to do evaluation.
-        final ValueVector vector = container.addOrGet(outputField, callBack);
-        allocationVectors.add(vector);
+        final ValueVector ouputVector = container.addOrGet(outputField, 
callBack);
+        allocationVectors.add(ouputVector);
         final TypedFieldId fid = 
container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
-        final boolean useSetSafe = !(vector instanceof FixedWidthVector);
+        final boolean useSetSafe = !(ouputVector instanceof FixedWidthVector);
         final ValueVectorWriteExpression write = new 
ValueVectorWriteExpression(fid, expr, useSetSafe);
         final HoldingContainer hc = cg.addExpr(write, 
ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
-        memoryManager.addNewField(vector, write);
+        memoryManager.addNewField(ouputVector, write);
 
         // We cannot do multiple transfers from the same vector. However we 
still need to instantiate the output vector.
         if (expr instanceof ValueVectorReadExpression) {
@@ -555,7 +553,7 @@ private void setupNewSchemaFromInput(RecordBatch 
incomingBatch) throws SchemaCha
             final TypedFieldId id = vectorRead.getFieldId();
             final ValueVector vvIn = 
incomingBatch.getValueAccessorById(id.getIntermediateClass(),
                     id.getFieldIds()).getValueVector();
-            vvIn.makeTransferPair(vector);
+            vvIn.makeTransferPair(ouputVector);
           }
         }
       }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to