http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java index b260a8a..0ee7c1d 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java @@ -21,9 +21,7 @@ package org.apache.hyracks.algebricks.runtime.operators.aggreg; import java.nio.ByteBuffer; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; -import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime; -import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; -import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.meta.PipelineAssembler; import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime; import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.comm.IFrameWriter; @@ -31,9 +29,9 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.utils.TupleUtils; import org.apache.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory; import org.apache.hyracks.dataflow.std.group.AggregateState; import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor; @@ -56,10 +54,11 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys, long memoryBudget) throws HyracksDataException { - final AggregatorOutput outputWriter = new AggregatorOutput(subplans, keyFieldIdx.length, decorFieldIdx.length); + final AggregatorOutput outputWriter = new AggregatorOutput(subplans, keyFieldIdx.length + decorFieldIdx.length); final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length]; for (int i = 0; i < subplans.length; i++) { - pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, ctx); + pipelines[i] = + (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], outputWriter, ctx); } return new IAggregatorDescriptor() { @@ -78,7 +77,6 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati for (int i = 0; i < pipelines.length; ++i) { pipelines[i].open(); } - // aggregate the first tuple for (int i = 0; i < pipelines.length; i++) { pipelines[i].writeTuple(accessor.getBuffer(), tIndex); @@ -108,18 +106,8 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati memoryUsageCheck(); tupleBuilder.reset(); - ArrayTupleBuilder tb = outputWriter.getTupleBuilder(); - byte[] data = tb.getByteArray(); - int[] fieldEnds = tb.getFieldEndOffsets(); - int start = 0; - int offset; - for (int i = 0; i < fieldEnds.length; i++) { - if (i > 0) { - start = fieldEnds[i - 1]; - } - offset = fieldEnds[i] - start; - tupleBuilder.addField(data, start, offset); - } + TupleUtils.addFields(outputWriter.getTupleBuilder(), tupleBuilder); + return true; } @@ -159,34 +147,10 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati }; } - private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx) - throws HyracksDataException { - // plug the operators - IFrameWriter start = writer; - IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories(); - RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors(); - // should enforce protocol - boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); - for (int i = runtimeFactories.length - 1; i >= 0; i--) { - IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx)[0]; - newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime; - start = enforce ? EnforcePushRuntime.enforce(start) : start; - newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]); - if (i > 0) { - newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]); - } else { - // the nts has the same input and output rec. desc. - newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]); - } - start = newRuntime; - } - return start; - } - /** * We suppose for now, that each subplan only produces one tuple. */ - private static class AggregatorOutput implements IFrameWriter { + public static class AggregatorOutput implements IFrameWriter { // private ByteBuffer frame; private FrameTupleAccessor[] tAccess; @@ -195,9 +159,8 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati private ArrayTupleBuilder tb; private AlgebricksPipeline[] subplans; - public AggregatorOutput(AlgebricksPipeline[] subplans, int numKeys, int numDecors) { + public AggregatorOutput(AlgebricksPipeline[] subplans, int numPropagatedFields) { this.subplans = subplans; - // this.keyFieldIndexes = keyFieldIndexes; int totalAggFields = 0; this.inputRecDesc = new RecordDescriptor[subplans.length]; for (int i = 0; i < subplans.length; i++) { @@ -205,7 +168,7 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati this.inputRecDesc[i] = rd[rd.length - 1]; totalAggFields += subplans[i].getOutputWidth(); } - tb = new ArrayTupleBuilder(numKeys + numDecors + totalAggFields); + tb = new ArrayTupleBuilder(numPropagatedFields + totalAggFields); this.tAccess = new FrameTupleAccessor[inputRecDesc.length]; for (int i = 0; i < inputRecDesc.length; i++) { @@ -250,5 +213,4 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati public void fail() throws HyracksDataException { } } - }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java index f057515..d14f5c1 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java @@ -21,9 +21,7 @@ package org.apache.hyracks.algebricks.runtime.operators.aggreg; import java.nio.ByteBuffer; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; -import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime; -import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; -import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.meta.PipelineAssembler; import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime; import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.comm.IFrameWriter; @@ -37,9 +35,11 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.common.utils.TupleUtils; import org.apache.hyracks.dataflow.std.group.AbstractAggregatorDescriptorFactory; import org.apache.hyracks.dataflow.std.group.AggregateState; import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor; +import org.apache.hyracks.dataflow.std.structures.TuplePointer; public class NestedPlansRunningAggregatorFactory extends AbstractAggregatorDescriptorFactory { @@ -62,13 +62,14 @@ public class NestedPlansRunningAggregatorFactory extends AbstractAggregatorDescr RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults, final IFrameWriter writer, long memoryBudget) throws HyracksDataException { final RunningAggregatorOutput outputWriter = - new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length, decorFieldIdx.length, writer); + new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length + decorFieldIdx.length, writer); // should enforce protocol boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); IFrameWriter enforcedWriter = enforce ? EnforceFrameWriter.enforce(outputWriter) : outputWriter; final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length]; for (int i = 0; i < subplans.length; i++) { - pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], enforcedWriter, ctx); + pipelines[i] = + (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], enforcedWriter, ctx); } final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder(); @@ -140,30 +141,6 @@ public class NestedPlansRunningAggregatorFactory extends AbstractAggregatorDescr }; } - private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx) - throws HyracksDataException { - // should enforce protocol - boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); - // plug the operators - IFrameWriter start = writer; - IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories(); - RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors(); - for (int i = runtimeFactories.length - 1; i >= 0; i--) { - IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx)[0]; - newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime; - start = enforce ? EnforceFrameWriter.enforce(start) : start; - newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]); - if (i > 0) { - newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]); - } else { - // the nts has the same input and output rec. desc. - newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]); - } - start = newRuntime; - } - return start; - } - private static class RunningAggregatorOutput implements IFrameWriter { private final FrameTupleAccessor[] tAccess; @@ -175,8 +152,8 @@ public class NestedPlansRunningAggregatorFactory extends AbstractAggregatorDescr private final IFrameWriter outputWriter; private final FrameTupleAppender outputAppender; - public RunningAggregatorOutput(IHyracksTaskContext ctx, AlgebricksPipeline[] subplans, int numKeys, - int numDecors, IFrameWriter outputWriter) throws HyracksDataException { + public RunningAggregatorOutput(IHyracksTaskContext ctx, AlgebricksPipeline[] subplans, int numPropagatedFields, + IFrameWriter outputWriter) throws HyracksDataException { this.subplans = subplans; this.outputWriter = outputWriter; @@ -188,8 +165,8 @@ public class NestedPlansRunningAggregatorFactory extends AbstractAggregatorDescr this.inputRecDesc[i] = rd[rd.length - 1]; totalAggFields += subplans[i].getOutputWidth(); } - tb = new ArrayTupleBuilder(numKeys + numDecors + totalAggFields); - gbyTb = new ArrayTupleBuilder(numKeys + numDecors); + tb = new ArrayTupleBuilder(numPropagatedFields + totalAggFields); + gbyTb = new ArrayTupleBuilder(numPropagatedFields); this.tAccess = new FrameTupleAccessor[inputRecDesc.length]; for (int i = 0; i < inputRecDesc.length; i++) { @@ -211,17 +188,7 @@ public class NestedPlansRunningAggregatorFactory extends AbstractAggregatorDescr accessor.reset(buffer); for (int tIndex = 0; tIndex < accessor.getTupleCount(); tIndex++) { tb.reset(); - byte[] data = gbyTb.getByteArray(); - int[] fieldEnds = gbyTb.getFieldEndOffsets(); - int start = 0; - int offset = 0; - for (int i = 0; i < fieldEnds.length; i++) { - if (i > 0) { - start = fieldEnds[i - 1]; - } - offset = fieldEnds[i] - start; - tb.addField(data, start, offset); - } + TupleUtils.addFields(gbyTb, tb); for (int f = 0; f < w; f++) { tb.addField(accessor, tIndex, f); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java index 27354cb..0449cf5 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java @@ -37,51 +37,55 @@ import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; public abstract class AbstractRunningAggregatePushRuntime<T extends IRunningAggregateEvaluator> extends AbstractOneInputOneOutputOneFramePushRuntime { protected final IHyracksTaskContext ctx; - private final IRunningAggregateEvaluatorFactory[] aggFactories; - private final Class<T> aggEvalClass; - protected final List<T> aggEvals; - private final int[] projectionList; + private final IRunningAggregateEvaluatorFactory[] runningAggFactories; + private final Class<T> runningAggEvalClass; + protected final List<T> runningAggEvals; + private final int[] projectionColumns; private final int[] projectionToOutColumns; private final IPointable p = VoidPointable.FACTORY.createPointable(); - private final ArrayTupleBuilder tupleBuilder; - private boolean first; + protected ArrayTupleBuilder tupleBuilder; + private boolean isFirst; - public AbstractRunningAggregatePushRuntime(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories, - int[] projectionList, IHyracksTaskContext ctx, Class<T> aggEvalClass) { + public AbstractRunningAggregatePushRuntime(int[] projectionColumns, int[] runningAggOutColumns, + IRunningAggregateEvaluatorFactory[] runningAggFactories, Class<T> runningAggEvalClass, + IHyracksTaskContext ctx) { this.ctx = ctx; - this.projectionList = projectionList; - this.aggFactories = aggFactories; - this.aggEvalClass = aggEvalClass; - aggEvals = new ArrayList<>(aggFactories.length); - tupleBuilder = new ArrayTupleBuilder(projectionList.length); - projectionToOutColumns = new int[projectionList.length]; - - for (int j = 0; j < projectionList.length; j++) { - projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]); + this.projectionColumns = projectionColumns; + this.runningAggFactories = runningAggFactories; + this.runningAggEvalClass = runningAggEvalClass; + runningAggEvals = new ArrayList<>(runningAggFactories.length); + projectionToOutColumns = new int[projectionColumns.length]; + for (int j = 0; j < projectionColumns.length; j++) { + projectionToOutColumns[j] = Arrays.binarySearch(runningAggOutColumns, projectionColumns[j]); } - first = true; + isFirst = true; } @Override public void open() throws HyracksDataException { super.open(); - if (first) { - first = false; + if (isFirst) { + isFirst = false; init(); } - for (T aggEval : aggEvals) { - aggEval.init(); + for (T runningAggEval : runningAggEvals) { + runningAggEval.init(); } } protected void init() throws HyracksDataException { + tupleBuilder = createOutputTupleBuilder(projectionColumns); initAccessAppendRef(ctx); - for (IRunningAggregateEvaluatorFactory aggFactory : aggFactories) { - IRunningAggregateEvaluator aggEval = aggFactory.createRunningAggregateEvaluator(ctx); - aggEvals.add(aggEvalClass.cast(aggEval)); + for (IRunningAggregateEvaluatorFactory runningAggFactory : runningAggFactories) { + IRunningAggregateEvaluator runningAggEval = runningAggFactory.createRunningAggregateEvaluator(ctx); + runningAggEvals.add(runningAggEvalClass.cast(runningAggEval)); } } + protected ArrayTupleBuilder createOutputTupleBuilder(int[] projectionList) { + return new ArrayTupleBuilder(projectionList.length); + } + protected void produceTuples(IFrameTupleAccessor accessor, int beginIdx, int endIdx) throws HyracksDataException { for (int t = beginIdx; t <= endIdx; t++) { tRef.reset(accessor, t); @@ -90,16 +94,16 @@ public abstract class AbstractRunningAggregatePushRuntime<T extends IRunningAggr } } - private void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex, + protected void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex, FrameTupleReference tupleRef) throws HyracksDataException { tb.reset(); - for (int f = 0; f < projectionList.length; f++) { + for (int f = 0; f < projectionColumns.length; f++) { int k = projectionToOutColumns[f]; if (k >= 0) { - aggEvals.get(k).step(tupleRef, p); + runningAggEvals.get(k).step(tupleRef, p); tb.addField(p.getByteArray(), p.getStartOffset(), p.getLength()); } else { - tb.addField(accessor, tIndex, projectionList[f]); + tb.addField(accessor, tIndex, projectionColumns[f]); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractWindowPushRuntime.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractWindowPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractWindowPushRuntime.java deleted file mode 100644 index f0bd204..0000000 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractWindowPushRuntime.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hyracks.algebricks.runtime.operators.aggrun; - -import java.nio.ByteBuffer; - -import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; -import org.apache.hyracks.algebricks.runtime.base.IWindowAggregateEvaluator; -import org.apache.hyracks.api.comm.IFrame; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.IBinaryComparator; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; -import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter; - -public abstract class AbstractWindowPushRuntime extends AbstractRunningAggregatePushRuntime<IWindowAggregateEvaluator> { - - private final int[] partitionColumnList; - private final IBinaryComparatorFactory[] partitionComparatorFactories; - private IBinaryComparator[] partitionComparators; - private final IBinaryComparatorFactory[] orderComparatorFactories; - private IFrame copyFrame; - private FrameTupleAccessor copyFrameAccessor; - private FrameTupleAccessor frameAccessor; - private long frameId; - private boolean inPartition; - - public AbstractWindowPushRuntime(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories, - int[] projectionList, int[] partitionColumnList, IBinaryComparatorFactory[] partitionComparatorFactories, - IBinaryComparatorFactory[] orderComparatorFactories, IHyracksTaskContext ctx) { - super(outColumns, aggFactories, projectionList, ctx, IWindowAggregateEvaluator.class); - this.partitionColumnList = partitionColumnList; - this.partitionComparatorFactories = partitionComparatorFactories; - this.orderComparatorFactories = orderComparatorFactories; - } - - @Override - public void open() throws HyracksDataException { - super.open(); - frameId = 0; - inPartition = false; - } - - @Override - protected void init() throws HyracksDataException { - super.init(); - partitionComparators = createBinaryComparators(partitionComparatorFactories); - frameAccessor = new FrameTupleAccessor(inputRecordDesc); - copyFrame = new VSizeFrame(ctx); - copyFrameAccessor = new FrameTupleAccessor(inputRecordDesc); - copyFrameAccessor.reset(copyFrame.getBuffer()); - IBinaryComparator[] orderComparators = createBinaryComparators(orderComparatorFactories); - for (IWindowAggregateEvaluator aggEval : aggEvals) { - aggEval.configure(orderComparators); - } - } - - @Override - public void close() throws HyracksDataException { - if (inPartition) { - endPartition(); - } - super.close(); - } - - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - frameAccessor.reset(buffer); - int nTuple = frameAccessor.getTupleCount(); - if (nTuple == 0) { - return; - } - - if (frameId == 0) { - beginPartition(); - } else { - boolean samePartition = PreclusteredGroupWriter.sameGroup(copyFrameAccessor, - copyFrameAccessor.getTupleCount() - 1, frameAccessor, 0, partitionColumnList, partitionComparators); - if (!samePartition) { - endPartition(); - beginPartition(); - } - } - if (nTuple == 1) { - partitionChunk(frameId, buffer, 0, 0); - } else { - int tBeginIndex = 0; - int tLastIndex = nTuple - 1; - for (int tIndex = 1; tIndex <= tLastIndex; tIndex++) { - boolean samePartition = PreclusteredGroupWriter.sameGroup(frameAccessor, tIndex - 1, frameAccessor, - tIndex, partitionColumnList, partitionComparators); - if (!samePartition) { - partitionChunk(frameId, buffer, tBeginIndex, tIndex - 1); - endPartition(); - beginPartition(); - tBeginIndex = tIndex; - } - } - partitionChunk(frameId, buffer, tBeginIndex, tLastIndex); - } - - copyFrame.resize(buffer.capacity()); - FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer()); - copyFrameAccessor.reset(copyFrame.getBuffer()); - frameId++; - } - - private void beginPartition() throws HyracksDataException { - if (inPartition) { - throw new IllegalStateException(); - } - inPartition = true; - beginPartitionImpl(); - } - - private void partitionChunk(long frameId, ByteBuffer frameBuffer, int beginTupleIdx, int endTupleIdx) - throws HyracksDataException { - if (!inPartition || frameId < 0) { - throw new IllegalStateException(); - } - partitionChunkImpl(frameId, frameBuffer, beginTupleIdx, endTupleIdx); - } - - private void endPartition() throws HyracksDataException { - if (!inPartition) { - throw new IllegalStateException(); - } - endPartitionImpl(); - inPartition = false; - } - - void aggInitPartition(long partitionLength) throws HyracksDataException { - for (IWindowAggregateEvaluator aggEval : aggEvals) { - aggEval.initPartition(partitionLength); - } - } - - private static IBinaryComparator[] createBinaryComparators(IBinaryComparatorFactory[] factories) { - IBinaryComparator[] comparators = new IBinaryComparator[factories.length]; - for (int i = 0; i < factories.length; i++) { - comparators[i] = factories[i].createBinaryComparator(); - } - return comparators; - } - - protected abstract void beginPartitionImpl() throws HyracksDataException; - - protected abstract void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx) - throws HyracksDataException; - - protected abstract void endPartitionImpl() throws HyracksDataException; -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/MaterializingWindowPushRuntime.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/MaterializingWindowPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/MaterializingWindowPushRuntime.java deleted file mode 100644 index 850430f..0000000 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/MaterializingWindowPushRuntime.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hyracks.algebricks.runtime.operators.aggrun; - -import java.nio.ByteBuffer; - -import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; -import org.apache.hyracks.api.comm.FrameHelper; -import org.apache.hyracks.api.comm.IFrame; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; -import org.apache.hyracks.dataflow.common.io.RunFileWriter; -import org.apache.hyracks.storage.common.arraylist.IntArrayList; - -class MaterializingWindowPushRuntime extends AbstractWindowPushRuntime { - - private RunFileWriter run; - - private IntArrayList runInfo; - - private long partitionLength; - - private IFrame curFrame; - - private long curFrameId; - - private long runLastFrameId; - - MaterializingWindowPushRuntime(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories, - int[] projectionList, int[] partitionColumnList, IBinaryComparatorFactory[] partitionComparatorFactories, - IBinaryComparatorFactory[] orderComparatorFactories, IHyracksTaskContext ctx) { - super(outColumns, aggFactories, projectionList, partitionColumnList, partitionComparatorFactories, - orderComparatorFactories, ctx); - } - - @Override - public void open() throws HyracksDataException { - super.open(); - run = null; - curFrameId = -1; - } - - @Override - protected void init() throws HyracksDataException { - super.init(); - curFrame = new VSizeFrame(ctx); - runInfo = new IntArrayList(128, 128); - } - - @Override - public void close() throws HyracksDataException { - super.close(); - if (run != null) { - run.erase(); - } - } - - @Override - protected void beginPartitionImpl() { - runInfo.clear(); - partitionLength = 0; - if (run != null) { - run.rewind(); - } - } - - @Override - protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx) - throws HyracksDataException { - boolean firstChunk = runInfo.isEmpty(); - runInfo.add(tBeginIdx); - runInfo.add(tEndIdx); - - // save frame. first one to memory, remaining ones to the run file - if (firstChunk || tBeginIdx == 0) { - int pos = frameBuffer.position(); - frameBuffer.position(0); - - if (firstChunk) { - if (frameId != curFrameId) { - curFrame.resize(curFrame.getMinSize() * FrameHelper.deserializeNumOfMinFrame(frameBuffer)); - curFrame.getBuffer().clear(); - curFrame.getBuffer().put(frameBuffer); - curFrameId = frameId; - } - } else { - if (run == null) { - FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(getClass().getSimpleName()); - run = new RunFileWriter(file, ctx.getIoManager()); - run.open(); - } - run.nextFrame(frameBuffer); - runLastFrameId = frameId; - } - - frameBuffer.position(pos); - } - - partitionLength += tEndIdx - tBeginIdx + 1; - } - - @Override - protected void endPartitionImpl() throws HyracksDataException { - aggInitPartition(partitionLength); - GeneratedRunFileReader reader = null; - try { - boolean runRead = false; - for (int idx = 0, ln = runInfo.size(); idx < ln; idx += 2) { - int tBeginIdx = runInfo.get(idx); - int tEndIdx = runInfo.get(idx + 1); - if (tBeginIdx == 0 && idx > 0) { - if (reader == null) { - reader = run.createReader(); - reader.open(); - } - reader.nextFrame(curFrame); - runRead = true; - } - tAccess.reset(curFrame.getBuffer()); - produceTuples(tAccess, tBeginIdx, tEndIdx); - } - if (runRead) { - curFrameId = runLastFrameId; - } - } finally { - if (reader != null) { - reader.close(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java index 7c39a21..4ca166f 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java @@ -28,15 +28,14 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; class RunningAggregatePushRuntime extends AbstractRunningAggregatePushRuntime<IRunningAggregateEvaluator> { - RunningAggregatePushRuntime(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories, - int[] projectionList, IHyracksTaskContext ctx) { - super(outColumns, aggFactories, projectionList, ctx, IRunningAggregateEvaluator.class); + RunningAggregatePushRuntime(int[] projectionColumns, int[] runningAggOutColumns, + IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx) { + super(projectionColumns, runningAggOutColumns, runningAggFactories, IRunningAggregateEvaluator.class, ctx); } @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { tAccess.reset(buffer); - int nTuple = tAccess.getTupleCount(); - produceTuples(tAccess, 0, nTuple - 1); + produceTuples(tAccess, 0, tAccess.getTupleCount() - 1); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregateRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregateRuntimeFactory.java index e97ea7d..d9b3b5c 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregateRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregateRuntimeFactory.java @@ -29,31 +29,32 @@ public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRun private static final long serialVersionUID = 1L; - protected final int[] outColumns; + protected final int[] runningAggOutColumns; - protected final IRunningAggregateEvaluatorFactory[] aggFactories; + protected final IRunningAggregateEvaluatorFactory[] runningAggFactories; /** - * @param outColumns - * a sorted array of columns into which the result is written to - * @param aggFactories - * @param projectionList + * @param projectionColumns * an array of columns to be projected + * @param runningAggOutColumns + * a sorted array of columns into which the result is written to + * @param runningAggFactories */ - public RunningAggregateRuntimeFactory(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories, - int[] projectionList) { - super(projectionList); - this.outColumns = outColumns; - this.aggFactories = aggFactories; + public RunningAggregateRuntimeFactory(int[] projectionColumns, int[] runningAggOutColumns, + IRunningAggregateEvaluatorFactory[] runningAggFactories) { + super(projectionColumns); + this.runningAggOutColumns = runningAggOutColumns; + this.runningAggFactories = runningAggFactories; } @Override public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) { - return new RunningAggregatePushRuntime(outColumns, aggFactories, projectionList, ctx); + return new RunningAggregatePushRuntime(projectionList, runningAggOutColumns, runningAggFactories, ctx); } @Override public String toString() { - return "running-aggregate " + Arrays.toString(outColumns) + " := " + Arrays.toString(aggFactories); + return "running-aggregate " + Arrays.toString(runningAggOutColumns) + " := " + + Arrays.toString(runningAggFactories); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/SimpleWindowPushRuntime.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/SimpleWindowPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/SimpleWindowPushRuntime.java deleted file mode 100644 index 61b06bf..0000000 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/SimpleWindowPushRuntime.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hyracks.algebricks.runtime.operators.aggrun; - -import java.nio.ByteBuffer; - -import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.exceptions.HyracksDataException; - -class SimpleWindowPushRuntime extends AbstractWindowPushRuntime { - - SimpleWindowPushRuntime(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories, int[] projectionList, - int[] partitionColumnList, IBinaryComparatorFactory[] partitionComparatorFactories, - IBinaryComparatorFactory[] orderComparatorFactories, IHyracksTaskContext ctx) { - super(outColumns, aggFactories, projectionList, partitionColumnList, partitionComparatorFactories, - orderComparatorFactories, ctx); - } - - @Override - protected void beginPartitionImpl() throws HyracksDataException { - aggInitPartition(-1); - } - - @Override - protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx) - throws HyracksDataException { - tAccess.reset(frameBuffer); - produceTuples(tAccess, tBeginIdx, tEndIdx); - } - - @Override - protected void endPartitionImpl() { - // nothing to do - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/WindowRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/WindowRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/WindowRuntimeFactory.java deleted file mode 100644 index fe7f554..0000000 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/WindowRuntimeFactory.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hyracks.algebricks.runtime.operators.aggrun; - -import java.util.Arrays; - -import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; -import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; - -public class WindowRuntimeFactory extends RunningAggregateRuntimeFactory { - - private static final long serialVersionUID = 1L; - - private final int[] partitionColumnList; - - private final IBinaryComparatorFactory[] partitionComparatorFactories; - - private final boolean partitionMaterialization; - - private final IBinaryComparatorFactory[] orderComparatorFactories; - - public WindowRuntimeFactory(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories, - int[] projectionList, int[] partitionColumnList, IBinaryComparatorFactory[] partitionComparatorFactories, - boolean partitionMaterialization, IBinaryComparatorFactory[] orderComparatorFactories) { - super(outColumns, aggFactories, projectionList); - this.partitionColumnList = partitionColumnList; - this.partitionComparatorFactories = partitionComparatorFactories; - this.partitionMaterialization = partitionMaterialization; - this.orderComparatorFactories = orderComparatorFactories; - } - - @Override - public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) { - return partitionMaterialization - ? new MaterializingWindowPushRuntime(outColumns, aggFactories, projectionList, partitionColumnList, - partitionComparatorFactories, orderComparatorFactories, ctx) - : new SimpleWindowPushRuntime(outColumns, aggFactories, projectionList, partitionColumnList, - partitionComparatorFactories, orderComparatorFactories, ctx); - } - - @Override - public String toString() { - return "window (" + Arrays.toString(partitionColumnList) + ") " + Arrays.toString(outColumns) + " := " - + Arrays.toString(aggFactories); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java index a717794..376a7a1 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java @@ -93,4 +93,29 @@ public class PipelineAssembler { public IPushRuntime[] getPushRuntime(IPushRuntimeFactory runtimeFactory) { return runtimeMap.get(runtimeFactory); } + + //TODO: refactoring is needed + public static IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, + IHyracksTaskContext ctx) throws HyracksDataException { + // should enforce protocol + boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); + // plug the operators + IFrameWriter start = writer; + IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories(); + RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors(); + for (int i = runtimeFactories.length - 1; i >= 0; i--) { + IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx)[0]; + newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime; + start = enforce ? EnforceFrameWriter.enforce(start) : start; + newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]); + if (i > 0) { + newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]); + } else { + // the nts has the same input and output rec. desc. + newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]); + } + start = newRuntime; + } + return start; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java new file mode 100644 index 0000000..a63aaf8 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.operators.win; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IWindowAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.operators.aggrun.AbstractRunningAggregatePushRuntime; +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter; + +public abstract class AbstractWindowPushRuntime extends AbstractRunningAggregatePushRuntime<IWindowAggregateEvaluator> { + + private final int[] partitionColumns; + private final IBinaryComparatorFactory[] partitionComparatorFactories; + private IBinaryComparator[] partitionComparators; + private final IBinaryComparatorFactory[] orderComparatorFactories; + private IFrame copyFrame; + private FrameTupleAccessor copyFrameAccessor; + private FrameTupleAccessor frameAccessor; + private long frameId; + private boolean inPartition; + + AbstractWindowPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories, + IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns, + IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx) { + super(projectionColumns, runningAggOutColumns, runningAggFactories, IWindowAggregateEvaluator.class, ctx); + this.partitionColumns = partitionColumns; + this.partitionComparatorFactories = partitionComparatorFactories; + this.orderComparatorFactories = orderComparatorFactories; + } + + @Override + public void open() throws HyracksDataException { + super.open(); + frameId = 0; + inPartition = false; + } + + @Override + protected void init() throws HyracksDataException { + super.init(); + partitionComparators = createBinaryComparators(partitionComparatorFactories); + frameAccessor = new FrameTupleAccessor(inputRecordDesc); + copyFrame = new VSizeFrame(ctx); + copyFrameAccessor = new FrameTupleAccessor(inputRecordDesc); + copyFrameAccessor.reset(copyFrame.getBuffer()); + IBinaryComparator[] orderComparators = createBinaryComparators(orderComparatorFactories); + for (IWindowAggregateEvaluator runningAggEval : runningAggEvals) { + runningAggEval.configure(orderComparators); + } + } + + @Override + public void close() throws HyracksDataException { + if (inPartition) { + endPartition(); + } + super.close(); + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + frameAccessor.reset(buffer); + int nTuple = frameAccessor.getTupleCount(); + if (nTuple == 0) { + return; + } + + if (frameId == 0) { + beginPartition(); + } else { + boolean samePartition = PreclusteredGroupWriter.sameGroup(copyFrameAccessor, + copyFrameAccessor.getTupleCount() - 1, frameAccessor, 0, partitionColumns, partitionComparators); + if (!samePartition) { + endPartition(); + beginPartition(); + } + } + if (nTuple == 1) { + partitionChunk(frameId, buffer, 0, 0); + } else { + int tBeginIndex = 0; + int tLastIndex = nTuple - 1; + for (int tIndex = 1; tIndex <= tLastIndex; tIndex++) { + boolean samePartition = PreclusteredGroupWriter.sameGroup(frameAccessor, tIndex - 1, frameAccessor, + tIndex, partitionColumns, partitionComparators); + if (!samePartition) { + partitionChunk(frameId, buffer, tBeginIndex, tIndex - 1); + endPartition(); + beginPartition(); + tBeginIndex = tIndex; + } + } + partitionChunk(frameId, buffer, tBeginIndex, tLastIndex); + } + + copyFrame.resize(buffer.capacity()); + FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer()); + copyFrameAccessor.reset(copyFrame.getBuffer()); + frameId++; + } + + private void beginPartition() throws HyracksDataException { + if (inPartition) { + throw new IllegalStateException(); + } + inPartition = true; + beginPartitionImpl(); + } + + private void partitionChunk(long frameId, ByteBuffer frameBuffer, int beginTupleIdx, int endTupleIdx) + throws HyracksDataException { + if (!inPartition || frameId < 0) { + throw new IllegalStateException(); + } + partitionChunkImpl(frameId, frameBuffer, beginTupleIdx, endTupleIdx); + } + + private void endPartition() throws HyracksDataException { + if (!inPartition) { + throw new IllegalStateException(); + } + endPartitionImpl(); + inPartition = false; + } + + void runningAggInitPartition(long partitionLength) throws HyracksDataException { + for (IWindowAggregateEvaluator runningAggEval : runningAggEvals) { + runningAggEval.initPartition(partitionLength); + } + } + + protected static IBinaryComparator[] createBinaryComparators(IBinaryComparatorFactory[] factories) { + IBinaryComparator[] comparators = new IBinaryComparator[factories.length]; + for (int i = 0; i < factories.length; i++) { + comparators[i] = factories[i].createBinaryComparator(); + } + return comparators; + } + + protected abstract void beginPartitionImpl() throws HyracksDataException; + + protected abstract void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx) + throws HyracksDataException; + + protected abstract void endPartitionImpl() throws HyracksDataException; +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowRuntimeFactory.java new file mode 100644 index 0000000..6c52b0b --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowRuntimeFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.operators.win; + +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; + +public abstract class AbstractWindowRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory { + + private static final long serialVersionUID = 1L; + + final int[] partitionColumns; + + final IBinaryComparatorFactory[] partitionComparatorFactories; + + final IBinaryComparatorFactory[] orderComparatorFactories; + + final int[] runningAggOutColumns; + + final IRunningAggregateEvaluatorFactory[] runningAggFactories; + + AbstractWindowRuntimeFactory(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories, + IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns, + IRunningAggregateEvaluatorFactory[] runningAggFactories) { + super(projectionColumns); + this.runningAggOutColumns = runningAggOutColumns; + this.runningAggFactories = runningAggFactories; + this.partitionColumns = partitionColumns; + this.partitionComparatorFactories = partitionComparatorFactories; + this.orderComparatorFactories = orderComparatorFactories; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java new file mode 100644 index 0000000..6177723 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowAggregatorDescriptorFactory.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.operators.win; + +import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; +import org.apache.hyracks.algebricks.runtime.operators.aggreg.NestedPlansAccumulatingAggregatorFactory; +import org.apache.hyracks.algebricks.runtime.operators.meta.PipelineAssembler; +import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime; +import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; +import org.apache.hyracks.dataflow.common.utils.TupleUtils; +import org.apache.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory; +import org.apache.hyracks.dataflow.std.group.AggregateState; +import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor; + +/** + * Aggregator factory for window operators + */ +public final class WindowAggregatorDescriptorFactory extends AbstractAccumulatingAggregatorDescriptorFactory { + + private static final long serialVersionUID = 1L; + + private AlgebricksPipeline[] subplans; + + public WindowAggregatorDescriptorFactory(AlgebricksPipeline[] subplans) { + this.subplans = subplans; + } + + @Override + public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc, + RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys, long memoryBudget) + throws HyracksDataException { + NestedPlansAccumulatingAggregatorFactory.AggregatorOutput outputWriter = + new NestedPlansAccumulatingAggregatorFactory.AggregatorOutput(subplans, 0); + NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length]; + for (int i = 0; i < subplans.length; i++) { + pipelines[i] = + (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], outputWriter, ctx); + } + + return new IAggregatorDescriptor() { + + @Override + public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, + AggregateState state) throws HyracksDataException { + outputWriter.getTupleBuilder().reset(); + + for (NestedTupleSourceRuntime pipeline : pipelines) { + pipeline.open(); + } + } + + @Override + public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor, + int stateTupleIndex, AggregateState state) throws HyracksDataException { + memoryUsageCheck(); + for (NestedTupleSourceRuntime pipeline : pipelines) { + pipeline.writeTuple(accessor.getBuffer(), tIndex); + } + } + + @Override + public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor, + int tIndex, AggregateState state) throws HyracksDataException { + for (int i = 0; i < pipelines.length; i++) { + outputWriter.setInputIdx(i); + pipelines[i].close(); + } + memoryUsageCheck(); + TupleUtils.addFields(outputWriter.getTupleBuilder(), tupleBuilder); + return true; + } + + @Override + public AggregateState createAggregateStates() { + return null; + } + + @Override + public void reset() { + } + + @Override + public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, + AggregateState state) { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + } + + private void memoryUsageCheck() { + // TODO: implement as in NestedPlansAccumulatingAggregatorFactory.memoryUsageCheck() + } + }; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java new file mode 100644 index 0000000..ec13c5f --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.operators.win; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.api.comm.FrameHelper; +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; +import org.apache.hyracks.dataflow.common.io.RunFileWriter; +import org.apache.hyracks.storage.common.arraylist.IntArrayList; + +/** + * Runtime for window operators that performs partition materialization and evaluates running aggregates + * that require information about number of tuples in the partition. + */ +class WindowMaterializingPushRuntime extends AbstractWindowPushRuntime { + + private long partitionLength; + + IFrame curFrame; + + private long curFrameId; + + private int chunkBeginIdx; + + private IntArrayList chunkEndIdx; + + private RunFileWriter run; + + private long runLastFrameId; + + WindowMaterializingPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories, + IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns, + IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx) { + super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns, + runningAggOutColumns, runningAggFactories, ctx); + } + + @Override + public void open() throws HyracksDataException { + super.open(); + run = null; + curFrameId = -1; + } + + @Override + protected void init() throws HyracksDataException { + super.init(); + curFrame = new VSizeFrame(ctx); + chunkEndIdx = new IntArrayList(128, 128); + } + + @Override + public void close() throws HyracksDataException { + super.close(); + if (run != null) { + run.erase(); + } + } + + @Override + protected void beginPartitionImpl() { + chunkEndIdx.clear(); + partitionLength = 0; + if (run != null) { + run.rewind(); + } + } + + @Override + protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx) + throws HyracksDataException { + // save the frame. first one to memory, remaining ones to the run file + boolean isFirstChunk = chunkEndIdx.isEmpty(); + if (isFirstChunk) { + if (frameId != curFrameId) { + int nBlocks = FrameHelper.deserializeNumOfMinFrame(frameBuffer); + curFrame.ensureFrameSize(curFrame.getMinSize() * nBlocks); + int pos = frameBuffer.position(); + FrameUtils.copyAndFlip(frameBuffer, curFrame.getBuffer()); + frameBuffer.position(pos); + curFrameId = frameId; + } + chunkBeginIdx = tBeginIdx; + } else { + if (tBeginIdx != 0) { + throw new IllegalStateException(String.valueOf(tBeginIdx)); + } + if (run == null) { + FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(getClass().getSimpleName()); + run = new RunFileWriter(file, ctx.getIoManager()); + run.open(); + } + int pos = frameBuffer.position(); + frameBuffer.position(0); + run.nextFrame(frameBuffer); + frameBuffer.position(pos); + runLastFrameId = frameId; + } + + chunkEndIdx.add(tEndIdx); + partitionLength += tEndIdx - tBeginIdx + 1; + } + + @Override + protected void endPartitionImpl() throws HyracksDataException { + runningAggInitPartition(partitionLength); + + int nChunks = getPartitionChunkCount(); + if (nChunks == 1) { + producePartitionTuples(0, null); + } else { + GeneratedRunFileReader reader = run.createReader(); + reader.open(); + try { + for (int chunkIdx = 0; chunkIdx < nChunks; chunkIdx++) { + if (chunkIdx > 0) { + reader.nextFrame(curFrame); + } + producePartitionTuples(chunkIdx, reader); + } + curFrameId = runLastFrameId; + } finally { + reader.close(); + } + } + } + + protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader reader) throws HyracksDataException { + tAccess.reset(curFrame.getBuffer()); + produceTuples(tAccess, getTupleBeginIdx(chunkIdx), getTupleEndIdx(chunkIdx)); + } + + int getPartitionChunkCount() { + return chunkEndIdx.size(); + } + + int getTupleBeginIdx(int chunkIdx) { + return chunkIdx == 0 ? chunkBeginIdx : 0; + } + + int getTupleEndIdx(int chunkIdx) { + return chunkEndIdx.get(chunkIdx); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2c18aa9/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java new file mode 100644 index 0000000..1b02fb1 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.operators.win; + +import java.util.Arrays; + +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; + +/** + * Runtime factory for window operators that performs partition materialization and evaluates running aggregates + * that require information about number of tuples in the partition. + */ +public class WindowMaterializingRuntimeFactory extends AbstractWindowRuntimeFactory { + + private static final long serialVersionUID = 1L; + + public WindowMaterializingRuntimeFactory(int[] partitionColumns, + IBinaryComparatorFactory[] partitionComparatorFactories, + IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumnsExcludingSubplans, + int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories) { + super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, + projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories); + } + + @Override + public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) { + return new WindowMaterializingPushRuntime(partitionColumns, partitionComparatorFactories, + orderComparatorFactories, projectionList, runningAggOutColumns, runningAggFactories, ctx); + } + + @Override + public String toString() { + return "window [materialize] (" + Arrays.toString(partitionColumns) + ") " + + Arrays.toString(runningAggOutColumns) + " := " + Arrays.toString(runningAggFactories); + } +}
