DRILL-1333: Flatten operator to allow more complex queries against repeated data.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2871c3ba Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2871c3ba Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2871c3ba Branch: refs/heads/master Commit: 2871c3ba515e06cd4bae98742139a99e12e42407 Parents: 7309506 Author: Jason Altekruse <altekruseja...@gmail.com> Authored: Thu Oct 9 17:55:45 2014 -0700 Committer: Jason Altekruse <altekruseja...@gmail.com> Committed: Wed Oct 29 08:44:11 2014 -0700 ---------------------------------------------------------------------- .../src/main/codegen/templates/BaseWriter.java | 3 +- .../codegen/templates/RepeatedValueVectors.java | 17 +- .../drill/exec/expr/EvaluationVisitor.java | 1 + .../exec/expr/fn/impl/conv/DummyFlatten.java | 44 +++ .../org/apache/drill/exec/ops/QueryContext.java | 2 +- .../physical/base/AbstractPhysicalVisitor.java | 6 + .../exec/physical/base/PhysicalVisitor.java | 2 + .../drill/exec/physical/config/FlattenPOP.java | 73 ++++ .../impl/flatten/FlattenBatchCreator.java | 40 ++ .../impl/flatten/FlattenRecordBatch.java | 374 +++++++++++++++++++ .../physical/impl/flatten/FlattenTemplate.java | 119 ++++++ .../exec/physical/impl/flatten/Flattener.java | 38 ++ .../exec/planner/physical/DrillFlattenPrel.java | 75 ++++ .../exec/planner/physical/PlannerSettings.java | 5 +- .../exec/planner/physical/ProjectPrel.java | 2 +- .../physical/visitor/BasePrelVisitor.java | 6 + .../planner/physical/visitor/PrelVisitor.java | 2 + .../visitor/RewriteProjectToFlatten.java | 114 ++++++ .../visitor/RexVisitorComplexExprSplitter.java | 119 ++++++ .../visitor/SplitUpComplexExpressions.java | 146 ++++++++ .../planner/sql/handlers/DefaultSqlHandler.java | 9 + .../exec/vector/RepeatedFixedWidthVector.java | 7 +- .../vector/RepeatedVariableWidthVector.java | 4 +- .../drill/exec/vector/RepeatedVector.java | 26 ++ .../exec/vector/complex/RepeatedListVector.java | 
19 +- .../exec/vector/complex/RepeatedMapVector.java | 137 +++++-- .../vector/complex/impl/AbstractBaseWriter.java | 3 +- .../java/org/apache/drill/PlanningBase.java | 2 +- .../org/apache/drill/TestExampleQueries.java | 12 + .../resources/flatten/corrected_physical.json | 81 ++++ .../flatten/test_flatten_physical.json | 78 ++++ .../resources/jsoninput/input2_modified.json | 106 ++++++ .../resources/jsoninput/repeated_list_bug.json | 6 + .../test/resources/store/json/nested_json.json | 8 + .../store/json/test_flatten_mapify.json | 26 ++ 35 files changed, 1665 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/codegen/templates/BaseWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/BaseWriter.java b/exec/java-exec/src/main/codegen/templates/BaseWriter.java index 224148c..979d4ac 100644 --- a/exec/java-exec/src/main/codegen/templates/BaseWriter.java +++ b/exec/java-exec/src/main/codegen/templates/BaseWriter.java @@ -32,7 +32,8 @@ public interface BaseWriter extends Positionable{ boolean ok(); WriteState getState(); int getValueCapacity(); - + void resetState(); + public interface MapWriter extends BaseWriter{ MaterializedField getField(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java index 2132212..2853e83 100644 --- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java @@ -45,7 +45,7 @@ package org.apache.drill.exec.vector; * NB: this class is 
automatically generated from ValueVectorTypes.tdd using FreeMarker. */ - public final class Repeated${minor.class}Vector extends BaseValueVector implements Repeated<#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector { +public final class Repeated${minor.class}Vector extends BaseValueVector implements Repeated<#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector { private int parentValueCount; private int childValueCount; @@ -59,7 +59,8 @@ package org.apache.drill.exec.vector; public Repeated${minor.class}Vector(MaterializedField field, BufferAllocator allocator) { super(field, allocator); this.offsets = new UInt4Vector(null, allocator); - this.values = new ${minor.class}Vector(null, allocator); + MaterializedField mf = MaterializedField.create(field.getPath(), Types.required(field.getType().getMinorType())); + this.values = new ${minor.class}Vector(mf, allocator); } public int getValueCapacity(){ @@ -106,7 +107,7 @@ package org.apache.drill.exec.vector; int endPos = offsets.getAccessor().get(startIndex+length); values.splitAndTransferTo(startIndex, endPos-startPos, target.values); target.offsets.clear(); - target.offsets.allocateNew(length+1); + target.offsets.allocateNew(endPos - startPos + 1); int normalizedPos = 0; for (int i=0; i<length+1;i++) { normalizedPos = offsets.getAccessor().get(startIndex+i) - startPos; @@ -303,7 +304,11 @@ package org.apache.drill.exec.vector; public int getCount(int index) { return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index); } - + + public ValueVector getAllChildValues() { + return values; + } + public List<${friendlyType}> getObject(int index) { List<${friendlyType}> vals = new JsonStringArrayList(); int start = offsets.getAccessor().get(index); @@ -313,6 +318,10 @@ package org.apache.drill.exec.vector; } return vals; } + + public int getGroupSizeAtIndex(int index){ + return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index); + } public ${friendlyType} 
getSingleObject(int index, int arrayIndex){ int start = offsets.getAccessor().get(index); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java index d1e10ae..5cf4a35 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java @@ -301,6 +301,7 @@ public class EvaluationVisitor { TypeHelper.getWriterInterface(inputContainer.getMinorType(), inputContainer.getMajorType().getMode())); JVar writer = generator.declareClassField("writer", writerIFace); generator.getSetupBlock().assign(writer, JExpr._new(writerImpl).arg(vv).arg(JExpr._null())); + generator.getEvalBlock().add(writer.invoke("resetState")); generator.getEvalBlock().add(writer.invoke("setPosition").arg(outIndex)); String copyMethod = inputContainer.isSingularRepeated() ? 
"copyAsValueSingle" : "copyAsValue"; generator.getEvalBlock().add(inputContainer.getHolder().invoke(copyMethod).arg(writer)); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyFlatten.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyFlatten.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyFlatten.java new file mode 100644 index 0000000..d4e3115 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyFlatten.java @@ -0,0 +1,44 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/ +package org.apache.drill.exec.expr.fn.impl.conv; + + +import org.apache.drill.exec.expr.DrillSimpleFunc; +import org.apache.drill.exec.expr.annotations.FunctionTemplate; +import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope; +import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling; +import org.apache.drill.exec.expr.annotations.Output; +import org.apache.drill.exec.expr.holders.VarBinaryHolder; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.vector.complex.writer.BaseWriter; + +/** + * This and {@link DummyConvertTo} class merely act as a placeholder so that Optiq + * allows the 'flatten()' function in SQL. + */ +@FunctionTemplate(name = "flatten", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) +public class DummyFlatten implements DrillSimpleFunc { + + @Output BaseWriter.ComplexWriter out; + + @Override + public void setup(RecordBatch incoming) { } + + @Override + public void eval() { } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java index 4b290a5..ea48b05 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java @@ -58,7 +58,7 @@ public class QueryContext{ this.session = session; this.timer = new Multitimer<>(QuerySetup.class); this.queryOptions = new QueryOptionManager(session.getOptions()); - this.plannerSettings = new PlannerSettings(queryOptions); + this.plannerSettings = new PlannerSettings(queryOptions, getFunctionRegistry()); 
this.plannerSettings.setNumEndPoints(this.getActiveEndpoints().size()); this.table = new DrillOperatorTable(getFunctionRegistry()); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index 031ab10..27b0ecb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.base; import org.apache.drill.exec.physical.config.BroadcastSender; import org.apache.drill.exec.physical.config.Filter; +import org.apache.drill.exec.physical.config.FlattenPOP; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.config.HashJoinPOP; import org.apache.drill.exec.physical.config.HashPartitionSender; @@ -105,6 +106,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme } @Override + public T visitFlatten(FlattenPOP flatten, X value) throws E { + return visitOp(flatten, value); + } + + @Override public T visitReceiver(Receiver receiver, X value) throws E { return visitOp(receiver, value); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java index f9a9c21..e6a89d0 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.base; import org.apache.drill.exec.physical.config.BroadcastSender; import org.apache.drill.exec.physical.config.Filter; +import org.apache.drill.exec.physical.config.FlattenPOP; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.config.HashJoinPOP; import org.apache.drill.exec.physical.config.HashPartitionSender; @@ -62,6 +63,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitTrace(Trace trace, EXTRA value) throws EXCEP; public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP; public RETURN visitLimit(Limit limit, EXTRA value) throws EXCEP; + public RETURN visitFlatten(FlattenPOP flatten, EXTRA value) throws EXCEP; public RETURN visitMergeJoin(MergeJoinPOP join, EXTRA value) throws EXCEP; public RETURN visitHashJoin(HashJoinPOP join, EXTRA value) throws EXCEP; public RETURN visitSender(Sender sender, EXTRA value) throws EXCEP; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/FlattenPOP.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/FlattenPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/FlattenPOP.java new file mode 100644 index 0000000..42e6870 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/FlattenPOP.java @@ -0,0 +1,73 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.drill.exec.physical.config; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.collect.Iterators; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.AbstractSingle; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; + +import java.util.Iterator; +import java.util.List; + +@JsonTypeName("flatten") +public class FlattenPOP extends AbstractSingle { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenPOP.class); + + private SchemaPath column; + + @JsonCreator + public FlattenPOP( + @JsonProperty("child") PhysicalOperator child, + @JsonProperty("column") SchemaPath column) { + super(child); + this.column = column; + } + + + @Override + public Iterator<PhysicalOperator> iterator() { + return Iterators.singletonIterator(child); + } + + public SchemaPath getColumn() { + return 
column; + } + + @Override + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitFlatten(this, value); + } + + @Override + protected PhysicalOperator getNewWithChild(PhysicalOperator child) { + return new FlattenPOP(child, column); + } + + @Override + public int getOperatorType() { + // TODO - add this operator to the protobuf definition + return 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java new file mode 100644 index 0000000..6f02824 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.flatten; + +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.FlattenPOP; +import org.apache.drill.exec.physical.config.Project; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.record.RecordBatch; + +import com.google.common.base.Preconditions; + +public class FlattenBatchCreator implements BatchCreator<FlattenPOP>{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenBatchCreator.class); + + @Override + public RecordBatch getBatch(FragmentContext context, FlattenPOP config, List<RecordBatch> children) throws ExecutionSetupException { + Preconditions.checkArgument(children.size() == 1); + return new FlattenRecordBatch(config, children.iterator().next(), context); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java new file mode 100644 index 0000000..5171a25 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -0,0 +1,374 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.flatten; + +import java.io.IOException; +import java.util.List; + +import com.carrotsearch.hppc.IntOpenHashSet; +import com.google.common.base.Preconditions; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.expression.ErrorCollector; +import org.apache.drill.common.expression.ErrorCollectorImpl; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; +import org.apache.drill.exec.expr.CodeGenerator; +import org.apache.drill.exec.expr.DrillFuncHolderExpr; +import org.apache.drill.exec.expr.ExpressionTreeMaterializer; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.expr.ValueVectorReadExpression; +import org.apache.drill.exec.expr.ValueVectorWriteExpression; +import org.apache.drill.exec.expr.fn.DrillComplexWriterFuncHolder; +import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.FlattenPOP; +import org.apache.drill.exec.record.AbstractSingleRecordBatch; +import 
org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.RepeatedVector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.RepeatedMapVector; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; + +import com.google.common.collect.Lists; +import com.sun.codemodel.JExpr; + +public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenRecordBatch.class); + + private Flattener flattener; + private List<ValueVector> allocationVectors; + private List<ComplexWriter> complexWriters; + private boolean hasRemainder = false; + private int remainderIndex = 0; + private int recordCount; + // the buildSchema method is always called first by a short circuit path to return schema information to the client + // this information is not entirely accurate as Drill determines schema on the fly, so here it needs to have modified + // behavior for that call to setup the schema for the flatten operation + private boolean fastSchemaCalled; + + private static final String EMPTY_STRING = ""; + + private class ClassifierResult { + public boolean isStar = false; + public List<String> outputNames; + public String prefix = ""; + + private void clear() { + isStar = false; + prefix = ""; + if (outputNames != null) { + outputNames.clear(); + } + + // note: don't clear the internal maps since they have cumulative data.. 
+ } + } + + public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException { + super(pop, context, incoming); + fastSchemaCalled = false; + } + + @Override + public int getRecordCount() { + return recordCount; + } + + + @Override + protected void killIncoming(boolean sendUpstream) { + super.killIncoming(sendUpstream); + hasRemainder = false; + } + + + @Override + public IterOutcome innerNext() { + if (hasRemainder) { + handleRemainder(); + return IterOutcome.OK; + } + return super.innerNext(); + } + + @Override + public VectorContainer getOutgoingContainer() { + return this.container; + } + + private void setFlattenVector() { + try { + flattener.setFlattenField((RepeatedVector) incoming.getValueAccessorById( + incoming.getSchema().getColumn( + incoming.getValueVectorId( + popConfig.getColumn()).getFieldIds()[0]).getValueClass(), + incoming.getValueVectorId(popConfig.getColumn()).getFieldIds()).getValueVector()); + } catch (Exception ex) { + throw new DrillRuntimeException("Trying to flatten a non-repeated filed."); + } + } + + @Override + protected IterOutcome doWork() { + int incomingRecordCount = incoming.getRecordCount(); + + if (!doAlloc()) { + outOfMemory = true; + return IterOutcome.OUT_OF_MEMORY; + } + + // we call this in setupSchema, but we also need to call it here so we have a reference to the appropriate vector + // inside of the the flattener for the current batch + setFlattenVector(); + + int childCount = flattener.getFlattenField().getAccessor().getValueCount(); + int outputRecords = flattener.flattenRecords(0, incomingRecordCount, 0); + // TODO - change this to be based on the repeated vector length + if (outputRecords < childCount) { + setValueCount(outputRecords); + hasRemainder = true; + remainderIndex = outputRecords; + this.recordCount = remainderIndex; + } else { + setValueCount(outputRecords); + for(VectorWrapper<?> v: incoming) { + v.clear(); + } + this.recordCount = outputRecords; + } + 
// In case of complex writer expression, vectors would be added to batch run-time. + // We have to re-build the schema. + if (complexWriters != null) { + container.buildSchema(SelectionVectorMode.NONE); + } + + return IterOutcome.OK; + } + + private void handleRemainder() { + int remainingRecordCount = flattener.getFlattenField().getAccessor().getValueCount() - remainderIndex; + if (!doAlloc()) { + outOfMemory = true; + return; + } + + int projRecords = flattener.flattenRecords(remainderIndex, remainingRecordCount, 0); + if (projRecords < remainingRecordCount) { + setValueCount(projRecords); + this.recordCount = projRecords; + remainderIndex += projRecords; + } else { + setValueCount(remainingRecordCount); + hasRemainder = false; + remainderIndex = 0; + for (VectorWrapper<?> v : incoming) { + v.clear(); + } + this.recordCount = remainingRecordCount; + } + // In case of complex writer expression, vectors would be added to batch run-time. + // We have to re-build the schema. + if (complexWriters != null) { + container.buildSchema(SelectionVectorMode.NONE); + } + } + + public void addComplexWriter(ComplexWriter writer) { + complexWriters.add(writer); + } + + private boolean doAlloc() { + //Allocate vv in the allocationVectors. + for (ValueVector v : this.allocationVectors) { + if (!v.allocateNewSafe()) { + return false; + } + } + + //Allocate vv for complexWriters. 
+ if (complexWriters == null) { + return true; + } + + for (ComplexWriter writer : complexWriters) { + writer.allocate(); + } + + return true; + } + + private void setValueCount(int count) { + for (ValueVector v : allocationVectors) { + ValueVector.Mutator m = v.getMutator(); + m.setValueCount(count); + } + + if (complexWriters == null) { + return; + } + + for (ComplexWriter writer : complexWriters) { + writer.setValueCount(count); + } + } + + private FieldReference getRef(NamedExpression e) { + FieldReference ref = e.getRef(); + PathSegment seg = ref.getRootSegment(); + + return ref; + } + + @Override + public IterOutcome buildSchema() throws SchemaChangeException { + incoming.buildSchema(); + if ( ! fastSchemaCalled ) { + for (VectorWrapper vw : incoming) { + ValueVector vector = container.addOrGet(vw.getField()); + container.add(vector); + } + fastSchemaCalled = true; + container.buildSchema(SelectionVectorMode.NONE); + return IterOutcome.OK_NEW_SCHEMA; + } + else { + setupNewSchema(); + return IterOutcome.OK_NEW_SCHEMA; + } + } + + @Override + protected boolean setupNewSchema() throws SchemaChangeException { + this.allocationVectors = Lists.newArrayList(); + container.clear(); + final List<NamedExpression> exprs = getExpressionList(); + final ErrorCollector collector = new ErrorCollectorImpl(); + final List<TransferPair> transfers = Lists.newArrayList(); + + final ClassGenerator<Flattener> cg = CodeGenerator.getRoot(Flattener.TEMPLATE_DEFINITION, context.getFunctionRegistry()); + IntOpenHashSet transferFieldIds = new IntOpenHashSet(); + + RepeatedVector flattenField = ((RepeatedVector) incoming.getValueAccessorById( + incoming.getSchema().getColumn( + incoming.getValueVectorId( + popConfig.getColumn()).getFieldIds()[0]).getValueClass(), + incoming.getValueVectorId(popConfig.getColumn()).getFieldIds()).getValueVector()); + + NamedExpression namedExpression = new NamedExpression(popConfig.getColumn(), new FieldReference(popConfig.getColumn())); + 
LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true); + ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr; + TypedFieldId id = vectorRead.getFieldId(); + Preconditions.checkNotNull(incoming); + + TransferPair tp = null; + if (flattenField instanceof RepeatedMapVector) { + tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap(); + } else { + ValueVector vvIn = flattenField.getAccessor().getAllChildValues(); + tp = vvIn.getTransferPair(); + } + transfers.add(tp); + container.add(tp.getTo()); + transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]); + + logger.debug("Added transfer for project expression."); + + ClassifierResult result = new ClassifierResult(); + + for (int i = 0; i < exprs.size(); i++) { + namedExpression = exprs.get(i); + result.clear(); + + String outputName = getRef(namedExpression).getRootSegment().getPath(); + if (result != null && result.outputNames != null && result.outputNames.size() > 0) { + for (int j = 0; j < result.outputNames.size(); j++) { + if (!result.outputNames.get(j).equals(EMPTY_STRING)) { + outputName = result.outputNames.get(j); + break; + } + } + } + + expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true); + final MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType()); + if (collector.hasErrors()) { + throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); + } + if (expr instanceof DrillFuncHolderExpr && + ((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder()) { + // Need to process ComplexWriter function evaluation. + // Lazy initialization of the list of complex writers, if not done yet. 
+ if (complexWriters == null) { + complexWriters = Lists.newArrayList(); + } + + // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer. + ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef()); + cg.addExpr(expr); + } else{ + // need to do evaluation. + ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); + allocationVectors.add(vector); + TypedFieldId fid = container.add(vector); + ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true); + HoldingContainer hc = cg.addExpr(write); + + cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); + logger.debug("Added eval for project expression."); + } + } + + cg.rotateBlock(); + cg.getEvalBlock()._return(JExpr.TRUE); + + container.buildSchema(SelectionVectorMode.NONE); + + try { + this.flattener = context.getImplementationClass(cg.getCodeGenerator()); + flattener.setup(context, incoming, this, transfers); + } catch (ClassTransformationException | IOException e) { + throw new SchemaChangeException("Failure while attempting to load generated class", e); + } + return true; + } + + private List<NamedExpression> getExpressionList() { + + List<NamedExpression> exprs = Lists.newArrayList(); + for (MaterializedField field : incoming.getSchema()) { + if (field.getPath().equals(popConfig.getColumn())) { + continue; + } + exprs.add(new NamedExpression(field.getPath(), new FieldReference(field.getPath()))); + } + return exprs; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java new file mode 100644 index 0000000..af4cead --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.flatten; + +import java.util.List; + +import javax.inject.Named; + +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; + +import com.google.common.collect.ImmutableList; +import org.apache.drill.exec.vector.RepeatedVector; + +public abstract class FlattenTemplate implements Flattener { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenTemplate.class); + + private ImmutableList<TransferPair> transfers; + private SelectionVector2 vector2; + private SelectionVector4 vector4; + private SelectionVectorMode svMode; + RepeatedVector fieldToFlatten; + private int groupIndex; + // this allows for groups to be written between batches if we run out of space, for cases where we have finished + // a batch on the boundary it will be set to 0 + private int childIndexWithinCurrGroup; + // calculating the current group size requires reading the start and end out of the offset vector, this only happens + // once and is stored here for faster reference + private int currGroupSize; + private int childIndex; + + public FlattenTemplate() throws SchemaChangeException { + childIndexWithinCurrGroup = -1; + } + + @Override + public void setFlattenField(RepeatedVector flattenField) { + this.fieldToFlatten = flattenField; + } + + public RepeatedVector getFlattenField() { + return fieldToFlatten; + } + + @Override + public final int flattenRecords(int startIndex, final int recordCount, int firstOutputIndex) { + startIndex = childIndex; + switch (svMode) { + case FOUR_BYTE: + throw new UnsupportedOperationException("Flatten does not support selection vector 
inputs."); + + case TWO_BYTE: + throw new UnsupportedOperationException("Flatten does not support selection vector inputs."); + + case NONE: + if (childIndexWithinCurrGroup == -1) { + childIndexWithinCurrGroup = 0; + } + outer: + for ( ; groupIndex < fieldToFlatten.getAccessor().getGroupCount(); groupIndex++) { + currGroupSize = fieldToFlatten.getAccessor().getGroupSizeAtIndex(groupIndex); + for ( ; childIndexWithinCurrGroup < currGroupSize; childIndexWithinCurrGroup++) { + if (!doEval(groupIndex, firstOutputIndex)) { + break outer; + } + firstOutputIndex++; + childIndex++; + } + childIndexWithinCurrGroup = 0; + } + + for (TransferPair t : transfers) { + t.splitAndTransfer(startIndex, childIndex - startIndex); + } + return childIndex - startIndex; + + default: + throw new UnsupportedOperationException(); + } + } + + @Override + public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException{ + + this.svMode = incoming.getSchema().getSelectionVectorMode(); + switch (svMode) { + case FOUR_BYTE: + throw new UnsupportedOperationException("Flatten does not support selection vector inputs."); + case TWO_BYTE: + throw new UnsupportedOperationException("Flatten does not support selection vector inputs."); + } + this.transfers = ImmutableList.copyOf(transfers); + doSetup(context, incoming, outgoing); + } + + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); + public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java new file mode 100644 index 0000000..49b9c1b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.flatten; + +import java.util.List; + +import org.apache.drill.exec.compile.TemplateClassDefinition; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.vector.RepeatedVector; + +public interface Flattener { + + public abstract void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException; + public abstract int flattenRecords(int startIndex, int recordCount, int firstOutputIndex); + public void setFlattenField(RepeatedVector repeatedColumn); + public RepeatedVector getFlattenField(); + + public static TemplateClassDefinition<Flattener> TEMPLATE_DEFINITION = new TemplateClassDefinition<Flattener>(Flattener.class, FlattenTemplate.class); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillFlattenPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillFlattenPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillFlattenPrel.java new file mode 100644 index 0000000..1409347 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillFlattenPrel.java @@ -0,0 +1,75 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.drill.exec.planner.physical; + +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.FlattenPOP; +import org.apache.drill.exec.planner.common.DrillFilterRelBase; +import org.apache.drill.exec.planner.logical.DrillOptiq; +import org.apache.drill.exec.planner.logical.DrillParseContext; +import org.apache.drill.exec.planner.physical.visitor.PrelVisitor; +import org.apache.drill.exec.record.BatchSchema; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.rex.RexNode; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +public class DrillFlattenPrel extends SinglePrel implements Prel { + + RexNode toFlatten; + + public DrillFlattenPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode toFlatten) { + super(cluster, traits, child); + this.toFlatten = toFlatten; + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new DrillFlattenPrel(getCluster(), traitSet, sole(inputs), toFlatten); + } + + 
@Override + public Iterator<Prel> iterator() { + return PrelUtil.iter(getChild()); + } + + @Override + public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { + Prel child = (Prel) this.getChild(); + + PhysicalOperator childPOP = child.getPhysicalOperator(creator); + FlattenPOP f = new FlattenPOP(childPOP, (SchemaPath) getFlattenExpression(new DrillParseContext())); + return creator.addMetadata(this, f); + } + + @Override + public BatchSchema.SelectionVectorMode getEncoding() { + return BatchSchema.SelectionVectorMode.NONE; + } + + protected LogicalExpression getFlattenExpression(DrillParseContext context){ + return DrillOptiq.toDrill(context, getChild(), toFlatten); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java index a036a46..38274c7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.planner.physical; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.server.options.OptionValidator; import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator; @@ -48,9 +49,11 @@ public class PlannerSettings implements Context{ public static final OptionValidator HASH_SINGLE_KEY = new BooleanValidator("planner.enable_hash_single_key", true); public OptionManager options = null; + public 
FunctionImplementationRegistry functionImplementationRegistry = null; - public PlannerSettings(OptionManager options){ + public PlannerSettings(OptionManager options, FunctionImplementationRegistry functionImplementationRegistry){ this.options = options; + this.functionImplementationRegistry = functionImplementationRegistry; } public OptionManager getOptions() { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java index 1f621da..310e18c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java @@ -66,7 +66,7 @@ public class ProjectPrel extends DrillProjectRelBase implements Prel{ @Override public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E { - return logicalVisitor.visitPrel(this, value); + return logicalVisitor.visitProject(this, value); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java index 81f247d..b84b345 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java @@ -20,6 +20,7 @@ 
package org.apache.drill.exec.planner.physical.visitor; import org.apache.drill.exec.planner.physical.ExchangePrel; import org.apache.drill.exec.planner.physical.JoinPrel; import org.apache.drill.exec.planner.physical.Prel; +import org.apache.drill.exec.planner.physical.ProjectPrel; import org.apache.drill.exec.planner.physical.ScanPrel; import org.apache.drill.exec.planner.physical.ScreenPrel; import org.apache.drill.exec.planner.physical.WriterPrel; @@ -43,6 +44,11 @@ public class BasePrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> implements } @Override + public RETURN visitProject(ProjectPrel prel, EXTRA value) throws EXCEP { + return visitPrel(prel, value); + } + + @Override public RETURN visitScreen(ScreenPrel prel, EXTRA value) throws EXCEP { return visitPrel(prel, value); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java index f519220..b3a25c1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.physical.visitor; import org.apache.drill.exec.planner.physical.ExchangePrel; import org.apache.drill.exec.planner.physical.JoinPrel; import org.apache.drill.exec.planner.physical.Prel; +import org.apache.drill.exec.planner.physical.ProjectPrel; import org.apache.drill.exec.planner.physical.ScanPrel; import org.apache.drill.exec.planner.physical.ScreenPrel; import org.apache.drill.exec.planner.physical.WriterPrel; @@ -33,6 +34,7 @@ public interface PrelVisitor<RETURN, EXTRA, 
EXCEP extends Throwable> { public RETURN visitWriter(WriterPrel prel, EXTRA value) throws EXCEP; public RETURN visitScan(ScanPrel prel, EXTRA value) throws EXCEP; public RETURN visitJoin(JoinPrel prel, EXTRA value) throws EXCEP; + public RETURN visitProject(ProjectPrel prel, EXTRA value) throws EXCEP; public RETURN visitPrel(Prel prel, EXTRA value) throws EXCEP; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RewriteProjectToFlatten.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RewriteProjectToFlatten.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RewriteProjectToFlatten.java new file mode 100644 index 0000000..d4b3573 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RewriteProjectToFlatten.java @@ -0,0 +1,114 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/ +package org.apache.drill.exec.planner.physical.visitor; + +import com.google.common.collect.Lists; +import net.hydromatic.optiq.tools.RelConversionException; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.planner.physical.DrillFlattenPrel; +import org.apache.drill.exec.planner.physical.Prel; +import org.apache.drill.exec.planner.physical.ProjectPrel; +import org.apache.drill.exec.planner.physical.visitor.BasePrelVisitor; +import org.apache.drill.exec.planner.types.RelDataTypeDrillImpl; +import org.apache.drill.exec.planner.types.RelDataTypeHolder; +import org.eigenbase.rel.ProjectRelBase; +import org.eigenbase.rel.RelShuttleImpl; +import org.apache.drill.exec.planner.sql.DrillOperatorTable; +import org.eigenbase.rel.ProjectRel; +import org.eigenbase.rel.RelNode; +import org.eigenbase.reltype.RelDataTypeFactory; +import org.eigenbase.reltype.RelDataTypeField; +import org.eigenbase.reltype.RelDataTypeFieldImpl; +import org.eigenbase.reltype.RelRecordType; +import org.eigenbase.rex.RexBuilder; +import org.eigenbase.rex.RexCall; +import org.eigenbase.rex.RexLiteral; +import org.eigenbase.rex.RexNode; +import org.eigenbase.sql.SqlFunction; +import org.eigenbase.sql.SqlOperator; +import org.eigenbase.util.NlsString; + +import java.util.ArrayList; +import java.util.List; + +public class RewriteProjectToFlatten extends BasePrelVisitor<Prel, Object, RelConversionException> { + + RelDataTypeFactory factory; + DrillOperatorTable table; + + public RewriteProjectToFlatten(RelDataTypeFactory factory, DrillOperatorTable table) { + super(); + this.factory = factory; + this.table = table; + } + + @Override + public Prel visitPrel(Prel prel, Object value) throws RelConversionException { + List<RelNode> children = Lists.newArrayList(); + for(Prel child : prel){ + child = 
child.accept(this, null); + children.add(child); + } + return (Prel) prel.copy(prel.getTraitSet(), children); + } + + + @Override + public Prel visitProject(ProjectPrel node, Object unused) throws RelConversionException { + ProjectPrel project = node; + List<RexNode> exprList = new ArrayList<>(); + boolean rewrite = false; + + List<RelDataTypeField> relDataTypes = new ArrayList(); + int i = 0; + RexNode flatttenExpr = null; + for (RexNode rex : project.getChildExps()) { + RexNode newExpr = rex; + if (rex instanceof RexCall) { + RexCall function = (RexCall) rex; + String functionName = function.getOperator().getName(); + int nArgs = function.getOperands().size(); + + if (functionName.equalsIgnoreCase("flatten") ) { + rewrite = true; + if (function.getOperands().size() != 1) { + throw new RelConversionException("Flatten expression expects a single input."); + } + newExpr = function.getOperands().get(0); + RexBuilder builder = new RexBuilder(factory); + flatttenExpr = builder.makeInputRef( new RelDataTypeDrillImpl(new RelDataTypeHolder(), factory), i); + } + } + relDataTypes.add(project.getRowType().getFieldList().get(i)); + i++; + exprList.add(newExpr); + } + if (rewrite == true) { + // TODO - figure out what is the right setting for the traits + Prel newChild = ((Prel)project.getInput(0)).accept(this, null); + ProjectPrel newProject = new ProjectPrel(node.getCluster(), project.getTraitSet(), newChild, exprList, new RelRecordType(relDataTypes)); + DrillFlattenPrel flatten = new DrillFlattenPrel(project.getCluster(), project.getTraitSet(), newProject, flatttenExpr); + return flatten; + } + + Prel child = ((Prel)project.getChild()).accept(this, null); + return new ProjectPrel(node.getCluster(), project.getTraitSet(), child, exprList, new RelRecordType(relDataTypes)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RexVisitorComplexExprSplitter.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RexVisitorComplexExprSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RexVisitorComplexExprSplitter.java new file mode 100644 index 0000000..73242d5 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RexVisitorComplexExprSplitter.java @@ -0,0 +1,119 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/ +package org.apache.drill.exec.planner.physical.visitor; + +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.planner.physical.ProjectPrel; +import org.apache.drill.exec.planner.types.RelDataTypeDrillImpl; +import org.apache.drill.exec.planner.types.RelDataTypeHolder; +import org.eigenbase.reltype.RelDataTypeFactory; +import org.eigenbase.rex.RexBuilder; +import org.eigenbase.rex.RexCall; +import org.eigenbase.rex.RexCorrelVariable; +import org.eigenbase.rex.RexDynamicParam; +import org.eigenbase.rex.RexFieldAccess; +import org.eigenbase.rex.RexInputRef; +import org.eigenbase.rex.RexLiteral; +import org.eigenbase.rex.RexLocalRef; +import org.eigenbase.rex.RexNode; +import org.eigenbase.rex.RexOver; +import org.eigenbase.rex.RexRangeRef; +import org.eigenbase.rex.RexVisitorImpl; + +import java.util.ArrayList; +import java.util.List; + +public class RexVisitorComplexExprSplitter extends RexVisitorImpl<RexNode> { + + RelDataTypeFactory factory; + FunctionImplementationRegistry funcReg; + List<RexNode> complexExprs; + List<ProjectPrel> projects; + int lastUsedIndex; + + public RexVisitorComplexExprSplitter(RelDataTypeFactory factory, FunctionImplementationRegistry funcReg, int firstUnused) { + super(true); + this.factory = factory; + this.funcReg = funcReg; + this.complexExprs = new ArrayList(); + this.lastUsedIndex = firstUnused; + } + + public List<RexNode> getComplexExprs() { + return complexExprs; + } + + @Override + public RexNode visitInputRef(RexInputRef inputRef) { + return inputRef; + } + + @Override + public RexNode visitLocalRef(RexLocalRef localRef) { + return localRef; + } + + @Override + public RexNode visitLiteral(RexLiteral literal) { + return literal; + } + + @Override + public RexNode visitOver(RexOver over) { + return over; + } + + @Override + public RexNode visitCorrelVariable(RexCorrelVariable correlVariable) { + return 
correlVariable; + } + + public RexNode visitCall(RexCall call) { + + String functionName = call.getOperator().getName(); + + List<RexNode> newOps = new ArrayList(); + for (RexNode operand : call.operands) { + newOps.add(operand.accept(this)); + } + if (funcReg.isFunctionComplexOutput(functionName) ) { + RexBuilder builder = new RexBuilder(factory); + RexNode ret = builder.makeInputRef( new RelDataTypeDrillImpl(new RelDataTypeHolder(), factory), lastUsedIndex); + lastUsedIndex++; + complexExprs.add(call.clone(new RelDataTypeDrillImpl(new RelDataTypeHolder(),factory), newOps)); + return ret; + } + return call.clone(new RelDataTypeDrillImpl(new RelDataTypeHolder(),factory), newOps); + } + + @Override + public RexNode visitDynamicParam(RexDynamicParam dynamicParam) { + return dynamicParam; + } + + @Override + public RexNode visitRangeRef(RexRangeRef rangeRef) { + return rangeRef; + } + + @Override + public RexNode visitFieldAccess(RexFieldAccess fieldAccess) { + return fieldAccess; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SplitUpComplexExpressions.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SplitUpComplexExpressions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SplitUpComplexExpressions.java new file mode 100644 index 0000000..f53b228 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SplitUpComplexExpressions.java @@ -0,0 +1,146 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.drill.exec.planner.physical.visitor; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import net.hydromatic.optiq.tools.RelConversionException; + +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.planner.logical.DrillRel; +import org.apache.drill.exec.planner.logical.DrillScanRel; +import org.apache.drill.exec.planner.logical.DrillTable; +import org.apache.drill.exec.planner.physical.DrillFlattenPrel; +import org.apache.drill.exec.planner.physical.Prel; +import org.apache.drill.exec.planner.physical.PrelUtil; +import org.apache.drill.exec.planner.physical.ProjectPrel; +import org.apache.drill.exec.planner.physical.PrelUtil.ProjectPushInfo; +import org.apache.drill.exec.planner.physical.visitor.BasePrelVisitor; +import org.apache.drill.exec.planner.types.RelDataTypeDrillImpl; +import org.apache.drill.exec.planner.types.RelDataTypeHolder; +import org.eigenbase.rel.ProjectRelBase; +import org.eigenbase.rel.RelShuttleImpl; +import org.apache.drill.exec.planner.sql.DrillOperatorTable; +import org.eigenbase.rel.ProjectRel; +import org.eigenbase.rel.RelNode; +import org.eigenbase.reltype.RelDataTypeFactory; +import org.eigenbase.reltype.RelDataTypeField; +import 
org.eigenbase.reltype.RelDataTypeFieldImpl; +import org.eigenbase.reltype.RelRecordType; +import org.eigenbase.rex.RexBuilder; +import org.eigenbase.rex.RexCall; +import org.eigenbase.rex.RexLiteral; +import org.eigenbase.rex.RexNode; +import org.eigenbase.sql.SqlFunction; +import org.eigenbase.sql.SqlOperator; +import org.eigenbase.sql.type.SqlTypeName; +import org.eigenbase.util.NlsString; + +import java.util.ArrayList; +import java.util.List; + +public class SplitUpComplexExpressions extends BasePrelVisitor<Prel, Object, RelConversionException> { + + RelDataTypeFactory factory; + DrillOperatorTable table; + FunctionImplementationRegistry funcReg; + + public SplitUpComplexExpressions(RelDataTypeFactory factory, DrillOperatorTable table, FunctionImplementationRegistry funcReg) { + super(); + this.factory = factory; + this.table = table; + this.funcReg = funcReg; + } + + @Override + public Prel visitPrel(Prel prel, Object value) throws RelConversionException { + List<RelNode> children = Lists.newArrayList(); + for(Prel child : prel){ + child = child.accept(this, null); + children.add(child); + } + return (Prel) prel.copy(prel.getTraitSet(), children); + } + + + @Override + public Prel visitProject(ProjectPrel project, Object unused) throws RelConversionException { + + List<RexNode> exprList = new ArrayList<>(); + + List<RelDataTypeField> relDataTypes = new ArrayList(); + List<RelDataTypeField> origRelDataTypes = new ArrayList(); + int i = 0; + + ProjectPushInfo columnInfo = PrelUtil.getColumns(project.getInput(0).getRowType(), project.getProjects()); + + List<RexNode> newProjects = Lists.newArrayList(); + if (columnInfo == null ) { + return project; + } + int lastRexInput = columnInfo.columns.size(); + RexVisitorComplexExprSplitter exprSplitter = new RexVisitorComplexExprSplitter(factory, funcReg, lastRexInput); + + for (RexNode n : project.getChildExps()) { + newProjects.add(n.accept(columnInfo.getInputRewriter())); + } + + for (RexNode rex : 
project.getChildExps()) { + origRelDataTypes.add(project.getRowType().getFieldList().get(i)); + i++; + exprList.add(rex.accept(exprSplitter)); + } + List<RexNode> complexExprs = exprSplitter.getComplexExprs(); + + RelNode originalInput = project.getInput(0); + ProjectPrel childProject; + + List<RexNode> allExprs = new ArrayList(); + for (int index = 0; index < lastRexInput; index++) { + RexBuilder builder = new RexBuilder(factory); + allExprs.add(builder.makeInputRef( new RelDataTypeDrillImpl(new RelDataTypeHolder(), factory), index)); + relDataTypes.add(new RelDataTypeFieldImpl("EXPR$" + index, allExprs.size(), factory.createSqlType(SqlTypeName.ANY) )); + } + RexNode currRexNode; + int index = lastRexInput - 1; + + // if the projection expressions contained complex outputs, split them into their own individual projects + if (complexExprs.size() > 0 ) { + while (complexExprs.size() > 0) { + if ( index >= lastRexInput ) { + allExprs.remove(allExprs.size() - 1); + RexBuilder builder = new RexBuilder(factory); + allExprs.add(builder.makeInputRef( new RelDataTypeDrillImpl(new RelDataTypeHolder(), factory), index)); + } + index++; + currRexNode = complexExprs.remove(0); + allExprs.add(currRexNode); + relDataTypes.add(new RelDataTypeFieldImpl("EXPR$" + index, allExprs.size(), factory.createSqlType(SqlTypeName.ANY) )); + childProject = new ProjectPrel(project.getCluster(), project.getTraitSet(), originalInput, ImmutableList.copyOf(allExprs), new RelRecordType(relDataTypes)); + originalInput = childProject; + } + // copied from above, find a better way to do this + allExprs.remove(allExprs.size() - 1); + RexBuilder builder = new RexBuilder(factory); + allExprs.add(builder.makeInputRef( new RelDataTypeDrillImpl(new RelDataTypeHolder(), factory), index)); + relDataTypes.add(new RelDataTypeFieldImpl("EXPR$" + index, allExprs.size(), factory.createSqlType(SqlTypeName.ANY) )); + } + return new ProjectPrel(project.getCluster(), project.getTraitSet(), originalInput, exprList, new 
RelRecordType(origRelDataTypes)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index 0bb59bf..ca444a3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -39,6 +39,7 @@ import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.logical.DrillScreenRel; import org.apache.drill.exec.planner.logical.DrillStoreRel; import org.apache.drill.exec.planner.logical.RewriteProjectRel; +import org.apache.drill.exec.planner.physical.visitor.RewriteProjectToFlatten; import org.apache.drill.exec.planner.physical.DrillDistributionTrait; import org.apache.drill.exec.planner.physical.PhysicalPlanCreator; import org.apache.drill.exec.planner.physical.PlannerSettings; @@ -52,6 +53,7 @@ import org.apache.drill.exec.planner.physical.visitor.MemoryEstimationVisitor; import org.apache.drill.exec.planner.physical.visitor.ProducerConsumerPrelVisitor; import org.apache.drill.exec.planner.physical.visitor.RelUniqifier; import org.apache.drill.exec.planner.physical.visitor.SelectionVectorPrelVisitor; +import org.apache.drill.exec.planner.physical.visitor.SplitUpComplexExpressions; import org.apache.drill.exec.planner.physical.visitor.StarColumnConverter; import org.apache.drill.exec.planner.sql.DrillSqlWorker; import org.apache.drill.exec.server.options.OptionManager; @@ -194,6 +196,13 @@ public class DefaultSqlHandler extends AbstractSqlHandler { */ phyRelNode = 
JoinPrelRenameVisitor.insertRenameProject(phyRelNode); + // TODO - re-enable once execution issues are resolved. This rule allows for several flatten operations to appear + // within a single query, it also breaks up all expressions with complex outputs into their own project operations. + // It currently appears to be producing good plans, but for the flatten case it is revealing execution errors in the + // project operator. +// phyRelNode = ((Prel) phyRelNode).accept(new SplitUpComplexExpressions(planner.getTypeFactory(), context.getDrillOperatorTable(), context.getPlannerSettings().functionImplementationRegistry), null); + phyRelNode = ((Prel) phyRelNode).accept(new RewriteProjectToFlatten(planner.getTypeFactory(), context.getDrillOperatorTable()), null); + // Definitely before this one /* * 2.) * Since our operators work via names rather than indices, we have to make to reorder any http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java index a28caf1..aadc563 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java @@ -19,7 +19,7 @@ package org.apache.drill.exec.vector; import io.netty.buffer.DrillBuf; -public interface RepeatedFixedWidthVector extends ValueVector{ +public interface RepeatedFixedWidthVector extends ValueVector, RepeatedVector { /** * Allocate a new memory space for this vector. Must be called prior to using the ValueVector. 
* @@ -37,12 +37,13 @@ public interface RepeatedFixedWidthVector extends ValueVector{ */ public int load(int parentValueCount, int childValueCount, DrillBuf buf); - public abstract RepeatedAccessor getAccessor(); - public abstract RepeatedMutator getMutator(); public interface RepeatedAccessor extends Accessor { public int getGroupCount(); + public int getValueCount(); + public int getGroupSizeAtIndex(int index); + public ValueVector getAllChildValues(); } public interface RepeatedMutator extends Mutator { public void setValueCounts(int parentValueCount, int childValueCount); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java index 2e7689c..a499341 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java @@ -19,7 +19,7 @@ package org.apache.drill.exec.vector; import io.netty.buffer.DrillBuf; -public interface RepeatedVariableWidthVector extends ValueVector{ +public interface RepeatedVariableWidthVector extends ValueVector, RepeatedVector { /** * Allocate a new memory space for this vector. Must be called prior to using the ValueVector. * @@ -35,8 +35,6 @@ public interface RepeatedVariableWidthVector extends ValueVector{ */ public int getByteCapacity(); - public abstract RepeatedFixedWidthVector.RepeatedAccessor getAccessor(); - /** * Load the records in the provided buffer based on the given number of values. * @param dataBytes The number of bytes associated with the data array. 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java new file mode 100644 index 0000000..b23ee02 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.vector; + +import io.netty.buffer.DrillBuf; + +public interface RepeatedVector { + + public RepeatedFixedWidthVector.RepeatedAccessor getAccessor(); + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java index 9870dd5..c75b359 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java @@ -202,9 +202,22 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea return l; } + public int getGroupSizeAtIndex(int index) { + return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index); + } + + @Override + public ValueVector getAllChildValues() { + return vector; + } + @Override public int getValueCount() { - return offsets.getAccessor().getValueCount() - 1; +// if (offsets.getAccessor().getValueCount() == 0 ) { +// return 0; +// } else { + return offsets.getAccessor().get(offsets.getAccessor().getValueCount() - 1); +// } } public void get(int index, RepeatedListHolder holder) { @@ -249,7 +262,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea @Override public int getGroupCount() { - return size(); + return offsets.getAccessor().getValueCount() - 1; } } @@ -402,7 +415,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea return getField() // .getAsBuilder() // .setBufferLength(getBufferSize()) // - .setValueCount(accessor.getValueCount()) // + .setValueCount(accessor.getGroupCount()) // .addChild(vector.getMetadata()) // 
.build(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java index 2612924..beb2475 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java @@ -28,6 +28,7 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.holders.ComplexHolder; import org.apache.drill.exec.expr.holders.RepeatedMapHolder; @@ -57,7 +58,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat public final static MajorType TYPE = MajorType.newBuilder().setMinorType(MinorType.MAP).setMode(DataMode.REPEATED).build(); - private final UInt4Vector offsets; // offsets to start of each record + private final UInt4Vector offsets; // offsets to start of each record (considering record indices are 0-indexed) private final Map<String, ValueVector> vectors = Maps.newLinkedHashMap(); private final Map<String, VectorWithOrdinal> vectorIds = Maps.newHashMap(); private final RepeatedMapReaderImpl reader = new RepeatedMapReaderImpl(RepeatedMapVector.this); @@ -66,7 +67,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat private final Mutator mutator = new Mutator(); private final BufferAllocator allocator; private final MaterializedField field; - 
private int lastSet = -1; + private int lastPopulatedValueIndex = -1; public RepeatedMapVector(MaterializedField field, BufferAllocator allocator) { this.field = field; @@ -76,12 +77,12 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat } @Override - public void allocateNew(int parentValueCount, int childValueCount) { + public void allocateNew(int topLevelValueCount, int childValueCount) { clear(); - offsets.allocateNew(parentValueCount+1); + offsets.allocateNew(topLevelValueCount+1); offsets.zeroVector(); for (ValueVector v : vectors.values()) { - AllocationHelper.allocatePrecomputedChildCount(v, parentValueCount, 50, childValueCount); + AllocationHelper.allocatePrecomputedChildCount(v, topLevelValueCount, 50, childValueCount); } mutator.reset(); accessor.reset(); @@ -204,6 +205,10 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat } } + public TransferPair getTransferPairToSingleMap() { + return new SingleMapTransferPair(field.getPath()); + } + @Override public TransferPair getTransferPair(FieldReference ref) { return new MapTransferPair(ref); @@ -230,6 +235,74 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat return true; } + private class SingleMapTransferPair implements TransferPair{ + private RepeatedMapVector from = RepeatedMapVector.this; + private TransferPair[] pairs; + private MapVector to; + + public SingleMapTransferPair(SchemaPath path) { + + MaterializedField mf = MaterializedField.create(path, Types.required(field.getType().getMinorType())); + MapVector v = new MapVector(mf, allocator); + pairs = new TransferPair[vectors.size()]; + int i =0; + for (Map.Entry<String, ValueVector> e : vectors.entrySet()) { + TransferPair otherSide = e.getValue().getTransferPair(); + v.put(e.getKey(), otherSide.getTo()); + pairs[i++] = otherSide; + } + this.to = v; + } + + public SingleMapTransferPair(MapVector to) { + this.to = to; + pairs = new 
TransferPair[vectors.size()]; + int i =0; + for (Map.Entry<String, ValueVector> e : vectors.entrySet()) { + int preSize = to.vectors.size(); + ValueVector v = to.addOrGet(e.getKey(), e.getValue().getField().getType(), e.getValue().getClass()); + if (to.vectors.size() != preSize) { + v.allocateNew(); + } + pairs[i++] = e.getValue().makeTransferPair(v); + } + } + + + @Override + public void transfer() { + for (TransferPair p : pairs) { + p.transfer(); + } + to.getMutator().setValueCount(from.getAccessor().getValueCount()); + clear(); + } + + @Override + public ValueVector getTo() { + return to; + } + + @Override + public boolean copyValueSafe(int from, int to) { + for (TransferPair p : pairs) { + if (!p.copyValueSafe(from, to)) { + return false; + } + } + return true; + } + + @Override + public void splitAndTransfer(int startIndex, int length) { + for (TransferPair p : pairs) { + p.splitAndTransfer(startIndex, length); + } + to.getMutator().setValueCount(length); + } + + } + private class MapTransferPair implements TransferPair{ private final TransferPair[] pairs; @@ -278,10 +351,10 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat } @Override - public boolean copyValueSafe(int from, int to) { + public boolean copyValueSafe(int srcIndex, int destIndex) { RepeatedMapHolder holder = new RepeatedMapHolder(); - accessor.get(from, holder); - int newIndex = this.to.offsets.getAccessor().get(to); + accessor.get(srcIndex, holder); + int newIndex = to.offsets.getAccessor().get(destIndex); //todo: make these bulk copies for (int i = holder.start; i < holder.end; i++, newIndex++) { for (TransferPair p : pairs) { @@ -290,10 +363,10 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat } } } - if (!this.to.offsets.getMutator().setSafe(to+1, newIndex)) { + if (!to.offsets.getMutator().setSafe(destIndex+1, newIndex)) { return false; } - this.to.lastSet++; + to.lastPopulatedValueIndex = destIndex; return true; } @@ 
-413,6 +486,15 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat return offsets.getAccessor().getValueCount() - 1; } + public int getGroupSizeAtIndex(int index) { + return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index); + } + + @Override + public ValueVector getAllChildValues() { + throw new UnsupportedOperationException("Cannot retrieve inner vector from repeated map."); + } + public void get(int index, RepeatedMapHolder holder) { assert index < getValueCapacity()-1; holder.start = offsets.getAccessor().get(index); @@ -458,44 +540,43 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat } } - private void populateEmpties(int groupCount) { - int previousEnd = offsets.getAccessor().get(lastSet + 1); - for (int i = lastSet + 2; i <= groupCount; i++) { - offsets.getMutator().setSafe(i, previousEnd); + private void populateEmpties(int topLevelValueCount) { + int previousEnd = offsets.getAccessor().get(lastPopulatedValueIndex + 1); + for (int i = lastPopulatedValueIndex + 1; i < topLevelValueCount; i++) { + offsets.getMutator().setSafe(i+1, previousEnd); } - lastSet = groupCount - 1; + lastPopulatedValueIndex = topLevelValueCount - 1; } public class Mutator implements ValueVector.Mutator, RepeatedMutator { public void startNewGroup(int index) { - populateEmpties(index); - lastSet = index; + populateEmpties(index+1); offsets.getMutator().set(index+1, offsets.getAccessor().get(index)); } public int add(int index) { - int nextOffset = offsets.getAccessor().get(index+1); - boolean success = offsets.getMutator().setSafe(index+1, nextOffset+1); + int prevEnd = offsets.getAccessor().get(index+1); + boolean success = offsets.getMutator().setSafe(index+1, prevEnd+1); if (!success) { return -1; } - return nextOffset; + return prevEnd; } - @Override - public void setValueCount(int groupCount) { - populateEmpties(groupCount); - offsets.getMutator().setValueCount(groupCount+1); - int valueCount = 
offsets.getAccessor().get(groupCount); + public void setValueCount(int topLevelValueCount) { + populateEmpties(topLevelValueCount); + offsets.getMutator().setValueCount(topLevelValueCount+1); + int childValueCount = offsets.getAccessor().get(topLevelValueCount); for (ValueVector v : vectors.values()) { - v.getMutator().setValueCount(valueCount); + v.getMutator().setValueCount(childValueCount); } } @Override public void reset() { - lastSet = 0; + // the last non empty element index starts from -1 + lastPopulatedValueIndex = -1; } @Override @@ -521,7 +602,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat @Override public void clear() { - lastSet = 0; + getMutator().reset(); offsets.clear(); for(ValueVector v : vectors.values()) { v.clear();; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java index 7aa9846..5d85d0a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java @@ -49,7 +49,8 @@ abstract class AbstractBaseWriter implements FieldWriter{ int idx(){ return index; } - protected void resetState(){ + + public void resetState(){ state.reset(); }