http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index b260a8a..0ee7c1d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -21,9 +21,7 @@ package org.apache.hyracks.algebricks.runtime.operators.aggreg;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
-import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.meta.PipelineAssembler;
 import 
org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -31,9 +29,9 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
 import 
org.apache.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.AggregateState;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
@@ -56,10 +54,11 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, 
RecordDescriptor inRecordDesc,
             RecordDescriptor outRecordDescriptor, int[] keys, int[] 
partialKeys, long memoryBudget)
             throws HyracksDataException {
-        final AggregatorOutput outputWriter = new AggregatorOutput(subplans, 
keyFieldIdx.length, decorFieldIdx.length);
+        final AggregatorOutput outputWriter = new AggregatorOutput(subplans, 
keyFieldIdx.length + decorFieldIdx.length);
         final NestedTupleSourceRuntime[] pipelines = new 
NestedTupleSourceRuntime[subplans.length];
         for (int i = 0; i < subplans.length; i++) {
-            pipelines[i] = (NestedTupleSourceRuntime) 
assemblePipeline(subplans[i], outputWriter, ctx);
+            pipelines[i] =
+                    (NestedTupleSourceRuntime) 
PipelineAssembler.assemblePipeline(subplans[i], outputWriter, ctx);
         }
 
         return new IAggregatorDescriptor() {
@@ -78,7 +77,6 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
                 for (int i = 0; i < pipelines.length; ++i) {
                     pipelines[i].open();
                 }
-
                 // aggregate the first tuple
                 for (int i = 0; i < pipelines.length; i++) {
                     pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
@@ -108,18 +106,8 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
                 memoryUsageCheck();
 
                 tupleBuilder.reset();
-                ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
-                byte[] data = tb.getByteArray();
-                int[] fieldEnds = tb.getFieldEndOffsets();
-                int start = 0;
-                int offset;
-                for (int i = 0; i < fieldEnds.length; i++) {
-                    if (i > 0) {
-                        start = fieldEnds[i - 1];
-                    }
-                    offset = fieldEnds[i] - start;
-                    tupleBuilder.addField(data, start, offset);
-                }
+                TupleUtils.addFields(outputWriter.getTupleBuilder(), 
tupleBuilder);
+
                 return true;
             }
 
@@ -159,34 +147,10 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
         };
     }
 
-    private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, 
IFrameWriter writer, IHyracksTaskContext ctx)
-            throws HyracksDataException {
-        // plug the operators
-        IFrameWriter start = writer;
-        IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
-        RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
-        // should enforce protocol
-        boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
-        for (int i = runtimeFactories.length - 1; i >= 0; i--) {
-            IPushRuntime newRuntime = 
runtimeFactories[i].createPushRuntime(ctx)[0];
-            newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : 
newRuntime;
-            start = enforce ? EnforcePushRuntime.enforce(start) : start;
-            newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);
-            if (i > 0) {
-                newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 
1]);
-            } else {
-                // the nts has the same input and output rec. desc.
-                newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]);
-            }
-            start = newRuntime;
-        }
-        return start;
-    }
-
     /**
      * We suppose for now, that each subplan only produces one tuple.
      */
