This is an automated email from the ASF dual-hosted git repository.
adarshsanjeev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new fbd305af0f1 MSQ WF: Batch multiple PARTITION BY keys for processing
(#16823)
fbd305af0f1 is described below
commit fbd305af0f1d300f27d2ff8e67cf9937c55c56e2
Author: Akshat Jain <[email protected]>
AuthorDate: Wed Aug 28 11:32:47 2024 +0530
MSQ WF: Batch multiple PARTITION BY keys for processing (#16823)
Currently, if a query has a window function with PARTITION BY xyz, and xyz
has a million unique values with 1 row each, we end up creating a million
individual RACs (RowsAndColumns) for processing, each holding a single row.
This is unnecessary. We can instead batch multiple PARTITION BY keys together,
and process a batch only when adding further rows to it would exceed the
maxRowsMaterialized config.
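The batching idea above can be illustrated with a minimal, standalone sketch.
This is not the Druid implementation: the class and method names are
hypothetical, each row is reduced to just its PARTITION BY key, rows are
assumed to arrive already sorted by that key, and a plain
IllegalStateException stands in for TooManyRowsInAWindowFault. The input
mirrors the keys used in the new unit test (1, 1, 2, 2, 2, 3, 3).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified sketch of batching whole partitions under a row limit.
public class PartitionBatchingSketch
{
  public static List<List<Integer>> batch(List<Integer> sortedKeys, int maxRowsMaterialized)
  {
    final List<List<Integer>> batches = new ArrayList<>();
    final List<Integer> rowsToProcess = new ArrayList<>();
    // Index of the last buffered row that closes a complete partition, or -1 if none.
    int lastPartitionIndex = -1;

    for (Integer key : sortedKeys) {
      if (!rowsToProcess.isEmpty() && !rowsToProcess.get(rowsToProcess.size() - 1).equals(key)) {
        // Key changed: every row buffered so far belongs to a complete partition.
        lastPartitionIndex = rowsToProcess.size() - 1;
      }
      rowsToProcess.add(key);

      if (rowsToProcess.size() > maxRowsMaterialized) {
        if (lastPartitionIndex != -1) {
          // Emit the complete partitions as one batch and drop them from the buffer.
          final List<Integer> complete = rowsToProcess.subList(0, lastPartitionIndex + 1);
          batches.add(new ArrayList<>(complete));
          complete.clear();
          lastPartitionIndex = -1;
        }
        if (rowsToProcess.size() > maxRowsMaterialized) {
          // A single partition on its own exceeds the limit, so it cannot be batched.
          throw new IllegalStateException(
              "Too many rows in a window (requested = " + rowsToProcess.size()
              + ", max = " + maxRowsMaterialized + ")"
          );
        }
      }
    }
    if (!rowsToProcess.isEmpty()) {
      // End of input: whatever remains forms the final batch.
      batches.add(new ArrayList<>(rowsToProcess));
    }
    return batches;
  }

  public static void main(String[] args)
  {
    final List<Integer> keys = Arrays.asList(1, 1, 2, 2, 2, 3, 3);
    System.out.println(batch(keys, 100)); // [[1, 1, 2, 2, 2, 3, 3]]
    System.out.println(batch(keys, 5));   // [[1, 1, 2, 2, 2], [3, 3]]
    System.out.println(batch(keys, 4));   // [[1, 1], [2, 2, 2], [3, 3]]
  }
}
```

With a limit of 2, the same input throws, because the partition for key 2
has 3 rows and cannot fit in any batch.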
The previous iteration of this PR simplified
WindowOperatorQueryFrameProcessor to run all operators on all the rows instead
of creating smaller RACs per PARTITION BY key. That approach was discarded in
favor of the batching approach, and the details are summarized here: #16823
(comment).
---
.../WindowOperatorQueryFrameProcessor.java | 329 ++++++++++-----------
.../druid/msq/querykit/FrameProcessorTestBase.java | 108 +++++++
.../WindowOperatorQueryFrameProcessorTest.java | 235 +++++++++++++++
.../common/SortMergeJoinFrameProcessorTest.java | 69 +----
.../results/QueryResultsFrameProcessorTest.java | 26 +-
.../querykit/scan/ScanQueryFrameProcessorTest.java | 25 +-
.../rowsandcols/MapOfColumnsRowsAndColumns.java | 9 +-
7 files changed, 510 insertions(+), 291 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
index 6b28c0263a8..3dc62f3a60d 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
@@ -80,7 +80,6 @@ public class WindowOperatorQueryFrameProcessor implements
FrameProcessor<Object>
private final WritableFrameChannel outputChannel;
private final FrameWriterFactory frameWriterFactory;
private final FrameReader frameReader;
- private final ArrayList<ResultRow> objectsOfASingleRac;
private final int maxRowsMaterialized;
private long currentAllocatorCapacity; // Used for generating
FrameRowTooLargeException if needed
private Cursor frameCursor = null;
@@ -95,6 +94,9 @@ public class WindowOperatorQueryFrameProcessor implements
FrameProcessor<Object>
// Type strategies are pushed in the same order as column types in
frameReader.signature()
private final NullableTypeStrategy[] typeStrategies;
+ private final ArrayList<ResultRow> rowsToProcess;
+ private int lastPartitionIndex = -1;
+
public WindowOperatorQueryFrameProcessor(
WindowOperatorQuery query,
ReadableFrameChannel inputChannel,
@@ -116,7 +118,7 @@ public class WindowOperatorQueryFrameProcessor implements
FrameProcessor<Object>
this.query = query;
this.frameRowsAndCols = new ArrayList<>();
this.resultRowAndCols = new ArrayList<>();
- this.objectsOfASingleRac = new ArrayList<>();
+ this.rowsToProcess = new ArrayList<>();
this.maxRowsMaterialized = maxRowsMaterializedInWindow;
this.partitionColumnNames = partitionColumnNames;
@@ -153,188 +155,117 @@ public class WindowOperatorQueryFrameProcessor
implements FrameProcessor<Object>
public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
{
/*
- *
- * PARTITION BY A ORDER BY B
- *
- * Frame 1 -> rac1
- * A B
- * 1, 2
- * 1, 3
- * 2, 1 --> key changed
- * 2, 2
- *
- *
- * Frame 2 -> rac2
- * 3, 1 --> key changed
- * 3, 2
- * 3, 3
- * 3, 4
- *
- * Frame 3 -> rac3
- *
- * 3, 5
- * 3, 6
- * 4, 1 --> key changed
- * 4, 2
- *
- * In case of empty OVER clause, all these racs need to be added to a
single rows and columns
- * to be processed. The way we can do this is to use a ConcatRowsAndColumns
- * ConcatRC [rac1, rac2, rac3]
- * Run all ops on this
- *
- *
- * The flow would look like:
- * 1. Validate if the operator doesn't have any OVER() clause with
PARTITION BY for this stage.
- * 2. If 1 is true make a giant rows and columns (R&C) using concat as
shown above
- * Let all operators run amok on that R&C
- * 3. If 1 is false
- * Read a frame
- * keep the older row in a class variable
- * check row by row and compare current with older row to check if
partition boundary is reached
- * when frame partition by changes
- * create R&C for those particular set of columns, they would have the
same partition key
- * output will be a single R&C
- * write to output channel
- *
- *
- * Future thoughts: {@link https://github.com/apache/druid/issues/16126}
- *
- * 1. We are writing 1 partition to each frame in this way. In case of
high cardinality data
- * we will be making a large number of small frames. We can have a
check to keep size of frame to a value
- * say 20k rows and keep on adding to the same pending frame and not
create a new frame
- *
- * 2. Current approach with R&C and operators materialize a single R&C
for processing. In case of data
- * with low cardinality a single R&C might be too big to consume. Same
for the case of empty OVER() clause
- * Most of the window operations like SUM(), RANK(), RANGE() etc. can
be made with 2 passes of the data.
- * We might think to reimplement them in the MSQ way so that we do not
have to materialize so much data
+ There are 2 scenarios:
+
+ *** Scenario 1: Query has at least one window function with an OVER()
clause without a PARTITION BY ***
+
+ In this scenario, we add all the RACs to a single RowsAndColumns to be
processed. We do it via ConcatRowsAndColumns, and run all the operators on the
ConcatRowsAndColumns.
+ This is done because we need to run the operators on the entire
set of rows anyway when we have an OVER() clause without a PARTITION BY.
+ This scenario corresponds to the partitionColumnNames.isEmpty()=true code
flow.
+
+ *** Scenario 2: All window functions in the query have an OVER() clause with
a PARTITION BY ***
+
+ In this scenario, we need to process rows for each PARTITION BY group
together, but we can batch multiple PARTITION BY keys into the same RAC before
passing it to the operators for processing.
+ Batching is fine since the operators list would have the required
NaivePartitioningOperatorFactory to segregate each PARTITION BY group during
the processing.
+
+ The flow for this scenario can be summarized as follows:
+ 1. Frame Reading and Cursor Initialization: We start by reading a frame
from the inputChannel and initializing frameCursor to iterate over the rows in
that frame.
+ 2. Row Comparison: For each row in the frame, we decide whether it
belongs to the same PARTITION BY group as the previous row.
+ This is determined by the comparePartitionKeys() method.
+ Please refer to the Javadoc of that method for further
details and an example illustration.
+ 2.1. If the PARTITION BY columns of the current row match the PARTITION
BY columns of the previous row,
+ they belong to the same PARTITION BY group, and the current row gets added to
rowsToProcess.
+ If the total number of rows materialized exceeds
maxRowsMaterialized, we process the pending batch via the
processRowsUpToLastPartition() method.
+ 2.2. If they don't match, then we have reached a partition boundary.
+ In this case, we update the value for lastPartitionIndex.
+ 3. End of Input: If the input channel is finished, any remaining rows in
rowsToProcess are processed.
+
+ *Illustration of Row Comparison step*
+
+ Let's say we have window_function() OVER (PARTITION BY A ORDER BY B) in
our query, and we get 3 frames in the input channel to process.
+
+ Frame 1
+ A, B
+ 1, 2
+ 1, 3
+ 2, 1 --> PARTITION BY key (column A) changed from 1 to 2.
+ 2, 2
+
+ Frame 2
+ A, B
+ 3, 1 --> PARTITION BY key (column A) changed from 2 to 3.
+ 3, 2
+ 3, 3
+ 3, 4
+
+ Frame 3
+ A, B
+ 3, 5
+ 3, 6
+ 4, 1 --> PARTITION BY key (column A) changed from 3 to 4.
+ 4, 2
+
+ *Why batching?*
+ We batch multiple PARTITION BY keys for processing together to avoid the
overhead of creating a separate RAC for each PARTITION BY key, as that would
be unnecessary in scenarios where we have a large number of PARTITION BY keys,
each having only a single row.
+
+ *Future thoughts: https://github.com/apache/druid/issues/16126*
+ The current approach with R&C and operators materializes a single R&C for
processing. In case of data with low cardinality, a single R&C might be too big
to consume. The same applies to the case of an empty OVER() clause.
+ Most of the window operations like SUM(), RANK(), RANGE() etc. can be
computed with 2 passes over the data. We might think of reimplementing them in the MSQ
way so that we do not have to materialize so much data.
*/
if (partitionColumnNames.isEmpty()) {
- // If we do not have any OVER() clause with PARTITION BY for this stage.
- // Bring all data to a single executor for processing.
- // Convert each frame to RAC.
- // Concatenate all the racs to make a giant RAC.
- // Let all operators run on the giant RAC until channel is finished.
+ // Scenario 1: Query has at least one window function with an OVER()
clause without a PARTITION BY.
if (inputChannel.canRead()) {
final Frame frame = inputChannel.read();
convertRowFrameToRowsAndColumns(frame);
+ return ReturnOrAwait.runAgain();
} else if (inputChannel.isFinished()) {
runAllOpsOnMultipleRac(frameRowsAndCols);
return ReturnOrAwait.returnObject(Unit.instance());
} else {
return ReturnOrAwait.awaitAll(inputChannels().size());
}
- return ReturnOrAwait.runAgain();
- } else {
- // Aha, you found a PARTITION BY and maybe ORDER BY TO
- // PARTITION BY can also be on multiple keys
- // typically the last stage would already partition and sort for you
- // figure out frame boundaries and convert each distinct group to a rac
- // then run the windowing operator only on each rac
- if (frameCursor == null || frameCursor.isDone()) {
- if (readableInputs.isEmpty()) {
- return ReturnOrAwait.awaitAll(1);
- } else if (inputChannel.canRead()) {
- final Frame frame = inputChannel.read();
- frameCursor = FrameProcessors.makeCursor(frame, frameReader);
- final ColumnSelectorFactory frameColumnSelectorFactory =
frameCursor.getColumnSelectorFactory();
- final Supplier<Object>[] fieldSuppliers = new
Supplier[frameReader.signature().size()];
- for (int i = 0; i < fieldSuppliers.length; i++) {
- final ColumnValueSelector<?> selector =
-
frameColumnSelectorFactory.makeColumnValueSelector(frameReader.signature().getColumnName(i));
- fieldSuppliers[i] = selector::getObject;
-
- }
- rowSupplierFromFrameCursor = () -> {
- final ResultRow row = ResultRow.create(fieldSuppliers.length);
- for (int i = 0; i < fieldSuppliers.length; i++) {
- row.set(i, fieldSuppliers[i].get());
- }
- return row;
- };
- } else if (inputChannel.isFinished()) {
- // reaached end of channel
- // if there is data remaining
- // write it into a rac
- // and run operators on it
- if (!objectsOfASingleRac.isEmpty()) {
- if (objectsOfASingleRac.size() > maxRowsMaterialized) {
- throw new MSQException(new
TooManyRowsInAWindowFault(objectsOfASingleRac.size(), maxRowsMaterialized));
- }
- RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow(
- objectsOfASingleRac,
- frameReader.signature()
- );
- runAllOpsOnSingleRac(rac);
- objectsOfASingleRac.clear();
- }
- return ReturnOrAwait.returnObject(Unit.instance());
- } else {
- return ReturnOrAwait.runAgain();
- }
- }
- while (!frameCursor.isDone()) {
- final ResultRow currentRow = rowSupplierFromFrameCursor.get();
- if (outputRow == null) {
- outputRow = currentRow;
- objectsOfASingleRac.add(currentRow);
- } else if (comparePartitionKeys(outputRow, currentRow,
partitionColumnNames)) {
- // if they have the same partition key
- // keep adding them after checking
- // guardrails
- objectsOfASingleRac.add(currentRow);
- if (objectsOfASingleRac.size() > maxRowsMaterialized) {
- throw new MSQException(new TooManyRowsInAWindowFault(
- objectsOfASingleRac.size(),
- maxRowsMaterialized
- ));
- }
- } else {
- // key change noted
- // create rac from the rows seen before
- // run the operators on these rows and columns
- // clean up the object to hold the new rows only
- if (objectsOfASingleRac.size() > maxRowsMaterialized) {
- throw new MSQException(new TooManyRowsInAWindowFault(
- objectsOfASingleRac.size(),
- maxRowsMaterialized
- ));
- }
- RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow(
- objectsOfASingleRac,
- frameReader.signature()
- );
- runAllOpsOnSingleRac(rac);
- objectsOfASingleRac.clear();
- outputRow = currentRow.copy();
- return ReturnOrAwait.runAgain();
- }
- frameCursor.advance();
+ }
+
+ // Scenario 2: All window functions in the query have an OVER() clause with a
PARTITION BY
+ if (frameCursor == null || frameCursor.isDone()) {
+ if (readableInputs.isEmpty()) {
+ return ReturnOrAwait.awaitAll(1);
+ } else if (inputChannel.canRead()) {
+ final Frame frame = inputChannel.read();
+ frameCursor = FrameProcessors.makeCursor(frame, frameReader);
+ makeRowSupplierFromFrameCursor();
+ } else if (inputChannel.isFinished()) {
+ // Handle any remaining data.
+ lastPartitionIndex = rowsToProcess.size() - 1;
+ processRowsUpToLastPartition();
+ return ReturnOrAwait.returnObject(Unit.instance());
+ } else {
+ return ReturnOrAwait.runAgain();
}
}
- return ReturnOrAwait.runAgain();
- }
- /**
- * @param singleRac Use this {@link RowsAndColumns} as a single input for
the operators to be run
- */
- private void runAllOpsOnSingleRac(RowsAndColumns singleRac)
- {
- Operator op = new Operator()
- {
- @Nullable
- @Override
- public Closeable goOrContinue(Closeable continuationObject, Receiver
receiver)
- {
- receiver.push(singleRac);
- if (singleRac.numRows() > maxRowsMaterialized) {
- throw new MSQException(new
TooManyRowsInAWindowFault(singleRac.numRows(), maxRowsMaterialized));
+ while (!frameCursor.isDone()) {
+ final ResultRow currentRow = rowSupplierFromFrameCursor.get();
+ if (outputRow == null) {
+ outputRow = currentRow;
+ rowsToProcess.add(currentRow);
+ } else if (comparePartitionKeys(outputRow, currentRow,
partitionColumnNames)) {
+ // Add current row to the same batch of rows for processing.
+ rowsToProcess.add(currentRow);
+ if (rowsToProcess.size() > maxRowsMaterialized) {
+ // We don't want to materialize more than maxRowsMaterialized rows
at any point in time, so process the pending batch.
+ processRowsUpToLastPartition();
}
- receiver.completed();
- return null;
+ ensureMaxRowsInAWindowConstraint(rowsToProcess.size());
+ } else {
+ lastPartitionIndex = rowsToProcess.size() - 1;
+ outputRow = currentRow.copy();
+ return ReturnOrAwait.runAgain();
}
- };
- runOperatorsAfterThis(op);
+ frameCursor.advance();
+ }
+ return ReturnOrAwait.runAgain();
}
/**
@@ -349,9 +280,7 @@ public class WindowOperatorQueryFrameProcessor implements
FrameProcessor<Object>
public Closeable goOrContinue(Closeable continuationObject, Receiver
receiver)
{
RowsAndColumns rac = new ConcatRowsAndColumns(listOfRacs);
- if (rac.numRows() > maxRowsMaterialized) {
- throw new MSQException(new TooManyRowsInAWindowFault(rac.numRows(),
maxRowsMaterialized));
- }
+ ensureMaxRowsInAWindowConstraint(rac.numRows());
receiver.push(rac);
receiver.completed();
return null;
@@ -496,12 +425,7 @@ public class WindowOperatorQueryFrameProcessor implements
FrameProcessor<Object>
null
);
// check if existing + newly added rows exceed guardrails
- if (frameRowsAndCols.size() + ldrc.numRows() > maxRowsMaterialized) {
- throw new MSQException(new TooManyRowsInAWindowFault(
- frameRowsAndCols.size() + ldrc.numRows(),
- maxRowsMaterialized
- ));
- }
+ ensureMaxRowsInAWindowConstraint(frameRowsAndCols.size() + ldrc.numRows());
frameRowsAndCols.add(ldrc);
}
@@ -533,4 +457,57 @@ public class WindowOperatorQueryFrameProcessor implements
FrameProcessor<Object>
}
return match == partitionColumnNames.size();
}
+
+ private void makeRowSupplierFromFrameCursor()
+ {
+ final ColumnSelectorFactory frameColumnSelectorFactory =
frameCursor.getColumnSelectorFactory();
+ final Supplier<Object>[] fieldSuppliers = new
Supplier[frameReader.signature().size()];
+ for (int i = 0; i < fieldSuppliers.length; i++) {
+ final ColumnValueSelector<?> selector =
+
frameColumnSelectorFactory.makeColumnValueSelector(frameReader.signature().getColumnName(i));
+ fieldSuppliers[i] = selector::getObject;
+ }
+ rowSupplierFromFrameCursor = () -> {
+ final ResultRow row = ResultRow.create(fieldSuppliers.length);
+ for (int i = 0; i < fieldSuppliers.length; i++) {
+ row.set(i, fieldSuppliers[i].get());
+ }
+ return row;
+ };
+ }
+
+ /**
+ * Process rows from rowsToProcess[0, lastPartitionIndex].
+ */
+ private void processRowsUpToLastPartition()
+ {
+ if (lastPartitionIndex == -1) {
+ return;
+ }
+
+ RowsAndColumns singleRac =
MapOfColumnsRowsAndColumns.fromResultRowTillIndex(
+ rowsToProcess,
+ frameReader.signature(),
+ lastPartitionIndex
+ );
+ ArrayList<RowsAndColumns> rowsAndColumns = new ArrayList<>();
+ rowsAndColumns.add(singleRac);
+ runAllOpsOnMultipleRac(rowsAndColumns);
+
+ // Remove elements in the range [0, lastPartitionIndex] from the list.
+ // The call to list.subList(a, b).clear() deletes the elements in the
range [a, b - 1],
+ // causing the remaining elements to shift and start from index 0.
+ rowsToProcess.subList(0, lastPartitionIndex + 1).clear();
+ lastPartitionIndex = -1;
+ }
+
+ private void ensureMaxRowsInAWindowConstraint(int numRowsInWindow)
+ {
+ if (numRowsInWindow > maxRowsMaterialized) {
+ throw new MSQException(new TooManyRowsInAWindowFault(
+ numRowsInWindow,
+ maxRowsMaterialized
+ ));
+ }
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/FrameProcessorTestBase.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/FrameProcessorTestBase.java
new file mode 100644
index 00000000000..75168e07ad8
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/FrameProcessorTestBase.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.frame.processor.FrameProcessorExecutor;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameStorageAdapter;
+import org.apache.druid.frame.testutil.FrameSequenceBuilder;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.msq.input.ReadableInput;
+import org.apache.druid.msq.kernel.StageId;
+import org.apache.druid.msq.kernel.StagePartition;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class FrameProcessorTestBase extends InitializedNullHandlingTest
+{
+ protected static final StagePartition STAGE_PARTITION = new
StagePartition(new StageId("q", 0), 0);
+
+ protected FrameProcessorExecutor exec;
+
+ @Before
+ public void setUp()
+ {
+ exec = new
FrameProcessorExecutor(MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec")));
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ exec.getExecutorService().shutdownNow();
+ exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES);
+ }
+
+ protected ReadableInput makeChannelFromAdapter(
+ final StorageAdapter adapter,
+ final List<KeyColumn> keyColumns,
+ int rowsPerInputFrame
+ ) throws IOException
+ {
+ // Create a single, sorted frame.
+ final FrameSequenceBuilder singleFrameBuilder =
+ FrameSequenceBuilder.fromAdapter(adapter)
+ .frameType(FrameType.ROW_BASED)
+ .maxRowsPerFrame(Integer.MAX_VALUE)
+ .sortBy(keyColumns);
+
+ final RowSignature signature = singleFrameBuilder.signature();
+ final Frame frame =
Iterables.getOnlyElement(singleFrameBuilder.frames().toList());
+
+ // Split it up into frames that match rowsPerInputFrame. Set the max size large enough to
hold all rows we might ever want to use.
+ final BlockingQueueFrameChannel channel = new
BlockingQueueFrameChannel(10_000);
+
+ final FrameReader frameReader = FrameReader.create(signature);
+
+ final FrameSequenceBuilder frameSequenceBuilder =
+ FrameSequenceBuilder.fromAdapter(new FrameStorageAdapter(frame,
frameReader, Intervals.ETERNITY))
+ .frameType(FrameType.ROW_BASED)
+ .maxRowsPerFrame(rowsPerInputFrame);
+
+ final Sequence<Frame> frames = frameSequenceBuilder.frames();
+ frames.forEach(
+ f -> {
+ try {
+ channel.writable().write(f);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ );
+
+ channel.writable().close();
+ return ReadableInput.channel(channel.readable(),
FrameReader.create(signature), STAGE_PARTITION);
+ }
+}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java
new file mode 100644
index 00000000000..0b9b73facde
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.frame.allocation.HeapMemoryAllocator;
+import org.apache.druid.frame.allocation.SingleMemoryAllocatorFactory;
+import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.testutil.FrameTestUtil;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.indexing.CountingWritableFrameChannel;
+import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
+import org.apache.druid.msq.input.ReadableInput;
+import org.apache.druid.msq.test.LimitedFrameWriterFactory;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.operator.window.WindowOperatorFactory;
+import org.apache.druid.query.operator.window.ranking.WindowRowNumberProcessor;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.spec.LegacySegmentSpec;
+import org.apache.druid.segment.RowBasedSegment;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.timeline.SegmentId;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class WindowOperatorQueryFrameProcessorTest extends
FrameProcessorTestBase
+{
+ @Test
+ public void testBatchingOfPartitionByKeys_singleBatch() throws Exception
+ {
+ // With maxRowsMaterialized=100, we will get 1 frame:
+ // [1, 1, 2, 2, 2, 3, 3]
+ validateBatching(100, 1);
+ }
+
+ @Test
+ public void testBatchingOfPartitionByKeys_multipleBatches_1() throws
Exception
+ {
+ // With maxRowsMaterialized=5, we will get 2 frames:
+ // [1, 1, 2, 2, 2]
+ // [3, 3]
+ validateBatching(5, 2);
+ }
+
+ @Test
+ public void testBatchingOfPartitionByKeys_multipleBatches_2() throws
Exception
+ {
+ // With maxRowsMaterialized=4, we will get 3 frames:
+ // [1, 1]
+ // [2, 2, 2]
+ // [3, 3]
+ validateBatching(4, 3);
+ }
+
+ @Test
+ public void testBatchingOfPartitionByKeys_TooManyRowsInAWindowFault()
+ {
+ final RuntimeException e = Assert.assertThrows(
+ RuntimeException.class,
+ () -> validateBatching(2, 3)
+ );
+ MatcherAssert.assertThat(
+ ((MSQException) e.getCause().getCause()).getFault(),
+ CoreMatchers.instanceOf(TooManyRowsInAWindowFault.class)
+ );
+ Assert.assertTrue(e.getMessage().contains("TooManyRowsInAWindow: Too many
rows in a window (requested = 3, max = 2)"));
+ }
+
+ public void validateBatching(int maxRowsMaterialized, int numFramesWritten)
throws Exception
+ {
+ final ReadableInput factChannel = buildWindowTestInputChannel();
+
+ RowSignature inputSignature = RowSignature.builder()
+ .add("cityName",
ColumnType.STRING)
+ .add("added", ColumnType.LONG)
+ .build();
+
+ FrameReader frameReader = FrameReader.create(inputSignature);
+
+ RowSignature outputSignature = RowSignature.builder()
+ .addAll(inputSignature)
+ .add("w0", ColumnType.LONG)
+ .build();
+
+ final BlockingQueueFrameChannel outputChannel =
BlockingQueueFrameChannel.minimal();
+ ChannelCounters channelCounters = new ChannelCounters();
+ final CountingWritableFrameChannel countingWritableFrameChannel = new
CountingWritableFrameChannel(
+ outputChannel.writable(),
+ channelCounters,
+ 0
+ );
+
+ final WindowOperatorQuery query = new WindowOperatorQuery(
+ new QueryDataSource(
+ Druids.newScanQueryBuilder()
+ .dataSource(new TableDataSource("test"))
+ .intervals(new LegacySegmentSpec(Intervals.ETERNITY))
+ .columns("cityName", "added")
+
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+ .context(new HashMap<>())
+ .build()),
+ new LegacySegmentSpec(Intervals.ETERNITY),
+ new HashMap<>(),
+ outputSignature,
+ ImmutableList.of(
+ new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
+ ),
+ ImmutableList.of()
+ );
+
+ // Limit output frames to 100 rows each
+ final FrameWriterFactory frameWriterFactory = new
LimitedFrameWriterFactory(
+ FrameWriters.makeRowBasedFrameWriterFactory(
+ new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
+ outputSignature,
+ Collections.emptyList(),
+ false
+ ),
+ 100
+ );
+
+ final WindowOperatorQueryFrameProcessor processor = new
WindowOperatorQueryFrameProcessor(
+ query,
+ factChannel.getChannel(),
+ countingWritableFrameChannel,
+ frameWriterFactory,
+ frameReader,
+ new ObjectMapper(),
+ ImmutableList.of(
+ new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
+ ),
+ inputSignature,
+ maxRowsMaterialized,
+ ImmutableList.of("added")
+ );
+
+ exec.runFully(processor, null);
+
+ final Sequence<List<Object>> rowsFromProcessor =
FrameTestUtil.readRowsFromFrameChannel(
+ outputChannel.readable(),
+ FrameReader.create(outputSignature)
+ );
+
+ final List<List<Object>> rows = rowsFromProcessor.toList();
+
+ long actualNumFrames =
Arrays.stream(channelCounters.snapshot().getFrames()).findFirst().getAsLong();
+ Assert.assertEquals(numFramesWritten, actualNumFrames);
+ Assert.assertEquals(7, rows.size());
+ }
+
+ private ReadableInput buildWindowTestInputChannel() throws IOException
+ {
+ RowSignature inputSignature = RowSignature.builder()
+ .add("cityName",
ColumnType.STRING)
+ .add("added", ColumnType.LONG)
+ .build();
+
+ List<Map<String, Object>> rows = ImmutableList.of(
+ ImmutableMap.of("added", 1L, "cityName", "city1"),
+ ImmutableMap.of("added", 1L, "cityName", "city2"),
+ ImmutableMap.of("added", 2L, "cityName", "city3"),
+ ImmutableMap.of("added", 2L, "cityName", "city4"),
+ ImmutableMap.of("added", 2L, "cityName", "city5"),
+ ImmutableMap.of("added", 3L, "cityName", "city6"),
+ ImmutableMap.of("added", 3L, "cityName", "city7")
+ );
+
+ return makeChannelFromRows(rows, inputSignature, Collections.emptyList());
+ }
+
+ private ReadableInput makeChannelFromRows(
+ List<Map<String, Object>> rows,
+ RowSignature signature,
+ List<KeyColumn> keyColumns
+ ) throws IOException
+ {
+ RowBasedSegment<Map<String, Object>> segment = new RowBasedSegment<>(
+ SegmentId.dummy("test"),
+ Sequences.simple(rows),
+ columnName -> m -> m.get(columnName),
+ signature
+ );
+
+ return makeChannelFromAdapter(segment.asStorageAdapter(), keyColumns);
+ }
+
+ private ReadableInput makeChannelFromAdapter(
+ final StorageAdapter adapter,
+ final List<KeyColumn> keyColumns
+ ) throws IOException
+ {
+ return makeChannelFromAdapter(adapter, keyColumns, 1000);
+ }
+}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessorTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessorTest.java
index 0094aeb7369..4a8ec435873 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessorTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessorTest.java
@@ -21,15 +21,11 @@ package org.apache.druid.msq.querykit.common;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.frame.Frame;
-import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.allocation.SingleMemoryAllocatorFactory;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
@@ -37,23 +33,17 @@ import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.ReadableNilFrameChannel;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
-import org.apache.druid.frame.processor.FrameProcessorExecutor;
import org.apache.druid.frame.read.FrameReader;
-import org.apache.druid.frame.segment.FrameStorageAdapter;
-import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.frame.write.FrameWriters;
-import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Unit;
-import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyRowsWithSameKeyFault;
import org.apache.druid.msq.input.ReadableInput;
-import org.apache.druid.msq.kernel.StageId;
-import org.apache.druid.msq.kernel.StagePartition;
+import org.apache.druid.msq.querykit.FrameProcessorTestBase;
import org.apache.druid.msq.test.LimitedFrameWriterFactory;
import org.apache.druid.segment.RowBasedSegment;
import org.apache.druid.segment.StorageAdapter;
@@ -61,15 +51,12 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinTestHelper;
import org.apache.druid.segment.join.JoinType;
-import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.SegmentId;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
-import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -82,19 +69,15 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
@RunWith(Parameterized.class)
-public class SortMergeJoinFrameProcessorTest extends InitializedNullHandlingTest
+public class SortMergeJoinFrameProcessorTest extends FrameProcessorTestBase
{
- private static final StagePartition STAGE_PARTITION = new StagePartition(new StageId("q", 0), 0);
private static final long MAX_BUFFERED_BYTES = 10_000_000;
private final int rowsPerInputFrame;
private final int rowsPerOutputFrame;
- private FrameProcessorExecutor exec;
-
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -118,19 +101,6 @@ public class SortMergeJoinFrameProcessorTest extends InitializedNullHandlingTest
return constructors;
}
- @Before
- public void setUp()
- {
- exec = new FrameProcessorExecutor(MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec")));
- }
-
- @After
- public void tearDown() throws Exception
- {
- exec.getExecutorService().shutdownNow();
- exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES);
- }
-
@Test
public void testLeftJoinEmptyLeftSide() throws Exception
{
@@ -1531,40 +1501,7 @@ public class SortMergeJoinFrameProcessorTest extends InitializedNullHandlingTest
final List<KeyColumn> keyColumns
) throws IOException
{
- // Create a single, sorted frame.
- final FrameSequenceBuilder singleFrameBuilder =
- FrameSequenceBuilder.fromAdapter(adapter)
- .frameType(FrameType.ROW_BASED)
- .maxRowsPerFrame(Integer.MAX_VALUE)
- .sortBy(keyColumns);
-
- final RowSignature signature = singleFrameBuilder.signature();
- final Frame frame = Iterables.getOnlyElement(singleFrameBuilder.frames().toList());
-
- // Split it up into frames that match rowsPerFrame. Set max size enough to hold all rows we might ever want to use.
- final BlockingQueueFrameChannel channel = new BlockingQueueFrameChannel(10_000);
-
- final FrameReader frameReader = FrameReader.create(signature);
-
- final FrameSequenceBuilder frameSequenceBuilder =
- FrameSequenceBuilder.fromAdapter(new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY))
- .frameType(FrameType.ROW_BASED)
- .maxRowsPerFrame(rowsPerInputFrame);
-
- final Sequence<Frame> frames = frameSequenceBuilder.frames();
- frames.forEach(
- f -> {
- try {
- channel.writable().write(f);
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- );
-
- channel.writable().close();
- return ReadableInput.channel(channel.readable(), FrameReader.create(signature), STAGE_PARTITION);
+ return makeChannelFromAdapter(adapter, keyColumns, rowsPerInputFrame);
}
private FrameWriterFactory makeFrameWriterFactory(final RowSignature signature)
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/results/QueryResultsFrameProcessorTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/results/QueryResultsFrameProcessorTest.java
index d89b62977a3..e300a416705 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/results/QueryResultsFrameProcessorTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/results/QueryResultsFrameProcessorTest.java
@@ -20,54 +20,32 @@
package org.apache.druid.msq.querykit.results;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
-import org.apache.druid.frame.processor.FrameProcessorExecutor;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.java.util.common.Unit;
-import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.StagePartition;
+import org.apache.druid.msq.querykit.FrameProcessorTestBase;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
-import org.apache.druid.testing.InitializedNullHandlingTest;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-public class QueryResultsFrameProcessorTest extends InitializedNullHandlingTest
+public class QueryResultsFrameProcessorTest extends FrameProcessorTestBase
{
- private FrameProcessorExecutor exec;
-
- @Before
- public void setUp()
- {
- exec = new FrameProcessorExecutor(MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec")));
- }
-
- @After
- public void tearDown() throws Exception
- {
- exec.getExecutorService().shutdownNow();
- exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES);
- }
-
-
@Test
public void sanityTest() throws ExecutionException, InterruptedException, IOException
{
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
index e3272cbcee7..205f3495807 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
@@ -20,7 +20,6 @@
package org.apache.druid.msq.querykit.scan;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.frame.Frame;
@@ -30,7 +29,6 @@ import org.apache.druid.frame.allocation.HeapMemoryAllocator;
import org.apache.druid.frame.allocation.SingleMemoryAllocatorFactory;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
-import org.apache.druid.frame.processor.FrameProcessorExecutor;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.testutil.FrameTestUtil;
@@ -39,11 +37,11 @@ import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Unit;
-import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.StagePartition;
+import org.apache.druid.msq.querykit.FrameProcessorTestBase;
import org.apache.druid.msq.test.LimitedFrameWriterFactory;
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
@@ -51,35 +49,16 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
-import org.apache.druid.testing.InitializedNullHandlingTest;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-public class ScanQueryFrameProcessorTest extends InitializedNullHandlingTest
+public class ScanQueryFrameProcessorTest extends FrameProcessorTestBase
{
- private FrameProcessorExecutor exec;
-
- @Before
- public void setUp()
- {
- exec = new FrameProcessorExecutor(MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec")));
- }
-
- @After
- public void tearDown() throws Exception
- {
- exec.getExecutorService().shutdownNow();
- exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES);
- }
-
@Test
public void test_runWithInputChannel() throws Exception
{
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
index 42972f9340d..29f092f6744 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
@@ -84,11 +84,16 @@ public class MapOfColumnsRowsAndColumns implements RowsAndColumns
}
public static MapOfColumnsRowsAndColumns fromResultRow(ArrayList<ResultRow> objs, RowSignature signature)
+ {
+ return fromResultRowTillIndex(objs, signature, objs.size() - 1);
+ }
+
+ public static MapOfColumnsRowsAndColumns fromResultRowTillIndex(ArrayList<ResultRow> objs, RowSignature signature, int index)
{
final Builder bob = builder();
if (!objs.isEmpty()) {
- Object[][] columnOriented = new Object[objs.get(0).length()][objs.size()];
- for (int i = 0; i < objs.size(); ++i) {
+ Object[][] columnOriented = new Object[objs.get(0).length()][index + 1];
+ for (int i = 0; i <= index; ++i) {
for (int j = 0; j < objs.get(i).length(); ++j) {
columnOriented[j][i] = objs.get(i).get(j);
}