Repository: asterixdb
Updated Branches:
  refs/heads/master b9d55c40e -> fdedf6263


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractWindowPushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractWindowPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractWindowPushRuntime.java
new file mode 100644
index 0000000..f0bd204
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractWindowPushRuntime.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.aggrun;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IWindowAggregateEvaluator;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+
+public abstract class AbstractWindowPushRuntime extends AbstractRunningAggregatePushRuntime<IWindowAggregateEvaluator> {
+
+    private final int[] partitionColumnList;
+    private final IBinaryComparatorFactory[] partitionComparatorFactories;
+    private IBinaryComparator[] partitionComparators;
+    private final IBinaryComparatorFactory[] orderComparatorFactories;
+    private IFrame copyFrame;
+    private FrameTupleAccessor copyFrameAccessor;
+    private FrameTupleAccessor frameAccessor;
+    private long frameId;
+    private boolean inPartition;
+
+    public AbstractWindowPushRuntime(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories,
+            int[] projectionList, int[] partitionColumnList, IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, IHyracksTaskContext ctx) {
+        super(outColumns, aggFactories, projectionList, ctx, IWindowAggregateEvaluator.class);
+        this.partitionColumnList = partitionColumnList;
+        this.partitionComparatorFactories = partitionComparatorFactories;
+        this.orderComparatorFactories = orderComparatorFactories;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        super.open();
+        frameId = 0;
+        inPartition = false;
+    }
+
+    @Override
+    protected void init() throws HyracksDataException {
+        super.init();
+        partitionComparators = createBinaryComparators(partitionComparatorFactories);
+        frameAccessor = new FrameTupleAccessor(inputRecordDesc);
+        copyFrame = new VSizeFrame(ctx);
+        copyFrameAccessor = new FrameTupleAccessor(inputRecordDesc);
+        copyFrameAccessor.reset(copyFrame.getBuffer());
+        IBinaryComparator[] orderComparators = createBinaryComparators(orderComparatorFactories);
+        for (IWindowAggregateEvaluator aggEval : aggEvals) {
+            aggEval.configure(orderComparators);
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (inPartition) {
+            endPartition();
+        }
+        super.close();
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        frameAccessor.reset(buffer);
+        int nTuple = frameAccessor.getTupleCount();
+        if (nTuple == 0) {
+            return;
+        }
+
+        if (frameId == 0) {
+            beginPartition();
+        } else {
+            boolean samePartition = PreclusteredGroupWriter.sameGroup(copyFrameAccessor,
+                    copyFrameAccessor.getTupleCount() - 1, frameAccessor, 0, partitionColumnList, partitionComparators);
+            if (!samePartition) {
+                endPartition();
+                beginPartition();
+            }
+        }
+        if (nTuple == 1) {
+            partitionChunk(frameId, buffer, 0, 0);
+        } else {
+            int tBeginIndex = 0;
+            int tLastIndex = nTuple - 1;
+            for (int tIndex = 1; tIndex <= tLastIndex; tIndex++) {
+                boolean samePartition = PreclusteredGroupWriter.sameGroup(frameAccessor, tIndex - 1, frameAccessor,
+                        tIndex, partitionColumnList, partitionComparators);
+                if (!samePartition) {
+                    partitionChunk(frameId, buffer, tBeginIndex, tIndex - 1);
+                    endPartition();
+                    beginPartition();
+                    tBeginIndex = tIndex;
+                }
+            }
+            partitionChunk(frameId, buffer, tBeginIndex, tLastIndex);
+        }
+
+        copyFrame.resize(buffer.capacity());
+        FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
+        copyFrameAccessor.reset(copyFrame.getBuffer());
+        frameId++;
+    }
+
+    private void beginPartition() throws HyracksDataException {
+        if (inPartition) {
+            throw new IllegalStateException();
+        }
+        inPartition = true;
+        beginPartitionImpl();
+    }
+
+    private void partitionChunk(long frameId, ByteBuffer frameBuffer, int beginTupleIdx, int endTupleIdx)
+            throws HyracksDataException {
+        if (!inPartition || frameId < 0) {
+            throw new IllegalStateException();
+        }
+        partitionChunkImpl(frameId, frameBuffer, beginTupleIdx, endTupleIdx);
+    }
+
+    private void endPartition() throws HyracksDataException {
+        if (!inPartition) {
+            throw new IllegalStateException();
+        }
+        endPartitionImpl();
+        inPartition = false;
+    }
+
+    void aggInitPartition(long partitionLength) throws HyracksDataException {
+        for (IWindowAggregateEvaluator aggEval : aggEvals) {
+            aggEval.initPartition(partitionLength);
+        }
+    }
+
+    private static IBinaryComparator[] createBinaryComparators(IBinaryComparatorFactory[] factories) {
+        IBinaryComparator[] comparators = new IBinaryComparator[factories.length];
+        for (int i = 0; i < factories.length; i++) {
+            comparators[i] = factories[i].createBinaryComparator();
+        }
+        return comparators;
+    }
+
+    protected abstract void beginPartitionImpl() throws HyracksDataException;
+
+    protected abstract void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx)
+            throws HyracksDataException;
+
+    protected abstract void endPartitionImpl() throws HyracksDataException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/MaterializingWindowPushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/MaterializingWindowPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/MaterializingWindowPushRuntime.java
new file mode 100644
index 0000000..850430f
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/MaterializingWindowPushRuntime.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.aggrun;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.storage.common.arraylist.IntArrayList;
+
+class MaterializingWindowPushRuntime extends AbstractWindowPushRuntime {
+
+    private RunFileWriter run;
+
+    private IntArrayList runInfo;
+
+    private long partitionLength;
+
+    private IFrame curFrame;
+
+    private long curFrameId;
+
+    private long runLastFrameId;
+
+    MaterializingWindowPushRuntime(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories,
+            int[] projectionList, int[] partitionColumnList, IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, IHyracksTaskContext ctx) {
+        super(outColumns, aggFactories, projectionList, partitionColumnList, partitionComparatorFactories,
+                orderComparatorFactories, ctx);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        super.open();
+        run = null;
+        curFrameId = -1;
+    }
+
+    @Override
+    protected void init() throws HyracksDataException {
+        super.init();
+        curFrame = new VSizeFrame(ctx);
+        runInfo = new IntArrayList(128, 128);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        super.close();
+        if (run != null) {
+            run.erase();
+        }
+    }
+
+    @Override
+    protected void beginPartitionImpl() {
+        runInfo.clear();
+        partitionLength = 0;
+        if (run != null) {
+            run.rewind();
+        }
+    }
+
+    @Override
+    protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx)
+            throws HyracksDataException {
+        boolean firstChunk = runInfo.isEmpty();
+        runInfo.add(tBeginIdx);
+        runInfo.add(tEndIdx);
+
+        // save frame. first one to memory, remaining ones to the run file
+        if (firstChunk || tBeginIdx == 0) {
+            int pos = frameBuffer.position();
+            frameBuffer.position(0);
+
+            if (firstChunk) {
+                if (frameId != curFrameId) {
+                    curFrame.resize(curFrame.getMinSize() * FrameHelper.deserializeNumOfMinFrame(frameBuffer));
+                    curFrame.getBuffer().clear();
+                    curFrame.getBuffer().put(frameBuffer);
+                    curFrameId = frameId;
+                }
+            } else {
+                if (run == null) {
+                    FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(getClass().getSimpleName());
+                    run = new RunFileWriter(file, ctx.getIoManager());
+                    run.open();
+                }
+                run.nextFrame(frameBuffer);
+                runLastFrameId = frameId;
+            }
+
+            frameBuffer.position(pos);
+        }
+
+        partitionLength += tEndIdx - tBeginIdx + 1;
+    }
+
+    @Override
+    protected void endPartitionImpl() throws HyracksDataException {
+        aggInitPartition(partitionLength);
+        GeneratedRunFileReader reader = null;
+        try {
+            boolean runRead = false;
+            for (int idx = 0, ln = runInfo.size(); idx < ln; idx += 2) {
+                int tBeginIdx = runInfo.get(idx);
+                int tEndIdx = runInfo.get(idx + 1);
+                if (tBeginIdx == 0 && idx > 0) {
+                    if (reader == null) {
+                        reader = run.createReader();
+                        reader.open();
+                    }
+                    reader.nextFrame(curFrame);
+                    runRead = true;
+                }
+                tAccess.reset(curFrame.getBuffer());
+                produceTuples(tAccess, tBeginIdx, tEndIdx);
+            }
+            if (runRead) {
+                curFrameId = runLastFrameId;
+            }
+        } finally {
+            if (reader != null) {
+                reader.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java
new file mode 100644
index 0000000..7c39a21
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.aggrun;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+class RunningAggregatePushRuntime extends AbstractRunningAggregatePushRuntime<IRunningAggregateEvaluator> {
+
+    RunningAggregatePushRuntime(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories,
+            int[] projectionList, IHyracksTaskContext ctx) {
+        super(outColumns, aggFactories, projectionList, ctx, IRunningAggregateEvaluator.class);
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        tAccess.reset(buffer);
+        int nTuple = tAccess.getTupleCount();
+        produceTuples(tAccess, 0, nTuple - 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregateRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregateRuntimeFactory.java
new file mode 100644
index 0000000..e97ea7d
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregateRuntimeFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.aggrun;
+
+import java.util.Arrays;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    protected final int[] outColumns;
+
+    protected final IRunningAggregateEvaluatorFactory[] aggFactories;
+
+    /**
+     * @param outColumns
+     *            a sorted array of columns into which the result is written to
+     * @param aggFactories
+     * @param projectionList
+     *            an array of columns to be projected
+     */
+    public RunningAggregateRuntimeFactory(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories,
+            int[] projectionList) {
+        super(projectionList);
+        this.outColumns = outColumns;
+        this.aggFactories = aggFactories;
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
+        return new RunningAggregatePushRuntime(outColumns, aggFactories, projectionList, ctx);
+    }
+
+    @Override
+    public String toString() {
+        return "running-aggregate " + Arrays.toString(outColumns) + " := " + Arrays.toString(aggFactories);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/SimpleWindowPushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/SimpleWindowPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/SimpleWindowPushRuntime.java
new file mode 100644
index 0000000..61b06bf
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/SimpleWindowPushRuntime.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.aggrun;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+class SimpleWindowPushRuntime extends AbstractWindowPushRuntime {
+
+    SimpleWindowPushRuntime(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories, int[] projectionList,
+            int[] partitionColumnList, IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, IHyracksTaskContext ctx) {
+        super(outColumns, aggFactories, projectionList, partitionColumnList, partitionComparatorFactories,
+                orderComparatorFactories, ctx);
+    }
+
+    @Override
+    protected void beginPartitionImpl() throws HyracksDataException {
+        aggInitPartition(-1);
+    }
+
+    @Override
+    protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx)
+            throws HyracksDataException {
+        tAccess.reset(frameBuffer);
+        produceTuples(tAccess, tBeginIdx, tEndIdx);
+    }
+
+    @Override
+    protected void endPartitionImpl() {
+        // nothing to do
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/WindowRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/WindowRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/WindowRuntimeFactory.java
new file mode 100644
index 0000000..fe7f554
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/WindowRuntimeFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.aggrun;
+
+import java.util.Arrays;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class WindowRuntimeFactory extends RunningAggregateRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int[] partitionColumnList;
+
+    private final IBinaryComparatorFactory[] partitionComparatorFactories;
+
+    private final boolean partitionMaterialization;
+
+    private final IBinaryComparatorFactory[] orderComparatorFactories;
+
+    public WindowRuntimeFactory(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories,
+            int[] projectionList, int[] partitionColumnList, IBinaryComparatorFactory[] partitionComparatorFactories,
+            boolean partitionMaterialization, IBinaryComparatorFactory[] orderComparatorFactories) {
+        super(outColumns, aggFactories, projectionList);
+        this.partitionColumnList = partitionColumnList;
+        this.partitionComparatorFactories = partitionComparatorFactories;
+        this.partitionMaterialization = partitionMaterialization;
+        this.orderComparatorFactories = orderComparatorFactories;
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
+        return partitionMaterialization
+                ? new MaterializingWindowPushRuntime(outColumns, aggFactories, projectionList, partitionColumnList,
+                        partitionComparatorFactories, orderComparatorFactories, ctx)
+                : new SimpleWindowPushRuntime(outColumns, aggFactories, projectionList, partitionColumnList,
+                        partitionComparatorFactories, orderComparatorFactories, ctx);
+    }
+
+    @Override
+    public String toString() {
+        return "window (" + Arrays.toString(partitionColumnList) + ") " + Arrays.toString(outColumns) + " := "
+                + Arrays.toString(aggFactories);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
deleted file mode 100644
index ca58d4d..0000000
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.operators.std;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-
-public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    private final int[] outColumns;
-    private final IRunningAggregateEvaluatorFactory[] runningAggregates;
-
-    /**
-     * @param outColumns
-     *            a sorted array of columns into which the result is written to
-     * @param runningAggregates
-     * @param projectionList
-     *            an array of columns to be projected
-     */
-
-    public RunningAggregateRuntimeFactory(int[] outColumns, IRunningAggregateEvaluatorFactory[] runningAggregates,
-            int[] projectionList) {
-        super(projectionList);
-        this.outColumns = outColumns;
-        this.runningAggregates = runningAggregates;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("running-aggregate [");
-        for (int i = 0; i < outColumns.length; i++) {
-            if (i > 0) {
-                sb.append(", ");
-            }
-            sb.append(outColumns[i]);
-        }
-        sb.append("] := [");
-        for (int i = 0; i < runningAggregates.length; i++) {
-            if (i > 0) {
-                sb.append(", ");
-            }
-            sb.append(runningAggregates[i]);
-        }
-        sb.append("]");
-        return sb.toString();
-    }
-
-    @Override
-    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
-            throws HyracksDataException {
-        final int[] projectionToOutColumns = new int[projectionList.length];
-        for (int j = 0; j < projectionList.length; j++) {
-            projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]);
-        }
-
-        return new AbstractOneInputOneOutputOneFramePushRuntime() {
-            private final IPointable p = VoidPointable.FACTORY.createPointable();
-            private final IRunningAggregateEvaluator[] raggs = new IRunningAggregateEvaluator[runningAggregates.length];
-            private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
-            private boolean first = true;
-
-            @Override
-            public void open() throws HyracksDataException {
-                initAccessAppendRef(ctx);
-                if (first) {
-                    first = false;
-                    int n = runningAggregates.length;
-                    for (int i = 0; i < n; i++) {
-                        raggs[i] = runningAggregates[i].createRunningAggregateEvaluator(ctx);
-                    }
-                }
-                for (int i = 0; i < runningAggregates.length; i++) {
-                    raggs[i].init();
-                }
-                super.open();
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                tAccess.reset(buffer);
-                int nTuple = tAccess.getTupleCount();
-                for (int t = 0; t < nTuple; t++) {
-                    tRef.reset(tAccess, t);
-                    produceTuple(tupleBuilder, tAccess, t, tRef);
-                    appendToFrameFromTupleBuilder(tupleBuilder);
-                }
-            }
-
-            private void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
-                    FrameTupleReference tupleRef) throws HyracksDataException {
-                tb.reset();
-                for (int f = 0; f < projectionList.length; f++) {
-                    int k = projectionToOutColumns[f];
-                    if (k >= 0) {
-                        raggs[k].step(tupleRef, p);
-                        tb.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
-                    } else {
-                        tb.addField(accessor, tIndex, projectionList[f]);
-                    }
-                }
-            }
-
-            @Override
-            public void flush() throws HyracksDataException {
-                appender.flush(writer);
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index d4944af..d235d6d 100644
--- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -51,7 +51,7 @@ import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.PrinterRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.std.RunningAggregateRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.aggrun.RunningAggregateRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.SplitOperatorDescriptor;
 import org.apache.hyracks.algebricks.runtime.operators.std.StreamLimitRuntimeFactory;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 7d126ac..690870c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -152,6 +152,7 @@ public class ErrorCode {
     public static final int ONE_TUPLE_RANGEMAP_EXPECTED = 116;
     public static final int NO_RANGEMAP_PRODUCED = 117;
     public static final int RANGEMAP_NOT_FOUND = 118;
+    public static final int UNSUPPORTED_WINDOW_SPEC = 119;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 50e92b3..413bbee 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -135,6 +135,7 @@
 116 = One tuple rangemap is expected
 117 = No range map produced for parallel sort
 118 = Range map was not found for parallel sort
+119 = Unsupported window specification: PARTITION BY %1$s, ORDER BY %2$s
 
 10000 = The given rule collection %1$s is not an instance of the List class.
 10001 = Cannot compose partition constraint %1$s with %2$s

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
index 915c63d..dabdd4f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
@@ -49,6 +49,11 @@ public class RunFileWriter implements IFrameWriter {
         maxOutputFrameSize = 0;
     }
 
+    public void rewind() {
+        size = 0;
+        maxOutputFrameSize = 0;
+    }
+
     @Override
     public void fail() throws HyracksDataException {
         ioManager.close(handle);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index ca78046..c018e9d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.dataflow.std.group.preclustered;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -138,9 +139,10 @@ public class PreclusteredGroupWriter implements IFrameWriter {
         }
     }
 
-    private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
-            FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
-        if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+    private void switchGroupIfRequired(IFrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
+            IFrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
+        if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex, groupFields,
+                comparators)) {
             writeOutput(prevTupleAccessor, prevTupleIndex);
 
             tupleBuilder.reset();
@@ -153,7 +155,7 @@ public class PreclusteredGroupWriter implements IFrameWriter {
         }
     }
 
-    private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
+    private void writeOutput(final IFrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
             throws HyracksDataException {
 
         tupleBuilder.reset();
@@ -171,8 +173,8 @@ public class PreclusteredGroupWriter implements IFrameWriter {
 
     }
 
-    private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx)
-            throws HyracksDataException {
+    public static boolean sameGroup(IFrameTupleAccessor a1, int t1Idx, IFrameTupleAccessor a2, int t2Idx,
+            int[] groupFields, IBinaryComparator[] comparators) throws HyracksDataException {
         for (int i = 0; i < comparators.length; ++i) {
             int fIdx = groupFields[i];
             int s1 = a1.getAbsoluteFieldStartOffset(t1Idx, fIdx);

Reply via email to