Patch for DRILL-705 Currently only supports partitioning/ordering, not yet preceding or after offsets
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8def6e91 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8def6e91 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8def6e91 Branch: refs/heads/master Commit: 8def6e91489455c0ae670f49ef5ba51ef71b31bd Parents: 9bc71fc Author: Timothy Chen <tnac...@gmail.com> Authored: Sun Sep 21 23:54:40 2014 -0700 Committer: Steven Phillips <sphill...@maprtech.com> Committed: Tue Sep 23 22:24:21 2014 -0700 ---------------------------------------------------------------------- .../drill/common/expression/CastExpression.java | 4 +- .../common/logical/data/AbstractBuilder.java | 5 + .../logical/data/LogicalOperatorBase.java | 3 +- .../common/logical/data/NamedExpression.java | 2 +- .../drill/common/logical/data/Window.java | 112 ++ .../drill/common/logical/data/WindowFrame.java | 98 -- .../data/visitors/AbstractLogicalVisitor.java | 6 +- .../logical/data/visitors/LogicalVisitor.java | 5 +- .../src/main/codegen/data/AggrTypes1.tdd | 6 +- .../src/main/codegen/templates/TypeHelper.java | 5 +- .../drill/exec/expr/EvaluationVisitor.java | 3 + .../org/apache/drill/exec/memory/Accountor.java | 9 +- .../apache/drill/exec/opt/BasicOptimizer.java | 27 +- .../physical/base/AbstractPhysicalVisitor.java | 6 + .../exec/physical/base/AbstractSingle.java | 2 +- .../exec/physical/base/PhysicalOperator.java | 2 +- .../exec/physical/base/PhysicalVisitor.java | 2 + .../drill/exec/physical/config/WindowPOP.java | 79 ++ .../physical/impl/aggregate/InternalBatch.java | 8 +- .../impl/aggregate/StreamingAggBatch.java | 2 - .../physical/impl/filter/FilterRecordBatch.java | 4 +- .../physical/impl/limit/LimitRecordBatch.java | 4 +- .../impl/project/ProjectRecordBatch.java | 12 +- .../impl/svremover/RemovingRecordBatch.java | 3 +- .../physical/impl/trace/TraceRecordBatch.java | 3 +- .../StreamingWindowFrameBatchCreator.java | 36 + .../window/StreamingWindowFrameRecordBatch.java | 268 +++++ .../window/StreamingWindowFrameTemplate.java | 286 +++++ .../impl/window/StreamingWindowFramer.java | 44 + .../exec/planner/common/DrillWindowRelBase.java | 41 + .../exec/planner/fragment/StatsCollector.java | 6 + .../exec/planner/logical/DrillAggregateRel.java | 2 +- .../exec/planner/logical/DrillLimitRule.java | 2 +- .../planner/logical/DrillPushProjIntoScan.java | 3 + .../drill/exec/planner/logical/DrillRel.java | 2 +- .../exec/planner/logical/DrillRuleSets.java | 6 + .../exec/planner/logical/DrillWindowRel.java | 111 ++ .../exec/planner/logical/DrillWindowRule.java | 52 + .../exec/planner/physical/AggPrelBase.java | 19 +- .../drill/exec/planner/physical/LimitPrel.java | 8 +- .../planner/physical/StreamingWindowPrel.java | 136 +++ .../planner/physical/StreamingWindowPrule.java | 133 +++ .../exec/planner/sql/DrillOperatorTable.java | 7 +- .../exec/planner/sql/DrillSqlAggOperator.java | 2 +- .../drill/exec/planner/sql/DrillSqlWorker.java | 21 +- .../sql/handlers/CreateTableHandler.java | 5 +- .../planner/sql/handlers/DefaultSqlHandler.java | 21 +- .../sql/handlers/DescribeTableHandler.java | 4 +- .../planner/sql/handlers/ExplainHandler.java | 10 +- .../planner/sql/handlers/ShowFileHandler.java | 5 +- .../sql/handlers/ShowSchemasHandler.java | 4 +- .../planner/sql/handlers/ShowTablesHandler.java | 4 +- .../planner/sql/handlers/SqlHandlerConfig.java | 47 + .../exec/planner/sql/parser/DrillSqlCall.java | 6 +- .../exec/planner/sql/parser/SqlCreateTable.java | 6 +- .../exec/planner/sql/parser/SqlCreateView.java | 15 +- .../planner/sql/parser/SqlDescribeTable.java | 6 +- .../exec/planner/sql/parser/SqlDropView.java | 6 +- .../exec/planner/sql/parser/SqlShowFiles.java | 6 +- .../exec/planner/sql/parser/SqlShowSchemas.java | 6 +- .../exec/planner/sql/parser/SqlShowTables.java | 13 +- .../exec/planner/sql/parser/SqlUseSchema.java | 14 +- .../exec/record/AbstractSingleRecordBatch.java | 9 +- .../drill/exec/record/VectorContainer.java | 17 + .../java/org/apache/drill/exec/ExecTest.java | 2 +- .../physical/impl/limit/TestSimpleLimit.java | 1 + .../physical/impl/window/TestWindowFrame.java | 202 ++++ .../drill/exec/sql/TestWindowFunctions.java | 29 + .../src/test/resources/window/mediumData.json | 1000 ++++++++++++++++++ .../src/test/resources/window/oneKeyCount.json | 43 + .../test/resources/window/oneKeyCountData.json | 4 + .../resources/window/oneKeyCountMultiBatch.json | 72 ++ .../src/test/resources/window/twoKeys.json | 44 + .../src/test/resources/window/twoKeysData.json | 8 + .../drill/jdbc/test/TestJdbcDistQuery.java | 2 +- .../apache/drill/exec/proto/UserBitShared.java | 15 +- .../exec/proto/beans/CoreOperatorType.java | 4 +- protocol/src/main/protobuf/UserBitShared.proto | 11 +- 78 files changed, 2997 insertions(+), 241 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/common/src/main/java/org/apache/drill/common/expression/CastExpression.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/expression/CastExpression.java b/common/src/main/java/org/apache/drill/common/expression/CastExpression.java index b73a447..5d09a24 100644 --- a/common/src/main/java/org/apache/drill/common/expression/CastExpression.java +++ b/common/src/main/java/org/apache/drill/common/expression/CastExpression.java @@ -23,6 +23,8 @@ import java.util.Iterator; import org.apache.drill.common.expression.visitors.ExprVisitor; import org.apache.drill.common.types.TypeProtos.MajorType; +import static com.google.common.base.Preconditions.checkNotNull; + public class CastExpression extends LogicalExpressionBase implements Iterable<LogicalExpression>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CastExpression.class); @@ -33,7 +35,7 @@ public class CastExpression extends LogicalExpressionBase implements Iterable<Lo public CastExpression(LogicalExpression input, MajorType type, ExpressionPosition pos) { super(pos); this.input = input; - this.type = type; + this.type = checkNotNull(type, "Major type cannot be null"); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/common/src/main/java/org/apache/drill/common/logical/data/AbstractBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/logical/data/AbstractBuilder.java b/common/src/main/java/org/apache/drill/common/logical/data/AbstractBuilder.java index 427b040..28424a5 100644 --- a/common/src/main/java/org/apache/drill/common/logical/data/AbstractBuilder.java +++ b/common/src/main/java/org/apache/drill/common/logical/data/AbstractBuilder.java @@ -19,6 +19,7 @@ package org.apache.drill.common.logical.data; import java.util.List; +import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.LogicalExpression; public abstract class AbstractBuilder<T extends LogicalOperator> { @@ -30,6 +31,10 @@ public abstract class AbstractBuilder<T extends LogicalOperator> { return exprs.toArray(new LogicalExpression[exprs.size()]); } + protected FieldReference[] aF(List<FieldReference> exprs){ + return exprs.toArray(new FieldReference[exprs.size()]); + } + protected NamedExpression[] aN(List<NamedExpression> exprs){ return exprs.toArray(new NamedExpression[exprs.size()]); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java b/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java index 58a00fe..6c534eb 100644 --- a/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java +++ b/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java @@ -89,5 +89,4 @@ public abstract class LogicalOperatorBase implements LogicalOperator{ logger.debug("Adding Logical Operator sub types: {}", ((Object) ops) ); return ops; } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/common/src/main/java/org/apache/drill/common/logical/data/NamedExpression.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/logical/data/NamedExpression.java b/common/src/main/java/org/apache/drill/common/logical/data/NamedExpression.java index 1550ddf..a166c25 100644 --- a/common/src/main/java/org/apache/drill/common/logical/data/NamedExpression.java +++ b/common/src/main/java/org/apache/drill/common/logical/data/NamedExpression.java @@ -30,7 +30,7 @@ public class NamedExpression { private final FieldReference ref; @JsonCreator - public NamedExpression( @JsonProperty("expr") LogicalExpression expr, @JsonProperty("ref") FieldReference ref) { + public NamedExpression(@JsonProperty("expr") LogicalExpression expr, @JsonProperty("ref") FieldReference ref) { super(); this.expr = expr; this.ref = ref; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/common/src/main/java/org/apache/drill/common/logical/data/Window.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/logical/data/Window.java b/common/src/main/java/org/apache/drill/common/logical/data/Window.java new file mode 100644 index 0000000..6dba77c --- /dev/null +++ b/common/src/main/java/org/apache/drill/common/logical/data/Window.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.common.logical.data; + +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.apache.drill.common.expression.FieldReference; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.logical.data.visitors.LogicalVisitor; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +@JsonTypeName("window") +public class Window extends SingleInputOperator { + private final NamedExpression[] withins; + private final NamedExpression[] aggregations; + private final long start; + private final long end; + + + @JsonCreator + public Window(@JsonProperty("withins") NamedExpression[] withins, + @JsonProperty("aggregations") NamedExpression[] aggregations, + @JsonProperty("start") Long start, + @JsonProperty("end") Long end) { + super(); + this.withins = withins; + this.start = start == null ? Long.MIN_VALUE : start; + this.end = end == null ? Long.MIN_VALUE : end; + this.aggregations = aggregations; + } + + public NamedExpression[] getWithins() { + return withins; + } + + public long getStart() { + return start; + } + + public long getEnd() { + return end; + } + + public NamedExpression[] getAggregations() { + return aggregations; + } + + @Override + public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E { + return logicalVisitor.visitWindow(this, value); + } + + @Override + public Iterator<LogicalOperator> iterator() { + return Iterators.singletonIterator(getInput()); + } + + public static class Builder extends AbstractSingleBuilder<Window, Builder>{ + private List<NamedExpression> aggregations = Lists.newArrayList(); + private List<NamedExpression> withins = Lists.newArrayList(); + private List<Order.Ordering> orderings = Lists.newArrayList(); + private long start = Long.MIN_VALUE; + private long end = Long.MIN_VALUE; + + + public Builder addAggregation(FieldReference ref, LogicalExpression expr){ + aggregations.add(new NamedExpression(expr, ref)); + return this; + } + + public Builder addWithin(FieldReference within, LogicalExpression expr) { + withins.add(new NamedExpression(expr, within)); + return this; + } + + public Window internalBuild() { + checkState(!withins.isEmpty(), "Withins in window must not be empty."); + checkState(!aggregations.isEmpty(), "Aggregations in window must not be empty."); + return new Window(aN(withins), aN(aggregations), start, end); + } + + public Builder addOrdering(Order.Ordering ordering) { + orderings.add(ordering); + return this; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java b/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java deleted file mode 100644 index 6fb64bc..0000000 --- a/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.common.logical.data; - -import java.util.Iterator; - -import org.apache.drill.common.expression.FieldReference; -import org.apache.drill.common.logical.data.visitors.LogicalVisitor; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.google.common.collect.Iterators; - -@JsonTypeName("windowframe") -public class WindowFrame extends SingleInputOperator{ - - - private final FieldReference within; - private final FrameRef frame; - private final long start; - private final long end; - - - @JsonCreator - public WindowFrame(@JsonProperty("within") FieldReference within, @JsonProperty("ref") FrameRef frame, @JsonProperty("start") Long start, @JsonProperty("end") Long end) { - super(); - this.within = within; - this.frame = frame; - this.start = start == null ? Long.MIN_VALUE : start; - this.end = end == null ? Long.MIN_VALUE : end; - } - - - @JsonProperty("ref") - public FrameRef getFrame() { - return frame; - } - - public FieldReference getWithin() { - return within; - } - - public long getStart() { - return start; - } - - public long getEnd() { - return end; - } - - @Override - public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E { - return logicalVisitor.visitWindowFrame(this, value); - } - - @Override - public Iterator<LogicalOperator> iterator() { - return Iterators.singletonIterator(getInput()); - } - - - public static class FrameRef{ - private final FieldReference segment; - private final FieldReference position; - - @JsonCreator - public FrameRef(@JsonProperty("segment") FieldReference segment, @JsonProperty("position") FieldReference position) { - super(); - this.segment = segment; - this.position = position; - } - - public FieldReference getSegment() { - return segment; - } - public FieldReference getPosition() { - return position; - } - - - } -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/common/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java b/common/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java index d128b10..92e370f 100644 --- a/common/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java +++ b/common/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java @@ -32,7 +32,7 @@ import org.apache.drill.common.logical.data.Sequence; import org.apache.drill.common.logical.data.Store; import org.apache.drill.common.logical.data.Transform; import org.apache.drill.common.logical.data.Union; -import org.apache.drill.common.logical.data.WindowFrame; +import org.apache.drill.common.logical.data.Window; import org.apache.drill.common.logical.data.Writer; @@ -110,8 +110,8 @@ public abstract class AbstractLogicalVisitor<T, X, E extends Throwable> implemen } @Override - public T visitWindowFrame(WindowFrame windowFrame, X value) throws E { - return visitOp(windowFrame, value); + public T visitWindow(Window window, X value) throws E { + return visitOp(window, value); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/common/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java b/common/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java index 4bf9fbf..3a426bf 100644 --- a/common/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java +++ b/common/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java @@ -32,7 +32,8 @@ import org.apache.drill.common.logical.data.Sequence; import org.apache.drill.common.logical.data.Store; import org.apache.drill.common.logical.data.Transform; import org.apache.drill.common.logical.data.Union; -import org.apache.drill.common.logical.data.WindowFrame; +import org.apache.drill.common.logical.data.Window; +import org.apache.drill.common.logical.data.Window; import org.apache.drill.common.logical.data.Writer; /** @@ -60,6 +61,6 @@ public interface LogicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitSequence(Sequence sequence, EXTRA value) throws EXCEP; public RETURN visitTransform(Transform transform, EXTRA value) throws EXCEP; public RETURN visitUnion(Union union, EXTRA value) throws EXCEP; - public RETURN visitWindowFrame(WindowFrame windowFrame, EXTRA value) throws EXCEP; + public RETURN visitWindow(Window window, EXTRA value) throws EXCEP; public RETURN visitWriter(Writer writer, EXTRA value) throws EXCEP; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd b/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd index 812c289..5b4041c 100644 --- a/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd +++ b/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd @@ -78,7 +78,7 @@ {inputType: "NullableVarBinary", outputType: "VarBinary", runningType: "VarBinary", major: "VarBytes"} ] }, - {className: "Sum", funcName: "sum", types: [ + {className: "Sum", funcName: "sum", types: [ {inputType: "Bit", outputType: "Bit", runningType: "Bit", major: "Numeric"}, {inputType: "Int", outputType: "BigInt", runningType: "BigInt", major: "Numeric"}, {inputType: "BigInt", outputType: "BigInt", runningType: "BigInt", major: "Numeric"}, @@ -96,7 +96,7 @@ {inputType: "Interval", outputType: "Interval", runningType: "Interval", major: "Date", initialValue: "0"}, {inputType: "NullableInterval", outputType: "Interval", runningType: "Interval", major: "Date", initialValue: "0"} ] - }, + }, {className: "Count", funcName: "count", types: [ {inputType: "Bit", outputType: "Bit", runningType: "Bit", major: "Numeric"}, {inputType: "Int", outputType: "BigInt", runningType: "BigInt", major: "Numeric"}, @@ -129,4 +129,4 @@ ] } ] -} +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/codegen/templates/TypeHelper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/TypeHelper.java b/exec/java-exec/src/main/codegen/templates/TypeHelper.java index 3c0d9d3..cb6a030 100644 --- a/exec/java-exec/src/main/codegen/templates/TypeHelper.java +++ b/exec/java-exec/src/main/codegen/templates/TypeHelper.java @@ -73,8 +73,7 @@ public class TypeHelper { case LIST: return new GenericAccessor(vector); } - - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("Unable to find sql accessor for minor type [" + vector.getField().getType().getMinorType() + "] and mode [" + vector.getField().getType().getMode() + "]"); } public static ValueVector getNewVector(SchemaPath parentPath, String name, BufferAllocator allocator, MajorType type){ @@ -255,7 +254,7 @@ public class TypeHelper { default: break; } - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("Unable to find holder type for minorType: " + type); } public static ValueVector getNewVector(MaterializedField field, BufferAllocator allocator){ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java index a5b7bee..6280c40 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java @@ -71,6 +71,9 @@ import com.sun.codemodel.JLabel; import com.sun.codemodel.JType; import com.sun.codemodel.JVar; +/** + * Visitor that generates code for eval + */ public class EvaluationVisitor { private final FunctionImplementationRegistry registry; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java index 6ef46de..18c5072 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java @@ -17,6 +17,9 @@ */ package org.apache.drill.exec.memory; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import io.netty.buffer.ByteBuf; import io.netty.buffer.DrillBuf; @@ -31,9 +34,9 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.util.AssertionUtil; -import com.google.common.collect.LinkedListMultimap; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ConcurrentMap; public class Accountor { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Accountor.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java index 876ba37..98202ac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java @@ -17,12 +17,7 @@ */ package org.apache.drill.exec.opt; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - +import com.google.common.collect.Lists; import org.apache.drill.common.JSONOptions; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; @@ -40,6 +35,7 @@ import org.apache.drill.common.logical.data.Project; import org.apache.drill.common.logical.data.Scan; import org.apache.drill.common.logical.data.SinkOperator; import org.apache.drill.common.logical.data.Store; +import org.apache.drill.common.logical.data.Window; import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.DataMode; @@ -54,13 +50,18 @@ import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.config.SelectionVectorRemover; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.physical.config.StreamingAggregate; +import org.apache.drill.exec.physical.config.WindowPOP; import org.apache.drill.exec.rpc.user.UserServer; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.StoragePlugin; import org.eigenbase.rel.RelFieldCollation.Direction; import org.eigenbase.rel.RelFieldCollation.NullDirection; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; public class BasicOptimizer extends Optimizer{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class); @@ -183,13 +184,22 @@ public class BasicOptimizer extends Optimizer{ return sa; } + @Override + public PhysicalOperator visitWindow(Window window, Object value) throws OptimizerException { + PhysicalOperator input = window.getInput().accept(this, value); + List<Ordering> ods = Lists.newArrayList(); + + input = new Sort(input, ods, false); + + return new WindowPOP(input, window.getWithins(), window.getAggregations(), window.getStart(), window.getEnd()); + } @Override public PhysicalOperator visitOrder(Order order, Object value) throws OptimizerException { PhysicalOperator input = order.getInput().accept(this, value); List<Ordering> ods = Lists.newArrayList(); - for(Ordering o : order.getOrderings()){ + for (Ordering o : order.getOrderings()){ ods.add(o); } return new SelectionVectorRemover(new Sort(input, ods, false)); @@ -250,7 +260,6 @@ public class BasicOptimizer extends Optimizer{ @Override public PhysicalOperator visitProject(Project project, Object obj) throws OptimizerException { -// return project.getInput().accept(this, obj); return new org.apache.drill.exec.physical.config.Project(Arrays.asList(project.getSelections()), project.iterator().next().accept(this, obj)); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index 48b3801..031ab10 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -39,6 +39,7 @@ import org.apache.drill.exec.physical.config.Trace; import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.physical.config.UnionExchange; import org.apache.drill.exec.physical.config.UnorderedReceiver; +import org.apache.drill.exec.physical.config.WindowPOP; public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> implements PhysicalVisitor<T, X, E> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractPhysicalVisitor.class); @@ -64,6 +65,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme } @Override + public T visitWindowFrame(WindowPOP windowFrame, X value) throws E { + return visitOp(windowFrame, value); + } + + @Override public T visitProject(Project project, X value) throws E{ return visitOp(project, value); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java index 2b10e6d..6d6c591 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java @@ -28,7 +28,7 @@ import com.google.common.collect.Iterators; * Describes an operator that expects a single child operator as its input. * @param <T> The type of Exec model supported. */ -public abstract class AbstractSingle extends AbstractBase{ +public abstract class AbstractSingle extends AbstractBase { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSingle.class); protected final PhysicalOperator child; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java index 8f51390..a5518ca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java @@ -37,7 +37,7 @@ import com.fasterxml.jackson.annotation.ObjectIdGenerators; @JsonPropertyOrder({ "@id" }) @JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "@id") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "pop") -public interface PhysicalOperator extends GraphValue<PhysicalOperator> { +public interface PhysicalOperator extends GraphValue<PhysicalOperator> { /** * Describes whether or not a particular physical operator can actually be executed. Most physical operators can be http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java index 8da06cb..f9a9c21 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java @@ -39,6 +39,7 @@ import org.apache.drill.exec.physical.config.Trace; import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.physical.config.UnionExchange; import org.apache.drill.exec.physical.config.UnorderedReceiver; +import org.apache.drill.exec.physical.config.WindowPOP; /** * Visitor class designed to traversal of a operator tree. Basis for a number of operator manipulations including fragmentation and materialization. @@ -80,6 +81,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP; public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP; public RETURN visitUnionExchange(UnionExchange op, EXTRA value) throws EXCEP; + public RETURN visitWindowFrame(WindowPOP op, EXTRA value) throws EXCEP; public RETURN visitProducerConsumer(ProducerConsumer op, EXTRA value) throws EXCEP; public RETURN visitIteratorValidator(IteratorValidator op, EXTRA value) throws EXCEP; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java new file mode 100644 index 0000000..17738ee --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.physical.config; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.exec.physical.base.AbstractSingle; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.proto.UserBitShared; + +@JsonTypeName("window") +public class WindowPOP extends AbstractSingle { + + private final NamedExpression[] withins; + private final NamedExpression[] aggregations; + private final long start; + private final long end; + + public WindowPOP(@JsonProperty("child") PhysicalOperator child, + @JsonProperty("within") NamedExpression[] withins, + @JsonProperty("aggregations") NamedExpression[] aggregations, + @JsonProperty("start") long start, + @JsonProperty("end") long end) { + super(child); + this.withins = withins; + this.aggregations = aggregations; + this.start = start; + this.end = end; + } + + @Override + protected PhysicalOperator getNewWithChild(PhysicalOperator child) { + return new WindowPOP(child, withins, aggregations, start, end); + } + + @Override + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitWindowFrame(this, value); + } + + @Override + public int getOperatorType() { + return UserBitShared.CoreOperatorType.WINDOW_VALUE; + } + + public long getStart() { + return start; + } + + public long getEnd() { + return end; + } + + public NamedExpression[] getAggregations() { + return aggregations; + } + + public NamedExpression[] getWithins() { + return withins; + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java index e690060..dae9eae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java @@ -35,7 +35,11 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{ private final SelectionVector4 sv4; public InternalBatch(RecordBatch incoming) { - switch(incoming.getSchema().getSelectionVectorMode()) { + this(incoming, null); + } + + public InternalBatch(RecordBatch incoming, VectorWrapper[] ignoreWrappers){ + switch(incoming.getSchema().getSelectionVectorMode()){ case FOUR_BYTE: this.sv4 = incoming.getSelectionVector4().createNewWrapperCurrent(); this.sv2 = null; @@ -49,7 +53,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{ this.sv2 = null; } this.schema = incoming.getSchema(); - this.container = VectorContainer.getTransferClone(incoming); + this.container = VectorContainer.getTransferClone(incoming, ignoreWrappers); } public BatchSchema getSchema() { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index ced5179..4d3925e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -325,9 +325,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { default: throw new IllegalStateException(); - } - } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index f1fcce0..85f664c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -76,13 +76,15 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ } @Override - protected void doWork() { + protected IterOutcome doWork() { int recordCount = incoming.getRecordCount(); filter.filterBatch(recordCount); // for (VectorWrapper<?> v : container) { // ValueVector.Mutator m = v.getValueVector().getMutator(); // m.setValueCount(recordCount); // } + + return IterOutcome.OK; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index f5bc9f9..8f4d90f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -122,7 +122,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { } @Override - protected void doWork() { + protected IterOutcome doWork() { for(TransferPair tp : transfers) { tp.transfer(); } @@ -139,6 +139,8 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { limitWithNoSV(recordCount); } } + + return IterOutcome.OK; } private IterOutcome produceEmptyFirstBatch() { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index a1a8340..224753e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -70,7 +70,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.sun.codemodel.JExpr; -public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ +public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class); private Projector projector; @@ -133,13 +133,13 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } @Override - protected void doWork() { + protected IterOutcome doWork() { // VectorUtil.showVectorAccessibleContent(incoming, ","); int incomingRecordCount = incoming.getRecordCount(); if (!doAlloc()) { outOfMemory = true; - return; + return IterOutcome.OUT_OF_MEMORY; } int outputRecords = projector.projectRecords(0, incomingRecordCount, 0); @@ -160,6 +160,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ if (complexWriters != null) { container.buildSchema(SelectionVectorMode.NONE); } + + return IterOutcome.OK; } private void handleRemainder() { @@ -177,7 +179,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ setValueCount(remainingRecordCount); hasRemainder = false; remainderIndex = 0; - for(VectorWrapper<?> v: incoming) { + for (VectorWrapper<?> v : incoming) { v.clear(); } this.recordCount = remainingRecordCount; @@ -259,7 +261,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } @Override - protected void setupNewSchema() throws SchemaChangeException{ + protected void setupNewSchema() throws SchemaChangeException { this.allocationVectors = Lists.newArrayList(); container.clear(); final List<NamedExpression> exprs = getExpressionList(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index 97f3608..7178d4c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -91,7 +91,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect } @Override - protected void doWork() { + protected IterOutcome doWork() { int incomingRecordCount = incoming.getRecordCount(); int copiedRecords = copier.copyRecords(0, incomingRecordCount); @@ -125,6 +125,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect incomingRecordCount, incomingRecordCount - remainderIndex, incoming.getSchema()); + return IterOutcome.OK; } private void handleRemainder() { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java index 6d90962..4e644df 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java @@ -100,7 +100,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { * this record batch to a log file. */ @Override - protected void doWork() { + protected IterOutcome doWork() { boolean incomingHasSv2 = incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE; if (incomingHasSv2) { @@ -121,6 +121,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { if (incomingHasSv2) { sv = wrap.getSv2(); } + return IterOutcome.OK; } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java new file mode 100644 index 0000000..9b8929f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.physical.impl.window; + +import com.google.common.collect.Iterables; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.WindowPOP; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.record.RecordBatch; + +import java.util.List; + +public class StreamingWindowFrameBatchCreator implements BatchCreator<WindowPOP> { + + @Override + public RecordBatch getBatch(FragmentContext context, WindowPOP config, List<RecordBatch> children) throws ExecutionSetupException { + return new StreamingWindowFrameRecordBatch(config, context, Iterables.getOnlyElement(children)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java new file mode 100644 index 0000000..2a92089 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.physical.impl.window; + +import com.google.common.collect.Lists; +import com.sun.codemodel.JExpr; +import com.sun.codemodel.JVar; +import org.apache.drill.common.expression.ErrorCollector; +import org.apache.drill.common.expression.ErrorCollectorImpl; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.exec.compile.sig.GeneratorMapping; +import org.apache.drill.exec.compile.sig.MappingSet; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.CodeGenerator; +import org.apache.drill.exec.expr.ExpressionTreeMaterializer; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.expr.ValueVectorReadExpression; +import org.apache.drill.exec.expr.ValueVectorWriteExpression; +import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; +import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.WindowPOP; +import org.apache.drill.exec.record.AbstractSingleRecordBatch; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.vector.ValueVector; + +import java.io.IOException; +import java.util.List; + +public class StreamingWindowFrameRecordBatch extends AbstractSingleRecordBatch<WindowPOP> { + private StreamingWindowFramer framer; + + public StreamingWindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { + super(popConfig, context, incoming); + } + + @Override + protected void setupNewSchema() throws SchemaChangeException { + container.clear(); + + try { + this.framer = createFramer(); + } catch (ClassTransformationException | IOException ex) { + throw new SchemaChangeException("Failed to create framer: " + ex); + } + } + + private void getIndex(ClassGenerator<StreamingWindowFramer> g) { + switch (incoming.getSchema().getSelectionVectorMode()) { + case FOUR_BYTE: { + JVar var = g.declareClassField("sv4_", g.getModel()._ref(SelectionVector4.class)); + g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4")); + g.getBlock("getVectorIndex")._return(var.invoke("get").arg(JExpr.direct("recordIndex"))); + return; + } + case NONE: { + g.getBlock("getVectorIndex")._return(JExpr.direct("recordIndex")); + return; + } + case TWO_BYTE: { + JVar var = g.declareClassField("sv2_", g.getModel()._ref(SelectionVector2.class)); + g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector2")); + g.getBlock("getVectorIndex")._return(var.invoke("get").arg(JExpr.direct("recordIndex"))); + return; + } + + default: + throw new IllegalStateException(); + } + } + + private StreamingWindowFramer createFramer() throws SchemaChangeException, IOException, ClassTransformationException { + int configLength = popConfig.getAggregations().length; + List<LogicalExpression> valueExprs = Lists.newArrayList(); + LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getWithins().length]; + + ErrorCollector collector = new ErrorCollectorImpl(); + + for (int i = 0; i < configLength; i++) { + NamedExpression ne = popConfig.getAggregations()[i]; + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry()); + if (expr == null) { + continue; + } + + final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); + ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); + TypedFieldId id = container.add(vector); + valueExprs.add(new ValueVectorWriteExpression(id, expr, true)); + } + + int j = 0; + LogicalExpression[] windowExprs = new LogicalExpression[incoming.getSchema().getFieldCount()]; + // TODO: Should transfer all existing columns instead of copy. Currently this is not easily doable because + // we are not processing one entire batch in one iteration, so cannot simply transfer. + for (VectorWrapper wrapper : incoming) { + ValueVector vv = wrapper.isHyper() ? wrapper.getValueVectors()[0] : wrapper.getValueVector(); + ValueVector vector = TypeHelper.getNewVector(vv.getField(), oContext.getAllocator()); + TypedFieldId id = container.add(vector); + final LogicalExpression expr = ExpressionTreeMaterializer.materialize( + new ValueVectorReadExpression(new TypedFieldId(vv.getField().getType(), wrapper.isHyper(), j)), + incoming, + collector, + context.getFunctionRegistry()); + windowExprs[j] = new ValueVectorWriteExpression(id, expr, true); + j++; + } + + for (int i = 0; i < keyExprs.length; i++) { + NamedExpression ne = popConfig.getWithins()[i]; + + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry()); + if (expr == null) { + continue; + } + + keyExprs[i] = expr; + } + + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } + + final ClassGenerator<StreamingWindowFramer> cg = CodeGenerator.getRoot(StreamingWindowFramer.TEMPLATE_DEFINITION, context.getFunctionRegistry()); + setupIsSame(cg, keyExprs); + setupIsSameFromBatch(cg, keyExprs); + addRecordValues(cg, valueExprs.toArray(new LogicalExpression[valueExprs.size()])); + outputWindowValues(cg, windowExprs); + + cg.getBlock("resetValues")._return(JExpr.TRUE); + + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + getIndex(cg); + StreamingWindowFramer agg = context.getImplementationClass(cg); + agg.setup( + context, + incoming, + this, + StreamingWindowFrameTemplate.UNBOUNDED, StreamingWindowFrameTemplate.CURRENT_ROW + ); + return agg; + } + + private final GeneratorMapping IS_SAME_PREV_INTERNAL_BATCH_READ = GeneratorMapping.create("isSameFromBatch", "isSameFromBatch", null, null); // the internal batch changes each time so we need to redo setup. + private final GeneratorMapping IS_SAME_PREV = GeneratorMapping.create("setupInterior", "isSameFromBatch", null, null); + private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ); + private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV); + + private void setupIsSameFromBatch(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] keyExprs) { + cg.setMappingSet(ISA_B1); + for (LogicalExpression expr : keyExprs) { + // first, we rewrite the evaluation stack for each side of the comparison. + cg.setMappingSet(ISA_B1); + ClassGenerator.HoldingContainer first = cg.addExpr(expr, false); + cg.setMappingSet(ISA_B2); + ClassGenerator.HoldingContainer second = cg.addExpr(expr, false); + + LogicalExpression fh = FunctionGenerationHelper.getComparator(first, second, context.getFunctionRegistry()); + ClassGenerator.HoldingContainer out = cg.addExpr(fh, false); + cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE); + } + cg.getEvalBlock()._return(JExpr.TRUE); + } + + private final GeneratorMapping IS_SAME = GeneratorMapping.create("setupInterior", "isSame", null, null); + private final MappingSet IS_SAME_I1 = new MappingSet("index1", null, IS_SAME, IS_SAME); + private final MappingSet IS_SAME_I2 = new MappingSet("index2", null, IS_SAME, IS_SAME); + + private void setupIsSame(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] keyExprs) { + cg.setMappingSet(IS_SAME_I1); + for (LogicalExpression expr : keyExprs) { + // first, we rewrite the evaluation stack for each side of the comparison. + cg.setMappingSet(IS_SAME_I1); + ClassGenerator.HoldingContainer first = cg.addExpr(expr, false); + cg.setMappingSet(IS_SAME_I2); + ClassGenerator.HoldingContainer second = cg.addExpr(expr, false); + + LogicalExpression fh = FunctionGenerationHelper.getComparator(first, second, context.getFunctionRegistry()); + ClassGenerator.HoldingContainer out = cg.addExpr(fh, false); + cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE); + } + cg.getEvalBlock()._return(JExpr.TRUE); + } + + private final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupInterior", "addRecord", null, null); + private final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupInterior", "outputRecordValues", "resetValues", "cleanup"); + private final MappingSet EVAL = new MappingSet("index", "outIndex", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE); + + private void addRecordValues(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] valueExprs) { + cg.setMappingSet(EVAL); + for (LogicalExpression ex : valueExprs) { + ClassGenerator.HoldingContainer hc = cg.addExpr(ex); + cg.getBlock(ClassGenerator.BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); + } + cg.getBlock(ClassGenerator.BlockType.EVAL)._return(JExpr.TRUE); + } + + private final GeneratorMapping OUTPUT_WINDOW_VALUES = GeneratorMapping.create("setupInterior", "outputWindowValues", null, null); + private final MappingSet WINDOW_VALUES = new MappingSet("inIndex" /* read index */, "outIndex" /* write index */, "incoming", "outgoing", OUTPUT_WINDOW_VALUES, OUTPUT_WINDOW_VALUES); + + private void outputWindowValues(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] valueExprs) { + cg.setMappingSet(WINDOW_VALUES); + for (int i = 0; i < valueExprs.length; i++) { + ClassGenerator.HoldingContainer hc = cg.addExpr(valueExprs[i]); + cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); + } + cg.getEvalBlock()._return(JExpr.TRUE); + } + + @Override + protected IterOutcome doWork() { + StreamingWindowFramer.AggOutcome out = framer.doWork(); + + while (out == StreamingWindowFramer.AggOutcome.UPDATE_AGGREGATOR) { + framer = null; + try { + setupNewSchema(); + } catch (SchemaChangeException e) { + return IterOutcome.STOP; + } + out = framer.doWork(); + } + + if (out == StreamingWindowFramer.AggOutcome.RETURN_AND_COMPLETE) { + done = true; + } + + return framer.getOutcome(); + } + + @Override + public int getRecordCount() { + return framer.getOutputCount(); + } + + @Override + public void cleanup() { + if (framer != null) { + framer.cleanup(); + } + super.cleanup(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java new file mode 100644 index 0000000..b4e3fed --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.physical.impl.window; + +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.impl.aggregate.InternalBatch; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorWrapper; + +import javax.inject.Named; + +public abstract class StreamingWindowFrameTemplate implements StreamingWindowFramer { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingWindowFramer.class); + private static final String TOO_BIG_ERROR = "Couldn't add value to an empty batch. This likely means that a single value is too long for a varlen field."; + private static final boolean EXTRA_DEBUG = false; + public static final int UNBOUNDED = -1; + public static final int CURRENT_ROW = 0; + private boolean first = true; + private int previousIndex = 0; + private int underlyingIndex = -1; + private int currentIndex; + private boolean pendingOutput = false; + private RecordBatch.IterOutcome outcome; + private int outputCount = 0; + private RecordBatch incoming; + private RecordBatch outgoing; + private FragmentContext context; + private InternalBatch previousBatch = null; + private int precedingConfig = UNBOUNDED; + private int followingConfig = CURRENT_ROW; + + + @Override + public void setup(FragmentContext context, + RecordBatch incoming, + RecordBatch outgoing, + int precedingConfig, + int followingConfig) throws SchemaChangeException { + this.context = context; + this.incoming = incoming; + this.outgoing = outgoing; + this.precedingConfig = precedingConfig; + this.followingConfig = followingConfig; + + setupInterior(incoming, outgoing); + } + + + private void allocateOutgoing() { + for (VectorWrapper<?> w : outgoing){ + w.getValueVector().allocateNew(); + } + } + + @Override + public RecordBatch.IterOutcome getOutcome() { + return outcome; + } + + @Override + public int getOutputCount() { + return outputCount; + } + + private AggOutcome tooBigFailure() { + context.fail(new Exception(TOO_BIG_ERROR)); + this.outcome = RecordBatch.IterOutcome.STOP; + return AggOutcome.RETURN_OUTCOME; + } + + @Override + public AggOutcome doWork() { + // if we're in the first state, allocate outgoing. + try { + if (first) { + allocateOutgoing(); + } + + // setup for new output and pick any remainder. + if (pendingOutput) { + allocateOutgoing(); + pendingOutput = false; + outputToBatch(previousIndex); + } + + boolean recordsAdded = false; + + outside: while (true) { + if (EXTRA_DEBUG) { + logger.trace("Looping from underlying index {}, previous {}, current {}", underlyingIndex, previousIndex, currentIndex); + logger.debug("Processing {} records in window framer", incoming.getRecordCount()); + } + // loop through existing records, adding as necessary. + while(incIndex()) { + if (previousBatch != null) { + boolean isSameFromBatch = isSameFromBatch(previousIndex, previousBatch, currentIndex); + if (EXTRA_DEBUG) { + logger.trace("Same as previous batch: {}, previous index {}, current index {}", isSameFromBatch, previousIndex, currentIndex); + } + + if(!isSameFromBatch) { + resetValues(); + } + previousBatch.clear(); + previousBatch = null; + } else if (!isSame(previousIndex, currentIndex)) { + resetValues(); + } + + addRecord(currentIndex); + + if (!outputToBatch(currentIndex)) { + if (outputCount == 0) { + return tooBigFailure(); + } + + // mark the pending output but move forward for the next cycle. + pendingOutput = true; + incIndex(); + return setOkAndReturn(); + } + + recordsAdded = true; + } + + if (EXTRA_DEBUG) { + logger.debug("Exit Loop from underlying index {}, previous {}, current {}", underlyingIndex, previousIndex, currentIndex); + } + + previousBatch = new InternalBatch(incoming); + + while (true) { + RecordBatch.IterOutcome out = incoming.next(); + switch (out) { + case NONE: + outcome = innerOutcome(out, recordsAdded); + if (EXTRA_DEBUG) { + logger.trace("Received IterOutcome of {}, assigning {} outcome", out, outcome); + } + return AggOutcome.RETURN_AND_COMPLETE; + case NOT_YET: + outcome = innerOutcome(out, recordsAdded); + if (EXTRA_DEBUG) { + logger.trace("Received IterOutcome of {}, assigning {} outcome", out, outcome); + } + return AggOutcome.RETURN_OUTCOME; + + case OK_NEW_SCHEMA: + if (EXTRA_DEBUG) { + logger.trace("Received new schema. Batch has {} records.", incoming.getRecordCount()); + } + resetIndex(); + return AggOutcome.UPDATE_AGGREGATOR; + + case OK: + if (EXTRA_DEBUG) { + logger.trace("Received OK with {} records.", incoming.getRecordCount()); + } + resetIndex(); + if (incoming.getRecordCount() == 0) { + continue; + } else { + continue outside; + } + case STOP: + default: + outcome = out; + if (EXTRA_DEBUG) { + logger.trace("Stop received.", incoming.getRecordCount()); + } + return AggOutcome.RETURN_OUTCOME; + } + } + } + } finally { + first = false; + } + } + + private RecordBatch.IterOutcome innerOutcome(RecordBatch.IterOutcome innerOutcome, boolean newRecordsAdded) { + if(newRecordsAdded) { + setOkAndReturn(); + return outcome; + } + return innerOutcome; + } + + + private final boolean incIndex() { + underlyingIndex++; + + if(currentIndex != -1) { + previousIndex = currentIndex; + } + + if (underlyingIndex >= incoming.getRecordCount()) { + return false; + } + + currentIndex = getVectorIndex(underlyingIndex); + return true; + } + + private final void resetIndex() { + underlyingIndex = -1; + currentIndex = getVectorIndex(underlyingIndex); + if (EXTRA_DEBUG) { + logger.trace("Reset new indexes: underlying {}, previous {}, current {}", underlyingIndex, previousIndex, currentIndex); + } + } + + private final AggOutcome setOkAndReturn() { + if (first) { + this.outcome = RecordBatch.IterOutcome.OK_NEW_SCHEMA; + } else { + this.outcome = RecordBatch.IterOutcome.OK; + } + + if (EXTRA_DEBUG) { + logger.debug("Setting output count {}", outputCount); + } + for (VectorWrapper<?> v : outgoing) { + v.getValueVector().getMutator().setValueCount(outputCount); + } + return AggOutcome.RETURN_OUTCOME; + } + + private final boolean outputToBatch(int inIndex) { + boolean success = outputRecordValues(outputCount) + && outputWindowValues(inIndex, outputCount); + + if (success) { + outputCount++; + } + + return success; + } + + @Override + public void cleanup() { + if(previousBatch != null) { + previousBatch.clear(); + previousBatch = null; + } + } + + public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException; + + /** + * Compares withins from two indexes in the same batch + * @param index1 First record index + * @param index2 Second record index + * @return does within value match + */ + public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2); + /** + * Compares withins from one index of given batch (typically previous completed batch), and one index from current batch + * @param b1Index First record index + * @param index2 Second record index + * @return does within value match + */ + public abstract boolean isSameFromBatch(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int index2); + public abstract void addRecord(@Named("index") int index); + public abstract boolean outputRecordValues(@Named("outIndex") int outIndex); + public abstract boolean outputWindowValues(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + public abstract int getVectorIndex(@Named("recordIndex") int recordIndex); + + public abstract boolean resetValues(); +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java new file mode 100644 index 0000000..9588cef --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.physical.impl.window; + +import org.apache.drill.exec.compile.TemplateClassDefinition; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.record.RecordBatch; + +public interface StreamingWindowFramer { + public static TemplateClassDefinition<StreamingWindowFramer> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(StreamingWindowFramer.class, StreamingWindowFrameTemplate.class); + + + public static enum AggOutcome { + RETURN_OUTCOME, UPDATE_AGGREGATOR, RETURN_AND_COMPLETE; + } + + public abstract void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, + int precedingConfig, int followingConfig) throws SchemaChangeException; + + public abstract RecordBatch.IterOutcome getOutcome(); + + public abstract int getOutputCount(); + + public abstract AggOutcome doWork(); + + public abstract void cleanup(); +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWindowRelBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWindowRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWindowRelBase.java new file mode 100644 index 0000000..fcf52ee --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWindowRelBase.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.planner.common; + +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.WindowRelBase; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.reltype.RelDataType; +import org.eigenbase.rex.RexLiteral; + +import java.util.List; + +public class DrillWindowRelBase extends WindowRelBase implements DrillRelNode { + + public DrillWindowRelBase( + RelOptCluster cluster, + RelTraitSet traits, + RelNode child, + List<RexLiteral> constants, + RelDataType rowType, + List<Window> windows) { + super(cluster, traits, child, constants, rowType, windows); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java index 3fc3b89..a2685ad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java @@ -24,6 +24,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Store; import org.apache.drill.exec.physical.base.SubScan; import org.apache.drill.exec.physical.config.Limit; +import org.apache.drill.exec.physical.config.WindowPOP; import org.apache.drill.exec.planner.AbstractOpWrapperVisitor; import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair; @@ -113,6 +114,11 @@ public class StatsCollector { return null; } + @Override + public Void visitWindowFrame(WindowPOP window, Wrapper value) throws RuntimeException { + return visitOp(window, value); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java index 6b0c3b4..ee035c6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java @@ -82,7 +82,7 @@ public class DrillAggregateRel extends DrillAggregateRelBase implements DrillRel return builder.build(); } - private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillImplementor implementor) { + public static LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillImplementor implementor) { List<LogicalExpression> args = Lists.newArrayList(); for(Integer i : call.getArgList()) { args.add(new FieldReference(fn.get(i))); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java index c3b0d00..f6c910e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java @@ -25,7 +25,7 @@ import org.eigenbase.relopt.RelOptRuleCall; import org.eigenbase.relopt.RelTraitSet; /** - * This rule converts a SortRel that has either a offset and fetch into a Drill Sort and Limit Rel + * This rule converts a SortRel that has either a offset and fetch into a Drill Sort and LimitPOP Rel */ public class DrillLimitRule extends RelOptRule { public static DrillLimitRule INSTANCE = new DrillLimitRule(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java index 082dacc..fcfced2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java @@ -27,6 +27,9 @@ import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.planner.physical.PrelUtil; import org.apache.drill.exec.planner.physical.PrelUtil.ProjectPushInfo; import org.eigenbase.rel.ProjectRel; +import org.eigenbase.rel.ProjectRelBase; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.rules.PushProjector; import org.eigenbase.rel.rules.RemoveTrivialProjectRule; import org.eigenbase.relopt.RelOptRule; import org.eigenbase.relopt.RelOptRuleCall; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java index 7eca54e..7ed7885 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java @@ -27,7 +27,7 @@ import org.eigenbase.relopt.Convention; public interface DrillRel extends DrillRelNode { /** Calling convention for relational expressions that are "implemented" by * generating Drill logical plans. */ - Convention DRILL_LOGICAL = new Convention.Impl("LOGICAL", DrillRel.class); + public static final Convention DRILL_LOGICAL = new Convention.Impl("LOGICAL", DrillRel.class); LogicalOperator implement(DrillImplementor implementor); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java index dbb85b2..1d3ce9a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java @@ -38,6 +38,8 @@ import org.apache.drill.exec.planner.physical.ScreenPrule; import org.apache.drill.exec.planner.physical.SortConvertPrule; import org.apache.drill.exec.planner.physical.SortPrule; import org.apache.drill.exec.planner.physical.StreamAggPrule; +import org.apache.drill.exec.planner.physical.StreamingWindowPrule; +import org.apache.drill.exec.planner.physical.StreamingWindowPrule; import org.apache.drill.exec.planner.physical.UnionAllPrule; import org.apache.drill.exec.planner.physical.WriterPrule; import org.eigenbase.rel.RelFactories; @@ -100,6 +102,7 @@ public class DrillRuleSets { DrillScanRule.INSTANCE, DrillFilterRule.INSTANCE, DrillProjectRule.INSTANCE, + DrillWindowRule.INSTANCE, DrillAggregateRule.INSTANCE, DrillLimitRule.INSTANCE, @@ -139,6 +142,8 @@ public class DrillRuleSets { HashJoinPrule.INSTANCE, FilterPrule.INSTANCE, LimitPrule.INSTANCE, + WindowPrule.INSTANCE, + WriterPrule.INSTANCE, PushLimitToTopN.INSTANCE @@ -182,6 +187,7 @@ public class DrillRuleSets { ruleList.add(FilterPrule.INSTANCE); ruleList.add(LimitPrule.INSTANCE); ruleList.add(WriterPrule.INSTANCE); + ruleList.add(StreamingWindowPrule.INSTANCE); ruleList.add(PushLimitToTopN.INSTANCE); ruleList.add(UnionAllPrule.INSTANCE); // ruleList.add(UnionDistinctPrule.INSTANCE);