Patch for DRILL-705

Currently supports only partitioning/ordering; preceding and following
frame offsets are not yet supported.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8def6e91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8def6e91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8def6e91

Branch: refs/heads/smp-merge-230914
Commit: 8def6e91489455c0ae670f49ef5ba51ef71b31bd
Parents: 9bc71fc
Author: Timothy Chen <tnac...@gmail.com>
Authored: Sun Sep 21 23:54:40 2014 -0700
Committer: Steven Phillips <sphill...@maprtech.com>
Committed: Tue Sep 23 22:24:21 2014 -0700

----------------------------------------------------------------------
 .../drill/common/expression/CastExpression.java |    4 +-
 .../common/logical/data/AbstractBuilder.java    |    5 +
 .../logical/data/LogicalOperatorBase.java       |    3 +-
 .../common/logical/data/NamedExpression.java    |    2 +-
 .../drill/common/logical/data/Window.java       |  112 ++
 .../drill/common/logical/data/WindowFrame.java  |   98 --
 .../data/visitors/AbstractLogicalVisitor.java   |    6 +-
 .../logical/data/visitors/LogicalVisitor.java   |    5 +-
 .../src/main/codegen/data/AggrTypes1.tdd        |    6 +-
 .../src/main/codegen/templates/TypeHelper.java  |    5 +-
 .../drill/exec/expr/EvaluationVisitor.java      |    3 +
 .../org/apache/drill/exec/memory/Accountor.java |    9 +-
 .../apache/drill/exec/opt/BasicOptimizer.java   |   27 +-
 .../physical/base/AbstractPhysicalVisitor.java  |    6 +
 .../exec/physical/base/AbstractSingle.java      |    2 +-
 .../exec/physical/base/PhysicalOperator.java    |    2 +-
 .../exec/physical/base/PhysicalVisitor.java     |    2 +
 .../drill/exec/physical/config/WindowPOP.java   |   79 ++
 .../physical/impl/aggregate/InternalBatch.java  |    8 +-
 .../impl/aggregate/StreamingAggBatch.java       |    2 -
 .../physical/impl/filter/FilterRecordBatch.java |    4 +-
 .../physical/impl/limit/LimitRecordBatch.java   |    4 +-
 .../impl/project/ProjectRecordBatch.java        |   12 +-
 .../impl/svremover/RemovingRecordBatch.java     |    3 +-
 .../physical/impl/trace/TraceRecordBatch.java   |    3 +-
 .../StreamingWindowFrameBatchCreator.java       |   36 +
 .../window/StreamingWindowFrameRecordBatch.java |  268 +++++
 .../window/StreamingWindowFrameTemplate.java    |  286 +++++
 .../impl/window/StreamingWindowFramer.java      |   44 +
 .../exec/planner/common/DrillWindowRelBase.java |   41 +
 .../exec/planner/fragment/StatsCollector.java   |    6 +
 .../exec/planner/logical/DrillAggregateRel.java |    2 +-
 .../exec/planner/logical/DrillLimitRule.java    |    2 +-
 .../planner/logical/DrillPushProjIntoScan.java  |    3 +
 .../drill/exec/planner/logical/DrillRel.java    |    2 +-
 .../exec/planner/logical/DrillRuleSets.java     |    6 +
 .../exec/planner/logical/DrillWindowRel.java    |  111 ++
 .../exec/planner/logical/DrillWindowRule.java   |   52 +
 .../exec/planner/physical/AggPrelBase.java      |   19 +-
 .../drill/exec/planner/physical/LimitPrel.java  |    8 +-
 .../planner/physical/StreamingWindowPrel.java   |  136 +++
 .../planner/physical/StreamingWindowPrule.java  |  133 +++
 .../exec/planner/sql/DrillOperatorTable.java    |    7 +-
 .../exec/planner/sql/DrillSqlAggOperator.java   |    2 +-
 .../drill/exec/planner/sql/DrillSqlWorker.java  |   21 +-
 .../sql/handlers/CreateTableHandler.java        |    5 +-
 .../planner/sql/handlers/DefaultSqlHandler.java |   21 +-
 .../sql/handlers/DescribeTableHandler.java      |    4 +-
 .../planner/sql/handlers/ExplainHandler.java    |   10 +-
 .../planner/sql/handlers/ShowFileHandler.java   |    5 +-
 .../sql/handlers/ShowSchemasHandler.java        |    4 +-
 .../planner/sql/handlers/ShowTablesHandler.java |    4 +-
 .../planner/sql/handlers/SqlHandlerConfig.java  |   47 +
 .../exec/planner/sql/parser/DrillSqlCall.java   |    6 +-
 .../exec/planner/sql/parser/SqlCreateTable.java |    6 +-
 .../exec/planner/sql/parser/SqlCreateView.java  |   15 +-
 .../planner/sql/parser/SqlDescribeTable.java    |    6 +-
 .../exec/planner/sql/parser/SqlDropView.java    |    6 +-
 .../exec/planner/sql/parser/SqlShowFiles.java   |    6 +-
 .../exec/planner/sql/parser/SqlShowSchemas.java |    6 +-
 .../exec/planner/sql/parser/SqlShowTables.java  |   13 +-
 .../exec/planner/sql/parser/SqlUseSchema.java   |   14 +-
 .../exec/record/AbstractSingleRecordBatch.java  |    9 +-
 .../drill/exec/record/VectorContainer.java      |   17 +
 .../java/org/apache/drill/exec/ExecTest.java    |    2 +-
 .../physical/impl/limit/TestSimpleLimit.java    |    1 +
 .../physical/impl/window/TestWindowFrame.java   |  202 ++++
 .../drill/exec/sql/TestWindowFunctions.java     |   29 +
 .../src/test/resources/window/mediumData.json   | 1000 ++++++++++++++++++
 .../src/test/resources/window/oneKeyCount.json  |   43 +
 .../test/resources/window/oneKeyCountData.json  |    4 +
 .../resources/window/oneKeyCountMultiBatch.json |   72 ++
 .../src/test/resources/window/twoKeys.json      |   44 +
 .../src/test/resources/window/twoKeysData.json  |    8 +
 .../drill/jdbc/test/TestJdbcDistQuery.java      |    2 +-
 .../apache/drill/exec/proto/UserBitShared.java  |   15 +-
 .../exec/proto/beans/CoreOperatorType.java      |    4 +-
 protocol/src/main/protobuf/UserBitShared.proto  |   11 +-
 78 files changed, 2997 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/common/src/main/java/org/apache/drill/common/expression/CastExpression.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/drill/common/expression/CastExpression.java 
b/common/src/main/java/org/apache/drill/common/expression/CastExpression.java
index b73a447..5d09a24 100644
--- 
a/common/src/main/java/org/apache/drill/common/expression/CastExpression.java
+++ 
b/common/src/main/java/org/apache/drill/common/expression/CastExpression.java
@@ -23,6 +23,8 @@ import java.util.Iterator;
 import org.apache.drill.common.expression.visitors.ExprVisitor;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 public class CastExpression extends LogicalExpressionBase implements 
