Repository: drill Updated Branches: refs/heads/master 1d890ff94 -> a45f7fd11
DRILL-4260: Adding support for some custom window frames. This includes the following JIRAs: DRILL-4261: Add support for RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING; DRILL-4262: add support for ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW; DRILL-4263: add support for RANGE BETWEEN CURRENT ROW AND CURRENT ROW. This closes #340. Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a45f7fd1 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a45f7fd1 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a45f7fd1 Branch: refs/heads/master Commit: a45f7fd1126cd5110ac959961256ce75923fa2fd Parents: 1d890ff Author: adeneche <[email protected]> Authored: Tue Jan 19 13:33:22 2016 -0800 Committer: adeneche <[email protected]> Committed: Mon Feb 22 11:29:38 2016 -0800 ---------------------------------------------------------------------- .../apache/drill/exec/opt/BasicOptimizer.java | 2 +- .../drill/exec/physical/config/WindowPOP.java | 51 +++++++-- .../impl/window/FrameSupportTemplate.java | 113 +++++++++++++------ .../impl/window/NoFrameSupportTemplate.java | 3 +- .../exec/physical/impl/window/Partition.java | 3 + .../impl/window/WindowFrameRecordBatch.java | 8 +- .../exec/physical/impl/window/WindowFramer.java | 3 +- .../physical/impl/window/WindowFunction.java | 43 +++---- .../drill/exec/planner/physical/WindowPrel.java | 5 +- .../sql/parser/UnsupportedOperatorsVisitor.java | 19 +++- .../test/java/org/apache/drill/TestBuilder.java | 4 + .../apache/drill/exec/TestWindowFunctions.java | 29 ----- .../physical/impl/window/TestWindowFrame.java | 58 +++++++++- .../window/aggregate_range_current_current.sql | 7 ++ ...aggregate_range_current_current_baseline.sql | 7 ++ .../window/aggregate_rows_unbounded_current.sql | 4 + ...ggregate_rows_unbounded_current_baseline.sql | 3 + .../first_value_range_current_current.sql | 7 ++ ...rst_value_range_current_current_baseline.sql | 7 ++ 
.../last_value_rows_unbounded_current.sql | 5 + ...st_value_rows_unbounded_current_baseline.sql | 5 + exec/java-exec/src/test/resources/window/q3.sql | 9 ++ exec/java-exec/src/test/resources/window/q4.sql | 9 ++ 23 files changed, 299 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java index 6e70506..3f064d4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java @@ -158,7 +158,7 @@ public class BasicOptimizer extends Optimizer { input = new Sort(input, ods, false); return new WindowPOP(input, window.getWithins(), window.getAggregations(), - window.getOrderings(), window.getStart(), window.getEnd()); + window.getOrderings(), false, null, null); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java index 5926a06..ec5f361 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java @@ -18,8 +18,10 @@ package org.apache.drill.exec.physical.config; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import 
com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.calcite.rex.RexWindowBound; import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.common.logical.data.Order; import org.apache.drill.exec.physical.base.AbstractSingle; @@ -33,26 +35,29 @@ public class WindowPOP extends AbstractSingle { private final NamedExpression[] withins; private final NamedExpression[] aggregations; private final Order.Ordering[] orderings; - private final long start; - private final long end; + private final boolean frameUnitsRows; + private final Bound start; + private final Bound end; public WindowPOP(@JsonProperty("child") PhysicalOperator child, @JsonProperty("within") NamedExpression[] withins, @JsonProperty("aggregations") NamedExpression[] aggregations, @JsonProperty("orderings") Order.Ordering[] orderings, - @JsonProperty("start") long start, - @JsonProperty("end") long end) { + @JsonProperty("frameUnitsRows") boolean frameUnitsRows, + @JsonProperty("start") Bound start, + @JsonProperty("end") Bound end) { super(child); this.withins = withins; this.aggregations = aggregations; this.orderings = orderings; + this.frameUnitsRows = frameUnitsRows; this.start = start; this.end = end; } @Override protected PhysicalOperator getNewWithChild(PhysicalOperator child) { - return new WindowPOP(child, withins, aggregations, orderings, start, end); + return new WindowPOP(child, withins, aggregations, orderings, frameUnitsRows, start, end); } @Override @@ -65,11 +70,11 @@ public class WindowPOP extends AbstractSingle { return UserBitShared.CoreOperatorType.WINDOW_VALUE; } - public long getStart() { + public Bound getStart() { return start; } - public long getEnd() { + public Bound getEnd() { return end; } @@ -84,4 +89,36 @@ public class WindowPOP extends AbstractSingle { public Order.Ordering[] getOrderings() { return orderings; } + + public boolean isFrameUnitsRows() { + return frameUnitsRows; + } + + @JsonTypeName("windowBound") + public static class 
Bound { + private final boolean unbounded; + private final long offset; + + public Bound(@JsonProperty("unbounded") boolean unbounded, @JsonProperty("offset") long offset) { + this.unbounded = unbounded; + this.offset = offset; + } + + public boolean isUnbounded() { + return unbounded; + } + + @JsonIgnore + public boolean isCurrent() { + return offset == 0; + } + + public long getOffset() { + return offset; + } + } + + public static Bound newBound(RexWindowBound windowBound) { + return new Bound(windowBound.isUnbounded(), windowBound.isCurrentRow() ? 0 : Long.MIN_VALUE); //TODO: Get offset to work + } } http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java index 16c7513..1e9d12e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.window; import org.apache.drill.common.exceptions.DrillException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.config.WindowPOP; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; @@ -50,11 +51,15 @@ public abstract class FrameSupportTemplate implements WindowFramer { // true when at least one window function needs to process all batches of a partition before passing any batch downstream private boolean requireFullPartition; - 
private Partition partition; + private long remainingRows; // num unprocessed rows in current partition + private long remainingPeers; // num unprocessed peer rows in current frame + private boolean partialPartition; // true if we remainingRows only account for the current batch and more batches are expected for the current partition + + private WindowPOP popConfig; @Override public void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext oContext, - final boolean requireFullPartition) throws SchemaChangeException { + final boolean requireFullPartition, final WindowPOP popConfig) throws SchemaChangeException { this.container = container; this.batches = batches; @@ -62,9 +67,9 @@ public abstract class FrameSupportTemplate implements WindowFramer { allocateInternal(); outputCount = 0; - partition = null; this.requireFullPartition = requireFullPartition; + this.popConfig = popConfig; } private void allocateInternal() { @@ -74,6 +79,10 @@ public abstract class FrameSupportTemplate implements WindowFramer { } } + private boolean isPartitionDone() { + return !partialPartition && remainingRows == 0; + } + /** * processes all rows of the first batch. 
*/ @@ -88,37 +97,36 @@ public abstract class FrameSupportTemplate implements WindowFramer { outputCount = current.getRecordCount(); while (currentRow < outputCount) { - if (partition != null) { - assert currentRow == 0 : "pending windows are only expected at the start of the batch"; - - // we have a pending window we need to handle from a previous call to doWork() - logger.trace("we have a pending partition {}", partition); + if (!isPartitionDone()) { + // we have a pending partition we need to handle from a previous call to doWork() + assert currentRow == 0 : "pending partitions are only expected at the start of the batch"; + logger.trace("we have a pending partition {}", remainingRows); if (!requireFullPartition) { // we didn't compute the whole partition length in the previous partition, we need to update the length now - updatePartitionSize(partition, currentRow); + updatePartitionSize(currentRow); } } else { newPartition(current, currentRow); } currentRow = processPartition(currentRow); - if (partition.isDone()) { + if (isPartitionDone()) { cleanPartition(); } } } private void newPartition(final WindowDataBatch current, final int currentRow) throws SchemaChangeException { - partition = new Partition(); - updatePartitionSize(partition, currentRow); + remainingRows = 0; + remainingPeers = 0; + updatePartitionSize(currentRow); setupPartition(current, container); saveFirstValue(currentRow); } private void cleanPartition() { - partition = null; resetValues(); for (VectorWrapper<?> vw : internal) { if ((vw.getValueVector() instanceof BaseDataValueVector)) { @@ -134,44 +142,67 @@ public abstract class FrameSupportTemplate implements WindowFramer { * @throws DrillException if it can't write into the container */ private int processPartition(final int currentRow) throws DrillException { - logger.trace("process partition {}, currentRow: {}, outputCount: {}", partition, currentRow, outputCount); + logger.trace("{} rows remaining to process, currentRow: {}, outputCount: 
{}", remainingRows, currentRow, outputCount); setupWriteFirstValue(internal, container); - int row = currentRow; + if (popConfig.isFrameUnitsRows()) { + return processROWS(currentRow); + } else { + return processRANGE(currentRow); + } + } + + private int processROWS(int row) throws DrillException { + //TODO (DRILL-4413) we only need to call these once per batch + setupEvaluatePeer(current, container); + setupReadLastValue(current, container); - // process all rows except the last one of the batch/partition - while (row < outputCount && !partition.isDone()) { - processRow(row); + while (row < outputCount && !isPartitionDone()) { + logger.trace("aggregating row {}", row); + evaluatePeer(row); + outputRow(row); + writeLastValue(row, row); + + remainingRows--; row++; } return row; } - private void processRow(final int row) throws DrillException { - if (partition.isFrameDone()) { - // because all peer rows share the same frame, we only need to compute and aggregate the frame once - final long peers = aggregatePeers(row); - partition.newFrame(peers); - } + private int processRANGE(int row) throws DrillException { + while (row < outputCount && !isPartitionDone()) { + if (remainingPeers == 0) { + // because all peer rows share the same frame, we only need to compute and aggregate the frame once + if (popConfig.getStart().isCurrent()) { + resetValues(); + saveFirstValue(row); + } + + remainingPeers = aggregatePeers(row); + } + + outputRow(row); + writeLastValue(frameLastRow, row); - outputRow(row, partition); - writeLastValue(frameLastRow, row); + remainingRows--; + remainingPeers--; + row++; + } - partition.rowAggregated(); + return row; } /** * updates partition's length after computing the number of rows for the current the partition starting at the specified * row of the first batch. 
If !requiresFullPartition, this method will only count the rows in the current batch */ - private void updatePartitionSize(final Partition partition, final int start) { + private void updatePartitionSize(final int start) { logger.trace("compute partition size starting from {} on {} batches", start, batches.size()); long length = 0; - boolean lastBatch = false; int row = start; // count all rows that are in the same partition of start @@ -198,12 +229,16 @@ public abstract class FrameSupportTemplate implements WindowFramer { if (!requireFullPartition) { // this is the last batch of current partition if - lastBatch = row < outputCount // partition ends before the end of the batch - || batches.size() == 1 // it's the last available batch + boolean lastBatch = row < outputCount // partition ends before the end of the batch + || batches.size() == 1 // it's the last available batch || !isSamePartition(start, current, 0, batches.get(1)); // next batch contains a different partition + + partialPartition = !lastBatch; + } else { + partialPartition = false; } - partition.updateLength(length, !(requireFullPartition || lastBatch)); + remainingRows += length; } /** @@ -215,6 +250,7 @@ public abstract class FrameSupportTemplate implements WindowFramer { private long aggregatePeers(final int start) throws SchemaChangeException { logger.trace("aggregating rows starting from {}", start); + final boolean unboundedFollowing = popConfig.getEnd().isUnbounded(); VectorAccessible last = current; long length = 0; @@ -226,8 +262,14 @@ public abstract class FrameSupportTemplate implements WindowFramer { // for every remaining row in the partition, count it if it's a peer row for (int row = (batch == current) ? 
start : 0; row < recordCount; row++, length++) { - if (!isPeer(start, current, row, batch)) { - break; + if (unboundedFollowing) { + if (length >= remainingRows) { + break; + } + } else { + if (!isPeer(start, current, row, batch)) { + break; + } } evaluatePeer(row); @@ -271,9 +313,8 @@ public abstract class FrameSupportTemplate implements WindowFramer { * called once for each row after we evaluate all peer rows. Used to write a value in the row * * @param outIndex index of row - * @param partition object used by "computed" window functions */ - public abstract void outputRow(@Named("outIndex") int outIndex, @Named("partition") Partition partition); + public abstract void outputRow(@Named("outIndex") int outIndex); /** * Called once per partition, before processing the partition. Used to setup read/write vectors http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java index ac1eefc..21dfbba 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.window; import org.apache.drill.common.exceptions.DrillException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.config.WindowPOP; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; @@ -53,7 +54,7 @@ public abstract 
class NoFrameSupportTemplate implements WindowFramer { @Override public void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext oContext, - final boolean requireFullPartition) throws SchemaChangeException { + final boolean requireFullPartition, final WindowPOP popConfig) throws SchemaChangeException { this.container = container; this.batches = batches; http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java index 66cf720..92bff6e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java @@ -45,6 +45,9 @@ public class Partition { return remaining; } + public long getLength() { + return length; + } /** * @param length number of rows in this partition * @param partial if true, then length is not the full length of the partition but just the number of rows in the http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java index d6be1eb..46a6c0e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java @@ -212,7 
+212,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { final boolean frameEndReached = partitionEndReached || !framers[0].isPeer(currentSize - 1, current, lastSize - 1, last); for (final WindowFunction function : functions) { - if (!function.canDoWork(batches.size(), hasOrderBy, frameEndReached, partitionEndReached)) { + if (!function.canDoWork(batches.size(), popConfig, frameEndReached, partitionEndReached)) { return false; } } @@ -279,7 +279,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { final WindowFunction winfun = WindowFunction.fromExpression(call); if (winfun.materialize(ne, container, context.getFunctionRegistry())) { functions.add(winfun); - requireFullPartition |= winfun.requiresFullPartition(hasOrderBy); + requireFullPartition |= winfun.requiresFullPartition(popConfig); if (winfun.supportsCustomFrames()) { useCustomFrame = true; @@ -311,13 +311,13 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { int index = 0; if (useDefaultFrame) { framers[index] = generateFramer(keyExprs, orderExprs, functions, false); - framers[index].setup(batches, container, oContext, requireFullPartition); + framers[index].setup(batches, container, oContext, requireFullPartition, popConfig); index++; } if (useCustomFrame) { framers[index] = generateFramer(keyExprs, orderExprs, functions, true); - framers[index].setup(batches, container, oContext, requireFullPartition); + framers[index].setup(batches, container, oContext, requireFullPartition, popConfig); } } http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java index 
9b985c0..3d2d0fc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java @@ -21,6 +21,7 @@ import org.apache.drill.common.exceptions.DrillException; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.config.WindowPOP; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; @@ -32,7 +33,7 @@ public interface WindowFramer { TemplateClassDefinition<WindowFramer> FRAME_TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, FrameSupportTemplate.class); void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext operatorContext, - final boolean requireFullPartition) throws SchemaChangeException; + final boolean requireFullPartition, final WindowPOP popConfig) throws SchemaChangeException; /** * process the inner batch and write the aggregated values in the container http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java index a983df5..191dad1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFunction.java @@ -36,6 +36,7 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.ValueVectorReadExpression; import 
org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.expr.fn.FunctionLookupContext; +import org.apache.drill.exec.physical.config.WindowPOP; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; @@ -95,23 +96,23 @@ public abstract class WindowFunction { abstract boolean supportsCustomFrames(); /** - * @param hasOrderBy window definition contains an ORDER BY clause + * @param pop window group definition * @return true if this window function requires all batches of current partition to be available before processing * the first batch */ - public boolean requiresFullPartition(final boolean hasOrderBy) { + public boolean requiresFullPartition(final WindowPOP pop) { return true; } /** * @param numBatchesAvailable number of batches available for current partition - * @param hasOrderBy window definition contains an ORDER BY clause + * @param pop window group definition * @param frameEndReached we found the last row of the first batch's frame * @param partitionEndReached all batches of current partition are available * * @return true if this window function can process the first batch immediately */ - public boolean canDoWork(final int numBatchesAvailable, final boolean hasOrderBy, final boolean frameEndReached, + public boolean canDoWork(final int numBatchesAvailable, final WindowPOP pop, final boolean frameEndReached, final boolean partitionEndReached) { return partitionEndReached; } @@ -155,13 +156,13 @@ public abstract class WindowFunction { } @Override - public boolean requiresFullPartition(final boolean hasOrderBy) { - return !hasOrderBy; + public boolean requiresFullPartition(final WindowPOP pop) { + return pop.getOrderings().length == 0 || pop.getEnd().isUnbounded(); } @Override - public boolean canDoWork(int numBatchesAvailable, boolean hasOrderBy, boolean frameEndReached, boolean partitionEndReached) { - return 
partitionEndReached || (hasOrderBy && frameEndReached); + public boolean canDoWork(int numBatchesAvailable, WindowPOP pop, boolean frameEndReached, boolean partitionEndReached) { + return partitionEndReached || (!requiresFullPartition(pop) && frameEndReached); } @Override @@ -212,18 +213,18 @@ public abstract class WindowFunction { } @Override - public boolean requiresFullPartition(final boolean hasOrderBy) { + public boolean requiresFullPartition(final WindowPOP pop) { // CUME_DIST, PERCENT_RANK and NTILE require the length of current partition before processing it's first batch return type == Type.CUME_DIST || type == Type.PERCENT_RANK || type == Type.NTILE; } @Override - public boolean canDoWork(int numBatchesAvailable, final boolean hasOrderBy, boolean frameEndReached, boolean partitionEndReached) { + public boolean canDoWork(int numBatchesAvailable, final WindowPOP pop, boolean frameEndReached, boolean partitionEndReached) { assert numBatchesAvailable > 0 : "canDoWork() should not be called when numBatchesAvailable == 0"; // for CUME_DIST, PERCENT_RANK and NTILE we need the full partition // otherwise we can process the first batch immediately - return partitionEndReached || ! requiresFullPartition(hasOrderBy); + return partitionEndReached || ! 
requiresFullPartition(pop); } @Override @@ -319,12 +320,12 @@ public abstract class WindowFunction { } @Override - public boolean requiresFullPartition(final boolean hasOrderBy) { + public boolean requiresFullPartition(final WindowPOP pop) { return false; } @Override - public boolean canDoWork(int numBatchesAvailable, final boolean hasOrderBy, boolean frameEndReached, boolean partitionEndReached) { + public boolean canDoWork(int numBatchesAvailable, final WindowPOP pop, boolean frameEndReached, boolean partitionEndReached) { return partitionEndReached || numBatchesAvailable > 1; } @@ -389,12 +390,12 @@ public abstract class WindowFunction { } @Override - public boolean requiresFullPartition(final boolean hasOrderBy) { + public boolean requiresFullPartition(final WindowPOP pop) { return false; } @Override - public boolean canDoWork(int numBatchesAvailable, final boolean hasOrderBy, boolean frameEndReached, boolean partitionEndReached) { + public boolean canDoWork(int numBatchesAvailable, final WindowPOP pop, boolean frameEndReached, boolean partitionEndReached) { assert numBatchesAvailable > 0 : "canDoWork() should not be called when numBatchesAvailable == 0"; return true; } @@ -449,13 +450,13 @@ public abstract class WindowFunction { } @Override - public boolean requiresFullPartition(final boolean hasOrderBy) { - return !hasOrderBy; + public boolean requiresFullPartition(final WindowPOP pop) { + return pop.getOrderings().length == 0 || pop.getEnd().isUnbounded(); } @Override - public boolean canDoWork(int numBatchesAvailable, boolean hasOrderBy, boolean frameEndReached, boolean partitionEndReached) { - return partitionEndReached || (hasOrderBy && frameEndReached); + public boolean canDoWork(int numBatchesAvailable, WindowPOP pop, boolean frameEndReached, boolean partitionEndReached) { + return partitionEndReached || (!requiresFullPartition(pop) && frameEndReached); } @Override @@ -533,12 +534,12 @@ public abstract class WindowFunction { } @Override - public boolean 
requiresFullPartition(boolean hasOrderBy) { + public boolean requiresFullPartition(final WindowPOP pop) { return false; } @Override - public boolean canDoWork(int numBatchesAvailable, boolean hasOrderBy, boolean frameEndReached, boolean partitionEndReached) { + public boolean canDoWork(int numBatchesAvailable, WindowPOP pop, boolean frameEndReached, boolean partitionEndReached) { assert numBatchesAvailable > 0 : "canDoWork() should not be called when numBatchesAvailable == 0"; return true; } http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrel.java index c27b547..1a89bd7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrel.java @@ -105,8 +105,9 @@ public class WindowPrel extends DrillWindowRelBase implements Prel { withins.toArray(new NamedExpression[withins.size()]), aggs.toArray(new NamedExpression[aggs.size()]), orderings.toArray(new Order.Ordering[orderings.size()]), - Long.MIN_VALUE, //TODO: Get first/last to work - Long.MIN_VALUE); + window.isRows, + WindowPOP.newBound(window.lowerBound), + WindowPOP.newBound(window.upperBound)); creator.addMetadata(this, windowPOP); return windowPOP; http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java index effe022..17bc339 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java @@ -178,16 +178,33 @@ public class UnsupportedOperatorsVisitor extends SqlShuttle { // it is a default frame boolean isSupported = (lowerBound == null && upperBound == null); - // When OVER clause contain an ORDER BY clause the following frames are equivalent to the default frame: + // When OVER clause contain an ORDER BY clause the following frames are supported: // RANGE UNBOUNDED PRECEDING // RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + // RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING if(window.getOrderList().size() != 0 && !window.isRows() && SqlWindow.isUnboundedPreceding(lowerBound) + && (upperBound == null || SqlWindow.isCurrentRow(upperBound) || SqlWindow.isUnboundedFollowing(upperBound))) { + isSupported = true; + } + + // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + // is supported with and without the ORDER BY clause + if (window.isRows() + && SqlWindow.isUnboundedPreceding(lowerBound) && (upperBound == null || SqlWindow.isCurrentRow(upperBound))) { isSupported = true; } + // RANGE BETWEEN CURRENT ROW AND CURRENT ROW + // is supported with and without an ORDER BY clause + if (!window.isRows() && + SqlWindow.isCurrentRow(lowerBound) && + SqlWindow.isCurrentRow(upperBound)) { + isSupported = true; + } + // When OVER clause doesn't contain an ORDER BY clause, the following are equivalent to the default frame: // RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING // ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java index f4a5825..0cebd03 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java @@ -367,6 +367,10 @@ public class TestBuilder { baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries, highPerformanceComparison, expectedNumBatches); } + public BaselineQueryTestBuilder sqlBaselineQuery(String query, String ...replacements) { + return sqlBaselineQuery(String.format(query, replacements)); + } + // provide a path to a file containing a SQL query to use as a baseline public BaselineQueryTestBuilder sqlBaselineQueryFromFile(String baselineQueryFilename) throws IOException { String baselineQuery = BaseTestQuery.getFile(baselineQueryFilename); http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java index d6cd3c7..8055774 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWindowFunctions.java @@ -154,35 +154,6 @@ public class TestWindowFunctions extends BaseTestQuery { } } - @Test(expected = UnsupportedFunctionException.class) // DRILL-3188 - public void testRowsUnboundedPreceding() throws Exception { - try { - final String query = "explain plan for select sum(n_nationKey) over(partition by n_nationKey order by n_nationKey \n" + - "rows UNBOUNDED PRECEDING)" + - "from cp.`tpch/nation.parquet` t \n" + - "order by n_nationKey"; - - test(query); - } 
catch(UserException ex) { - throwAsUnsupportedException(ex); - throw ex; - } - } - - @Test(expected = UnsupportedFunctionException.class) // DRILL-3359 - public void testFramesDefinedInWindowClause() throws Exception { - try { - final String query = "explain plan for select sum(n_nationKey) over w \n" + - "from cp.`tpch/nation.parquet` \n" + - "window w as (partition by n_nationKey order by n_nationKey rows UNBOUNDED PRECEDING)"; - - test(query); - } catch(UserException ex) { - throwAsUnsupportedException(ex); - throw ex; - } - } - @Test(expected = UnsupportedFunctionException.class) // DRILL-3326 public void testWindowWithAlias() throws Exception { try { http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java index 10abbff..f8a537b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java @@ -48,7 +48,7 @@ public class TestWindowFrame extends BaseTestQuery { private DrillTestWrapper buildWindowQuery(final String tableName, final boolean withPartitionBy, final int numBatches) throws Exception { return testBuilder() - .sqlQuery(String.format(getFile("window/q1.sql"), TEST_RES_PATH, tableName, withPartitionBy ? "(partition by position_id)":"()")) + .sqlQuery(getFile("window/q1.sql"), TEST_RES_PATH, tableName, withPartitionBy ? "(partition by position_id)":"()") .ordered() .csvBaselineFile("window/" + tableName + (withPartitionBy ? 
".pby" : "") + ".tsv") .baselineColumns("count", "sum") @@ -59,7 +59,7 @@ public class TestWindowFrame extends BaseTestQuery { private DrillTestWrapper buildWindowWithOrderByQuery(final String tableName, final boolean withPartitionBy, final int numBatches) throws Exception { return testBuilder() - .sqlQuery(String.format(getFile("window/q2.sql"), TEST_RES_PATH, tableName, withPartitionBy ? "(partition by position_id order by sub)" : "(order by sub)")) + .sqlQuery(getFile("window/q2.sql"), TEST_RES_PATH, tableName, withPartitionBy ? "(partition by position_id order by sub)" : "(order by sub)") .ordered() .csvBaselineFile("window/" + tableName + (withPartitionBy ? ".pby" : "") + ".oby.tsv") .baselineColumns("count", "sum", "row_number", "rank", "dense_rank", "cume_dist", "percent_rank") @@ -106,6 +106,60 @@ public class TestWindowFrame extends BaseTestQuery { ); } + @Test + public void testUnboundedFollowing() throws Exception { + testBuilder() + .sqlQuery(getFile("window/q3.sql"), TEST_RES_PATH) + .ordered() + .sqlBaselineQuery(getFile("window/q4.sql"), TEST_RES_PATH) + .build() + .run(); + } + + @Test + public void testAggregateRowsUnboundedAndCurrentRow() throws Exception { + final String table = "dfs_test.`"+TEST_RES_PATH+"/window/b4.p4`"; + testBuilder() + .sqlQuery(getFile("window/aggregate_rows_unbounded_current.sql"), table) + .ordered() + .sqlBaselineQuery(getFile("window/aggregate_rows_unbounded_current_baseline.sql"), table) + .build() + .run(); + } + + @Test + public void testLastValueRowsUnboundedAndCurrentRow() throws Exception { + final String table = "dfs_test.`"+TEST_RES_PATH+"/window/b4.p4`"; + testBuilder() + .sqlQuery(getFile("window/last_value_rows_unbounded_current.sql"), table) + .unOrdered() + .sqlBaselineQuery(getFile("window/last_value_rows_unbounded_current_baseline.sql"), table) + .build() + .run(); + } + + @Test + public void testAggregateRangeCurrentAndCurrent() throws Exception { + final String table = 
"dfs_test.`"+TEST_RES_PATH+"/window/b4.p4`"; + testBuilder() + .sqlQuery(getFile("window/aggregate_range_current_current.sql"), table) + .unOrdered() + .sqlBaselineQuery(getFile("window/aggregate_range_current_current_baseline.sql"), table) + .build() + .run(); + } + + @Test + public void testFirstValueRangeCurrentAndCurrent() throws Exception { + final String table = "dfs_test.`"+TEST_RES_PATH+"/window/b4.p4`"; + testBuilder() + .sqlQuery(getFile("window/first_value_range_current_current.sql"), table) + .unOrdered() + .sqlBaselineQuery(getFile("window/first_value_range_current_current_baseline.sql"), table) + .build() + .run(); + } + /** * 2 batches with 2 partitions (position_id column), each batch contains a different partition */ http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/test/resources/window/aggregate_range_current_current.sql ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/window/aggregate_range_current_current.sql b/exec/java-exec/src/test/resources/window/aggregate_range_current_current.sql new file mode 100644 index 0000000..d8a66f4 --- /dev/null +++ b/exec/java-exec/src/test/resources/window/aggregate_range_current_current.sql @@ -0,0 +1,7 @@ +SELECT + employee_id, + position_id, + sub, + COUNT(*) OVER(PARTITION BY position_id ORDER BY sub RANGE BETWEEN CURRENT ROW AND CURRENT ROW) AS `count` +FROM + %s \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/test/resources/window/aggregate_range_current_current_baseline.sql ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/window/aggregate_range_current_current_baseline.sql b/exec/java-exec/src/test/resources/window/aggregate_range_current_current_baseline.sql new file mode 100644 index 0000000..a2aa285 --- /dev/null +++ 
b/exec/java-exec/src/test/resources/window/aggregate_range_current_current_baseline.sql @@ -0,0 +1,7 @@ +SELECT + employee_id, + position_id, + sub, + COUNT(*) OVER(PARTITION BY position_id, sub) AS `count` +FROM + %s \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/test/resources/window/aggregate_rows_unbounded_current.sql ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/window/aggregate_rows_unbounded_current.sql b/exec/java-exec/src/test/resources/window/aggregate_rows_unbounded_current.sql new file mode 100644 index 0000000..6b163d8 --- /dev/null +++ b/exec/java-exec/src/test/resources/window/aggregate_rows_unbounded_current.sql @@ -0,0 +1,4 @@ +SELECT + COUNT(*) OVER(PARTITION BY position_id ORDER BY sub ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `count` +FROM + %s \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/test/resources/window/aggregate_rows_unbounded_current_baseline.sql ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/window/aggregate_rows_unbounded_current_baseline.sql b/exec/java-exec/src/test/resources/window/aggregate_rows_unbounded_current_baseline.sql new file mode 100644 index 0000000..2d13c00 --- /dev/null +++ b/exec/java-exec/src/test/resources/window/aggregate_rows_unbounded_current_baseline.sql @@ -0,0 +1,3 @@ +SELECT + COUNT(*) OVER(PARTITION BY position_id ORDER BY sub, employee_id) AS `count` +FROM %s \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/test/resources/window/first_value_range_current_current.sql ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/window/first_value_range_current_current.sql 
b/exec/java-exec/src/test/resources/window/first_value_range_current_current.sql new file mode 100644 index 0000000..0c863ad --- /dev/null +++ b/exec/java-exec/src/test/resources/window/first_value_range_current_current.sql @@ -0,0 +1,7 @@ +SELECT + employee_id, + position_id, + sub, + FIRST_VALUE(sub) OVER(PARTITION BY position_id ORDER BY sub RANGE BETWEEN CURRENT ROW AND CURRENT ROW) AS `first_value` +FROM + %s \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/test/resources/window/first_value_range_current_current_baseline.sql ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/window/first_value_range_current_current_baseline.sql b/exec/java-exec/src/test/resources/window/first_value_range_current_current_baseline.sql new file mode 100644 index 0000000..7472651 --- /dev/null +++ b/exec/java-exec/src/test/resources/window/first_value_range_current_current_baseline.sql @@ -0,0 +1,7 @@ +SELECT + employee_id, + position_id, + sub, + FIRST_VALUE(sub) OVER(PARTITION BY position_id, sub) AS `first_value` +FROM + %s \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/test/resources/window/last_value_rows_unbounded_current.sql ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/window/last_value_rows_unbounded_current.sql b/exec/java-exec/src/test/resources/window/last_value_rows_unbounded_current.sql new file mode 100644 index 0000000..5d7eedd --- /dev/null +++ b/exec/java-exec/src/test/resources/window/last_value_rows_unbounded_current.sql @@ -0,0 +1,5 @@ +SELECT + employee_id, + LAST_VALUE(employee_id) OVER(PARTITION BY position_id ORDER BY sub ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `last_value` +FROM + %s \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/test/resources/window/last_value_rows_unbounded_current_baseline.sql ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/window/last_value_rows_unbounded_current_baseline.sql b/exec/java-exec/src/test/resources/window/last_value_rows_unbounded_current_baseline.sql new file mode 100644 index 0000000..432f352 --- /dev/null +++ b/exec/java-exec/src/test/resources/window/last_value_rows_unbounded_current_baseline.sql @@ -0,0 +1,5 @@ +SELECT + employee_id, + employee_id AS `last_value` +FROM + %s \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/test/resources/window/q3.sql ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/window/q3.sql b/exec/java-exec/src/test/resources/window/q3.sql new file mode 100644 index 0000000..0efb137 --- /dev/null +++ b/exec/java-exec/src/test/resources/window/q3.sql @@ -0,0 +1,9 @@ +SELECT + position_id, + employee_id, + LAST_VALUE(employee_id) + OVER(PARTITION BY position_id + ORDER by employee_id + RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `last_value` +FROM + dfs_test.`%s/window/b4.p4` http://git-wip-us.apache.org/repos/asf/drill/blob/a45f7fd1/exec/java-exec/src/test/resources/window/q4.sql ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/window/q4.sql b/exec/java-exec/src/test/resources/window/q4.sql new file mode 100644 index 0000000..5e1fb22 --- /dev/null +++ b/exec/java-exec/src/test/resources/window/q4.sql @@ -0,0 +1,9 @@ +SELECT + position_id, + employee_id, + MAX(employee_id) OVER(PARTITION BY position_id) AS `last_value` +FROM ( + SELECT * + FROM dfs_test.`%s/window/b4.p4` + ORDER BY position_id, employee_id +)
