Repository: asterixdb Updated Branches: refs/heads/master 1cd58bced -> f2c18aa96
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java new file mode 100644 index 0000000..e7daf11 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.operators.win; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.algebricks.data.IBinaryIntegerInspector; +import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.comm.FrameHelper; +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.data.std.util.DataUtils; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; +import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; +import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor; + +/** + * Runtime for window operators that performs partition materialization and can evaluate running aggregates + * as well as regular aggregates (in nested plans) over window frames. + */ +public class WindowNestedPlansPushRuntime extends WindowMaterializingPushRuntime { + + private final boolean frameValueExists; + + private final IScalarEvaluatorFactory[] frameValueEvalFactories; + + private IScalarEvaluator[] frameValueEvals; + + private IPointable[] frameValuePointables; + + private final IBinaryComparatorFactory[] frameValueComparatorFactories; + + private IBinaryComparator[] frameValueComparators; + + private final boolean frameStartExists; + + private final IScalarEvaluatorFactory[] frameStartEvalFactories; + + private IScalarEvaluator[] frameStartEvals; + + private IPointable[] frameStartPointables; + + private final boolean frameEndExists; + + private final IScalarEvaluatorFactory[] frameEndEvalFactories; + + private IScalarEvaluator[] frameEndEvals; + + private IPointable[] frameEndPointables; + + private final boolean frameExcludeExists; + + private final IScalarEvaluatorFactory[] frameExcludeEvalFactories; + + private IScalarEvaluator[] frameExcludeEvals; + + private final int frameExcludeNegationStartIdx; + + private IPointable[] frameExcludePointables; + + private IPointable frameExcludePointable2; + + private final IBinaryComparatorFactory[] frameExcludeComparatorFactories; + + private IBinaryComparator[] frameExcludeComparators; + + private final boolean frameOffsetExists; + + private final IScalarEvaluatorFactory frameOffsetEvalFactory; + + private IScalarEvaluator frameOffsetEval; + + private IPointable frameOffsetPointable; + + private final IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory; + + private final int frameMaxObjects; + + private final int nestedAggOutSchemaSize; + + private final WindowAggregatorDescriptorFactory nestedAggFactory; + + private IAggregatorDescriptor nestedAgg; + + private IFrame copyFrame2; + + private IFrame runFrame; + + private FrameTupleAccessor tAccess2; + + private FrameTupleReference tRef2; + + private IBinaryIntegerInspector bii; + + WindowNestedPlansPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories, + IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories, + IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartEvalFactories, + IScalarEvaluatorFactory[] frameEndEvalFactories, IScalarEvaluatorFactory[] frameExcludeEvalFactories, + int frameExcludeNegationStartIdx, IBinaryComparatorFactory[] frameExcludeComparatorFactories, + IScalarEvaluatorFactory frameOffsetEvalFactory, + IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int frameMaxObjects, int[] projectionColumns, + int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories, + int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) { + super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns, + runningAggOutColumns, runningAggFactories, ctx); + this.frameValueEvalFactories = frameValueEvalFactories; + this.frameValueExists = frameValueEvalFactories != null && frameValueEvalFactories.length > 0; + this.frameStartEvalFactories = frameStartEvalFactories; + this.frameStartExists = frameStartEvalFactories != null && frameStartEvalFactories.length > 0; + this.frameEndEvalFactories = frameEndEvalFactories; + this.frameEndExists = frameEndEvalFactories != null && frameEndEvalFactories.length > 0; + this.frameValueComparatorFactories = frameValueComparatorFactories; + this.frameExcludeEvalFactories = frameExcludeEvalFactories; + this.frameExcludeExists = frameExcludeEvalFactories != null && frameExcludeEvalFactories.length > 0; + this.frameExcludeComparatorFactories = frameExcludeComparatorFactories; + this.frameExcludeNegationStartIdx = frameExcludeNegationStartIdx; + this.frameOffsetExists = frameOffsetEvalFactory != null; + this.frameOffsetEvalFactory = frameOffsetEvalFactory; + this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory; + this.frameMaxObjects = frameMaxObjects; + this.nestedAggFactory = nestedAggFactory; + this.nestedAggOutSchemaSize = nestedAggOutSchemaSize; + } + + @Override + protected void init() throws HyracksDataException { + super.init(); + + if (frameValueExists) { + frameValueEvals = createEvaluators(frameValueEvalFactories, ctx); + frameValueComparators = createBinaryComparators(frameValueComparatorFactories); + frameValuePointables = createPointables(frameValueEvalFactories.length); + } + if (frameStartExists) { + frameStartEvals = createEvaluators(frameStartEvalFactories, ctx); + frameStartPointables = createPointables(frameStartEvalFactories.length); + } + if (frameEndExists) { + frameEndEvals = createEvaluators(frameEndEvalFactories, ctx); + frameEndPointables = createPointables(frameEndEvalFactories.length); + } + if (frameExcludeExists) { + frameExcludeEvals = createEvaluators(frameExcludeEvalFactories, ctx); + frameExcludeComparators = createBinaryComparators(frameExcludeComparatorFactories); + frameExcludePointables = createPointables(frameExcludeEvalFactories.length); + frameExcludePointable2 = VoidPointable.FACTORY.createPointable(); + } + if (frameOffsetExists) { + frameOffsetEval = frameOffsetEvalFactory.createScalarEvaluator(ctx); + frameOffsetPointable = VoidPointable.FACTORY.createPointable(); + bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx); + } + + nestedAgg = nestedAggFactory.createAggregator(ctx, null, null, null, null, null, -1); + + runFrame = new VSizeFrame(ctx); + copyFrame2 = new VSizeFrame(ctx); + tAccess2 = new FrameTupleAccessor(inputRecordDesc); + tRef2 = new FrameTupleReference(); + } + + @Override + protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader reader) throws HyracksDataException { + long readerPos = -1; + int nChunks = getPartitionChunkCount(); + if (nChunks > 1) { + readerPos = reader.position(); + if (chunkIdx == 0) { + ByteBuffer curFrameBuffer = curFrame.getBuffer(); + int nBlocks = FrameHelper.deserializeNumOfMinFrame(curFrameBuffer); + copyFrame2.ensureFrameSize(copyFrame2.getMinSize() * nBlocks); + int pos = curFrameBuffer.position(); + FrameUtils.copyAndFlip(curFrameBuffer, copyFrame2.getBuffer()); + curFrameBuffer.position(pos); + } + } + + tAccess.reset(curFrame.getBuffer()); + int tBeginIdx = getTupleBeginIdx(chunkIdx); + int tEndIdx = getTupleEndIdx(chunkIdx); + for (int tIdx = tBeginIdx; tIdx <= tEndIdx; tIdx++) { + tRef.reset(tAccess, tIdx); + + // running aggregates + produceTuple(tupleBuilder, tAccess, tIdx, tRef); + + // frame boundaries + if (frameStartExists) { + for (int i = 0; i < frameStartEvals.length; i++) { + frameStartEvals[i].evaluate(tRef, frameStartPointables[i]); + } + } + if (frameEndExists) { + for (int i = 0; i < frameEndEvals.length; i++) { + frameEndEvals[i].evaluate(tRef, frameEndPointables[i]); + } + } + if (frameExcludeExists) { + for (int i = 0; i < frameExcludeEvals.length; i++) { + frameExcludeEvals[i].evaluate(tRef, frameExcludePointables[i]); + } + } + int toSkip = 0; + if (frameOffsetExists) { + frameOffsetEval.evaluate(tRef, frameOffsetPointable); + toSkip = bii.getIntegerValue(frameOffsetPointable.getByteArray(), frameOffsetPointable.getStartOffset(), + frameOffsetPointable.getLength()); + } + int toWrite = frameMaxObjects; + + // aggregator created by WindowAggregatorDescriptorFactory does not process argument tuple in init() + nestedAgg.init(null, null, -1, null); + + if (nChunks > 1) { + reader.seek(0); + } + + frame_loop: for (int chunkIdx2 = 0; chunkIdx2 < nChunks; chunkIdx2++) { + IFrame innerFrame; + if (chunkIdx2 == 0) { + // first chunk's frame is always in memory + innerFrame = chunkIdx == 0 ? curFrame : copyFrame2; + } else { + reader.nextFrame(runFrame); + innerFrame = runFrame; + } + tAccess2.reset(innerFrame.getBuffer()); + + int tBeginIdx2 = getTupleBeginIdx(chunkIdx2); + int tEndIdx2 = getTupleEndIdx(chunkIdx2); + for (int tIdx2 = tBeginIdx2; tIdx2 <= tEndIdx2; tIdx2++) { + tRef2.reset(tAccess2, tIdx2); + + if (frameStartExists || frameEndExists) { + for (int frameValueIdx = 0; frameValueIdx < frameValueEvals.length; frameValueIdx++) { + frameValueEvals[frameValueIdx].evaluate(tRef2, frameValuePointables[frameValueIdx]); + } + if (frameStartExists + && compare(frameValuePointables, frameStartPointables, frameValueComparators) < 0) { + // skip if value < start + continue; + } + if (frameEndExists + && compare(frameValuePointables, frameEndPointables, frameValueComparators) > 0) { + // skip and exit if value > end + break frame_loop; + } + } + if (frameExcludeExists && isExcluded()) { + // skip if excluded + continue; + } + + if (toSkip > 0) { + // skip if offset hasn't been reached + toSkip--; + continue; + } + + if (toWrite != 0) { + nestedAgg.aggregate(tAccess2, tIdx2, null, -1, null); + } + if (toWrite > 0) { + toWrite--; + } + if (toWrite == 0) { + break frame_loop; + } + } + } + + nestedAgg.outputFinalResult(tupleBuilder, null, -1, null); + appendToFrameFromTupleBuilder(tupleBuilder); + } + + if (nChunks > 1) { + reader.seek(readerPos); + } + } + + private boolean isExcluded() throws HyracksDataException { + for (int i = 0; i < frameExcludeEvals.length; i++) { + frameExcludeEvals[i].evaluate(tRef2, frameExcludePointable2); + boolean b = DataUtils.compare(frameExcludePointables[i], frameExcludePointable2, + frameExcludeComparators[i]) != 0; + if (i >= frameExcludeNegationStartIdx) { + b = !b; + } + if (b) { + return false; + } + } + return true; + } + + @Override + protected ArrayTupleBuilder createOutputTupleBuilder(int[] projectionList) { + return new ArrayTupleBuilder(projectionList.length + nestedAggOutSchemaSize); + } + + private static IScalarEvaluator[] createEvaluators(IScalarEvaluatorFactory[] evalFactories, IHyracksTaskContext ctx) + throws HyracksDataException { + IScalarEvaluator[] evals = new IScalarEvaluator[evalFactories.length]; + for (int i = 0; i < evalFactories.length; i++) { + evals[i] = evalFactories[i].createScalarEvaluator(ctx); + } + return evals; + } + + private static IPointable[] createPointables(int ln) { + IPointable[] pointables = new IPointable[ln]; + for (int i = 0; i < ln; i++) { + pointables[i] = VoidPointable.FACTORY.createPointable(); + } + return pointables; + } + + private static int compare(IValueReference[] first, IValueReference[] second, IBinaryComparator[] comparators) + throws HyracksDataException { + for (int i = 0; i < first.length; i++) { + int c = DataUtils.compare(first[i], second[i], comparators[i]); + if (c != 0) { + return c; + } + } + return 0; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java new file mode 100644 index 0000000..640e260 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.operators.win; + +import java.util.Arrays; + +import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; + +/** + * Runtime factory for window operators that performs partition materialization and can evaluate running aggregates + * as well as regular aggregates (in nested plans) over window frames. + */ +public class WindowNestedPlansRuntimeFactory extends AbstractWindowRuntimeFactory { + + private static final long serialVersionUID = 1L; + + private final IScalarEvaluatorFactory[] frameValueEvalFactories; + + private final IScalarEvaluatorFactory[] frameStartEvalFactories; + + private final IScalarEvaluatorFactory[] frameEndEvalFactories; + + private final IBinaryComparatorFactory[] frameValueComparatorFactories; + + private final IScalarEvaluatorFactory[] frameExcludeEvalFactories; + + private final int frameExcludeNegationStartIdx; + + private final IBinaryComparatorFactory[] frameExcludeComparatorFactories; + + private final IScalarEvaluatorFactory frameOffsetEvalFactory; + + private final IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory; + + private final int frameMaxObjects; + + private final int nestedAggOutSchemaSize; + + private final WindowAggregatorDescriptorFactory nestedAggFactory; + + public WindowNestedPlansRuntimeFactory(int[] partitionColumns, + IBinaryComparatorFactory[] partitionComparatorFactories, + IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories, + IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartEvalFactories, + IScalarEvaluatorFactory[] frameEndEvalFactories, IScalarEvaluatorFactory[] frameExcludeEvalFactories, + int frameExcludeNegationStartIdx, IBinaryComparatorFactory[] frameExcludeComparatorFactories, + IScalarEvaluatorFactory frameOffsetEvalFactory, + IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int frameMaxObjects, + int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns, + IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize, + WindowAggregatorDescriptorFactory nestedAggFactory) { + super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, + projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories); + this.frameValueEvalFactories = frameValueEvalFactories; + this.frameStartEvalFactories = frameStartEvalFactories; + this.frameEndEvalFactories = frameEndEvalFactories; + this.frameValueComparatorFactories = frameValueComparatorFactories; + this.frameExcludeEvalFactories = frameExcludeEvalFactories; + this.frameExcludeComparatorFactories = frameExcludeComparatorFactories; + this.frameExcludeNegationStartIdx = frameExcludeNegationStartIdx; + this.frameOffsetEvalFactory = frameOffsetEvalFactory; + this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory; + this.frameMaxObjects = frameMaxObjects; + this.nestedAggFactory = nestedAggFactory; + this.nestedAggOutSchemaSize = nestedAggOutSchemaSize; + } + + @Override + public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) { + return new WindowNestedPlansPushRuntime(partitionColumns, partitionComparatorFactories, + orderComparatorFactories, frameValueEvalFactories, frameValueComparatorFactories, + frameStartEvalFactories, frameEndEvalFactories, frameExcludeEvalFactories, frameExcludeNegationStartIdx, + frameExcludeComparatorFactories, frameOffsetEvalFactory, binaryIntegerInspectorFactory, frameMaxObjects, + projectionList, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, + ctx); + } + + @Override + public String toString() { + return "window [nested] (" + Arrays.toString(partitionColumns) + ") " + Arrays.toString(runningAggOutColumns) + + " := " + Arrays.toString(runningAggFactories); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java new file mode 100644 index 0000000..bf71ed9 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.operators.win; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * Runtime for window operators that evaluates running aggregates without partition materialization. + */ +class WindowSimplePushRuntime extends AbstractWindowPushRuntime { + + WindowSimplePushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories, + IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns, + IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx) { + super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns, + runningAggOutColumns, runningAggFactories, ctx); + } + + @Override + protected void beginPartitionImpl() throws HyracksDataException { + runningAggInitPartition(-1); + } + + @Override + protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx) + throws HyracksDataException { + tAccess.reset(frameBuffer); + produceTuples(tAccess, tBeginIdx, tEndIdx); + } + + @Override + protected void endPartitionImpl() { + // nothing to do + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java new file mode 100644 index 0000000..ded399f --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.operators.win; + +import java.util.Arrays; + +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; + +/** + * Runtime factory for window operators that evaluates running aggregates without partition materialization. + */ +public class WindowSimpleRuntimeFactory extends AbstractWindowRuntimeFactory { + + private static final long serialVersionUID = 1L; + + public WindowSimpleRuntimeFactory(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories, + IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumnsExcludingSubplans, + int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories) { + super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, + projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories); + } + + @Override + public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) { + return new WindowSimplePushRuntime(partitionColumns, partitionComparatorFactories, orderComparatorFactories, + projectionList, runningAggOutColumns, runningAggFactories, ctx); + } + + @Override + public String toString() { + return "window (" + Arrays.toString(partitionColumns) + ") " + Arrays.toString(runningAggOutColumns) + " := " + + Arrays.toString(runningAggFactories); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java index d235d6d..6d763d1 100644 --- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java +++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java @@ -494,9 +494,8 @@ public class PushRuntimeTest { RecordDescriptor unnestDesc = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE }); - RunningAggregateRuntimeFactory ragg = new RunningAggregateRuntimeFactory(new int[] { 1 }, - new IRunningAggregateEvaluatorFactory[] { new TupleCountRunningAggregateFunctionFactory() }, - new int[] { 0, 1 }); + RunningAggregateRuntimeFactory ragg = new RunningAggregateRuntimeFactory(new int[] { 0, 1 }, new int[] { 1 }, + new IRunningAggregateEvaluatorFactory[] { new TupleCountRunningAggregateFunctionFactory() }); RecordDescriptor raggDesc = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE }); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BytePointable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BytePointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BytePointable.java index ebfa903..a8195ac 100644 --- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BytePointable.java +++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BytePointable.java @@ -99,7 +99,7 @@ public final class BytePointable extends AbstractPointable implements IHashable, public int compareTo(byte[] bytes, int start, int length) { byte b = getByte(); byte ob = getByte(bytes, start); - return b < ob ? -1 : (b > ob ? 1 : 0); + return Byte.compare(b, ob); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/DoublePointable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/DoublePointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/DoublePointable.java index 9306818..07ba40e 100644 --- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/DoublePointable.java +++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/DoublePointable.java @@ -120,7 +120,7 @@ public final class DoublePointable extends AbstractPointable implements IHashabl public int compareTo(byte[] bytes, int start, int length) { double v = getDouble(); double ov = getDouble(bytes, start); - return v < ov ? -1 : (v > ov ? 1 : 0); + return Double.compare(v, ov); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/FloatPointable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/FloatPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/FloatPointable.java index 926ee32..a55a398 100644 --- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/FloatPointable.java +++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/FloatPointable.java @@ -105,7 +105,7 @@ public final class FloatPointable extends AbstractPointable implements IHashable public int compareTo(byte[] bytes, int start, int length) { float v = getFloat(); float ov = getFloat(bytes, start); - return v < ov ? -1 : (v > ov ? 1 : 0); + return Float.compare(v, ov); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/IntegerPointable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/IntegerPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/IntegerPointable.java index b36d6a4..81b11d2 100644 --- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/IntegerPointable.java +++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/IntegerPointable.java @@ -109,7 +109,7 @@ public final class IntegerPointable extends AbstractPointable implements IHashab public int compareTo(byte[] bytes, int start, int length) { int v = getInteger(); int ov = getInteger(bytes, start); - return v < ov ? -1 : (v > ov ? 1 : 0); + return Integer.compare(v, ov); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/LongPointable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/LongPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/LongPointable.java index 007d0c7..1c2171c 100644 --- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/LongPointable.java +++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/LongPointable.java @@ -123,7 +123,7 @@ public final class LongPointable extends AbstractPointable implements IHashable, public int compareTo(byte[] bytes, int start, int length) { long v = getLong(); long ov = getLong(bytes, start); - return v < ov ? -1 : (v > ov ? 1 : 0); + return Long.compare(v, ov); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/ShortPointable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/ShortPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/ShortPointable.java index 7332ba8..73f3779 100644 --- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/ShortPointable.java +++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/ShortPointable.java @@ -100,7 +100,7 @@ public final class ShortPointable extends AbstractPointable implements IHashable public int compareTo(byte[] bytes, int start, int length) { short v = getShort(); short ov = getShort(bytes, start); - return v < ov ? -1 : (v > ov ? 1 : 0); + return Short.compare(v, ov); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java index 23f4b66..4993527 100644 --- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.data.std.util; +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IValueReference; public class DataUtils { @@ -97,4 +99,20 @@ public class DataUtils { } return true; } + + /** + * Compare two value references using given comparator + * + * @param first + * first value + * @param second + * second value + * @param cmp + * comparator + */ + public static int compare(IValueReference first, IValueReference second, IBinaryComparator cmp) + throws HyracksDataException { + return cmp.compare(first.getByteArray(), first.getStartOffset(), first.getLength(), second.getByteArray(), + second.getStartOffset(), second.getLength()); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java index ffee1a6..03fdb49 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java @@ -53,6 +53,17 @@ public class RunFileReader implements IFrameReader { readPtr = 0; } + public void seek(long position) { + if (position < 0) { + throw new IllegalArgumentException(String.valueOf(position)); + } + readPtr = position; + } + + public long position() { + return readPtr; + } + @Override public boolean nextFrame(IFrame frame) throws HyracksDataException { if (readPtr >= size) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java index 49b5309..bbba85a 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java @@ -190,4 +190,19 @@ public class TupleUtils { } return true; } + + public static void addFields(ArrayTupleBuilder sourceBuilder, ArrayTupleBuilder targetBuilder) + throws HyracksDataException { + byte[] data = sourceBuilder.getByteArray(); + int[] fieldEnds = sourceBuilder.getFieldEndOffsets(); + int start = 0; + int offset; + for (int i = 0; i < fieldEnds.length; i++) { + if (i > 0) { + start = fieldEnds[i - 1]; + } + offset = fieldEnds[i] - start; + targetBuilder.addField(data, start, offset); + } + } }