-    private static class AggregatorOutput implements IFrameWriter {
+    public static class AggregatorOutput implements IFrameWriter {
 
         // private ByteBuffer frame;
         private FrameTupleAccessor[] tAccess;
@@ -195,9 +159,8 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
         private ArrayTupleBuilder tb;
         private AlgebricksPipeline[] subplans;
 
-        public AggregatorOutput(AlgebricksPipeline[] subplans, int numKeys, 
int numDecors) {
+        public AggregatorOutput(AlgebricksPipeline[] subplans, int 
numPropagatedFields) {
             this.subplans = subplans;
-            // this.keyFieldIndexes = keyFieldIndexes;
             int totalAggFields = 0;
             this.inputRecDesc = new RecordDescriptor[subplans.length];
             for (int i = 0; i < subplans.length; i++) {
@@ -205,7 +168,7 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
                 this.inputRecDesc[i] = rd[rd.length - 1];
                 totalAggFields += subplans[i].getOutputWidth();
             }
-            tb = new ArrayTupleBuilder(numKeys + numDecors + totalAggFields);
+            tb = new ArrayTupleBuilder(numPropagatedFields + totalAggFields);
 
             this.tAccess = new FrameTupleAccessor[inputRecDesc.length];
             for (int i = 0; i < inputRecDesc.length; i++) {
@@ -250,5 +213,4 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
         public void fail() throws HyracksDataException {
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index f057515..d14f5c1 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -21,9 +21,7 @@ package org.apache.hyracks.algebricks.runtime.operators.aggreg;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
-import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.meta.PipelineAssembler;
 import 
org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -37,9 +35,11 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
 import 
org.apache.hyracks.dataflow.std.group.AbstractAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.AggregateState;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
 
 public class NestedPlansRunningAggregatorFactory extends 
AbstractAggregatorDescriptorFactory {
 
@@ -62,13 +62,14 @@ public class NestedPlansRunningAggregatorFactory extends AbstractAggregatorDescr
             RecordDescriptor outRecordDescriptor, int[] keyFields, int[] 
keyFieldsInPartialResults,
             final IFrameWriter writer, long memoryBudget) throws 
HyracksDataException {
         final RunningAggregatorOutput outputWriter =
-                new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length, 
decorFieldIdx.length, writer);
+                new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length 
+ decorFieldIdx.length, writer);
         // should enforce protocol
         boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
         IFrameWriter enforcedWriter = enforce ? 
EnforceFrameWriter.enforce(outputWriter) : outputWriter;
         final NestedTupleSourceRuntime[] pipelines = new 
NestedTupleSourceRuntime[subplans.length];
         for (int i = 0; i < subplans.length; i++) {
-            pipelines[i] = (NestedTupleSourceRuntime) 
assemblePipeline(subplans[i], enforcedWriter, ctx);
+            pipelines[i] =
+                    (NestedTupleSourceRuntime) 
PipelineAssembler.assemblePipeline(subplans[i], enforcedWriter, ctx);
         }
 
         final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder();
@@ -140,30 +141,6 @@ public class NestedPlansRunningAggregatorFactory extends AbstractAggregatorDescr
         };
     }
 
-    private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, 
IFrameWriter writer, IHyracksTaskContext ctx)
-            throws HyracksDataException {
-        // should enforce protocol
-        boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
-        // plug the operators
-        IFrameWriter start = writer;
-        IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
-        RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
-        for (int i = runtimeFactories.length - 1; i >= 0; i--) {
-            IPushRuntime newRuntime = 
runtimeFactories[i].createPushRuntime(ctx)[0];
-            newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : 
newRuntime;
-            start = enforce ? EnforceFrameWriter.enforce(start) : start;
-            newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);
-            if (i > 0) {
-                newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 
1]);
-            } else {
-                // the nts has the same input and output rec. desc.
-                newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]);
-            }
-            start = newRuntime;
-        }
-        return start;
-    }
-
     private static class RunningAggregatorOutput implements IFrameWriter {
 
         private final FrameTupleAccessor[] tAccess;
@@ -175,8 +152,8 @@ public class NestedPlansRunningAggregatorFactory extends AbstractAggregatorDescr
         private final IFrameWriter outputWriter;
         private final FrameTupleAppender outputAppender;
 
-        public RunningAggregatorOutput(IHyracksTaskContext ctx, 
AlgebricksPipeline[] subplans, int numKeys,
-                int numDecors, IFrameWriter outputWriter) throws 
HyracksDataException {
+        public RunningAggregatorOutput(IHyracksTaskContext ctx, 
AlgebricksPipeline[] subplans, int numPropagatedFields,
+                IFrameWriter outputWriter) throws HyracksDataException {
             this.subplans = subplans;
             this.outputWriter = outputWriter;
 
@@ -188,8 +165,8 @@ public class NestedPlansRunningAggregatorFactory extends AbstractAggregatorDescr
                 this.inputRecDesc[i] = rd[rd.length - 1];
                 totalAggFields += subplans[i].getOutputWidth();
             }
-            tb = new ArrayTupleBuilder(numKeys + numDecors + totalAggFields);
-            gbyTb = new ArrayTupleBuilder(numKeys + numDecors);
+            tb = new ArrayTupleBuilder(numPropagatedFields + totalAggFields);
+            gbyTb = new ArrayTupleBuilder(numPropagatedFields);
 
             this.tAccess = new FrameTupleAccessor[inputRecDesc.length];
             for (int i = 0; i < inputRecDesc.length; i++) {
@@ -211,17 +188,7 @@ public class NestedPlansRunningAggregatorFactory extends AbstractAggregatorDescr
             accessor.reset(buffer);
             for (int tIndex = 0; tIndex < accessor.getTupleCount(); tIndex++) {
                 tb.reset();
-                byte[] data = gbyTb.getByteArray();
-                int[] fieldEnds = gbyTb.getFieldEndOffsets();
-                int start = 0;
-                int offset = 0;
-                for (int i = 0; i < fieldEnds.length; i++) {
-                    if (i > 0) {
-                        start = fieldEnds[i - 1];
-                    }
-                    offset = fieldEnds[i] - start;
-                    tb.addField(data, start, offset);
-                }
+                TupleUtils.addFields(gbyTb, tb);
                 for (int f = 0; f < w; f++) {
                     tb.addField(accessor, tIndex, f);
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
index 27354cb..0449cf5 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
@@ -37,51 +37,55 @@ import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 public abstract class AbstractRunningAggregatePushRuntime<T extends 
IRunningAggregateEvaluator>
         extends AbstractOneInputOneOutputOneFramePushRuntime {
     protected final IHyracksTaskContext ctx;
-    private final IRunningAggregateEvaluatorFactory[] aggFactories;
-    private final Class<T> aggEvalClass;
-    protected final List<T> aggEvals;
-    private final int[] projectionList;
+    private final IRunningAggregateEvaluatorFactory[] runningAggFactories;
+    private final Class<T> runningAggEvalClass;
+    protected final List<T> runningAggEvals;
+    private final int[] projectionColumns;
     private final int[] projectionToOutColumns;
     private final IPointable p = VoidPointable.FACTORY.createPointable();
-    private final ArrayTupleBuilder tupleBuilder;
-    private boolean first;
+    protected ArrayTupleBuilder tupleBuilder;
+    private boolean isFirst;
 
-    public AbstractRunningAggregatePushRuntime(int[] outColumns, 
IRunningAggregateEvaluatorFactory[] aggFactories,
-            int[] projectionList, IHyracksTaskContext ctx, Class<T> 
aggEvalClass) {
+    public AbstractRunningAggregatePushRuntime(int[] projectionColumns, int[] 
runningAggOutColumns,
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, Class<T> 
runningAggEvalClass,
+            IHyracksTaskContext ctx) {
         this.ctx = ctx;
-        this.projectionList = projectionList;
-        this.aggFactories = aggFactories;
-        this.aggEvalClass = aggEvalClass;
-        aggEvals = new ArrayList<>(aggFactories.length);
-        tupleBuilder = new ArrayTupleBuilder(projectionList.length);
-        projectionToOutColumns = new int[projectionList.length];
-
-        for (int j = 0; j < projectionList.length; j++) {
-            projectionToOutColumns[j] = Arrays.binarySearch(outColumns, 
projectionList[j]);
+        this.projectionColumns = projectionColumns;
+        this.runningAggFactories = runningAggFactories;
+        this.runningAggEvalClass = runningAggEvalClass;
+        runningAggEvals = new ArrayList<>(runningAggFactories.length);
+        projectionToOutColumns = new int[projectionColumns.length];
+        for (int j = 0; j < projectionColumns.length; j++) {
+            projectionToOutColumns[j] = 
Arrays.binarySearch(runningAggOutColumns, projectionColumns[j]);
         }
-        first = true;
+        isFirst = true;
     }
 
     @Override
     public void open() throws HyracksDataException {
         super.open();
-        if (first) {
-            first = false;
+        if (isFirst) {
+            isFirst = false;
             init();
         }
-        for (T aggEval : aggEvals) {
-            aggEval.init();
+        for (T runningAggEval : runningAggEvals) {
+            runningAggEval.init();
         }
     }
 
     protected void init() throws HyracksDataException {
+        tupleBuilder = createOutputTupleBuilder(projectionColumns);
         initAccessAppendRef(ctx);
-        for (IRunningAggregateEvaluatorFactory aggFactory : aggFactories) {
-            IRunningAggregateEvaluator aggEval = 
aggFactory.createRunningAggregateEvaluator(ctx);
-            aggEvals.add(aggEvalClass.cast(aggEval));
+        for (IRunningAggregateEvaluatorFactory runningAggFactory : 
runningAggFactories) {
+            IRunningAggregateEvaluator runningAggEval = 
runningAggFactory.createRunningAggregateEvaluator(ctx);
+            runningAggEvals.add(runningAggEvalClass.cast(runningAggEval));
         }
     }
 
+    protected ArrayTupleBuilder createOutputTupleBuilder(int[] projectionList) 
{
+        return new ArrayTupleBuilder(projectionList.length);
+    }
+
     protected void produceTuples(IFrameTupleAccessor accessor, int beginIdx, 
int endIdx) throws HyracksDataException {
         for (int t = beginIdx; t <= endIdx; t++) {
             tRef.reset(accessor, t);
@@ -90,16 +94,16 @@ public abstract class AbstractRunningAggregatePushRuntime<T extends IRunningAggr
         }
     }
 
-    private void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor 
accessor, int tIndex,
+    protected void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor 
accessor, int tIndex,
             FrameTupleReference tupleRef) throws HyracksDataException {
         tb.reset();
-        for (int f = 0; f < projectionList.length; f++) {
+        for (int f = 0; f < projectionColumns.length; f++) {
             int k = projectionToOutColumns[f];
             if (k >= 0) {
-                aggEvals.get(k).step(tupleRef, p);
+                runningAggEvals.get(k).step(tupleRef, p);
                 tb.addField(p.getByteArray(), p.getStartOffset(), 
p.getLength());
             } else {
-                tb.addField(accessor, tIndex, projectionList[f]);
+                tb.addField(accessor, tIndex, projectionColumns[f]);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractWindowPushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractWindowPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractWindowPushRuntime.java
deleted file mode 100644
index f0bd204..0000000
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractWindowPushRuntime.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.algebricks.runtime.operators.aggrun;
-
-import java.nio.ByteBuffer;
-
-import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.IWindowAggregateEvaluator;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import 
org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
-
-public abstract class AbstractWindowPushRuntime extends 
AbstractRunningAggregatePushRuntime<IWindowAggregateEvaluator> {
-
-    private final int[] partitionColumnList;
-    private final IBinaryComparatorFactory[] partitionComparatorFactories;
-    private IBinaryComparator[] partitionComparators;
-    private final IBinaryComparatorFactory[] orderComparatorFactories;
-    private IFrame copyFrame;
-    private FrameTupleAccessor copyFrameAccessor;
-    private FrameTupleAccessor frameAccessor;
-    private long frameId;
-    private boolean inPartition;
-
-    public AbstractWindowPushRuntime(int[] outColumns, 
IRunningAggregateEvaluatorFactory[] aggFactories,
-            int[] projectionList, int[] partitionColumnList, 
IBinaryComparatorFactory[] partitionComparatorFactories,
-            IBinaryComparatorFactory[] orderComparatorFactories, 
IHyracksTaskContext ctx) {
-        super(outColumns, aggFactories, projectionList, ctx, 
IWindowAggregateEvaluator.class);
-        this.partitionColumnList = partitionColumnList;
-        this.partitionComparatorFactories = partitionComparatorFactories;
-        this.orderComparatorFactories = orderComparatorFactories;
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        super.open();
-        frameId = 0;
-        inPartition = false;
-    }
-
-    @Override
-    protected void init() throws HyracksDataException {
-        super.init();
-        partitionComparators = 
createBinaryComparators(partitionComparatorFactories);
-        frameAccessor = new FrameTupleAccessor(inputRecordDesc);
-        copyFrame = new VSizeFrame(ctx);
-        copyFrameAccessor = new FrameTupleAccessor(inputRecordDesc);
-        copyFrameAccessor.reset(copyFrame.getBuffer());
-        IBinaryComparator[] orderComparators = 
createBinaryComparators(orderComparatorFactories);
-        for (IWindowAggregateEvaluator aggEval : aggEvals) {
-            aggEval.configure(orderComparators);
-        }
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (inPartition) {
-            endPartition();
-        }
-        super.close();
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        frameAccessor.reset(buffer);
-        int nTuple = frameAccessor.getTupleCount();
-        if (nTuple == 0) {
-            return;
-        }
-
-        if (frameId == 0) {
-            beginPartition();
-        } else {
-            boolean samePartition = 
PreclusteredGroupWriter.sameGroup(copyFrameAccessor,
-                    copyFrameAccessor.getTupleCount() - 1, frameAccessor, 0, 
partitionColumnList, partitionComparators);
-            if (!samePartition) {
-                endPartition();
-                beginPartition();
-            }
-        }
-        if (nTuple == 1) {
-            partitionChunk(frameId, buffer, 0, 0);
-        } else {
-            int tBeginIndex = 0;
-            int tLastIndex = nTuple - 1;
-            for (int tIndex = 1; tIndex <= tLastIndex; tIndex++) {
-                boolean samePartition = 
PreclusteredGroupWriter.sameGroup(frameAccessor, tIndex - 1, frameAccessor,
-                        tIndex, partitionColumnList, partitionComparators);
-                if (!samePartition) {
-                    partitionChunk(frameId, buffer, tBeginIndex, tIndex - 1);
-                    endPartition();
-                    beginPartition();
-                    tBeginIndex = tIndex;
-                }
-            }
-            partitionChunk(frameId, buffer, tBeginIndex, tLastIndex);
-        }
-
-        copyFrame.resize(buffer.capacity());
-        FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
-        copyFrameAccessor.reset(copyFrame.getBuffer());
-        frameId++;
-    }
-
-    private void beginPartition() throws HyracksDataException {
-        if (inPartition) {
-            throw new IllegalStateException();
-        }
-        inPartition = true;
-        beginPartitionImpl();
-    }
-
-    private void partitionChunk(long frameId, ByteBuffer frameBuffer, int 
beginTupleIdx, int endTupleIdx)
-            throws HyracksDataException {
-        if (!inPartition || frameId < 0) {
-            throw new IllegalStateException();
-        }
-        partitionChunkImpl(frameId, frameBuffer, beginTupleIdx, endTupleIdx);
-    }
-
-    private void endPartition() throws HyracksDataException {
-        if (!inPartition) {
-            throw new IllegalStateException();
-        }
-        endPartitionImpl();
-        inPartition = false;
-    }
-
-    void aggInitPartition(long partitionLength) throws HyracksDataException {
-        for (IWindowAggregateEvaluator aggEval : aggEvals) {
-            aggEval.initPartition(partitionLength);
-        }
-    }
-
-    private static IBinaryComparator[] 
createBinaryComparators(IBinaryComparatorFactory[] factories) {
-        IBinaryComparator[] comparators = new 
IBinaryComparator[factories.length];
-        for (int i = 0; i < factories.length; i++) {
-            comparators[i] = factories[i].createBinaryComparator();
-        }
-        return comparators;
-    }
-
-    protected abstract void beginPartitionImpl() throws HyracksDataException;
-
-    protected abstract void partitionChunkImpl(long frameId, ByteBuffer 
frameBuffer, int tBeginIdx, int tEndIdx)
-            throws HyracksDataException;
-
-    protected abstract void endPartitionImpl() throws HyracksDataException;
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/MaterializingWindowPushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/MaterializingWindowPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/MaterializingWindowPushRuntime.java
deleted file mode 100644
index 850430f..0000000
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/MaterializingWindowPushRuntime.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.algebricks.runtime.operators.aggrun;
-
-import java.nio.ByteBuffer;
-
-import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
-import org.apache.hyracks.api.comm.FrameHelper;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.storage.common.arraylist.IntArrayList;
-
-class MaterializingWindowPushRuntime extends AbstractWindowPushRuntime {
-
-    private RunFileWriter run;
-
-    private IntArrayList runInfo;
-
-    private long partitionLength;
-
-    private IFrame curFrame;
-
-    private long curFrameId;
-
-    private long runLastFrameId;
-
-    MaterializingWindowPushRuntime(int[] outColumns, 
IRunningAggregateEvaluatorFactory[] aggFactories,
-            int[] projectionList, int[] partitionColumnList, 
IBinaryComparatorFactory[] partitionComparatorFactories,
-            IBinaryComparatorFactory[] orderComparatorFactories, 
IHyracksTaskContext ctx) {
-        super(outColumns, aggFactories, projectionList, partitionColumnList, 
partitionComparatorFactories,
-                orderComparatorFactories, ctx);
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        super.open();
-        run = null;
-        curFrameId = -1;
-    }
-
-    @Override
-    protected void init() throws HyracksDataException {
-        super.init();
-        curFrame = new VSizeFrame(ctx);
-        runInfo = new IntArrayList(128, 128);
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        super.close();
-        if (run != null) {
-            run.erase();
-        }
-    }
-
-    @Override
-    protected void beginPartitionImpl() {
-        runInfo.clear();
-        partitionLength = 0;
-        if (run != null) {
-            run.rewind();
-        }
-    }
-
-    @Override
-    protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, 
int tBeginIdx, int tEndIdx)
-            throws HyracksDataException {
-        boolean firstChunk = runInfo.isEmpty();
-        runInfo.add(tBeginIdx);
-        runInfo.add(tEndIdx);
-
-        // save frame. first one to memory, remaining ones to the run file
-        if (firstChunk || tBeginIdx == 0) {
-            int pos = frameBuffer.position();
-            frameBuffer.position(0);
-
-            if (firstChunk) {
-                if (frameId != curFrameId) {
-                    curFrame.resize(curFrame.getMinSize() * 
FrameHelper.deserializeNumOfMinFrame(frameBuffer));
-                    curFrame.getBuffer().clear();
-                    curFrame.getBuffer().put(frameBuffer);
-                    curFrameId = frameId;
-                }
-            } else {
-                if (run == null) {
-                    FileReference file = 
ctx.getJobletContext().createManagedWorkspaceFile(getClass().getSimpleName());
-                    run = new RunFileWriter(file, ctx.getIoManager());
-                    run.open();
-                }
-                run.nextFrame(frameBuffer);
-                runLastFrameId = frameId;
-            }
-
-            frameBuffer.position(pos);
-        }
-
-        partitionLength += tEndIdx - tBeginIdx + 1;
-    }
-
-    @Override
-    protected void endPartitionImpl() throws HyracksDataException {
-        aggInitPartition(partitionLength);
-        GeneratedRunFileReader reader = null;
-        try {
-            boolean runRead = false;
-            for (int idx = 0, ln = runInfo.size(); idx < ln; idx += 2) {
-                int tBeginIdx = runInfo.get(idx);
-                int tEndIdx = runInfo.get(idx + 1);
-                if (tBeginIdx == 0 && idx > 0) {
-                    if (reader == null) {
-                        reader = run.createReader();
-                        reader.open();
-                    }
-                    reader.nextFrame(curFrame);
-                    runRead = true;
-                }
-                tAccess.reset(curFrame.getBuffer());
-                produceTuples(tAccess, tBeginIdx, tEndIdx);
-            }
-            if (runRead) {
-                curFrameId = runLastFrameId;
-            }
-        } finally {
-            if (reader != null) {
-                reader.close();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java
index 7c39a21..4ca166f 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java
@@ -28,15 +28,14 @@ import 
org.apache.hyracks.api.exceptions.HyracksDataException;
 
 class RunningAggregatePushRuntime extends 
AbstractRunningAggregatePushRuntime<IRunningAggregateEvaluator> {
 
-    RunningAggregatePushRuntime(int[] outColumns, 
IRunningAggregateEvaluatorFactory[] aggFactories,
-            int[] projectionList, IHyracksTaskContext ctx) {
-        super(outColumns, aggFactories, projectionList, ctx, 
IRunningAggregateEvaluator.class);
+    RunningAggregatePushRuntime(int[] projectionColumns, int[] 
runningAggOutColumns,
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, 
IHyracksTaskContext ctx) {
+        super(projectionColumns, runningAggOutColumns, runningAggFactories, 
IRunningAggregateEvaluator.class, ctx);
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         tAccess.reset(buffer);
-        int nTuple = tAccess.getTupleCount();
-        produceTuples(tAccess, 0, nTuple - 1);
+        produceTuples(tAccess, 0, tAccess.getTupleCount() - 1);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregateRuntimeFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregateRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregateRuntimeFactory.java
index e97ea7d..d9b3b5c 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregateRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregateRuntimeFactory.java
@@ -29,31 +29,32 @@ public class RunningAggregateRuntimeFactory extends 
AbstractOneInputOneOutputRun
 
     private static final long serialVersionUID = 1L;
 
-    protected final int[] outColumns;
+    protected final int[] runningAggOutColumns;
 
-    protected final IRunningAggregateEvaluatorFactory[] aggFactories;
+    protected final IRunningAggregateEvaluatorFactory[] runningAggFactories;
 
     /**
-     * @param outColumns
-     *            a sorted array of columns into which the result is written to
-     * @param aggFactories
-     * @param projectionList
+     * @param projectionColumns
      *            an array of columns to be projected
+     * @param runningAggOutColumns
+     *            a sorted array of columns into which the result is written to
+     * @param runningAggFactories
      */
-    public RunningAggregateRuntimeFactory(int[] outColumns, 
IRunningAggregateEvaluatorFactory[] aggFactories,
-            int[] projectionList) {
-        super(projectionList);
-        this.outColumns = outColumns;
-        this.aggFactories = aggFactories;
+    public RunningAggregateRuntimeFactory(int[] projectionColumns, int[] 
runningAggOutColumns,
+            IRunningAggregateEvaluatorFactory[] runningAggFactories) {
+        super(projectionColumns);
+        this.runningAggOutColumns = runningAggOutColumns;
+        this.runningAggFactories = runningAggFactories;
     }
 
     @Override
     public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(IHyracksTaskContext ctx) {
-        return new RunningAggregatePushRuntime(outColumns, aggFactories, 
projectionList, ctx);
+        return new RunningAggregatePushRuntime(projectionList, 
runningAggOutColumns, runningAggFactories, ctx);
     }
 
     @Override
     public String toString() {
-        return "running-aggregate " + Arrays.toString(outColumns) + " := " + 
Arrays.toString(aggFactories);
+        return "running-aggregate " + Arrays.toString(runningAggOutColumns) + 
" := "
+                + Arrays.toString(runningAggFactories);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/SimpleWindowPushRuntime.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/SimpleWindowPushRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/SimpleWindowPushRuntime.java
deleted file mode 100644
index 61b06bf..0000000
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/SimpleWindowPushRuntime.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.algebricks.runtime.operators.aggrun;
-
-import java.nio.ByteBuffer;
-
-import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-class SimpleWindowPushRuntime extends AbstractWindowPushRuntime {
-
-    SimpleWindowPushRuntime(int[] outColumns, 
IRunningAggregateEvaluatorFactory[] aggFactories, int[] projectionList,
-            int[] partitionColumnList, IBinaryComparatorFactory[] 
partitionComparatorFactories,
-            IBinaryComparatorFactory[] orderComparatorFactories, 
IHyracksTaskContext ctx) {
-        super(outColumns, aggFactories, projectionList, partitionColumnList, 
partitionComparatorFactories,
-                orderComparatorFactories, ctx);
-    }
-
-    @Override
-    protected void beginPartitionImpl() throws HyracksDataException {
-        aggInitPartition(-1);
-    }
-
-    @Override
-    protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, 
int tBeginIdx, int tEndIdx)
-            throws HyracksDataException {
-        tAccess.reset(frameBuffer);
-        produceTuples(tAccess, tBeginIdx, tEndIdx);
-    }
-
-    @Override
-    protected void endPartitionImpl() {
-        // nothing to do
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/WindowRuntimeFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/WindowRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/WindowRuntimeFactory.java
deleted file mode 100644
index fe7f554..0000000
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/WindowRuntimeFactory.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.algebricks.runtime.operators.aggrun;
-
-import java.util.Arrays;
-
-import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
-import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-
-public class WindowRuntimeFactory extends RunningAggregateRuntimeFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    private final int[] partitionColumnList;
-
-    private final IBinaryComparatorFactory[] partitionComparatorFactories;
-
-    private final boolean partitionMaterialization;
-
-    private final IBinaryComparatorFactory[] orderComparatorFactories;
-
-    public WindowRuntimeFactory(int[] outColumns, 
IRunningAggregateEvaluatorFactory[] aggFactories,
-            int[] projectionList, int[] partitionColumnList, 
IBinaryComparatorFactory[] partitionComparatorFactories,
-            boolean partitionMaterialization, IBinaryComparatorFactory[] 
orderComparatorFactories) {
-        super(outColumns, aggFactories, projectionList);
-        this.partitionColumnList = partitionColumnList;
-        this.partitionComparatorFactories = partitionComparatorFactories;
-        this.partitionMaterialization = partitionMaterialization;
-        this.orderComparatorFactories = orderComparatorFactories;
-    }
-
-    @Override
-    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(IHyracksTaskContext ctx) {
-        return partitionMaterialization
-                ? new MaterializingWindowPushRuntime(outColumns, aggFactories, 
projectionList, partitionColumnList,
-                        partitionComparatorFactories, 
orderComparatorFactories, ctx)
-                : new SimpleWindowPushRuntime(outColumns, aggFactories, 
projectionList, partitionColumnList,
-                        partitionComparatorFactories, 
orderComparatorFactories, ctx);
-    }
-
-    @Override
-    public String toString() {
-        return "window (" + Arrays.toString(partitionColumnList) + ") " + 
Arrays.toString(outColumns) + " := "
-                + Arrays.toString(aggFactories);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index a717794..376a7a1 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -93,4 +93,29 @@ public class PipelineAssembler {
     public IPushRuntime[] getPushRuntime(IPushRuntimeFactory runtimeFactory) {
         return runtimeMap.get(runtimeFactory);
     }
+
+    //TODO: refactoring is needed
+    public static IFrameWriter assemblePipeline(AlgebricksPipeline subplan, 
IFrameWriter writer,
+            IHyracksTaskContext ctx) throws HyracksDataException {
+        // should enforce protocol
+        boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
+        // plug the operators
+        IFrameWriter start = writer;
+        IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
+        RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
+        for (int i = runtimeFactories.length - 1; i >= 0; i--) {
+            IPushRuntime newRuntime = 
runtimeFactories[i].createPushRuntime(ctx)[0];
+            newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : 
newRuntime;
+            start = enforce ? EnforceFrameWriter.enforce(start) : start;
+            newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);
+            if (i > 0) {
+                newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 
1]);
+            } else {
+                // the nts has the same input and output rec. desc.
+                newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]);
+            }
+            start = newRuntime;
+        }
+        return start;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
new file mode 100644
index 0000000..a63aaf8
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import java.nio.ByteBuffer;
+
+import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IWindowAggregateEvaluator;
+import 
org.apache.hyracks.algebricks.runtime.operators.aggrun.AbstractRunningAggregatePushRuntime;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import 
org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+
+public abstract class AbstractWindowPushRuntime extends 
AbstractRunningAggregatePushRuntime<IWindowAggregateEvaluator> {
+
+    private final int[] partitionColumns;
+    private final IBinaryComparatorFactory[] partitionComparatorFactories;
+    private IBinaryComparator[] partitionComparators;
+    private final IBinaryComparatorFactory[] orderComparatorFactories;
+    private IFrame copyFrame;
+    private FrameTupleAccessor copyFrameAccessor;
+    private FrameTupleAccessor frameAccessor;
+    private long frameId;
+    private boolean inPartition;
+
+    AbstractWindowPushRuntime(int[] partitionColumns, 
IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, int[] 
projectionColumns, int[] runningAggOutColumns,
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, 
IHyracksTaskContext ctx) {
+        super(projectionColumns, runningAggOutColumns, runningAggFactories, 
IWindowAggregateEvaluator.class, ctx);
+        this.partitionColumns = partitionColumns;
+        this.partitionComparatorFactories = partitionComparatorFactories;
+        this.orderComparatorFactories = orderComparatorFactories;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        super.open();
+        frameId = 0;
+        inPartition = false;
+    }
+
+    @Override
+    protected void init() throws HyracksDataException {
+        super.init();
+        partitionComparators = 
createBinaryComparators(partitionComparatorFactories);
+        frameAccessor = new FrameTupleAccessor(inputRecordDesc);
+        copyFrame = new VSizeFrame(ctx);
+        copyFrameAccessor = new FrameTupleAccessor(inputRecordDesc);
+        copyFrameAccessor.reset(copyFrame.getBuffer());
+        IBinaryComparator[] orderComparators = 
createBinaryComparators(orderComparatorFactories);
+        for (IWindowAggregateEvaluator runningAggEval : runningAggEvals) {
+            runningAggEval.configure(orderComparators);
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (inPartition) {
+            endPartition();
+        }
+        super.close();
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        frameAccessor.reset(buffer);
+        int nTuple = frameAccessor.getTupleCount();
+        if (nTuple == 0) {
+            return;
+        }
+
+        if (frameId == 0) {
+            beginPartition();
+        } else {
+            boolean samePartition = 
PreclusteredGroupWriter.sameGroup(copyFrameAccessor,
+                    copyFrameAccessor.getTupleCount() - 1, frameAccessor, 0, 
partitionColumns, partitionComparators);
+            if (!samePartition) {
+                endPartition();
+                beginPartition();
+            }
+        }
+        if (nTuple == 1) {
+            partitionChunk(frameId, buffer, 0, 0);
+        } else {
+            int tBeginIndex = 0;
+            int tLastIndex = nTuple - 1;
+            for (int tIndex = 1; tIndex <= tLastIndex; tIndex++) {
+                boolean samePartition = 
PreclusteredGroupWriter.sameGroup(frameAccessor, tIndex - 1, frameAccessor,
+                        tIndex, partitionColumns, partitionComparators);
+                if (!samePartition) {
+                    partitionChunk(frameId, buffer, tBeginIndex, tIndex - 1);
+                    endPartition();
+                    beginPartition();
+                    tBeginIndex = tIndex;
+                }
+            }
+            partitionChunk(frameId, buffer, tBeginIndex, tLastIndex);
+        }
+
+        copyFrame.resize(buffer.capacity());
+        FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
+        copyFrameAccessor.reset(copyFrame.getBuffer());
+        frameId++;
+    }
+
+    private void beginPartition() throws HyracksDataException {
+        if (inPartition) {
+            throw new IllegalStateException();
+        }
+        inPartition = true;
+        beginPartitionImpl();
+    }
+
+    private void partitionChunk(long frameId, ByteBuffer frameBuffer, int 
beginTupleIdx, int endTupleIdx)
+            throws HyracksDataException {
+        if (!inPartition || frameId < 0) {
+            throw new IllegalStateException();
+        }
+        partitionChunkImpl(frameId, frameBuffer, beginTupleIdx, endTupleIdx);
+    }
+
+    private void endPartition() throws HyracksDataException {
+        if (!inPartition) {
+            throw new IllegalStateException();
+        }
+        endPartitionImpl();
+        inPartition = false;
+    }
+
+    void runningAggInitPartition(long partitionLength) throws 
HyracksDataException {
+        for (IWindowAggregateEvaluator runningAggEval : runningAggEvals) {
+            runningAggEval.initPartition(partitionLength);
+        }
+    }
+
+    protected static IBinaryComparator[] 
createBinaryComparators(IBinaryComparatorFactory[] factories) {
+        IBinaryComparator[] comparators = new 
IBinaryComparator[factories.length];
+        for (int i = 0; i < factories.length; i++) {
+            comparators[i] = factories[i].createBinaryComparator();
+        }
+        return comparators;
+    }
+
+    protected abstract void beginPartitionImpl() throws HyracksDataException;
+
+    protected abstract void partitionChunkImpl(long frameId, ByteBuffer 
frameBuffer, int tBeginIdx, int tEndIdx)
+            throws HyracksDataException;
+
+    protected abstract void endPartitionImpl() throws HyracksDataException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowRuntimeFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowRuntimeFactory.java
new file mode 100644
index 0000000..6c52b0b
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowRuntimeFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public abstract class AbstractWindowRuntimeFactory extends 
AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    final int[] partitionColumns;
+
+    final IBinaryComparatorFactory[] partitionComparatorFactories;
+
+    final IBinaryComparatorFactory[] orderComparatorFactories;
+
+    final int[] runningAggOutColumns;
+
+    final IRunningAggregateEvaluatorFactory[] runningAggFactories;
+
+    AbstractWindowRuntimeFactory(int[] partitionColumns, 
IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, int[] 
projectionColumns, int[] runningAggOutColumns,
+            IRunningAggregateEvaluatorFactory[] runningAggFactories) {
+        super(projectionColumns);
+        this.runningAggOutColumns = runningAggOutColumns;
+        this.runningAggFactories = runningAggFactories;
+        this.partitionColumns = partitionColumns;
+        this.partitionComparatorFactories = partitionComparatorFactories;
+        this.orderComparatorFactories = orderComparatorFactories;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..6177723
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import 
org.apache.hyracks.algebricks.runtime.operators.aggreg.NestedPlansAccumulatingAggregatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.meta.PipelineAssembler;
+import 
org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import 
org.apache.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
+import org.apache.hyracks.dataflow.std.group.AggregateState;
+import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
+
+/**
+ * Aggregator factory for window operators
+ */
+public final class WindowAggregatorDescriptorFactory extends 
AbstractAccumulatingAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private AlgebricksPipeline[] subplans;
+
+    public WindowAggregatorDescriptorFactory(AlgebricksPipeline[] subplans) {
+        this.subplans = subplans;
+    }
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, 
RecordDescriptor inRecordDesc,
+            RecordDescriptor outRecordDescriptor, int[] keys, int[] 
partialKeys, long memoryBudget)
+            throws HyracksDataException {
+        NestedPlansAccumulatingAggregatorFactory.AggregatorOutput outputWriter 
=
+                new 
NestedPlansAccumulatingAggregatorFactory.AggregatorOutput(subplans, 0);
+        NestedTupleSourceRuntime[] pipelines = new 
NestedTupleSourceRuntime[subplans.length];
+        for (int i = 0; i < subplans.length; i++) {
+            pipelines[i] =
+                    (NestedTupleSourceRuntime) 
PipelineAssembler.assemblePipeline(subplans[i], outputWriter, ctx);
+        }
+
+        return new IAggregatorDescriptor() {
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, 
IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                outputWriter.getTupleBuilder().reset();
+
+                for (NestedTupleSourceRuntime pipeline : pipelines) {
+                    pipeline.open();
+                }
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, 
IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws 
HyracksDataException {
+                memoryUsageCheck();
+                for (NestedTupleSourceRuntime pipeline : pipelines) {
+                    pipeline.writeTuple(accessor.getBuffer(), tIndex);
+                }
+            }
+
+            @Override
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, 
IFrameTupleAccessor stateAccessor,
+                    int tIndex, AggregateState state) throws 
HyracksDataException {
+                for (int i = 0; i < pipelines.length; i++) {
+                    outputWriter.setInputIdx(i);
+                    pipelines[i].close();
+                }
+                memoryUsageCheck();
+                TupleUtils.addFields(outputWriter.getTupleBuilder(), 
tupleBuilder);
+                return true;
+            }
+
+            @Override
+            public AggregateState createAggregateStates() {
+                return null;
+            }
+
+            @Override
+            public void reset() {
+            }
+
+            @Override
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, 
IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public void close() {
+            }
+
+            private void memoryUsageCheck() {
+                // TODO: implement as in 
NestedPlansAccumulatingAggregatorFactory.memoryUsageCheck()
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
new file mode 100644
index 0000000..ec13c5f
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import java.nio.ByteBuffer;
+
+import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.storage.common.arraylist.IntArrayList;
+
+/**
+ * Runtime for window operators that performs partition materialization and evaluates running aggregates
+ * that require information about the number of tuples in the partition.
+ * <p>
+ * A partition arrives as a sequence of chunks (one per input frame it touches). The frame of the first
+ * chunk is kept in memory in {@link #curFrame}; the frames of all following chunks are appended to a run file.
+ * When the partition ends, its total length is handed to the running aggregates and the chunks are replayed
+ * in order to produce the output tuples.
+ */
+class WindowMaterializingPushRuntime extends AbstractWindowPushRuntime {
+
+    /** Number of tuples accumulated so far for the current partition (sum over all of its chunks). */
+    private long partitionLength;
+
+    /** In-memory frame: holds the first chunk's frame while collecting, and each replayed frame while producing. */
+    IFrame curFrame;
+
+    /** Id of the input frame whose content currently sits in {@link #curFrame}; {@code -1} after {@link #open()}. */
+    private long curFrameId;
+
+    /** Begin tuple index of the first chunk of the current partition. */
+    private int chunkBeginIdx;
+
+    /** End tuple index of every chunk of the current partition, in arrival order. */
+    private IntArrayList chunkEndIdx;
+
+    /** Run file receiving the frames of all chunks after the first; created lazily, {@code null} until needed. */
+    private RunFileWriter run;
+
+    /** Id of the input frame most recently written to {@link #run}. */
+    private long runLastFrameId;
+
+    WindowMaterializingPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns,
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx) {
+        super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
+                runningAggOutColumns, runningAggFactories, ctx);
+    }
+
+    /**
+     * Opens the runtime and resets the materialization state: no run file, no frame cached in memory.
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        super.open();
+        run = null;
+        curFrameId = -1;
+    }
+
+    /**
+     * Allocates the in-memory frame and the list of chunk end indexes.
+     */
+    @Override
+    protected void init() throws HyracksDataException {
+        super.init();
+        curFrame = new VSizeFrame(ctx);
+        chunkEndIdx = new IntArrayList(128, 128);
+    }
+
+    /**
+     * Closes the runtime and erases the run file, if one was created.
+     *
+     * @throws HyracksDataException if closing the superclass or erasing the run file fails
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            super.close();
+        } finally {
+            // erase the spill file even when super.close() throws, so that it is not left behind
+            if (run != null) {
+                run.erase();
+            }
+        }
+    }
+
+    /**
+     * Starts a new partition: forgets the chunks of the previous one and rewinds the run file for reuse.
+     */
+    @Override
+    protected void beginPartitionImpl() {
+        chunkEndIdx.clear();
+        partitionLength = 0;
+        if (run != null) {
+            run.rewind();
+        }
+    }
+
+    /**
+     * Records one chunk of the current partition and saves the frame that contains it.
+     *
+     * @param frameId     id of the input frame holding the chunk
+     * @param frameBuffer buffer of that frame; its position is restored before returning
+     * @param tBeginIdx   index of the first tuple of the chunk; must be 0 for every chunk but the first
+     * @param tEndIdx     index of the last tuple of the chunk (inclusive)
+     * @throws IllegalStateException if a chunk other than the first does not start at tuple 0
+     */
+    @Override
+    protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx)
+            throws HyracksDataException {
+        // save the frame. first one to memory, remaining ones to the run file
+        boolean isFirstChunk = chunkEndIdx.isEmpty();
+        if (isFirstChunk) {
+            // skip the copy when curFrame already holds this very frame
+            if (frameId != curFrameId) {
+                int nBlocks = FrameHelper.deserializeNumOfMinFrame(frameBuffer);
+                curFrame.ensureFrameSize(curFrame.getMinSize() * nBlocks);
+                int pos = frameBuffer.position();
+                FrameUtils.copyAndFlip(frameBuffer, curFrame.getBuffer());
+                // put the source buffer's position back to what it was before the copy
+                frameBuffer.position(pos);
+                curFrameId = frameId;
+            }
+            chunkBeginIdx = tBeginIdx;
+        } else {
+            if (tBeginIdx != 0) {
+                throw new IllegalStateException(String.valueOf(tBeginIdx));
+            }
+            if (run == null) {
+                FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(getClass().getSimpleName());
+                run = new RunFileWriter(file, ctx.getIoManager());
+                run.open();
+            }
+            // write the frame from position 0, then put the buffer's position back
+            int pos = frameBuffer.position();
+            frameBuffer.position(0);
+            run.nextFrame(frameBuffer);
+            frameBuffer.position(pos);
+            runLastFrameId = frameId;
+        }
+
+        chunkEndIdx.add(tEndIdx);
+        partitionLength += tEndIdx - tBeginIdx + 1;
+    }
+
+    /**
+     * Ends the current partition: passes its length to the running aggregates, then produces the output
+     * tuples chunk by chunk. The first chunk is served from {@link #curFrame}; every later chunk is first
+     * read back from the run file into {@link #curFrame}.
+     */
+    @Override
+    protected void endPartitionImpl() throws HyracksDataException {
+        runningAggInitPartition(partitionLength);
+
+        int nChunks = getPartitionChunkCount();
+        if (nChunks == 1) {
+            producePartitionTuples(0, null);
+        } else {
+            GeneratedRunFileReader reader = run.createReader();
+            reader.open();
+            try {
+                for (int chunkIdx = 0; chunkIdx < nChunks; chunkIdx++) {
+                    if (chunkIdx > 0) {
+                        reader.nextFrame(curFrame);
+                    }
+                    producePartitionTuples(chunkIdx, reader);
+                }
+                // curFrame was overwritten by the frames read back; record the id of the last frame spilled
+                curFrameId = runLastFrameId;
+            } finally {
+                reader.close();
+            }
+        }
+    }
+
+    /**
+     * Produces the output tuples of one chunk from the frame currently held in {@link #curFrame}.
+     *
+     * @param chunkIdx index of the chunk within the current partition
+     * @param reader   reader over the run file, or {@code null} for a single-chunk partition; not used here
+     */
+    protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader reader) throws HyracksDataException {
+        tAccess.reset(curFrame.getBuffer());
+        produceTuples(tAccess, getTupleBeginIdx(chunkIdx), getTupleEndIdx(chunkIdx));
+    }
+
+    /** Returns the number of chunks recorded for the current partition. */
+    int getPartitionChunkCount() {
+        return chunkEndIdx.size();
+    }
+
+    /** Returns the begin tuple index of the given chunk: the recorded one for chunk 0, otherwise 0. */
+    int getTupleBeginIdx(int chunkIdx) {
+        return chunkIdx == 0 ? chunkBeginIdx : 0;
+    }
+
+    /** Returns the end tuple index recorded for the given chunk. */
+    int getTupleEndIdx(int chunkIdx) {
+        return chunkEndIdx.get(chunkIdx);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java
new file mode 100644
index 0000000..1b02fb1
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import java.util.Arrays;
+
+import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+/**
+ * Runtime factory for window operators that perform partition materialization and evaluate running
+ * aggregates that require information about the number of tuples in the partition.
+ * Each created runtime is a {@link WindowMaterializingPushRuntime}.
+ */
+public class WindowMaterializingRuntimeFactory extends AbstractWindowRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Forwards all arguments unchanged to the {@link AbstractWindowRuntimeFactory} constructor.
+     */
+    public WindowMaterializingRuntimeFactory(int[] partitionColumns,
+            IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumnsExcludingSubplans,
+            int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories) {
+        super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
+                projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories);
+    }
+
+    /**
+     * Creates a new {@link WindowMaterializingPushRuntime} for the given task context.
+     */
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
+        return new WindowMaterializingPushRuntime(partitionColumns, partitionComparatorFactories,
+                orderComparatorFactories, projectionList, runningAggOutColumns, runningAggFactories, ctx);
+    }
+
+    /**
+     * Renders the factory as {@code window [materialize] (<partition columns>) <output columns> := <factories>}.
+     */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("window [materialize] (");
+        text.append(Arrays.toString(partitionColumns));
+        text.append(") ");
+        text.append(Arrays.toString(runningAggOutColumns));
+        text.append(" := ");
+        text.append(Arrays.toString(runningAggFactories));
+        return text.toString();
+    }
+}

Reply via email to