This is an automated email from the ASF dual-hosted git repository.

adarshsanjeev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new fbd305af0f1 MSQ WF: Batch multiple PARTITION BY keys for processing 
(#16823)
fbd305af0f1 is described below

commit fbd305af0f1d300f27d2ff8e67cf9937c55c56e2
Author: Akshat Jain <[email protected]>
AuthorDate: Wed Aug 28 11:32:47 2024 +0530

    MSQ WF: Batch multiple PARTITION BY keys for processing (#16823)
    
    Currently, if we have a query with window function having PARTITION BY xyz, 
and we have a million unique values for xyz each having 1 row, we'd end up 
creating a million individual RACs for processing, each having a single row. 
This is unnecessary, and we can batch the PARTITION BY keys together for 
processing, and process them only when we can't batch further rows to adhere to 
maxRowsMaterialized config.
    
    The previous iteration of this PR was simplifying 
WindowOperatorQueryFrameProcessor to run all operators on all the rows instead 
of creating smaller RACs per partition by key. That approach was discarded in 
favor of the batching approach, and the details are summarized here: #16823 
(comment).
---
 .../WindowOperatorQueryFrameProcessor.java         | 329 ++++++++++-----------
 .../druid/msq/querykit/FrameProcessorTestBase.java | 108 +++++++
 .../WindowOperatorQueryFrameProcessorTest.java     | 235 +++++++++++++++
 .../common/SortMergeJoinFrameProcessorTest.java    |  69 +----
 .../results/QueryResultsFrameProcessorTest.java    |  26 +-
 .../querykit/scan/ScanQueryFrameProcessorTest.java |  25 +-
 .../rowsandcols/MapOfColumnsRowsAndColumns.java    |   9 +-
 7 files changed, 510 insertions(+), 291 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
index 6b28c0263a8..3dc62f3a60d 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
@@ -80,7 +80,6 @@ public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
   private final WritableFrameChannel outputChannel;
   private final FrameWriterFactory frameWriterFactory;
   private final FrameReader frameReader;
-  private final ArrayList<ResultRow> objectsOfASingleRac;
   private final int maxRowsMaterialized;
   private long currentAllocatorCapacity; // Used for generating 
FrameRowTooLargeException if needed
   private Cursor frameCursor = null;
@@ -95,6 +94,9 @@ public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
   // Type strategies are pushed in the same order as column types in 
frameReader.signature()
   private final NullableTypeStrategy[] typeStrategies;
 
+  private final ArrayList<ResultRow> rowsToProcess;
+  private int lastPartitionIndex = -1;
+
   public WindowOperatorQueryFrameProcessor(
       WindowOperatorQuery query,
       ReadableFrameChannel inputChannel,
@@ -116,7 +118,7 @@ public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
     this.query = query;
     this.frameRowsAndCols = new ArrayList<>();
     this.resultRowAndCols = new ArrayList<>();
-    this.objectsOfASingleRac = new ArrayList<>();
+    this.rowsToProcess = new ArrayList<>();
     this.maxRowsMaterialized = maxRowsMaterializedInWindow;
     this.partitionColumnNames = partitionColumnNames;
 
@@ -153,188 +155,117 @@ public class WindowOperatorQueryFrameProcessor 
implements FrameProcessor<Object>
   public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
   {
     /*
-     *
-     * PARTITION BY A ORDER BY B
-     *
-     * Frame 1   -> rac1
-     * A  B
-     * 1, 2
-     * 1, 3
-     * 2, 1 --> key changed
-     * 2, 2
-     *
-     *
-     * Frame 2 -> rac2
-     * 3, 1 --> key changed
-     * 3, 2
-     * 3, 3
-     * 3, 4
-     *
-     * Frame 3 -> rac3
-     *
-     * 3, 5
-     * 3, 6
-     * 4, 1 --> key changed
-     * 4, 2
-     *
-     * In case of empty OVER clause, all these racs need to be added to a 
single rows and columns
-     * to be processed. The way we can do this is to use a ConcatRowsAndColumns
-     * ConcatRC [rac1, rac2, rac3]
-     * Run all ops on this
-     *
-     *
-     * The flow would look like:
-     * 1. Validate if the operator doesn't have any OVER() clause with 
PARTITION BY for this stage.
-     * 2. If 1 is true make a giant rows and columns (R&C) using concat as 
shown above
-     *    Let all operators run amok on that R&C
-     * 3. If 1 is false
-     *    Read a frame
-     *    keep the older row in a class variable
-     *    check row by row and compare current with older row to check if 
partition boundary is reached
-     *    when frame partition by changes
-     *    create R&C for those particular set of columns, they would have the 
same partition key
-     *    output will be a single R&C
-     *    write to output channel
-     *
-     *
-     *  Future thoughts: {@link https://github.com/apache/druid/issues/16126}
-     *
-     *  1. We are writing 1 partition to each frame in this way. In case of 
high cardinality data
-     *      we will be making a large number of small frames. We can have a 
check to keep size of frame to a value
-     *      say 20k rows and keep on adding to the same pending frame and not 
create a new frame
-     *
-     *  2. Current approach with R&C and operators materialize a single R&C 
for processing. In case of data
-     *     with low cardinality a single R&C might be too big to consume. Same 
for the case of empty OVER() clause
-     *     Most of the window operations like SUM(), RANK(), RANGE() etc. can 
be made with 2 passes of the data.
-     *     We might think to reimplement them in the MSQ way so that we do not 
have to materialize so much data
+     There are 2 scenarios:
+
+     *** Scenario 1: Query has at least one window function with an OVER() 
clause without a PARTITION BY ***
+
+     In this scenario, we add all the RACs to a single RowsAndColumns to be 
processed. We do it via ConcatRowsAndColumns, and run all the operators on the 
ConcatRowsAndColumns.
+     This is done because we anyway need to run the operators on the entire 
set of rows when we have an OVER() clause without a PARTITION BY.
+     This scenario corresponds to partitionColumnNames.isEmpty()=true code 
flow.
+
+     *** Scenario 2: All window functions in the query have OVER() clause with 
a PARTITION BY ***
+
+     In this scenario, we need to process rows for each PARTITION BY group 
together, but we can batch multiple PARTITION BY keys into the same RAC before 
passing it to the operators for processing.
+     Batching is fine since the operators list would have the required 
NaivePartitioningOperatorFactory to segregate each PARTITION BY group during 
the processing.
+
+     The flow for this scenario can be summarised as follows:
+     1. Frame Reading and Cursor Initialization: We start by reading a frame 
from the inputChannel and initializing frameCursor to iterate over the rows in 
that frame.
+     2. Row Comparison: For each row in the frame, we decide whether it 
belongs to the same PARTITION BY group as the previous row.
+                        This is determined by comparePartitionKeys() method.
+                        Please refer to the Javadoc of that method for further 
details and an example illustration.
+        2.1. If the PARTITION BY columns of the current row match the PARTITION 
BY columns of the previous row,
+             both rows belong to the same PARTITION BY group, and the current row gets added to 
rowsToProcess.
+             If the total number of rows materialized exceeds 
maxRowsMaterialized, we process the pending batch via the 
processRowsUpToLastPartition() method.
+        2.2. If they don't match, then we have reached a partition boundary.
+             In this case, we update the value for lastPartitionIndex.
+     3. End of Input: If the input channel is finished, any remaining rows in 
rowsToProcess are processed.
+
+     *Illustration of Row Comparison step*
+
+     Let's say we have window_function() OVER (PARTITION BY A ORDER BY B) in 
our query, and we get 3 frames in the input channel to process.
+
+     Frame 1
+     A, B
+     1, 2
+     1, 3
+     2, 1 --> PARTITION BY key (column A) changed from 1 to 2.
+     2, 2
+
+     Frame 2
+     A, B
+     3, 1 --> PARTITION BY key (column A) changed from 2 to 3.
+     3, 2
+     3, 3
+     3, 4
+
+     Frame 3
+     A, B
+     3, 5
+     3, 6
+     4, 1 --> PARTITION BY key (column A) changed from 3 to 4.
+     4, 2
+
+     *Why batching?*
+     We batch multiple PARTITION BY keys for processing together to avoid the 
overhead of creating a separate RAC for each PARTITION BY key, as that would 
be unnecessary in scenarios where we have a large number of PARTITION BY keys, 
each having only a single row.
+
+     *Future thoughts: https://github.com/apache/druid/issues/16126*
+     The current approach with R&C and operators materializes a single R&C for 
processing. In case of data with low cardinality a single R&C might be too big 
to consume. Same for the case of empty OVER() clause.
+     Most of the window operations like SUM(), RANK(), RANGE() etc. can be 
made with 2 passes of the data. We might think to reimplement them in the MSQ 
way so that we do not have to materialize so much data.
      */
 
     if (partitionColumnNames.isEmpty()) {
-      // If we do not have any OVER() clause with PARTITION BY for this stage.
-      // Bring all data to a single executor for processing.
-      // Convert each frame to RAC.
-      // Concatenate all the racs to make a giant RAC.
-      // Let all operators run on the giant RAC until channel is finished.
+      // Scenario 1: Query has at least one window function with an OVER() 
clause without a PARTITION BY.
       if (inputChannel.canRead()) {
         final Frame frame = inputChannel.read();
         convertRowFrameToRowsAndColumns(frame);
+        return ReturnOrAwait.runAgain();
       } else if (inputChannel.isFinished()) {
         runAllOpsOnMultipleRac(frameRowsAndCols);
         return ReturnOrAwait.returnObject(Unit.instance());
       } else {
         return ReturnOrAwait.awaitAll(inputChannels().size());
       }
-      return ReturnOrAwait.runAgain();
-    } else {
-      // Aha, you found a PARTITION BY and maybe ORDER BY TO
-      // PARTITION BY can also be on multiple keys
-      // typically the last stage would already partition and sort for you
-      // figure out frame boundaries and convert each distinct group to a rac
-      // then run the windowing operator only on each rac
-      if (frameCursor == null || frameCursor.isDone()) {
-        if (readableInputs.isEmpty()) {
-          return ReturnOrAwait.awaitAll(1);
-        } else if (inputChannel.canRead()) {
-          final Frame frame = inputChannel.read();
-          frameCursor = FrameProcessors.makeCursor(frame, frameReader);
-          final ColumnSelectorFactory frameColumnSelectorFactory = 
frameCursor.getColumnSelectorFactory();
-          final Supplier<Object>[] fieldSuppliers = new 
Supplier[frameReader.signature().size()];
-          for (int i = 0; i < fieldSuppliers.length; i++) {
-            final ColumnValueSelector<?> selector =
-                
frameColumnSelectorFactory.makeColumnValueSelector(frameReader.signature().getColumnName(i));
-            fieldSuppliers[i] = selector::getObject;
-
-          }
-          rowSupplierFromFrameCursor = () -> {
-            final ResultRow row = ResultRow.create(fieldSuppliers.length);
-            for (int i = 0; i < fieldSuppliers.length; i++) {
-              row.set(i, fieldSuppliers[i].get());
-            }
-            return row;
-          };
-        } else if (inputChannel.isFinished()) {
-          // reaached end of channel
-          // if there is data remaining
-          // write it into a rac
-          // and run operators on it
-          if (!objectsOfASingleRac.isEmpty()) {
-            if (objectsOfASingleRac.size() > maxRowsMaterialized) {
-              throw new MSQException(new 
TooManyRowsInAWindowFault(objectsOfASingleRac.size(), maxRowsMaterialized));
-            }
-            RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow(
-                objectsOfASingleRac,
-                frameReader.signature()
-            );
-            runAllOpsOnSingleRac(rac);
-            objectsOfASingleRac.clear();
-          }
-          return ReturnOrAwait.returnObject(Unit.instance());
-        } else {
-          return ReturnOrAwait.runAgain();
-        }
-      }
-      while (!frameCursor.isDone()) {
-        final ResultRow currentRow = rowSupplierFromFrameCursor.get();
-        if (outputRow == null) {
-          outputRow = currentRow;
-          objectsOfASingleRac.add(currentRow);
-        } else if (comparePartitionKeys(outputRow, currentRow, 
partitionColumnNames)) {
-          // if they have the same partition key
-          // keep adding them after checking
-          // guardrails
-          objectsOfASingleRac.add(currentRow);
-          if (objectsOfASingleRac.size() > maxRowsMaterialized) {
-            throw new MSQException(new TooManyRowsInAWindowFault(
-                objectsOfASingleRac.size(),
-                maxRowsMaterialized
-            ));
-          }
-        } else {
-          // key change noted
-          // create rac from the rows seen before
-          // run the operators on these rows and columns
-          // clean up the object to hold the new rows only
-          if (objectsOfASingleRac.size() > maxRowsMaterialized) {
-            throw new MSQException(new TooManyRowsInAWindowFault(
-                objectsOfASingleRac.size(),
-                maxRowsMaterialized
-            ));
-          }
-          RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow(
-              objectsOfASingleRac,
-              frameReader.signature()
-          );
-          runAllOpsOnSingleRac(rac);
-          objectsOfASingleRac.clear();
-          outputRow = currentRow.copy();
-          return ReturnOrAwait.runAgain();
-        }
-        frameCursor.advance();
+    }
+
+    // Scenario 2: All window functions in the query have OVER() clause with a 
PARTITION BY
+    if (frameCursor == null || frameCursor.isDone()) {
+      if (readableInputs.isEmpty()) {
+        return ReturnOrAwait.awaitAll(1);
+      } else if (inputChannel.canRead()) {
+        final Frame frame = inputChannel.read();
+        frameCursor = FrameProcessors.makeCursor(frame, frameReader);
+        makeRowSupplierFromFrameCursor();
+      } else if (inputChannel.isFinished()) {
+        // Handle any remaining data.
+        lastPartitionIndex = rowsToProcess.size() - 1;
+        processRowsUpToLastPartition();
+        return ReturnOrAwait.returnObject(Unit.instance());
+      } else {
+        return ReturnOrAwait.runAgain();
       }
     }
-    return ReturnOrAwait.runAgain();
-  }
 
-  /**
-   * @param singleRac Use this {@link RowsAndColumns} as a single input for 
the operators to be run
-   */
-  private void runAllOpsOnSingleRac(RowsAndColumns singleRac)
-  {
-    Operator op = new Operator()
-    {
-      @Nullable
-      @Override
-      public Closeable goOrContinue(Closeable continuationObject, Receiver 
receiver)
-      {
-        receiver.push(singleRac);
-        if (singleRac.numRows() > maxRowsMaterialized) {
-          throw new MSQException(new 
TooManyRowsInAWindowFault(singleRac.numRows(), maxRowsMaterialized));
+    while (!frameCursor.isDone()) {
+      final ResultRow currentRow = rowSupplierFromFrameCursor.get();
+      if (outputRow == null) {
+        outputRow = currentRow;
+        rowsToProcess.add(currentRow);
+      } else if (comparePartitionKeys(outputRow, currentRow, 
partitionColumnNames)) {
+        // Add current row to the same batch of rows for processing.
+        rowsToProcess.add(currentRow);
+        if (rowsToProcess.size() > maxRowsMaterialized) {
+          // We don't want to materialize more than maxRowsMaterialized rows 
at any point in time, so process the pending batch.
+          processRowsUpToLastPartition();
         }
-        receiver.completed();
-        return null;
+        ensureMaxRowsInAWindowConstraint(rowsToProcess.size());
+      } else {
+        lastPartitionIndex = rowsToProcess.size() - 1;
+        outputRow = currentRow.copy();
+        return ReturnOrAwait.runAgain();
       }
-    };
-    runOperatorsAfterThis(op);
+      frameCursor.advance();
+    }
+    return ReturnOrAwait.runAgain();
   }
 
   /**
@@ -349,9 +280,7 @@ public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
       public Closeable goOrContinue(Closeable continuationObject, Receiver 
receiver)
       {
         RowsAndColumns rac = new ConcatRowsAndColumns(listOfRacs);
-        if (rac.numRows() > maxRowsMaterialized) {
-          throw new MSQException(new TooManyRowsInAWindowFault(rac.numRows(), 
maxRowsMaterialized));
-        }
+        ensureMaxRowsInAWindowConstraint(rac.numRows());
         receiver.push(rac);
         receiver.completed();
         return null;
@@ -496,12 +425,7 @@ public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
         null
     );
     // check if existing + newly added rows exceed guardrails
-    if (frameRowsAndCols.size() + ldrc.numRows() > maxRowsMaterialized) {
-      throw new MSQException(new TooManyRowsInAWindowFault(
-          frameRowsAndCols.size() + ldrc.numRows(),
-          maxRowsMaterialized
-      ));
-    }
+    ensureMaxRowsInAWindowConstraint(frameRowsAndCols.size() + ldrc.numRows());
     frameRowsAndCols.add(ldrc);
   }
 
@@ -533,4 +457,57 @@ public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
     }
     return match == partitionColumnNames.size();
   }
+
+  private void makeRowSupplierFromFrameCursor()
+  {
+    final ColumnSelectorFactory frameColumnSelectorFactory = 
frameCursor.getColumnSelectorFactory();
+    final Supplier<Object>[] fieldSuppliers = new 
Supplier[frameReader.signature().size()];
+    for (int i = 0; i < fieldSuppliers.length; i++) {
+      final ColumnValueSelector<?> selector =
+          
frameColumnSelectorFactory.makeColumnValueSelector(frameReader.signature().getColumnName(i));
+      fieldSuppliers[i] = selector::getObject;
+    }
+    rowSupplierFromFrameCursor = () -> {
+      final ResultRow row = ResultRow.create(fieldSuppliers.length);
+      for (int i = 0; i < fieldSuppliers.length; i++) {
+        row.set(i, fieldSuppliers[i].get());
+      }
+      return row;
+    };
+  }
+
+  /**
+   * Process rows from rowsToProcess[0, lastPartitionIndex].
+   */
+  private void processRowsUpToLastPartition()
+  {
+    if (lastPartitionIndex == -1) {
+      return;
+    }
+
+    RowsAndColumns singleRac = 
MapOfColumnsRowsAndColumns.fromResultRowTillIndex(
+        rowsToProcess,
+        frameReader.signature(),
+        lastPartitionIndex
+    );
+    ArrayList<RowsAndColumns> rowsAndColumns = new ArrayList<>();
+    rowsAndColumns.add(singleRac);
+    runAllOpsOnMultipleRac(rowsAndColumns);
+
+    // Remove elements in the range [0, lastPartitionIndex] from the list.
+    // The call to list.subList(a, b).clear() deletes the elements in the 
range [a, b - 1],
+    // causing the remaining elements to shift and start from index 0.
+    rowsToProcess.subList(0, lastPartitionIndex + 1).clear();
+    lastPartitionIndex = -1;
+  }
+
+  private void ensureMaxRowsInAWindowConstraint(int numRowsInWindow)
+  {
+    if (numRowsInWindow > maxRowsMaterialized) {
+      throw new MSQException(new TooManyRowsInAWindowFault(
+          numRowsInWindow,
+          maxRowsMaterialized
+      ));
+    }
+  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/FrameProcessorTestBase.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/FrameProcessorTestBase.java
new file mode 100644
index 00000000000..75168e07ad8
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/FrameProcessorTestBase.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.frame.processor.FrameProcessorExecutor;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameStorageAdapter;
+import org.apache.druid.frame.testutil.FrameSequenceBuilder;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.msq.input.ReadableInput;
+import org.apache.druid.msq.kernel.StageId;
+import org.apache.druid.msq.kernel.StagePartition;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class FrameProcessorTestBase extends InitializedNullHandlingTest
+{
+  protected static final StagePartition STAGE_PARTITION = new 
StagePartition(new StageId("q", 0), 0);
+
+  protected FrameProcessorExecutor exec;
+
+  @Before
+  public void setUp()
+  {
+    exec = new 
FrameProcessorExecutor(MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec")));
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    exec.getExecutorService().shutdownNow();
+    exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES);
+  }
+
+  protected ReadableInput makeChannelFromAdapter(
+      final StorageAdapter adapter,
+      final List<KeyColumn> keyColumns,
+      int rowsPerInputFrame
+  ) throws IOException
+  {
+    // Create a single, sorted frame.
+    final FrameSequenceBuilder singleFrameBuilder =
+        FrameSequenceBuilder.fromAdapter(adapter)
+                            .frameType(FrameType.ROW_BASED)
+                            .maxRowsPerFrame(Integer.MAX_VALUE)
+                            .sortBy(keyColumns);
+
+    final RowSignature signature = singleFrameBuilder.signature();
+    final Frame frame = 
Iterables.getOnlyElement(singleFrameBuilder.frames().toList());
+
+    // Split it up into frames that match rowsPerInputFrame. Set max size enough to 
hold all rows we might ever want to use.
+    final BlockingQueueFrameChannel channel = new 
BlockingQueueFrameChannel(10_000);
+
+    final FrameReader frameReader = FrameReader.create(signature);
+
+    final FrameSequenceBuilder frameSequenceBuilder =
+        FrameSequenceBuilder.fromAdapter(new FrameStorageAdapter(frame, 
frameReader, Intervals.ETERNITY))
+                            .frameType(FrameType.ROW_BASED)
+                            .maxRowsPerFrame(rowsPerInputFrame);
+
+    final Sequence<Frame> frames = frameSequenceBuilder.frames();
+    frames.forEach(
+        f -> {
+          try {
+            channel.writable().write(f);
+          }
+          catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+    );
+
+    channel.writable().close();
+    return ReadableInput.channel(channel.readable(), 
FrameReader.create(signature), STAGE_PARTITION);
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java
new file mode 100644
index 00000000000..0b9b73facde
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.frame.allocation.HeapMemoryAllocator;
+import org.apache.druid.frame.allocation.SingleMemoryAllocatorFactory;
+import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.testutil.FrameTestUtil;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.indexing.CountingWritableFrameChannel;
+import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
+import org.apache.druid.msq.input.ReadableInput;
+import org.apache.druid.msq.test.LimitedFrameWriterFactory;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.operator.window.WindowOperatorFactory;
+import org.apache.druid.query.operator.window.ranking.WindowRowNumberProcessor;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.spec.LegacySegmentSpec;
+import org.apache.druid.segment.RowBasedSegment;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.timeline.SegmentId;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class WindowOperatorQueryFrameProcessorTest extends 
FrameProcessorTestBase
+{
+  @Test
+  public void testBatchingOfPartitionByKeys_singleBatch() throws Exception
+  {
+    // With maxRowsMaterialized=100, we will get 1 frame:
+    // [1, 1, 2, 2, 2, 3, 3]
+    validateBatching(100, 1);
+  }
+
+  @Test
+  public void testBatchingOfPartitionByKeys_multipleBatches_1() throws 
Exception
+  {
+    // With maxRowsMaterialized=5, we will get 2 frames:
+    // [1, 1, 2, 2, 2]
+    // [3, 3]
+    validateBatching(5, 2);
+  }
+
+  @Test
+  public void testBatchingOfPartitionByKeys_multipleBatches_2() throws 
Exception
+  {
+    // With maxRowsMaterialized=4, we will get 3 frames:
+    // [1, 1]
+    // [2, 2, 2]
+    // [3, 3]
+    validateBatching(4, 3);
+  }
+
+  @Test
+  public void testBatchingOfPartitionByKeys_TooManyRowsInAWindowFault()
+  {
+    final RuntimeException e = Assert.assertThrows(
+        RuntimeException.class,
+        () -> validateBatching(2, 3)
+    );
+    MatcherAssert.assertThat(
+        ((MSQException) e.getCause().getCause()).getFault(),
+        CoreMatchers.instanceOf(TooManyRowsInAWindowFault.class)
+    );
+    Assert.assertTrue(e.getMessage().contains("TooManyRowsInAWindow: Too many 
rows in a window (requested = 3, max = 2)"));
+  }
+
+  public void validateBatching(int maxRowsMaterialized, int numFramesWritten) 
throws Exception
+  {
+    final ReadableInput factChannel = buildWindowTestInputChannel();
+
+    RowSignature inputSignature = RowSignature.builder()
+                                              .add("cityName", 
ColumnType.STRING)
+                                              .add("added", ColumnType.LONG)
+                                              .build();
+
+    FrameReader frameReader = FrameReader.create(inputSignature);
+
+    RowSignature outputSignature = RowSignature.builder()
+                                               .addAll(inputSignature)
+                                               .add("w0", ColumnType.LONG)
+                                               .build();
+
+    final BlockingQueueFrameChannel outputChannel = 
BlockingQueueFrameChannel.minimal();
+    ChannelCounters channelCounters = new ChannelCounters();
+    final CountingWritableFrameChannel countingWritableFrameChannel = new 
CountingWritableFrameChannel(
+        outputChannel.writable(),
+        channelCounters,
+        0
+    );
+
+    final WindowOperatorQuery query = new WindowOperatorQuery(
+        new QueryDataSource(
+            Druids.newScanQueryBuilder()
+                     .dataSource(new TableDataSource("test"))
+                     .intervals(new LegacySegmentSpec(Intervals.ETERNITY))
+                     .columns("cityName", "added")
+                     
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                     .context(new HashMap<>())
+                     .build()),
+        new LegacySegmentSpec(Intervals.ETERNITY),
+        new HashMap<>(),
+        outputSignature,
+        ImmutableList.of(
+            new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
+        ),
+        ImmutableList.of()
+    );
+
+    // Limit output frames to 100 rows (more than the 7 input rows), so the number of frames written reflects the number of batches processed
+    final FrameWriterFactory frameWriterFactory = new 
LimitedFrameWriterFactory(
+        FrameWriters.makeRowBasedFrameWriterFactory(
+            new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
+            outputSignature,
+            Collections.emptyList(),
+            false
+        ),
+        100
+    );
+
+    final WindowOperatorQueryFrameProcessor processor = new 
WindowOperatorQueryFrameProcessor(
+        query,
+        factChannel.getChannel(),
+        countingWritableFrameChannel,
+        frameWriterFactory,
+        frameReader,
+        new ObjectMapper(),
+        ImmutableList.of(
+            new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
+        ),
+        inputSignature,
+        maxRowsMaterialized,
+        ImmutableList.of("added")
+    );
+
+    exec.runFully(processor, null);
+
+    final Sequence<List<Object>> rowsFromProcessor = 
FrameTestUtil.readRowsFromFrameChannel(
+        outputChannel.readable(),
+        FrameReader.create(outputSignature)
+    );
+
+    final List<List<Object>> rows = rowsFromProcessor.toList();
+
+    long actualNumFrames = 
Arrays.stream(channelCounters.snapshot().getFrames()).findFirst().getAsLong();
+    Assert.assertEquals(numFramesWritten, actualNumFrames);
+    Assert.assertEquals(7, rows.size());
+  }
+
+  private ReadableInput buildWindowTestInputChannel() throws IOException
+  {
+    RowSignature inputSignature = RowSignature.builder()
+                                              .add("cityName", 
ColumnType.STRING)
+                                              .add("added", ColumnType.LONG)
+                                              .build();
+
+    List<Map<String, Object>> rows = ImmutableList.of(
+        ImmutableMap.of("added", 1L, "cityName", "city1"),
+        ImmutableMap.of("added", 1L, "cityName", "city2"),
+        ImmutableMap.of("added", 2L, "cityName", "city3"),
+        ImmutableMap.of("added", 2L, "cityName", "city4"),
+        ImmutableMap.of("added", 2L, "cityName", "city5"),
+        ImmutableMap.of("added", 3L, "cityName", "city6"),
+        ImmutableMap.of("added", 3L, "cityName", "city7")
+    );
+
+    return makeChannelFromRows(rows, inputSignature, Collections.emptyList());
+  }
+
+  private ReadableInput makeChannelFromRows(
+      List<Map<String, Object>> rows,
+      RowSignature signature,
+      List<KeyColumn> keyColumns
+  ) throws IOException
+  {
+    RowBasedSegment<Map<String, Object>> segment = new RowBasedSegment<>(
+        SegmentId.dummy("test"),
+        Sequences.simple(rows),
+        columnName -> m -> m.get(columnName),
+        signature
+    );
+
+    return makeChannelFromAdapter(segment.asStorageAdapter(), keyColumns);
+  }
+
+  private ReadableInput makeChannelFromAdapter(
+      final StorageAdapter adapter,
+      final List<KeyColumn> keyColumns
+  ) throws IOException
+  {
+    return makeChannelFromAdapter(adapter, keyColumns, 1000);
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessorTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessorTest.java
index 0094aeb7369..4a8ec435873 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessorTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessorTest.java
@@ -21,15 +21,11 @@ package org.apache.druid.msq.querykit.common;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.frame.Frame;
-import org.apache.druid.frame.FrameType;
 import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
 import org.apache.druid.frame.allocation.SingleMemoryAllocatorFactory;
 import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
@@ -37,23 +33,17 @@ import org.apache.druid.frame.channel.ReadableFrameChannel;
 import org.apache.druid.frame.channel.ReadableNilFrameChannel;
 import org.apache.druid.frame.key.KeyColumn;
 import org.apache.druid.frame.key.KeyOrder;
-import org.apache.druid.frame.processor.FrameProcessorExecutor;
 import org.apache.druid.frame.read.FrameReader;
-import org.apache.druid.frame.segment.FrameStorageAdapter;
-import org.apache.druid.frame.testutil.FrameSequenceBuilder;
 import org.apache.druid.frame.testutil.FrameTestUtil;
 import org.apache.druid.frame.write.FrameWriterFactory;
 import org.apache.druid.frame.write.FrameWriters;
-import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Unit;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.msq.indexing.error.MSQException;
 import org.apache.druid.msq.indexing.error.TooManyRowsWithSameKeyFault;
 import org.apache.druid.msq.input.ReadableInput;
-import org.apache.druid.msq.kernel.StageId;
-import org.apache.druid.msq.kernel.StagePartition;
+import org.apache.druid.msq.querykit.FrameProcessorTestBase;
 import org.apache.druid.msq.test.LimitedFrameWriterFactory;
 import org.apache.druid.segment.RowBasedSegment;
 import org.apache.druid.segment.StorageAdapter;
@@ -61,15 +51,12 @@ import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.join.JoinTestHelper;
 import org.apache.druid.segment.join.JoinType;
-import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.SegmentId;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.MatcherAssert;
 import org.hamcrest.Matchers;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
-import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -82,19 +69,15 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 @RunWith(Parameterized.class)
-public class SortMergeJoinFrameProcessorTest extends InitializedNullHandlingTest
+public class SortMergeJoinFrameProcessorTest extends FrameProcessorTestBase
 {
-  private static final StagePartition STAGE_PARTITION = new StagePartition(new StageId("q", 0), 0);
   private static final long MAX_BUFFERED_BYTES = 10_000_000;
 
   private final int rowsPerInputFrame;
   private final int rowsPerOutputFrame;
 
-  private FrameProcessorExecutor exec;
-
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
@@ -118,19 +101,6 @@ public class SortMergeJoinFrameProcessorTest extends InitializedNullHandlingTest
     return constructors;
   }
 
-  @Before
-  public void setUp()
-  {
-    exec = new FrameProcessorExecutor(MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec")));
-  }
-
-  @After
-  public void tearDown() throws Exception
-  {
-    exec.getExecutorService().shutdownNow();
-    exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES);
-  }
-
   @Test
   public void testLeftJoinEmptyLeftSide() throws Exception
   {
@@ -1531,40 +1501,7 @@ public class SortMergeJoinFrameProcessorTest extends InitializedNullHandlingTest
       final List<KeyColumn> keyColumns
   ) throws IOException
   {
-    // Create a single, sorted frame.
-    final FrameSequenceBuilder singleFrameBuilder =
-        FrameSequenceBuilder.fromAdapter(adapter)
-                            .frameType(FrameType.ROW_BASED)
-                            .maxRowsPerFrame(Integer.MAX_VALUE)
-                            .sortBy(keyColumns);
-
-    final RowSignature signature = singleFrameBuilder.signature();
-    final Frame frame = Iterables.getOnlyElement(singleFrameBuilder.frames().toList());
-
-    // Split it up into frames that match rowsPerFrame. Set max size enough to hold all rows we might ever want to use.
-    final BlockingQueueFrameChannel channel = new BlockingQueueFrameChannel(10_000);
-
-    final FrameReader frameReader = FrameReader.create(signature);
-
-    final FrameSequenceBuilder frameSequenceBuilder =
-        FrameSequenceBuilder.fromAdapter(new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY))
-                            .frameType(FrameType.ROW_BASED)
-                            .maxRowsPerFrame(rowsPerInputFrame);
-
-    final Sequence<Frame> frames = frameSequenceBuilder.frames();
-    frames.forEach(
-        f -> {
-          try {
-            channel.writable().write(f);
-          }
-          catch (IOException e) {
-            throw new RuntimeException(e);
-          }
-        }
-    );
-
-    channel.writable().close();
-    return ReadableInput.channel(channel.readable(), FrameReader.create(signature), STAGE_PARTITION);
+    return makeChannelFromAdapter(adapter, keyColumns, rowsPerInputFrame);
   }
 
   private FrameWriterFactory makeFrameWriterFactory(final RowSignature signature)
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/results/QueryResultsFrameProcessorTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/results/QueryResultsFrameProcessorTest.java
index d89b62977a3..e300a416705 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/results/QueryResultsFrameProcessorTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/results/QueryResultsFrameProcessorTest.java
@@ -20,54 +20,32 @@
 package org.apache.druid.msq.querykit.results;
 
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.FrameType;
 import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
 import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
 import org.apache.druid.frame.channel.WritableFrameChannel;
-import org.apache.druid.frame.processor.FrameProcessorExecutor;
 import org.apache.druid.frame.read.FrameReader;
 import org.apache.druid.frame.testutil.FrameSequenceBuilder;
 import org.apache.druid.frame.testutil.FrameTestUtil;
 import org.apache.druid.java.util.common.Unit;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.msq.input.ReadableInput;
 import org.apache.druid.msq.kernel.StageId;
 import org.apache.druid.msq.kernel.StagePartition;
+import org.apache.druid.msq.querykit.FrameProcessorTestBase;
 import org.apache.druid.segment.TestIndex;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
-import org.apache.druid.testing.InitializedNullHandlingTest;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
-public class QueryResultsFrameProcessorTest extends InitializedNullHandlingTest
+public class QueryResultsFrameProcessorTest extends FrameProcessorTestBase
 {
-  private FrameProcessorExecutor exec;
-
-  @Before
-  public void setUp()
-  {
-    exec = new FrameProcessorExecutor(MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec")));
-  }
-
-  @After
-  public void tearDown() throws Exception
-  {
-    exec.getExecutorService().shutdownNow();
-    exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES);
-  }
-
-
   @Test
   public void sanityTest() throws ExecutionException, InterruptedException, IOException
   {
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
index e3272cbcee7..205f3495807 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
@@ -20,7 +20,6 @@
 package org.apache.druid.msq.querykit.scan;
 
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.collections.ReferenceCountingResourceHolder;
 import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.frame.Frame;
@@ -30,7 +29,6 @@ import org.apache.druid.frame.allocation.HeapMemoryAllocator;
 import org.apache.druid.frame.allocation.SingleMemoryAllocatorFactory;
 import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
 import org.apache.druid.frame.channel.WritableFrameChannel;
-import org.apache.druid.frame.processor.FrameProcessorExecutor;
 import org.apache.druid.frame.read.FrameReader;
 import org.apache.druid.frame.testutil.FrameSequenceBuilder;
 import org.apache.druid.frame.testutil.FrameTestUtil;
@@ -39,11 +37,11 @@ import org.apache.druid.frame.write.FrameWriters;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Unit;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.msq.input.ReadableInput;
 import org.apache.druid.msq.kernel.StageId;
 import org.apache.druid.msq.kernel.StagePartition;
+import org.apache.druid.msq.querykit.FrameProcessorTestBase;
 import org.apache.druid.msq.test.LimitedFrameWriterFactory;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.scan.ScanQuery;
@@ -51,35 +49,16 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.segment.TestIndex;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
-import org.apache.druid.testing.InitializedNullHandlingTest;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
-public class ScanQueryFrameProcessorTest extends InitializedNullHandlingTest
+public class ScanQueryFrameProcessorTest extends FrameProcessorTestBase
 {
-  private FrameProcessorExecutor exec;
-
-  @Before
-  public void setUp()
-  {
-    exec = new FrameProcessorExecutor(MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec")));
-  }
-
-  @After
-  public void tearDown() throws Exception
-  {
-    exec.getExecutorService().shutdownNow();
-    exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES);
-  }
-
   @Test
   public void test_runWithInputChannel() throws Exception
   {
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
index 42972f9340d..29f092f6744 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
@@ -84,11 +84,16 @@ public class MapOfColumnsRowsAndColumns implements RowsAndColumns
   }
 
   public static MapOfColumnsRowsAndColumns fromResultRow(ArrayList<ResultRow> objs, RowSignature signature)
+  {
+    return fromResultRowTillIndex(objs, signature, objs.size() - 1);
+  }
+
+  public static MapOfColumnsRowsAndColumns fromResultRowTillIndex(ArrayList<ResultRow> objs, RowSignature signature, int index)
   {
     final Builder bob = builder();
     if (!objs.isEmpty()) {
-      Object[][] columnOriented = new Object[objs.get(0).length()][objs.size()];
-      for (int i = 0; i < objs.size(); ++i) {
+      Object[][] columnOriented = new Object[objs.get(0).length()][index + 1];
+      for (int i = 0; i <= index; ++i) {
         for (int j = 0; j < objs.get(i).length(); ++j) {
           columnOriented[j][i] = objs.get(i).get(j);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to