Repository: asterixdb
Updated Branches:
  refs/heads/master 1cd58bced -> f2c18aa96


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
new file mode 100644
index 0000000..e7daf11
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspector;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.DataUtils;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
+
+/**
+ * Runtime for window operators that performs partition materialization and 
can evaluate running aggregates
+ * as well as regular aggregates (in nested plans) over window frames.
+ */
+public class WindowNestedPlansPushRuntime extends 
WindowMaterializingPushRuntime {
+
+    private final boolean frameValueExists;
+
+    private final IScalarEvaluatorFactory[] frameValueEvalFactories;
+
+    private IScalarEvaluator[] frameValueEvals;
+
+    private IPointable[] frameValuePointables;
+
+    private final IBinaryComparatorFactory[] frameValueComparatorFactories;
+
+    private IBinaryComparator[] frameValueComparators;
+
+    private final boolean frameStartExists;
+
+    private final IScalarEvaluatorFactory[] frameStartEvalFactories;
+
+    private IScalarEvaluator[] frameStartEvals;
+
+    private IPointable[] frameStartPointables;
+
+    private final boolean frameEndExists;
+
+    private final IScalarEvaluatorFactory[] frameEndEvalFactories;
+
+    private IScalarEvaluator[] frameEndEvals;
+
+    private IPointable[] frameEndPointables;
+
+    private final boolean frameExcludeExists;
+
+    private final IScalarEvaluatorFactory[] frameExcludeEvalFactories;
+
+    private IScalarEvaluator[] frameExcludeEvals;
+
+    private final int frameExcludeNegationStartIdx;
+
+    private IPointable[] frameExcludePointables;
+
+    private IPointable frameExcludePointable2;
+
+    private final IBinaryComparatorFactory[] frameExcludeComparatorFactories;
+
+    private IBinaryComparator[] frameExcludeComparators;
+
+    private final boolean frameOffsetExists;
+
+    private final IScalarEvaluatorFactory frameOffsetEvalFactory;
+
+    private IScalarEvaluator frameOffsetEval;
+
+    private IPointable frameOffsetPointable;
+
+    private final IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
+
+    private final int frameMaxObjects;
+
+    private final int nestedAggOutSchemaSize;
+
+    private final WindowAggregatorDescriptorFactory nestedAggFactory;
+
+    private IAggregatorDescriptor nestedAgg;
+
+    private IFrame copyFrame2;
+
+    private IFrame runFrame;
+
+    private FrameTupleAccessor tAccess2;
+
+    private FrameTupleReference tRef2;
+
+    private IBinaryIntegerInspector bii;
+
+    WindowNestedPlansPushRuntime(int[] partitionColumns, 
IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, 
IScalarEvaluatorFactory[] frameValueEvalFactories,
+            IBinaryComparatorFactory[] frameValueComparatorFactories, 
IScalarEvaluatorFactory[] frameStartEvalFactories,
+            IScalarEvaluatorFactory[] frameEndEvalFactories, 
IScalarEvaluatorFactory[] frameExcludeEvalFactories,
+            int frameExcludeNegationStartIdx, IBinaryComparatorFactory[] 
frameExcludeComparatorFactories,
+            IScalarEvaluatorFactory frameOffsetEvalFactory,
+            IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int 
frameMaxObjects, int[] projectionColumns,
+            int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] 
runningAggFactories,
+            int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory 
nestedAggFactory, IHyracksTaskContext ctx) {
+        super(partitionColumns, partitionComparatorFactories, 
orderComparatorFactories, projectionColumns,
+                runningAggOutColumns, runningAggFactories, ctx);
+        this.frameValueEvalFactories = frameValueEvalFactories;
+        this.frameValueExists = frameValueEvalFactories != null && 
frameValueEvalFactories.length > 0;
+        this.frameStartEvalFactories = frameStartEvalFactories;
+        this.frameStartExists = frameStartEvalFactories != null && 
frameStartEvalFactories.length > 0;
+        this.frameEndEvalFactories = frameEndEvalFactories;
+        this.frameEndExists = frameEndEvalFactories != null && 
frameEndEvalFactories.length > 0;
+        this.frameValueComparatorFactories = frameValueComparatorFactories;
+        this.frameExcludeEvalFactories = frameExcludeEvalFactories;
+        this.frameExcludeExists = frameExcludeEvalFactories != null && 
frameExcludeEvalFactories.length > 0;
+        this.frameExcludeComparatorFactories = frameExcludeComparatorFactories;
+        this.frameExcludeNegationStartIdx = frameExcludeNegationStartIdx;
+        this.frameOffsetExists = frameOffsetEvalFactory != null;
+        this.frameOffsetEvalFactory = frameOffsetEvalFactory;
+        this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory;
+        this.frameMaxObjects = frameMaxObjects;
+        this.nestedAggFactory = nestedAggFactory;
+        this.nestedAggOutSchemaSize = nestedAggOutSchemaSize;
+    }
+
+    @Override
+    protected void init() throws HyracksDataException {
+        super.init();
+
+        if (frameValueExists) {
+            frameValueEvals = createEvaluators(frameValueEvalFactories, ctx);
+            frameValueComparators = 
createBinaryComparators(frameValueComparatorFactories);
+            frameValuePointables = 
createPointables(frameValueEvalFactories.length);
+        }
+        if (frameStartExists) {
+            frameStartEvals = createEvaluators(frameStartEvalFactories, ctx);
+            frameStartPointables = 
createPointables(frameStartEvalFactories.length);
+        }
+        if (frameEndExists) {
+            frameEndEvals = createEvaluators(frameEndEvalFactories, ctx);
+            frameEndPointables = 
createPointables(frameEndEvalFactories.length);
+        }
+        if (frameExcludeExists) {
+            frameExcludeEvals = createEvaluators(frameExcludeEvalFactories, 
ctx);
+            frameExcludeComparators = 
createBinaryComparators(frameExcludeComparatorFactories);
+            frameExcludePointables = 
createPointables(frameExcludeEvalFactories.length);
+            frameExcludePointable2 = VoidPointable.FACTORY.createPointable();
+        }
+        if (frameOffsetExists) {
+            frameOffsetEval = 
frameOffsetEvalFactory.createScalarEvaluator(ctx);
+            frameOffsetPointable = VoidPointable.FACTORY.createPointable();
+            bii = 
binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
+        }
+
+        nestedAgg = nestedAggFactory.createAggregator(ctx, null, null, null, 
null, null, -1);
+
+        runFrame = new VSizeFrame(ctx);
+        copyFrame2 = new VSizeFrame(ctx);
+        tAccess2 = new FrameTupleAccessor(inputRecordDesc);
+        tRef2 = new FrameTupleReference();
+    }
+
+    @Override
+    protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader 
reader) throws HyracksDataException {
+        long readerPos = -1;
+        int nChunks = getPartitionChunkCount();
+        if (nChunks > 1) {
+            readerPos = reader.position();
+            if (chunkIdx == 0) {
+                ByteBuffer curFrameBuffer = curFrame.getBuffer();
+                int nBlocks = 
FrameHelper.deserializeNumOfMinFrame(curFrameBuffer);
+                copyFrame2.ensureFrameSize(copyFrame2.getMinSize() * nBlocks);
+                int pos = curFrameBuffer.position();
+                FrameUtils.copyAndFlip(curFrameBuffer, copyFrame2.getBuffer());
+                curFrameBuffer.position(pos);
+            }
+        }
+
+        tAccess.reset(curFrame.getBuffer());
+        int tBeginIdx = getTupleBeginIdx(chunkIdx);
+        int tEndIdx = getTupleEndIdx(chunkIdx);
+        for (int tIdx = tBeginIdx; tIdx <= tEndIdx; tIdx++) {
+            tRef.reset(tAccess, tIdx);
+
+            // running aggregates
+            produceTuple(tupleBuilder, tAccess, tIdx, tRef);
+
+            // frame boundaries
+            if (frameStartExists) {
+                for (int i = 0; i < frameStartEvals.length; i++) {
+                    frameStartEvals[i].evaluate(tRef, frameStartPointables[i]);
+                }
+            }
+            if (frameEndExists) {
+                for (int i = 0; i < frameEndEvals.length; i++) {
+                    frameEndEvals[i].evaluate(tRef, frameEndPointables[i]);
+                }
+            }
+            if (frameExcludeExists) {
+                for (int i = 0; i < frameExcludeEvals.length; i++) {
+                    frameExcludeEvals[i].evaluate(tRef, 
frameExcludePointables[i]);
+                }
+            }
+            int toSkip = 0;
+            if (frameOffsetExists) {
+                frameOffsetEval.evaluate(tRef, frameOffsetPointable);
+                toSkip = 
bii.getIntegerValue(frameOffsetPointable.getByteArray(), 
frameOffsetPointable.getStartOffset(),
+                        frameOffsetPointable.getLength());
+            }
+            int toWrite = frameMaxObjects;
+
+            // aggregator created by WindowAggregatorDescriptorFactory does 
not process argument tuple in init()
+            nestedAgg.init(null, null, -1, null);
+
+            if (nChunks > 1) {
+                reader.seek(0);
+            }
+
+            frame_loop: for (int chunkIdx2 = 0; chunkIdx2 < nChunks; 
chunkIdx2++) {
+                IFrame innerFrame;
+                if (chunkIdx2 == 0) {
+                    // first chunk's frame is always in memory
+                    innerFrame = chunkIdx == 0 ? curFrame : copyFrame2;
+                } else {
+                    reader.nextFrame(runFrame);
+                    innerFrame = runFrame;
+                }
+                tAccess2.reset(innerFrame.getBuffer());
+
+                int tBeginIdx2 = getTupleBeginIdx(chunkIdx2);
+                int tEndIdx2 = getTupleEndIdx(chunkIdx2);
+                for (int tIdx2 = tBeginIdx2; tIdx2 <= tEndIdx2; tIdx2++) {
+                    tRef2.reset(tAccess2, tIdx2);
+
+                    if (frameStartExists || frameEndExists) {
+                        for (int frameValueIdx = 0; frameValueIdx < 
frameValueEvals.length; frameValueIdx++) {
+                            frameValueEvals[frameValueIdx].evaluate(tRef2, 
frameValuePointables[frameValueIdx]);
+                        }
+                        if (frameStartExists
+                                && compare(frameValuePointables, 
frameStartPointables, frameValueComparators) < 0) {
+                            // skip if value < start
+                            continue;
+                        }
+                        if (frameEndExists
+                                && compare(frameValuePointables, 
frameEndPointables, frameValueComparators) > 0) {
+                            // skip and exit if value > end
+                            break frame_loop;
+                        }
+                    }
+                    if (frameExcludeExists && isExcluded()) {
+                        // skip if excluded
+                        continue;
+                    }
+
+                    if (toSkip > 0) {
+                        // skip if offset hasn't been reached
+                        toSkip--;
+                        continue;
+                    }
+
+                    if (toWrite != 0) {
+                        nestedAgg.aggregate(tAccess2, tIdx2, null, -1, null);
+                    }
+                    if (toWrite > 0) {
+                        toWrite--;
+                    }
+                    if (toWrite == 0) {
+                        break frame_loop;
+                    }
+                }
+            }
+
+            nestedAgg.outputFinalResult(tupleBuilder, null, -1, null);
+            appendToFrameFromTupleBuilder(tupleBuilder);
+        }
+
+        if (nChunks > 1) {
+            reader.seek(readerPos);
+        }
+    }
+
+    private boolean isExcluded() throws HyracksDataException {
+        for (int i = 0; i < frameExcludeEvals.length; i++) {
+            frameExcludeEvals[i].evaluate(tRef2, frameExcludePointable2);
+            boolean b = DataUtils.compare(frameExcludePointables[i], 
frameExcludePointable2,
+                    frameExcludeComparators[i]) != 0;
+            if (i >= frameExcludeNegationStartIdx) {
+                b = !b;
+            }
+            if (b) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    protected ArrayTupleBuilder createOutputTupleBuilder(int[] projectionList) 
{
+        return new ArrayTupleBuilder(projectionList.length + 
nestedAggOutSchemaSize);
+    }
+
+    private static IScalarEvaluator[] 
createEvaluators(IScalarEvaluatorFactory[] evalFactories, IHyracksTaskContext 
ctx)
+            throws HyracksDataException {
+        IScalarEvaluator[] evals = new IScalarEvaluator[evalFactories.length];
+        for (int i = 0; i < evalFactories.length; i++) {
+            evals[i] = evalFactories[i].createScalarEvaluator(ctx);
+        }
+        return evals;
+    }
+
+    private static IPointable[] createPointables(int ln) {
+        IPointable[] pointables = new IPointable[ln];
+        for (int i = 0; i < ln; i++) {
+            pointables[i] = VoidPointable.FACTORY.createPointable();
+        }
+        return pointables;
+    }
+
+    private static int compare(IValueReference[] first, IValueReference[] 
second, IBinaryComparator[] comparators)
+            throws HyracksDataException {
+        for (int i = 0; i < first.length; i++) {
+            int c = DataUtils.compare(first[i], second[i], comparators[i]);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
new file mode 100644
index 0000000..640e260
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import java.util.Arrays;
+
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+/**
+ * Runtime factory for window operators that performs partition 
materialization and can evaluate running aggregates
+ * as well as regular aggregates (in nested plans) over window frames.
+ */
+public class WindowNestedPlansRuntimeFactory extends 
AbstractWindowRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IScalarEvaluatorFactory[] frameValueEvalFactories;
+
+    private final IScalarEvaluatorFactory[] frameStartEvalFactories;
+
+    private final IScalarEvaluatorFactory[] frameEndEvalFactories;
+
+    private final IBinaryComparatorFactory[] frameValueComparatorFactories;
+
+    private final IScalarEvaluatorFactory[] frameExcludeEvalFactories;
+
+    private final int frameExcludeNegationStartIdx;
+
+    private final IBinaryComparatorFactory[] frameExcludeComparatorFactories;
+
+    private final IScalarEvaluatorFactory frameOffsetEvalFactory;
+
+    private final IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
+
+    private final int frameMaxObjects;
+
+    private final int nestedAggOutSchemaSize;
+
+    private final WindowAggregatorDescriptorFactory nestedAggFactory;
+
+    public WindowNestedPlansRuntimeFactory(int[] partitionColumns,
+            IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, 
IScalarEvaluatorFactory[] frameValueEvalFactories,
+            IBinaryComparatorFactory[] frameValueComparatorFactories, 
IScalarEvaluatorFactory[] frameStartEvalFactories,
+            IScalarEvaluatorFactory[] frameEndEvalFactories, 
IScalarEvaluatorFactory[] frameExcludeEvalFactories,
+            int frameExcludeNegationStartIdx, IBinaryComparatorFactory[] 
frameExcludeComparatorFactories,
+            IScalarEvaluatorFactory frameOffsetEvalFactory,
+            IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int 
frameMaxObjects,
+            int[] projectionColumnsExcludingSubplans, int[] 
runningAggOutColumns,
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, int 
nestedAggOutSchemaSize,
+            WindowAggregatorDescriptorFactory nestedAggFactory) {
+        super(partitionColumns, partitionComparatorFactories, 
orderComparatorFactories,
+                projectionColumnsExcludingSubplans, runningAggOutColumns, 
runningAggFactories);
+        this.frameValueEvalFactories = frameValueEvalFactories;
+        this.frameStartEvalFactories = frameStartEvalFactories;
+        this.frameEndEvalFactories = frameEndEvalFactories;
+        this.frameValueComparatorFactories = frameValueComparatorFactories;
+        this.frameExcludeEvalFactories = frameExcludeEvalFactories;
+        this.frameExcludeComparatorFactories = frameExcludeComparatorFactories;
+        this.frameExcludeNegationStartIdx = frameExcludeNegationStartIdx;
+        this.frameOffsetEvalFactory = frameOffsetEvalFactory;
+        this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory;
+        this.frameMaxObjects = frameMaxObjects;
+        this.nestedAggFactory = nestedAggFactory;
+        this.nestedAggOutSchemaSize = nestedAggOutSchemaSize;
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(IHyracksTaskContext ctx) {
+        return new WindowNestedPlansPushRuntime(partitionColumns, 
partitionComparatorFactories,
+                orderComparatorFactories, frameValueEvalFactories, 
frameValueComparatorFactories,
+                frameStartEvalFactories, frameEndEvalFactories, 
frameExcludeEvalFactories, frameExcludeNegationStartIdx,
+                frameExcludeComparatorFactories, frameOffsetEvalFactory, 
binaryIntegerInspectorFactory, frameMaxObjects,
+                projectionList, runningAggOutColumns, runningAggFactories, 
nestedAggOutSchemaSize, nestedAggFactory,
+                ctx);
+    }
+
+    @Override
+    public String toString() {
+        return "window [nested] (" + Arrays.toString(partitionColumns) + ") " 
+ Arrays.toString(runningAggOutColumns)
+                + " := " + Arrays.toString(runningAggFactories);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
new file mode 100644
index 0000000..bf71ed9
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import java.nio.ByteBuffer;
+
+import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Runtime for window operators that evaluates running aggregates without 
partition materialization.
+ */
+class WindowSimplePushRuntime extends AbstractWindowPushRuntime {
+
+    WindowSimplePushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] 
partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, int[] 
projectionColumns, int[] runningAggOutColumns,
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, 
IHyracksTaskContext ctx) {
+        super(partitionColumns, partitionComparatorFactories, 
orderComparatorFactories, projectionColumns,
+                runningAggOutColumns, runningAggFactories, ctx);
+    }
+
+    @Override
+    protected void beginPartitionImpl() throws HyracksDataException {
+        runningAggInitPartition(-1);
+    }
+
+    @Override
+    protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, 
int tBeginIdx, int tEndIdx)
+            throws HyracksDataException {
+        tAccess.reset(frameBuffer);
+        produceTuples(tAccess, tBeginIdx, tEndIdx);
+    }
+
+    @Override
+    protected void endPartitionImpl() {
+        // nothing to do
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
new file mode 100644
index 0000000..ded399f
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.win;
+
+import java.util.Arrays;
+
+import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+/**
+ * Runtime factory for window operators that evaluates running aggregates 
without partition materialization.
+ */
+public class WindowSimpleRuntimeFactory extends AbstractWindowRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public WindowSimpleRuntimeFactory(int[] partitionColumns, 
IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, int[] 
projectionColumnsExcludingSubplans,
+            int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] 
runningAggFactories) {
+        super(partitionColumns, partitionComparatorFactories, 
orderComparatorFactories,
+                projectionColumnsExcludingSubplans, runningAggOutColumns, 
runningAggFactories);
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(IHyracksTaskContext ctx) {
+        return new WindowSimplePushRuntime(partitionColumns, 
partitionComparatorFactories, orderComparatorFactories,
+                projectionList, runningAggOutColumns, runningAggFactories, 
ctx);
+    }
+
+    @Override
+    public String toString() {
+        return "window (" + Arrays.toString(partitionColumns) + ") " + 
Arrays.toString(runningAggOutColumns) + " := "
+                + Arrays.toString(runningAggFactories);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
 
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index d235d6d..6d763d1 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -494,9 +494,8 @@ public class PushRuntimeTest {
         RecordDescriptor unnestDesc =
                 new RecordDescriptor(new ISerializerDeserializer[] { 
IntegerSerializerDeserializer.INSTANCE });
 
-        RunningAggregateRuntimeFactory ragg = new 
RunningAggregateRuntimeFactory(new int[] { 1 },
-                new IRunningAggregateEvaluatorFactory[] { new 
TupleCountRunningAggregateFunctionFactory() },
-                new int[] { 0, 1 });
+        RunningAggregateRuntimeFactory ragg = new 
RunningAggregateRuntimeFactory(new int[] { 0, 1 }, new int[] { 1 },
+                new IRunningAggregateEvaluatorFactory[] { new 
TupleCountRunningAggregateFunctionFactory() });
         RecordDescriptor raggDesc = new RecordDescriptor(new 
ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, 
IntegerSerializerDeserializer.INSTANCE });
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BytePointable.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BytePointable.java
 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BytePointable.java
index ebfa903..a8195ac 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BytePointable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BytePointable.java
@@ -99,7 +99,7 @@ public final class BytePointable extends AbstractPointable 
implements IHashable,
     public int compareTo(byte[] bytes, int start, int length) {
         byte b = getByte();
         byte ob = getByte(bytes, start);
-        return b < ob ? -1 : (b > ob ? 1 : 0);
+        return Byte.compare(b, ob);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/DoublePointable.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/DoublePointable.java
 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/DoublePointable.java
index 9306818..07ba40e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/DoublePointable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/DoublePointable.java
@@ -120,7 +120,7 @@ public final class DoublePointable extends 
AbstractPointable implements IHashabl
     public int compareTo(byte[] bytes, int start, int length) {
         double v = getDouble();
         double ov = getDouble(bytes, start);
-        return v < ov ? -1 : (v > ov ? 1 : 0);
+        return Double.compare(v, ov);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/FloatPointable.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/FloatPointable.java
 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/FloatPointable.java
index 926ee32..a55a398 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/FloatPointable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/FloatPointable.java
@@ -105,7 +105,7 @@ public final class FloatPointable extends AbstractPointable 
implements IHashable
     public int compareTo(byte[] bytes, int start, int length) {
         float v = getFloat();
         float ov = getFloat(bytes, start);
-        return v < ov ? -1 : (v > ov ? 1 : 0);
+        return Float.compare(v, ov);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/IntegerPointable.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/IntegerPointable.java
 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/IntegerPointable.java
index b36d6a4..81b11d2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/IntegerPointable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/IntegerPointable.java
@@ -109,7 +109,7 @@ public final class IntegerPointable extends 
AbstractPointable implements IHashab
     public int compareTo(byte[] bytes, int start, int length) {
         int v = getInteger();
         int ov = getInteger(bytes, start);
-        return v < ov ? -1 : (v > ov ? 1 : 0);
+        return Integer.compare(v, ov);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/LongPointable.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/LongPointable.java
 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/LongPointable.java
index 007d0c7..1c2171c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/LongPointable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/LongPointable.java
@@ -123,7 +123,7 @@ public final class LongPointable extends AbstractPointable 
implements IHashable,
     public int compareTo(byte[] bytes, int start, int length) {
         long v = getLong();
         long ov = getLong(bytes, start);
-        return v < ov ? -1 : (v > ov ? 1 : 0);
+        return Long.compare(v, ov);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/ShortPointable.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/ShortPointable.java
 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/ShortPointable.java
index 7332ba8..73f3779 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/ShortPointable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/ShortPointable.java
@@ -100,7 +100,7 @@ public final class ShortPointable extends AbstractPointable 
implements IHashable
     public int compareTo(byte[] bytes, int start, int length) {
         short v = getShort();
         short ov = getShort(bytes, start);
-        return v < ov ? -1 : (v > ov ? 1 : 0);
+        return Short.compare(v, ov);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
index 23f4b66..4993527 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.data.std.util;
 
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
 
 public class DataUtils {
@@ -97,4 +99,20 @@ public class DataUtils {
         }
         return true;
     }
+
+    /**
+     * Compare two value references using given comparator
+     *
+     * @param first
+     *            first value
+     * @param second
+     *            second value
+     * @param cmp
+     *            comparator
+     */
+    public static int compare(IValueReference first, IValueReference second, 
IBinaryComparator cmp)
+            throws HyracksDataException {
+        return cmp.compare(first.getByteArray(), first.getStartOffset(), 
first.getLength(), second.getByteArray(),
+                second.getStartOffset(), second.getLength());
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index ffee1a6..03fdb49 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -53,6 +53,17 @@ public class RunFileReader implements IFrameReader {
         readPtr = 0;
     }
 
+    public void seek(long position) {
+        if (position < 0) {
+            throw new IllegalArgumentException(String.valueOf(position));
+        }
+        readPtr = position;
+    }
+
+    public long position() {
+        return readPtr;
+    }
+
     @Override
     public boolean nextFrame(IFrame frame) throws HyracksDataException {
         if (readPtr >= size) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java
index 49b5309..bbba85a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java
@@ -190,4 +190,19 @@ public class TupleUtils {
         }
         return true;
     }
+
+    public static void addFields(ArrayTupleBuilder sourceBuilder, 
ArrayTupleBuilder targetBuilder)
+            throws HyracksDataException {
+        byte[] data = sourceBuilder.getByteArray();
+        int[] fieldEnds = sourceBuilder.getFieldEndOffsets();
+        int start = 0;
+        int offset;
+        for (int i = 0; i < fieldEnds.length; i++) {
+            if (i > 0) {
+                start = fieldEnds[i - 1];
+            }
+            offset = fieldEnds[i] - start;
+            targetBuilder.addField(data, start, offset);
+        }
+    }
 }

Reply via email to