This is an automated email from the ASF dual-hosted git repository. volodymyr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 0af04cd44622c288278965b13685907fd1e40fb4 Author: Paul Rogers <[email protected]> AuthorDate: Mon Dec 30 19:28:25 2019 -0800 DRILL-7503: Refactor the project operator Breaks the big "setup" function into its own class, and separates out physical vector setup from logical projection planning. No functional change; just rearranging existing code. closes #1944 --- .../physical/impl/project/OutputWidthVisitor.java | 418 +++++++------- .../physical/impl/project/ProjectBatchBuilder.java | 141 +++++ .../impl/project/ProjectMemoryManager.java | 526 ++++++++--------- .../physical/impl/project/ProjectRecordBatch.java | 620 ++------------------ .../impl/project/ProjectionMaterializer.java | 626 +++++++++++++++++++++ .../impl/window/WindowFrameRecordBatch.java | 2 +- .../apache/drill/exec/record/VectorAccessible.java | 3 +- .../apache/drill/exec/record/VectorWrapper.java | 2 - .../src/test/resources/project/test1.json | 8 +- 9 files changed, 1265 insertions(+), 1081 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java index 7b2e2b8..4fa7a89 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java @@ -43,238 +43,212 @@ import org.apache.drill.exec.record.TypedFieldId; import java.util.ArrayList; -public class OutputWidthVisitor extends AbstractExecExprVisitor<OutputWidthExpression, OutputWidthVisitorState, - RuntimeException> { - - @Override - public OutputWidthExpression visitVarDecimalConstant(VarDecimalExpression varDecimalExpression, - OutputWidthVisitorState state) throws RuntimeException { - Preconditions.checkArgument(varDecimalExpression.getMajorType().hasPrecision()); - return new 
FixedLenExpr(varDecimalExpression.getMajorType().getPrecision()); - } - - - /** - * - * Records the {@link IfExpression} as a {@link IfElseWidthExpr}. IfElseWidthExpr will be reduced to - * a {@link FixedLenExpr} by taking the max of the if-expr-width and the else-expr-width. - * - * @param ifExpression - * @param state - * @return IfElseWidthExpr - * @throws RuntimeException - */ - @Override - public OutputWidthExpression visitIfExpression(IfExpression ifExpression, OutputWidthVisitorState state) - throws RuntimeException { - IfExpression.IfCondition condition = ifExpression.ifCondition; - LogicalExpression ifExpr = condition.expression; - LogicalExpression elseExpr = ifExpression.elseExpression; - - OutputWidthExpression ifWidthExpr = ifExpr.accept(this, state); - OutputWidthExpression elseWidthExpr = null; - if (elseExpr != null) { - elseWidthExpr = elseExpr.accept(this, state); - } - return new IfElseWidthExpr(ifWidthExpr, elseWidthExpr); +public class OutputWidthVisitor extends + AbstractExecExprVisitor<OutputWidthExpression, + OutputWidthVisitorState, RuntimeException> { + + @Override + public OutputWidthExpression visitVarDecimalConstant(VarDecimalExpression varDecimalExpression, + OutputWidthVisitorState state) throws RuntimeException { + Preconditions.checkArgument(varDecimalExpression.getMajorType().hasPrecision()); + return new FixedLenExpr(varDecimalExpression.getMajorType().getPrecision()); + } + + /** + * Records the {@link IfExpression} as a {@link IfElseWidthExpr}. + * IfElseWidthExpr will be reduced to a {@link FixedLenExpr} by taking the max + * of the if-expr-width and the else-expr-width. 
+ */ + @Override + public OutputWidthExpression visitIfExpression(IfExpression ifExpression, OutputWidthVisitorState state) + throws RuntimeException { + IfExpression.IfCondition condition = ifExpression.ifCondition; + LogicalExpression ifExpr = condition.expression; + LogicalExpression elseExpr = ifExpression.elseExpression; + + OutputWidthExpression ifWidthExpr = ifExpr.accept(this, state); + OutputWidthExpression elseWidthExpr = null; + if (elseExpr != null) { + elseWidthExpr = elseExpr.accept(this, state); } - - /** - * Handles a {@link FunctionHolderExpression}. Functions that produce fixed-width output are trivially - * converted to a {@link FixedLenExpr}. For functions that produce variable width output, the output width calculator - * annotation is looked-up and recorded in a {@link FunctionCallExpr}. This calculator will later be used to convert - * the FunctionCallExpr to a {@link FixedLenExpr} expression - * @param holderExpr - * @param state - * @return FunctionCallExpr - * @throws RuntimeException - */ - @Override - public OutputWidthExpression visitFunctionHolderExpression(FunctionHolderExpression holderExpr, - OutputWidthVisitorState state) throws RuntimeException { - OutputWidthExpression fixedWidth = getFixedLenExpr(holderExpr.getMajorType()); - if (fixedWidth != null) { return fixedWidth; } - // Only Drill functions can be handled. Non-drill Functions, like HiveFunctions - // will default to a fixed value - if (!(holderExpr instanceof DrillFuncHolderExpr)) { - // We currently only know how to handle DrillFuncs. 
- // Use a default if this is not a DrillFunc - return new FixedLenExpr(OutputSizeEstimateConstants.NON_DRILL_FUNCTION_OUTPUT_SIZE_ESTIMATE); - } - - final DrillFuncHolder holder = ((DrillFuncHolderExpr) holderExpr).getHolder(); - - // If the user has provided a size estimate, use it - int estimate = holder.variableOutputSizeEstimate(); - if (estimate != FunctionTemplate.OUTPUT_SIZE_ESTIMATE_DEFAULT) { - return new FixedLenExpr(estimate); - } - // Use the calculator provided by the user or use the default - OutputWidthCalculator widthCalculator = holder.getOutputWidthCalculator(); - final int argSize = holderExpr.args.size(); - ArrayList<OutputWidthExpression> arguments = null; - if (argSize != 0) { - arguments = new ArrayList<>(argSize); - for (LogicalExpression expr : holderExpr.args) { - arguments.add(expr.accept(this, state)); - } - } - return new FunctionCallExpr(holderExpr, widthCalculator, arguments); + return new IfElseWidthExpr(ifWidthExpr, elseWidthExpr); + } + + /** + * Handles a {@link FunctionHolderExpression}. Functions that produce + * fixed-width output are trivially converted to a {@link FixedLenExpr}. For + * functions that produce variable width output, the output width calculator + * annotation is looked-up and recorded in a {@link FunctionCallExpr}. This + * calculator will later be used to convert the FunctionCallExpr to a + * {@link FixedLenExpr} expression + */ + @Override + public OutputWidthExpression visitFunctionHolderExpression(FunctionHolderExpression holderExpr, + OutputWidthVisitorState state) throws RuntimeException { + OutputWidthExpression fixedWidth = getFixedLenExpr(holderExpr.getMajorType()); + if (fixedWidth != null) { return fixedWidth; } + // Only Drill functions can be handled. Non-drill Functions, like HiveFunctions + // will default to a fixed value + if (!(holderExpr instanceof DrillFuncHolderExpr)) { + // We currently only know how to handle DrillFuncs. 
+ // Use a default if this is not a DrillFunc + return new FixedLenExpr(OutputSizeEstimateConstants.NON_DRILL_FUNCTION_OUTPUT_SIZE_ESTIMATE); } - /** - * Records a variable width write expression. This will be converted to a {@link FixedLenExpr} expression by walking - * the tree of expression attached to the write expression. - * @param writeExpr - * @param state - * @return - * @throws RuntimeException - */ - @Override - public OutputWidthExpression visitValueVectorWriteExpression(ValueVectorWriteExpression writeExpr, - OutputWidthVisitorState state) throws RuntimeException { - TypedFieldId fieldId = writeExpr.getFieldId(); - ProjectMemoryManager manager = state.getManager(); - OutputWidthExpression outputExpr; - if (manager.isFixedWidth(fieldId)) { - outputExpr = getFixedLenExpr(fieldId.getFinalType()); - } else { - LogicalExpression writeArg = writeExpr.getChild(); - outputExpr = writeArg.accept(this, state); - } - return outputExpr; - } + final DrillFuncHolder holder = ((DrillFuncHolderExpr) holderExpr).getHolder(); - /** - * Records a variable width read expression as a {@link VarLenReadExpr}. This will be converted to a - * {@link FixedLenExpr} expression by getting the size for the corresponding column from the {@link RecordBatchSizer}. 
- * - * @param readExpr - * @param state - * @return - * @throws RuntimeException - */ - @Override - public OutputWidthExpression visitValueVectorReadExpression(ValueVectorReadExpression readExpr, - OutputWidthVisitorState state) throws RuntimeException { - return new VarLenReadExpr(readExpr); + // If the user has provided a size estimate, use it + int estimate = holder.variableOutputSizeEstimate(); + if (estimate != FunctionTemplate.OUTPUT_SIZE_ESTIMATE_DEFAULT) { + return new FixedLenExpr(estimate); } - - @Override - public OutputWidthExpression visitQuotedStringConstant(ValueExpressions.QuotedString quotedString, - OutputWidthVisitorState state) throws RuntimeException { - return new FixedLenExpr(quotedString.getString().length()); + // Use the calculator provided by the user or use the default + OutputWidthCalculator widthCalculator = holder.getOutputWidthCalculator(); + final int argSize = holderExpr.args.size(); + ArrayList<OutputWidthExpression> arguments = null; + if (argSize != 0) { + arguments = new ArrayList<>(argSize); + for (LogicalExpression expr : holderExpr.args) { + arguments.add(expr.accept(this, state)); + } } - - @Override - public OutputWidthExpression visitUnknown(LogicalExpression logicalExpression, OutputWidthVisitorState state) { - OutputWidthExpression fixedLenExpr = getFixedLenExpr(logicalExpression.getMajorType()); - if (fixedLenExpr != null) { - return fixedLenExpr; - } - throw new IllegalStateException("Unknown variable width expression: " + logicalExpression); + return new FunctionCallExpr(holderExpr, widthCalculator, arguments); + } + + /** + * Records a variable width write expression. This will be converted to a + * {@link FixedLenExpr} expression by walking the tree of expression attached + * to the write expression. 
+ */ + @Override + public OutputWidthExpression visitValueVectorWriteExpression(ValueVectorWriteExpression writeExpr, + OutputWidthVisitorState state) throws RuntimeException { + TypedFieldId fieldId = writeExpr.getFieldId(); + ProjectMemoryManager manager = state.getManager(); + OutputWidthExpression outputExpr; + if (manager.isFixedWidth(fieldId)) { + outputExpr = getFixedLenExpr(fieldId.getFinalType()); + } else { + LogicalExpression writeArg = writeExpr.getChild(); + outputExpr = writeArg.accept(this, state); } - - @Override - public OutputWidthExpression visitNullConstant(TypedNullConstant nullConstant, OutputWidthVisitorState state) - throws RuntimeException { - int width; - if (nullConstant.getMajorType().hasPrecision()) { - width = nullConstant.getMajorType().getPrecision(); - } else { - width = 0; - } - return new FixedLenExpr(width); + return outputExpr; + } + + /** + * Records a variable width read expression as a {@link VarLenReadExpr}. This + * will be converted to a {@link FixedLenExpr} expression by getting the size + * for the corresponding column from the {@link RecordBatchSizer}. 
+ */ + @Override + public OutputWidthExpression visitValueVectorReadExpression(ValueVectorReadExpression readExpr, + OutputWidthVisitorState state) throws RuntimeException { + return new VarLenReadExpr(readExpr); + } + + @Override + public OutputWidthExpression visitQuotedStringConstant(ValueExpressions.QuotedString quotedString, + OutputWidthVisitorState state) throws RuntimeException { + return new FixedLenExpr(quotedString.getString().length()); + } + + @Override + public OutputWidthExpression visitUnknown(LogicalExpression logicalExpression, OutputWidthVisitorState state) { + OutputWidthExpression fixedLenExpr = getFixedLenExpr(logicalExpression.getMajorType()); + if (fixedLenExpr != null) { + return fixedLenExpr; } - - - @Override - public OutputWidthExpression visitFixedLenExpr(FixedLenExpr fixedLenExpr, OutputWidthVisitorState state) - throws RuntimeException { - return fixedLenExpr; + throw new IllegalStateException("Unknown variable width expression: " + logicalExpression); + } + + @Override + public OutputWidthExpression visitNullConstant(TypedNullConstant nullConstant, OutputWidthVisitorState state) + throws RuntimeException { + int width; + if (nullConstant.getMajorType().hasPrecision()) { + width = nullConstant.getMajorType().getPrecision(); + } else { + width = 0; } - - /** - * Converts the {@link VarLenReadExpr} to a {@link FixedLenExpr} by getting the size for the corresponding column - * from the RecordBatchSizer. 
- * @param varLenReadExpr - * @param state - * @return - * @throws RuntimeException - */ - @Override - public OutputWidthExpression visitVarLenReadExpr(VarLenReadExpr varLenReadExpr, OutputWidthVisitorState state) - throws RuntimeException { - String columnName = varLenReadExpr.getInputColumnName(); - if (columnName == null) { - TypedFieldId fieldId = varLenReadExpr.getReadExpression().getTypedFieldId(); - columnName = TypedFieldId.getPath(fieldId, state.manager.getIncomingBatch()); - } - final RecordBatchSizer.ColumnSize columnSize = state.manager.getColumnSize(columnName); - - int columnWidth = columnSize.getDataSizePerEntry(); - return new FixedLenExpr(columnWidth); + return new FixedLenExpr(width); + } + + + @Override + public OutputWidthExpression visitFixedLenExpr(FixedLenExpr fixedLenExpr, OutputWidthVisitorState state) + throws RuntimeException { + return fixedLenExpr; + } + + /** + * Converts the {@link VarLenReadExpr} to a {@link FixedLenExpr} by getting + * the size for the corresponding column from the RecordBatchSizer. + */ + @Override + public OutputWidthExpression visitVarLenReadExpr(VarLenReadExpr varLenReadExpr, OutputWidthVisitorState state) + throws RuntimeException { + String columnName = varLenReadExpr.getInputColumnName(); + if (columnName == null) { + TypedFieldId fieldId = varLenReadExpr.getReadExpression().getTypedFieldId(); + columnName = TypedFieldId.getPath(fieldId, state.manager.incomingBatch()); } - - /** - * Converts a {@link FunctionCallExpr} to a {@link FixedLenExpr} by passing the the args of the function to the - * width calculator for this function. 
- * @param functionCallExpr - * @param state - * @return - * @throws RuntimeException - */ - @Override - public OutputWidthExpression visitFunctionCallExpr(FunctionCallExpr functionCallExpr, OutputWidthVisitorState state) - throws RuntimeException { - ArrayList<OutputWidthExpression> args = functionCallExpr.getArgs(); - ArrayList<FixedLenExpr> estimatedArgs = null; - - if (args != null && args.size() != 0) { - estimatedArgs = new ArrayList<>(args.size()); - for (OutputWidthExpression expr : args) { - // Once the args are visited, they will all become FixedWidthExpr - FixedLenExpr fixedLenExpr = (FixedLenExpr) expr.accept(this, state); - estimatedArgs.add(fixedLenExpr); - } - } - OutputWidthCalculator estimator = functionCallExpr.getCalculator(); - int estimatedSize = estimator.getOutputWidth(estimatedArgs); - return new FixedLenExpr(estimatedSize); + final RecordBatchSizer.ColumnSize columnSize = state.manager.getColumnSize(columnName); + + int columnWidth = columnSize.getDataSizePerEntry(); + return new FixedLenExpr(columnWidth); + } + + /** + * Converts a {@link FunctionCallExpr} to a {@link FixedLenExpr} by passing + * the the args of the function to the width calculator for this function. + */ + @Override + public OutputWidthExpression visitFunctionCallExpr(FunctionCallExpr functionCallExpr, OutputWidthVisitorState state) + throws RuntimeException { + ArrayList<OutputWidthExpression> args = functionCallExpr.getArgs(); + ArrayList<FixedLenExpr> estimatedArgs = null; + + if (args != null && args.size() != 0) { + estimatedArgs = new ArrayList<>(args.size()); + for (OutputWidthExpression expr : args) { + // Once the args are visited, they will all become FixedWidthExpr + FixedLenExpr fixedLenExpr = (FixedLenExpr) expr.accept(this, state); + estimatedArgs.add(fixedLenExpr); + } } - - /** - * Converts the {@link IfElseWidthExpr} to a {@link FixedLenExpr} by taking the max of the if-expr-width and the - * else-expr-width. 
- * @param ifElseWidthExpr - * @param state - * @return - * @throws RuntimeException - */ - @Override - public OutputWidthExpression visitIfElseWidthExpr(IfElseWidthExpr ifElseWidthExpr, OutputWidthVisitorState state) - throws RuntimeException { - OutputWidthExpression ifReducedExpr = ifElseWidthExpr.expressions[0].accept(this, state); - assert ifReducedExpr instanceof FixedLenExpr; - int ifWidth = ((FixedLenExpr)ifReducedExpr).getDataWidth(); - int elseWidth = -1; - if (ifElseWidthExpr.expressions[1] != null) { - OutputWidthExpression elseReducedExpr = ifElseWidthExpr.expressions[1].accept(this, state); - assert elseReducedExpr instanceof FixedLenExpr; - elseWidth = ((FixedLenExpr)elseReducedExpr).getDataWidth(); - } - int outputWidth = Math.max(ifWidth, elseWidth); - return new FixedLenExpr(outputWidth); + OutputWidthCalculator estimator = functionCallExpr.getCalculator(); + int estimatedSize = estimator.getOutputWidth(estimatedArgs); + return new FixedLenExpr(estimatedSize); + } + + /** + * Converts the {@link IfElseWidthExpr} to a {@link FixedLenExpr} by taking + * the max of the if-expr-width and the else-expr-width. + */ + @Override + public OutputWidthExpression visitIfElseWidthExpr(IfElseWidthExpr ifElseWidthExpr, OutputWidthVisitorState state) + throws RuntimeException { + OutputWidthExpression ifReducedExpr = ifElseWidthExpr.expressions[0].accept(this, state); + assert ifReducedExpr instanceof FixedLenExpr; + int ifWidth = ((FixedLenExpr)ifReducedExpr).getDataWidth(); + int elseWidth = -1; + if (ifElseWidthExpr.expressions[1] != null) { + OutputWidthExpression elseReducedExpr = ifElseWidthExpr.expressions[1].accept(this, state); + assert elseReducedExpr instanceof FixedLenExpr; + elseWidth = ((FixedLenExpr)elseReducedExpr).getDataWidth(); } - - private OutputWidthExpression getFixedLenExpr(MajorType majorType) { - MajorType type = majorType; - if (Types.isFixedWidthType(type)) { - // Use only the width of the data. 
Metadata width will be accounted for at the end - // This is to avoid using metadata size in intermediate calculations - int fixedDataWidth = ProjectMemoryManager.getDataWidthOfFixedWidthType(type); - return new OutputWidthExpression.FixedLenExpr(fixedDataWidth); - } - return null; + int outputWidth = Math.max(ifWidth, elseWidth); + return new FixedLenExpr(outputWidth); + } + + private OutputWidthExpression getFixedLenExpr(MajorType majorType) { + MajorType type = majorType; + if (Types.isFixedWidthType(type)) { + // Use only the width of the data. Metadata width will be accounted for at the end + // This is to avoid using metadata size in intermediate calculations + int fixedDataWidth = ProjectMemoryManager.getFixedWidth(type); + return new OutputWidthExpression.FixedLenExpr(fixedDataWidth); } -} \ No newline at end of file + return null; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java new file mode 100644 index 0000000..ce5cb72 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.project; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.expr.ValueVectorReadExpression; +import org.apache.drill.exec.expr.ValueVectorWriteExpression; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.FixedWidthVector; +import org.apache.drill.exec.vector.SchemaChangeCallBack; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; + +/** + * Implements callbacks to build the physical vectors for the project + * record batch. 
+ */ +public class ProjectBatchBuilder implements ProjectionMaterializer.BatchBuilder { + private final ProjectRecordBatch projectBatch; + private final VectorContainer container; + private final SchemaChangeCallBack callBack; + private final RecordBatch incomingBatch; + private final List<TransferPair> transfers = new ArrayList<>(); + + public ProjectBatchBuilder(ProjectRecordBatch projectBatch, VectorContainer container, + SchemaChangeCallBack callBack, RecordBatch incomingBatch) { + this.projectBatch = projectBatch; + this.container = container; + this.callBack = callBack; + this.incomingBatch = incomingBatch; + } + + public List<TransferPair> transfers() { return transfers; } + + @Override + public void addTransferField(String name, ValueVector vvIn) { + FieldReference ref = new FieldReference(name); + ValueVector vvOut = container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(), + vvIn.getField().getType()), callBack); + projectBatch.memoryManager.addTransferField(vvIn, vvIn.getField().getName(), vvOut.getField().getName()); + transfers.add(vvIn.makeTransferPair(vvOut)); + } + + @Override + public int addDirectTransfer(FieldReference ref, ValueVectorReadExpression vectorRead) { + TypedFieldId id = vectorRead.getFieldId(); + ValueVector vvIn = incomingBatch.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector(); + Preconditions.checkNotNull(incomingBatch); + + ValueVector vvOut = + container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(), + vectorRead.getMajorType()), callBack); + TransferPair tp = vvIn.makeTransferPair(vvOut); + projectBatch.memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id, incomingBatch), vvOut.getField().getName()); + transfers.add(tp); + return vectorRead.getFieldId().getFieldIds()[0]; + } + + @Override + public ValueVectorWriteExpression addOutputVector(String name, LogicalExpression expr) { + MaterializedField outputField = 
MaterializedField.create(name, expr.getMajorType()); + ValueVector vv = container.addOrGet(outputField, callBack); + projectBatch.allocationVectors.add(vv); + TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName())); + ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true); + projectBatch.memoryManager.addNewField(vv, write); + return write; + } + + @Override + public void addComplexField(FieldReference ref) { + initComplexWriters(); + if (projectBatch.complexFieldReferencesList == null) { + projectBatch.complexFieldReferencesList = Lists.newArrayList(); + } else { + projectBatch.complexFieldReferencesList.clear(); + } + + // save the field reference for later for getting schema when input is empty + projectBatch.complexFieldReferencesList.add(ref); + projectBatch.memoryManager.addComplexField(null); // this will just add an estimate to the row width + } + + private void initComplexWriters() { + // Lazy initialization of the list of complex writers, if not done yet. + if (projectBatch.complexWriters == null) { + projectBatch.complexWriters = new ArrayList<>(); + } else { + projectBatch.complexWriters.clear(); + } + } + + @Override + public ValueVectorWriteExpression addEvalVector(String outputName, LogicalExpression expr) { + MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType()); + ValueVector ouputVector = container.addOrGet(outputField, callBack); + projectBatch.allocationVectors.add(ouputVector); + TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName())); + boolean useSetSafe = !(ouputVector instanceof FixedWidthVector); + ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe); + projectBatch.memoryManager.addNewField(ouputVector, write); + + // We cannot do multiple transfers from the same vector. However we still + // need to instantiate the output vector. 
+ if (expr instanceof ValueVectorReadExpression) { + ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr; + if (!vectorRead.hasReadPath()) { + TypedFieldId id = vectorRead.getFieldId(); + ValueVector vvIn = incomingBatch.getValueAccessorById(id.getIntermediateClass(), + id.getFieldIds()).getValueVector(); + vvIn.makeTransferPair(ouputVector); + } + } + return write; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java index 92f2b3d..571871b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java @@ -18,10 +18,12 @@ package org.apache.drill.exec.physical.impl.project; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.MajorType; -import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.physical.impl.project.OutputWidthExpression.VarLenReadExpr; import org.apache.drill.exec.record.RecordBatch; @@ -42,307 +44,263 @@ import java.util.HashMap; import java.util.Map; /** - * - * ProjectMemoryManager(PMM) is used to estimate the size of rows produced by ProjectRecordBatch. - * The PMM works as follows: - * - * Setup phase: As and when ProjectRecordBatch creates or transfers a field, it registers the field with PMM. - * If the field is a variable width field, PMM records the expression that produces the variable - * width field. The expression is a tree of LogicalExpressions. 
The PMM walks this tree of LogicalExpressions - * to produce a tree of OutputWidthExpressions. The widths of Fixed width fields are just accumulated into a single - * total. Note: The PMM, currently, cannot handle new complex fields, it just uses a hard-coded estimate for such fields. - * - * - * Execution phase: Just before a batch is processed by Project, the PMM walks the tree of OutputWidthExpressions - * and converts them to FixedWidthExpressions. It uses the RecordBatchSizer and the function annotations to do this conversion. - * See OutputWidthVisitor for details. + * ProjectMemoryManager(PMM) is used to estimate the size of rows produced by + * ProjectRecordBatch. The PMM works as follows: + * <p> + * Setup phase: As and when ProjectRecordBatch creates or transfers a field, it + * registers the field with PMM. If the field is a variable-width field, PMM + * records the expression that produces the variable-width field. The expression + * is a tree of LogicalExpressions. The PMM walks this tree of + * LogicalExpressions to produce a tree of OutputWidthExpressions. The widths of + * fixed-width fields are just accumulated into a single total. Note: The PMM, + * currently, cannot handle new complex fields, it just uses a hard-coded + * estimate for such fields. + * <p> + * Execution phase: Just before a batch is processed by Project, the PMM walks + * the tree of OutputWidthExpressions and converts them to + * FixedWidthExpressions. It uses the RecordBatchSizer and the function + * annotations to do this conversion. See OutputWidthVisitor for details. 
*/ public class ProjectMemoryManager extends RecordBatchMemoryManager { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectMemoryManager.class); - - public RecordBatch getIncomingBatch() { - return incomingBatch; - } - - RecordBatch incomingBatch = null; - ProjectRecordBatch outgoingBatch = null; - - int rowWidth = 0; - Map<String, ColumnWidthInfo> outputColumnSizes; - // Number of variable width columns in the batch - int variableWidthColumnCount = 0; - // Number of fixed width columns in the batch - int fixedWidthColumnCount = 0; - // Number of complex columns in the batch - int complexColumnsCount = 0; - - - // Holds sum of all fixed width column widths - int totalFixedWidthColumnWidth = 0; - // Holds sum of all complex column widths - // Currently, this is just a guess - int totalComplexColumnWidth = 0; - - enum WidthType { - FIXED, - VARIABLE - } - - enum OutputColumnType { - TRANSFER, - NEW - } - - class ColumnWidthInfo { - OutputWidthExpression outputExpression; - int width; - WidthType widthType; - OutputColumnType outputColumnType; - ValueVector outputVV; // for transfers, this is the transfer src - - - ColumnWidthInfo(OutputWidthExpression outputWidthExpression, - OutputColumnType outputColumnType, - WidthType widthType, - int fieldWidth, ValueVector outputVV) { - this.outputExpression = outputWidthExpression; - this.width = fieldWidth; - this.outputColumnType = outputColumnType; - this.widthType = widthType; - this.outputVV = outputVV; - } - - public OutputWidthExpression getOutputExpression() { return outputExpression; } - - public OutputColumnType getOutputColumnType() { return outputColumnType; } - - boolean isFixedWidth() { return widthType == WidthType.FIXED; } - - public int getWidth() { return width; } - - } - - void ShouldNotReachHere() { - throw new IllegalStateException(); - } - - private void setIncomingBatch(RecordBatch recordBatch) { - incomingBatch = recordBatch; - } - - private void 
setOutgoingBatch(ProjectRecordBatch outgoingBatch) { - this.outgoingBatch = outgoingBatch; - } + private static final Logger logger = LoggerFactory.getLogger(ProjectMemoryManager.class); - public ProjectMemoryManager(int configuredOutputSize) { - super(configuredOutputSize); - outputColumnSizes = new HashMap<>(); - } + private enum OutputColumnType { + TRANSFER, + NEW + } - public boolean isComplex(MajorType majorType) { - MinorType minorType = majorType.getMinorType(); - return minorType == MinorType.MAP || minorType == MinorType.UNION || minorType == MinorType.LIST; - } + public static class VariableWidthColumnInfo { + private final OutputWidthExpression outputExpression; + private final ValueVector outputVV; // for transfers, this is the transfer src - boolean isFixedWidth(TypedFieldId fieldId) { - ValueVector vv = getOutgoingValueVector(fieldId); - return isFixedWidth(vv); - } - public ValueVector getOutgoingValueVector(TypedFieldId fieldId) { - Class<?> clazz = fieldId.getIntermediateClass(); - int[] fieldIds = fieldId.getFieldIds(); - return outgoingBatch.getValueAccessorById(clazz, fieldIds).getValueVector(); + VariableWidthColumnInfo(OutputWidthExpression outputWidthExpression, + ValueVector outputVV) { + this.outputExpression = outputWidthExpression; + this.outputVV = outputVV; } - static boolean isFixedWidth(ValueVector vv) { return (vv instanceof FixedWidthVector); } - - - static int getNetWidthOfFixedWidthType(ValueVector vv) { - assert isFixedWidth(vv); - return ((FixedWidthVector)vv).getValueWidth(); + public OutputWidthExpression getOutputExpression() { return outputExpression; } + } + + private RecordBatch incomingBatch; + private ProjectRecordBatch outgoingBatch; + + private int rowWidth; + private final Map<String, VariableWidthColumnInfo> varWidthColumnSizes; + // Number of variable width columns in the batch + private int variableWidthColumnCount; + // Number of fixed width columns in the batch + private int fixedWidthColumnCount; + // Number of 
complex columns in the batch + private int complexColumnsCount; + + // Holds sum of all fixed width column widths + private int totalFixedWidthColumnWidth; + // Holds sum of all complex column widths + // Currently, this is just a guess + private int totalComplexColumnWidth; + + public ProjectMemoryManager(int configuredOutputSize) { + super(configuredOutputSize); + varWidthColumnSizes = new HashMap<>(); + } + + private void reset() { + rowWidth = 0; + totalFixedWidthColumnWidth = 0; + totalComplexColumnWidth = 0; + + fixedWidthColumnCount = 0; + complexColumnsCount = 0; + } + + private void setIncomingBatch(RecordBatch recordBatch) { + incomingBatch = recordBatch; + } + + public RecordBatch incomingBatch() { return incomingBatch; } + + private void setOutgoingBatch(ProjectRecordBatch outgoingBatch) { + this.outgoingBatch = outgoingBatch; + } + + public boolean isComplex(MajorType majorType) { + return Types.isComplex(majorType) || Types.isUnion(majorType); + } + + public boolean isFixedWidth(TypedFieldId fieldId) { + ValueVector vv = getOutgoingValueVector(fieldId); + return isFixedWidth(vv); + } + + private ValueVector getOutgoingValueVector(TypedFieldId fieldId) { + Class<?> clazz = fieldId.getIntermediateClass(); + int[] fieldIds = fieldId.getFieldIds(); + return outgoingBatch.getValueAccessorById(clazz, fieldIds).getValueVector(); + } + + private static boolean isFixedWidth(ValueVector vv) { return (vv instanceof FixedWidthVector); } + + + public static int getFixedWidth(TypeProtos.MajorType majorType) { + Preconditions.checkArgument(!Types.isVarWidthType(majorType.getMinorType()), + "Expected fixed type but was '%s'.", majorType.getMinorType()); + return TypeHelper.getSize(majorType); + } + + public void addTransferField(ValueVector vvIn, String inputColumnName, String outputColumnName) { + addField(vvIn, null, OutputColumnType.TRANSFER, inputColumnName, outputColumnName); + } + + public void addNewField(ValueVector vvOut, LogicalExpression logicalExpression) 
{ + addField(vvOut, logicalExpression, OutputColumnType.NEW, null, vvOut.getField().getName()); + } + + private void addField(ValueVector vv, LogicalExpression logicalExpression, OutputColumnType outputColumnType, + String inputColumnName, String outputColumnName) { + if(isFixedWidth(vv)) { + addFixedWidthField(vv); + } else { + addVariableWidthField(vv, logicalExpression, outputColumnType, inputColumnName, outputColumnName); } - - public static int getDataWidthOfFixedWidthType(TypeProtos.MajorType majorType) { - MinorType minorType = majorType.getMinorType(); - final boolean isVariableWidth = (minorType == MinorType.VARCHAR || minorType == MinorType.VAR16CHAR - || minorType == MinorType.VARBINARY); - - if (isVariableWidth) { - throw new IllegalArgumentException("getWidthOfFixedWidthType() cannot handle variable width types"); - } - - if (minorType == MinorType.NULL) { - return 0; - } - - return TypeHelper.getSize(majorType); + } + + private void addVariableWidthField(ValueVector vv, LogicalExpression logicalExpression, + OutputColumnType outputColumnType, String inputColumnName, + String outputColumnName) { + variableWidthColumnCount++; + logger.trace("addVariableWidthField(): vv {} totalCount: {} outputColumnType: {}", + vvAsString(vv), variableWidthColumnCount, outputColumnType); + OutputWidthExpression outWidthExpr; + if (outputColumnType == OutputColumnType.TRANSFER) { + // Variable width transfers + // fieldWidth has to be obtained from the RecordBatchSizer + outWidthExpr = new VarLenReadExpr(inputColumnName); + } else { + // fieldWidth has to be obtained from the OutputWidthExpression + // Walk the tree of LogicalExpressions to get a tree of OutputWidthExpressions + outWidthExpr = logicalExpression.accept(new OutputWidthVisitor(), + new OutputWidthVisitorState(this)); } - - - void addTransferField(ValueVector vvIn, String inputColumnName, String outputColumnName) { - addField(vvIn, null, OutputColumnType.TRANSFER, inputColumnName, outputColumnName); + 
VariableWidthColumnInfo columnWidthInfo = new VariableWidthColumnInfo(outWidthExpr, vv); + VariableWidthColumnInfo existingInfo = varWidthColumnSizes.put(outputColumnName, columnWidthInfo); + Preconditions.checkState(existingInfo == null); + } + + private static String vvAsString(ValueVector vv) { + return vv == null ? "null" : + String.format("%s %s", + vv.getField().getName(), + vv.getField().getType()); + } + + public void addComplexField(ValueVector vv) { + //Complex types are not yet supported. Just use a guess for the size + assert vv == null || isComplex(vv.getField().getType()); + complexColumnsCount++; + // just a guess + totalComplexColumnWidth += OutputSizeEstimateConstants.COMPLEX_FIELD_ESTIMATE; + if (logger.isTraceEnabled()) { + logger.trace("addComplexField(): vv {} totalCount: {} totalComplexColumnWidth: {}", + vvAsString(vv), complexColumnsCount, totalComplexColumnWidth); } - - void addNewField(ValueVector vvOut, LogicalExpression logicalExpression) { - addField(vvOut, logicalExpression, OutputColumnType.NEW, null, vvOut.getField().getName()); + } + + private void addFixedWidthField(ValueVector vv) { + assert isFixedWidth(vv); + fixedWidthColumnCount++; + int fixedFieldWidth = ((FixedWidthVector) vv).getValueWidth(); + totalFixedWidthColumnWidth += fixedFieldWidth; + if (logger.isTraceEnabled()) { + logger.trace("addFixedWidthField(): vv {} totalCount: {} totalComplexColumnWidth: {}", + vvAsString(vv), fixedWidthColumnCount, totalFixedWidthColumnWidth); } - - void addField(ValueVector vv, LogicalExpression logicalExpression, OutputColumnType outputColumnType, - String inputColumnName, String outputColumnName) { - if(isFixedWidth(vv)) { - addFixedWidthField(vv); - } else { - addVariableWidthField(vv, logicalExpression, outputColumnType, inputColumnName, outputColumnName); - } - } - - private void addVariableWidthField(ValueVector vv, LogicalExpression logicalExpression, - OutputColumnType outputColumnType, String inputColumnName, String 
outputColumnName) { - variableWidthColumnCount++; - ColumnWidthInfo columnWidthInfo; - logger.trace("addVariableWidthField(): vv {} totalCount: {} outputColumnType: {}", - printVV(vv), variableWidthColumnCount, outputColumnType); - //Variable width transfers - if(outputColumnType == OutputColumnType.TRANSFER) { - VarLenReadExpr readExpr = new VarLenReadExpr(inputColumnName); - columnWidthInfo = new ColumnWidthInfo(readExpr, outputColumnType, - WidthType.VARIABLE, -1, vv); //fieldWidth has to be obtained from the RecordBatchSizer - } else if (isComplex(vv.getField().getType())) { - addComplexField(vv); - return; - } else { - // Walk the tree of LogicalExpressions to get a tree of OutputWidthExpressions - OutputWidthVisitorState state = new OutputWidthVisitorState(this); - OutputWidthExpression outputWidthExpression = logicalExpression.accept(new OutputWidthVisitor(), state); - columnWidthInfo = new ColumnWidthInfo(outputWidthExpression, outputColumnType, - WidthType.VARIABLE, -1, vv); //fieldWidth has to be obtained from the OutputWidthExpression - } - ColumnWidthInfo existingInfo = outputColumnSizes.put(outputColumnName, columnWidthInfo); - Preconditions.checkState(existingInfo == null); + } + + public void init(RecordBatch incomingBatch, ProjectRecordBatch outgoingBatch) { + setIncomingBatch(incomingBatch); + setOutgoingBatch(outgoingBatch); + reset(); + + RecordBatchStats.printConfiguredBatchSize(outgoingBatch.getRecordBatchStatsContext(), + getOutputBatchSize()); + } + + @Override + public void update() { + long updateStartTime = System.currentTimeMillis(); + RecordBatchSizer batchSizer = new RecordBatchSizer(incomingBatch); + long batchSizerEndTime = System.currentTimeMillis(); + + setRecordBatchSizer(batchSizer); + rowWidth = 0; + int totalVariableColumnWidth = 0; + for (String outputColumnName : varWidthColumnSizes.keySet()) { + VariableWidthColumnInfo columnWidthInfo = varWidthColumnSizes.get(outputColumnName); + int width = -1; + //Walk the tree of 
OutputWidthExpressions to get a FixedLenExpr + //As the tree is walked, the RecordBatchSizer and function annotations + //are looked-up to come up with the final FixedLenExpr + OutputWidthExpression savedWidthExpr = columnWidthInfo.getOutputExpression(); + OutputWidthVisitorState state = new OutputWidthVisitorState(this); + OutputWidthExpression reducedExpr = savedWidthExpr.accept(new OutputWidthVisitor(), state); + width = ((FixedLenExpr)reducedExpr).getDataWidth(); + Preconditions.checkState(width >= 0); + int metadataWidth = getMetadataWidth(columnWidthInfo.outputVV); + logger.trace("update(): fieldName {} width: {} metadataWidth: {}", + columnWidthInfo.outputVV.getField().getName(), width, metadataWidth); + width += metadataWidth; + totalVariableColumnWidth += width; } - - public static String printVV(ValueVector vv) { - String str = "null"; - if (vv != null) { - str = vv.getField().getName() + " " + vv.getField().getType(); - } - return str; + rowWidth += totalFixedWidthColumnWidth; + rowWidth += totalComplexColumnWidth; + rowWidth += totalVariableColumnWidth; + int outPutRowCount; + if (rowWidth != 0) { + // if rowWidth is not zero, set the output row count in the sizer + setOutputRowCount(getOutputBatchSize(), rowWidth); + // if more rows can be allowed than the incoming row count, then set the + // output row count to the incoming row count. + outPutRowCount = Math.min(getOutputRowCount(), batchSizer.rowCount()); + } else { + // if rowWidth == 0 then the memory manager does + // not have sufficient information to size the batch + // let the entire batch pass through. + // If incoming rc == 0, all RB Sizer look-ups will have + // 0 width and so total width can be 0 + outPutRowCount = incomingBatch.getRecordCount(); } - - void addComplexField(ValueVector vv) { - //Complex types are not yet supported. 
Just use a guess for the size - assert vv == null || isComplex(vv.getField().getType()); - complexColumnsCount++; - // just a guess - totalComplexColumnWidth += OutputSizeEstimateConstants.COMPLEX_FIELD_ESTIMATE; - logger.trace("addComplexField(): vv {} totalCount: {} totalComplexColumnWidth: {}", - printVV(vv), complexColumnsCount, totalComplexColumnWidth); - } - - void addFixedWidthField(ValueVector vv) { - assert isFixedWidth(vv); - fixedWidthColumnCount++; - int fixedFieldWidth = getNetWidthOfFixedWidthType(vv); - totalFixedWidthColumnWidth += fixedFieldWidth; - logger.trace("addFixedWidthField(): vv {} totalCount: {} totalComplexColumnWidth: {}", - printVV(vv), fixedWidthColumnCount, totalFixedWidthColumnWidth); - } - - public void init(RecordBatch incomingBatch, ProjectRecordBatch outgoingBatch) { - setIncomingBatch(incomingBatch); - setOutgoingBatch(outgoingBatch); - reset(); - - RecordBatchStats.printConfiguredBatchSize(outgoingBatch.getRecordBatchStatsContext(), - getOutputBatchSize()); - } - - private void reset() { - rowWidth = 0; - totalFixedWidthColumnWidth = 0; - totalComplexColumnWidth = 0; - - fixedWidthColumnCount = 0; - complexColumnsCount = 0; + setOutputRowCount(outPutRowCount); + long updateEndTime = System.currentTimeMillis(); + logger.trace("update() : Output RC {}, BatchSizer RC {}, incoming RC {}, width {}, total fixed width {}" + + ", total variable width {}, total complex width {}, batchSizer time {} ms, update time {} ms" + + ", manager {}, incoming {}",outPutRowCount, batchSizer.rowCount(), incomingBatch.getRecordCount(), + rowWidth, totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth, + (batchSizerEndTime - updateStartTime),(updateEndTime - updateStartTime), this, incomingBatch); + + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), outgoingBatch.getRecordBatchStatsContext()); + updateIncomingStats(); + } + + // TODO: Move this logic to TypesHelper or the ValueVector so it + 
// is more generic and reusable. + public static int getMetadataWidth(ValueVector vv) { + int width = 0; + if (vv instanceof NullableVector) { + width += ((NullableVector) vv).getBitsVector().getPayloadByteCount(1); } - @Override - public void update() { - long updateStartTime = System.currentTimeMillis(); - RecordBatchSizer batchSizer = new RecordBatchSizer(incomingBatch); - long batchSizerEndTime = System.currentTimeMillis(); - - setRecordBatchSizer(batchSizer); - rowWidth = 0; - int totalVariableColumnWidth = 0; - for (String outputColumnName : outputColumnSizes.keySet()) { - ColumnWidthInfo columnWidthInfo = outputColumnSizes.get(outputColumnName); - int width = -1; - if (columnWidthInfo.isFixedWidth()) { - // fixed width columns are accumulated in totalFixedWidthColumnWidth - ShouldNotReachHere(); - } else { - //Walk the tree of OutputWidthExpressions to get a FixedLenExpr - //As the tree is walked, the RecordBatchSizer and function annotations - //are looked-up to come up with the final FixedLenExpr - OutputWidthExpression savedWidthExpr = columnWidthInfo.getOutputExpression(); - OutputWidthVisitorState state = new OutputWidthVisitorState(this); - OutputWidthExpression reducedExpr = savedWidthExpr.accept(new OutputWidthVisitor(), state); - width = ((FixedLenExpr)reducedExpr).getDataWidth(); - Preconditions.checkState(width >= 0); - int metadataWidth = getMetadataWidth(columnWidthInfo.outputVV); - logger.trace("update(): fieldName {} width: {} metadataWidth: {}", - columnWidthInfo.outputVV.getField().getName(), width, metadataWidth); - width += metadataWidth; - } - totalVariableColumnWidth += width; - } - rowWidth += totalFixedWidthColumnWidth; - rowWidth += totalComplexColumnWidth; - rowWidth += totalVariableColumnWidth; - int outPutRowCount; - if (rowWidth != 0) { - //if rowWidth is not zero, set the output row count in the sizer - setOutputRowCount(getOutputBatchSize(), rowWidth); - // if more rows can be allowed than the incoming row count, then set the - 
// output row count to the incoming row count. - outPutRowCount = Math.min(getOutputRowCount(), batchSizer.rowCount()); - } else { - // if rowWidth == 0 then the memory manager does - // not have sufficient information to size the batch - // let the entire batch pass through. - // If incoming rc == 0, all RB Sizer look-ups will have - // 0 width and so total width can be 0 - outPutRowCount = incomingBatch.getRecordCount(); - } - setOutputRowCount(outPutRowCount); - long updateEndTime = System.currentTimeMillis(); - logger.trace("update() : Output RC {}, BatchSizer RC {}, incoming RC {}, width {}, total fixed width {}" - + ", total variable width {}, total complex width {}, batchSizer time {} ms, update time {} ms" - + ", manager {}, incoming {}",outPutRowCount, batchSizer.rowCount(), incomingBatch.getRecordCount(), - rowWidth, totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth, - (batchSizerEndTime - updateStartTime),(updateEndTime - updateStartTime), this, incomingBatch); - - RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), outgoingBatch.getRecordBatchStatsContext()); - updateIncomingStats(); + if (vv instanceof VariableWidthVector) { + width += ((VariableWidthVector) vv).getOffsetVector().getPayloadByteCount(1); } - public static int getMetadataWidth(ValueVector vv) { - int width = 0; - if (vv instanceof NullableVector) { - width += ((NullableVector)vv).getBitsVector().getPayloadByteCount(1); - } - - if (vv instanceof VariableWidthVector) { - width += ((VariableWidthVector)vv).getOffsetVector().getPayloadByteCount(1); - } - - if (vv instanceof BaseRepeatedValueVector) { - width += ((BaseRepeatedValueVector)vv).getOffsetVector().getPayloadByteCount(1); - width += (getMetadataWidth(((BaseRepeatedValueVector)vv).getDataVector()) * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD); - } - return width; + if (vv instanceof BaseRepeatedValueVector) { + width += ((BaseRepeatedValueVector) 
vv).getOffsetVector().getPayloadByteCount(1); + width += (getMetadataWidth(((BaseRepeatedValueVector) vv).getDataVector()) * + RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD); } + return width; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 9857102..c021023 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -20,103 +20,47 @@ package org.apache.drill.exec.physical.impl.project; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; import java.io.IOException; -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; -import org.apache.commons.collections.map.CaseInsensitiveMap; -import org.apache.drill.common.expression.ConvertExpression; -import org.apache.drill.common.expression.ErrorCollector; -import org.apache.drill.common.expression.ErrorCollectorImpl; -import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FieldReference; -import org.apache.drill.common.expression.FunctionCall; -import org.apache.drill.common.expression.FunctionCallFactory; -import org.apache.drill.common.expression.LogicalExpression; -import org.apache.drill.common.expression.PathSegment.NameSegment; -import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.expression.ValueExpressions; -import org.apache.drill.common.expression.fn.FunctionReplacementUtils; -import org.apache.drill.common.logical.data.NamedExpression; -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.common.types.Types; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.ClassTransformationException; -import 
org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.expr.ClassGenerator; -import org.apache.drill.exec.expr.CodeGenerator; -import org.apache.drill.exec.expr.DrillFuncHolderExpr; -import org.apache.drill.exec.expr.ExpressionTreeMaterializer; -import org.apache.drill.exec.expr.ValueVectorReadExpression; -import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Project; -import org.apache.drill.exec.planner.StarColumnHelper; import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.SimpleRecordBatch; -import org.apache.drill.exec.record.TransferPair; -import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.store.ColumnExplorer; import org.apache.drill.exec.util.record.RecordBatchStats; import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; import org.apache.drill.exec.vector.AllocationHelper; -import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.UntypedNullHolder; import org.apache.drill.exec.vector.UntypedNullVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; -import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; -import org.apache.drill.shaded.guava.com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.carrotsearch.hppc.IntHashSet; - public class 
ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { private static final Logger logger = LoggerFactory.getLogger(ProjectRecordBatch.class); - private static final String EMPTY_STRING = ""; - + protected List<ValueVector> allocationVectors; + protected List<ComplexWriter> complexWriters; + protected List<FieldReference> complexFieldReferencesList; + protected ProjectMemoryManager memoryManager; private Projector projector; - private List<ValueVector> allocationVectors; - private List<ComplexWriter> complexWriters; - private List<FieldReference> complexFieldReferencesList; private boolean hasRemainder; private int remainderIndex; private int recordCount; - private ProjectMemoryManager memoryManager; private boolean first = true; private boolean wasNone; // whether a NONE iter outcome was already seen - private final ColumnExplorer columnExplorer; - - private class ClassifierResult { - public boolean isStar; - public List<String> outputNames; - public String prefix = ""; - public HashMap<String, Integer> prefixMap = Maps.newHashMap(); - public CaseInsensitiveMap outputMap = new CaseInsensitiveMap(); - private final CaseInsensitiveMap sequenceMap = new CaseInsensitiveMap(); - - private void clear() { - isStar = false; - prefix = ""; - if (outputNames != null) { - outputNames.clear(); - } - - // note: don't clear the internal maps since they have cumulative data.. 
- } - } - public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException { + public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext context) { super(pop, context, incoming); - columnExplorer = new ColumnExplorer(context.getOptions()); } @Override @@ -249,7 +193,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { private void handleRemainder() { int remainingRecordCount = incoming.getRecordCount() - remainderIndex; - assert this.memoryManager.incomingBatch == incoming; + assert memoryManager.incomingBatch() == incoming; int recordsToProcess = Math.min(remainingRecordCount, memoryManager.getOutputRowCount()); doAlloc(recordsToProcess); @@ -283,11 +227,13 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext()); } + // Called from generated code. + public void addComplexWriter(ComplexWriter writer) { complexWriters.add(writer); } - private void doAlloc(int recordCount) { + public void doAlloc(int recordCount) { // Allocate vv in the allocationVectors. for (ValueVector v : allocationVectors) { AllocationHelper.allocateNew(v, recordCount); @@ -301,7 +247,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { } } - private void setValueCount(int count) { + public void setValueCount(int count) { if (count == 0) { container.setEmpty(); return; @@ -323,521 +269,36 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { } } - // hack to make ref and full work together... need to figure out if this is - // still necessary. 
- private FieldReference getRef(NamedExpression e) { - return e.getRef(); - } - - private boolean isAnyWildcard(List<NamedExpression> exprs) { - for (NamedExpression e : exprs) { - if (isWildcard(e)) { - return true; - } - } - return false; - } - - private boolean isWildcard(NamedExpression ex) { - if (!(ex.getExpr() instanceof SchemaPath)) { + @Override + protected boolean setupNewSchema() throws SchemaChangeException { + setupNewSchemaFromInput(incoming); + if (container.isSchemaChanged() || callBack.getSchemaChangedAndReset()) { + container.buildSchema(SelectionVectorMode.NONE); + return true; + } else { return false; } - NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment(); - return expr.getPath().contains(SchemaPath.DYNAMIC_STAR); } private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaChangeException { - long setupNewSchemaStartTime = System.currentTimeMillis(); // get the output batch size from config. int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); - memoryManager = new ProjectMemoryManager(configuredBatchSize); - memoryManager.init(incomingBatch, this); - if (allocationVectors != null) { - for (ValueVector v : allocationVectors) { - v.clear(); - } - } - allocationVectors = Lists.newArrayList(); - - if (complexWriters != null) { - container.clear(); - } else { - // Release the underlying DrillBufs and reset the ValueVectors to empty - // Not clearing the container here is fine since Project output schema is - // not determined solely based on incoming batch. It is defined by the - // expressions it has to evaluate. - // - // If there is a case where only the type of ValueVector already present - // in container is changed then addOrGet method takes care of it by - // replacing the vectors. 
- container.zeroVectors(); - } - - List<NamedExpression> exprs = getExpressionList(); - ErrorCollector collector = new ErrorCollectorImpl(); - List<TransferPair> transfers = Lists.newArrayList(); - - ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getOptions()); - cg.getCodeGenerator().plainJavaCapable(true); - - IntHashSet transferFieldIds = new IntHashSet(); - - boolean isAnyWildcard = isAnyWildcard(exprs); - - ClassifierResult result = new ClassifierResult(); - boolean classify = isClassificationNeeded(exprs); - - for (NamedExpression namedExpression : exprs) { - result.clear(); - if (classify && namedExpression.getExpr() instanceof SchemaPath) { - classifyExpr(namedExpression, incomingBatch, result); - - if (result.isStar) { - // The value indicates which wildcard we are processing now - Integer value = result.prefixMap.get(result.prefix); - if (value != null && value == 1) { - int k = 0; - for (VectorWrapper<?> wrapper : incomingBatch) { - ValueVector vvIn = wrapper.getValueVector(); - if (k > result.outputNames.size() - 1) { - assert false; - } - String name = result.outputNames.get(k++); // get the renamed column names - if (name.isEmpty()) { - continue; - } - - if (isImplicitFileColumn(vvIn)) { - continue; - } - - FieldReference ref = new FieldReference(name); - ValueVector vvOut = container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(), - vvIn.getField().getType()), callBack); - TransferPair tp = vvIn.makeTransferPair(vvOut); - memoryManager.addTransferField(vvIn, vvIn.getField().getName(), vvOut.getField().getName()); - transfers.add(tp); - } - } else if (value != null && value > 1) { // subsequent wildcards should do a copy of incoming value vectors - int k = 0; - for (VectorWrapper<?> wrapper : incomingBatch) { - ValueVector vvIn = wrapper.getValueVector(); - SchemaPath originalPath = SchemaPath.getSimplePath(vvIn.getField().getName()); - if (k > result.outputNames.size() - 1) { - assert 
false; - } - String name = result.outputNames.get(k++); // get the renamed column names - if (name.isEmpty()) { - continue; - } - - if (isImplicitFileColumn(vvIn)) { - continue; - } - - LogicalExpression expr = ExpressionTreeMaterializer.materialize(originalPath, incomingBatch, collector, context.getFunctionRegistry() ); - if (collector.hasErrors()) { - throw new SchemaChangeException(String.format("Failure while trying to materialize incomingBatch schema. Errors:\n %s.", collector.toErrorString())); - } - - MaterializedField outputField = MaterializedField.create(name, expr.getMajorType()); - ValueVector vv = container.addOrGet(outputField, callBack); - allocationVectors.add(vv); - TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName())); - ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true); - memoryManager.addNewField(vv, write); - cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND); - } - } - continue; - } - } else { - // For the columns which do not needed to be classified, - // it is still necessary to ensure the output column name is unique - result.outputNames = Lists.newArrayList(); - String outputName = getRef(namedExpression).getRootSegment().getPath(); //moved to before the if - addToResultMaps(outputName, result, true); - } - String outputName = getRef(namedExpression).getRootSegment().getPath(); - if (result != null && result.outputNames != null && result.outputNames.size() > 0) { - boolean isMatched = false; - for (int j = 0; j < result.outputNames.size(); j++) { - if (!result.outputNames.get(j).isEmpty()) { - outputName = result.outputNames.get(j); - isMatched = true; - break; - } - } - - if (!isMatched) { - continue; - } - } - - LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incomingBatch, - collector, context.getFunctionRegistry(), true, unionTypeEnabled); - MaterializedField outputField = MaterializedField.create(outputName, 
expr.getMajorType()); - if (collector.hasErrors()) { - throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); - } - - // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack. - if (expr instanceof ValueVectorReadExpression && incomingBatch.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE - && !((ValueVectorReadExpression) expr).hasReadPath() - && !isAnyWildcard - && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])) { - - ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr; - TypedFieldId id = vectorRead.getFieldId(); - ValueVector vvIn = incomingBatch.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector(); - Preconditions.checkNotNull(incomingBatch); - - FieldReference ref = getRef(namedExpression); - ValueVector vvOut = - container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(), - vectorRead.getMajorType()), callBack); - TransferPair tp = vvIn.makeTransferPair(vvOut); - memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id, incomingBatch), vvOut.getField().getName()); - transfers.add(tp); - transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]); - } else if (expr instanceof DrillFuncHolderExpr && - ((DrillFuncHolderExpr) expr).getHolder().isComplexWriterFuncHolder()) { - // Need to process ComplexWriter function evaluation. - // Lazy initialization of the list of complex writers, if not done yet. - if (complexWriters == null) { - complexWriters = Lists.newArrayList(); - } else { - complexWriters.clear(); - } - - // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer. 
- ((DrillFuncHolderExpr) expr).setFieldReference(namedExpression.getRef()); - cg.addExpr(expr, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND); - if (complexFieldReferencesList == null) { - complexFieldReferencesList = Lists.newArrayList(); - } else { - complexFieldReferencesList.clear(); - } - - // save the field reference for later for getting schema when input is empty - complexFieldReferencesList.add(namedExpression.getRef()); - memoryManager.addComplexField(null); // this will just add an estimate to the row width - } else { - // need to do evaluation. - ValueVector ouputVector = container.addOrGet(outputField, callBack); - allocationVectors.add(ouputVector); - TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName())); - boolean useSetSafe = !(ouputVector instanceof FixedWidthVector); - ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe); - cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND); - memoryManager.addNewField(ouputVector, write); - - // We cannot do multiple transfers from the same vector. However we still need to instantiate the output vector. 
- if (expr instanceof ValueVectorReadExpression) { - ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr; - if (!vectorRead.hasReadPath()) { - TypedFieldId id = vectorRead.getFieldId(); - ValueVector vvIn = incomingBatch.getValueAccessorById(id.getIntermediateClass(), - id.getFieldIds()).getValueVector(); - vvIn.makeTransferPair(ouputVector); - } - } - } - } + setupNewSchema(incomingBatch, configuredBatchSize); try { - CodeGenerator<Projector> codeGen = cg.getCodeGenerator(); - codeGen.plainJavaCapable(true); + ProjectBatchBuilder batchBuilder = new ProjectBatchBuilder(this, + container, callBack, incomingBatch); + ProjectionMaterializer em = new ProjectionMaterializer(context.getOptions(), + incomingBatch, popConfig.getExprs(), context.getFunctionRegistry(), + batchBuilder, unionTypeEnabled); + boolean saveCode = false; // Uncomment this line to debug the generated code. - // codeGen.saveCodeForDebugging(true); - this.projector = context.getImplementationClass(codeGen); - projector.setup(context, incomingBatch, this, transfers); + // saveCode = true; + projector = em.generateProjector(context, saveCode); + projector.setup(context, incomingBatch, this, batchBuilder.transfers()); } catch (ClassTransformationException | IOException e) { throw new SchemaChangeException("Failure while attempting to load generated class", e); } - - long setupNewSchemaEndTime = System.currentTimeMillis(); - logger.trace("setupNewSchemaFromInput: time {} ms, Project {}, incoming {}", - (setupNewSchemaEndTime - setupNewSchemaStartTime), this, incomingBatch); - } - - @Override - protected boolean setupNewSchema() throws SchemaChangeException { - setupNewSchemaFromInput(this.incoming); - if (container.isSchemaChanged() || callBack.getSchemaChangedAndReset()) { - container.buildSchema(SelectionVectorMode.NONE); - return true; - } else { - return false; - } - } - - private boolean isImplicitFileColumn(ValueVector vvIn) { - return 
columnExplorer.isImplicitOrInternalFileColumn(vvIn.getField().getName()); - } - - private List<NamedExpression> getExpressionList() { - if (popConfig.getExprs() != null) { - return popConfig.getExprs(); - } - - List<NamedExpression> exprs = Lists.newArrayList(); - for (MaterializedField field : incoming.getSchema()) { - String fieldName = field.getName(); - if (Types.isComplex(field.getType()) || Types.isRepeated(field.getType())) { - LogicalExpression convertToJson = FunctionCallFactory.createConvert(ConvertExpression.CONVERT_TO, "JSON", - SchemaPath.getSimplePath(fieldName), ExpressionPosition.UNKNOWN); - String castFuncName = FunctionReplacementUtils.getCastFunc(MinorType.VARCHAR); - List<LogicalExpression> castArgs = Lists.newArrayList(); - castArgs.add(convertToJson); //input_expr - // implicitly casting to varchar, since we don't know actual source length, cast to undefined length, which will preserve source length - castArgs.add(new ValueExpressions.LongExpression(Types.MAX_VARCHAR_LENGTH, null)); - FunctionCall castCall = new FunctionCall(castFuncName, castArgs, ExpressionPosition.UNKNOWN); - exprs.add(new NamedExpression(castCall, new FieldReference(fieldName))); - } else { - exprs.add(new NamedExpression(SchemaPath.getSimplePath(fieldName), new FieldReference(fieldName))); - } - } - return exprs; - } - - private boolean isClassificationNeeded(List<NamedExpression> exprs) { - boolean needed = false; - for (NamedExpression ex : exprs) { - if (!(ex.getExpr() instanceof SchemaPath)) { - continue; - } - NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment(); - NameSegment ref = ex.getRef().getRootSegment(); - boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); - boolean exprContainsStar = expr.getPath().contains(SchemaPath.DYNAMIC_STAR); - - if (refHasPrefix || exprContainsStar) { - needed = true; - break; - } - } - return needed; - } - - private String getUniqueName(String name, ClassifierResult result) { - Integer 
currentSeq = (Integer) result.sequenceMap.get(name); - if (currentSeq == null) { // name is unique, so return the original name - result.sequenceMap.put(name, -1); - return name; - } - // create a new name - int newSeq = currentSeq + 1; - String newName = name + newSeq; - result.sequenceMap.put(name, newSeq); - result.sequenceMap.put(newName, -1); - - return newName; - } - - /** - * Helper method to ensure unique output column names. If allowDupsWithRename - * is set to true, the original name will be appended with a suffix number to - * ensure uniqueness. Otherwise, the original column would not be renamed even - * even if it has been used - * - * @param origName - * the original input name of the column - * @param result - * the data structure to keep track of the used names and decide what - * output name should be to ensure uniqueness - * @param allowDupsWithRename - * if the original name has been used, is renaming allowed to ensure - * output name unique - */ - private void addToResultMaps(String origName, ClassifierResult result, boolean allowDupsWithRename) { - String name = origName; - if (allowDupsWithRename) { - name = getUniqueName(origName, result); - } - if (!result.outputMap.containsKey(name)) { - result.outputNames.add(name); - result.outputMap.put(name, name); - } else { - result.outputNames.add(EMPTY_STRING); - } - } - - private void classifyExpr(NamedExpression ex, RecordBatch incoming, ClassifierResult result) { - NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment(); - NameSegment ref = ex.getRef().getRootSegment(); - boolean exprHasPrefix = expr.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); - boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); - boolean exprIsStar = expr.getPath().equals(SchemaPath.DYNAMIC_STAR); - boolean refContainsStar = ref.getPath().contains(SchemaPath.DYNAMIC_STAR); - boolean exprContainsStar = expr.getPath().contains(SchemaPath.DYNAMIC_STAR); - boolean refEndsWithStar = 
ref.getPath().endsWith(SchemaPath.DYNAMIC_STAR); - - String exprPrefix = EMPTY_STRING; - String exprSuffix = expr.getPath(); - - if (exprHasPrefix) { - // get the prefix of the expr - String[] exprComponents = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2); - assert(exprComponents.length == 2); - exprPrefix = exprComponents[0]; - exprSuffix = exprComponents[1]; - result.prefix = exprPrefix; - } - - boolean exprIsFirstWildcard = false; - if (exprContainsStar) { - result.isStar = true; - Integer value = result.prefixMap.get(exprPrefix); - if (value == null) { - result.prefixMap.put(exprPrefix, 1); - exprIsFirstWildcard = true; - } else { - result.prefixMap.put(exprPrefix, value + 1); - } - } - - int incomingSchemaSize = incoming.getSchema().getFieldCount(); - - // input is '*' and output is 'prefix_*' - if (exprIsStar && refHasPrefix && refEndsWithStar) { - String[] components = ref.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2); - assert(components.length == 2); - String prefix = components[0]; - result.outputNames = Lists.newArrayList(); - for (VectorWrapper<?> wrapper : incoming) { - ValueVector vvIn = wrapper.getValueVector(); - String name = vvIn.getField().getName(); - - // add the prefix to the incoming column name - String newName = prefix + StarColumnHelper.PREFIX_DELIMITER + name; - addToResultMaps(newName, result, false); - } - } - // input and output are the same - else if (expr.getPath().equalsIgnoreCase(ref.getPath()) && (!exprContainsStar || exprIsFirstWildcard)) { - if (exprContainsStar && exprHasPrefix) { - assert exprPrefix != null; - - int k = 0; - result.outputNames = Lists.newArrayListWithCapacity(incomingSchemaSize); - for (int j=0; j < incomingSchemaSize; j++) { - result.outputNames.add(EMPTY_STRING); // initialize - } - - for (VectorWrapper<?> wrapper : incoming) { - ValueVector vvIn = wrapper.getValueVector(); - String incomingName = vvIn.getField().getName(); - // get the prefix of the name - String[] nameComponents = 
incomingName.split(StarColumnHelper.PREFIX_DELIMITER, 2); - // if incoming value vector does not have a prefix, ignore it since - // this expression is not referencing it - if (nameComponents.length <= 1) { - k++; - continue; - } - String namePrefix = nameComponents[0]; - if (exprPrefix.equalsIgnoreCase(namePrefix)) { - if (!result.outputMap.containsKey(incomingName)) { - result.outputNames.set(k, incomingName); - result.outputMap.put(incomingName, incomingName); - } - } - k++; - } - } else { - result.outputNames = Lists.newArrayList(); - if (exprContainsStar) { - for (VectorWrapper<?> wrapper : incoming) { - ValueVector vvIn = wrapper.getValueVector(); - String incomingName = vvIn.getField().getName(); - if (refContainsStar) { - addToResultMaps(incomingName, result, true); // allow dups since this is likely top-level project - } else { - addToResultMaps(incomingName, result, false); - } - } - } else { - String newName = expr.getPath(); - if (!refHasPrefix && !exprHasPrefix) { - addToResultMaps(newName, result, true); // allow dups since this is likely top-level project - } else { - addToResultMaps(newName, result, false); - } - } - } - } - - // input is wildcard and it is not the first wildcard - else if (exprIsStar) { - result.outputNames = Lists.newArrayList(); - for (VectorWrapper<?> wrapper : incoming) { - ValueVector vvIn = wrapper.getValueVector(); - String incomingName = vvIn.getField().getName(); - addToResultMaps(incomingName, result, true); // allow dups since this is likely top-level project - } - } - - // only the output has prefix - else if (!exprHasPrefix && refHasPrefix) { - result.outputNames = Lists.newArrayList(); - String newName = ref.getPath(); - addToResultMaps(newName, result, false); - } - // input has prefix but output does not - else if (exprHasPrefix && !refHasPrefix) { - int k = 0; - result.outputNames = Lists.newArrayListWithCapacity(incomingSchemaSize); - for (int j=0; j < incomingSchemaSize; j++) { - 
result.outputNames.add(EMPTY_STRING); // initialize - } - - for (VectorWrapper<?> wrapper : incoming) { - ValueVector vvIn = wrapper.getValueVector(); - String name = vvIn.getField().getName(); - String[] components = name.split(StarColumnHelper.PREFIX_DELIMITER, 2); - if (components.length <= 1) { - k++; - continue; - } - String namePrefix = components[0]; - String nameSuffix = components[1]; - if (exprPrefix.equalsIgnoreCase(namePrefix)) { // // case insensitive matching of prefix. - if (refContainsStar) { - // remove the prefix from the incoming column names - String newName = getUniqueName(nameSuffix, result); // for top level we need to make names unique - result.outputNames.set(k, newName); - } else if (exprSuffix.equalsIgnoreCase(nameSuffix)) { // case insensitive matching of field name. - // example: ref: $f1, expr: T0<PREFIX><column_name> - String newName = ref.getPath(); - result.outputNames.set(k, newName); - } - } else { - result.outputNames.add(EMPTY_STRING); - } - k++; - } - } - // input and output have prefixes although they could be different... - else if (exprHasPrefix && refHasPrefix) { - String[] input = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2); - assert(input.length == 2); - assert false : "Unexpected project expression or reference"; // not handled yet - } - else { - // if the incoming schema's column name matches the expression name of the Project, - // then we just want to pick the ref name as the output column name - - result.outputNames = Lists.newArrayList(); - for (VectorWrapper<?> wrapper : incoming) { - ValueVector vvIn = wrapper.getValueVector(); - String incomingName = vvIn.getField().getName(); - if (expr.getPath().equalsIgnoreCase(incomingName)) { // case insensitive matching of field name. 
- String newName = ref.getPath(); - addToResultMaps(newName, result, true); - } - } - } } /** @@ -886,6 +347,31 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { return IterOutcome.OK_NEW_SCHEMA; } + private void setupNewSchema(RecordBatch incomingBatch, int configuredBatchSize) { + memoryManager = new ProjectMemoryManager(configuredBatchSize); + memoryManager.init(incomingBatch, ProjectRecordBatch.this); + if (allocationVectors != null) { + for (ValueVector v : allocationVectors) { + v.clear(); + } + } + allocationVectors = new ArrayList<>(); + + if (complexWriters != null) { + container.clear(); + } else { + // Release the underlying DrillBufs and reset the ValueVectors to empty + // Not clearing the container here is fine since Project output schema is + // not determined solely based on incoming batch. It is defined by the + // expressions it has to evaluate. + // + // If there is a case where only the type of ValueVector already present + // in container is changed then addOrGet method takes care of it by + // replacing the vectors. + container.zeroVectors(); + } + } + @Override public void dump() { logger.error("ProjectRecordBatch[projector={}, hasRemainder={}, remainderIndex={}, recordCount={}, container={}]", diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java new file mode 100644 index 0000000..6dcb428 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java @@ -0,0 +1,626 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.project; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.collections.map.CaseInsensitiveMap; +import org.apache.drill.common.expression.ConvertExpression; +import org.apache.drill.common.expression.ErrorCollector; +import org.apache.drill.common.expression.ErrorCollectorImpl; +import org.apache.drill.common.expression.ExpressionPosition; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.FunctionCall; +import org.apache.drill.common.expression.FunctionCallFactory; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.PathSegment.NameSegment; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.expression.ValueExpressions; +import org.apache.drill.common.expression.fn.FunctionReplacementUtils; +import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.CodeGenerator; +import 
org.apache.drill.exec.expr.DrillFuncHolderExpr; +import org.apache.drill.exec.expr.ExpressionTreeMaterializer; +import org.apache.drill.exec.expr.ValueVectorReadExpression; +import org.apache.drill.exec.expr.ValueVectorWriteExpression; +import org.apache.drill.exec.expr.fn.FunctionLookupContext; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.resultSet.impl.VectorState; +import org.apache.drill.exec.planner.StarColumnHelper; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.store.ColumnExplorer; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.shaded.guava.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.carrotsearch.hppc.IntHashSet; + +/** + * Plans the projection given the incoming and requested outgoing schemas. Works + * with the {@link VectorState} to create required vectors, writers and so on. + * Populates the code generator with the "projector" expressions. + */ +class ProjectionMaterializer { + private static final Logger logger = LoggerFactory.getLogger(ProjectionMaterializer.class); + private static final String EMPTY_STRING = ""; + + /** + * Abstracts the physical vector setup operations to separate + * the physical setup, in <code>ProjectRecordBatch</code>, from the + * logical setup in the materializer class. 
+ */ + public interface BatchBuilder { + void addTransferField(String name, ValueVector vvIn); + ValueVectorWriteExpression addOutputVector(String name, LogicalExpression expr); + int addDirectTransfer(FieldReference ref, ValueVectorReadExpression vectorRead); + void addComplexField(FieldReference ref); + ValueVectorWriteExpression addEvalVector(String outputName, + LogicalExpression expr); + } + + private static class ClassifierResult { + private boolean isStar; + private List<String> outputNames; + private String prefix = ""; + private final HashMap<String, Integer> prefixMap = Maps.newHashMap(); + private final CaseInsensitiveMap outputMap = new CaseInsensitiveMap(); + private final CaseInsensitiveMap sequenceMap = new CaseInsensitiveMap(); + + private void clear() { + isStar = false; + prefix = ""; + if (outputNames != null) { + outputNames.clear(); + } + + // note: don't clear the internal maps since they have cumulative data.. + } + } + + private final ClassGenerator<Projector> cg; + private final VectorAccessible incomingBatch; + private final BatchSchema incomingSchema; + private final List<NamedExpression> exprSpec; + private final FunctionLookupContext functionLookupContext; + private final BatchBuilder batchBuilder; + private final boolean unionTypeEnabled; + private final ErrorCollector collector = new ErrorCollectorImpl(); + private final ColumnExplorer columnExplorer; + private final IntHashSet transferFieldIds = new IntHashSet(); + private final ProjectionMaterializer.ClassifierResult result = new ClassifierResult(); + private boolean isAnyWildcard; + private boolean classify; + + public ProjectionMaterializer(OptionManager options, + VectorAccessible incomingBatch, List<NamedExpression> exprSpec, + FunctionLookupContext functionLookupContext, BatchBuilder batchBuilder, + boolean unionTypeEnabled) { + this.incomingBatch = incomingBatch; + this.incomingSchema = incomingBatch.getSchema(); + this.exprSpec = exprSpec; + this.functionLookupContext = 
functionLookupContext; + this.batchBuilder = batchBuilder; + this.unionTypeEnabled = unionTypeEnabled; + columnExplorer = new ColumnExplorer(options); + cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, options); + } + + public Projector generateProjector(FragmentContext context, boolean saveCode) + throws ClassTransformationException, IOException, SchemaChangeException { + long setupNewSchemaStartTime = System.currentTimeMillis(); + setup(); + CodeGenerator<Projector> codeGen = cg.getCodeGenerator(); + codeGen.plainJavaCapable(true); + codeGen.saveCodeForDebugging(saveCode); + Projector projector = context.getImplementationClass(codeGen); + + long setupNewSchemaEndTime = System.currentTimeMillis(); + logger.trace("generateProjector: time {} ms, Project {}, incoming {}", + (setupNewSchemaEndTime - setupNewSchemaStartTime), exprSpec, incomingSchema); + return projector; + } + + private void setup() throws SchemaChangeException { + List<NamedExpression> exprs = exprSpec != null ? exprSpec + : inferExpressions(); + isAnyWildcard = isAnyWildcard(exprs); + classify = isClassificationNeeded(exprs); + + for (NamedExpression namedExpression : exprs) { + setupExpression(namedExpression); + } + } + + private List<NamedExpression> inferExpressions() { + List<NamedExpression> exprs = new ArrayList<>(); + for (MaterializedField field : incomingSchema) { + String fieldName = field.getName(); + if (Types.isComplex(field.getType()) + || Types.isRepeated(field.getType())) { + LogicalExpression convertToJson = FunctionCallFactory.createConvert( + ConvertExpression.CONVERT_TO, "JSON", + SchemaPath.getSimplePath(fieldName), ExpressionPosition.UNKNOWN); + String castFuncName = FunctionReplacementUtils + .getCastFunc(MinorType.VARCHAR); + List<LogicalExpression> castArgs = new ArrayList<>(); + castArgs.add(convertToJson); // input_expr + // Implicitly casting to varchar, since we don't know actual source + // length, cast to undefined length, which will preserve source length + 
castArgs.add(new ValueExpressions.LongExpression( + Types.MAX_VARCHAR_LENGTH, null)); + FunctionCall castCall = new FunctionCall(castFuncName, castArgs, + ExpressionPosition.UNKNOWN); + exprs.add(new NamedExpression(castCall, new FieldReference(fieldName))); + } else { + exprs.add(new NamedExpression(SchemaPath.getSimplePath(fieldName), + new FieldReference(fieldName))); + } + } + return exprs; + } + + private boolean isAnyWildcard(List<NamedExpression> exprs) { + for (NamedExpression e : exprs) { + if (isWildcard(e)) { + return true; + } + } + return false; + } + + private boolean isWildcard(NamedExpression ex) { + if (!(ex.getExpr() instanceof SchemaPath)) { + return false; + } + NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment(); + return expr.getPath().contains(SchemaPath.DYNAMIC_STAR); + } + + private boolean isClassificationNeeded(List<NamedExpression> exprs) { + boolean needed = false; + for (NamedExpression ex : exprs) { + if (!(ex.getExpr() instanceof SchemaPath)) { + continue; + } + NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment(); + NameSegment ref = ex.getRef().getRootSegment(); + boolean refHasPrefix = ref.getPath() + .contains(StarColumnHelper.PREFIX_DELIMITER); + boolean exprContainsStar = expr.getPath() + .contains(SchemaPath.DYNAMIC_STAR); + + if (refHasPrefix || exprContainsStar) { + needed = true; + break; + } + } + return needed; + } + + private void setupExpression(NamedExpression namedExpression) + throws SchemaChangeException { + result.clear(); + if (classify && namedExpression.getExpr() instanceof SchemaPath) { + classifyExpr(namedExpression, result); + + if (result.isStar) { + setupImplicitColumnRef(namedExpression); + return; + } + } else { + // For the columns which do not needed to be classified, + // it is still necessary to ensure the output column name is unique + result.outputNames = new ArrayList<>(); + String outputName = getRef(namedExpression).getRootSegment().getPath(); // moved + // to + // before + 
// the + // if + addToResultMaps(outputName, result, true); + } + String outputName = getRef(namedExpression).getRootSegment().getPath(); + if (result != null && result.outputNames != null + && result.outputNames.size() > 0) { + boolean isMatched = false; + for (int j = 0; j < result.outputNames.size(); j++) { + if (!result.outputNames.get(j).isEmpty()) { + outputName = result.outputNames.get(j); + isMatched = true; + break; + } + } + + if (!isMatched) { + return; + } + } + + LogicalExpression expr = ExpressionTreeMaterializer.materialize( + namedExpression.getExpr(), incomingBatch, collector, + functionLookupContext, true, unionTypeEnabled); + if (collector.hasErrors()) { + throw new SchemaChangeException(String.format( + "Failure while trying to materialize incoming schema. Errors:\n %s.", + collector.toErrorString())); + } + + // Add value vector to transfer if direct reference and this is allowed, + // otherwise, add to evaluation stack. + if (expr instanceof ValueVectorReadExpression + && incomingSchema.getSelectionVectorMode() == SelectionVectorMode.NONE + && !((ValueVectorReadExpression) expr).hasReadPath() && !isAnyWildcard + && !transferFieldIds.contains( + ((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])) { + setupDirectTransfer(namedExpression, expr); + } else if (expr instanceof DrillFuncHolderExpr + && ((DrillFuncHolderExpr) expr).getHolder() + .isComplexWriterFuncHolder()) { + setupFnCall(namedExpression, expr); + } else { + // need to do evaluation. 
+ setupExprEval(namedExpression, expr, outputName); + } + } + + private void setupImplicitColumnRef(NamedExpression namedExpression) + throws SchemaChangeException { + // The value indicates which wildcard we are processing now + Integer value = result.prefixMap.get(result.prefix); + if (value != null && value == 1) { + int k = 0; + for (VectorWrapper<?> wrapper : incomingBatch) { + ValueVector vvIn = wrapper.getValueVector(); + if (k > result.outputNames.size() - 1) { + assert false; + } + String name = result.outputNames.get(k++); // get the renamed column + // names + if (name.isEmpty()) { + continue; + } + + if (isImplicitFileColumn(vvIn.getField())) { + continue; + } + + batchBuilder.addTransferField(name, vvIn); + } + } else if (value != null && value > 1) { // subsequent wildcards should do a + // copy of incoming value vectors + int k = 0; + for (MaterializedField field : incomingSchema) { + SchemaPath originalPath = SchemaPath + .getSimplePath(field.getName()); + if (k > result.outputNames.size() - 1) { + assert false; + } + String name = result.outputNames.get(k++); // get the renamed column + // names + if (name.isEmpty()) { + continue; + } + + if (isImplicitFileColumn(field)) { + continue; + } + + LogicalExpression expr = ExpressionTreeMaterializer.materialize( + originalPath, incomingBatch, collector, functionLookupContext); + if (collector.hasErrors()) { + throw new SchemaChangeException(String.format( + "Failure while trying to materialize incomingBatch schema. 
Errors:\n %s.", + collector.toErrorString())); + } + + ValueVectorWriteExpression write = batchBuilder.addOutputVector(name, + expr); + cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND); + } + } + } + + private void setupDirectTransfer(NamedExpression namedExpression, + LogicalExpression expr) { + FieldReference ref = getRef(namedExpression); + int fid = batchBuilder.addDirectTransfer(ref, + (ValueVectorReadExpression) expr); + transferFieldIds.add(fid); + } + + private void setupFnCall(NamedExpression namedExpression, + LogicalExpression expr) { + + // Need to process ComplexWriter function evaluation. + // The reference name will be passed to ComplexWriter, used as the name of + // the output vector from the writer. + ((DrillFuncHolderExpr) expr).setFieldReference(namedExpression.getRef()); + cg.addExpr(expr, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND); + batchBuilder.addComplexField(namedExpression.getRef()); + } + + private void setupExprEval(NamedExpression namedExpression, + LogicalExpression expr, String outputName) { + ValueVectorWriteExpression write = batchBuilder.addEvalVector( + outputName, expr); + cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND); + } + + private boolean isImplicitFileColumn(MaterializedField field) { + return columnExplorer + .isImplicitOrInternalFileColumn(field.getName()); + } + + private void classifyExpr(NamedExpression ex, ClassifierResult result) { + NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment(); + NameSegment ref = ex.getRef().getRootSegment(); + boolean exprHasPrefix = expr.getPath() + .contains(StarColumnHelper.PREFIX_DELIMITER); + boolean refHasPrefix = ref.getPath() + .contains(StarColumnHelper.PREFIX_DELIMITER); + boolean exprIsStar = expr.getPath().equals(SchemaPath.DYNAMIC_STAR); + boolean refContainsStar = ref.getPath().contains(SchemaPath.DYNAMIC_STAR); + boolean exprContainsStar = expr.getPath().contains(SchemaPath.DYNAMIC_STAR); + boolean refEndsWithStar = 
ref.getPath().endsWith(SchemaPath.DYNAMIC_STAR); + + String exprPrefix = EMPTY_STRING; + String exprSuffix = expr.getPath(); + + if (exprHasPrefix) { + // get the prefix of the expr + String[] exprComponents = expr.getPath() + .split(StarColumnHelper.PREFIX_DELIMITER, 2); + assert (exprComponents.length == 2); + exprPrefix = exprComponents[0]; + exprSuffix = exprComponents[1]; + result.prefix = exprPrefix; + } + + boolean exprIsFirstWildcard = false; + if (exprContainsStar) { + result.isStar = true; + Integer value = result.prefixMap.get(exprPrefix); + if (value == null) { + result.prefixMap.put(exprPrefix, 1); + exprIsFirstWildcard = true; + } else { + result.prefixMap.put(exprPrefix, value + 1); + } + } + + int incomingSchemaSize = incomingSchema.getFieldCount(); + + // input is '*' and output is 'prefix_*' + if (exprIsStar && refHasPrefix && refEndsWithStar) { + String[] components = ref.getPath() + .split(StarColumnHelper.PREFIX_DELIMITER, 2); + assert (components.length == 2); + String prefix = components[0]; + result.outputNames = new ArrayList<>(); + for (MaterializedField field : incomingSchema) { + String name = field.getName(); + + // add the prefix to the incoming column name + String newName = prefix + StarColumnHelper.PREFIX_DELIMITER + name; + addToResultMaps(newName, result, false); + } + } + // input and output are the same + else if (expr.getPath().equalsIgnoreCase(ref.getPath()) + && (!exprContainsStar || exprIsFirstWildcard)) { + if (exprContainsStar && exprHasPrefix) { + assert exprPrefix != null; + + int k = 0; + result.outputNames = new ArrayList<>(incomingSchemaSize); + for (int j = 0; j < incomingSchemaSize; j++) { + result.outputNames.add(EMPTY_STRING); // initialize + } + + for (MaterializedField field : incomingSchema) { + String incomingName = field.getName(); + // get the prefix of the name + String[] nameComponents = incomingName + .split(StarColumnHelper.PREFIX_DELIMITER, 2); + // if incoming value vector does not have a prefix, 
ignore it since + // this expression is not referencing it + if (nameComponents.length <= 1) { + k++; + continue; + } + String namePrefix = nameComponents[0]; + if (exprPrefix.equalsIgnoreCase(namePrefix)) { + if (!result.outputMap.containsKey(incomingName)) { + result.outputNames.set(k, incomingName); + result.outputMap.put(incomingName, incomingName); + } + } + k++; + } + } else { + result.outputNames = new ArrayList<>(); + if (exprContainsStar) { + for (MaterializedField field : incomingSchema) { + String incomingName = field.getName(); + if (refContainsStar) { + addToResultMaps(incomingName, result, true); // allow dups since + // this is likely + // top-level project + } else { + addToResultMaps(incomingName, result, false); + } + } + } else { + String newName = expr.getPath(); + if (!refHasPrefix && !exprHasPrefix) { + addToResultMaps(newName, result, true); // allow dups since this is + // likely top-level project + } else { + addToResultMaps(newName, result, false); + } + } + } + } + + // Input is wildcard and it is not the first wildcard + else if (exprIsStar) { + result.outputNames = new ArrayList<>(); + for (MaterializedField field : incomingSchema) { + String incomingName = field.getName(); + addToResultMaps(incomingName, result, true); // allow dups since this is + // likely top-level project + } + } + + // Only the output has prefix + else if (!exprHasPrefix && refHasPrefix) { + result.outputNames = new ArrayList<>(); + String newName = ref.getPath(); + addToResultMaps(newName, result, false); + } + // Input has prefix but output does not + else if (exprHasPrefix && !refHasPrefix) { + int k = 0; + result.outputNames = new ArrayList<>(incomingSchemaSize); + for (int j = 0; j < incomingSchemaSize; j++) { + result.outputNames.add(EMPTY_STRING); // initialize + } + + for (MaterializedField field : incomingSchema) { + String name = field.getName(); + String[] components = name.split(StarColumnHelper.PREFIX_DELIMITER, 2); + if (components.length <= 1) { + 
k++; + continue; + } + String namePrefix = components[0]; + String nameSuffix = components[1]; + if (exprPrefix.equalsIgnoreCase(namePrefix)) { // // case insensitive + // matching of prefix. + if (refContainsStar) { + // remove the prefix from the incoming column names + String newName = getUniqueName(nameSuffix, result); // for top level + // we need to + // make names + // unique + result.outputNames.set(k, newName); + } else if (exprSuffix.equalsIgnoreCase(nameSuffix)) { // case + // insensitive + // matching of + // field name. + // example: ref: $f1, expr: T0<PREFIX><column_name> + String newName = ref.getPath(); + result.outputNames.set(k, newName); + } + } else { + result.outputNames.add(EMPTY_STRING); + } + k++; + } + } + // input and output have prefixes although they could be different... + else if (exprHasPrefix && refHasPrefix) { + String[] input = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, + 2); + assert (input.length == 2); + assert false : "Unexpected project expression or reference"; // not + // handled + // yet + } else { + // if the incoming schema's column name matches the expression name of the + // Project, + // then we just want to pick the ref name as the output column name + + result.outputNames = new ArrayList<>(); + for (MaterializedField field : incomingSchema) { + String incomingName = field.getName(); + if (expr.getPath().equalsIgnoreCase(incomingName)) { // case insensitive + // matching of + // field name. 
+ String newName = ref.getPath(); + addToResultMaps(newName, result, true); + } + } + } + } + + private String getUniqueName(String name, + ProjectionMaterializer.ClassifierResult result) { + Integer currentSeq = (Integer) result.sequenceMap.get(name); + if (currentSeq == null) { // name is unique, so return the original name + result.sequenceMap.put(name, -1); + return name; + } + // create a new name + int newSeq = currentSeq + 1; + String newName = name + newSeq; + result.sequenceMap.put(name, newSeq); + result.sequenceMap.put(newName, -1); + + return newName; + } + + /** + * Helper method to ensure unique output column names. If allowDupsWithRename + * is set to true, the original name will be appended with a suffix number to + * ensure uniqueness. Otherwise, the original column would not be renamed even + * even if it has been used + * + * @param origName + * the original input name of the column + * @param result + * the data structure to keep track of the used names and decide what + * output name should be to ensure uniqueness + * @param allowDupsWithRename + * if the original name has been used, is renaming allowed to ensure + * output name unique + */ + private void addToResultMaps(String origName, + ProjectionMaterializer.ClassifierResult result, + boolean allowDupsWithRename) { + String name = origName; + if (allowDupsWithRename) { + name = getUniqueName(origName, result); + } + if (!result.outputMap.containsKey(name)) { + result.outputNames.add(name); + result.outputMap.put(name, name); + } else { + result.outputNames.add(EMPTY_STRING); + } + } + + // Hack to make ref and full work together... need to figure out if this is + // still necessary. 
+ private FieldReference getRef(NamedExpression e) { + return e.getRef(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java index 29c4dad..496c776 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java @@ -355,7 +355,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { CodeGenerator<WindowFramer> codeGen = cg.getCodeGenerator(); codeGen.plainJavaCapable(true); // Uncomment out this line to debug the generated code. - codeGen.saveCodeForDebugging(true); + // codeGen.saveCodeForDebugging(true); return context.getImplementationClass(codeGen); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java index 03e8ffa..b1156f6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java @@ -28,7 +28,8 @@ public interface VectorAccessible extends Iterable<VectorWrapper<?>> { /** * Get the value vector type and id for the given schema path. The TypedFieldId * should store a fieldId which is the same as the ordinal position of the field - * within the Iterator provided this classes implementation of Iterable<ValueVector>. + * within the Iterator provided this classes implementation of + * <code>Iterable<ValueVector></code>. * * @param path the path where the vector should be located. * @return the local field id associated with this vector. 
If no field matches this diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java index 8d95701..d8885b7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java @@ -24,8 +24,6 @@ import org.apache.drill.exec.vector.ValueVector; public interface VectorWrapper<T extends ValueVector> { - - public Class<T> getVectorClass(); public MaterializedField getField(); public T getValueVector(); diff --git a/exec/java-exec/src/test/resources/project/test1.json b/exec/java-exec/src/test/resources/project/test1.json index 39d8c18..c0bb3ee 100644 --- a/exec/java-exec/src/test/resources/project/test1.json +++ b/exec/java-exec/src/test/resources/project/test1.json @@ -25,10 +25,10 @@ child: 1, pop:"project", exprs: [ - { ref: "col1", expr:"red + 1" }, - { ref: "col2", expr:"red + 2" }, - { ref: "col3", expr:"orange"}, - { ref: "col4", expr:"orange"} + { ref: "col1", expr: "red + 1" }, + { ref: "col2", expr: "red + 2" }, + { ref: "col3", expr: "orange"}, + { ref: "col4", expr: "orange"} ] }, {
