DRILL-31 WindowFrame operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b41f51f1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b41f51f1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b41f51f1 Branch: refs/heads/master Commit: b41f51f1c6cd786f14f0d50066a67a76169b9d1e Parents: cb3132a Author: Timothy Chen <[email protected]> Authored: Sun Feb 17 23:41:57 2013 -0800 Committer: Timothy Chen <[email protected]> Committed: Mon Mar 4 02:22:26 2013 -0800 ---------------------------------------------------------------------- .../common/logical/data/SingleInputOperator.java | 2 +- .../drill/common/logical/data/WindowFrame.java | 4 +- .../org/apache/drill/exec/ref/UnbackedRecord.java | 3 +- .../apache/drill/exec/ref/rops/WindowFrameROP.java | 321 +++++++++++++++ .../apache/drill/exec/ref/rops/WindowingROP.java | 56 --- .../drill/exec/ref/rops/WindowFrameROPTest.java | 201 +++++++++ 6 files changed, 526 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b41f51f1/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java index 1310a79..0acfbef 100644 --- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java +++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java @@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; /** * SimpleOperator is an operator that has one inputs at most. */ -public abstract class SingleInputOperator extends LogicalOperatorBase{ +public abstract class SingleInputOperator extends LogicalOperatorBase { private LogicalOperator input; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b41f51f1/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java index 72a7ffd..ec14663 100644 --- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java +++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b41f51f1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java index bc6ae0e..6152a32 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java @@ -77,9 +77,8 @@ public class UnbackedRecord implements RecordPointer { @Override public RecordPointer copy() { - // TODO: Make a deep copy. UnbackedRecord r = new UnbackedRecord(); - r.root = this.root; + r.root = this.root.copy(); return r; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b41f51f1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowFrameROP.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowFrameROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowFrameROP.java new file mode 100644 index 0000000..a8417d1 --- /dev/null +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowFrameROP.java @@ -0,0 +1,321 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +package org.apache.drill.exec.ref.rops; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Queues; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.logical.data.WindowFrame; +import org.apache.drill.exec.ref.RecordIterator; +import org.apache.drill.exec.ref.RecordPointer; +import org.apache.drill.exec.ref.values.DataValue; +import org.apache.drill.exec.ref.values.ScalarValues; + +import java.util.List; +import java.util.Queue; + +import static com.google.common.base.Preconditions.checkArgument; + +public class WindowFrameROP extends SingleInputROPBase<WindowFrame> { + + private RecordIterator incoming; + private FieldReference segmentRef; + private FieldReference positionRef; + private FieldReference withinRef; + private boolean withinConstrained; + + public WindowFrameROP(WindowFrame config) { + super(config); + WindowFrame.FrameRef frameRef = config.getFrame(); + if (frameRef != null) { + positionRef = frameRef.getPosition(); + segmentRef = frameRef.getSegment(); + } + + if (positionRef == null) { + positionRef = new FieldReference("ref.position"); + } + + if (segmentRef == null) { + segmentRef = new FieldReference("ref.segment"); + } + + withinRef = config.getWithin(); + withinConstrained = withinRef != null; + } + + @Override + protected void setInput(RecordIterator incoming) { + this.incoming = incoming; + } + + @Override + protected RecordIterator getIteratorInternal() { + return new WindowIterator(config.getStart(), config.getEnd()); + } + + private class Window { + List<WindowObjectHolder> holders; + int windowId; + int nextRecordIndex; + int lastIndex; + int nextPosition; + + private Window(long start, long end, Window lastWindow) { + this.holders = Lists.newArrayList(); + if (lastWindow != null) { + this.windowId = lastWindow.getWindowId() + 1; + lastIndex = (int) (windowId + end); + final int lastMinIndex = (int) Math.max(windowId + start, 0); + Iterable<WindowObjectHolder> lastHolders = Iterables.filter(lastWindow.getHolders(), new Predicate<WindowObjectHolder>() { + @Override + public boolean apply(WindowObjectHolder windowObjectHolder) { + return windowObjectHolder.getIndex() >= lastMinIndex; + } + }); + for (WindowObjectHolder holder : lastHolders) { + holder.setPosition(nextPosition()); + addRecord(holder); + } + } else { + this.windowId = 0; + lastIndex = (int) (windowId + end); + } + + } + + private int getWindowId() { + return windowId; + } + + private boolean isFull() { + if (holders.isEmpty()) { + return false; + } + + WindowObjectHolder lastHolder = holders.get(holders.size() - 1); + return lastHolder.getIndex() >= lastIndex; + } + + private void addRecord(RecordPointer pointer, int index, boolean schemaChanged) { + addRecord(new WindowObjectHolder(pointer, nextPosition(), index, schemaChanged)); + } + + private void addRecord(WindowObjectHolder holder) { + if (!isFull()) { + holders.add(holder); + } else { + throw new DrillRuntimeException("Adding more records into windows then configured."); + } + } + + private int nextPosition() { + int position = nextPosition; + if (nextPosition == 0) { + nextPosition = 1; + } else if (nextPosition > 0) { + nextPosition = -nextPosition; + } else { + nextPosition = -nextPosition + 1; + } + return position; + } + + public List<WindowObjectHolder> getHolders() { + return holders; + } + + public WindowObjectHolder nextRecord() { + if (nextRecordIndex >= holders.size()) { + return null; + } else { + return holders.get(nextRecordIndex++); + } + } + + public boolean isCrossedWithinBoundary(RecordPointer nextRecord) { + if (withinConstrained && nextRecord != null && !holders.isEmpty()) { + DataValue lastWithinVal = holders.get(holders.size() - 1).getPointer().getField(withinRef); + DataValue nextWithinVal = nextRecord.getField(withinRef); + boolean lastIsNull = lastWithinVal == null; + boolean nextIsNull = nextWithinVal == null; + return lastIsNull != nextIsNull || (!nextIsNull && !nextWithinVal.equals(lastWithinVal)); + } + + return false; + } + + public void removeHoldersBeforeIndex(final int index) { + Iterables.removeIf(holders, new Predicate<WindowObjectHolder>() { + @Override + public boolean apply(WindowObjectHolder windowObjectHolder) { + return windowObjectHolder.getIndex() <= index; + } + }); + } + } + + private class WindowObjectHolder { + private final RecordPointer pointer; + private int position; + private final int index; + private final boolean schemaChanged; + private int windowId; + + private WindowObjectHolder(RecordPointer pointer, int position, int index, boolean schemaChanged) { + this.pointer = pointer; + this.position = position; + this.index = index; + this.schemaChanged = schemaChanged; + } + + public WindowObjectHolder setWindowId(int windowId) { + this.windowId = windowId; + return this; + } + + public RecordPointer getPointer() { + return pointer; + } + + public int getPosition() { + return position; + } + + public int getIndex() { + return index; + } + + public boolean isSchemaChanged() { + return schemaChanged; + } + + public int getWindowId() { + return windowId; + } + + public void setPosition(int position) { + this.position = position; + } + } + + private class WindowManager { + Queue<WindowObjectHolder> holderBuffer; + Window curWindow; + long start; + long end; + + WindowManager(long start, long end) { + holderBuffer = Queues.newLinkedBlockingDeque(); + this.start = start; + this.end = end; + } + + private WindowObjectHolder nextRecord() { + if (curWindow != null) { + WindowObjectHolder holder = curWindow.nextRecord(); + if (holder != null) { + return holder.setWindowId(curWindow.getWindowId()); + } else if (!holderBuffer.isEmpty()) { + WindowObjectHolder obj = holderBuffer.poll(); + addRecord(obj.getPointer(), obj.getIndex(), obj.isSchemaChanged()); + return nextRecord(); + } + } + + return null; + } + + public void addRecord(RecordPointer recordPointer, int index, boolean schemaChanged) { + if (curWindow == null || curWindow.isFull()) { + curWindow = new Window(start, end, curWindow); + } else if (curWindow.isCrossedWithinBoundary(recordPointer)) { + if (index - 1 == curWindow.getWindowId()) { + curWindow.removeHoldersBeforeIndex(curWindow.getWindowId()); + } + curWindow = new Window(start, end, curWindow); + if (curWindow.isCrossedWithinBoundary(recordPointer)) { + holderBuffer.add(new WindowObjectHolder(recordPointer, 0, index, schemaChanged)); + return; + } + } + + curWindow.addRecord(recordPointer, index, schemaChanged); + } + + public boolean hasMoreWindows(int curIndex) { + return curWindow != null && curWindow.getWindowId() < curIndex; + } + + public WindowObjectHolder nextWindowOnEmpty() { + curWindow = new Window(start, end, curWindow); + return nextRecord(); + } + } + + + private class WindowIterator implements RecordIterator { + private int curIndex = -1; + private WindowManager windowManager; + private ProxySimpleRecord windowPointer; + + public WindowIterator(long start, long end) { + checkArgument(end - start >= 0, "Invalid end and start. end: %s, start: %s", end, start); + windowManager = new WindowManager(start, end); + windowPointer = new ProxySimpleRecord(); + } + + @Override + public RecordPointer getRecordPointer() { + return windowPointer; + } + + @Override + public NextOutcome next() { + WindowObjectHolder holder = windowManager.nextRecord(); + if (holder == null) { + NextOutcome outcome = incoming.next(); + if (outcome != NextOutcome.NONE_LEFT) { + windowManager.addRecord(incoming.getRecordPointer().copy(), ++curIndex, outcome == NextOutcome.INCREMENTED_SCHEMA_CHANGED); + holder = windowManager.nextRecord(); + } else if (windowManager.hasMoreWindows(curIndex)) { + holder = windowManager.nextWindowOnEmpty(); + } + + if (holder == null) { + return NextOutcome.NONE_LEFT; + } + } + + RecordPointer newPointer = holder.getPointer().copy(); + newPointer.addField(positionRef, new ScalarValues.IntegerScalar(holder.getPosition())); + newPointer.addField(segmentRef, new ScalarValues.IntegerScalar(holder.getWindowId())); + windowPointer.setRecord(newPointer); + return holder.isSchemaChanged() ? NextOutcome.INCREMENTED_SCHEMA_CHANGED : NextOutcome.INCREMENTED_SCHEMA_UNCHANGED; + } + + @Override + public ROP getParent() { + return WindowFrameROP.this; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b41f51f1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java deleted file mode 100644 index bbfbc1c..0000000 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java +++ /dev/null @@ -1,56 +0,0 @@ -package org.apache.drill.exec.ref.rops; - -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; - -import org.apache.commons.lang.NotImplementedException; -import org.apache.drill.common.logical.data.SingleInputOperator; -import org.apache.drill.exec.ref.RecordIterator; -import org.apache.drill.exec.ref.RecordPointer; - -/** - * For simplification purposes, the Windowing reference implementation takes the lazy approach of finishing a window - * before it outputs any values from that window. While this is necessary in the ALL:ALL scenario, other scenarios could - * be implemented more efficiently with an appropriately size open window. - */ -public class WindowingROP extends SingleInputROPBase { - - private List<RecordPointer> records = new LinkedList<RecordPointer>(); - private WindowManager[] windows; - private Window[] windowPerKey; - - // the place where we should start the next batch. - private int internalWindowPosition; - - public WindowingROP(SingleInputOperator config) { - super(config); - throw new NotImplementedException(); - } - - @Override - protected void setInput(RecordIterator incoming) { - } - - @Override - protected RecordIterator getIteratorInternal() { - return null; - } - - private class Window { - - int ending; - List<RecordPointer> records = new ArrayList<RecordPointer>(); - - public void reset(int curPos) { - - } - } - - private class WindowManager { - private void increment() { - - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b41f51f1/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/WindowFrameROPTest.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/WindowFrameROPTest.java b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/WindowFrameROPTest.java new file mode 100644 index 0000000..d041174 --- /dev/null +++ b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/WindowFrameROPTest.java @@ -0,0 +1,201 @@ +package org.apache.drill.exec.ref.rops; + +import com.google.common.collect.Lists; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.data.WindowFrame; +import org.apache.drill.exec.ref.RecordIterator; +import org.apache.drill.exec.ref.RecordPointer; +import org.apache.drill.exec.ref.TestUtils; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import static junit.framework.Assert.assertEquals; + +public class WindowFrameROPTest { + final String input = "" + + "{id: 0}" + + "{id: 1}" + + "{id: 2}" + + "{id: 3}" + + "{id: 4}"; + + @Test + public void windowShouldWorkWithBefore() throws IOException { + WindowFrameROP rop = new WindowFrameROP(new WindowFrame(null, null, -2L, 0L)); + RecordIterator incoming = TestUtils.jsonToRecordIterator("test", input); + rop.setInput(incoming); + RecordIterator out = rop.getOutput(); + + List<WindowObj> windows = Lists.newArrayList( + new WindowObj(0, 0, 0), + new WindowObj(1, 0, 0), + new WindowObj(1, 1, 1), + new WindowObj(2, 0, 0), + new WindowObj(2, 1, 1), + new WindowObj(2, 2, -1), + new WindowObj(3, 1, 0), + new WindowObj(3, 2, 1), + new WindowObj(3, 3, -1), + new WindowObj(4, 2, 0), + new WindowObj(4, 3, 1), + new WindowObj(4, 4, -1) + ); + + verifyWindowOrder(windows, out); + } + + @Test + public void windowShouldWorkWithAfter() throws IOException { + WindowFrameROP rop = new WindowFrameROP(new WindowFrame(null, null, 0L, 2L)); + RecordIterator incoming = TestUtils.jsonToRecordIterator("test", input); + rop.setInput(incoming); + RecordIterator out = rop.getOutput(); + + List<WindowObj> windows = Lists.newArrayList( + new WindowObj(0, 0, 0), + new WindowObj(0, 1, 1), + new WindowObj(0, 2, -1), + new WindowObj(1, 1, 0), + new WindowObj(1, 2, 1), + new WindowObj(1, 3, -1), + new WindowObj(2, 2, 0), + new WindowObj(2, 3, 1), + new WindowObj(2, 4, -1), + new WindowObj(3, 3, 0), + new WindowObj(3, 4, 1), + new WindowObj(4, 4, 0) + ); + + verifyWindowOrder(windows, out); + } + + @Test + public void windowShouldWorkWithBeforeAndAfter() throws IOException { + WindowFrameROP rop = new WindowFrameROP(new WindowFrame(null, null, -2L, 2L)); + RecordIterator incoming = TestUtils.jsonToRecordIterator("test", input); + rop.setInput(incoming); + RecordIterator out = rop.getOutput(); + + List<WindowObj> windows = Lists.newArrayList( + new WindowObj(0, 0, 0), + new WindowObj(0, 1, 1), + new WindowObj(0, 2, -1), + new WindowObj(1, 0, 0), + new WindowObj(1, 1, 1), + new WindowObj(1, 2, -1), + new WindowObj(1, 3, 2), + new WindowObj(2, 0, 0), + new WindowObj(2, 1, 1), + new WindowObj(2, 2, -1), + new WindowObj(2, 3, 2), + new WindowObj(2, 4, -2), + new WindowObj(3, 1, 0), + new WindowObj(3, 2, 1), + new WindowObj(3, 3, -1), + new WindowObj(3, 4, 2), + new WindowObj(4, 2, 0), + new WindowObj(4, 3, 1), + new WindowObj(4, 4, -1) + ); + + verifyWindowOrder(windows, out); + } + + @Test + public void windowShouldNotCrossWithin() throws IOException { + String withinInput = "" + + "{id: 0, v: 0}" + + "{id: 1, v: 1}" + + "{id: 2, v: 2}" + + "{id: 3, v: 3}" + + "{id: 4, v: 4}"; + WindowFrameROP rop = new WindowFrameROP(new WindowFrame(new FieldReference("test.v"), null, -2L, 2L)); + RecordIterator incoming = TestUtils.jsonToRecordIterator("test", withinInput); + rop.setInput(incoming); + RecordIterator out = rop.getOutput(); + + List<WindowObj> windows = Lists.newArrayList( + new WindowObj(0, 0, 0), + new WindowObj(1, 1, 0), + new WindowObj(2, 2, 0), + new WindowObj(3, 3, 0), + new WindowObj(4, 4, 0) + ); + + verifyWindowOrder(windows, out); + } + + @Test + public void windowShouldNotCrossWithinAndRange() throws IOException { + String withinInput = "" + + "{id: 0, v: 0}" + + "{id: 1, v: 0}" + + "{id: 2, v: 1}" + + "{id: 3, v: 1}" + + "{id: 4, v: 2}"; + WindowFrameROP rop = new WindowFrameROP(new WindowFrame(new FieldReference("test.v"), null, -1L, 2L)); + RecordIterator incoming = TestUtils.jsonToRecordIterator("test", withinInput); + rop.setInput(incoming); + RecordIterator out = rop.getOutput(); + + List<WindowObj> windows = Lists.newArrayList( + new WindowObj(0, 0, 0), + new WindowObj(0, 1, 1), + new WindowObj(1, 0, 0), + new WindowObj(1, 1, 1), + new WindowObj(2, 2, 0), + new WindowObj(2, 3, 1), + new WindowObj(3, 2, 0), + new WindowObj(3, 3, 1), + new WindowObj(4, 4, 0) + ); + + verifyWindowOrder(windows, out); + } + + private void verifyWindowOrder(List<WindowObj> expectedIds, RecordIterator out) { + verifyWindowOrder(expectedIds, out, new SchemaPath("ref.segment"), new SchemaPath("ref.position")); + } + + private void verifyWindowOrder(List<WindowObj> expectedIds, RecordIterator out, SchemaPath segment, SchemaPath position) { + RecordIterator.NextOutcome outcome = out.next(); + RecordPointer pointer = out.getRecordPointer(); + int count = 0; + SchemaPath id = new SchemaPath("test.id"); + int expectedSize = expectedIds.size(); + while (outcome != RecordIterator.NextOutcome.NONE_LEFT) { + count += 1; + WindowObj windowObj = expectedIds.get(count - 1); + //System.out.println(windowObj); + assertEquals("Id mismatch", windowObj.id, pointer.getField(id).getAsNumeric().getAsInt()); + assertEquals("Window id mismatch", windowObj.windowId, pointer.getField(segment).getAsNumeric().getAsInt()); + assertEquals("Position mismatch", windowObj.position, pointer.getField(position).getAsNumeric().getAsInt()); + outcome = out.next(); + } + assertEquals(expectedSize, count); + } + + private class WindowObj { + int id; + int position; + int windowId; + + private WindowObj(int windowId, int id, int position) { + this.id = id; + this.position = position; + this.windowId = windowId; + } + + @Override + public String toString() { + return "WindowObj{" + + "id=" + id + + ", position=" + position + + ", windowId=" + windowId + + '}'; + } + } +}