Iterable<LogicalExpression>{
 
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(CastExpression.class);
@@ -33,7 +35,7 @@ public class CastExpression extends LogicalExpressionBase 
implements Iterable<Lo
   public CastExpression(LogicalExpression input, MajorType type, 
ExpressionPosition pos) {
     super(pos);
     this.input = input;
-    this.type = type;
+    this.type = checkNotNull(type, "Major type cannot be null");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/common/src/main/java/org/apache/drill/common/logical/data/AbstractBuilder.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/drill/common/logical/data/AbstractBuilder.java
 
b/common/src/main/java/org/apache/drill/common/logical/data/AbstractBuilder.java
index 427b040..28424a5 100644
--- 
a/common/src/main/java/org/apache/drill/common/logical/data/AbstractBuilder.java
+++ 
b/common/src/main/java/org/apache/drill/common/logical/data/AbstractBuilder.java
@@ -19,6 +19,7 @@ package org.apache.drill.common.logical.data;
 
 import java.util.List;
 
+import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
 
 public abstract class AbstractBuilder<T extends LogicalOperator> {
@@ -30,6 +31,10 @@ public abstract class AbstractBuilder<T extends 
LogicalOperator> {
     return exprs.toArray(new LogicalExpression[exprs.size()]);
   }
 
+  protected FieldReference[] aF(List<FieldReference> exprs){
+    return exprs.toArray(new FieldReference[exprs.size()]);
+  }
+
   protected NamedExpression[] aN(List<NamedExpression> exprs){
     return exprs.toArray(new NamedExpression[exprs.size()]);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java
 
b/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java
index 58a00fe..6c534eb 100644
--- 
a/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java
+++ 
b/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java
@@ -89,5 +89,4 @@ public abstract class LogicalOperatorBase implements 
LogicalOperator{
     logger.debug("Adding Logical Operator sub types: {}", ((Object) ops) );
     return ops;
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/common/src/main/java/org/apache/drill/common/logical/data/NamedExpression.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/drill/common/logical/data/NamedExpression.java
 
b/common/src/main/java/org/apache/drill/common/logical/data/NamedExpression.java
index 1550ddf..a166c25 100644
--- 
a/common/src/main/java/org/apache/drill/common/logical/data/NamedExpression.java
+++ 
b/common/src/main/java/org/apache/drill/common/logical/data/NamedExpression.java
@@ -30,7 +30,7 @@ public class NamedExpression {
   private final FieldReference ref;
 
   @JsonCreator
-  public NamedExpression( @JsonProperty("expr") LogicalExpression expr, 
@JsonProperty("ref") FieldReference ref) {
+  public NamedExpression(@JsonProperty("expr") LogicalExpression expr, 
@JsonProperty("ref") FieldReference ref) {
     super();
     this.expr = expr;
     this.ref = ref;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/common/src/main/java/org/apache/drill/common/logical/data/Window.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/drill/common/logical/data/Window.java 
b/common/src/main/java/org/apache/drill/common/logical/data/Window.java
new file mode 100644
index 0000000..6dba77c
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/logical/data/Window.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.logical.data;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.expression.FieldReference;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.visitors.LogicalVisitor;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+@JsonTypeName("window")
+public class Window extends SingleInputOperator {
+  private final NamedExpression[] withins;
+  private final NamedExpression[] aggregations;
+  private final long start;
+  private final long end;
+
+
+  @JsonCreator
+  public Window(@JsonProperty("withins") NamedExpression[] withins,
+                @JsonProperty("aggregations") NamedExpression[] aggregations,
+                @JsonProperty("start") Long start,
+                @JsonProperty("end") Long end) {
+    super();
+    this.withins = withins;
+    this.start = start == null ? Long.MIN_VALUE : start;
+    this.end = end == null ? Long.MIN_VALUE : end;
+    this.aggregations = aggregations;
+  }
+
+  public NamedExpression[] getWithins() {
+    return withins;
+  }
+
+  public long getStart() {
+    return start;
+  }
+
+  public long getEnd() {
+    return end;
+  }
+
+  public NamedExpression[] getAggregations() {
+    return aggregations;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> 
logicalVisitor, X value) throws E {
+    return logicalVisitor.visitWindow(this, value);
+  }
+
+  @Override
+  public Iterator<LogicalOperator> iterator() {
+    return Iterators.singletonIterator(getInput());
+  }
+
+  public static class Builder extends AbstractSingleBuilder<Window, Builder>{
+    private List<NamedExpression> aggregations = Lists.newArrayList();
+    private List<NamedExpression> withins = Lists.newArrayList();
+    private List<Order.Ordering> orderings = Lists.newArrayList();
+    private long start = Long.MIN_VALUE;
+    private long end = Long.MIN_VALUE;
+
+
+    public Builder addAggregation(FieldReference ref, LogicalExpression expr){
+      aggregations.add(new NamedExpression(expr, ref));
+      return this;
+    }
+
+    public Builder addWithin(FieldReference within, LogicalExpression expr) {
+      withins.add(new NamedExpression(expr, within));
+      return this;
+    }
+
+    public Window internalBuild() {
+      checkState(!withins.isEmpty(), "Withins in window must not be empty.");
+      checkState(!aggregations.isEmpty(), "Aggregations in window must not be 
empty.");
+      return new Window(aN(withins), aN(aggregations), start, end);
+    }
+
+    public Builder addOrdering(Order.Ordering ordering) {
+      orderings.add(ordering);
+      return this;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java 
b/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java
deleted file mode 100644
index 6fb64bc..0000000
--- a/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.common.logical.data;
-
-import java.util.Iterator;
-
-import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.logical.data.visitors.LogicalVisitor;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.collect.Iterators;
-
-@JsonTypeName("windowframe")
-public class WindowFrame extends SingleInputOperator{
-
-
-  private final FieldReference within;
-  private final FrameRef frame;
-  private final long start;
-  private final long end;
-
-
-  @JsonCreator
-  public WindowFrame(@JsonProperty("within") FieldReference within, 
@JsonProperty("ref") FrameRef frame, @JsonProperty("start") Long start, 
@JsonProperty("end") Long end) {
-    super();
-    this.within = within;
-    this.frame = frame;
-    this.start = start == null ? Long.MIN_VALUE : start;
-    this.end = end == null ? Long.MIN_VALUE : end;
-  }
-
-
-  @JsonProperty("ref")
-  public FrameRef getFrame() {
-    return frame;
-  }
-
-  public FieldReference getWithin() {
-    return within;
-  }
-
-  public long getStart() {
-    return start;
-  }
-
-  public long getEnd() {
-    return end;
-  }
-
-    @Override
-    public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> 
logicalVisitor, X value) throws E {
-        return logicalVisitor.visitWindowFrame(this, value);
-    }
-
-    @Override
-    public Iterator<LogicalOperator> iterator() {
-        return Iterators.singletonIterator(getInput());
-    }
-
-
-    public static class FrameRef{
-    private final FieldReference segment;
-    private final FieldReference position;
-
-    @JsonCreator
-    public FrameRef(@JsonProperty("segment") FieldReference segment, 
@JsonProperty("position") FieldReference position) {
-      super();
-      this.segment = segment;
-      this.position = position;
-    }
-
-    public FieldReference getSegment() {
-      return segment;
-    }
-    public FieldReference getPosition() {
-      return position;
-    }
-
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/common/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
 
b/common/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
index d128b10..92e370f 100644
--- 
a/common/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
+++ 
b/common/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
@@ -32,7 +32,7 @@ import org.apache.drill.common.logical.data.Sequence;
 import org.apache.drill.common.logical.data.Store;
 import org.apache.drill.common.logical.data.Transform;
 import org.apache.drill.common.logical.data.Union;
-import org.apache.drill.common.logical.data.WindowFrame;
+import org.apache.drill.common.logical.data.Window;
 import org.apache.drill.common.logical.data.Writer;
 
 
@@ -110,8 +110,8 @@ public abstract class AbstractLogicalVisitor<T, X, E 
extends Throwable> implemen
     }
 
     @Override
-    public T visitWindowFrame(WindowFrame windowFrame, X value) throws E {
-        return visitOp(windowFrame, value);
+    public T visitWindow(Window window, X value) throws E {
+        return visitOp(window, value);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/common/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
 
b/common/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
index 4bf9fbf..3a426bf 100644
--- 
a/common/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
+++ 
b/common/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
@@ -32,7 +32,8 @@ import org.apache.drill.common.logical.data.Sequence;
 import org.apache.drill.common.logical.data.Store;
 import org.apache.drill.common.logical.data.Transform;
 import org.apache.drill.common.logical.data.Union;
-import org.apache.drill.common.logical.data.WindowFrame;
+import org.apache.drill.common.logical.data.Window;
+import org.apache.drill.common.logical.data.Window;
 import org.apache.drill.common.logical.data.Writer;
 
 /**
@@ -60,6 +61,6 @@ public interface LogicalVisitor<RETURN, EXTRA, EXCEP extends 
Throwable> {
     public RETURN visitSequence(Sequence sequence, EXTRA value) throws EXCEP;
     public RETURN visitTransform(Transform transform, EXTRA value) throws 
EXCEP;
     public RETURN visitUnion(Union union, EXTRA value) throws EXCEP;
-    public RETURN visitWindowFrame(WindowFrame windowFrame, EXTRA value) 
throws EXCEP;
+    public RETURN visitWindow(Window window, EXTRA value) throws EXCEP;
     public RETURN visitWriter(Writer writer, EXTRA value) throws EXCEP;
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd 
b/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd
index 812c289..5b4041c 100644
--- a/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd
+++ b/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd
@@ -78,7 +78,7 @@
       {inputType: "NullableVarBinary", outputType: "VarBinary", runningType: 
"VarBinary", major: "VarBytes"}
      ]
    },
-   {className: "Sum", funcName: "sum", types: [   
+   {className: "Sum", funcName: "sum", types: [
       {inputType: "Bit", outputType: "Bit", runningType: "Bit", major: 
"Numeric"},
       {inputType: "Int", outputType: "BigInt", runningType: "BigInt", major: 
"Numeric"},
       {inputType: "BigInt", outputType: "BigInt", runningType: "BigInt", 
major: "Numeric"},
@@ -96,7 +96,7 @@
       {inputType: "Interval", outputType: "Interval", runningType: "Interval", 
major: "Date", initialValue: "0"},
       {inputType: "NullableInterval", outputType: "Interval", runningType: 
"Interval", major: "Date", initialValue: "0"}
      ]
-   }, 
+   },
    {className: "Count", funcName: "count", types: [
       {inputType: "Bit", outputType: "Bit", runningType: "Bit", major: 
"Numeric"},
       {inputType: "Int", outputType: "BigInt", runningType: "BigInt", major: 
"Numeric"},
@@ -129,4 +129,4 @@
      ]
    }
   ]
-} 
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/codegen/templates/TypeHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/TypeHelper.java 
b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
index 3c0d9d3..cb6a030 100644
--- a/exec/java-exec/src/main/codegen/templates/TypeHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
@@ -73,8 +73,7 @@ public class TypeHelper {
     case LIST:
       return new GenericAccessor(vector);
     }
-
-    throw new UnsupportedOperationException();
+    throw new UnsupportedOperationException("Unable to find sql accessor for 
minor type [" + vector.getField().getType().getMinorType() + "] and mode [" + 
vector.getField().getType().getMode() + "]");
   }
   
   public static ValueVector getNewVector(SchemaPath parentPath, String name, 
BufferAllocator allocator, MajorType type){
@@ -255,7 +254,7 @@ public class TypeHelper {
       default:
         break;
       }
-      throw new UnsupportedOperationException();
+      throw new UnsupportedOperationException("Unable to find holder type for 
minorType: " + type);
   }
 
   public static ValueVector getNewVector(MaterializedField field, 
BufferAllocator allocator){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index a5b7bee..6280c40 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -71,6 +71,9 @@ import com.sun.codemodel.JLabel;
 import com.sun.codemodel.JType;
 import com.sun.codemodel.JVar;
 
+/**
+ * Visitor that generates code for eval
+ */
 public class EvaluationVisitor {
 
   private final FunctionImplementationRegistry registry;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
index 6ef46de..18c5072 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
@@ -17,6 +17,9 @@
  */
 package org.apache.drill.exec.memory;
 
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
 
@@ -31,9 +34,9 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.util.AssertionUtil;
 
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
 
 public class Accountor {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(Accountor.class);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 876ba37..98202ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -17,12 +17,7 @@
  */
 package org.apache.drill.exec.opt;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
+import com.google.common.collect.Lists;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -40,6 +35,7 @@ import org.apache.drill.common.logical.data.Project;
 import org.apache.drill.common.logical.data.Scan;
 import org.apache.drill.common.logical.data.SinkOperator;
 import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.common.logical.data.Window;
 import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -54,13 +50,18 @@ import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.WindowPOP;
 import org.apache.drill.exec.rpc.user.UserServer;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.StoragePlugin;
 import org.eigenbase.rel.RelFieldCollation.Direction;
 import org.eigenbase.rel.RelFieldCollation.NullDirection;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 
 public class BasicOptimizer extends Optimizer{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class);
@@ -183,13 +184,22 @@ public class BasicOptimizer extends Optimizer{
       return sa;
     }
 
+    @Override
+    public PhysicalOperator visitWindow(Window window, Object value) throws 
OptimizerException {
+      PhysicalOperator input = window.getInput().accept(this, value);
 
+      List<Ordering> ods = Lists.newArrayList();
+
+      input = new Sort(input, ods, false);
+
+      return new WindowPOP(input, window.getWithins(), 
window.getAggregations(), window.getStart(), window.getEnd());
+    }
 
     @Override
     public PhysicalOperator visitOrder(Order order, Object value) throws 
OptimizerException {
       PhysicalOperator input = order.getInput().accept(this, value);
       List<Ordering> ods = Lists.newArrayList();
-      for(Ordering o : order.getOrderings()){
+      for (Ordering o : order.getOrderings()){
         ods.add(o);
       }
       return new SelectionVectorRemover(new Sort(input, ods, false));
@@ -250,7 +260,6 @@ public class BasicOptimizer extends Optimizer{
 
     @Override
     public PhysicalOperator visitProject(Project project, Object obj) throws 
OptimizerException {
-//      return project.getInput().accept(this, obj);
       return new 
org.apache.drill.exec.physical.config.Project(Arrays.asList(project.getSelections()),
 project.iterator().next().accept(this, obj));
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index 48b3801..031ab10 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.physical.config.Trace;
 import org.apache.drill.exec.physical.config.UnionAll;
 import org.apache.drill.exec.physical.config.UnionExchange;
 import org.apache.drill.exec.physical.config.UnorderedReceiver;
+import org.apache.drill.exec.physical.config.WindowPOP;
 
 public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> 
implements PhysicalVisitor<T, X, E> {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractPhysicalVisitor.class);
@@ -64,6 +65,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E 
extends Throwable> impleme
   }
 
   @Override
+  public T visitWindowFrame(WindowPOP windowFrame, X value) throws E {
+    return visitOp(windowFrame, value);
+  }
+
+  @Override
   public T visitProject(Project project, X value) throws E{
     return visitOp(project, value);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java
index 2b10e6d..6d6c591 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java
@@ -28,7 +28,7 @@ import com.google.common.collect.Iterators;
  * Describes an operator that expects a single child operator as its input.
  * @param <T> The type of Exec model supported.
  */
-public abstract class AbstractSingle extends AbstractBase{
+public abstract class AbstractSingle extends AbstractBase {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractSingle.class);
 
   protected final PhysicalOperator child;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index 8f51390..a5518ca 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -37,7 +37,7 @@ import com.fasterxml.jackson.annotation.ObjectIdGenerators;
 @JsonPropertyOrder({ "@id" })
 @JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, 
property = "@id")
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "pop")
-public interface  PhysicalOperator extends GraphValue<PhysicalOperator> {
+public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
 
   /**
    * Describes whether or not a particular physical operator can actually be 
executed. Most physical operators can be

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 8da06cb..f9a9c21 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.physical.config.Trace;
 import org.apache.drill.exec.physical.config.UnionAll;
 import org.apache.drill.exec.physical.config.UnionExchange;
 import org.apache.drill.exec.physical.config.UnorderedReceiver;
+import org.apache.drill.exec.physical.config.WindowPOP;
 
 /**
  * Visitor class designed to traversal of a operator tree.  Basis for a number 
of operator manipulations including fragmentation and materialization.
@@ -80,6 +81,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends 
Throwable> {
   public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP;
   public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP;
   public RETURN visitUnionExchange(UnionExchange op, EXTRA value) throws EXCEP;
+  public RETURN visitWindowFrame(WindowPOP op, EXTRA value) throws EXCEP;
   public RETURN visitProducerConsumer(ProducerConsumer op, EXTRA value) throws 
EXCEP;
 
   public RETURN visitIteratorValidator(IteratorValidator op, EXTRA value) 
throws EXCEP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
new file mode 100644
index 0000000..17738ee
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared;
+
+/**
+ * Physical operator configuration for a window operation that reads from a single child operator.
+ *
+ * <p>Instances carry the expressions used to decide whether two rows belong to the same window
+ * ({@code withins}), the aggregation expressions to evaluate ({@code aggregations}), and a
+ * {@code start}/{@code end} pair of frame bounds. The object is registered with Jackson under the
+ * type name {@code "window"}.
+ *
+ * <p>NOTE(review): the constructor binds the JSON property {@code "within"} while the accessor is
+ * named {@link #getWithins()}; confirm that plan serialization round-trips as intended.
+ */
+@JsonTypeName("window")
+public class WindowPOP extends AbstractSingle {
+
+  /** Expressions compared between rows to detect a window boundary. */
+  private final NamedExpression[] withins;
+  /** Aggregation expressions evaluated for each window. */
+  private final NamedExpression[] aggregations;
+  /** Lower frame bound as supplied by the plan. */
+  private final long start;
+  /** Upper frame bound as supplied by the plan. */
+  private final long end;
+
+  /**
+   * Creates the operator configuration.
+   *
+   * @param child        the single input operator
+   * @param withins      expressions that define window membership
+   * @param aggregations aggregation expressions to compute
+   * @param start        lower frame bound
+   * @param end          upper frame bound
+   */
+  public WindowPOP(@JsonProperty("child") PhysicalOperator child,
+                   @JsonProperty("within") NamedExpression[] withins,
+                   @JsonProperty("aggregations") NamedExpression[] aggregations,
+                   @JsonProperty("start") long start,
+                   @JsonProperty("end") long end) {
+    super(child);
+    this.start = start;
+    this.end = end;
+    this.withins = withins;
+    this.aggregations = aggregations;
+  }
+
+  /** Returns a copy of this configuration that reads from {@code child} instead. */
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    final WindowPOP copy = new WindowPOP(child, withins, aggregations, start, end);
+    return copy;
+  }
+
+  /** Dispatches to {@link PhysicalVisitor#visitWindowFrame}. */
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitWindowFrame(this, value);
+  }
+
+  /** Returns the numeric value of {@code CoreOperatorType.WINDOW}. */
+  @Override
+  public int getOperatorType() {
+    return UserBitShared.CoreOperatorType.WINDOW_VALUE;
+  }
+
+  /** @return the lower frame bound */
+  public long getStart() {
+    return this.start;
+  }
+
+  /** @return the upper frame bound */
+  public long getEnd() {
+    return this.end;
+  }
+
+  /** @return the aggregation expressions (the internal array, not a copy) */
+  public NamedExpression[] getAggregations() {
+    return this.aggregations;
+  }
+
+  /** @return the window-membership expressions (the internal array, not a copy) */
+  public NamedExpression[] getWithins() {
+    return this.withins;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
index e690060..dae9eae 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
@@ -35,7 +35,11 @@ public class InternalBatch implements 
Iterable<VectorWrapper<?>>{
   private final SelectionVector4 sv4;
 
   public InternalBatch(RecordBatch incoming) {
-    switch(incoming.getSchema().getSelectionVectorMode()) {
+    this(incoming, null);
+  }
+
+  public InternalBatch(RecordBatch incoming, VectorWrapper[] ignoreWrappers){
+    switch(incoming.getSchema().getSelectionVectorMode()){
     case FOUR_BYTE:
       this.sv4 = incoming.getSelectionVector4().createNewWrapperCurrent();
       this.sv2 = null;
@@ -49,7 +53,7 @@ public class InternalBatch implements 
Iterable<VectorWrapper<?>>{
       this.sv2 = null;
     }
     this.schema = incoming.getSchema();
-    this.container = VectorContainer.getTransferClone(incoming);
+    this.container = VectorContainer.getTransferClone(incoming, 
ignoreWrappers);
   }
 
   public BatchSchema getSchema() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index ced5179..4d3925e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -325,9 +325,7 @@ public class StreamingAggBatch extends 
AbstractRecordBatch<StreamingAggregate> {
 
     default:
       throw new IllegalStateException();
-
     }
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index f1fcce0..85f664c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -76,13 +76,15 @@ public class FilterRecordBatch extends 
AbstractSingleRecordBatch<Filter>{
   }
 
   @Override
-  protected void doWork() {
+  protected IterOutcome doWork() {
     int recordCount = incoming.getRecordCount();
     filter.filterBatch(recordCount);
 //    for (VectorWrapper<?> v : container) {
 //      ValueVector.Mutator m = v.getValueVector().getMutator();
 //      m.setValueCount(recordCount);
 //    }
+
+    return IterOutcome.OK;
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index f5bc9f9..8f4d90f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -122,7 +122,7 @@ public class LimitRecordBatch extends 
AbstractSingleRecordBatch<Limit> {
   }
 
   @Override
-  protected void doWork() {
+  protected IterOutcome doWork() {
     for(TransferPair tp : transfers) {
       tp.transfer();
     }
@@ -139,6 +139,8 @@ public class LimitRecordBatch extends 
AbstractSingleRecordBatch<Limit> {
        limitWithNoSV(recordCount);
       }
     }
+
+    return IterOutcome.OK;
   }
 
   private IterOutcome produceEmptyFirstBatch() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index a1a8340..224753e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -70,7 +70,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.sun.codemodel.JExpr;
 
-public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
+public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
 
   private Projector projector;
@@ -133,13 +133,13 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project>{
   }
 
   @Override
-  protected void doWork() {
+  protected IterOutcome doWork() {
 //    VectorUtil.showVectorAccessibleContent(incoming, ",");
     int incomingRecordCount = incoming.getRecordCount();
 
     if (!doAlloc()) {
       outOfMemory = true;
-      return;
+      return IterOutcome.OUT_OF_MEMORY;
     }
 
     int outputRecords = projector.projectRecords(0, incomingRecordCount, 0);
@@ -160,6 +160,8 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project>{
     if (complexWriters != null) {
       container.buildSchema(SelectionVectorMode.NONE);
     }
+
+    return IterOutcome.OK;
   }
 
   private void handleRemainder() {
@@ -177,7 +179,7 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project>{
       setValueCount(remainingRecordCount);
       hasRemainder = false;
       remainderIndex = 0;
-      for(VectorWrapper<?> v: incoming) {
+      for (VectorWrapper<?> v : incoming) {
         v.clear();
       }
       this.recordCount = remainingRecordCount;
@@ -259,7 +261,7 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project>{
   }
 
   @Override
-  protected void setupNewSchema() throws SchemaChangeException{
+  protected void setupNewSchema() throws SchemaChangeException {
     this.allocationVectors = Lists.newArrayList();
     container.clear();
     final List<NamedExpression> exprs = getExpressionList();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 97f3608..7178d4c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -91,7 +91,7 @@ public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVect
   }
 
   @Override
-  protected void doWork() {
+  protected IterOutcome doWork() {
     int incomingRecordCount = incoming.getRecordCount();
     int copiedRecords = copier.copyRecords(0, incomingRecordCount);
 
@@ -125,6 +125,7 @@ public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVect
         incomingRecordCount,
         incomingRecordCount - remainderIndex,
         incoming.getSchema());
+    return IterOutcome.OK;
   }
 
   private void handleRemainder() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 6d90962..4e644df 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -100,7 +100,7 @@ public class TraceRecordBatch extends 
AbstractSingleRecordBatch<Trace> {
    * this record batch to a log file.
    */
   @Override
-  protected void doWork() {
+  protected IterOutcome doWork() {
 
     boolean incomingHasSv2 = incoming.getSchema().getSelectionVectorMode() == 
SelectionVectorMode.TWO_BYTE;
     if (incomingHasSv2) {
@@ -121,6 +121,7 @@ public class TraceRecordBatch extends 
AbstractSingleRecordBatch<Trace> {
     if (incomingHasSv2) {
       sv = wrap.getSv2();
     }
+    return IterOutcome.OK;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java
new file mode 100644
index 0000000..9b8929f
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.window;
+
+import com.google.common.collect.Iterables;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.WindowPOP;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+/**
+ * {@link BatchCreator} that turns a {@link WindowPOP} configuration into a
+ * {@link StreamingWindowFrameRecordBatch}.
+ */
+public class StreamingWindowFrameBatchCreator implements BatchCreator<WindowPOP> {
+
+  /**
+   * Builds the record batch for a window operator.
+   *
+   * @param context  fragment context handed to the new batch
+   * @param config   window operator configuration
+   * @param children input batches; exactly one is expected, since
+   *                 {@code Iterables.getOnlyElement} rejects any other size
+   * @return a new {@link StreamingWindowFrameRecordBatch} reading from the single child
+   * @throws ExecutionSetupException declared by the {@link BatchCreator} contract
+   */
+  @Override
+  public RecordBatch getBatch(FragmentContext context, WindowPOP config, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    final RecordBatch onlyChild = Iterables.getOnlyElement(children);
+    return new StreamingWindowFrameRecordBatch(config, context, onlyChild);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
new file mode 100644
index 0000000..2a92089
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.window;
+
+import com.google.common.collect.Lists;
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JVar;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.ValueVectorWriteExpression;
+import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.WindowPOP;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Single-input record batch that evaluates window aggregations by delegating to a
+ * runtime-generated {@link StreamingWindowFramer}.
+ *
+ * <p>On every new schema the output container is rebuilt and a fresh framer is code-generated from
+ * the {@link WindowPOP} configuration: the "within" expressions become the row-equality checks and
+ * the aggregation expressions become the per-record value writers.
+ */
+public class StreamingWindowFrameRecordBatch extends AbstractSingleRecordBatch<WindowPOP> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingWindowFrameRecordBatch.class);
+
+  /** Generated framer for the current schema; {@code null} until the first schema is set up. */
+  private StreamingWindowFramer framer;
+
+  /**
+   * @param popConfig window operator configuration
+   * @param context   fragment context used for code generation and function lookup
+   * @param incoming  the single upstream batch
+   * @throws OutOfMemoryException propagated from the superclass constructor
+   */
+  public StreamingWindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
+    super(popConfig, context, incoming);
+  }
+
+  /**
+   * Clears the output container and generates a new framer for the incoming schema.
+   *
+   * @throws SchemaChangeException if expressions cannot be materialized or the framer class
+   *         cannot be generated; the underlying exception is attached as the cause
+   */
+  @Override
+  protected void setupNewSchema() throws SchemaChangeException {
+    container.clear();
+
+    try {
+      this.framer = createFramer();
+    } catch (ClassTransformationException | IOException ex) {
+      // Attach the original exception so its stack trace is not lost.
+      throw new SchemaChangeException("Failed to create framer: " + ex, ex);
+    }
+  }
+
+  /**
+   * Emits the body of the generated {@code getVectorIndex} method according to the selection
+   * vector mode of the incoming batch: direct index for NONE, or a lookup through the
+   * two-byte / four-byte selection vector captured in {@code setupInterior}.
+   */
+  private void getIndex(ClassGenerator<StreamingWindowFramer> g) {
+    switch (incoming.getSchema().getSelectionVectorMode()) {
+      case FOUR_BYTE: {
+        JVar var = g.declareClassField("sv4_", g.getModel()._ref(SelectionVector4.class));
+        g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4"));
+        g.getBlock("getVectorIndex")._return(var.invoke("get").arg(JExpr.direct("recordIndex")));
+        return;
+      }
+      case NONE: {
+        g.getBlock("getVectorIndex")._return(JExpr.direct("recordIndex"));
+        return;
+      }
+      case TWO_BYTE: {
+        JVar var = g.declareClassField("sv2_", g.getModel()._ref(SelectionVector2.class));
+        g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector2"));
+        g.getBlock("getVectorIndex")._return(var.invoke("get").arg(JExpr.direct("recordIndex")));
+        return;
+      }
+
+      default:
+        throw new IllegalStateException();
+    }
+  }
+
+  /**
+   * Materializes the configured expressions, populates the output container and generates the
+   * framer implementation.
+   *
+   * <p>The output container receives one vector per aggregation first, followed by one new vector
+   * per incoming column. The frame bounds passed to the framer are fixed to
+   * {@code UNBOUNDED}..{@code CURRENT_ROW}; {@code popConfig.getStart()} and
+   * {@code popConfig.getEnd()} are not consulted here.
+   *
+   * @return the generated framer, already set up against {@code incoming} and this batch
+   * @throws SchemaChangeException if any expression fails to materialize
+   * @throws IOException propagated from class generation
+   * @throws ClassTransformationException propagated from class generation
+   */
+  private StreamingWindowFramer createFramer() throws SchemaChangeException, IOException, ClassTransformationException {
+    int configLength = popConfig.getAggregations().length;
+    List<LogicalExpression> valueExprs = Lists.newArrayList();
+    LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getWithins().length];
+
+    ErrorCollector collector = new ErrorCollectorImpl();
+
+    // Aggregation outputs: one new output vector and one write expression per aggregation.
+    for (int i = 0; i < configLength; i++) {
+      NamedExpression ne = popConfig.getAggregations()[i];
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
+      if (expr == null) {
+        continue;
+      }
+
+      final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
+      ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+      TypedFieldId id = container.add(vector);
+      valueExprs.add(new ValueVectorWriteExpression(id, expr, true));
+    }
+
+    int j = 0;
+    LogicalExpression[] windowExprs = new LogicalExpression[incoming.getSchema().getFieldCount()];
+    // TODO: Should transfer all existing columns instead of copy. Currently this is not easily doable because
+    // we are not processing one entire batch in one iteration, so cannot simply transfer.
+    for (VectorWrapper<?> wrapper : incoming) {
+      ValueVector vv = wrapper.isHyper() ? wrapper.getValueVectors()[0] : wrapper.getValueVector();
+      ValueVector vector = TypeHelper.getNewVector(vv.getField(), oContext.getAllocator());
+      TypedFieldId id = container.add(vector);
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(
+          new ValueVectorReadExpression(new TypedFieldId(vv.getField().getType(), wrapper.isHyper(), j)),
+          incoming,
+          collector,
+          context.getFunctionRegistry());
+      windowExprs[j] = new ValueVectorWriteExpression(id, expr, true);
+      j++;
+    }
+
+    // Key expressions used by the generated isSame / isSameFromBatch comparisons.
+    for (int i = 0; i < keyExprs.length; i++) {
+      NamedExpression ne = popConfig.getWithins()[i];
+
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
+      if (expr == null) {
+        continue;
+      }
+
+      keyExprs[i] = expr;
+    }
+
+    if (collector.hasErrors()) {
+      throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+    }
+
+    final ClassGenerator<StreamingWindowFramer> cg = CodeGenerator.getRoot(StreamingWindowFramer.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+    setupIsSame(cg, keyExprs);
+    setupIsSameFromBatch(cg, keyExprs);
+    addRecordValues(cg, valueExprs.toArray(new LogicalExpression[valueExprs.size()]));
+    outputWindowValues(cg, windowExprs);
+
+    cg.getBlock("resetValues")._return(JExpr.TRUE);
+
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+    getIndex(cg);
+    StreamingWindowFramer agg = context.getImplementationClass(cg);
+    agg.setup(
+        context,
+        incoming,
+        this,
+        StreamingWindowFrameTemplate.UNBOUNDED, StreamingWindowFrameTemplate.CURRENT_ROW
+    );
+    return agg;
+  }
+
+  private final GeneratorMapping IS_SAME_PREV_INTERNAL_BATCH_READ = GeneratorMapping.create("isSameFromBatch", "isSameFromBatch", null, null); // the internal batch changes each time so we need to redo setup.
+  private final GeneratorMapping IS_SAME_PREV = GeneratorMapping.create("setupInterior", "isSameFromBatch", null, null);
+  private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ);
+  private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV);
+
+  /**
+   * Generates {@code isSameFromBatch}: evaluates every key expression against batch {@code b1} at
+   * {@code b1Index} and against {@code incoming} at {@code b2Index}, returning false on the first
+   * key whose comparator result is non-zero and true otherwise.
+   */
+  private void setupIsSameFromBatch(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] keyExprs) {
+    // Set the mapping up front so the trailing return lands in the right method even with no keys.
+    cg.setMappingSet(ISA_B1);
+    for (LogicalExpression expr : keyExprs) {
+      // first, we rewrite the evaluation stack for each side of the comparison.
+      cg.setMappingSet(ISA_B1);
+      ClassGenerator.HoldingContainer first = cg.addExpr(expr, false);
+      cg.setMappingSet(ISA_B2);
+      ClassGenerator.HoldingContainer second = cg.addExpr(expr, false);
+
+      LogicalExpression fh = FunctionGenerationHelper.getComparator(first, second, context.getFunctionRegistry());
+      ClassGenerator.HoldingContainer out = cg.addExpr(fh, false);
+      cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+    }
+    cg.getEvalBlock()._return(JExpr.TRUE);
+  }
+
+  private final GeneratorMapping IS_SAME = GeneratorMapping.create("setupInterior", "isSame", null, null);
+  private final MappingSet IS_SAME_I1 = new MappingSet("index1", null, IS_SAME, IS_SAME);
+  private final MappingSet IS_SAME_I2 = new MappingSet("index2", null, IS_SAME, IS_SAME);
+
+  /**
+   * Generates {@code isSame}: evaluates every key expression at {@code index1} and at
+   * {@code index2}, returning false on the first key whose comparator result is non-zero and
+   * true otherwise.
+   */
+  private void setupIsSame(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] keyExprs) {
+    // Set the mapping up front so the trailing return lands in the right method even with no keys.
+    cg.setMappingSet(IS_SAME_I1);
+    for (LogicalExpression expr : keyExprs) {
+      // first, we rewrite the evaluation stack for each side of the comparison.
+      cg.setMappingSet(IS_SAME_I1);
+      ClassGenerator.HoldingContainer first = cg.addExpr(expr, false);
+      cg.setMappingSet(IS_SAME_I2);
+      ClassGenerator.HoldingContainer second = cg.addExpr(expr, false);
+
+      LogicalExpression fh = FunctionGenerationHelper.getComparator(first, second, context.getFunctionRegistry());
+      ClassGenerator.HoldingContainer out = cg.addExpr(fh, false);
+      cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+    }
+    cg.getEvalBlock()._return(JExpr.TRUE);
+  }
+
+  private final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupInterior", "addRecord", null, null);
+  private final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupInterior", "outputRecordValues", "resetValues", "cleanup");
+  private final MappingSet EVAL = new MappingSet("index", "outIndex", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE);
+
+  /**
+   * Adds the aggregation write expressions to the generated class under the {@code EVAL} mapping;
+   * the generated code returns false as soon as one write reports 0 and true otherwise.
+   */
+  private void addRecordValues(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] valueExprs) {
+    cg.setMappingSet(EVAL);
+    for (LogicalExpression ex : valueExprs) {
+      ClassGenerator.HoldingContainer hc = cg.addExpr(ex);
+      cg.getBlock(ClassGenerator.BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+    }
+    cg.getBlock(ClassGenerator.BlockType.EVAL)._return(JExpr.TRUE);
+  }
+
+  private final GeneratorMapping OUTPUT_WINDOW_VALUES = GeneratorMapping.create("setupInterior", "outputWindowValues", null, null);
+  private final MappingSet WINDOW_VALUES = new MappingSet("inIndex" /* read index */, "outIndex" /* write index */, "incoming", "outgoing", OUTPUT_WINDOW_VALUES, OUTPUT_WINDOW_VALUES);
+
+  /**
+   * Generates {@code outputWindowValues}: copies each incoming column value from {@code inIndex}
+   * to {@code outIndex} of the outgoing batch, returning false as soon as one write reports 0.
+   */
+  private void outputWindowValues(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] valueExprs) {
+    cg.setMappingSet(WINDOW_VALUES);
+    for (int i = 0; i < valueExprs.length; i++) {
+      ClassGenerator.HoldingContainer hc = cg.addExpr(valueExprs[i]);
+      cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+    }
+    cg.getEvalBlock()._return(JExpr.TRUE);
+  }
+
+  /**
+   * Runs the framer, regenerating it for as long as it reports {@code UPDATE_AGGREGATOR}.
+   *
+   * @return the framer's outcome, or {@code STOP} if the framer could not be regenerated, in
+   *         which case the failure is logged and reported to the fragment context
+   */
+  @Override
+  protected IterOutcome doWork() {
+    StreamingWindowFramer.AggOutcome out = framer.doWork();
+
+    while (out == StreamingWindowFramer.AggOutcome.UPDATE_AGGREGATOR) {
+      framer = null;
+      try {
+        setupNewSchema();
+      } catch (SchemaChangeException e) {
+        // Do not swallow the failure: record it and mark the fragment as failed before stopping.
+        logger.error("Failure while regenerating the window framer", e);
+        context.fail(e);
+        return IterOutcome.STOP;
+      }
+      out = framer.doWork();
+    }
+
+    if (out == StreamingWindowFramer.AggOutcome.RETURN_AND_COMPLETE) {
+      done = true;
+    }
+
+    return framer.getOutcome();
+  }
+
+  /**
+   * @return the framer's output record count, or 0 when no framer is available (before the first
+   *         schema is set up, or after a failed regeneration in {@link #doWork()})
+   */
+  @Override
+  public int getRecordCount() {
+    return framer == null ? 0 : framer.getOutputCount();
+  }
+
+  /** Releases the framer, if any, then performs the superclass cleanup. */
+  @Override
+  public void cleanup() {
+    if (framer != null) {
+      framer.cleanup();
+    }
+    super.cleanup();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java
new file mode 100644
index 0000000..b4e3fed
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.window;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.aggregate.InternalBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorWrapper;
+
+import javax.inject.Named;
+
+public abstract class StreamingWindowFrameTemplate implements 
StreamingWindowFramer {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(StreamingWindowFramer.class); // logs under the StreamingWindowFramer interface's name
+  private static final String TOO_BIG_ERROR = "Couldn't add value to an empty 
batch. This likely means that a single value is too long for a varlen field."; // message used by tooBigFailure()
+  private static final boolean EXTRA_DEBUG = false; // guards the trace/debug logging in this class
+  public static final int UNBOUNDED = -1; // initial value of precedingConfig
+  public static final int CURRENT_ROW = 0; // initial value of followingConfig
+  private boolean first = true; // cleared in the finally block of doWork(); selects OK_NEW_SCHEMA vs OK
+  private int previousIndex = 0; // value currentIndex held before the latest incIndex() (unless that was -1)
+  private int underlyingIndex = -1; // position in the incoming batch; -1 after resetIndex()
+  private int currentIndex; // getVectorIndex(underlyingIndex)
+  private boolean pendingOutput = false; // set when a record did not fit in outgoing; handled at the start of the next doWork()
+  private RecordBatch.IterOutcome outcome; // value returned by getOutcome()
+  private int outputCount = 0; // incremented by outputToBatch() on success; never reset in this class
+  private RecordBatch incoming;
+  private RecordBatch outgoing;
+  private FragmentContext context;
+  private InternalBatch previousBatch = null; // InternalBatch built from incoming just before incoming.next() is called
+  private int precedingConfig = UNBOUNDED; // stored by setup(); not read anywhere else in this class
+  private int followingConfig = CURRENT_ROW; // stored by setup(); not read anywhere else in this class
+
+
+  /**
+   * Stores the fragment context, both batches and the two frame settings, then hands the batches
+   * to setupInterior.
+   *
+   * @throws SchemaChangeException propagated from setupInterior
+   */
+  @Override
+  public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing,
+                    int precedingConfig, int followingConfig) throws SchemaChangeException {
+    this.precedingConfig = precedingConfig;
+    this.followingConfig = followingConfig;
+    this.outgoing = outgoing;
+    this.incoming = incoming;
+    this.context = context;
+
+    setupInterior(this.incoming, this.outgoing);
+  }
+
+
+  /** Calls allocateNew() on the value vector of every wrapper in the outgoing batch. */
+  private void allocateOutgoing() {
+    for (VectorWrapper<?> wrapper : outgoing) {
+      wrapper.getValueVector().allocateNew();
+    }
+  }
+
+  /** Returns the outcome most recently recorded by this framer. */
+  @Override
+  public RecordBatch.IterOutcome getOutcome() {
+    return this.outcome;
+  }
+
+  /** Returns the running count of records successfully written by outputToBatch. */
+  @Override
+  public int getOutputCount() {
+    return this.outputCount;
+  }
+
+  private AggOutcome tooBigFailure() {
+    context.fail(new Exception(TOO_BIG_ERROR));
+    this.outcome = RecordBatch.IterOutcome.STOP;
+    return AggOutcome.RETURN_OUTCOME;
+  }
+
+  @Override
+  public AggOutcome doWork() {
+    // if we're in the first state, allocate outgoing.
+    try {
+      if (first) {
+        allocateOutgoing();
+      }
+
+      // setup for new output and pick any remainder.
+      if (pendingOutput) {
+        allocateOutgoing();
+        pendingOutput = false;
+        outputToBatch(previousIndex);
+      }
+
+      boolean recordsAdded = false;
+
+      outside: while (true) {
+        if (EXTRA_DEBUG) {
+          logger.trace("Looping from underlying index {}, previous {}, current 
{}", underlyingIndex, previousIndex, currentIndex);
+          logger.debug("Processing {} records in window framer", 
incoming.getRecordCount());
+        }
+        // loop through existing records, adding as necessary.
+        while(incIndex()) {
+          if (previousBatch != null) {
+            boolean isSameFromBatch = isSameFromBatch(previousIndex, 
previousBatch, currentIndex);
+            if (EXTRA_DEBUG) {
+              logger.trace("Same as previous batch: {}, previous index {}, 
current index {}", isSameFromBatch, previousIndex, currentIndex);
+            }
+
+            if(!isSameFromBatch) {
+              resetValues();
+            }
+            previousBatch.clear();
+            previousBatch = null;
+          } else if (!isSame(previousIndex, currentIndex)) {
+            resetValues();
+          }
+
+          addRecord(currentIndex);
+
+          if (!outputToBatch(currentIndex)) {
+            if (outputCount == 0) {
+              return tooBigFailure();
+            }
+
+            // mark the pending output but move forward for the next cycle.
+            pendingOutput = true;
+            incIndex();
+            return setOkAndReturn();
+          }
+
+          recordsAdded = true;
+        }
+
+        if (EXTRA_DEBUG) {
+          logger.debug("Exit Loop from underlying index {}, previous {}, 
current {}", underlyingIndex, previousIndex, currentIndex);
+        }
+
+        previousBatch = new InternalBatch(incoming);
+
+        while (true) {
+          RecordBatch.IterOutcome out = incoming.next();
+          switch (out) {
+            case NONE:
+              outcome = innerOutcome(out, recordsAdded);
+              if (EXTRA_DEBUG) {
+                logger.trace("Received IterOutcome of {}, assigning {} 
outcome", out, outcome);
+              }
+              return AggOutcome.RETURN_AND_COMPLETE;
+            case NOT_YET:
+              outcome = innerOutcome(out, recordsAdded);
+              if (EXTRA_DEBUG) {
+                logger.trace("Received IterOutcome of {}, assigning {} 
outcome", out, outcome);
+              }
+              return AggOutcome.RETURN_OUTCOME;
+
+            case OK_NEW_SCHEMA:
+              if (EXTRA_DEBUG) {
+                logger.trace("Received new schema.  Batch has {} records.", 
incoming.getRecordCount());
+              }
+              resetIndex();
+              return AggOutcome.UPDATE_AGGREGATOR;
+
+            case OK:
+              if (EXTRA_DEBUG) {
+                logger.trace("Received OK with {} records.", 
incoming.getRecordCount());
+              }
+              resetIndex();
+              if (incoming.getRecordCount() == 0) {
+                continue;
+              } else {
+                continue outside;
+              }
+            case STOP:
+            default:
+              outcome = out;
+              if (EXTRA_DEBUG) {
+                logger.trace("Stop received.", incoming.getRecordCount());
+              }
+              return AggOutcome.RETURN_OUTCOME;
+          }
+        }
+      }
+    } finally {
+      first = false;
+    }
+  }
+
+  /**
+   * Picks the outcome to report after upstream returned {@code innerOutcome}.
+   *
+   * @param innerOutcome    outcome received from the incoming batch
+   * @param newRecordsAdded whether this doWork() call wrote at least one record
+   * @return the outcome set by setOkAndReturn() when records were written, otherwise {@code innerOutcome}
+   */
+  private RecordBatch.IterOutcome innerOutcome(RecordBatch.IterOutcome innerOutcome, boolean newRecordsAdded) {
+    if (!newRecordsAdded) {
+      return innerOutcome;
+    }
+    setOkAndReturn();
+    return this.outcome;
+  }
+
+
+  /**
+   * Advances to the next record of the incoming batch. previousIndex takes over the old
+   * currentIndex unless that was -1.
+   *
+   * @return true if a record exists at the new position (currentIndex is then updated),
+   *         false once the position is at or beyond the incoming record count
+   */
+  private final boolean incIndex() {
+    underlyingIndex += 1;
+
+    if (-1 != currentIndex) {
+      previousIndex = currentIndex;
+    }
+
+    final boolean hasRecord = underlyingIndex < incoming.getRecordCount();
+    if (hasRecord) {
+      currentIndex = getVectorIndex(underlyingIndex);
+    }
+    return hasRecord;
+  }
+
+  /** Rewinds underlyingIndex to -1 and recomputes currentIndex from it; previousIndex is left alone. */
+  private final void resetIndex() {
+    final int beforeFirst = -1;
+    underlyingIndex = beforeFirst;
+    currentIndex = getVectorIndex(beforeFirst);
+    if (EXTRA_DEBUG) {
+      logger.trace("Reset new indexes: underlying {}, previous {}, current {}", underlyingIndex, previousIndex, currentIndex);
+    }
+  }
+
+  /**
+   * Records OK_NEW_SCHEMA while {@code first} is still set and OK afterwards, then sets the
+   * value count of every outgoing vector to outputCount.
+   *
+   * @return always AggOutcome.RETURN_OUTCOME
+   */
+  private final AggOutcome setOkAndReturn() {
+    outcome = first ? RecordBatch.IterOutcome.OK_NEW_SCHEMA : RecordBatch.IterOutcome.OK;
+
+    if (EXTRA_DEBUG) {
+      logger.debug("Setting output count {}", outputCount);
+    }
+    for (VectorWrapper<?> wrapper : outgoing) {
+      wrapper.getValueVector().getMutator().setValueCount(outputCount);
+    }
+    return AggOutcome.RETURN_OUTCOME;
+  }
+
+  /**
+   * Writes one output row at position outputCount: first the record values, then the window
+   * values for {@code inIndex}. The window values are attempted only if the record values succeeded.
+   *
+   * @param inIndex index of the input record the window values are produced for
+   * @return true if both writes succeeded (outputCount is then incremented), false otherwise
+   */
+  private final boolean outputToBatch(int inIndex) {
+    if (!outputRecordValues(outputCount)) {
+      return false;
+    }
+    if (!outputWindowValues(inIndex, outputCount)) {
+      return false;
+    }
+
+    outputCount++;
+    return true;
+  }
+
+  /** Clears and drops the retained previous batch, if there is one. */
+  @Override
+  public void cleanup() {
+    if (previousBatch == null) {
+      return;
+    }
+    previousBatch.clear();
+    previousBatch = null;
+  }
+
+  /**
+   * Hook called at the end of setup() with the batches that were just stored.
+   */
+  public abstract void setupInterior(@Named("incoming") RecordBatch incoming, 
@Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
+
+  /**
+   * Compares the "within" values of two records in the same batch
+   * (presumably the partitioning keys; confirm against the code generator).
+   * doWork() calls resetValues() when this returns false.
+   * @param index1 First record index
+   * @param index2 Second record index
+   * @return true if the within values match
+   */
+  public abstract boolean isSame(@Named("index1") int index1, @Named("index2") 
int index2);
+  /**
+   * Compares the "within" values of one record in the given batch (doWork() passes the batch
+   * retained before the last incoming.next()) with one record in the current batch.
+   * @param b1Index Record index within b1
+   * @param b1 Batch that b1Index refers to
+   * @param index2 Record index within the current batch (named "b2Index" for code generation)
+   * @return true if the within values match
+   */
+  public abstract boolean isSameFromBatch(@Named("b1Index") int b1Index, 
@Named("b1") InternalBatch b1, @Named("b2Index") int index2);
+  // Called by doWork() for every visited record, before that record is written out.
+  public abstract void addRecord(@Named("index") int index);
+  // Called by outputToBatch(); a false return is treated as "could not write at outIndex".
+  public abstract boolean outputRecordValues(@Named("outIndex") int outIndex);
+  // Called by outputToBatch() after outputRecordValues() succeeded; false is treated the same way.
+  public abstract boolean outputWindowValues(@Named("inIndex") int inIndex, 
@Named("outIndex") int outIndex);
+  // Translates a position in the incoming batch into the index stored in currentIndex.
+  public abstract int getVectorIndex(@Named("recordIndex") int recordIndex);
+
+  // Called by doWork() when isSame/isSameFromBatch report a mismatch; the return value is ignored there.
+  public abstract boolean resetValues();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java
new file mode 100644
index 0000000..9588cef
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.window;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+
+public interface StreamingWindowFramer {
+  public static TemplateClassDefinition<StreamingWindowFramer> 
TEMPLATE_DEFINITION = new 
TemplateClassDefinition<>(StreamingWindowFramer.class, 
StreamingWindowFrameTemplate.class);
+
+
+  public static enum AggOutcome {
+    RETURN_OUTCOME, UPDATE_AGGREGATOR, RETURN_AND_COMPLETE;
+  }
+
+  public abstract void setup(FragmentContext context, RecordBatch incoming, 
RecordBatch outgoing,
+                             int precedingConfig, int followingConfig) throws 
SchemaChangeException;
+
+  public abstract RecordBatch.IterOutcome getOutcome();
+
+  public abstract int getOutputCount();
+
+  public abstract AggOutcome doWork();
+
+  public abstract void cleanup();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWindowRelBase.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWindowRelBase.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWindowRelBase.java
new file mode 100644
index 0000000..fcf52ee
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWindowRelBase.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.common;
+
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.WindowRelBase;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.rex.RexLiteral;
+
+import java.util.List;
+
+/**
+ * A WindowRelBase that additionally implements DrillRelNode.
+ */
+public class DrillWindowRelBase extends WindowRelBase implements DrillRelNode {
+
+  /** Passes every argument through to the WindowRelBase constructor unchanged. */
+  public DrillWindowRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode child,
+      List<RexLiteral> constants, RelDataType rowType, List<Window> windows) {
+    super(cluster, traits, child, constants, rowType, windows);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
index 3fc3b89..a2685ad 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Store;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.config.WindowPOP;
 import org.apache.drill.exec.planner.AbstractOpWrapperVisitor;
 import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
 
@@ -113,6 +114,11 @@ public class StatsCollector {
       return null;
     }
 
+    /** Handles a window operator exactly like a generic operator by delegating to visitOp. */
+    @Override
+    public Void visitWindowFrame(WindowPOP window, Wrapper value) throws RuntimeException {
+      return visitOp(window, value);
+    }
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
index 6b0c3b4..ee035c6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
@@ -82,7 +82,7 @@ public class DrillAggregateRel extends DrillAggregateRelBase 
implements DrillRel
     return builder.build();
   }
 
-  private LogicalExpression toDrill(AggregateCall call, List<String> fn, 
DrillImplementor implementor) {
+  public static LogicalExpression toDrill(AggregateCall call, List<String> fn, 
DrillImplementor implementor) {
     List<LogicalExpression> args = Lists.newArrayList();
     for(Integer i : call.getArgList()) {
       args.add(new FieldReference(fn.get(i)));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java
index c3b0d00..f6c910e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java
@@ -25,7 +25,7 @@ import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelTraitSet;
 
 /**
- * This rule converts a SortRel that has either a offset and fetch into a 
Drill Sort and Limit Rel
+ * This rule converts a SortRel that has either an offset or a fetch into a 
Drill Sort and LimitPOP Rel
  */
 public class DrillLimitRule extends RelOptRule {
   public static DrillLimitRule INSTANCE = new DrillLimitRule();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
index 082dacc..fcfced2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
@@ -27,6 +27,9 @@ import 
org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.physical.PrelUtil.ProjectPushInfo;
 import org.eigenbase.rel.ProjectRel;
+import org.eigenbase.rel.ProjectRelBase;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.rules.PushProjector;
 import org.eigenbase.rel.rules.RemoveTrivialProjectRule;
 import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java
index 7eca54e..7ed7885 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java
@@ -27,7 +27,7 @@ import org.eigenbase.relopt.Convention;
 public interface DrillRel extends DrillRelNode {
   /** Calling convention for relational expressions that are "implemented" by
    * generating Drill logical plans. */
-  Convention DRILL_LOGICAL = new Convention.Impl("LOGICAL", DrillRel.class);
+  public static final Convention DRILL_LOGICAL = new 
Convention.Impl("LOGICAL", DrillRel.class);
 
   LogicalOperator implement(DrillImplementor implementor);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8def6e91/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index dbb85b2..1d3ce9a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -38,6 +38,8 @@ import org.apache.drill.exec.planner.physical.ScreenPrule;
 import org.apache.drill.exec.planner.physical.SortConvertPrule;
 import org.apache.drill.exec.planner.physical.SortPrule;
 import org.apache.drill.exec.planner.physical.StreamAggPrule;
+import org.apache.drill.exec.planner.physical.StreamingWindowPrule;
+import org.apache.drill.exec.planner.physical.StreamingWindowPrule;
 import org.apache.drill.exec.planner.physical.UnionAllPrule;
 import org.apache.drill.exec.planner.physical.WriterPrule;
 import org.eigenbase.rel.RelFactories;
@@ -100,6 +102,7 @@ public class DrillRuleSets {
       DrillScanRule.INSTANCE,
       DrillFilterRule.INSTANCE,
       DrillProjectRule.INSTANCE,
+      DrillWindowRule.INSTANCE,
       DrillAggregateRule.INSTANCE,
 
       DrillLimitRule.INSTANCE,
@@ -139,6 +142,8 @@ public class DrillRuleSets {
       HashJoinPrule.INSTANCE,
       FilterPrule.INSTANCE,
       LimitPrule.INSTANCE,
+      WindowPrule.INSTANCE,
+
       WriterPrule.INSTANCE,
       PushLimitToTopN.INSTANCE
 
@@ -182,6 +187,7 @@ public class DrillRuleSets {
     ruleList.add(FilterPrule.INSTANCE);
     ruleList.add(LimitPrule.INSTANCE);
     ruleList.add(WriterPrule.INSTANCE);
+    ruleList.add(StreamingWindowPrule.INSTANCE);
     ruleList.add(PushLimitToTopN.INSTANCE);
     ruleList.add(UnionAllPrule.INSTANCE);
     // ruleList.add(UnionDistinctPrule.INSTANCE);

Reply via email to