This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 0af04cd44622c288278965b13685907fd1e40fb4
Author: Paul Rogers <[email protected]>
AuthorDate: Mon Dec 30 19:28:25 2019 -0800

    DRILL-7503: Refactor the project operator
    
    Breaks the big "setup" function into its own class, and
    separates out physical vector setup from logical projection
    planning. No functional change; just rearranging existing
    code.
    
    closes #1944
---
 .../physical/impl/project/OutputWidthVisitor.java  | 418 +++++++-------
 .../physical/impl/project/ProjectBatchBuilder.java | 141 +++++
 .../impl/project/ProjectMemoryManager.java         | 526 ++++++++---------
 .../physical/impl/project/ProjectRecordBatch.java  | 620 ++------------------
 .../impl/project/ProjectionMaterializer.java       | 626 +++++++++++++++++++++
 .../impl/window/WindowFrameRecordBatch.java        |   2 +-
 .../apache/drill/exec/record/VectorAccessible.java |   3 +-
 .../apache/drill/exec/record/VectorWrapper.java    |   2 -
 .../src/test/resources/project/test1.json          |   8 +-
 9 files changed, 1265 insertions(+), 1081 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
index 7b2e2b8..4fa7a89 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
@@ -43,238 +43,212 @@ import org.apache.drill.exec.record.TypedFieldId;
 
 import java.util.ArrayList;
 
