Repository: asterixdb Updated Branches: refs/heads/master b9d55c40e -> fdedf6263
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractWindowPushRuntime.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractWindowPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractWindowPushRuntime.java new file mode 100644 index 0000000..f0bd204 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractWindowPushRuntime.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.aggrun;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IWindowAggregateEvaluator;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+
+/**
+ * Base class of the window operator runtimes. It splits the incoming tuple stream into partitions by
+ * comparing the partition columns of adjacent tuples and reports every partition to the subclass as a
+ * sequence of callbacks: {@link #beginPartitionImpl()}, one or more
+ * {@link #partitionChunkImpl(long, ByteBuffer, int, int)} calls (one per frame the partition touches)
+ * and {@link #endPartitionImpl()}.
+ * <p>
+ * NOTE(review): partition boundaries are detected only between adjacent tuples, so the input is
+ * presumably expected to arrive clustered on the partition columns - confirm against the caller.
+ */
+public abstract class AbstractWindowPushRuntime extends AbstractRunningAggregatePushRuntime<IWindowAggregateEvaluator> {
+
+    private final int[] partitionColumnList;
+    private final IBinaryComparatorFactory[] partitionComparatorFactories;
+    private IBinaryComparator[] partitionComparators;
+    private final IBinaryComparatorFactory[] orderComparatorFactories;
+    // copy of the previously processed frame; its last tuple is compared with the first tuple of the next frame
+    private IFrame copyFrame;
+    private FrameTupleAccessor copyFrameAccessor;
+    private FrameTupleAccessor frameAccessor;
+    // number of non-empty frames processed since open()
+    private long frameId;
+    // true between beginPartition() and endPartition()
+    private boolean inPartition;
+
+    public AbstractWindowPushRuntime(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories,
+            int[] projectionList, int[] partitionColumnList, IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, IHyracksTaskContext ctx) {
+        super(outColumns, aggFactories, projectionList, ctx, IWindowAggregateEvaluator.class);
+        this.partitionColumnList = partitionColumnList;
+        this.partitionComparatorFactories = partitionComparatorFactories;
+        this.orderComparatorFactories = orderComparatorFactories;
+    }
+
+    /**
+     * Opens the runtime and resets the frame counter and the partition state.
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        super.open();
+        frameId = 0;
+        inPartition = false;
+    }
+
+    /**
+     * Creates the partition comparators, the frame accessors and the frame used to remember the previous
+     * input frame, and passes freshly created order comparators to every aggregate evaluator.
+     */
+    @Override
+    protected void init() throws HyracksDataException {
+        super.init();
+        partitionComparators = createBinaryComparators(partitionComparatorFactories);
+        frameAccessor = new FrameTupleAccessor(inputRecordDesc);
+        copyFrame = new VSizeFrame(ctx);
+        copyFrameAccessor = new FrameTupleAccessor(inputRecordDesc);
+        copyFrameAccessor.reset(copyFrame.getBuffer());
+        IBinaryComparator[] orderComparators = createBinaryComparators(orderComparatorFactories);
+        for (IWindowAggregateEvaluator aggEval : aggEvals) {
+            aggEval.configure(orderComparators);
+        }
+    }
+
+    /**
+     * Finishes the partition that is still open (if any) and closes the runtime.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            if (inPartition) {
+                endPartition();
+            }
+        } finally {
+            // always run the inherited close logic, even if finishing the last partition failed
+            super.close();
+        }
+    }
+
+    /**
+     * Cuts the frame into chunks of tuples that belong to the same partition and forwards each chunk to
+     * the subclass. Frames without tuples are ignored and do not advance the frame counter.
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        frameAccessor.reset(buffer);
+        int nTuple = frameAccessor.getTupleCount();
+        if (nTuple == 0) {
+            return;
+        }
+
+        if (frameId == 0) {
+            beginPartition();
+        } else {
+            // does the first tuple of this frame continue the partition of the last tuple of the previous frame?
+            boolean samePartition = PreclusteredGroupWriter.sameGroup(copyFrameAccessor,
+                    copyFrameAccessor.getTupleCount() - 1, frameAccessor, 0, partitionColumnList, partitionComparators);
+            if (!samePartition) {
+                endPartition();
+                beginPartition();
+            }
+        }
+
+        // a single-tuple frame skips the loop and is reported as the chunk [0, 0]
+        int tBeginIndex = 0;
+        int tLastIndex = nTuple - 1;
+        for (int tIndex = 1; tIndex <= tLastIndex; tIndex++) {
+            boolean samePartition = PreclusteredGroupWriter.sameGroup(frameAccessor, tIndex - 1, frameAccessor,
+                    tIndex, partitionColumnList, partitionComparators);
+            if (!samePartition) {
+                partitionChunk(frameId, buffer, tBeginIndex, tIndex - 1);
+                endPartition();
+                beginPartition();
+                tBeginIndex = tIndex;
+            }
+        }
+        partitionChunk(frameId, buffer, tBeginIndex, tLastIndex);
+
+        // remember this frame so that its last tuple can be compared with the first tuple of the next one
+        copyFrame.resize(buffer.capacity());
+        FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
+        copyFrameAccessor.reset(copyFrame.getBuffer());
+        frameId++;
+    }
+
+    private void beginPartition() throws HyracksDataException {
+        if (inPartition) {
+            throw new IllegalStateException("beginPartition() called while a partition is already open");
+        }
+        inPartition = true;
+        beginPartitionImpl();
+    }
+
+    private void partitionChunk(long frameId, ByteBuffer frameBuffer, int beginTupleIdx, int endTupleIdx)
+            throws HyracksDataException {
+        if (!inPartition || frameId < 0) {
+            throw new IllegalStateException(
+                    "partitionChunk() called in invalid state: inPartition=" + inPartition + ", frameId=" + frameId);
+        }
+        partitionChunkImpl(frameId, frameBuffer, beginTupleIdx, endTupleIdx);
+    }
+
+    private void endPartition() throws HyracksDataException {
+        if (!inPartition) {
+            throw new IllegalStateException("endPartition() called while no partition is open");
+        }
+        endPartitionImpl();
+        inPartition = false;
+    }
+
+    /**
+     * Calls {@code initPartition(partitionLength)} on every aggregate evaluator.
+     *
+     * @param partitionLength
+     *            the value passed on to the evaluators
+     */
+    void aggInitPartition(long partitionLength) throws HyracksDataException {
+        for (IWindowAggregateEvaluator aggEval : aggEvals) {
+            aggEval.initPartition(partitionLength);
+        }
+    }
+
+    private static IBinaryComparator[] createBinaryComparators(IBinaryComparatorFactory[] factories) {
+        IBinaryComparator[] comparators = new IBinaryComparator[factories.length];
+        for (int i = 0; i < factories.length; i++) {
+            comparators[i] = factories[i].createBinaryComparator();
+        }
+        return comparators;
+    }
+
+    /**
+     * Called when the first tuple of a new partition has been detected.
+     */
+    protected abstract void beginPartitionImpl() throws HyracksDataException;
+
+    /**
+     * Called for each run of consecutive tuples of one frame that belong to the current partition.
+     *
+     * @param frameId
+     *            the id of the frame, counted from 0 since open()
+     * @param frameBuffer
+     *            the buffer of the frame that contains the tuples
+     * @param tBeginIdx
+     *            the index of the first tuple of the chunk
+     * @param tEndIdx
+     *            the index of the last tuple of the chunk (inclusive)
+     */
+    protected abstract void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx)
+            throws HyracksDataException;
+
+    /**
+     * Called after the last chunk of the current partition has been delivered.
+     */
+    protected abstract void endPartitionImpl() throws HyracksDataException;
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/MaterializingWindowPushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/MaterializingWindowPushRuntime.java 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/MaterializingWindowPushRuntime.java new file mode 100644 index 0000000..850430f --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/MaterializingWindowPushRuntime.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.aggrun;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.storage.common.arraylist.IntArrayList;
+
+/**
+ * Window runtime that buffers a whole partition before producing any output for it, so that the
+ * partition length can be passed to the aggregate evaluators up front. The first frame of a partition
+ * is kept in memory, all following frames are written to a run file, and the tuples are produced when
+ * the partition ends.
+ */
+class MaterializingWindowPushRuntime extends AbstractWindowPushRuntime {
+
+    // run file holding the second and later frames of the current partition; created on first use
+    private RunFileWriter run;
+
+    // (tBeginIdx, tEndIdx) pairs, one pair per chunk of the current partition
+    private IntArrayList runInfo;
+
+    // number of tuples collected for the current partition so far
+    private long partitionLength;
+
+    // in-memory frame: the first frame of the partition, later reused to read frames back from the run
+    private IFrame curFrame;
+
+    // id of the frame whose content is currently in curFrame, -1 if none
+    private long curFrameId;
+
+    // id of the frame most recently written to the run
+    private long runLastFrameId;
+
+    MaterializingWindowPushRuntime(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories,
+            int[] projectionList, int[] partitionColumnList, IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, IHyracksTaskContext ctx) {
+        super(outColumns, aggFactories, projectionList, partitionColumnList, partitionComparatorFactories,
+                orderComparatorFactories, ctx);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        super.open();
+        run = null;
+        curFrameId = -1;
+    }
+
+    @Override
+    protected void init() throws HyracksDataException {
+        super.init();
+        curFrame = new VSizeFrame(ctx);
+        runInfo = new IntArrayList(128, 128);
+    }
+
+    /**
+     * Closes the runtime and erases the run file, if one was created.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            // may still read the run file: the superclass finishes the last open partition here
+            super.close();
+        } finally {
+            // erase the run file even if closing failed, otherwise it is left behind
+            if (run != null) {
+                run.erase();
+            }
+        }
+    }
+
+    /**
+     * Forgets the chunks of the previous partition and rewinds the run file for reuse.
+     */
+    @Override
+    protected void beginPartitionImpl() {
+        runInfo.clear();
+        partitionLength = 0;
+        if (run != null) {
+            run.rewind();
+        }
+    }
+
+    /**
+     * Records the chunk boundaries and, if this frame has not been saved yet, saves the frame.
+     */
+    @Override
+    protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx)
+            throws HyracksDataException {
+        boolean firstChunk = runInfo.isEmpty();
+        runInfo.add(tBeginIdx);
+        runInfo.add(tEndIdx);
+
+        // save frame. first one to memory, remaining ones to the run file
+        if (firstChunk || tBeginIdx == 0) {
+            // the position is restored below so that the caller's buffer is left as it was
+            int pos = frameBuffer.position();
+            frameBuffer.position(0);
+
+            if (firstChunk) {
+                // skip the copy if curFrame already holds this frame (the previous partition ended in it)
+                if (frameId != curFrameId) {
+                    curFrame.resize(curFrame.getMinSize() * FrameHelper.deserializeNumOfMinFrame(frameBuffer));
+                    curFrame.getBuffer().clear();
+                    curFrame.getBuffer().put(frameBuffer);
+                    curFrameId = frameId;
+                }
+            } else {
+                if (run == null) {
+                    FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(getClass().getSimpleName());
+                    run = new RunFileWriter(file, ctx.getIoManager());
+                    run.open();
+                }
+                run.nextFrame(frameBuffer);
+                runLastFrameId = frameId;
+            }
+
+            frameBuffer.position(pos);
+        }
+
+        partitionLength += tEndIdx - tBeginIdx + 1;
+    }
+
+    /**
+     * Passes the partition length to the aggregate evaluators and then produces the tuples of all
+     * recorded chunks, reading the frames after the first one back from the run file.
+     */
+    @Override
+    protected void endPartitionImpl() throws HyracksDataException {
+        aggInitPartition(partitionLength);
+        GeneratedRunFileReader reader = null;
+        try {
+            boolean runRead = false;
+            for (int idx = 0, ln = runInfo.size(); idx < ln; idx += 2) {
+                int tBeginIdx = runInfo.get(idx);
+                int tEndIdx = runInfo.get(idx + 1);
+                // a chunk other than the first one that starts at tuple 0 lives in the next frame of the run
+                if (tBeginIdx == 0 && idx > 0) {
+                    if (reader == null) {
+                        reader = run.createReader();
+                        reader.open();
+                    }
+                    reader.nextFrame(curFrame);
+                    runRead = true;
+                }
+                tAccess.reset(curFrame.getBuffer());
+                produceTuples(tAccess, tBeginIdx, tEndIdx);
+            }
+            if (runRead) {
+                // curFrame now holds the last frame that was written to the run
+                curFrameId = runLastFrameId;
+            }
+        } finally {
+            if (reader != null) {
+                reader.close();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java 
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java new file mode 100644 index 0000000..7c39a21 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.aggrun;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Running aggregate runtime without partitioning: every incoming frame is handed to
+ * {@code produceTuples} in its entirety.
+ */
+class RunningAggregatePushRuntime extends AbstractRunningAggregatePushRuntime<IRunningAggregateEvaluator> {
+
+    RunningAggregatePushRuntime(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories,
+            int[] projectionList, IHyracksTaskContext ctx) {
+        super(outColumns, aggFactories, projectionList, ctx, IRunningAggregateEvaluator.class);
+    }
+
+    /**
+     * Produces output for the tuples of the frame, from index 0 up to the last tuple index.
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        tAccess.reset(buffer);
+        final int lastTupleIdx = tAccess.getTupleCount() - 1;
+        produceTuples(tAccess, 0, lastTupleIdx);
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregateRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregateRuntimeFactory.java
new file mode 100644
index 0000000..e97ea7d
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregateRuntimeFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.aggrun;
+
+import java.util.Arrays;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Factory of {@link RunningAggregatePushRuntime} instances.
+ */
+public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    protected final int[] outColumns;
+
+    protected final IRunningAggregateEvaluatorFactory[] aggFactories;
+
+    /**
+     * @param outColumns
+     *            the columns that receive the results; the array must be sorted
+     * @param aggFactories
+     *            the factories of the running aggregate evaluators
+     * @param projectionList
+     *            the columns that are projected
+     */
+    public RunningAggregateRuntimeFactory(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories,
+            int[] projectionList) {
+        super(projectionList);
+        this.aggFactories = aggFactories;
+        this.outColumns = outColumns;
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
+        return new RunningAggregatePushRuntime(outColumns, aggFactories, projectionList, ctx);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("running-aggregate ");
+        text.append(Arrays.toString(outColumns));
+        text.append(" := ");
+        text.append(Arrays.toString(aggFactories));
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/SimpleWindowPushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/SimpleWindowPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/SimpleWindowPushRuntime.java
new file mode 100644
index 0000000..61b06bf
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/SimpleWindowPushRuntime.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License. 
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.aggrun;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Window runtime that does not buffer partitions: the tuples of each chunk are produced as soon as the
+ * chunk is reported.
+ */
+class SimpleWindowPushRuntime extends AbstractWindowPushRuntime {
+
+    // handed to the aggregate evaluators at partition start; presumably means "length not known" - TODO confirm
+    private static final long PARTITION_LENGTH_NOT_AVAILABLE = -1;
+
+    SimpleWindowPushRuntime(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories, int[] projectionList,
+            int[] partitionColumnList, IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, IHyracksTaskContext ctx) {
+        super(outColumns, aggFactories, projectionList, partitionColumnList, partitionComparatorFactories,
+                orderComparatorFactories, ctx);
+    }
+
+    /**
+     * Initializes the aggregate evaluators for the new partition.
+     */
+    @Override
+    protected void beginPartitionImpl() throws HyracksDataException {
+        aggInitPartition(PARTITION_LENGTH_NOT_AVAILABLE);
+    }
+
+    /**
+     * Produces the tuples of the chunk directly from the given frame buffer.
+     */
+    @Override
+    protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx)
+            throws HyracksDataException {
+        tAccess.reset(frameBuffer);
+        produceTuples(tAccess, tBeginIdx, tEndIdx);
+    }
+
+    @Override
+    protected void endPartitionImpl() {
+        // no per-partition state to finish
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/WindowRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/WindowRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/WindowRuntimeFactory.java
new file mode 100644
index 0000000..fe7f554
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/WindowRuntimeFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.aggrun;
+
+import java.util.Arrays;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+/**
+ * Factory of window runtimes. Depending on the {@code partitionMaterialization} flag it creates either a
+ * {@link MaterializingWindowPushRuntime} or a {@link SimpleWindowPushRuntime}.
+ */
+public class WindowRuntimeFactory extends RunningAggregateRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int[] partitionColumnList;
+
+    private final IBinaryComparatorFactory[] partitionComparatorFactories;
+
+    private final boolean partitionMaterialization;
+
+    private final IBinaryComparatorFactory[] orderComparatorFactories;
+
+    public WindowRuntimeFactory(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories,
+            int[] projectionList, int[] partitionColumnList, IBinaryComparatorFactory[] partitionComparatorFactories,
+            boolean partitionMaterialization, IBinaryComparatorFactory[] orderComparatorFactories) {
+        super(outColumns, aggFactories, projectionList);
+        this.partitionColumnList = partitionColumnList;
+        this.partitionComparatorFactories = partitionComparatorFactories;
+        this.partitionMaterialization = partitionMaterialization;
+        this.orderComparatorFactories = orderComparatorFactories;
+    }
+
+    /**
+     * Creates the materializing runtime if partition materialization was requested, the simple one otherwise.
+     */
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
+        if (partitionMaterialization) {
+            return new MaterializingWindowPushRuntime(outColumns, aggFactories, projectionList, partitionColumnList,
+                    partitionComparatorFactories, orderComparatorFactories, ctx);
+        }
+        return new SimpleWindowPushRuntime(outColumns, aggFactories, projectionList, partitionColumnList,
+                partitionComparatorFactories, orderComparatorFactories, ctx);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("window (");
+        text.append(Arrays.toString(partitionColumnList));
+        text.append(") ");
+        text.append(Arrays.toString(outColumns));
+        text.append(" := ");
+        text.append(Arrays.toString(aggFactories));
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
deleted file mode 100644
index ca58d4d..0000000
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.algebricks.runtime.operators.std; - -import java.nio.ByteBuffer; -import java.util.Arrays; - -import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator; -import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; -import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; -import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory; -import org.apache.hyracks.api.comm.IFrameTupleAccessor; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.data.std.api.IPointable; -import org.apache.hyracks.data.std.primitive.VoidPointable; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; - -public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory { - - private static final long serialVersionUID = 1L; - - private final int[] outColumns; - private final IRunningAggregateEvaluatorFactory[] runningAggregates; - - /** - * @param outColumns - * a sorted array of columns into which the result is written to 
- * @param runningAggregates - * @param projectionList - * an array of columns to be projected - */ - - public RunningAggregateRuntimeFactory(int[] outColumns, IRunningAggregateEvaluatorFactory[] runningAggregates, - int[] projectionList) { - super(projectionList); - this.outColumns = outColumns; - this.runningAggregates = runningAggregates; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("running-aggregate ["); - for (int i = 0; i < outColumns.length; i++) { - if (i > 0) { - sb.append(", "); - } - sb.append(outColumns[i]); - } - sb.append("] := ["); - for (int i = 0; i < runningAggregates.length; i++) { - if (i > 0) { - sb.append(", "); - } - sb.append(runningAggregates[i]); - } - sb.append("]"); - return sb.toString(); - } - - @Override - public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) - throws HyracksDataException { - final int[] projectionToOutColumns = new int[projectionList.length]; - for (int j = 0; j < projectionList.length; j++) { - projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]); - } - - return new AbstractOneInputOneOutputOneFramePushRuntime() { - private final IPointable p = VoidPointable.FACTORY.createPointable(); - private final IRunningAggregateEvaluator[] raggs = new IRunningAggregateEvaluator[runningAggregates.length]; - private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length); - private boolean first = true; - - @Override - public void open() throws HyracksDataException { - initAccessAppendRef(ctx); - if (first) { - first = false; - int n = runningAggregates.length; - for (int i = 0; i < n; i++) { - raggs[i] = runningAggregates[i].createRunningAggregateEvaluator(ctx); - } - } - for (int i = 0; i < runningAggregates.length; i++) { - raggs[i].init(); - } - super.open(); - } - - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - 
tAccess.reset(buffer); - int nTuple = tAccess.getTupleCount(); - for (int t = 0; t < nTuple; t++) { - tRef.reset(tAccess, t); - produceTuple(tupleBuilder, tAccess, t, tRef); - appendToFrameFromTupleBuilder(tupleBuilder); - } - } - - private void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex, - FrameTupleReference tupleRef) throws HyracksDataException { - tb.reset(); - for (int f = 0; f < projectionList.length; f++) { - int k = projectionToOutColumns[f]; - if (k >= 0) { - raggs[k].step(tupleRef, p); - tb.addField(p.getByteArray(), p.getStartOffset(), p.getLength()); - } else { - tb.addField(accessor, tIndex, projectionList[f]); - } - } - } - - @Override - public void flush() throws HyracksDataException { - appender.flush(writer); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java index d4944af..d235d6d 100644 --- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java +++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java @@ -51,7 +51,7 @@ import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.PrinterRuntimeFactory; -import 
org.apache.hyracks.algebricks.runtime.operators.std.RunningAggregateRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.aggrun.RunningAggregateRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.SplitOperatorDescriptor; import org.apache.hyracks.algebricks.runtime.operators.std.StreamLimitRuntimeFactory; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index 7d126ac..690870c 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -152,6 +152,7 @@ public class ErrorCode { public static final int ONE_TUPLE_RANGEMAP_EXPECTED = 116; public static final int NO_RANGEMAP_PRODUCED = 117; public static final int RANGEMAP_NOT_FOUND = 118; + public static final int UNSUPPORTED_WINDOW_SPEC = 119; // Compilation error codes. 
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 50e92b3..413bbee 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -135,6 +135,7 @@ 116 = One tuple rangemap is expected 117 = No range map produced for parallel sort 118 = Range map was not found for parallel sort +119 = Unsupported window specification: PARTITION BY %1$s, ORDER BY %2$s 10000 = The given rule collection %1$s is not an instance of the List class. 10001 = Cannot compose partition constraint %1$s with %2$s http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java index 915c63d..dabdd4f 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java @@ -49,6 +49,11 @@ public class RunFileWriter implements IFrameWriter { maxOutputFrameSize = 0; } + public void rewind() { + size = 0; + maxOutputFrameSize = 0; + } + @Override 
public void fail() throws HyracksDataException { ioManager.close(handle); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java index ca78046..c018e9d 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java @@ -21,6 +21,7 @@ package org.apache.hyracks.dataflow.std.group.preclustered; import java.nio.ByteBuffer; import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -138,9 +139,10 @@ public class PreclusteredGroupWriter implements IFrameWriter { } } - private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex, - FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException { - if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) { + private void switchGroupIfRequired(IFrameTupleAccessor prevTupleAccessor, int prevTupleIndex, + IFrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException { + if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex, groupFields, + comparators)) { 
writeOutput(prevTupleAccessor, prevTupleIndex); tupleBuilder.reset(); @@ -153,7 +155,7 @@ public class PreclusteredGroupWriter implements IFrameWriter { } } - private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex) + private void writeOutput(final IFrameTupleAccessor lastTupleAccessor, int lastTupleIndex) throws HyracksDataException { tupleBuilder.reset(); @@ -171,8 +173,8 @@ public class PreclusteredGroupWriter implements IFrameWriter { } - private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) - throws HyracksDataException { + public static boolean sameGroup(IFrameTupleAccessor a1, int t1Idx, IFrameTupleAccessor a2, int t2Idx, + int[] groupFields, IBinaryComparator[] comparators) throws HyracksDataException { for (int i = 0; i < comparators.length; ++i) { int fIdx = groupFields[i]; int s1 = a1.getAbsoluteFieldStartOffset(t1Idx, fIdx);
