DRILL-3200: Add Window functions: ROW_NUMBER, RANK, PERCENT_RANK, DENSE_RANK and CUME_DIST
- enum WindowFrameRecordBatch.WindowFunction to handle supported window functions and their corresponding output MajorType - renamed WindowFrameTemplate -> DefaultFrameTemplate, cleaned the template to handle the default frame efficiently: . a batch can be processed as soon as we find the last peer row of its last row . once a batch is processed it can be safely released => we can transfer its value vectors to the container instead of copying them - DefaultFrameTemplate.Partition tracks the current window frame and computes the following window functions automatically: row_number, rank, dense_rank, percent_rank, cume_dist. It doesn't need to aggregate the value vectors to compute these window functions - updated TestWindowFrame to check the results of row_number, rank, dense_rank, percent_rank and cume_dist in various cases . added a debug config option to MSorter to control the size of batches. This is needed by TestWindowFrame so it can use small test data files (20 rows per batch) . 
removed contrib/data/window-test-data - WindowFrameRecordBatch properly releases saved batches if the query stops prematurely - GenerateTestData can be used to generate test data for the window function unit tests [it's a work in progress and can be either improved to make it developer friendly or removed from the final patch] - using newly created WindowDataBatch in place of RecordDataBatch, to expose FragmentContext and VectorAccessible (fixes DRILL-3218) - window.enable is true by default Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3bccec91 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3bccec91 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3bccec91 Branch: refs/heads/master Commit: 3bccec9110c7ff86fa3cf04baa81a1747e1f5b9e Parents: 453f6f7 Author: adeneche <[email protected]> Authored: Fri Mar 13 10:06:32 2015 -0700 Committer: Jinfeng Ni <[email protected]> Committed: Thu Jun 11 10:59:35 2015 -0700 ---------------------------------------------------------------------- contrib/data/pom.xml | 1 - contrib/data/window-test-data/pom.xml | 64 -- exec/java-exec/pom.xml | 6 - .../org/apache/drill/exec/ExecConstants.java | 3 +- .../impl/window/DefaultFrameTemplate.java | 320 ++++++ .../exec/physical/impl/window/Partition.java | 82 ++ .../physical/impl/window/WindowDataBatch.java | 93 ++ .../impl/window/WindowFrameRecordBatch.java | 216 ++-- .../impl/window/WindowFrameTemplate.java | 379 ------- .../exec/physical/impl/window/WindowFramer.java | 21 +- .../exec/physical/impl/xsort/MSortTemplate.java | 15 +- .../drill/exec/record/AbstractRecordBatch.java | 6 +- .../apache/drill/TestDisabledFunctionality.java | 13 - .../physical/impl/window/GenerateTestData.java | 286 +++++ .../physical/impl/window/TestWindowFrame.java | 91 +- .../src/test/resources/window/allData.csv | 5 + .../src/test/resources/window/b1.p1.subs.tsv | 20 + .../src/test/resources/window/b1.p1.tsv | 20 + 
.../src/test/resources/window/b1.p1/0.data.json | 21 + .../src/test/resources/window/b1.p2.subs.tsv | 20 + .../src/test/resources/window/b1.p2.tsv | 20 + .../src/test/resources/window/b1.p2/0.data.json | 22 + .../src/test/resources/window/b2.p2.subs.tsv | 40 + .../src/test/resources/window/b2.p2.tsv | 40 + .../src/test/resources/window/b2.p2/0.data.json | 22 + .../src/test/resources/window/b2.p2/1.data.json | 20 + .../src/test/resources/window/b2.p4.subs.tsv | 40 + .../src/test/resources/window/b2.p4.tsv | 40 + .../src/test/resources/window/b2.p4/0.data.json | 24 + .../src/test/resources/window/b2.p4/1.data.json | 20 + .../src/test/resources/window/b3.p2.subs.tsv | 60 ++ .../src/test/resources/window/b3.p2.tsv | 60 ++ .../src/test/resources/window/b3.p2/0.data.json | 22 + .../src/test/resources/window/b3.p2/1.data.json | 20 + .../src/test/resources/window/b3.p2/2.data.json | 20 + .../src/test/resources/window/b4.p4.subs.tsv | 80 ++ .../src/test/resources/window/b4.p4.tsv | 80 ++ .../src/test/resources/window/b4.p4/0.data.json | 24 + .../src/test/resources/window/b4.p4/1.data.json | 20 + .../src/test/resources/window/b4.p4/2.data.json | 20 + .../src/test/resources/window/b4.p4/3.data.json | 20 + .../src/test/resources/window/mediumData.json | 1000 ------------------ .../src/test/resources/window/oneKeyCount.json | 43 - .../test/resources/window/oneKeyCountData.json | 4 - .../resources/window/oneKeyCountMultiBatch.json | 72 -- .../src/test/resources/window/twoKeys.json | 44 - .../src/test/resources/window/twoKeysData.json | 8 - 47 files changed, 1812 insertions(+), 1755 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/contrib/data/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/data/pom.xml b/contrib/data/pom.xml index d1def76..450bc0d 100644 --- a/contrib/data/pom.xml +++ b/contrib/data/pom.xml @@ -33,6 +33,5 @@ <modules> 
<module>tpch-sample-data</module> - <module>window-test-data</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/contrib/data/window-test-data/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/data/window-test-data/pom.xml b/contrib/data/window-test-data/pom.xml deleted file mode 100644 index 6d195da..0000000 --- a/contrib/data/window-test-data/pom.xml +++ /dev/null @@ -1,64 +0,0 @@ -<?xml version="1.0"?> -<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor - license agreements. See the NOTICE file distributed with this work for additional - information regarding copyright ownership. The ASF licenses this file to - You under the Apache License, Version 2.0 (the "License"); you may not use - this file except in compliance with the License. You may obtain a copy of - the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required - by applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - OF ANY KIND, either express or implied. See the License for the specific - language governing permissions and limitations under the License. 
--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <artifactId>drill-contrib-data-parent</artifactId> - <groupId>org.apache.drill.contrib.data</groupId> - <version>1.1.0-SNAPSHOT</version> - </parent> - - <artifactId>window-test-data</artifactId> - <name>contrib/data/window-test-data</name> - <packaging>jar</packaging> - - <dependencies> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>com.googlecode.maven-download-plugin</groupId> - <artifactId>download-maven-plugin</artifactId> - <version>1.2.0</version> - <executions> - <execution> - <id>install-tgz</id> - <phase>prepare-package</phase> - <goals> - <goal>wget</goal> - </goals> - <configuration> - <url>https://s3-us-west-2.amazonaws.com/denbucket/window_test_data_0.1.tgz</url> - <outputFileName>window.tgz</outputFileName> - <unpack>true</unpack> - <outputDirectory>${project.build.directory}/classes/window</outputDirectory> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - <pluginRepositories> - <pluginRepository> - <id>sonatype-public-repository</id> - <url>https://oss.sonatype.org/content/groups/public</url> - <snapshots> - <enabled>true</enabled> - </snapshots> - <releases> - <enabled>true</enabled> - </releases> - </pluginRepository> - </pluginRepositories> - -</project> http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/pom.xml ---------------------------------------------------------------------- diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index b5cd52b..5cc209d 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -38,12 +38,6 @@ <version>${project.version}</version> <scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.drill.contrib.data</groupId> - 
<artifactId>window-test-data</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> <!-- <dependency> --> <!-- <groupId>org.ow2.asm</groupId> --> <!-- <artifactId>asm-commons</artifactId> --> http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 91793f5..8ea90e0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -71,6 +71,7 @@ public interface ExecConstants { public static final String EXTERNAL_SORT_SPILL_THRESHOLD = "drill.exec.sort.external.spill.threshold"; public static final String EXTERNAL_SORT_SPILL_DIRS = "drill.exec.sort.external.spill.directories"; public static final String EXTERNAL_SORT_SPILL_FILESYSTEM = "drill.exec.sort.external.spill.fs"; + public static final String EXTERNAL_SORT_MSORT_MAX_BATCHSIZE = "drill.exec.sort.external.msort.batch.maxsize"; public static final String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size"; public static final String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size"; public static final String HAZELCAST_SUBNETS = "drill.exec.cache.hazel.subnets"; @@ -238,7 +239,7 @@ public interface ExecConstants { public static final String DRILL_SYS_FILE_SUFFIX = ".sys.drill"; public static final String ENABLE_WINDOW_FUNCTIONS = "window.enable"; - public static final OptionValidator ENABLE_WINDOW_FUNCTIONS_VALIDATOR = new BooleanValidator(ENABLE_WINDOW_FUNCTIONS, false); + public static final OptionValidator ENABLE_WINDOW_FUNCTIONS_VALIDATOR = new BooleanValidator(ENABLE_WINDOW_FUNCTIONS, true); public static final String 
DRILLBIT_CONTROL_INJECTIONS = "drill.exec.testing.controls"; public static final OptionValidator DRILLBIT_CONTROLS_VALIDATOR = http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java new file mode 100644 index 0000000..ada068b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/DefaultFrameTemplate.java @@ -0,0 +1,320 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.window; + +import org.apache.drill.common.exceptions.DrillException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.ValueVector; + +import javax.inject.Named; +import java.util.Iterator; +import java.util.List; + + +public abstract class DefaultFrameTemplate implements WindowFramer { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultFrameTemplate.class); + + private VectorContainer container; + private List<WindowDataBatch> batches; + private int outputCount; // number of rows in currently/last processed batch + + /** + * current partition being processed.</p> + * Can span over multiple batches, so we may need to keep it between calls to doWork() + */ + private Partition partition; + + @Override + public void setup(List<WindowDataBatch> batches, final VectorContainer container) throws SchemaChangeException { + this.container = container; + this.batches = batches; + + outputCount = 0; + partition = null; + } + + private void allocateOutgoing() { + for (VectorWrapper<?> w : container) { + w.getValueVector().allocateNew(); + } + } + + /** + * processes all rows of current batch: + * <ul> + * <li>compute aggregations</li> + * <li>compute window functions</li> + * <li>transfer remaining vectors from current batch to container</li> + * </ul> + */ + @Override + public void doWork() throws DrillException { + int currentRow = 0; + + logger.trace("WindowFramer.doWork() START, num batches {}, current batch has {} rows", + batches.size(), batches.get(0).getRecordCount()); + + allocateOutgoing(); + + final WindowDataBatch current = batches.get(0); + + // we need to store the record count explicitly, because we release current 
batch at the end of this call + outputCount = current.getRecordCount(); + + while (currentRow < outputCount) { + if (partition != null) { + assert currentRow == 0 : "pending windows are only expected at the start of the batch"; + + // we have a pending window we need to handle from a previous call to doWork() + logger.trace("we have a pending partition {}", partition); + } else { + final int length = computePartitionSize(currentRow); + partition = new Partition(length); + setupWrite(current, container); + } + + currentRow = processPartition(currentRow); + if (partition.isDone()) { + partition = null; + resetValues(); + } + } + + // transfer "non aggregated" vectors + for (VectorWrapper<?> vw : current) { + ValueVector v = container.addOrGet(vw.getField()); + TransferPair tp = vw.getValueVector().makeTransferPair(v); + tp.transfer(); + } + + for (VectorWrapper<?> v : container) { + v.getValueVector().getMutator().setValueCount(outputCount); + } + + // because we are using the default frame, and we keep the aggregated value until we start a new frame + // we can safely free the current batch + batches.remove(0).clear(); + + logger.trace("WindowFramer.doWork() END"); + } + + /** + * process all rows (computes and writes aggregation values) of current batch that are part of current partition. 
+ * @param currentRow first unprocessed row + * @return index of next unprocessed row + * @throws DrillException if it can't write into the container + */ + private int processPartition(final int currentRow) throws DrillException { + logger.trace("process partition {}, currentRow: {}, outputCount: {}", partition, currentRow, outputCount); + + int row = currentRow; + while (row < outputCount && !partition.isDone()) { + if (partition.isFrameDone()) { + // because all peer rows share the same frame, we only need to compute and aggregate the frame once + partition.newFrame(countPeers(row)); + aggregatePeers(row); + } + + outputAggregatedValues(row, partition); + + partition.rowAggregated(); + row++; + } + + return row; + } + + /** + * @return number of rows that are part of the partition starting at row start of first batch + */ + private int computePartitionSize(final int start) { + logger.trace("compute partition size starting from {} on {} batches", start, batches.size()); + + // current partition always starts from first batch + final VectorAccessible first = getCurrent(); + + int length = 0; + + // count all rows that are in the same partition of start + // keep increasing length until we find first row of next partition or we reach the very + // last batch + for (WindowDataBatch batch : batches) { + final int recordCount = batch.getRecordCount(); + + // check first container from start row, and subsequent containers from first row + for (int row = (batch == first) ? 
start : 0; row < recordCount; row++, length++) { + if (!isSamePartition(start, first, row, batch)) { + return length; + } + } + } + + return length; + } + + /** + * Counts how many rows are peer with the first row of the current frame + * @param start first row of current frame + * @return number of peer rows + */ + private int countPeers(final int start) { + // current frame always starts from first batch + final VectorAccessible first = getCurrent(); + + int length = 0; + + // count all rows that are in the same frame of starting row + // keep increasing length until we find first non peer row we reach the very + // last batch + for (WindowDataBatch batch : batches) { + final int recordCount = batch.getRecordCount(); + + // for every remaining row in the partition, count it if it's a peer row + final int remaining = partition.getRemaining(); + for (int row = (batch == first) ? start : 0; row < recordCount && length < remaining; row++, length++) { + if (!isPeer(start, first, row, batch)) { + return length; + } + } + } + + return length; + } + + /** + * aggregates all peer rows of current row + * @param currentRow starting row of the current frame + * @throws SchemaChangeException + */ + private void aggregatePeers(final int currentRow) throws SchemaChangeException { + logger.trace("aggregating {} rows starting from {}", partition.getPeers(), currentRow); + assert !partition.isFrameDone() : "frame is empty!"; + + // a single frame can include rows from multiple batches + // start processing first batch and, if necessary, move to next batches + Iterator<WindowDataBatch> iterator = batches.iterator(); + WindowDataBatch current = iterator.next(); + setupRead(current, container); + + final int peers = partition.getPeers(); + for (int i = 0, row = currentRow; i < peers; i++, row++) { + if (row >= current.getRecordCount()) { + // we reached the end of the current batch, move to the next one + current = iterator.next(); + setupRead(current, container); + row = 0; + } + + 
aggregateRecord(row); + } + } + + @Override + public boolean canDoWork() { + // check if we can process a saved batch + if (batches.size() < 2) { + logger.trace("we don't have enough batches to proceed, fetch next batch"); + return false; + } + + final VectorAccessible current = getCurrent(); + final int currentSize = current.getRecordCount(); + final VectorAccessible last = batches.get(batches.size() - 1); + final int lastSize = last.getRecordCount(); + + if (!isSamePartition(currentSize - 1, current, lastSize - 1, last) + || !isPeer(currentSize - 1, current, lastSize - 1, last)) { + logger.trace("frame changed, we are ready to process first saved batch"); + return true; + } else { + logger.trace("frame didn't change, fetch next batch"); + return false; + } + } + + /** + * @return saved batch that will be processed in doWork() + */ + private VectorAccessible getCurrent() { + return batches.get(0); + } + + @Override + public int getOutputCount() { + return outputCount; + } + + @Override + public void cleanup() { + } + + /** + * setup incoming container for aggregateRecord() + */ + public abstract void setupRead(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException; + + /** + * setup outgoing container for outputAggregatedValues. This will also reset the aggregations in most cases. 
+ */ + public abstract void setupWrite(@Named("incoming") WindowDataBatch incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException; + + /** + * aggregates a row from the incoming container + * @param index of row to aggregate + */ + public abstract void aggregateRecord(@Named("index") int index); + + /** + * writes aggregated values to row of outgoing container + * @param outIndex index of row + */ + public abstract void outputAggregatedValues(@Named("outIndex") int outIndex, @Named("partition") Partition partition); + + /** + * reset all window functions + */ + public abstract boolean resetValues(); + + /** + * compares two rows from different batches (can be the same), if they have the same value for the partition by + * expression + * @param b1Index index of first row + * @param b1 batch for first row + * @param b2Index index of second row + * @param b2 batch for second row + * @return true if the rows are in the same partition + */ + public abstract boolean isSamePartition(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1, + @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2); + + /** + * compares two rows from different batches (can be the same), if they have the same value for the order by + * expression + * @param b1Index index of first row + * @param b1 batch for first row + * @param b2Index index of second row + * @param b2 batch for second row + * @return true if the rows are in the same partition + */ + public abstract boolean isPeer(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1, + @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2); +} http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java new file mode 100644 index 0000000..8d6728e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.window; + +/** + * Used internally to keep track of partitions and frames + */ +public class Partition { + private final int length; // size of this partition + private int remaining; + private int peers; + + // we keep these attributes public because the generated code needs to access them + public int row_number; + public int rank; + public int dense_rank; + public double percent_rank; + public double cume_dist; + + /** + * @return number of rows not yet aggregated in this partition + */ + public int getRemaining() { + return remaining; + } + + /** + * @return peer rows not yet aggregated in current frame + */ + public int getPeers() { + return peers; + } + + public Partition(int length) { + this.length = length; + remaining = length; + row_number = 1; + } + + public void rowAggregated() { + remaining--; + peers--; + + row_number++; + } + + public void newFrame(int peers) { + this.peers = peers; + rank = row_number; // rank = row number of 1st peer + dense_rank++; + percent_rank = length > 1 ? 
(double) (rank - 1) / (length - 1) : 0; + cume_dist = (double)(rank + peers - 1) / length; + } + + public boolean isDone() { + return remaining == 0; + } + + public boolean isFrameDone() { + return peers == 0; + } + + @Override + public String toString() { + return String.format("{length: %d, remaining partition: %d, remaining peers: %d}", length, remaining, peers); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java new file mode 100644 index 0000000..5045cb3 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java @@ -0,0 +1,93 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/ +package org.apache.drill.exec.physical.impl.window; + +import com.google.common.collect.Lists; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.ValueVector; + +import java.util.Iterator; +import java.util.List; + +public class WindowDataBatch implements VectorAccessible { + + private final FragmentContext context; + private final VectorContainer container; + private final int recordCount; + + public WindowDataBatch(final VectorAccessible batch, final FragmentContext context) { + this.context = context; + recordCount = batch.getRecordCount(); + + List<ValueVector> vectors = Lists.newArrayList(); + + for (VectorWrapper<?> v : batch) { + if (v.isHyper()) { + throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch."); + } + TransferPair tp = v.getValueVector().getTransferPair(); + tp.transfer(); + vectors.add(tp.getTo()); + } + + container = new VectorContainer(); + container.addCollection(vectors); + container.setRecordCount(recordCount); + container.buildSchema(batch.getSchema().getSelectionVectorMode()); + } + + public FragmentContext getContext() { + return context; + } + + @Override + public int getRecordCount() { + return recordCount; + } + + @Override + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... 
fieldIds) { + return container.getValueAccessorById(clazz, fieldIds); + } + + @Override + public TypedFieldId getValueVectorId(SchemaPath path) { + return container.getValueVectorId(path); + } + + @Override + public BatchSchema getSchema() { + return container.getSchema(); + } + + @Override + public Iterator<VectorWrapper<?>> iterator() { + return container.iterator(); + } + + public void clear() { + container.clear(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java index 428632f..da189eb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java @@ -19,13 +19,21 @@ package org.apache.drill.exec.physical.impl.window; import java.io.IOException; import java.util.List; +import java.util.Map; +import com.google.common.collect.Maps; +import com.sun.codemodel.JExpression; +import com.sun.codemodel.JInvocation; +import com.sun.codemodel.JVar; import org.apache.drill.common.exceptions.DrillException; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; +import org.apache.drill.common.expression.FunctionCall; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.common.logical.data.Order; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.compile.sig.GeneratorMapping; import 
org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.ClassTransformationException; @@ -33,13 +41,11 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; -import org.apache.drill.exec.expr.ValueVectorReadExpression; import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.WindowPOP; -import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; @@ -61,12 +67,51 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WindowFrameRecordBatch.class); private final RecordBatch incoming; - private List<RecordBatchData> batches; + private List<WindowDataBatch> batches; private WindowFramer framer; private boolean noMoreBatches; private BatchSchema schema; + /** + * Describes supported window functions and if they output FLOAT8 or BIGINT + */ + private enum WindowFunction { + ROW_NUMBER(false), + RANK(false), + DENSE_RANK(false), + PERCENT_RANK(true), + CUME_DIST(true); + + private final boolean useDouble; + + WindowFunction(boolean useDouble) { + this.useDouble = useDouble; + } + + public TypeProtos.MajorType getMajorType() { + return useDouble ? 
Types.required(TypeProtos.MinorType.FLOAT8) : Types.required(TypeProtos.MinorType.BIGINT); + } + + /** + * Extract the WindowFunction corresponding to the logical expression + * @param expr logical expression + * @return WindowFunction or null if the logical expression is not a window function + */ + public static WindowFunction fromExpression(final LogicalExpression expr) { + if (!(expr instanceof FunctionCall)) { + return null; + } + + final String name = ((FunctionCall) expr).getName(); + try { + return WindowFunction.valueOf(name.toUpperCase()); + } catch (IllegalArgumentException e) { + return null; // not a window function + } + } + } + public WindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { super(popConfig, context); this.incoming = incoming; @@ -90,7 +135,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { * * <p><pre> * when innerNext() is called: - * call next(incoming), we receive and save b0 in a list of RecordDataBatch + * call next(incoming), we receive and save b0 in a list of WindowDataBatch * we can't process b0 yet because we don't know if p1 has more rows upstream * call next(incoming), we receive and save b1 * we can't process b0 yet for the same reason previously stated @@ -107,7 +152,11 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { * when innerNext() is called: * we return NONE * </pre></p> - * + * The previous scenario applies when we don't have an ORDER BY clause, otherwise a batch can be processed + * as soon as we reach the final peer row of the batch's last row (we find the end of the last frame of the batch). + * </p> + * Because we only support the default frame, we don't need to reset the aggregations until we reach the end of + * a partition. We can safely free a batch as soon as it has been processed. 
*/ @Override public IterOutcome innerNext() { @@ -130,24 +179,21 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { case OUT_OF_MEMORY: case NOT_YET: case STOP: + cleanup(); return upstream; case OK_NEW_SCHEMA: - // when a partition of rows exceeds the current processed batch, it will be kept as "pending" and processed - // when innerNext() is called again. If the schema changes, the framer is "rebuilt" and the pending information - // will be lost which may lead to incorrect results. - - // only change in the case that the schema truly changes. Artificial schema changes are ignored. + // We don't support schema changes if (!incoming.getSchema().equals(schema)) { if (schema != null) { - throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas."); + throw new UnsupportedOperationException("OVER clause doesn't currently support changing schemas."); } this.schema = incoming.getSchema(); } case OK: - batches.add(new RecordBatchData(incoming)); + batches.add(new WindowDataBatch(incoming, context)); break; default: - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("Unsupported upstrean state " + upstream); } } @@ -157,15 +203,12 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { return IterOutcome.NONE; } - // process a saved batch + // process first saved batch, then release it try { framer.doWork(); } catch (DrillException e) { context.fail(e); - if (framer != null) { - framer.cleanup(); - framer = null; - } + cleanup(); return IterOutcome.STOP; } @@ -201,52 +244,44 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { } private WindowFramer createFramer(VectorAccessible batch) throws SchemaChangeException, IOException, ClassTransformationException { - logger.trace("creating framer"); + assert framer == null : "createFramer should only be called once"; - container.clear(); + logger.trace("creating framer"); 
- if (framer != null) { - framer.cleanup(); - framer = null; - } + final List<LogicalExpression> aggExprs = Lists.newArrayList(); + final Map<WindowFunction, TypedFieldId> winExprs = Maps.newHashMap(); + final List<LogicalExpression> keyExprs = Lists.newArrayList(); + final List<LogicalExpression> orderExprs = Lists.newArrayList(); + final ErrorCollector collector = new ErrorCollectorImpl(); - ErrorCollector collector = new ErrorCollectorImpl(); + container.clear(); - // setup code generation to copy all incoming vectors to the container - // we can't just transfer them because after we pass the container downstream, some values will be needed when - // processing the next batches - int j = 0; - LogicalExpression[] windowExprs = new LogicalExpression[batch.getSchema().getFieldCount()]; + // all existing vectors will be transferred to the outgoing container in framer.doWork() for (VectorWrapper wrapper : batch) { - // read value from saved batch - final LogicalExpression expr = ExpressionTreeMaterializer.materialize( - new ValueVectorReadExpression(new TypedFieldId(wrapper.getField().getType(), wrapper.isHyper(), j)), - batch, collector, context.getFunctionRegistry()); - - ValueVector vv = container.addOrGet(wrapper.getField()); - vv.allocateNew(); - - // write value into container - TypedFieldId id = container.getValueVectorId(vv.getField().getPath()); - windowExprs[j] = new ValueVectorWriteExpression(id, expr, true); - j++; + container.addOrGet(wrapper.getField()); } // add aggregation vectors to the container, and materialize corresponding expressions - LogicalExpression[] aggExprs = new LogicalExpression[popConfig.getAggregations().length]; - for (int i = 0; i < aggExprs.length; i++) { - // evaluate expression over saved batch - NamedExpression ne = popConfig.getAggregations()[i]; - final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), batch, collector, context.getFunctionRegistry()); - - // add corresponding ValueVector to 
container - final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); - ValueVector vv = container.addOrGet(outputField); - vv.allocateNew(); - - // write value into container - TypedFieldId id = container.getValueVectorId(ne.getRef()); - aggExprs[i] = new ValueVectorWriteExpression(id, expr, true); + for (final NamedExpression ne : popConfig.getAggregations()) { + final WindowFunction wf = WindowFunction.fromExpression(ne.getExpr()); + + if (wf != null) { + // add corresponding ValueVector to container + final MaterializedField outputField = MaterializedField.create(ne.getRef(), wf.getMajorType()); + ValueVector vv = container.addOrGet(outputField); + vv.allocateNew(); + winExprs.put(wf, container.getValueVectorId(ne.getRef())); + } else { + // evaluate expression over saved batch + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), batch, collector, context.getFunctionRegistry()); + + // add corresponding ValueVector to container + final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); + ValueVector vv = container.addOrGet(outputField); + vv.allocateNew(); + TypedFieldId id = container.getValueVectorId(ne.getRef()); + aggExprs.add(new ValueVectorWriteExpression(id, expr, true)); + } } if (container.isSchemaChanged()) { @@ -254,17 +289,15 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { } // materialize partition by expressions - LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getWithins().length]; - for (int i = 0; i < keyExprs.length; i++) { - NamedExpression ne = popConfig.getWithins()[i]; - keyExprs[i] = ExpressionTreeMaterializer.materialize(ne.getExpr(), batch, collector, context.getFunctionRegistry()); + for (final NamedExpression ne : popConfig.getWithins()) { + keyExprs.add( + ExpressionTreeMaterializer.materialize(ne.getExpr(), batch, collector, context.getFunctionRegistry())); } // materialize 
order by expressions - LogicalExpression[] orderExprs = new LogicalExpression[popConfig.getOrderings().length]; - for (int i = 0; i < orderExprs.length; i++) { - Order.Ordering oe = popConfig.getOrderings()[i]; - orderExprs[i] = ExpressionTreeMaterializer.materialize(oe.getExpr(), batch, collector, context.getFunctionRegistry()); + for (final Order.Ordering oe : popConfig.getOrderings()) { + orderExprs.add( + ExpressionTreeMaterializer.materialize(oe.getExpr(), batch, collector, context.getFunctionRegistry())); } if (collector.hasErrors()) { @@ -272,14 +305,11 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { } // generate framer code - final ClassGenerator<WindowFramer> cg = CodeGenerator.getRoot(WindowFramer.TEMPLATE_DEFINITION, context.getFunctionRegistry()); - // setup for isSamePartition() - setupIsFunction(cg, keyExprs, isaB1, isaB2); - // setup for isPeer() - setupIsFunction(cg, orderExprs, isaP1, isaP2); - setupAddRecords(cg, aggExprs); - setupOutputWindowValues(cg, windowExprs); + setupIsFunction(cg, keyExprs, isaB1, isaB2); // setup for isSamePartition() + setupIsFunction(cg, orderExprs, isaP1, isaP2); // setup for isPeer() + setupOutputAggregatedValues(cg, aggExprs); + setupAddWindowValue(cg, winExprs); cg.getBlock("resetValues")._return(JExpr.TRUE); @@ -300,7 +330,8 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { /** * setup comparison functions isSamePartition and isPeer */ - private void setupIsFunction(ClassGenerator<WindowFramer> cg, LogicalExpression[] exprs, MappingSet leftMapping, MappingSet rightMapping) { + private void setupIsFunction(final ClassGenerator<WindowFramer> cg, final List<LogicalExpression> exprs, + final MappingSet leftMapping, final MappingSet rightMapping) { cg.setMappingSet(leftMapping); for (LogicalExpression expr : exprs) { cg.setMappingSet(leftMapping); @@ -317,36 +348,51 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { 
cg.getEvalBlock()._return(JExpr.TRUE); } - private static final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupIncoming", "addRecord", null, null); - private static final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupOutgoing", "outputRecordValues", "resetValues", "cleanup"); + private static final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupRead", "aggregateRecord", null, null); + private static final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupWrite", "outputAggregatedValues", "resetValues", "cleanup"); private final MappingSet eval = new MappingSet("index", "outIndex", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE); /** - * setup for addRecords() and outputRecordValues() + * setup for aggregateRecord() and outputAggregatedValues() */ - private void setupAddRecords(ClassGenerator<WindowFramer> cg, LogicalExpression[] valueExprs) { + private void setupOutputAggregatedValues(ClassGenerator<WindowFramer> cg, List<LogicalExpression> valueExprs) { cg.setMappingSet(eval); for (LogicalExpression ex : valueExprs) { cg.addExpr(ex); } } - private final static GeneratorMapping OUTPUT_WINDOW_VALUES = GeneratorMapping.create("setupCopy", "outputWindowValues", null, null); - private final MappingSet windowValues = new MappingSet("index", "index", OUTPUT_WINDOW_VALUES, OUTPUT_WINDOW_VALUES); - - private void setupOutputWindowValues(ClassGenerator<WindowFramer> cg, LogicalExpression[] valueExprs) { - cg.setMappingSet(windowValues); - for (LogicalExpression valueExpr : valueExprs) { - cg.addExpr(valueExpr); + /** + * generate code to write "computed" window function values into their respective value vectors + */ + private void setupAddWindowValue(final ClassGenerator<WindowFramer> cg, final Map<WindowFunction, TypedFieldId> functions) { + cg.setMappingSet(eval); + for (WindowFunction function : functions.keySet()) { + final JVar vv = cg.declareVectorValueSetupAndMember(cg.getMappingSet().getOutgoing(), 
functions.get(function)); + final JExpression outIndex = cg.getMappingSet().getValueWriteIndex(); + JInvocation setMethod = vv.invoke("getMutator").invoke("setSafe").arg(outIndex).arg( + JExpr.direct("partition." + function.name().toLowerCase())); + cg.getEvalBlock().add(setMethod); } } - @Override - public void close() { + private void cleanup() { if (framer != null) { framer.cleanup(); framer = null; } + + if (batches != null) { + for (final WindowDataBatch bd : batches) { + bd.clear(); + } + batches = null; + } + } + + @Override + public void close() { + cleanup(); super.close(); } http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameTemplate.java deleted file mode 100644 index 78bab54..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameTemplate.java +++ /dev/null @@ -1,379 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.physical.impl.window; - -import org.apache.drill.common.exceptions.DrillException; -import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.physical.impl.sort.RecordBatchData; -import org.apache.drill.exec.record.VectorAccessible; -import org.apache.drill.exec.record.VectorWrapper; - -import javax.inject.Named; -import java.util.Iterator; -import java.util.List; - - -public abstract class WindowFrameTemplate implements WindowFramer { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WindowFrameTemplate.class); - - private VectorAccessible container; - private List<RecordBatchData> batches; - private int outputCount; // number of rows in currently/last processed batch - - /** - * current partition being processed. Can span over multiple batches, so we may need to keep it between calls to doWork() - */ - private Interval partition; - - private int currentBatch; // first unprocessed batch - - @Override - public void setup(List<RecordBatchData> batches, VectorAccessible container) throws SchemaChangeException { - this.container = container; - this.batches = batches; - - outputCount = 0; - partition = null; - currentBatch = 0; - - setupOutgoing(container); - } - - /** - * processes all rows of current batch: - * <ul> - * <li>compute window aggregations</li> - * <li>copy remaining vectors from current batch to container</li> - * </ul> - */ - @Override - public void doWork() throws DrillException { - logger.trace("WindowFramer.doWork() START, num batches {}, currentBatch {}", batches.size(), currentBatch); - - final VectorAccessible current = batches.get(currentBatch).getContainer(); - - // we need to store the record count explicitly, in case we release current batch at the end of this call - outputCount = current.getRecordCount(); - - // allocate vectors - for 
(VectorWrapper<?> w : container){ - w.getValueVector().allocateNew(); - } - - setupCopy(current, container); - - int currentRow = 0; - - while (currentRow < outputCount) { - if (partition != null) { - assert currentRow == 0 : "pending windows are only expected at the start of the batch"; - - // we have a pending window we need to handle from a previous call to doWork() - logger.trace("we have a pending partition {}", partition); - } else { - // compute the size of the new partition - final int length = computePartitionSize(currentRow); - partition = new Interval(currentRow, length, 0); - } - - currentRow = processPartition(currentRow); - - if (partition == null) { - freeBatches(currentRow == outputCount); - } - } - - if (partition != null) { - logger.trace("we have a pending partition {}", partition); - // current batch has been processed but it won't be released until this pending partition is processed - currentBatch++; - } - - for (VectorWrapper<?> v : container) { - v.getValueVector().getMutator().setValueCount(outputCount); - } - - logger.trace("WindowFramer.doWork() END"); - } - - /** - * releases saved batches that are no longer needed: all rows of their partitions have been processed - * @param freeCurrent do we free current batch too ? - */ - private void freeBatches(boolean freeCurrent) { - // how many batches can be released - int numFree = currentBatch; - if (freeCurrent) { // current batch can also be released - numFree++; - } - if (numFree > 0) { - logger.trace("freeing {} batches", numFree); - - // we are ready to free batches < currentBatch - for (int i = 0; i < numFree; i++) { - RecordBatchData bd = batches.remove(0); - bd.getContainer().clear(); - } - - currentBatch = 0; - } - } - - /** - * process all rows (computes and writes aggregation values) of current batch that are part of current partition. 
- * @return index of next unprocessed row - * @throws DrillException if it can't write into the container - */ - private int processPartition(int currentRow) throws DrillException { - logger.trace("process partition {}, currentBatch: {}, currentRow: {}", partition, currentBatch, currentRow); - - // compute how many rows remain unprocessed in the current partition - int remaining = partition.length; - for (int b = 0; b < currentBatch; b++) { - remaining -= batches.get(b).getRecordCount(); - } - remaining -= currentRow - partition.start; - - // when computing the frame for the current row, keep track of how many peer rows need to be processed - // because all peer rows share the same frame, we only need to compute and aggregate the frame once - for (int peers = 0; currentRow < outputCount && remaining > 0; currentRow++, remaining--) { - if (peers == 0) { - Interval frame = computeFrame(currentBatch, currentRow); - resetValues(); - aggregate(frame); - peers = frame.peers; - } else { - peers--; - } - - outputRecordValues(currentRow); - outputWindowValues(currentRow); - } - - if (remaining == 0) { - logger.trace("finished processing {}", partition); - partition = null; - } - - return currentRow; - } - - /** - * @return number of rows that are part of the partition starting at row start of first batch - */ - private int computePartitionSize(int start) { - logger.trace("compute partition size starting from {} on {} batches", start, batches.size()); - - // current partition always starts from first batch - final VectorAccessible first = batches.get(0).getContainer(); - - int length = 0; - - // count all rows that are in the same partition of start - outer: - for (RecordBatchData batch : batches) { - final VectorAccessible cont = batch.getContainer(); - - // check first container from start row, and subsequent containers from first row - for (int row = (cont == first ? 
start : 0); row < cont.getRecordCount(); row++) { - if (isSamePartition(start, first, row, cont)) { - length++; - } else { - break outer; - } - } - } - - return length; - } - - /** - * find the limits of the window frame for a row - * @param batchId batch where the current row is - * @param row idx of row in the given batch - * @return frame interval - */ - private Interval computeFrame(int batchId, int row) { - - // using default frame for now RANGE BETWEEN UNBOUND PRECEDING AND CURRENT ROW - // frame contains all rows from start of partition to last peer of row - - // count how many rows in the current partition precede the row - int length = row; - // include rows of all batches previous to batchId - Iterator<RecordBatchData> iterator = batches.iterator(); - for (int b = 0; b < batchId; b++) { - length += iterator.next().getRecordCount(); - } - length -= partition.start; - - VectorAccessible batch = iterator.next().getContainer(); - VectorAccessible current = batch; - - // for every remaining row in the partition, count it if it's a peer row - int peers = 0; - for (int curRow = row; length < partition.length; length++, curRow++, peers++) { - if (curRow == current.getRecordCount()) { - current = iterator.next().getContainer(); - curRow = 0; - } - - if (!isPeer(row, batch, curRow, current)) { - break; - } - } - - // do not count row as a peer - return new Interval(partition.start, length, peers-1); - } - - private void aggregate(Interval frame) throws SchemaChangeException { - logger.trace("aggregating {}", frame); - assert frame.length > 0 : "processing empty frame!"; - - // a single frame can include rows from multiple batches - // start processing first batch and, if necessary, move to next batches - Iterator<RecordBatchData> iterator = batches.iterator(); - VectorAccessible current = iterator.next().getContainer(); - setupIncoming(current); - - for (int i = 0, row = frame.start; i < frame.length; i++, row++) { - if (row >= current.getRecordCount()) { - // we 
reached the end of the current batch, move to the next one - current = iterator.next().getContainer(); - setupIncoming(current); - row = 0; - } - - addRecord(row); - } - } - - @Override - public boolean canDoWork() { - // check if we can process a saved batch - if (batches.size() > 1) { - final VectorAccessible last = batches.get(batches.size()-1).getContainer(); - - if (!isSamePartition(getCurrent().getRecordCount() - 1, getCurrent(), last.getRecordCount() - 1, last)) { - logger.trace("partition changed, we are ready to process first saved batch"); - return true; - } else { - logger.trace("partition didn't change, fetch next batch"); - } - } else { - logger.trace("we don't have enough batches to proceed, fetch next batch"); - } - - return false; - } - - @Override - public VectorAccessible getCurrent() { - return batches.get(currentBatch).getContainer(); - } - - @Override - public int getOutputCount() { - return outputCount; - } - - @Override - public void cleanup() { - } - - /** - * setup incoming container for addRecord() - */ - public abstract void setupIncoming(@Named("incoming") VectorAccessible incoming) throws SchemaChangeException; - - /** - * setup outgoing container for outputRecordValues - */ - public abstract void setupOutgoing(@Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException; - - /** - * setup for outputWindowValues - */ - public abstract void setupCopy(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException; - - /** - * aggregates a row from the incoming container - * @param index of row to aggregate - */ - public abstract void addRecord(@Named("index") int index); - - /** - * writes aggregated values to row of outgoing container - * @param outIndex index of row - */ - public abstract void outputRecordValues(@Named("outIndex") int outIndex); - - /** - * copies all value vectors from incoming to container, for a specific row - * @param index of row to be copied - 
*/ - public abstract void outputWindowValues(@Named("index") int index); - - /** - * reset all window functions - */ - public abstract boolean resetValues(); - - /** - * compares two rows from different batches (can be the same), if they have the same value for the partition by - * expression - * @param b1Index index of first row - * @param b1 batch for first row - * @param b2Index index of second row - * @param b2 batch for second row - * @return true if the rows are in the same partition - */ - public abstract boolean isSamePartition(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1, @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2); - - /** - * compares two rows from different batches (can be the same), if they have the same value for the order by - * expression - * @param b1Index index of first row - * @param b1 batch for first row - * @param b2Index index of second row - * @param b2 batch for second row - * @return true if the rows are in the same partition - */ - public abstract boolean isPeer(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1, @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2); - - /** - * Used internally to keep track of partitions and frames - */ - private static class Interval { - public final int start; - public final int length; - public final int peers; // we only need this for frames - - public Interval(int start, int length, int peers) { - this.start = start; - this.length = length; - this.peers = peers; - } - - @Override - public String toString() { - return String.format("{start: %d, length: %d, peers: %d}", start, length, peers); - } - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java index 23a2b53..69866af 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java @@ -20,21 +20,20 @@ package org.apache.drill.exec.physical.impl.window; import org.apache.drill.common.exceptions.DrillException; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.physical.impl.sort.RecordBatchData; -import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.record.VectorContainer; import java.util.List; public interface WindowFramer { - public static TemplateClassDefinition<WindowFramer> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, WindowFrameTemplate.class); + TemplateClassDefinition<WindowFramer> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, DefaultFrameTemplate.class); - public abstract void setup(List<RecordBatchData> batches, VectorAccessible container) throws SchemaChangeException; + void setup(List<WindowDataBatch> batches, final VectorContainer container) throws SchemaChangeException; /** * process the inner batch and write the aggregated values in the container * @throws DrillException */ - public abstract void doWork() throws DrillException; + void doWork() throws DrillException; /** * check if current batch can be processed: @@ -44,16 +43,12 @@ public interface WindowFramer { * </ol> * @return true if current batch can be processed, false otherwise */ - public abstract boolean canDoWork(); + boolean canDoWork(); + /** * @return number of rows processed in last batch */ - public abstract int getOutputCount(); - - public abstract void cleanup(); + int getOutputCount(); - /** - * @return saved batch that will be processed in doWork() - */ - public abstract
VectorAccessible getCurrent(); + void cleanup(); } http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java index 9b21ae3..6686fbe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java @@ -17,12 +17,14 @@ */ package org.apache.drill.exec.physical.impl.xsort; +import com.typesafe.config.ConfigException; import io.netty.buffer.DrillBuf; import java.util.Queue; import javax.inject.Named; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; @@ -36,7 +38,7 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.Queues; public abstract class MSortTemplate implements MSorter, IndexedSortable{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MSortTemplate.class); +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MSortTemplate.class); private SelectionVector4 vector4; private SelectionVector4 aux; @@ -68,7 +70,16 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{ } } final DrillBuf drillBuf = allocator.buffer(4 * totalCount); - aux = new SelectionVector4(drillBuf, totalCount, Character.MAX_VALUE); + + // This is only useful for debugging: change the maximum size of batches exposed to downstream + // when we don't spill to disk + int MSORT_BATCH_MAXSIZE; + try { + MSORT_BATCH_MAXSIZE = 
context.getConfig().getInt(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE); + } catch(ConfigException.Missing e) { + MSORT_BATCH_MAXSIZE = Character.MAX_VALUE; + } + aux = new SelectionVector4(drillBuf, totalCount, MSORT_BATCH_MAXSIZE); } /** http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index 330ec79..ff53052 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -34,7 +34,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4; public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements CloseableRecordBatch { final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass()); - protected final VectorContainer container; //= new VectorContainer(); + protected final VectorContainer container; protected final T popConfig; protected final FragmentContext context; protected final OperatorContext oContext; @@ -56,8 +56,8 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements this.context = context; this.popConfig = popConfig; this.oContext = oContext; - this.stats = oContext.getStats(); - this.container = new VectorContainer(this.oContext); + stats = oContext.getStats(); + container = new VectorContainer(this.oContext); if (buildSchema) { state = BatchState.BUILD_SCHEMA; } else { http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java b/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java index adbf653..5d8cd95 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java @@ -23,7 +23,6 @@ import org.apache.drill.exec.work.foreman.SqlUnsupportedException; import org.apache.drill.exec.work.foreman.UnsupportedDataTypeException; import org.apache.drill.exec.work.foreman.UnsupportedFunctionException; import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException; -import org.junit.Ignore; import org.junit.Test; public class TestDisabledFunctionality extends BaseTestQuery{ @@ -286,18 +285,6 @@ public class TestDisabledFunctionality extends BaseTestQuery{ } } - @Test(expected = UnsupportedFunctionException.class) // see DRILL-2441 - public void testDisabledWindowFunctions() throws Exception { - try { - test("SELECT employee_id,position_id, salary, avg(salary) " + - "OVER (PARTITION BY position_id order by position_id) " + - "FROM cp.`employee.json` " + - "order by employee_id;"); - } catch(UserException ex) { - throwAsUnsupportedException(ex); - } - } - @Test(expected = UnsupportedFunctionException.class) // see DRILL-2181 public void testFlattenWithinGroupBy() throws Exception { try { http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/GenerateTestData.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/GenerateTestData.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/GenerateTestData.java new file mode 100644 index 0000000..623c1e2 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/GenerateTestData.java @@ -0,0 +1,286 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.window; + +import org.apache.drill.common.util.TestTools; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class GenerateTestData { + private static final int SUB_MUL = 1; + private static final int BATCH_SIZE = 20; + + private static class Partition { + Partition previous; + final int length; + final int[] subs; + + public Partition(int length, int[] subs) { + this.length = length; + this.subs = subs; + } + + /** + * @return total number of rows since first partition, this partition included + */ + public int cumulLength() { + int prevLength = previous != null ? previous.cumulLength() : 0; + return length + prevLength; + } + + public boolean isPartOf(int rowNumber) { + int prevLength = previous != null ? 
previous.cumulLength() : 0; + return rowNumber >= prevLength && rowNumber < cumulLength(); + } + + public int getSubIndex(final int sub) { + return Arrays.binarySearch(subs, sub); + } + + public int getSubSize(int sub) { + if (sub != subs[subs.length-1]) { + return sub * SUB_MUL; + } else { + //last sub has enough rows to reach partition length + int size = length; + for (int i = 0; i < subs.length-1; i++) { + size -= subs[i] * SUB_MUL; + } + return size; + } + } + + /** + * @return sub id of the sub that contains rowNumber + */ + public int getSubId(int rowNumber) { + assert isPartOf(rowNumber) : "row "+rowNumber+" isn't part of this partition"; + + int prevLength = previous != null ? previous.cumulLength() : 0; + rowNumber -= prevLength; // row num from start of this partition + + for (int s : subs) { + if (rowNumber < subRunningCount(s)) { + return s; + } + } + + throw new RuntimeException("should never happen!"); + } + + /** + * @return running count of rows from first row of the partition to current sub, this sub included + */ + public int subRunningCount(int sub) { + int count = 0; + for (int s : subs) { + count += getSubSize(s); + if (s == sub) { + break; + } + } + return count; + } + + /** + * @return running sum of salaries from first row of the partition to current sub, this sub included + */ + public int subRunningSum(int sub) { + int sum = 0; + for (int s : subs) { + sum += (s+10) * getSubSize(s); + if (s == sub) { + break; + } + } + return sum; + } + + /** + * @return sum of salaries for all rows of the partition + */ + public int totalSalary() { + return subRunningSum(subs[subs.length-1]); + } + + } + + private static Partition[] dataB1P1() { + // partition rows 20, subs [1, 2, 3, 4, 5, 6] + return new Partition[] { + new Partition(20, new int[]{1, 2, 3, 4, 5, 6}) + }; + } + + private static Partition[] dataB1P2() { + // partition rows 10, subs [1, 2, 3, 4] + // partition rows 10, subs [4, 5, 6] + return new Partition[] { + new Partition(10, new 
int[]{1, 2, 3, 4}), + new Partition(10, new int[]{4, 5, 6}), + }; + } + + private static Partition[] dataB2P2() { + // partition rows 20, subs [3, 5, 9] + // partition rows 20, subs [9, 10] + return new Partition[] { + new Partition(20, new int[]{3, 5, 9}), + new Partition(20, new int[]{9, 10}), + }; + } + + private static Partition[] dataB2P4() { + // partition rows 5, subs [1, 2, 3] + // partition rows 10, subs [3, 4, 5] + // partition rows 15, subs [5, 6, 7] + // partition rows 10, subs [7, 8] + return new Partition[] { + new Partition(5, new int[]{1, 2, 3}), + new Partition(10, new int[]{3, 4, 5}), + new Partition(15, new int[]{5, 6, 7}), + new Partition(10, new int[]{7, 8}), + }; + } + + private static Partition[] dataB3P2() { + // partition rows 5, subs [1, 2, 3] + // partition rows 55, subs [4, 5, 7, 8, 9, 10, 11, 12] + return new Partition[] { + new Partition(5, new int[]{1, 2, 3}), + new Partition(55, new int[]{4, 5, 7, 8, 9, 10, 11, 12}), + }; + } + + private static Partition[] dataB4P4() { + // partition rows 10, subs [1, 2, 3] + // partition rows 30, subs [3, 4, 5, 6, 7, 8] + // partition rows 20, subs [8, 9, 10] + // partition rows 20, subs [10, 11] + return new Partition[] { + new Partition(10, new int[]{1, 2, 3}), + new Partition(30, new int[]{3, 4, 5, 6, 7, 8}), + new Partition(20, new int[]{8, 9, 10}), + new Partition(20, new int[]{10, 11}), + }; + } + + private static void generateData(final String tableName, final Partition[] partitions) throws FileNotFoundException { + final String WORKING_PATH = TestTools.getWorkingPath(); + final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources"; + //TODO command line arguments contain file name + final String path = TEST_RES_PATH+"/window/" + tableName; + + final File pathFolder = new File(path); + if (!pathFolder.exists()) { + if (!pathFolder.mkdirs()) { + System.err.printf("Couldn't create folder %s, exiting%n", path); + } + } + + // expected results for query without order by clause + final 
PrintStream resultStream = new PrintStream(path + ".tsv"); + // expected results for query with order by clause + final PrintStream resultOrderStream = new PrintStream(path + ".subs.tsv"); + + // data file(s) + int fileId = 0; + PrintStream dataStream = new PrintStream(path + "/" + fileId + ".data.json"); + + for (Partition p : partitions) { + dataStream.printf("// partition rows %d, subs %s%n", p.length, Arrays.toString(p.subs)); + } + + // set previous partitions + for (int i = 1; i < partitions.length; i++) { + partitions[i].previous = partitions[i - 1]; + } + + // total number of rows + int total = partitions[partitions.length - 1].cumulLength(); + + // create data rows in random order + List<Integer> emp_ids = new ArrayList<>(total); + for (int i = 0; i < total; i++) { + emp_ids.add(i); + } + Collections.shuffle(emp_ids); + + int emp_idx = 0; + for (int id : emp_ids) { + int p = 0; + while (!partitions[p].isPartOf(id)) { // id is used directly as the 0-based row number + p++; + } + + int sub = partitions[p].getSubId(id); + int salary = 10 + sub; + + dataStream.printf("{ \"employee_id\":%d, \"position_id\":%d, \"sub\":%d, \"salary\":%d }%n", id, p + 1, sub, salary); + emp_idx++; + if ((emp_idx % BATCH_SIZE)==0 && emp_idx < total) { + System.out.printf("total: %d, emp_idx: %d, fileID: %d%n", total, emp_idx, fileId); + dataStream.close(); + fileId++; + dataStream = new PrintStream(path + "/" + fileId + ".data.json"); + } + } + + dataStream.close(); + + for (int p = 0, idx = 0; p < partitions.length; p++) { + for (int i = 0; i < partitions[p].length; i++, idx++) { + final Partition partition = partitions[p]; //TODO change for p loop to for over partitions + + final int sub = partition.getSubId(idx); + final int rowNumber = i + 1; + final int rank = 1 + partition.subRunningCount(sub) - partition.getSubSize(sub); + final int denseRank = partition.getSubIndex(sub) + 1; + final double cumeDist = (double) partition.subRunningCount(sub) / partition.length; + final double percentRank = 
partition.length == 1 ? 0 : (double)(rank - 1)/(partition.length - 1); + + // each line has: count(*) sum(salary) row_number() rank() dense_rank() cume_dist() percent_rank() + resultOrderStream.printf("%d\t%d\t%d\t%d\t%d\t%s\t%s%n", + partition.subRunningCount(sub), partition.subRunningSum(sub), + rowNumber, rank, denseRank, Double.toString(cumeDist), Double.toString(percentRank)); + + // each line has: count(*) sum(salary) + resultStream.printf("%d\t%d%n", partition.length, partition.totalSalary()); + } + } + + resultStream.close(); + resultOrderStream.close(); + } + + public static void main(String[] args) throws FileNotFoundException { + generateData("b1.p1", dataB1P1()); + generateData("b1.p2", dataB1P2()); + generateData("b2.p2", dataB2P2()); + generateData("b2.p4", dataB2P4()); + generateData("b3.p2", dataB3P2()); + generateData("b4.p4", dataB4P4()); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java index 2b8bd64..15fefa5 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java @@ -18,19 +18,60 @@ package org.apache.drill.exec.physical.impl.window; import org.apache.drill.BaseTestQuery; +import org.apache.drill.DrillTestWrapper; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.util.TestTools; import org.apache.drill.exec.ExecConstants; +import org.junit.BeforeClass; import org.junit.Test; +import java.util.Properties; + public class TestWindowFrame extends BaseTestQuery { - 
private void runTest(String data, String results, String window) throws Exception { - testNoResult("alter session set `%s`= true", ExecConstants.ENABLE_WINDOW_FUNCTIONS); - testBuilder() - .sqlQuery("select count(*) over pos_win `count`, sum(salary) over pos_win `sum` from cp.`window/%s.json` window pos_win as (%s)", data, window) + private static final String TEST_RES_PATH = TestTools.getWorkingPath() + "/src/test/resources"; + private static final String QUERY_NO_ORDERBY = + "select count(*) over pos_win `count`, sum(salary) over pos_win `sum` from dfs_test.`%s/window/%s` window pos_win as (partition by position_id)"; + private static final String QUERY_ORDERBY = + "select count(*) over pos_win `count`, sum(salary) over pos_win `sum`, row_number() over pos_win `row_number`, rank() over pos_win `rank`, dense_rank() over pos_win `dense_rank`, cume_dist() over pos_win `cume_dist`, percent_rank() over pos_win `percent_rank` from dfs_test.`%s/window/%s` window pos_win as (partition by position_id order by sub)"; + + @BeforeClass + public static void setupMSortBatchSize() { + // make sure memory sorter outputs 20 rows per batch + final Properties props = cloneDefaultTestConfigProperties(); + props.put(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE, Integer.toString(20)); + + updateTestCluster(1, DrillConfig.create(props)); + } + + private DrillTestWrapper buildWindowQuery(final String tableName) throws Exception { + return testBuilder() + .sqlQuery(String.format(QUERY_NO_ORDERBY, TEST_RES_PATH, tableName)) .ordered() - .csvBaselineFile("window/" + results + ".tsv") + .csvBaselineFile("window/" + tableName + ".tsv") .baselineColumns("count", "sum") - .build().run(); + .build(); + } + + private DrillTestWrapper buildWindowWithOrderByQuery(final String tableName) throws Exception { + return testBuilder() + .sqlQuery(String.format(QUERY_ORDERBY, TEST_RES_PATH, tableName)) + .ordered() + .csvBaselineFile("window/" + tableName + ".subs.tsv") + .baselineColumns("count", 
"sum", "row_number", "rank", "dense_rank", "cume_dist", "percent_rank") + .build(); + } + + private void runTest(final String tableName, final boolean withOrderBy) throws Exception { + runSQL(String.format("alter session set `%s`= true", ExecConstants.ENABLE_WINDOW_FUNCTIONS)); + + try { + DrillTestWrapper testWrapper = withOrderBy ? + buildWindowWithOrderByQuery(tableName) : buildWindowQuery(tableName); + testWrapper.run(); + } finally { + runSQL(String.format("alter session set `%s`= false", ExecConstants.ENABLE_WINDOW_FUNCTIONS)); + } } /** @@ -38,7 +79,7 @@ public class TestWindowFrame extends BaseTestQuery { */ @Test public void testB1P1() throws Exception { - runTest("b1.p1.data", "b1.p1", "partition by position_id order by position_id"); + runTest("b1.p1", false); } /** @@ -46,7 +87,7 @@ public class TestWindowFrame extends BaseTestQuery { */ @Test public void testB1P1OrderBy() throws Exception { - runTest("b1.p1.data", "b1.p1.subs", "partition by position_id order by sub"); + runTest("b1.p1", true); } /** @@ -54,7 +95,7 @@ public class TestWindowFrame extends BaseTestQuery { */ @Test public void testB1P2() throws Exception { - runTest("b1.p2.data", "b1.p2", "partition by position_id order by position_id"); + runTest("b1.p2", false); } /** @@ -63,7 +104,7 @@ public class TestWindowFrame extends BaseTestQuery { */ @Test public void testB1P2OrderBy() throws Exception { - runTest("b1.p2.data", "b1.p2.subs", "partition by position_id order by sub"); + runTest("b1.p2", true); } /** @@ -71,12 +112,12 @@ public class TestWindowFrame extends BaseTestQuery { */ @Test public void testB2P2() throws Exception { - runTest("b2.p2.data", "b2.p2", "partition by position_id order by position_id"); + runTest("b2.p2", false); } @Test public void testB2P2OrderBy() throws Exception { - runTest("b2.p2.data", "b2.p2.subs", "partition by position_id order by sub"); + runTest("b2.p2", true); } /** @@ -84,7 +125,7 @@ public class TestWindowFrame extends BaseTestQuery { */ @Test 
public void testB2P4() throws Exception { - runTest("b2.p4.data", "b2.p4", "partition by position_id order by position_id"); + runTest("b2.p4", false); } /** @@ -93,7 +134,7 @@ public class TestWindowFrame extends BaseTestQuery { */ @Test public void testB2P4OrderBy() throws Exception { - runTest("b2.p4.data", "b2.p4.subs", "partition by position_id order by sub"); + runTest("b2.p4", true); } /** @@ -101,7 +142,7 @@ public class TestWindowFrame extends BaseTestQuery { */ @Test public void testB3P2() throws Exception { - runTest("b3.p2.data", "b3.p2", "partition by position_id order by position_id"); + runTest("b3.p2", false); } /** @@ -110,7 +151,7 @@ public class TestWindowFrame extends BaseTestQuery { */ @Test public void testB3P2OrderBy() throws Exception { - runTest("b3.p2.data", "b3.p2.subs", "partition by position_id order by sub"); + runTest("b3.p2", true); } /** @@ -119,7 +160,23 @@ public class TestWindowFrame extends BaseTestQuery { */ @Test public void testb4P4() throws Exception { - runTest("b4.p4.data", "b4.p4", "partition by position_id order by position_id"); + runTest("b4.p4", false); + } + + @Test + public void testb4P4OrderBy() throws Exception { + runTest("b4.p4", true); } + @Test // DRILL-3218 + public void testMaxVarChar() throws Exception { + runSQL(String.format("alter session set `%s`= true", ExecConstants.ENABLE_WINDOW_FUNCTIONS)); + + try { + test("select max(cast(columns[2] as char(2))) over(partition by cast(columns[2] as char(2)) order by cast(columns[0] as int)) from dfs_test.`%s/window/allData.csv`", TEST_RES_PATH); + } finally { + runSQL(String.format("alter session set `%s`= false", ExecConstants.ENABLE_WINDOW_FUNCTIONS)); + } + + } } http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/test/resources/window/allData.csv ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/window/allData.csv b/exec/java-exec/src/test/resources/window/allData.csv 
new file mode 100644 index 0000000..29ab38f --- /dev/null +++ b/exec/java-exec/src/test/resources/window/allData.csv @@ -0,0 +1,5 @@ +-337516559,39342852852629160,VT,AXXXXXXXXXXXXXXXXXXXXXXXXXCXXXXXXXXXXXXXXXXXXXXXXXXB,2014-06-02 00:28:02.418,1952-08-14,false,729363085.95,8:16:8.58 +406158122,81588677006971200,IN,AXXXXXXXXXXXXXXXXXXXXXXXXXCXXXXXXXXXXXXXXXXXXXXXXXXB,2014-06-02 00:28:02.418,2001-03-08,false,1292460500.48,9:11:49.17 +1221407024,30009558124347168,VT,DXXXXXXXXXXXXXXXXXXXXXXXXXEXXXXXXXXXXXXXXXXXXXXXXXXF,2014-06-02 00:28:02.419,2000-10-18,true,395110006.277,18:44:25.43 +-1609141704,47841997008600128,ND,GXXXXXXXXXXXXXXXXXXXXXXXXXHXXXXXXXXXXXXXXXXXXXXXXXXI,2014-06-02 00:28:02.420,1991-05-13,true,1293582041.37,20:52:8.56 +-1032159521,38891661529640288,SD,HXXXXXXXXXXXXXXXXXXXXXXXXXIXXXXXXXXXXXXXXXXXXXXXXXXJ,2014-06-02 00:28:02.420,1965-02-21,false,983657842.924,19:46:10.42 http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/test/resources/window/b1.p1.subs.tsv ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/window/b1.p1.subs.tsv b/exec/java-exec/src/test/resources/window/b1.p1.subs.tsv new file mode 100644 index 0000000..8368d4a --- /dev/null +++ b/exec/java-exec/src/test/resources/window/b1.p1.subs.tsv @@ -0,0 +1,20 @@ +1 11 1 1 1 0.05 0.0 +3 35 2 2 2 0.15 0.05263157894736842 +3 35 3 2 2 0.15 0.05263157894736842 +6 74 4 4 3 0.3 0.15789473684210525 +6 74 5 4 3 0.3 0.15789473684210525 +6 74 6 4 3 0.3 0.15789473684210525 +10 130 7 7 4 0.5 0.3157894736842105 +10 130 8 7 4 0.5 0.3157894736842105 +10 130 9 7 4 0.5 0.3157894736842105 +10 130 10 7 4 0.5 0.3157894736842105 +15 205 11 11 5 0.75 0.5263157894736842 +15 205 12 11 5 0.75 0.5263157894736842 +15 205 13 11 5 0.75 0.5263157894736842 +15 205 14 11 5 0.75 0.5263157894736842 +15 205 15 11 5 0.75 0.5263157894736842 +20 285 16 16 6 1.0 0.7894736842105263 +20 285 17 16 6 1.0 0.7894736842105263 +20 285 18 16 6 1.0 
0.7894736842105263 +20 285 19 16 6 1.0 0.7894736842105263 +20 285 20 16 6 1.0 0.7894736842105263 http://git-wip-us.apache.org/repos/asf/drill/blob/3bccec91/exec/java-exec/src/test/resources/window/b1.p1.tsv ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/window/b1.p1.tsv b/exec/java-exec/src/test/resources/window/b1.p1.tsv new file mode 100644 index 0000000..32f6ab7 --- /dev/null +++ b/exec/java-exec/src/test/resources/window/b1.p1.tsv @@ -0,0 +1,20 @@ +20 285 +20 285 +20 285 +20 285 +20 285 +20 285 +20 285 +20 285 +20 285 +20 285 +20 285 +20 285 +20 285 +20 285 +20 285 +20 285 +20 285 +20 285 +20 285 +20 285