-public class OutputWidthVisitor extends 
AbstractExecExprVisitor<OutputWidthExpression, OutputWidthVisitorState,
-        RuntimeException> {
-
-    @Override
-    public OutputWidthExpression visitVarDecimalConstant(VarDecimalExpression 
varDecimalExpression,
-                                                         
OutputWidthVisitorState state) throws RuntimeException {
-        
Preconditions.checkArgument(varDecimalExpression.getMajorType().hasPrecision());
-        return new 
FixedLenExpr(varDecimalExpression.getMajorType().getPrecision());
-    }
-
-
-    /**
-     *
-     * Records the {@link IfExpression} as a {@link IfElseWidthExpr}. 
IfElseWidthExpr will be reduced to
-     * a {@link FixedLenExpr} by taking the max of the if-expr-width and the 
else-expr-width.
-     *
-     * @param ifExpression
-     * @param state
-     * @return IfElseWidthExpr
-     * @throws RuntimeException
-     */
-    @Override
-    public OutputWidthExpression visitIfExpression(IfExpression ifExpression, 
OutputWidthVisitorState state)
-                                                                    throws 
RuntimeException {
-        IfExpression.IfCondition condition = ifExpression.ifCondition;
-        LogicalExpression ifExpr = condition.expression;
-        LogicalExpression elseExpr = ifExpression.elseExpression;
-
-        OutputWidthExpression ifWidthExpr = ifExpr.accept(this, state);
-        OutputWidthExpression elseWidthExpr = null;
-        if (elseExpr != null) {
-            elseWidthExpr = elseExpr.accept(this, state);
-        }
-        return new IfElseWidthExpr(ifWidthExpr, elseWidthExpr);
+public class OutputWidthVisitor extends
+      AbstractExecExprVisitor<OutputWidthExpression,
+      OutputWidthVisitorState, RuntimeException> {
+
+  @Override
+  public OutputWidthExpression visitVarDecimalConstant(VarDecimalExpression 
varDecimalExpression,
+                                                       OutputWidthVisitorState 
state) throws RuntimeException {
+    
Preconditions.checkArgument(varDecimalExpression.getMajorType().hasPrecision());
+    return new 
FixedLenExpr(varDecimalExpression.getMajorType().getPrecision());
+  }
+
+  /**
+   * Records the {@link IfExpression} as a {@link IfElseWidthExpr}.
+   * IfElseWidthExpr will be reduced to a {@link FixedLenExpr} by taking the 
max
+   * of the if-expr-width and the else-expr-width.
+   */
+  @Override
+  public OutputWidthExpression visitIfExpression(IfExpression ifExpression, 
OutputWidthVisitorState state)
+                                                                  throws 
RuntimeException {
+    IfExpression.IfCondition condition = ifExpression.ifCondition;
+    LogicalExpression ifExpr = condition.expression;
+    LogicalExpression elseExpr = ifExpression.elseExpression;
+
+    OutputWidthExpression ifWidthExpr = ifExpr.accept(this, state);
+    OutputWidthExpression elseWidthExpr = null;
+    if (elseExpr != null) {
+      elseWidthExpr = elseExpr.accept(this, state);
     }
-
-    /**
-     * Handles a {@link FunctionHolderExpression}. Functions that produce 
fixed-width output are trivially
-     * converted to a {@link FixedLenExpr}. For functions that produce 
variable width output, the output width calculator
-     * annotation is looked-up and recorded in a {@link FunctionCallExpr}. 
This calculator will later be used to convert
-     * the FunctionCallExpr to a {@link FixedLenExpr} expression
-     * @param holderExpr
-     * @param state
-     * @return FunctionCallExpr
-     * @throws RuntimeException
-     */
-    @Override
-    public OutputWidthExpression 
visitFunctionHolderExpression(FunctionHolderExpression holderExpr,
-                                                               
OutputWidthVisitorState state) throws RuntimeException {
-        OutputWidthExpression fixedWidth = 
getFixedLenExpr(holderExpr.getMajorType());
-        if (fixedWidth != null) { return fixedWidth; }
-        // Only Drill functions can be handled. Non-drill Functions, like 
HiveFunctions
-        // will default to a fixed value
-        if (!(holderExpr instanceof DrillFuncHolderExpr)) {
-            // We currently only know how to handle DrillFuncs.
-            // Use a default if this is not a DrillFunc
-            return new 
FixedLenExpr(OutputSizeEstimateConstants.NON_DRILL_FUNCTION_OUTPUT_SIZE_ESTIMATE);
-        }
-
-        final DrillFuncHolder holder = ((DrillFuncHolderExpr) 
holderExpr).getHolder();
-
-        // If the user has provided a size estimate, use it
-        int estimate = holder.variableOutputSizeEstimate();
-        if (estimate != FunctionTemplate.OUTPUT_SIZE_ESTIMATE_DEFAULT) {
-            return new FixedLenExpr(estimate);
-        }
-        // Use the calculator provided by the user or use the default
-        OutputWidthCalculator widthCalculator = 
holder.getOutputWidthCalculator();
-        final int argSize = holderExpr.args.size();
-        ArrayList<OutputWidthExpression> arguments = null;
-        if (argSize != 0) {
-            arguments = new ArrayList<>(argSize);
-            for (LogicalExpression expr : holderExpr.args) {
-                arguments.add(expr.accept(this, state));
-            }
-        }
-        return new FunctionCallExpr(holderExpr, widthCalculator, arguments);
+    return new IfElseWidthExpr(ifWidthExpr, elseWidthExpr);
+  }
+
+  /**
+   * Handles a {@link FunctionHolderExpression}. Functions that produce
+   * fixed-width output are trivially converted to a {@link FixedLenExpr}. For
+   * functions that produce variable width output, the output width calculator
+   * annotation is looked-up and recorded in a {@link FunctionCallExpr}. This
+   * calculator will later be used to convert the FunctionCallExpr to a
+   * {@link FixedLenExpr} expression
+   */
+  @Override
+  public OutputWidthExpression 
visitFunctionHolderExpression(FunctionHolderExpression holderExpr,
+                                                             
OutputWidthVisitorState state) throws RuntimeException {
+    OutputWidthExpression fixedWidth = 
getFixedLenExpr(holderExpr.getMajorType());
+    if (fixedWidth != null) { return fixedWidth; }
+    // Only Drill functions can be handled. Non-drill Functions, like 
HiveFunctions
+    // will default to a fixed value
+    if (!(holderExpr instanceof DrillFuncHolderExpr)) {
+      // We currently only know how to handle DrillFuncs.
+      // Use a default if this is not a DrillFunc
+      return new 
FixedLenExpr(OutputSizeEstimateConstants.NON_DRILL_FUNCTION_OUTPUT_SIZE_ESTIMATE);
     }
 
-    /**
-     * Records a variable width write expression. This will be converted to a 
{@link FixedLenExpr} expression by walking
-     * the tree of expression attached to the write expression.
-     * @param writeExpr
-     * @param state
-     * @return
-     * @throws RuntimeException
-     */
-    @Override
-    public OutputWidthExpression 
visitValueVectorWriteExpression(ValueVectorWriteExpression writeExpr,
-                                                                 
OutputWidthVisitorState state) throws RuntimeException {
-        TypedFieldId fieldId = writeExpr.getFieldId();
-        ProjectMemoryManager manager = state.getManager();
-        OutputWidthExpression outputExpr;
-        if (manager.isFixedWidth(fieldId)) {
-            outputExpr = getFixedLenExpr(fieldId.getFinalType());
-        } else {
-            LogicalExpression writeArg = writeExpr.getChild();
-            outputExpr = writeArg.accept(this, state);
-        }
-        return outputExpr;
-    }
+    final DrillFuncHolder holder = ((DrillFuncHolderExpr) 
holderExpr).getHolder();
 
-    /**
-     * Records a variable width read expression as a {@link VarLenReadExpr}. 
This will be converted to a
-     * {@link FixedLenExpr} expression by getting the size for the 
corresponding column from the {@link RecordBatchSizer}.
-     *
-     * @param readExpr
-     * @param state
-     * @return
-     * @throws RuntimeException
-     */
-    @Override
-    public OutputWidthExpression 
visitValueVectorReadExpression(ValueVectorReadExpression readExpr,
-                                                                
OutputWidthVisitorState state) throws RuntimeException {
-        return new VarLenReadExpr(readExpr);
+    // If the user has provided a size estimate, use it
+    int estimate = holder.variableOutputSizeEstimate();
+    if (estimate != FunctionTemplate.OUTPUT_SIZE_ESTIMATE_DEFAULT) {
+      return new FixedLenExpr(estimate);
     }
-
-    @Override
-    public OutputWidthExpression 
visitQuotedStringConstant(ValueExpressions.QuotedString quotedString,
-                                                           
OutputWidthVisitorState state) throws RuntimeException {
-        return new FixedLenExpr(quotedString.getString().length());
+    // Use the calculator provided by the user or use the default
+    OutputWidthCalculator widthCalculator = holder.getOutputWidthCalculator();
+    final int argSize = holderExpr.args.size();
+    ArrayList<OutputWidthExpression> arguments = null;
+    if (argSize != 0) {
+      arguments = new ArrayList<>(argSize);
+      for (LogicalExpression expr : holderExpr.args) {
+        arguments.add(expr.accept(this, state));
+      }
     }
-
-    @Override
-    public OutputWidthExpression visitUnknown(LogicalExpression 
logicalExpression, OutputWidthVisitorState state) {
-        OutputWidthExpression fixedLenExpr = 
getFixedLenExpr(logicalExpression.getMajorType());
-        if (fixedLenExpr != null) {
-            return fixedLenExpr;
-        }
-        throw new IllegalStateException("Unknown variable width expression: " 
+ logicalExpression);
+    return new FunctionCallExpr(holderExpr, widthCalculator, arguments);
+  }
+
+  /**
+   * Records a variable width write expression. This will be converted to a
+   * {@link FixedLenExpr} expression by walking the tree of expression attached
+   * to the write expression.
+   */
+  @Override
+  public OutputWidthExpression 
visitValueVectorWriteExpression(ValueVectorWriteExpression writeExpr,
+                                                               
OutputWidthVisitorState state) throws RuntimeException {
+    TypedFieldId fieldId = writeExpr.getFieldId();
+    ProjectMemoryManager manager = state.getManager();
+    OutputWidthExpression outputExpr;
+    if (manager.isFixedWidth(fieldId)) {
+      outputExpr = getFixedLenExpr(fieldId.getFinalType());
+    } else {
+      LogicalExpression writeArg = writeExpr.getChild();
+      outputExpr = writeArg.accept(this, state);
     }
-
-    @Override
-    public OutputWidthExpression visitNullConstant(TypedNullConstant 
nullConstant, OutputWidthVisitorState state)
-            throws RuntimeException {
-        int width;
-        if (nullConstant.getMajorType().hasPrecision()) {
-            width = nullConstant.getMajorType().getPrecision();
-        } else {
-            width = 0;
-        }
-        return new FixedLenExpr(width);
+    return outputExpr;
+  }
+
+  /**
+   * Records a variable width read expression as a {@link VarLenReadExpr}. This
+   * will be converted to a {@link FixedLenExpr} expression by getting the size
+   * for the corresponding column from the {@link RecordBatchSizer}.
+   */
+  @Override
+  public OutputWidthExpression 
visitValueVectorReadExpression(ValueVectorReadExpression readExpr,
+                                                              
OutputWidthVisitorState state) throws RuntimeException {
+    return new VarLenReadExpr(readExpr);
+  }
+
+  @Override
+  public OutputWidthExpression 
visitQuotedStringConstant(ValueExpressions.QuotedString quotedString,
+                                                         
OutputWidthVisitorState state) throws RuntimeException {
+    return new FixedLenExpr(quotedString.getString().length());
+  }
+
+  @Override
+  public OutputWidthExpression visitUnknown(LogicalExpression 
logicalExpression, OutputWidthVisitorState state) {
+    OutputWidthExpression fixedLenExpr = 
getFixedLenExpr(logicalExpression.getMajorType());
+    if (fixedLenExpr != null) {
+      return fixedLenExpr;
     }
-
-
-    @Override
-    public OutputWidthExpression visitFixedLenExpr(FixedLenExpr fixedLenExpr, 
OutputWidthVisitorState state)
-            throws RuntimeException {
-        return fixedLenExpr;
+    throw new IllegalStateException("Unknown variable width expression: " + 
logicalExpression);
+  }
+
+  @Override
+  public OutputWidthExpression visitNullConstant(TypedNullConstant 
nullConstant, OutputWidthVisitorState state)
+          throws RuntimeException {
+    int width;
+    if (nullConstant.getMajorType().hasPrecision()) {
+      width = nullConstant.getMajorType().getPrecision();
+    } else {
+      width = 0;
     }
-
-    /**
-     * Converts the {@link VarLenReadExpr} to a {@link FixedLenExpr} by 
getting the size for the corresponding column
-     * from the RecordBatchSizer.
-     * @param varLenReadExpr
-     * @param state
-     * @return
-     * @throws RuntimeException
-     */
-    @Override
-    public OutputWidthExpression visitVarLenReadExpr(VarLenReadExpr 
varLenReadExpr, OutputWidthVisitorState state)
-                                                        throws 
RuntimeException {
-        String columnName = varLenReadExpr.getInputColumnName();
-        if (columnName == null) {
-            TypedFieldId fieldId = 
varLenReadExpr.getReadExpression().getTypedFieldId();
-            columnName =  TypedFieldId.getPath(fieldId, 
state.manager.getIncomingBatch());
-        }
-        final RecordBatchSizer.ColumnSize columnSize = 
state.manager.getColumnSize(columnName);
-
-        int columnWidth = columnSize.getDataSizePerEntry();
-        return new FixedLenExpr(columnWidth);
+    return new FixedLenExpr(width);
+  }
+
+
+  @Override
+  public OutputWidthExpression visitFixedLenExpr(FixedLenExpr fixedLenExpr, 
OutputWidthVisitorState state)
+          throws RuntimeException {
+    return fixedLenExpr;
+  }
+
+  /**
+   * Converts the {@link VarLenReadExpr} to a {@link FixedLenExpr} by getting
+   * the size for the corresponding column from the RecordBatchSizer.
+   */
+  @Override
+  public OutputWidthExpression visitVarLenReadExpr(VarLenReadExpr 
varLenReadExpr, OutputWidthVisitorState state)
+                                                      throws RuntimeException {
+    String columnName = varLenReadExpr.getInputColumnName();
+    if (columnName == null) {
+      TypedFieldId fieldId = 
varLenReadExpr.getReadExpression().getTypedFieldId();
+      columnName = TypedFieldId.getPath(fieldId, 
state.manager.incomingBatch());
     }
-
-    /**
-     * Converts a {@link FunctionCallExpr} to a {@link FixedLenExpr} by 
passing the the args of the function to the
-     * width calculator for this function.
-     * @param functionCallExpr
-     * @param state
-     * @return
-     * @throws RuntimeException
-     */
-    @Override
-    public OutputWidthExpression visitFunctionCallExpr(FunctionCallExpr 
functionCallExpr, OutputWidthVisitorState state)
-                                                        throws 
RuntimeException {
-        ArrayList<OutputWidthExpression> args = functionCallExpr.getArgs();
-        ArrayList<FixedLenExpr> estimatedArgs = null;
-
-        if (args != null && args.size() != 0) {
-            estimatedArgs = new ArrayList<>(args.size());
-            for (OutputWidthExpression expr : args) {
-                // Once the args are visited, they will all become 
FixedWidthExpr
-                FixedLenExpr fixedLenExpr = (FixedLenExpr) expr.accept(this, 
state);
-                estimatedArgs.add(fixedLenExpr);
-            }
-        }
-        OutputWidthCalculator estimator = functionCallExpr.getCalculator();
-        int estimatedSize = estimator.getOutputWidth(estimatedArgs);
-        return new FixedLenExpr(estimatedSize);
+    final RecordBatchSizer.ColumnSize columnSize = 
state.manager.getColumnSize(columnName);
+
+    int columnWidth = columnSize.getDataSizePerEntry();
+    return new FixedLenExpr(columnWidth);
+  }
+
+  /**
+   * Converts a {@link FunctionCallExpr} to a {@link FixedLenExpr} by passing
+   * the args of the function to the width calculator for this function.
+   */
+  @Override
+  public OutputWidthExpression visitFunctionCallExpr(FunctionCallExpr 
functionCallExpr, OutputWidthVisitorState state)
+                                                      throws RuntimeException {
+    ArrayList<OutputWidthExpression> args = functionCallExpr.getArgs();
+    ArrayList<FixedLenExpr> estimatedArgs = null;
+
+    if (args != null && args.size() != 0) {
+      estimatedArgs = new ArrayList<>(args.size());
+      for (OutputWidthExpression expr : args) {
+        // Once the args are visited, they will all become FixedWidthExpr
+        FixedLenExpr fixedLenExpr = (FixedLenExpr) expr.accept(this, state);
+        estimatedArgs.add(fixedLenExpr);
+      }
     }
-
-    /**
-     *  Converts the {@link IfElseWidthExpr}  to a {@link FixedLenExpr} by 
taking the max of the if-expr-width and the
-     *  else-expr-width.
-     * @param ifElseWidthExpr
-     * @param state
-     * @return
-     * @throws RuntimeException
-     */
-    @Override
-    public OutputWidthExpression visitIfElseWidthExpr(IfElseWidthExpr 
ifElseWidthExpr, OutputWidthVisitorState state)
-                                                        throws 
RuntimeException {
-        OutputWidthExpression ifReducedExpr = 
ifElseWidthExpr.expressions[0].accept(this, state);
-        assert ifReducedExpr instanceof FixedLenExpr;
-        int ifWidth = ((FixedLenExpr)ifReducedExpr).getDataWidth();
-        int elseWidth = -1;
-        if (ifElseWidthExpr.expressions[1] != null) {
-            OutputWidthExpression elseReducedExpr = 
ifElseWidthExpr.expressions[1].accept(this, state);
-            assert elseReducedExpr instanceof FixedLenExpr;
-            elseWidth = ((FixedLenExpr)elseReducedExpr).getDataWidth();
-        }
-        int outputWidth = Math.max(ifWidth, elseWidth);
-        return new FixedLenExpr(outputWidth);
+    OutputWidthCalculator estimator = functionCallExpr.getCalculator();
+    int estimatedSize = estimator.getOutputWidth(estimatedArgs);
+    return new FixedLenExpr(estimatedSize);
+  }
+
+  /**
+   * Converts the {@link IfElseWidthExpr} to a {@link FixedLenExpr} by taking
+   * the max of the if-expr-width and the else-expr-width.
+   */
+  @Override
+  public OutputWidthExpression visitIfElseWidthExpr(IfElseWidthExpr 
ifElseWidthExpr, OutputWidthVisitorState state)
+                                                      throws RuntimeException {
+    OutputWidthExpression ifReducedExpr = 
ifElseWidthExpr.expressions[0].accept(this, state);
+    assert ifReducedExpr instanceof FixedLenExpr;
+    int ifWidth = ((FixedLenExpr)ifReducedExpr).getDataWidth();
+    int elseWidth = -1;
+    if (ifElseWidthExpr.expressions[1] != null) {
+      OutputWidthExpression elseReducedExpr = 
ifElseWidthExpr.expressions[1].accept(this, state);
+      assert elseReducedExpr instanceof FixedLenExpr;
+      elseWidth = ((FixedLenExpr)elseReducedExpr).getDataWidth();
     }
-
-    private OutputWidthExpression getFixedLenExpr(MajorType majorType) {
-        MajorType type = majorType;
-        if (Types.isFixedWidthType(type)) {
-            // Use only the width of the data. Metadata width will be 
accounted for at the end
-            // This is to avoid using metadata size in intermediate 
calculations
-            int fixedDataWidth = 
ProjectMemoryManager.getDataWidthOfFixedWidthType(type);
-            return new OutputWidthExpression.FixedLenExpr(fixedDataWidth);
-        }
-        return null;
+    int outputWidth = Math.max(ifWidth, elseWidth);
+    return new FixedLenExpr(outputWidth);
+  }
+
+  private OutputWidthExpression getFixedLenExpr(MajorType majorType) {
+    MajorType type = majorType;
+    if (Types.isFixedWidthType(type)) {
+      // Use only the width of the data. Metadata width will be accounted for 
at the end
+      // This is to avoid using metadata size in intermediate calculations
+      int fixedDataWidth = ProjectMemoryManager.getFixedWidth(type);
+      return new OutputWidthExpression.FixedLenExpr(fixedDataWidth);
     }
-}
\ No newline at end of file
+    return null;
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java
new file mode 100644
index 0000000..ce5cb72
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.ValueVectorWriteExpression;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+/**
+ * Implements callbacks to build the physical vectors for the project
+ * record batch.
+ */
+public class ProjectBatchBuilder implements 
ProjectionMaterializer.BatchBuilder {
+  private final ProjectRecordBatch projectBatch;
+  private final VectorContainer container;
+  private final SchemaChangeCallBack callBack;
+  private final RecordBatch incomingBatch;
+  private final List<TransferPair> transfers = new ArrayList<>();
+
+  public ProjectBatchBuilder(ProjectRecordBatch projectBatch, VectorContainer 
container,
+      SchemaChangeCallBack callBack, RecordBatch incomingBatch) {
+    this.projectBatch = projectBatch;
+    this.container = container;
+    this.callBack = callBack;
+    this.incomingBatch = incomingBatch;
+  }
+
+  public List<TransferPair> transfers() { return transfers; }
+
+  @Override
+  public void addTransferField(String name, ValueVector vvIn) {
+    FieldReference ref = new FieldReference(name);
+    ValueVector vvOut = 
container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(),
+      vvIn.getField().getType()), callBack);
+    projectBatch.memoryManager.addTransferField(vvIn, 
vvIn.getField().getName(), vvOut.getField().getName());
+    transfers.add(vvIn.makeTransferPair(vvOut));
+  }
+
+  @Override
+  public int addDirectTransfer(FieldReference ref, ValueVectorReadExpression 
vectorRead) {
+    Preconditions.checkNotNull(incomingBatch);
+    TypedFieldId id = vectorRead.getFieldId();
+    ValueVector vvIn = 
incomingBatch.getValueAccessorById(id.getIntermediateClass(), 
id.getFieldIds()).getValueVector();
+
+    ValueVector vvOut =
+        
container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(),
+        vectorRead.getMajorType()), callBack);
+    TransferPair tp = vvIn.makeTransferPair(vvOut);
+    projectBatch.memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id, 
incomingBatch), vvOut.getField().getName());
+    transfers.add(tp);
+    return vectorRead.getFieldId().getFieldIds()[0];
+  }
+
+  @Override
+  public ValueVectorWriteExpression addOutputVector(String name, 
LogicalExpression expr) {
+    MaterializedField outputField = MaterializedField.create(name, 
expr.getMajorType());
+    ValueVector vv = container.addOrGet(outputField, callBack);
+    projectBatch.allocationVectors.add(vv);
+    TypedFieldId fid = 
container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
+    ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, 
expr, true);
+    projectBatch.memoryManager.addNewField(vv, write);
+    return write;
+  }
+
+  @Override
+  public void addComplexField(FieldReference ref) {
+    initComplexWriters();
+    if (projectBatch.complexFieldReferencesList == null) {
+      projectBatch.complexFieldReferencesList = Lists.newArrayList();
+    } else {
+      projectBatch.complexFieldReferencesList.clear();
+    }
+
+    // save the field reference for later for getting schema when input is 
empty
+    projectBatch.complexFieldReferencesList.add(ref);
+    projectBatch.memoryManager.addComplexField(null); // this will just add an 
estimate to the row width
+  }
+
+  private void initComplexWriters() {
+    // Lazy initialization of the list of complex writers, if not done yet.
+    if (projectBatch.complexWriters == null) {
+      projectBatch.complexWriters = new ArrayList<>();
+    } else {
+      projectBatch.complexWriters.clear();
+    }
+  }
+
+  @Override
+  public ValueVectorWriteExpression addEvalVector(String outputName, 
LogicalExpression expr) {
+    MaterializedField outputField = MaterializedField.create(outputName, 
expr.getMajorType());
+    ValueVector outputVector = container.addOrGet(outputField, callBack);
+    projectBatch.allocationVectors.add(outputVector);
+    TypedFieldId fid = 
container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
+    boolean useSetSafe = !(outputVector instanceof FixedWidthVector);
+    ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, 
expr, useSetSafe);
+    projectBatch.memoryManager.addNewField(outputVector, write);
+
+    // We cannot do multiple transfers from the same vector. However we still
+    // need to instantiate the output vector.
+    if (expr instanceof ValueVectorReadExpression) {
+      ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
+      if (!vectorRead.hasReadPath()) {
+        TypedFieldId id = vectorRead.getFieldId();
+        ValueVector vvIn = 
incomingBatch.getValueAccessorById(id.getIntermediateClass(),
+                id.getFieldIds()).getValueVector();
+        vvIn.makeTransferPair(outputVector);
+      }
+    }
+    return write;
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
index 92f2b3d..571871b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
@@ -18,10 +18,12 @@
 package org.apache.drill.exec.physical.impl.project;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.TypeHelper;
 import 
org.apache.drill.exec.physical.impl.project.OutputWidthExpression.VarLenReadExpr;
 import org.apache.drill.exec.record.RecordBatch;
@@ -42,307 +44,263 @@ import java.util.HashMap;
 import java.util.Map;
 
 /**
- *
- * ProjectMemoryManager(PMM) is used to estimate the size of rows produced by 
ProjectRecordBatch.
- * The PMM works as follows:
- *
- * Setup phase: As and when ProjectRecordBatch creates or transfers a field, 
it registers the field with PMM.
- * If the field is a variable width field, PMM records the expression that 
produces the variable
- * width field. The expression is a tree of LogicalExpressions. The PMM walks 
this tree of LogicalExpressions
- * to produce a tree of OutputWidthExpressions. The widths of Fixed width 
fields are just accumulated into a single
- * total. Note: The PMM, currently, cannot handle new complex fields, it just 
uses a hard-coded estimate for such fields.
- *
- *
- * Execution phase: Just before a batch is processed by Project, the PMM walks 
the tree of OutputWidthExpressions
- * and converts them to FixedWidthExpressions. It uses the RecordBatchSizer 
and the function annotations to do this conversion.
- * See OutputWidthVisitor for details.
+ * ProjectMemoryManager(PMM) is used to estimate the size of rows produced by
+ * ProjectRecordBatch. The PMM works as follows:
+ * <p>
+ * Setup phase: As and when ProjectRecordBatch creates or transfers a field, it
+ * registers the field with PMM. If the field is a variable-width field, PMM
+ * records the expression that produces the variable-width field. The 
expression
+ * is a tree of LogicalExpressions. The PMM walks this tree of
+ * LogicalExpressions to produce a tree of OutputWidthExpressions. The widths 
of
+ * fixed-width fields are just accumulated into a single total. Note: The PMM,
+ * currently, cannot handle new complex fields, it just uses a hard-coded
+ * estimate for such fields.
+ * <p>
+ * Execution phase: Just before a batch is processed by Project, the PMM walks
+ * the tree of OutputWidthExpressions and converts them to
+ * FixedWidthExpressions. It uses the RecordBatchSizer and the function
+ * annotations to do this conversion. See OutputWidthVisitor for details.
  */
 public class ProjectMemoryManager extends RecordBatchMemoryManager {
 
-    static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ProjectMemoryManager.class);
-
-    public RecordBatch getIncomingBatch() {
-        return incomingBatch;
-    }
-
-    RecordBatch incomingBatch = null;
-    ProjectRecordBatch outgoingBatch = null;
-
-    int rowWidth = 0;
-    Map<String, ColumnWidthInfo> outputColumnSizes;
-    // Number of variable width columns in the batch
-    int variableWidthColumnCount = 0;
-    // Number of fixed width columns in the batch
-    int fixedWidthColumnCount = 0;
-    // Number of complex columns in the batch
-    int complexColumnsCount = 0;
-
-
-    // Holds sum of all fixed width column widths
-    int totalFixedWidthColumnWidth = 0;
-    // Holds sum of all complex column widths
-    // Currently, this is just a guess
-    int totalComplexColumnWidth = 0;
-
-    enum WidthType {
-        FIXED,
-        VARIABLE
-    }
-
-    enum OutputColumnType {
-        TRANSFER,
-        NEW
-    }
-
-    class ColumnWidthInfo {
-        OutputWidthExpression outputExpression;
-        int width;
-        WidthType widthType;
-        OutputColumnType outputColumnType;
-        ValueVector outputVV; // for transfers, this is the transfer src
-
-
-        ColumnWidthInfo(OutputWidthExpression outputWidthExpression,
-                        OutputColumnType outputColumnType,
-                        WidthType widthType,
-                        int fieldWidth, ValueVector outputVV) {
-            this.outputExpression = outputWidthExpression;
-            this.width = fieldWidth;
-            this.outputColumnType = outputColumnType;
-            this.widthType = widthType;
-            this.outputVV = outputVV;
-        }
-
-        public OutputWidthExpression getOutputExpression() { return 
outputExpression; }
-
-        public OutputColumnType getOutputColumnType() { return 
outputColumnType; }
-
-        boolean isFixedWidth() { return widthType == WidthType.FIXED; }
-
-        public int getWidth() { return width; }
-
-    }
-
-    void ShouldNotReachHere() {
-        throw new IllegalStateException();
-    }
-
-    private void setIncomingBatch(RecordBatch recordBatch) {
-        incomingBatch = recordBatch;
-    }
-
-    private void setOutgoingBatch(ProjectRecordBatch outgoingBatch) {
-        this.outgoingBatch = outgoingBatch;
-    }
+  private static final Logger logger = 
LoggerFactory.getLogger(ProjectMemoryManager.class);
 
-    public ProjectMemoryManager(int configuredOutputSize) {
-        super(configuredOutputSize);
-        outputColumnSizes = new HashMap<>();
-    }
+  private enum OutputColumnType {
+      TRANSFER,
+      NEW
+  }
 
-    public boolean isComplex(MajorType majorType) {
-        MinorType minorType = majorType.getMinorType();
-        return minorType == MinorType.MAP || minorType == MinorType.UNION || 
minorType == MinorType.LIST;
-    }
+  public static class VariableWidthColumnInfo {
+    private final OutputWidthExpression outputExpression;
+    private final ValueVector outputVV; // for transfers, this is the transfer 
src
 
-    boolean isFixedWidth(TypedFieldId fieldId) {
-        ValueVector vv = getOutgoingValueVector(fieldId);
-        return isFixedWidth(vv);
-    }
 
-    public ValueVector getOutgoingValueVector(TypedFieldId fieldId) {
-        Class<?> clazz = fieldId.getIntermediateClass();
-        int[] fieldIds = fieldId.getFieldIds();
-        return outgoingBatch.getValueAccessorById(clazz, 
fieldIds).getValueVector();
+    VariableWidthColumnInfo(OutputWidthExpression outputWidthExpression,
+                    ValueVector outputVV) {
+      this.outputExpression = outputWidthExpression;
+      this.outputVV = outputVV;
     }
 
-    static boolean isFixedWidth(ValueVector vv) {  return (vv instanceof 
FixedWidthVector); }
-
-
-    static int getNetWidthOfFixedWidthType(ValueVector vv) {
-        assert isFixedWidth(vv);
-        return ((FixedWidthVector)vv).getValueWidth();
+    public OutputWidthExpression getOutputExpression() { return 
outputExpression; }
+  }
+
+  private RecordBatch incomingBatch;
+  private ProjectRecordBatch outgoingBatch;
+
+  private int rowWidth;
+  private final Map<String, VariableWidthColumnInfo> varWidthColumnSizes;
+  // Number of variable width columns in the batch
+  private int variableWidthColumnCount;
+  // Number of fixed width columns in the batch
+  private int fixedWidthColumnCount;
+  // Number of complex columns in the batch
+  private int complexColumnsCount;
+
+  // Holds sum of all fixed width column widths
+  private int totalFixedWidthColumnWidth;
+  // Holds sum of all complex column widths
+  // Currently, this is just a guess
+  private int totalComplexColumnWidth;
+
+  public ProjectMemoryManager(int configuredOutputSize) {
+    super(configuredOutputSize);
+    varWidthColumnSizes = new HashMap<>();
+  }
+
+  private void reset() {
+    rowWidth = 0;
+    totalFixedWidthColumnWidth = 0;
+    totalComplexColumnWidth = 0;
+
+    fixedWidthColumnCount = 0;
+    complexColumnsCount = 0;
+  }
+
+  private void setIncomingBatch(RecordBatch recordBatch) {
+    incomingBatch = recordBatch;
+  }
+
+  public RecordBatch incomingBatch() { return incomingBatch; }
+
+  private void setOutgoingBatch(ProjectRecordBatch outgoingBatch) {
+    this.outgoingBatch = outgoingBatch;
+  }
+
+  public boolean isComplex(MajorType majorType) {
+    return Types.isComplex(majorType) || Types.isUnion(majorType);
+  }
+
+  public boolean isFixedWidth(TypedFieldId fieldId) {
+    ValueVector vv = getOutgoingValueVector(fieldId);
+    return isFixedWidth(vv);
+  }
+
+  private ValueVector getOutgoingValueVector(TypedFieldId fieldId) {
+    Class<?> clazz = fieldId.getIntermediateClass();
+    int[] fieldIds = fieldId.getFieldIds();
+    return outgoingBatch.getValueAccessorById(clazz, 
fieldIds).getValueVector();
+  }
+
+  private static boolean isFixedWidth(ValueVector vv) {  return (vv instanceof 
FixedWidthVector); }
+
+
+  public static int getFixedWidth(TypeProtos.MajorType majorType) {
+    
Preconditions.checkArgument(!Types.isVarWidthType(majorType.getMinorType()),
+        "Expected fixed type but was '%s'.", majorType.getMinorType());
+    return TypeHelper.getSize(majorType);
+  }
+
+  public void addTransferField(ValueVector vvIn, String inputColumnName, 
String outputColumnName) {
+    addField(vvIn, null, OutputColumnType.TRANSFER, inputColumnName, 
outputColumnName);
+  }
+
+  public void addNewField(ValueVector vvOut, LogicalExpression 
logicalExpression) {
+    addField(vvOut, logicalExpression, OutputColumnType.NEW, null, 
vvOut.getField().getName());
+  }
+
+  private void addField(ValueVector vv, LogicalExpression logicalExpression, 
OutputColumnType outputColumnType,
+                String inputColumnName, String outputColumnName) {
+    if(isFixedWidth(vv)) {
+      addFixedWidthField(vv);
+    } else {
+      addVariableWidthField(vv, logicalExpression, outputColumnType, 
inputColumnName, outputColumnName);
     }
-
-    public static int getDataWidthOfFixedWidthType(TypeProtos.MajorType 
majorType) {
-        MinorType minorType = majorType.getMinorType();
-        final boolean isVariableWidth  = (minorType == MinorType.VARCHAR || 
minorType == MinorType.VAR16CHAR
-                || minorType == MinorType.VARBINARY);
-
-        if (isVariableWidth) {
-            throw new IllegalArgumentException("getWidthOfFixedWidthType() 
cannot handle variable width types");
-        }
-
-        if (minorType == MinorType.NULL) {
-            return 0;
-        }
-
-        return TypeHelper.getSize(majorType);
+  }
+
+  private void addVariableWidthField(ValueVector vv, LogicalExpression 
logicalExpression,
+                                     OutputColumnType outputColumnType, String 
inputColumnName,
+                                     String outputColumnName) {
+    variableWidthColumnCount++;
+    logger.trace("addVariableWidthField(): vv {} totalCount: {} 
outputColumnType: {}",
+            vvAsString(vv), variableWidthColumnCount, outputColumnType);
+    OutputWidthExpression outWidthExpr;
+    if (outputColumnType == OutputColumnType.TRANSFER) {
+      // Variable width transfers
+      // fieldWidth has to be obtained from the RecordBatchSizer
+      outWidthExpr = new VarLenReadExpr(inputColumnName);
+    } else {
+      // fieldWidth has to be obtained from the OutputWidthExpression
+      // Walk the tree of LogicalExpressions to get a tree of 
OutputWidthExpressions
+       outWidthExpr = logicalExpression.accept(new OutputWidthVisitor(),
+          new OutputWidthVisitorState(this));
     }
-
-
-    void addTransferField(ValueVector vvIn, String inputColumnName, String 
outputColumnName) {
-        addField(vvIn, null, OutputColumnType.TRANSFER, inputColumnName, 
outputColumnName);
+    VariableWidthColumnInfo columnWidthInfo = new 
VariableWidthColumnInfo(outWidthExpr, vv);
+    VariableWidthColumnInfo existingInfo = 
varWidthColumnSizes.put(outputColumnName, columnWidthInfo);
+    Preconditions.checkState(existingInfo == null);
+  }
+
+  private static String vvAsString(ValueVector vv) {
+    return vv == null ? "null" :
+      String.format("%s %s",
+          vv.getField().getName(),
+          vv.getField().getType());
+  }
+
+  public void addComplexField(ValueVector vv) {
+    //Complex types are not yet supported. Just use a guess for the size
+    assert vv == null || isComplex(vv.getField().getType());
+    complexColumnsCount++;
+    // just a guess
+    totalComplexColumnWidth +=  
OutputSizeEstimateConstants.COMPLEX_FIELD_ESTIMATE;
+    if (logger.isTraceEnabled()) {
+      logger.trace("addComplexField(): vv {} totalCount: {} 
totalComplexColumnWidth: {}",
+            vvAsString(vv), complexColumnsCount, totalComplexColumnWidth);
     }
-
-    void addNewField(ValueVector vvOut, LogicalExpression logicalExpression) {
-        addField(vvOut, logicalExpression, OutputColumnType.NEW, null, 
vvOut.getField().getName());
+  }
+
+  private void addFixedWidthField(ValueVector vv) {
+    assert isFixedWidth(vv);
+    fixedWidthColumnCount++;
+    int fixedFieldWidth = ((FixedWidthVector) vv).getValueWidth();
+    totalFixedWidthColumnWidth += fixedFieldWidth;
+    if (logger.isTraceEnabled()) {
+      logger.trace("addFixedWidthField(): vv {} totalCount: {} 
totalComplexColumnWidth: {}",
+              vvAsString(vv), fixedWidthColumnCount, 
totalFixedWidthColumnWidth);
     }
-
-    void addField(ValueVector vv, LogicalExpression logicalExpression, 
OutputColumnType outputColumnType,
-                  String inputColumnName, String outputColumnName) {
-        if(isFixedWidth(vv)) {
-            addFixedWidthField(vv);
-        } else {
-            addVariableWidthField(vv, logicalExpression, outputColumnType, 
inputColumnName, outputColumnName);
-        }
-    }
-
-    private void addVariableWidthField(ValueVector vv, LogicalExpression 
logicalExpression,
-                                       OutputColumnType outputColumnType, 
String inputColumnName, String outputColumnName) {
-        variableWidthColumnCount++;
-        ColumnWidthInfo columnWidthInfo;
-        logger.trace("addVariableWidthField(): vv {} totalCount: {} 
outputColumnType: {}",
-                printVV(vv), variableWidthColumnCount, outputColumnType);
-        //Variable width transfers
-        if(outputColumnType == OutputColumnType.TRANSFER) {
-            VarLenReadExpr readExpr = new VarLenReadExpr(inputColumnName);
-            columnWidthInfo = new ColumnWidthInfo(readExpr, outputColumnType,
-                    WidthType.VARIABLE, -1, vv); //fieldWidth has to be 
obtained from the RecordBatchSizer
-        } else if (isComplex(vv.getField().getType())) {
-            addComplexField(vv);
-            return;
-        } else {
-            // Walk the tree of LogicalExpressions to get a tree of 
OutputWidthExpressions
-            OutputWidthVisitorState state = new OutputWidthVisitorState(this);
-            OutputWidthExpression outputWidthExpression = 
logicalExpression.accept(new OutputWidthVisitor(), state);
-            columnWidthInfo = new ColumnWidthInfo(outputWidthExpression, 
outputColumnType,
-                    WidthType.VARIABLE, -1, vv); //fieldWidth has to be 
obtained from the OutputWidthExpression
-        }
-        ColumnWidthInfo existingInfo = outputColumnSizes.put(outputColumnName, 
columnWidthInfo);
-        Preconditions.checkState(existingInfo == null);
+  }
+
+  public void init(RecordBatch incomingBatch, ProjectRecordBatch 
outgoingBatch) {
+    setIncomingBatch(incomingBatch);
+    setOutgoingBatch(outgoingBatch);
+    reset();
+
+    
RecordBatchStats.printConfiguredBatchSize(outgoingBatch.getRecordBatchStatsContext(),
+      getOutputBatchSize());
+  }
+
+  @Override
+  public void update() {
+    long updateStartTime = System.currentTimeMillis();
+    RecordBatchSizer batchSizer = new RecordBatchSizer(incomingBatch);
+    long batchSizerEndTime = System.currentTimeMillis();
+
+    setRecordBatchSizer(batchSizer);
+    rowWidth = 0;
+    int totalVariableColumnWidth = 0;
+    for (String outputColumnName : varWidthColumnSizes.keySet()) {
+      VariableWidthColumnInfo columnWidthInfo = 
varWidthColumnSizes.get(outputColumnName);
+      int width = -1;
+      //Walk the tree of OutputWidthExpressions to get a FixedLenExpr
+      //As the tree is walked, the RecordBatchSizer and function annotations
+      //are looked-up to come up with the final FixedLenExpr
+      OutputWidthExpression savedWidthExpr = 
columnWidthInfo.getOutputExpression();
+      OutputWidthVisitorState state = new OutputWidthVisitorState(this);
+      OutputWidthExpression reducedExpr = savedWidthExpr.accept(new 
OutputWidthVisitor(), state);
+      width = ((FixedLenExpr)reducedExpr).getDataWidth();
+      Preconditions.checkState(width >= 0);
+      int metadataWidth = getMetadataWidth(columnWidthInfo.outputVV);
+      logger.trace("update(): fieldName {} width: {} metadataWidth: {}",
+              columnWidthInfo.outputVV.getField().getName(), width, 
metadataWidth);
+      width += metadataWidth;
+      totalVariableColumnWidth += width;
     }
-
-    public static String printVV(ValueVector vv) {
-        String str = "null";
-        if (vv != null) {
-            str = vv.getField().getName() + " " + vv.getField().getType();
-        }
-        return str;
+    rowWidth += totalFixedWidthColumnWidth;
+    rowWidth += totalComplexColumnWidth;
+    rowWidth += totalVariableColumnWidth;
+    int outPutRowCount;
+    if (rowWidth != 0) {
+      // if rowWidth is not zero, set the output row count in the sizer
+      setOutputRowCount(getOutputBatchSize(), rowWidth);
+      // if more rows can be allowed than the incoming row count, then set the
+      // output row count to the incoming row count.
+      outPutRowCount = Math.min(getOutputRowCount(), batchSizer.rowCount());
+    } else {
+      // if rowWidth == 0 then the memory manager does
+      // not have sufficient information to size the batch
+      // let the entire batch pass through.
+      // If incoming rc == 0, all RB Sizer look-ups will have
+      // 0 width and so total width can be 0
+      outPutRowCount = incomingBatch.getRecordCount();
     }
-
-    void addComplexField(ValueVector vv) {
-        //Complex types are not yet supported. Just use a guess for the size
-        assert vv == null || isComplex(vv.getField().getType());
-        complexColumnsCount++;
-        // just a guess
-        totalComplexColumnWidth +=  
OutputSizeEstimateConstants.COMPLEX_FIELD_ESTIMATE;
-        logger.trace("addComplexField(): vv {} totalCount: {} 
totalComplexColumnWidth: {}",
-                printVV(vv), complexColumnsCount, totalComplexColumnWidth);
-    }
-
-    void addFixedWidthField(ValueVector vv) {
-        assert isFixedWidth(vv);
-        fixedWidthColumnCount++;
-        int fixedFieldWidth = getNetWidthOfFixedWidthType(vv);
-        totalFixedWidthColumnWidth += fixedFieldWidth;
-        logger.trace("addFixedWidthField(): vv {} totalCount: {} 
totalComplexColumnWidth: {}",
-                printVV(vv), fixedWidthColumnCount, 
totalFixedWidthColumnWidth);
-    }
-
-    public void init(RecordBatch incomingBatch, ProjectRecordBatch 
outgoingBatch) {
-        setIncomingBatch(incomingBatch);
-        setOutgoingBatch(outgoingBatch);
-        reset();
-
-        
RecordBatchStats.printConfiguredBatchSize(outgoingBatch.getRecordBatchStatsContext(),
-          getOutputBatchSize());
-    }
-
-    private void reset() {
-        rowWidth = 0;
-        totalFixedWidthColumnWidth = 0;
-        totalComplexColumnWidth = 0;
-
-        fixedWidthColumnCount = 0;
-        complexColumnsCount = 0;
+    setOutputRowCount(outPutRowCount);
+    long updateEndTime = System.currentTimeMillis();
+    logger.trace("update() : Output RC {}, BatchSizer RC {}, incoming RC {}, 
width {}, total fixed width {}"
+                + ", total variable width {}, total complex width {}, 
batchSizer time {} ms, update time {}  ms"
+                + ", manager {}, incoming {}",outPutRowCount, 
batchSizer.rowCount(), incomingBatch.getRecordCount(),
+                rowWidth, totalFixedWidthColumnWidth, 
totalVariableColumnWidth, totalComplexColumnWidth,
+                (batchSizerEndTime - updateStartTime),(updateEndTime - 
updateStartTime), this, incomingBatch);
+
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, 
getRecordBatchSizer(), outgoingBatch.getRecordBatchStatsContext());
+    updateIncomingStats();
+  }
+
+  // TODO: Move this logic to TypesHelper or the ValueVector so it
+  // is more generic and reusable.
+  public static int getMetadataWidth(ValueVector vv) {
+    int width = 0;
+    if (vv instanceof NullableVector) {
+      width += ((NullableVector) vv).getBitsVector().getPayloadByteCount(1);
     }
 
-    @Override
-    public void update() {
-        long updateStartTime = System.currentTimeMillis();
-        RecordBatchSizer batchSizer = new RecordBatchSizer(incomingBatch);
-        long batchSizerEndTime = System.currentTimeMillis();
-
-        setRecordBatchSizer(batchSizer);
-        rowWidth = 0;
-        int totalVariableColumnWidth = 0;
-        for (String outputColumnName : outputColumnSizes.keySet()) {
-            ColumnWidthInfo columnWidthInfo = 
outputColumnSizes.get(outputColumnName);
-            int width = -1;
-            if (columnWidthInfo.isFixedWidth()) {
-                // fixed width columns are accumulated in 
totalFixedWidthColumnWidth
-                ShouldNotReachHere();
-            } else {
-                //Walk the tree of OutputWidthExpressions to get a FixedLenExpr
-                //As the tree is walked, the RecordBatchSizer and function 
annotations
-                //are looked-up to come up with the final FixedLenExpr
-                OutputWidthExpression savedWidthExpr = 
columnWidthInfo.getOutputExpression();
-                OutputWidthVisitorState state = new 
OutputWidthVisitorState(this);
-                OutputWidthExpression reducedExpr = savedWidthExpr.accept(new 
OutputWidthVisitor(), state);
-                width = ((FixedLenExpr)reducedExpr).getDataWidth();
-                Preconditions.checkState(width >= 0);
-                int metadataWidth = getMetadataWidth(columnWidthInfo.outputVV);
-                logger.trace("update(): fieldName {} width: {} metadataWidth: 
{}",
-                        columnWidthInfo.outputVV.getField().getName(), width, 
metadataWidth);
-                width += metadataWidth;
-            }
-            totalVariableColumnWidth += width;
-        }
-        rowWidth += totalFixedWidthColumnWidth;
-        rowWidth += totalComplexColumnWidth;
-        rowWidth += totalVariableColumnWidth;
-        int outPutRowCount;
-        if (rowWidth != 0) {
-            //if rowWidth is not zero, set the output row count in the sizer
-            setOutputRowCount(getOutputBatchSize(), rowWidth);
-            // if more rows can be allowed than the incoming row count, then 
set the
-            // output row count to the incoming row count.
-            outPutRowCount = Math.min(getOutputRowCount(), 
batchSizer.rowCount());
-        } else {
-            // if rowWidth == 0 then the memory manager does
-            // not have sufficient information to size the batch
-            // let the entire batch pass through.
-            // If incoming rc == 0, all RB Sizer look-ups will have
-            // 0 width and so total width can be 0
-            outPutRowCount = incomingBatch.getRecordCount();
-        }
-        setOutputRowCount(outPutRowCount);
-        long updateEndTime = System.currentTimeMillis();
-        logger.trace("update() : Output RC {}, BatchSizer RC {}, incoming RC 
{}, width {}, total fixed width {}"
-                    + ", total variable width {}, total complex width {}, 
batchSizer time {} ms, update time {}  ms"
-                    + ", manager {}, incoming {}",outPutRowCount, 
batchSizer.rowCount(), incomingBatch.getRecordCount(),
-                    rowWidth, totalFixedWidthColumnWidth, 
totalVariableColumnWidth, totalComplexColumnWidth,
-                    (batchSizerEndTime - updateStartTime),(updateEndTime - 
updateStartTime), this, incomingBatch);
-
-        RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, 
getRecordBatchSizer(), outgoingBatch.getRecordBatchStatsContext());
-        updateIncomingStats();
+    if (vv instanceof VariableWidthVector) {
+      width += ((VariableWidthVector) 
vv).getOffsetVector().getPayloadByteCount(1);
     }
 
-    public static int getMetadataWidth(ValueVector vv) {
-        int width = 0;
-        if (vv instanceof NullableVector) {
-            width += 
((NullableVector)vv).getBitsVector().getPayloadByteCount(1);
-        }
-
-        if (vv instanceof VariableWidthVector) {
-            width += 
((VariableWidthVector)vv).getOffsetVector().getPayloadByteCount(1);
-        }
-
-        if (vv instanceof BaseRepeatedValueVector) {
-            width += 
((BaseRepeatedValueVector)vv).getOffsetVector().getPayloadByteCount(1);
-            width += 
(getMetadataWidth(((BaseRepeatedValueVector)vv).getDataVector()) * 
RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD);
-        }
-        return width;
+    if (vv instanceof BaseRepeatedValueVector) {
+      width += ((BaseRepeatedValueVector) 
vv).getOffsetVector().getPayloadByteCount(1);
+      width += (getMetadataWidth(((BaseRepeatedValueVector) 
vv).getDataVector()) *
+          RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD);
     }
+    return width;
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 9857102..c021023 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -20,103 +20,47 @@ package org.apache.drill.exec.physical.impl.project;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.collections.map.CaseInsensitiveMap;
-import org.apache.drill.common.expression.ConvertExpression;
-import org.apache.drill.common.expression.ErrorCollector;
-import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.common.expression.FunctionCallFactory;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.PathSegment.NameSegment;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.expression.ValueExpressions;
-import org.apache.drill.common.expression.fn.FunctionReplacementUtils;
-import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.expr.DrillFuncHolderExpr;
-import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.ValueVectorReadExpression;
-import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.planner.StarColumnHelper;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SimpleRecordBatch;
-import org.apache.drill.exec.record.TransferPair;
-import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.util.record.RecordBatchStats;
 import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.UntypedNullHolder;
 import org.apache.drill.exec.vector.UntypedNullVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.carrotsearch.hppc.IntHashSet;
-
 public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   private static final Logger logger = 
LoggerFactory.getLogger(ProjectRecordBatch.class);
 
-  private static final String EMPTY_STRING = "";
-
+  protected List<ValueVector> allocationVectors;
+  protected List<ComplexWriter> complexWriters;
+  protected List<FieldReference> complexFieldReferencesList;
+  protected ProjectMemoryManager memoryManager;
   private Projector projector;
-  private List<ValueVector> allocationVectors;
-  private List<ComplexWriter> complexWriters;
-  private List<FieldReference> complexFieldReferencesList;
   private boolean hasRemainder;
   private int remainderIndex;
   private int recordCount;
-  private ProjectMemoryManager memoryManager;
   private boolean first = true;
   private boolean wasNone; // whether a NONE iter outcome was already seen
-  private final ColumnExplorer columnExplorer;
-
-  private class ClassifierResult {
-    public boolean isStar;
-    public List<String> outputNames;
-    public String prefix = "";
-    public HashMap<String, Integer> prefixMap = Maps.newHashMap();
-    public CaseInsensitiveMap outputMap = new CaseInsensitiveMap();
-    private final CaseInsensitiveMap sequenceMap = new CaseInsensitiveMap();
-
-    private void clear() {
-      isStar = false;
-      prefix = "";
-      if (outputNames != null) {
-        outputNames.clear();
-      }
-
-      // note:  don't clear the internal maps since they have cumulative data..
-    }
-  }
 
-  public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext 
context) throws OutOfMemoryException {
+  public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext 
context) {
     super(pop, context, incoming);
-    columnExplorer = new ColumnExplorer(context.getOptions());
   }
 
   @Override
@@ -249,7 +193,7 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
 
   private void handleRemainder() {
     int remainingRecordCount = incoming.getRecordCount() - remainderIndex;
-    assert this.memoryManager.incomingBatch == incoming;
+    assert memoryManager.incomingBatch() == incoming;
     int recordsToProcess = Math.min(remainingRecordCount, 
memoryManager.getOutputRowCount());
     doAlloc(recordsToProcess);
 
@@ -283,11 +227,13 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
     RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, 
getRecordBatchStatsContext());
   }
 
+  // Called from generated code.
+
   public void addComplexWriter(ComplexWriter writer) {
     complexWriters.add(writer);
   }
 
-  private void doAlloc(int recordCount) {
+  public void doAlloc(int recordCount) {
     // Allocate vv in the allocationVectors.
     for (ValueVector v : allocationVectors) {
       AllocationHelper.allocateNew(v, recordCount);
@@ -301,7 +247,7 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
     }
   }
 
-  private void setValueCount(int count) {
+  public void setValueCount(int count) {
     if (count == 0) {
       container.setEmpty();
       return;
@@ -323,521 +269,36 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
     }
   }
 
-  // hack to make ref and full work together... need to figure out if this is
-  // still necessary.
-  private FieldReference getRef(NamedExpression e) {
-    return e.getRef();
-  }
-
-  private boolean isAnyWildcard(List<NamedExpression> exprs) {
-    for (NamedExpression e : exprs) {
-      if (isWildcard(e)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean isWildcard(NamedExpression ex) {
-    if (!(ex.getExpr() instanceof SchemaPath)) {
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
+    setupNewSchemaFromInput(incoming);
+    if (container.isSchemaChanged() || callBack.getSchemaChangedAndReset()) {
+      container.buildSchema(SelectionVectorMode.NONE);
+      return true;
+    } else {
       return false;
     }
-    NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
-    return expr.getPath().contains(SchemaPath.DYNAMIC_STAR);
   }
 
   private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws 
SchemaChangeException {
-    long setupNewSchemaStartTime = System.currentTimeMillis();
     // get the output batch size from config.
     int configuredBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-    memoryManager = new ProjectMemoryManager(configuredBatchSize);
-    memoryManager.init(incomingBatch, this);
-    if (allocationVectors != null) {
-      for (ValueVector v : allocationVectors) {
-        v.clear();
-      }
-    }
-    allocationVectors = Lists.newArrayList();
-
-    if (complexWriters != null) {
-      container.clear();
-    } else {
-      // Release the underlying DrillBufs and reset the ValueVectors to empty
-      // Not clearing the container here is fine since Project output schema is
-      // not determined solely based on incoming batch. It is defined by the
-      // expressions it has to evaluate.
-      //
-      // If there is a case where only the type of ValueVector already present
-      // in container is changed then addOrGet method takes care of it by
-      // replacing the vectors.
-      container.zeroVectors();
-    }
-
-    List<NamedExpression> exprs = getExpressionList();
-    ErrorCollector collector = new ErrorCollectorImpl();
-    List<TransferPair> transfers = Lists.newArrayList();
-
-    ClassGenerator<Projector> cg = 
CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getOptions());
-    cg.getCodeGenerator().plainJavaCapable(true);
-
-    IntHashSet transferFieldIds = new IntHashSet();
-
-    boolean isAnyWildcard = isAnyWildcard(exprs);
-
-    ClassifierResult result = new ClassifierResult();
-    boolean classify = isClassificationNeeded(exprs);
-
-    for (NamedExpression namedExpression : exprs) {
-      result.clear();
-      if (classify && namedExpression.getExpr() instanceof SchemaPath) {
-        classifyExpr(namedExpression, incomingBatch, result);
-
-        if (result.isStar) {
-          // The value indicates which wildcard we are processing now
-          Integer value = result.prefixMap.get(result.prefix);
-          if (value != null && value == 1) {
-            int k = 0;
-            for (VectorWrapper<?> wrapper : incomingBatch) {
-              ValueVector vvIn = wrapper.getValueVector();
-              if (k > result.outputNames.size() - 1) {
-                assert false;
-              }
-              String name = result.outputNames.get(k++);  // get the renamed 
column names
-              if (name.isEmpty()) {
-                continue;
-              }
-
-              if (isImplicitFileColumn(vvIn)) {
-                continue;
-              }
-
-              FieldReference ref = new FieldReference(name);
-              ValueVector vvOut = 
container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(),
-                vvIn.getField().getType()), callBack);
-              TransferPair tp = vvIn.makeTransferPair(vvOut);
-              memoryManager.addTransferField(vvIn, vvIn.getField().getName(), 
vvOut.getField().getName());
-              transfers.add(tp);
-            }
-          } else if (value != null && value > 1) { // subsequent wildcards 
should do a copy of incoming value vectors
-            int k = 0;
-            for (VectorWrapper<?> wrapper : incomingBatch) {
-              ValueVector vvIn = wrapper.getValueVector();
-              SchemaPath originalPath = 
SchemaPath.getSimplePath(vvIn.getField().getName());
-              if (k > result.outputNames.size() - 1) {
-                assert false;
-              }
-              String name = result.outputNames.get(k++);  // get the renamed 
column names
-              if (name.isEmpty()) {
-                continue;
-              }
-
-              if (isImplicitFileColumn(vvIn)) {
-                continue;
-              }
-
-              LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(originalPath, incomingBatch, collector, 
context.getFunctionRegistry() );
-              if (collector.hasErrors()) {
-                throw new SchemaChangeException(String.format("Failure while 
trying to materialize incomingBatch schema.  Errors:\n %s.", 
collector.toErrorString()));
-              }
-
-              MaterializedField outputField = MaterializedField.create(name, 
expr.getMajorType());
-              ValueVector vv = container.addOrGet(outputField, callBack);
-              allocationVectors.add(vv);
-              TypedFieldId fid = 
container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
-              ValueVectorWriteExpression write = new 
ValueVectorWriteExpression(fid, expr, true);
-              memoryManager.addNewField(vv, write);
-              cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
-            }
-          }
-          continue;
-        }
-      } else {
-        // For the columns which do not needed to be classified,
-        // it is still necessary to ensure the output column name is unique
-        result.outputNames = Lists.newArrayList();
-        String outputName = 
getRef(namedExpression).getRootSegment().getPath(); //moved to before the if
-        addToResultMaps(outputName, result, true);
-      }
-      String outputName = getRef(namedExpression).getRootSegment().getPath();
-      if (result != null && result.outputNames != null && 
result.outputNames.size() > 0) {
-        boolean isMatched = false;
-        for (int j = 0; j < result.outputNames.size(); j++) {
-          if (!result.outputNames.get(j).isEmpty()) {
-            outputName = result.outputNames.get(j);
-            isMatched = true;
-            break;
-          }
-        }
-
-        if (!isMatched) {
-          continue;
-        }
-      }
-
-      LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incomingBatch,
-          collector, context.getFunctionRegistry(), true, unionTypeEnabled);
-      MaterializedField outputField = MaterializedField.create(outputName, 
expr.getMajorType());
-      if (collector.hasErrors()) {
-        throw new SchemaChangeException(String.format("Failure while trying to 
materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
-      }
-
-      // add value vector to transfer if direct reference and this is allowed, 
otherwise, add to evaluation stack.
-      if (expr instanceof ValueVectorReadExpression && 
incomingBatch.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
-          && !((ValueVectorReadExpression) expr).hasReadPath()
-          && !isAnyWildcard
-          && !transferFieldIds.contains(((ValueVectorReadExpression) 
expr).getFieldId().getFieldIds()[0])) {
-
-        ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) 
expr;
-        TypedFieldId id = vectorRead.getFieldId();
-        ValueVector vvIn = 
incomingBatch.getValueAccessorById(id.getIntermediateClass(), 
id.getFieldIds()).getValueVector();
-        Preconditions.checkNotNull(incomingBatch);
-
-        FieldReference ref = getRef(namedExpression);
-        ValueVector vvOut =
-          
container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(),
-            vectorRead.getMajorType()), callBack);
-        TransferPair tp = vvIn.makeTransferPair(vvOut);
-        memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id, 
incomingBatch), vvOut.getField().getName());
-        transfers.add(tp);
-        transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
-      } else if (expr instanceof DrillFuncHolderExpr &&
-          ((DrillFuncHolderExpr) 
expr).getHolder().isComplexWriterFuncHolder()) {
-        // Need to process ComplexWriter function evaluation.
-        // Lazy initialization of the list of complex writers, if not done yet.
-        if (complexWriters == null) {
-          complexWriters = Lists.newArrayList();
-        } else {
-          complexWriters.clear();
-        }
-
-        // The reference name will be passed to ComplexWriter, used as the 
name of the output vector from the writer.
-        ((DrillFuncHolderExpr) 
expr).setFieldReference(namedExpression.getRef());
-        cg.addExpr(expr, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
-        if (complexFieldReferencesList == null) {
-          complexFieldReferencesList = Lists.newArrayList();
-        } else {
-          complexFieldReferencesList.clear();
-        }
-
-        // save the field reference for later for getting schema when input is 
empty
-        complexFieldReferencesList.add(namedExpression.getRef());
-        memoryManager.addComplexField(null); // this will just add an estimate 
to the row width
-      } else {
-        // need to do evaluation.
-        ValueVector ouputVector = container.addOrGet(outputField, callBack);
-        allocationVectors.add(ouputVector);
-        TypedFieldId fid = 
container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
-        boolean useSetSafe = !(ouputVector instanceof FixedWidthVector);
-        ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, 
expr, useSetSafe);
-        cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
-        memoryManager.addNewField(ouputVector, write);
-
-        // We cannot do multiple transfers from the same vector. However we 
still need to instantiate the output vector.
-        if (expr instanceof ValueVectorReadExpression) {
-          ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) 
expr;
-          if (!vectorRead.hasReadPath()) {
-            TypedFieldId id = vectorRead.getFieldId();
-            ValueVector vvIn = 
incomingBatch.getValueAccessorById(id.getIntermediateClass(),
-                    id.getFieldIds()).getValueVector();
-            vvIn.makeTransferPair(ouputVector);
-          }
-        }
-      }
-    }
+    setupNewSchema(incomingBatch, configuredBatchSize);
 
     try {
-      CodeGenerator<Projector> codeGen = cg.getCodeGenerator();
-      codeGen.plainJavaCapable(true);
+      ProjectBatchBuilder batchBuilder = new ProjectBatchBuilder(this,
+          container, callBack, incomingBatch);
+      ProjectionMaterializer em = new 
ProjectionMaterializer(context.getOptions(),
+          incomingBatch, popConfig.getExprs(), context.getFunctionRegistry(),
+          batchBuilder, unionTypeEnabled);
+      boolean saveCode = false;
       // Uncomment this line to debug the generated code.
-      // codeGen.saveCodeForDebugging(true);
-      this.projector = context.getImplementationClass(codeGen);
-      projector.setup(context, incomingBatch, this, transfers);
+      // saveCode = true;
+      projector = em.generateProjector(context, saveCode);
+      projector.setup(context, incomingBatch, this, batchBuilder.transfers());
     } catch (ClassTransformationException | IOException e) {
       throw new SchemaChangeException("Failure while attempting to load 
generated class", e);
     }
-
-    long setupNewSchemaEndTime = System.currentTimeMillis();
-      logger.trace("setupNewSchemaFromInput: time {}  ms, Project {}, incoming 
{}",
-                  (setupNewSchemaEndTime - setupNewSchemaStartTime), this, 
incomingBatch);
-  }
-
-  @Override
-  protected boolean setupNewSchema() throws SchemaChangeException {
-    setupNewSchemaFromInput(this.incoming);
-    if (container.isSchemaChanged() || callBack.getSchemaChangedAndReset()) {
-      container.buildSchema(SelectionVectorMode.NONE);
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  private boolean isImplicitFileColumn(ValueVector vvIn) {
-    return 
columnExplorer.isImplicitOrInternalFileColumn(vvIn.getField().getName());
-  }
-
-  private List<NamedExpression> getExpressionList() {
-    if (popConfig.getExprs() != null) {
-      return popConfig.getExprs();
-    }
-
-    List<NamedExpression> exprs = Lists.newArrayList();
-    for (MaterializedField field : incoming.getSchema()) {
-      String fieldName = field.getName();
-      if (Types.isComplex(field.getType()) || 
Types.isRepeated(field.getType())) {
-        LogicalExpression convertToJson = 
FunctionCallFactory.createConvert(ConvertExpression.CONVERT_TO, "JSON",
-                                                            
SchemaPath.getSimplePath(fieldName), ExpressionPosition.UNKNOWN);
-        String castFuncName = 
FunctionReplacementUtils.getCastFunc(MinorType.VARCHAR);
-        List<LogicalExpression> castArgs = Lists.newArrayList();
-        castArgs.add(convertToJson);  //input_expr
-        // implicitly casting to varchar, since we don't know actual source 
length, cast to undefined length, which will preserve source length
-        castArgs.add(new 
ValueExpressions.LongExpression(Types.MAX_VARCHAR_LENGTH, null));
-        FunctionCall castCall = new FunctionCall(castFuncName, castArgs, 
ExpressionPosition.UNKNOWN);
-        exprs.add(new NamedExpression(castCall, new 
FieldReference(fieldName)));
-      } else {
-        exprs.add(new NamedExpression(SchemaPath.getSimplePath(fieldName), new 
FieldReference(fieldName)));
-      }
-    }
-    return exprs;
-  }
-
-  private boolean isClassificationNeeded(List<NamedExpression> exprs) {
-    boolean needed = false;
-    for (NamedExpression ex : exprs) {
-      if (!(ex.getExpr() instanceof SchemaPath)) {
-        continue;
-      }
-      NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
-      NameSegment ref = ex.getRef().getRootSegment();
-      boolean refHasPrefix = 
ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
-      boolean exprContainsStar = 
expr.getPath().contains(SchemaPath.DYNAMIC_STAR);
-
-      if (refHasPrefix || exprContainsStar) {
-        needed = true;
-        break;
-      }
-    }
-    return needed;
-  }
-
-  private String getUniqueName(String name, ClassifierResult result) {
-    Integer currentSeq = (Integer) result.sequenceMap.get(name);
-    if (currentSeq == null) { // name is unique, so return the original name
-      result.sequenceMap.put(name, -1);
-      return name;
-    }
-    // create a new name
-    int newSeq = currentSeq + 1;
-    String newName = name + newSeq;
-    result.sequenceMap.put(name, newSeq);
-    result.sequenceMap.put(newName, -1);
-
-    return newName;
-  }
-
-  /**
-   * Helper method to ensure unique output column names. If allowDupsWithRename
-   * is set to true, the original name will be appended with a suffix number to
-   * ensure uniqueness. Otherwise, the original column would not be renamed 
even
-   * even if it has been used
-   *
-   * @param origName
-   *          the original input name of the column
-   * @param result
-   *          the data structure to keep track of the used names and decide 
what
-   *          output name should be to ensure uniqueness
-   * @param allowDupsWithRename
-   *          if the original name has been used, is renaming allowed to ensure
-   *          output name unique
-   */
-  private void addToResultMaps(String origName, ClassifierResult result, 
boolean allowDupsWithRename) {
-    String name = origName;
-    if (allowDupsWithRename) {
-      name = getUniqueName(origName, result);
-    }
-    if (!result.outputMap.containsKey(name)) {
-      result.outputNames.add(name);
-      result.outputMap.put(name,  name);
-    } else {
-      result.outputNames.add(EMPTY_STRING);
-    }
-  }
-
-  private void classifyExpr(NamedExpression ex, RecordBatch incoming, 
ClassifierResult result)  {
-    NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
-    NameSegment ref = ex.getRef().getRootSegment();
-    boolean exprHasPrefix = 
expr.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
-    boolean refHasPrefix = 
ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
-    boolean exprIsStar = expr.getPath().equals(SchemaPath.DYNAMIC_STAR);
-    boolean refContainsStar = ref.getPath().contains(SchemaPath.DYNAMIC_STAR);
-    boolean exprContainsStar = 
expr.getPath().contains(SchemaPath.DYNAMIC_STAR);
-    boolean refEndsWithStar = ref.getPath().endsWith(SchemaPath.DYNAMIC_STAR);
-
-    String exprPrefix = EMPTY_STRING;
-    String exprSuffix = expr.getPath();
-
-    if (exprHasPrefix) {
-      // get the prefix of the expr
-      String[] exprComponents = 
expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
-      assert(exprComponents.length == 2);
-      exprPrefix = exprComponents[0];
-      exprSuffix = exprComponents[1];
-      result.prefix = exprPrefix;
-    }
-
-    boolean exprIsFirstWildcard = false;
-    if (exprContainsStar) {
-      result.isStar = true;
-      Integer value = result.prefixMap.get(exprPrefix);
-      if (value == null) {
-        result.prefixMap.put(exprPrefix, 1);
-        exprIsFirstWildcard = true;
-      } else {
-        result.prefixMap.put(exprPrefix, value + 1);
-      }
-    }
-
-    int incomingSchemaSize = incoming.getSchema().getFieldCount();
-
-    // input is '*' and output is 'prefix_*'
-    if (exprIsStar && refHasPrefix && refEndsWithStar) {
-      String[] components = 
ref.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
-      assert(components.length == 2);
-      String prefix = components[0];
-      result.outputNames = Lists.newArrayList();
-      for (VectorWrapper<?> wrapper : incoming) {
-        ValueVector vvIn = wrapper.getValueVector();
-        String name = vvIn.getField().getName();
-
-        // add the prefix to the incoming column name
-        String newName = prefix + StarColumnHelper.PREFIX_DELIMITER + name;
-        addToResultMaps(newName, result, false);
-      }
-    }
-    // input and output are the same
-    else if (expr.getPath().equalsIgnoreCase(ref.getPath()) && 
(!exprContainsStar || exprIsFirstWildcard)) {
-      if (exprContainsStar && exprHasPrefix) {
-        assert exprPrefix != null;
-
-        int k = 0;
-        result.outputNames = 
Lists.newArrayListWithCapacity(incomingSchemaSize);
-        for (int j=0; j < incomingSchemaSize; j++) {
-          result.outputNames.add(EMPTY_STRING);  // initialize
-        }
-
-        for (VectorWrapper<?> wrapper : incoming) {
-          ValueVector vvIn = wrapper.getValueVector();
-          String incomingName = vvIn.getField().getName();
-          // get the prefix of the name
-          String[] nameComponents = 
incomingName.split(StarColumnHelper.PREFIX_DELIMITER, 2);
-          // if incoming value vector does not have a prefix, ignore it since
-          // this expression is not referencing it
-          if (nameComponents.length <= 1) {
-            k++;
-            continue;
-          }
-          String namePrefix = nameComponents[0];
-          if (exprPrefix.equalsIgnoreCase(namePrefix)) {
-            if (!result.outputMap.containsKey(incomingName)) {
-              result.outputNames.set(k, incomingName);
-              result.outputMap.put(incomingName, incomingName);
-            }
-          }
-          k++;
-        }
-      } else {
-        result.outputNames = Lists.newArrayList();
-        if (exprContainsStar) {
-          for (VectorWrapper<?> wrapper : incoming) {
-            ValueVector vvIn = wrapper.getValueVector();
-            String incomingName = vvIn.getField().getName();
-            if (refContainsStar) {
-              addToResultMaps(incomingName, result, true); // allow dups since 
this is likely top-level project
-            } else {
-              addToResultMaps(incomingName, result, false);
-            }
-          }
-        } else {
-          String newName = expr.getPath();
-          if (!refHasPrefix && !exprHasPrefix) {
-            addToResultMaps(newName, result, true); // allow dups since this 
is likely top-level project
-          } else {
-            addToResultMaps(newName, result, false);
-          }
-        }
-      }
-    }
-
-    // input is wildcard and it is not the first wildcard
-    else if (exprIsStar) {
-      result.outputNames = Lists.newArrayList();
-      for (VectorWrapper<?> wrapper : incoming) {
-        ValueVector vvIn = wrapper.getValueVector();
-        String incomingName = vvIn.getField().getName();
-        addToResultMaps(incomingName, result, true); // allow dups since this 
is likely top-level project
-      }
-    }
-
-    // only the output has prefix
-    else if (!exprHasPrefix && refHasPrefix) {
-      result.outputNames = Lists.newArrayList();
-      String newName = ref.getPath();
-      addToResultMaps(newName, result, false);
-    }
-    // input has prefix but output does not
-    else if (exprHasPrefix && !refHasPrefix) {
-      int k = 0;
-      result.outputNames = Lists.newArrayListWithCapacity(incomingSchemaSize);
-      for (int j=0; j < incomingSchemaSize; j++) {
-        result.outputNames.add(EMPTY_STRING);  // initialize
-      }
-
-      for (VectorWrapper<?> wrapper : incoming) {
-        ValueVector vvIn = wrapper.getValueVector();
-        String name = vvIn.getField().getName();
-        String[] components = name.split(StarColumnHelper.PREFIX_DELIMITER, 2);
-        if (components.length <= 1)  {
-          k++;
-          continue;
-        }
-        String namePrefix = components[0];
-        String nameSuffix = components[1];
-        if (exprPrefix.equalsIgnoreCase(namePrefix)) {  // // case insensitive 
matching of prefix.
-          if (refContainsStar) {
-            // remove the prefix from the incoming column names
-            String newName = getUniqueName(nameSuffix, result);  // for top 
level we need to make names unique
-            result.outputNames.set(k, newName);
-          } else if (exprSuffix.equalsIgnoreCase(nameSuffix)) { // case 
insensitive matching of field name.
-            // example: ref: $f1, expr: T0<PREFIX><column_name>
-            String newName = ref.getPath();
-            result.outputNames.set(k, newName);
-          }
-        } else {
-          result.outputNames.add(EMPTY_STRING);
-        }
-        k++;
-      }
-    }
-    // input and output have prefixes although they could be different...
-    else if (exprHasPrefix && refHasPrefix) {
-      String[] input = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 
2);
-      assert(input.length == 2);
-      assert false : "Unexpected project expression or reference";  // not 
handled yet
-    }
-    else {
-      // if the incoming schema's column name matches the expression name of 
the Project,
-      // then we just want to pick the ref name as the output column name
-
-      result.outputNames = Lists.newArrayList();
-      for (VectorWrapper<?> wrapper : incoming) {
-        ValueVector vvIn = wrapper.getValueVector();
-        String incomingName = vvIn.getField().getName();
-        if (expr.getPath().equalsIgnoreCase(incomingName)) {  // case 
insensitive matching of field name.
-          String newName = ref.getPath();
-          addToResultMaps(newName, result, true);
-        }
-      }
-    }
   }
 
   /**
@@ -886,6 +347,31 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
     return IterOutcome.OK_NEW_SCHEMA;
   }
 
+  private void setupNewSchema(RecordBatch incomingBatch, int 
configuredBatchSize) {
+    memoryManager = new ProjectMemoryManager(configuredBatchSize);
+    memoryManager.init(incomingBatch, ProjectRecordBatch.this);
+    if (allocationVectors != null) {
+      for (ValueVector v : allocationVectors) {
+        v.clear();
+      }
+    }
+    allocationVectors = new ArrayList<>();
+
+    if (complexWriters != null) {
+      container.clear();
+    } else {
+      // Release the underlying DrillBufs and reset the ValueVectors to empty
+      // Not clearing the container here is fine since Project output schema is
+      // not determined solely based on incoming batch. It is defined by the
+      // expressions it has to evaluate.
+      //
+      // If there is a case where only the type of ValueVector already present
+      // in container is changed then addOrGet method takes care of it by
+      // replacing the vectors.
+      container.zeroVectors();
+    }
+  }
+
   @Override
   public void dump() {
     logger.error("ProjectRecordBatch[projector={}, hasRemainder={}, 
remainderIndex={}, recordCount={}, container={}]",
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java
new file mode 100644
index 0000000..6dcb428
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java
@@ -0,0 +1,626 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.project;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.collections.map.CaseInsensitiveMap;
+import org.apache.drill.common.expression.ConvertExpression;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.FunctionCallFactory;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment.NameSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.fn.FunctionReplacementUtils;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.DrillFuncHolderExpr;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.ValueVectorWriteExpression;
+import org.apache.drill.exec.expr.fn.FunctionLookupContext;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.resultSet.impl.VectorState;
+import org.apache.drill.exec.planner.StarColumnHelper;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.carrotsearch.hppc.IntHashSet;
+
+/**
+ * Plans the projection given the incoming and requested outgoing schemas. 
Works
+ * with the {@link VectorState} to create required vectors, writers and so on.
+ * Populates the code generator with the "projector" expressions.
+ */
+class ProjectionMaterializer {
+  private static final Logger logger = 
LoggerFactory.getLogger(ProjectionMaterializer.class);
+  private static final String EMPTY_STRING = "";
+
+  /**
+   * Abstracts the physical vector setup operations to separate
+   * the physical setup, in <code>ProjectRecordBatch</code>, from the
+   * logical setup in the materializer class.
+   */
+  public interface BatchBuilder {
+    void addTransferField(String name, ValueVector vvIn);
+    ValueVectorWriteExpression addOutputVector(String name, LogicalExpression 
expr);
+    int addDirectTransfer(FieldReference ref, ValueVectorReadExpression 
vectorRead);
+    void addComplexField(FieldReference ref);
+    ValueVectorWriteExpression addEvalVector(String outputName,
+        LogicalExpression expr);
+  }
+
+  private static class ClassifierResult {
+    private boolean isStar;
+    private List<String> outputNames;
+    private String prefix = "";
+    private final HashMap<String, Integer> prefixMap = Maps.newHashMap();
+    private final CaseInsensitiveMap outputMap = new CaseInsensitiveMap();
+    private final CaseInsensitiveMap sequenceMap = new CaseInsensitiveMap();
+
+    private void clear() {
+      isStar = false;
+      prefix = "";
+      if (outputNames != null) {
+        outputNames.clear();
+      }
+
+    // note: don't clear the internal maps since they accumulate data across calls.
+    }
+  }
+
+  private final ClassGenerator<Projector> cg;
+  private final VectorAccessible incomingBatch;
+  private final BatchSchema incomingSchema;
+  private final List<NamedExpression> exprSpec;
+  private final FunctionLookupContext functionLookupContext;
+  private final BatchBuilder batchBuilder;
+  private final boolean unionTypeEnabled;
+  private final ErrorCollector collector = new ErrorCollectorImpl();
+  private final ColumnExplorer columnExplorer;
+  private final IntHashSet transferFieldIds = new IntHashSet();
+  private final ProjectionMaterializer.ClassifierResult result = new 
ClassifierResult();
+  private boolean isAnyWildcard;
+  private boolean classify;
+
  /**
   * Creates a materializer that plans the logical projection and drives
   * the given {@code batchBuilder} to perform the physical vector setup.
   *
   * @param options session options, used for the column explorer and
   *        the code generator
   * @param incomingBatch upstream batch supplying the input schema
   * @param exprSpec projection expressions; null to project all columns
   * @param functionLookupContext resolves function implementations
   * @param batchBuilder callback that performs physical vector setup
   * @param unionTypeEnabled whether the UNION type is enabled for
   *        expression materialization
   */
  public ProjectionMaterializer(OptionManager options,
      VectorAccessible incomingBatch, List<NamedExpression> exprSpec,
      FunctionLookupContext functionLookupContext, BatchBuilder batchBuilder,
      boolean unionTypeEnabled) {
    this.incomingBatch = incomingBatch;
    this.incomingSchema = incomingBatch.getSchema();
    this.exprSpec = exprSpec;
    this.functionLookupContext = functionLookupContext;
    this.batchBuilder = batchBuilder;
    this.unionTypeEnabled = unionTypeEnabled;
    columnExplorer = new ColumnExplorer(options);
    cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, options);
  }
+
+  public Projector generateProjector(FragmentContext context, boolean saveCode)
+      throws ClassTransformationException, IOException, SchemaChangeException {
+    long setupNewSchemaStartTime = System.currentTimeMillis();
+    setup();
+    CodeGenerator<Projector> codeGen = cg.getCodeGenerator();
+    codeGen.plainJavaCapable(true);
+    codeGen.saveCodeForDebugging(saveCode);
+    Projector projector = context.getImplementationClass(codeGen);
+
+    long setupNewSchemaEndTime = System.currentTimeMillis();
+    logger.trace("generateProjector: time {}  ms, Project {}, incoming {}",
+             (setupNewSchemaEndTime - setupNewSchemaStartTime), exprSpec, 
incomingSchema);
+    return projector;
+  }
+
+  private void setup() throws SchemaChangeException {
+    List<NamedExpression> exprs = exprSpec != null ? exprSpec
+        : inferExpressions();
+    isAnyWildcard = isAnyWildcard(exprs);
+    classify = isClassificationNeeded(exprs);
+
+    for (NamedExpression namedExpression : exprs) {
+      setupExpression(namedExpression);
+    }
+  }
+
+  private List<NamedExpression> inferExpressions() {
+    List<NamedExpression> exprs = new ArrayList<>();
+    for (MaterializedField field : incomingSchema) {
+      String fieldName = field.getName();
+      if (Types.isComplex(field.getType())
+          || Types.isRepeated(field.getType())) {
+        LogicalExpression convertToJson = FunctionCallFactory.createConvert(
+            ConvertExpression.CONVERT_TO, "JSON",
+            SchemaPath.getSimplePath(fieldName), ExpressionPosition.UNKNOWN);
+        String castFuncName = FunctionReplacementUtils
+            .getCastFunc(MinorType.VARCHAR);
+        List<LogicalExpression> castArgs = new ArrayList<>();
+        castArgs.add(convertToJson); // input_expr
+        // Implicitly casting to varchar, since we don't know actual source
+        // length, cast to undefined length, which will preserve source length
+        castArgs.add(new ValueExpressions.LongExpression(
+            Types.MAX_VARCHAR_LENGTH, null));
+        FunctionCall castCall = new FunctionCall(castFuncName, castArgs,
+            ExpressionPosition.UNKNOWN);
+        exprs.add(new NamedExpression(castCall, new 
FieldReference(fieldName)));
+      } else {
+        exprs.add(new NamedExpression(SchemaPath.getSimplePath(fieldName),
+            new FieldReference(fieldName)));
+      }
+    }
+    return exprs;
+  }
+
+  private boolean isAnyWildcard(List<NamedExpression> exprs) {
+    for (NamedExpression e : exprs) {
+      if (isWildcard(e)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean isWildcard(NamedExpression ex) {
+    if (!(ex.getExpr() instanceof SchemaPath)) {
+      return false;
+    }
+    NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
+    return expr.getPath().contains(SchemaPath.DYNAMIC_STAR);
+  }
+
+  private boolean isClassificationNeeded(List<NamedExpression> exprs) {
+    boolean needed = false;
+    for (NamedExpression ex : exprs) {
+      if (!(ex.getExpr() instanceof SchemaPath)) {
+        continue;
+      }
+      NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
+      NameSegment ref = ex.getRef().getRootSegment();
+      boolean refHasPrefix = ref.getPath()
+          .contains(StarColumnHelper.PREFIX_DELIMITER);
+      boolean exprContainsStar = expr.getPath()
+          .contains(SchemaPath.DYNAMIC_STAR);
+
+      if (refHasPrefix || exprContainsStar) {
+        needed = true;
+        break;
+      }
+    }
+    return needed;
+  }
+
+  private void setupExpression(NamedExpression namedExpression)
+      throws SchemaChangeException {
+    result.clear();
+    if (classify && namedExpression.getExpr() instanceof SchemaPath) {
+      classifyExpr(namedExpression, result);
+
+      if (result.isStar) {
+        setupImplicitColumnRef(namedExpression);
+        return;
+      }
+    } else {
+      // For the columns which do not needed to be classified,
+      // it is still necessary to ensure the output column name is unique
+      result.outputNames = new ArrayList<>();
+      String outputName = getRef(namedExpression).getRootSegment().getPath(); 
// moved
+                                                                              
// to
+                                                                              
// before
+                                                                              
// the
+                                                                              
// if
+      addToResultMaps(outputName, result, true);
+    }
+    String outputName = getRef(namedExpression).getRootSegment().getPath();
+    if (result != null && result.outputNames != null
+        && result.outputNames.size() > 0) {
+      boolean isMatched = false;
+      for (int j = 0; j < result.outputNames.size(); j++) {
+        if (!result.outputNames.get(j).isEmpty()) {
+          outputName = result.outputNames.get(j);
+          isMatched = true;
+          break;
+        }
+      }
+
+      if (!isMatched) {
+        return;
+      }
+    }
+
+    LogicalExpression expr = ExpressionTreeMaterializer.materialize(
+        namedExpression.getExpr(), incomingBatch, collector,
+        functionLookupContext, true, unionTypeEnabled);
+    if (collector.hasErrors()) {
+      throw new SchemaChangeException(String.format(
+          "Failure while trying to materialize incoming schema.  Errors:\n 
%s.",
+          collector.toErrorString()));
+    }
+
+    // Add value vector to transfer if direct reference and this is allowed,
+    // otherwise, add to evaluation stack.
+    if (expr instanceof ValueVectorReadExpression
+        && incomingSchema.getSelectionVectorMode() == SelectionVectorMode.NONE
+        && !((ValueVectorReadExpression) expr).hasReadPath() && !isAnyWildcard
+        && !transferFieldIds.contains(
+            ((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])) 
{
+      setupDirectTransfer(namedExpression, expr);
+    } else if (expr instanceof DrillFuncHolderExpr
+        && ((DrillFuncHolderExpr) expr).getHolder()
+            .isComplexWriterFuncHolder()) {
+      setupFnCall(namedExpression, expr);
+    } else {
+      // need to do evaluation.
+      setupExprEval(namedExpression, expr, outputName);
+    }
+  }
+
+  private void setupImplicitColumnRef(NamedExpression namedExpression)
+      throws SchemaChangeException {
+    // The value indicates which wildcard we are processing now
+    Integer value = result.prefixMap.get(result.prefix);
+    if (value != null && value == 1) {
+      int k = 0;
+      for (VectorWrapper<?> wrapper : incomingBatch) {
+        ValueVector vvIn = wrapper.getValueVector();
+        if (k > result.outputNames.size() - 1) {
+          assert false;
+        }
+        String name = result.outputNames.get(k++); // get the renamed column
+                                                   // names
+        if (name.isEmpty()) {
+          continue;
+        }
+
+        if (isImplicitFileColumn(vvIn.getField())) {
+          continue;
+        }
+
+        batchBuilder.addTransferField(name, vvIn);
+      }
+    } else if (value != null && value > 1) { // subsequent wildcards should do 
a
+                                             // copy of incoming value vectors
+      int k = 0;
+      for (MaterializedField field : incomingSchema) {
+        SchemaPath originalPath = SchemaPath
+            .getSimplePath(field.getName());
+        if (k > result.outputNames.size() - 1) {
+          assert false;
+        }
+        String name = result.outputNames.get(k++); // get the renamed column
+                                                   // names
+        if (name.isEmpty()) {
+          continue;
+        }
+
+        if (isImplicitFileColumn(field)) {
+          continue;
+        }
+
+        LogicalExpression expr = ExpressionTreeMaterializer.materialize(
+            originalPath, incomingBatch, collector, functionLookupContext);
+        if (collector.hasErrors()) {
+          throw new SchemaChangeException(String.format(
+              "Failure while trying to materialize incomingBatch schema.  
Errors:\n %s.",
+              collector.toErrorString()));
+        }
+
+        ValueVectorWriteExpression write = batchBuilder.addOutputVector(name,
+            expr);
+        cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
+      }
+    }
+  }
+
+  private void setupDirectTransfer(NamedExpression namedExpression,
+      LogicalExpression expr) {
+    FieldReference ref = getRef(namedExpression);
+    int fid = batchBuilder.addDirectTransfer(ref,
+        (ValueVectorReadExpression) expr);
+    transferFieldIds.add(fid);
+  }
+
  /**
   * Sets up evaluation of a complex-writer function. The expression's
   * field reference must be set before code generation so the writer
   * knows the name of its output vector.
   */
  private void setupFnCall(NamedExpression namedExpression,
      LogicalExpression expr) {

    // Need to process ComplexWriter function evaluation.
    // The reference name will be passed to ComplexWriter, used as the name of
    // the output vector from the writer.
    ((DrillFuncHolderExpr) expr).setFieldReference(namedExpression.getRef());
    cg.addExpr(expr, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
    batchBuilder.addComplexField(namedExpression.getRef());
  }
+
+  private void setupExprEval(NamedExpression namedExpression,
+      LogicalExpression expr, String outputName) {
+    ValueVectorWriteExpression write = batchBuilder.addEvalVector(
+        outputName, expr);
+    cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
+  }
+
+  private boolean isImplicitFileColumn(MaterializedField field) {
+    return columnExplorer
+        .isImplicitOrInternalFileColumn(field.getName());
+  }
+
  /**
   * Classifies a star-column (wildcard and/or prefixed) projection
   * expression, filling {@code result.outputNames} with an output name
   * per incoming column. An empty-string entry marks a column dropped as
   * a duplicate. The branches below cover the combinations of wildcard
   * and prefix on the expression and reference sides; statement order is
   * significant because the prefix map counts wildcard occurrences.
   */
  private void classifyExpr(NamedExpression ex, ClassifierResult result) {
    NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
    NameSegment ref = ex.getRef().getRootSegment();
    boolean exprHasPrefix = expr.getPath()
        .contains(StarColumnHelper.PREFIX_DELIMITER);
    boolean refHasPrefix = ref.getPath()
        .contains(StarColumnHelper.PREFIX_DELIMITER);
    boolean exprIsStar = expr.getPath().equals(SchemaPath.DYNAMIC_STAR);
    boolean refContainsStar = ref.getPath().contains(SchemaPath.DYNAMIC_STAR);
    boolean exprContainsStar = expr.getPath().contains(SchemaPath.DYNAMIC_STAR);
    boolean refEndsWithStar = ref.getPath().endsWith(SchemaPath.DYNAMIC_STAR);

    String exprPrefix = EMPTY_STRING;
    String exprSuffix = expr.getPath();

    if (exprHasPrefix) {
      // get the prefix of the expr
      String[] exprComponents = expr.getPath()
          .split(StarColumnHelper.PREFIX_DELIMITER, 2);
      assert (exprComponents.length == 2);
      exprPrefix = exprComponents[0];
      exprSuffix = exprComponents[1];
      result.prefix = exprPrefix;
    }

    boolean exprIsFirstWildcard = false;
    if (exprContainsStar) {
      result.isStar = true;
      // Count wildcards per prefix: the first transfers vectors, later
      // ones copy them (see setupImplicitColumnRef).
      Integer value = result.prefixMap.get(exprPrefix);
      if (value == null) {
        result.prefixMap.put(exprPrefix, 1);
        exprIsFirstWildcard = true;
      } else {
        result.prefixMap.put(exprPrefix, value + 1);
      }
    }

    int incomingSchemaSize = incomingSchema.getFieldCount();

    // input is '*' and output is 'prefix_*'
    if (exprIsStar && refHasPrefix && refEndsWithStar) {
      String[] components = ref.getPath()
          .split(StarColumnHelper.PREFIX_DELIMITER, 2);
      assert (components.length == 2);
      String prefix = components[0];
      result.outputNames = new ArrayList<>();
      for (MaterializedField field : incomingSchema) {
        String name = field.getName();

        // add the prefix to the incoming column name
        String newName = prefix + StarColumnHelper.PREFIX_DELIMITER + name;
        addToResultMaps(newName, result, false);
      }
    }
    // input and output are the same
    else if (expr.getPath().equalsIgnoreCase(ref.getPath())
        && (!exprContainsStar || exprIsFirstWildcard)) {
      if (exprContainsStar && exprHasPrefix) {
        assert exprPrefix != null;

        int k = 0;
        result.outputNames = new ArrayList<>(incomingSchemaSize);
        for (int j = 0; j < incomingSchemaSize; j++) {
          result.outputNames.add(EMPTY_STRING); // initialize
        }

        for (MaterializedField field : incomingSchema) {
          String incomingName = field.getName();
          // get the prefix of the name
          String[] nameComponents = incomingName
              .split(StarColumnHelper.PREFIX_DELIMITER, 2);
          // if incoming value vector does not have a prefix, ignore it since
          // this expression is not referencing it
          if (nameComponents.length <= 1) {
            k++;
            continue;
          }
          String namePrefix = nameComponents[0];
          if (exprPrefix.equalsIgnoreCase(namePrefix)) {
            if (!result.outputMap.containsKey(incomingName)) {
              result.outputNames.set(k, incomingName);
              result.outputMap.put(incomingName, incomingName);
            }
          }
          k++;
        }
      } else {
        result.outputNames = new ArrayList<>();
        if (exprContainsStar) {
          for (MaterializedField field : incomingSchema) {
            String incomingName = field.getName();
            if (refContainsStar) {
              // allow dups since this is likely a top-level project
              addToResultMaps(incomingName, result, true);
            } else {
              addToResultMaps(incomingName, result, false);
            }
          }
        } else {
          String newName = expr.getPath();
          if (!refHasPrefix && !exprHasPrefix) {
            // allow dups since this is likely a top-level project
            addToResultMaps(newName, result, true);
          } else {
            addToResultMaps(newName, result, false);
          }
        }
      }
    }

    // Input is wildcard and it is not the first wildcard
    else if (exprIsStar) {
      result.outputNames = new ArrayList<>();
      for (MaterializedField field : incomingSchema) {
        String incomingName = field.getName();
        // allow dups since this is likely a top-level project
        addToResultMaps(incomingName, result, true);
      }
    }

    // Only the output has prefix
    else if (!exprHasPrefix && refHasPrefix) {
      result.outputNames = new ArrayList<>();
      String newName = ref.getPath();
      addToResultMaps(newName, result, false);
    }
    // Input has prefix but output does not
    else if (exprHasPrefix && !refHasPrefix) {
      int k = 0;
      result.outputNames = new ArrayList<>(incomingSchemaSize);
      for (int j = 0; j < incomingSchemaSize; j++) {
        result.outputNames.add(EMPTY_STRING); // initialize
      }

      for (MaterializedField field : incomingSchema) {
        String name = field.getName();
        String[] components = name.split(StarColumnHelper.PREFIX_DELIMITER, 2);
        if (components.length <= 1) {
          k++;
          continue;
        }
        String namePrefix = components[0];
        String nameSuffix = components[1];
        // case insensitive matching of prefix
        if (exprPrefix.equalsIgnoreCase(namePrefix)) {
          if (refContainsStar) {
            // remove the prefix from the incoming column names; for a
            // top-level project the names must be made unique
            String newName = getUniqueName(nameSuffix, result);
            result.outputNames.set(k, newName);
          } else if (exprSuffix.equalsIgnoreCase(nameSuffix)) {
            // case insensitive matching of field name.
            // example: ref: $f1, expr: T0<PREFIX><column_name>
            String newName = ref.getPath();
            result.outputNames.set(k, newName);
          }
        } else {
          // NOTE(review): this add() appends to the pre-sized list rather
          // than set(k, EMPTY_STRING), growing it past incomingSchemaSize;
          // verify this is intended.
          result.outputNames.add(EMPTY_STRING);
        }
        k++;
      }
    }
    // input and output have prefixes although they could be different...
    else if (exprHasPrefix && refHasPrefix) {
      String[] input = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER,
          2);
      assert (input.length == 2);
      // not handled yet
      assert false : "Unexpected project expression or reference";
    } else {
      // if the incoming schema's column name matches the expression name of
      // the Project, then we just want to pick the ref name as the output
      // column name
      result.outputNames = new ArrayList<>();
      for (MaterializedField field : incomingSchema) {
        String incomingName = field.getName();
        // case insensitive matching of field name.
        if (expr.getPath().equalsIgnoreCase(incomingName)) {
          String newName = ref.getPath();
          addToResultMaps(newName, result, true);
        }
      }
    }
  }
+
+  private String getUniqueName(String name,
+      ProjectionMaterializer.ClassifierResult result) {
+    Integer currentSeq = (Integer) result.sequenceMap.get(name);
+    if (currentSeq == null) { // name is unique, so return the original name
+      result.sequenceMap.put(name, -1);
+      return name;
+    }
+    // create a new name
+    int newSeq = currentSeq + 1;
+    String newName = name + newSeq;
+    result.sequenceMap.put(name, newSeq);
+    result.sequenceMap.put(newName, -1);
+
+    return newName;
+  }
+
  /**
   * Helper method to ensure unique output column names. If
   * allowDupsWithRename is set to true, the original name will be appended
   * with a numeric suffix to ensure uniqueness. Otherwise, the original
   * column is not renamed even if its name has already been used; the
   * duplicate is recorded as an empty entry instead.
   *
   * @param origName
   *          the original input name of the column
   * @param result
   *          the data structure that keeps track of the used names and
   *          decides what the output name should be to ensure uniqueness
   * @param allowDupsWithRename
   *          if the original name has been used, whether renaming is
   *          allowed to keep the output name unique
   */
  private void addToResultMaps(String origName,
      ProjectionMaterializer.ClassifierResult result,
      boolean allowDupsWithRename) {
    String name = origName;
    if (allowDupsWithRename) {
      name = getUniqueName(origName, result);
    }
    if (!result.outputMap.containsKey(name)) {
      result.outputNames.add(name);
      result.outputMap.put(name, name);
    } else {
      // Duplicate without rename permission: mark the slot as dropped.
      result.outputNames.add(EMPTY_STRING);
    }
  }
+
  /**
   * Returns the output field reference for an expression.
   * Hack to make ref and full work together... need to figure out if this
   * is still necessary.
   */
  private FieldReference getRef(NamedExpression e) {
    return e.getRef();
  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index 29c4dad..496c776 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -355,7 +355,7 @@ public class WindowFrameRecordBatch extends 
AbstractRecordBatch<WindowPOP> {
     CodeGenerator<WindowFramer> codeGen = cg.getCodeGenerator();
     codeGen.plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
-    codeGen.saveCodeForDebugging(true);
+    // codeGen.saveCodeForDebugging(true);
 
     return context.getImplementationClass(codeGen);
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
index 03e8ffa..b1156f6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
@@ -28,7 +28,8 @@ public interface VectorAccessible extends 
Iterable<VectorWrapper<?>> {
   /**
    * Get the value vector type and id for the given schema path. The 
TypedFieldId
    * should store a fieldId which is the same as the ordinal position of the 
field
-   * within the Iterator provided this classes implementation of 
Iterable<ValueVector>.
+   * within the Iterator provided this classes implementation of
+   * <code>Iterable&lt;ValueVector><code>.
    *
    * @param path the path where the vector should be located.
    * @return the local field id associated with this vector. If no field 
matches this
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
index 8d95701..d8885b7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
@@ -24,8 +24,6 @@ import org.apache.drill.exec.vector.ValueVector;
 
 public interface VectorWrapper<T extends ValueVector> {
 
-
-
   public Class<T> getVectorClass();
   public MaterializedField getField();
   public T getValueVector();
diff --git a/exec/java-exec/src/test/resources/project/test1.json 
b/exec/java-exec/src/test/resources/project/test1.json
index 39d8c18..c0bb3ee 100644
--- a/exec/java-exec/src/test/resources/project/test1.json
+++ b/exec/java-exec/src/test/resources/project/test1.json
@@ -25,10 +25,10 @@
             child: 1,
             pop:"project",
             exprs: [
-              { ref: "col1", expr:"red + 1" },
-              { ref: "col2", expr:"red + 2" },
-              { ref: "col3", expr:"orange"},
-              { ref: "col4", expr:"orange"}
+              { ref: "col1", expr: "red + 1" },
+              { ref: "col2", expr: "red + 2" },
+              { ref: "col3", expr: "orange"},
+              { ref: "col4", expr: "orange"}
             ]
         },
         {

Reply via email to