kgyrtkirk commented on code in PR #17038:
URL: https://github.com/apache/druid/pull/17038#discussion_r1793257851


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -158,174 +131,73 @@ public List<WritableFrameChannel> outputChannels()
   @Override
   public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOException
   {
-    /*
-     There are 2 scenarios:
-
-     *** Scenario 1: Query has atleast one window function with an OVER() clause without a PARTITION BY ***
-
-     In this scenario, we add all the RACs to a single RowsAndColumns to be processed. We do it via ConcatRowsAndColumns, and run all the operators on the ConcatRowsAndColumns.
-     This is done because we anyway need to run the operators on the entire set of rows when we have an OVER() clause without a PARTITION BY.
-     This scenario corresponds to partitionColumnNames.isEmpty()=true code flow.
-
-     *** Scenario 2: All window functions in the query have OVER() clause with a PARTITION BY ***
-
-     In this scenario, we need to process rows for each PARTITION BY group together, but we can batch multiple PARTITION BY keys into the same RAC before passing it to the operators for processing.
-     Batching is fine since the operators list would have the required NaivePartitioningOperatorFactory to segregate each PARTITION BY group during the processing.
-
-     The flow for this scenario can be summarised as following:
-     1. Frame Reading and Cursor Initialization: We start by reading a frame from the inputChannel and initializing frameCursor to iterate over the rows in that frame.
-     2. Row Comparison: For each row in the frame, we decide whether it belongs to the same PARTITION BY group as the previous row.
-                        This is determined by comparePartitionKeys() method.
-                        Please refer to the Javadoc of that method for further details and an example illustration.
-        2.1. If the PARTITION BY columns of current row matches the PARTITION BY columns of the previous row,
-             they belong to the same PARTITION BY group, and gets added to rowsToProcess.
-             If the number of total rows materialized exceed maxRowsMaterialized, we process the pending batch via processRowsUpToLastPartition() method.
-        2.2. If they don't match, then we have reached a partition boundary.
-             In this case, we update the value for lastPartitionIndex.
-     3. End of Input: If the input channel is finished, any remaining rows in rowsToProcess are processed.
-
-     *Illustration of Row Comparison step*
-
-     Let's say we have window_function() OVER (PARTITION BY A ORDER BY B) in our query, and we get 3 frames in the input channel to process.
-
-     Frame 1
-     A, B
-     1, 2
-     1, 3
-     2, 1 --> PARTITION BY key (column A) changed from 1 to 2.
-     2, 2
-
-     Frame 2
-     A, B
-     3, 1 --> PARTITION BY key (column A) changed from 2 to 3.
-     3, 2
-     3, 3
-     3, 4
-
-     Frame 3
-     A, B
-     3, 5
-     3, 6
-     4, 1 --> PARTITION BY key (column A) changed from 3 to 4.
-     4, 2
-
-     *Why batching?*
-     We batch multiple PARTITION BY keys for processing together to avoid the overhead of creating different RACs for each PARTITION BY keys, as that would be unnecessary in scenarios where we have a large number of PARTITION BY keys, but each key having a single row.
-
-     *Future thoughts: https://github.com/apache/druid/issues/16126*
-     Current approach with R&C and operators materialize a single R&C for processing. In case of data with low cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause.
-     Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes of the data. We might think to reimplement them in the MSQ way so that we do not have to materialize so much data.
-     */
-
     // If there are rows pending flush, flush them and run again before processing any more rows.
     if (frameHasRowsPendingFlush()) {
       flushAllRowsAndCols();
       return ReturnOrAwait.runAgain();
     }
 
-    if (partitionColumnNames.isEmpty()) {
-      // Scenario 1: Query has atleast one window function with an OVER() clause without a PARTITION BY.
-      if (inputChannel.canRead()) {
-        final Frame frame = inputChannel.read();
-        convertRowFrameToRowsAndColumns(frame);
-        return ReturnOrAwait.runAgain();
-      }
-
-      if (inputChannel.isFinished()) {
-        // If no rows are flushed yet, process all rows.
-        if (rowId.get() == 0) {
-          runAllOpsOnMultipleRac(frameRowsAndCols);
-        }
+    if (inputChannel.canRead()) {
+      final Frame frame = inputChannel.read();
+      convertRowFrameToRowsAndColumns(frame);

Review Comment:
   With this name, this method should return something.



##########
processing/src/main/java/org/apache/druid/query/operator/GlueingPartitioningOperator.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LimitedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
+import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This glueing partitioning operator is supposed to continuously receive data, and output batches of partitioned RACs.
+ * It maintains a last-partitioning-boundary of the last-pushed-RAC, and attempts to glue it with the next RAC it receives,
+ * ensuring that partitions are handled correctly, even across multiple RACs.
+ * <p>
+ * Additionally, this assumes that data has been pre-sorted according to the partitioning columns.
+ */
+public class GlueingPartitioningOperator extends AbstractPartitioningOperator
+{
+  private final int maxRowsMaterialized;
+  private RowsAndColumns previousRac;
+
+  private static final int MAX_ROWS_MATERIALIZED_NO_LIMIT = -1;
+
+  public GlueingPartitioningOperator(
+      Operator child,
+      List<String> partitionColumns
+  )
+  {
+    this(child, partitionColumns, MAX_ROWS_MATERIALIZED_NO_LIMIT);
+  }
+
+  public GlueingPartitioningOperator(
+      Operator child,
+      List<String> partitionColumns,
+      int maxRowsMaterialized
+  )
+  {
+    super(partitionColumns, child);
+    this.maxRowsMaterialized = maxRowsMaterialized;
+  }
+
+  @Override
+  public Closeable goOrContinue(Closeable continuation, Receiver receiver)
+  {
+    if (continuation != null) {
+      Continuation cont = (Continuation) continuation;
+
+      if (cont.iter != null) {
+        while (cont.iter.hasNext()) {
+          RowsAndColumns next = cont.iter.next();
+
+          if (!cont.iter.hasNext()) {
+            // We are at the last RAC. Process it only if subContinuation is null, otherwise save it in previousRac.
+            if (cont.subContinuation == null) {
+              receiver.push(next);
+              receiver.completed();
+              return null;
+            } else {
+              previousRac = next;
+              break;
+            }
+          }
+
+          final Signal signal = receiver.push(next);
+          if (signal != Signal.GO) {
+            return handleNonGoCases(signal, cont.iter, receiver, cont);
+          }
+        }
+
+        if (cont.subContinuation == null) {
+          receiver.completed();
+          return null;
+        }
+      }
+
+      continuation = cont.subContinuation;
+    }
+
+    AtomicReference<Iterator<RowsAndColumns>> iterHolder = new AtomicReference<>();
+
+    final Closeable retVal = child.goOrContinue(
+        continuation,
+        new Receiver()
+        {
+          @Override
+          public Signal push(RowsAndColumns rac)
+          {
+            ensureMaxRowsMaterializedConstraint(rac.numRows());
+            return handlePush(rac, receiver, iterHolder);
+          }
+
+          @Override
+          public void completed()
+          {
+            if (previousRac != null) {
+              receiver.push(previousRac);
+              previousRac = null;
+            }
+            if (iterHolder.get() == null) {
+              receiver.completed();
+            }
+          }
+        }
+    );
+
+    if (iterHolder.get() != null || retVal != null) {
+      return new Continuation(
+          iterHolder.get(),
+          retVal
+      );
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Iterator implementation for gluing partitioned RowsAndColumns.
+   * It handles the boundaries of partitions within a single RAC and potentially glues
+   * the first partition of the current RAC with the previous RAC if needed.
+   */
+  private class GluedRACsIterator implements Iterator<RowsAndColumns>
+  {
+    private final RowsAndColumns rac;
+    private final int[] boundaries;
+    private int currentIndex = 0;
+    private boolean firstPartitionHandled = false;
+
+    public GluedRACsIterator(RowsAndColumns rac)
+    {
+      this.rac = rac;
+      ClusteredGroupPartitioner groupPartitioner = rac.as(ClusteredGroupPartitioner.class);
+      if (groupPartitioner == null) {
+        groupPartitioner = new DefaultClusteredGroupPartitioner(rac);
+      }
+      this.boundaries = groupPartitioner.computeBoundaries(partitionColumns);
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return currentIndex < boundaries.length - 1;
+    }
+
+    /**
+     * Retrieves the next partition in the RowsAndColumns. If the first partition has not been handled yet,
+     * it may be glued with the previous RowsAndColumns if the partition columns match.
+     *
+     * @return The next RowsAndColumns partition, potentially glued with the previous one.
+     * @throws NoSuchElementException if there are no more partitions.
+     */
+    @Override
+    public RowsAndColumns next()
+    {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      if (!firstPartitionHandled) {
+        firstPartitionHandled = true;
+        int start = boundaries[currentIndex];
+        int end = boundaries[currentIndex + 1];
+        LimitedRowsAndColumns limitedRAC = new LimitedRowsAndColumns(rac, start, end);
+
+        if (isGlueingNeeded(previousRac, limitedRAC)) {
+          RowsAndColumns gluedRAC = getConcatRacForFirstPartition(previousRac, limitedRAC);
+          ensureMaxRowsMaterializedConstraint(gluedRAC.numRows());
+          previousRac = null;
+          currentIndex++;
+          return gluedRAC;
+        } else {
+          if (previousRac != null) {
+            RowsAndColumns temp = previousRac;
+            previousRac = null;
+            return temp;
+          }
+        }
+      }
+
+      int start = boundaries[currentIndex];
+      int end = boundaries[currentIndex + 1];
+      currentIndex++;
+      return new LimitedRowsAndColumns(rac, start, end);
+    }
+
+    /**
+     * Determines whether gluing is needed between the previous RowsAndColumns and the first partition
+     * of the current RowsAndColumns based on the partition columns. If the columns match, the two RACs
+     * will be glued together.
+     *
+     * @param previousRac The previous RowsAndColumns.
+     * @param firstPartitionOfCurrentRac The first partition of the current RowsAndColumns.
+     * @return true if gluing is needed, false otherwise.
+     */
+    private boolean isGlueingNeeded(RowsAndColumns previousRac, RowsAndColumns firstPartitionOfCurrentRac)
+    {
+      if (previousRac == null) {
+        return false;
+      }
+
+      final ConcatRowsAndColumns concatRac = getConcatRacForFirstPartition(previousRac, firstPartitionOfCurrentRac);
+      for (String column : partitionColumns) {
+        final Column theCol = concatRac.findColumn(column);
+        if (theCol == null) {
+          continue;
+        }
+        final ColumnAccessor accessor = theCol.toAccessor();
+        // Compare 1st row of previousRac and firstPartitionOfCurrentRac in [previousRac, firstPartitionOfCurrentRac] form.
+        int comparison = accessor.compareRows(0, previousRac.numRows());
+        if (comparison != 0) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private ConcatRowsAndColumns getConcatRacForFirstPartition(RowsAndColumns previousRac, RowsAndColumns firstPartitionOfCurrentRac)
+    {
+      return new ConcatRowsAndColumns(new ArrayList<>(Arrays.asList(previousRac, firstPartitionOfCurrentRac)));
+    }
+  }
+
+  private void ensureMaxRowsMaterializedConstraint(int numRows)
+  {
+    if (maxRowsMaterialized != MAX_ROWS_MATERIALIZED_NO_LIMIT && numRows > maxRowsMaterialized) {
+      throw InvalidInput.exception(
+          "Too many rows to process (requested = %d, max = %d).",
+          numRows,
+          maxRowsMaterialized
+      );
+    }
+  }
+
+  @Override
+  protected Iterator<RowsAndColumns> getIteratorForRAC(RowsAndColumns rac)
+  {
+    return new GluedRACsIterator(rac);

Review Comment:
   Why not build the first glued part of the RAC here, reducing the complexity in `next()`? That way it would be possible to make the `Iterator` class static.

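Roughly the shape suggested, on simplified data (plain ints stand in for partition keys and `List<Integer>` for a RAC slice; none of this is the Druid API): all gluing happens once in the constructor, so `next()` only hands out precomputed slices and the iterator carries no outer-class state.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch: compute runs and glue the previous batch's tail up
// front, so the iterator is stateless apart from its own cursor.
class GluedPartitionsIterator implements Iterator<List<Integer>>
{
  private final List<List<Integer>> partitions = new ArrayList<>();
  private int index = 0;

  GluedPartitionsIterator(List<Integer> previous, List<Integer> current)
  {
    // Split the sorted input into runs of equal keys.
    List<Integer> run = new ArrayList<>();
    for (int value : current) {
      if (!run.isEmpty() && run.get(run.size() - 1) != value) {
        partitions.add(run);
        run = new ArrayList<>();
      }
      run.add(value);
    }
    if (!run.isEmpty()) {
      partitions.add(run);
    }
    // Glue the carried-over tail from the previous batch once, here: merge it
    // into the first run when the keys match, otherwise emit it on its own.
    if (previous != null && !previous.isEmpty()) {
      if (!partitions.isEmpty() && previous.get(previous.size() - 1).equals(partitions.get(0).get(0))) {
        partitions.get(0).addAll(0, previous);
      } else {
        partitions.add(0, new ArrayList<>(previous));
      }
    }
  }

  @Override
  public boolean hasNext()
  {
    return index < partitions.size();
  }

  @Override
  public List<Integer> next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return partitions.get(index++);
  }
}
```

With the gluing decided eagerly, the class no longer needs to read or null out `previousRac` on the enclosing operator, which is what allows it to be static.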


##########
processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java:
##########
@@ -110,28 +79,7 @@ public Closeable goOrContinue(Closeable continuation, Receiver receiver)
           @Override
           public Signal push(RowsAndColumns rac)

Review Comment:
   Make the `Receiver` a static inner class; it should have the `Iterator` as its field.
   
   `handlePush` naturally wants to be a method of it.
   
   * the full `if (cont.iter != null)` part should also go into the abstract class;
   * put the full body of the `while` loop into a method in the abstract class;
   * the glueing operator should override that method; before calling `super()`, it could check whether there are more elements, save that, and use it when the `Receiver` is built.
   
   



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java:
##########
@@ -130,16 +128,7 @@ public boolean isNull(int rowNum)
             @Override
             public Object getObject(int rowNum)
             {
-              Object value = accessor.getObject(pointers[start + rowNum]);
-              if (ColumnType.STRING.equals(getType()) && value instanceof List) {
-                // special handling to reject MVDs
-                throw new UOE(
-                    "Encountered a multi value column [%s]. Window processing does not support MVDs. "
-                    + "Consider using UNNEST or MV_TO_ARRAY.",
-                    name
-                );
-              }
-              return value;

Review Comment:
   Why is this being removed?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -158,174 +131,73 @@ public List<WritableFrameChannel> outputChannels()
   @Override
   public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOException
   {
-    /*
-     There are 2 scenarios:
-
-     *** Scenario 1: Query has atleast one window function with an OVER() clause without a PARTITION BY ***
-
-     In this scenario, we add all the RACs to a single RowsAndColumns to be processed. We do it via ConcatRowsAndColumns, and run all the operators on the ConcatRowsAndColumns.
-     This is done because we anyway need to run the operators on the entire set of rows when we have an OVER() clause without a PARTITION BY.
-     This scenario corresponds to partitionColumnNames.isEmpty()=true code flow.
-
-     *** Scenario 2: All window functions in the query have OVER() clause with a PARTITION BY ***
-
-     In this scenario, we need to process rows for each PARTITION BY group together, but we can batch multiple PARTITION BY keys into the same RAC before passing it to the operators for processing.
-     Batching is fine since the operators list would have the required NaivePartitioningOperatorFactory to segregate each PARTITION BY group during the processing.
-
-     The flow for this scenario can be summarised as following:
-     1. Frame Reading and Cursor Initialization: We start by reading a frame from the inputChannel and initializing frameCursor to iterate over the rows in that frame.
-     2. Row Comparison: For each row in the frame, we decide whether it belongs to the same PARTITION BY group as the previous row.
-                        This is determined by comparePartitionKeys() method.
-                        Please refer to the Javadoc of that method for further details and an example illustration.
-        2.1. If the PARTITION BY columns of current row matches the PARTITION BY columns of the previous row,
-             they belong to the same PARTITION BY group, and gets added to rowsToProcess.
-             If the number of total rows materialized exceed maxRowsMaterialized, we process the pending batch via processRowsUpToLastPartition() method.
-        2.2. If they don't match, then we have reached a partition boundary.
-             In this case, we update the value for lastPartitionIndex.
-     3. End of Input: If the input channel is finished, any remaining rows in rowsToProcess are processed.
-
-     *Illustration of Row Comparison step*
-
-     Let's say we have window_function() OVER (PARTITION BY A ORDER BY B) in our query, and we get 3 frames in the input channel to process.
-
-     Frame 1
-     A, B
-     1, 2
-     1, 3
-     2, 1 --> PARTITION BY key (column A) changed from 1 to 2.
-     2, 2
-
-     Frame 2
-     A, B
-     3, 1 --> PARTITION BY key (column A) changed from 2 to 3.
-     3, 2
-     3, 3
-     3, 4
-
-     Frame 3
-     A, B
-     3, 5
-     3, 6
-     4, 1 --> PARTITION BY key (column A) changed from 3 to 4.
-     4, 2
-
-     *Why batching?*
-     We batch multiple PARTITION BY keys for processing together to avoid the overhead of creating different RACs for each PARTITION BY keys, as that would be unnecessary in scenarios where we have a large number of PARTITION BY keys, but each key having a single row.
-
-     *Future thoughts: https://github.com/apache/druid/issues/16126*
-     Current approach with R&C and operators materialize a single R&C for processing. In case of data with low cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause.
-     Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes of the data. We might think to reimplement them in the MSQ way so that we do not have to materialize so much data.
-     */
-
     // If there are rows pending flush, flush them and run again before processing any more rows.
     if (frameHasRowsPendingFlush()) {
       flushAllRowsAndCols();
       return ReturnOrAwait.runAgain();
     }
 
-    if (partitionColumnNames.isEmpty()) {
-      // Scenario 1: Query has atleast one window function with an OVER() clause without a PARTITION BY.
-      if (inputChannel.canRead()) {
-        final Frame frame = inputChannel.read();
-        convertRowFrameToRowsAndColumns(frame);
-        return ReturnOrAwait.runAgain();
-      }
-
-      if (inputChannel.isFinished()) {
-        // If no rows are flushed yet, process all rows.
-        if (rowId.get() == 0) {
-          runAllOpsOnMultipleRac(frameRowsAndCols);
-        }
+    if (inputChannel.canRead()) {
+      final Frame frame = inputChannel.read();
+      convertRowFrameToRowsAndColumns(frame);
 
-        // If there are still rows pending after operations, run again.
-        if (frameHasRowsPendingFlush()) {
-          return ReturnOrAwait.runAgain();
+      if (needToProcessBatch()) {
+        runAllOpsOnBatch();
+        try {
+          flushAllRowsAndCols();
         }
-        return ReturnOrAwait.returnObject(Unit.instance());
-      }
-      return ReturnOrAwait.awaitAll(inputChannels().size());
-    }
-
-    // Scenario 2: All window functions in the query have OVER() clause with a PARTITION BY
-    if (frameCursor == null || frameCursor.isDone()) {
-      if (readableInputs.isEmpty()) {
-        return ReturnOrAwait.awaitAll(1);
-      }
-
-      if (inputChannel.canRead()) {
-        final Frame frame = inputChannel.read();
-        frameCursor = FrameProcessors.makeCursor(frame, frameReader);
-        makeRowSupplierFromFrameCursor();
-      } else if (inputChannel.isFinished()) {
-        // If we have some rows pending processing, process them.
-        // We run it again as it's possible that frame writer's capacity got reached and some output rows are
-        // pending flush to the output channel.
-        if (!rowsToProcess.isEmpty()) {
-          lastPartitionIndex = rowsToProcess.size() - 1;
-          processRowsUpToLastPartition();
-          return ReturnOrAwait.runAgain();
+        catch (IOException e) {
+          throw new RuntimeException(e);
         }
-        return ReturnOrAwait.returnObject(Unit.instance());
-      } else {
-        return ReturnOrAwait.runAgain();
       }
-    }
-
-    while (!frameCursor.isDone()) {
-      final ResultRow currentRow = rowSupplierFromFrameCursor.get();
-      if (outputRow == null) {
-        outputRow = currentRow;
-        rowsToProcess.add(currentRow);
-      } else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {
-        // Add current row to the same batch of rows for processing.
-        rowsToProcess.add(currentRow);
-      } else {
-        lastPartitionIndex = rowsToProcess.size() - 1;
-        outputRow = currentRow.copy();
-        rowsToProcess.add(currentRow);
+      return ReturnOrAwait.runAgain();
+    } else if (inputChannel.isFinished()) {
+      if (rowId.get() == 0) {
+        runAllOpsOnBatch();
       }
-      frameCursor.advance();
 
-      if (rowsToProcess.size() > maxRowsMaterialized) {
-        // We don't want to materialize more than maxRowsMaterialized rows at any point in time, so process the pending batch.
-        processRowsUpToLastPartition();
-        ensureMaxRowsInAWindowConstraint(rowsToProcess.size());
+      // If there are still rows pending after operations, run again.
+      if (frameHasRowsPendingFlush()) {
         return ReturnOrAwait.runAgain();
       }
+      return ReturnOrAwait.returnObject(Unit.instance());
+    } else {
+      return ReturnOrAwait.awaitAll(inputChannels().size());
     }
-    return ReturnOrAwait.runAgain();
   }
 
-  /**
-   * @param listOfRacs Concat this list of {@link RowsAndColumns} to a {@link ConcatRowsAndColumns} to use as a single input for the operators to be run
-   */
-  private void runAllOpsOnMultipleRac(ArrayList<RowsAndColumns> listOfRacs)
+  private void initialiseOperator()
   {
-    Operator op = new Operator()
+    op = new Operator()
     {
       @Nullable
       @Override
       public Closeable goOrContinue(Closeable continuationObject, Receiver receiver)
       {
-        RowsAndColumns rac = new ConcatRowsAndColumns(listOfRacs);
+        RowsAndColumns rac = frameRowsAndColsBuilder.build();
+        frameRowsAndColsBuilder.clear();

Review Comment:
   Why the serial coupling? Can't `build()` clear the builder itself?

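For what it's worth, the fused build-and-reset shape could look like this (a generic stand-in, not the actual `frameRowsAndColsBuilder` API): `build()` hands back the accumulated batch and resets the builder in one step, so callers cannot forget the separate `clear()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a builder whose build() also resets it.
class BatchBuilder<T>
{
  private List<T> items = new ArrayList<>();

  public void add(T item)
  {
    items.add(item);
  }

  public int size()
  {
    return items.size();
  }

  public List<T> build()
  {
    final List<T> built = items;
    items = new ArrayList<>();   // reset so the next batch starts empty
    return built;
  }
}
```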


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -445,100 +318,60 @@ private void convertRowFrameToRowsAndColumns(Frame frame)
         null,
         OffsetLimit.limit(Integer.MAX_VALUE),
         null,
-        null
+        null,
+        (int) frameWriterFactory.allocatorCapacity()
     );
     // check if existing + newly added rows exceed guardrails
-    ensureMaxRowsInAWindowConstraint(frameRowsAndCols.size() + ldrc.numRows());
-    frameRowsAndCols.add(ldrc);
+    ensureMaxRowsInAWindowConstraint(frameRowsAndColsBuilder.getNumRows() + ldrc.numRows());
+    frameRowsAndColsBuilder.add(ldrc);

Review Comment:
   Make this method return `ldrc`, and do the add externally.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -445,100 +318,60 @@ private void convertRowFrameToRowsAndColumns(Frame frame)
         null,
         OffsetLimit.limit(Integer.MAX_VALUE),
         null,
-        null
+        null,
+        (int) frameWriterFactory.allocatorCapacity()
     );
     // check if existing + newly added rows exceed guardrails
-    ensureMaxRowsInAWindowConstraint(frameRowsAndCols.size() + ldrc.numRows());
-    frameRowsAndCols.add(ldrc);
+    ensureMaxRowsInAWindowConstraint(frameRowsAndColsBuilder.getNumRows() + ldrc.numRows());

Review Comment:
   This check could rely on `frameRowsAndColsBuilder.size()` after the add is done.

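A generic sketch of the suggested call-site shape (names are stand-ins, not the Druid API): the conversion returns its result, the caller does the add, and the guardrail consults the accumulated size after the add instead of doing pre-add arithmetic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical accumulator: add() updates the row count first, then enforces
// the guardrail against the post-add size.
class RowBatch
{
  private final List<int[]> batches = new ArrayList<>();
  private int numRows = 0;
  private final int maxRows;

  RowBatch(int maxRows)
  {
    this.maxRows = maxRows;
  }

  void add(int[] rows)
  {
    batches.add(rows);
    numRows += rows.length;
    // Guardrail after the add: one size to consult, no caller-side sums.
    if (numRows > maxRows) {
      throw new IllegalStateException("Too many rows materialized: " + numRows);
    }
  }

  int size()
  {
    return numRows;
  }
}
```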


##########
processing/src/test/java/org/apache/druid/query/operator/GlueingPartitioningOperatorTest.java:
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.function.BiFunction;
+
+public class GlueingPartitioningOperatorTest
+{
+  @Test
+  public void testDefaultImplementation()

Review Comment:
   It would be nice to name these methods better: `testDefaultImplementation` has not much meaning, especially because nothing can be altered, so there is nothing other than the "default implementation".
   
   Instead of complex cases testing an aspect along the way, simpler directed tests are easier to understand: test one aspect per test, cleanly.
   
   Would it be possible to make these tests more readable? Do we really need 400 lines for 10 test cases? Do we really need `new IntArrayColumn` and similar things more than 10 times? In general, while testing this class we should not even care about the types of the columns, so they could all be `int`s or something. You could extract methods etc. to improve readability and maintainability. I don't see much value in having a second column in these RACs either; it would be enough to have that in only one:
   
   ```
       InlineScanOperator inlineScanOperator = InlineScanOperator.make(
         makeSimpleRac(1, 1, 1, 2, 2, 1)
       );
   
       GlueingPartitioningOperator op = new GlueingPartitioningOperator(
           inlineScanOperator,
           ImmutableList.of("column")
       );
   
       new OperatorTestHelper()
           .expectRowsAndColumns(
               expectedSimpleRac(1,1,1),
               expectedSimpleRac(2,2),
               expectedSimpleRac(1)
           )
           .runToCompletion(op);
   ```
   
   



##########
processing/src/main/java/org/apache/druid/query/operator/GlueingPartitioningOperatorFactory.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+public class GlueingPartitioningOperatorFactory extends AbstractPartitioningOperatorFactory
+{
+  private final int maxRowsMaterialized;
+
+  @JsonCreator
+  public GlueingPartitioningOperatorFactory(
+      @JsonProperty("partitionColumns") List<String> partitionColumns,
+      @JsonProperty("maxRowsMaterialized") int maxRowsMaterialized
+  )
+  {
+    super(partitionColumns);
+    this.maxRowsMaterialized = maxRowsMaterialized;
+  }
+
+  @JsonProperty("maxRowsMaterialized")
+  public int getMaxRowsMaterialized()
+  {
+    return maxRowsMaterialized;
+  }
+
+  @Override
+  public Operator wrap(Operator op)
+  {
+    return new GlueingPartitioningOperator(op, partitionColumns, maxRowsMaterialized);
+  }
+
+  @Override
+  public boolean validateEquivalent(OperatorFactory other)
+  {
+    return super.validateEquivalent(other) &&
+           maxRowsMaterialized == ((GlueingPartitioningOperatorFactory) other).getMaxRowsMaterialized();

Review Comment:
   * I doubt that this cast is safe.
   * Why would it be non-equivalent if `maxRowsMaterialized` is different?
   
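On the cast-safety point, a minimal sketch of the instanceof-guarded shape (`BaseFactory`/`GlueingFactory` are stand-ins for the factory hierarchy, not Druid types):

```java
// Hypothetical stand-in for the factory base class.
class BaseFactory
{
  public boolean validateEquivalent(BaseFactory other)
  {
    return other != null;
  }
}

class GlueingFactory extends BaseFactory
{
  final int maxRowsMaterialized;

  GlueingFactory(int maxRowsMaterialized)
  {
    this.maxRowsMaterialized = maxRowsMaterialized;
  }

  @Override
  public boolean validateEquivalent(BaseFactory other)
  {
    // instanceof guard first: a different factory type can never be
    // equivalent, and the cast below is then provably safe.
    if (!(other instanceof GlueingFactory)) {
      return false;
    }
    return super.validateEquivalent(other)
        && maxRowsMaterialized == ((GlueingFactory) other).maxRowsMaterialized;
  }
}
```

Whether `maxRowsMaterialized` should participate in equivalence at all is a separate question; the guard only removes the `ClassCastException` risk.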



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -445,100 +318,60 @@ private void convertRowFrameToRowsAndColumns(Frame frame)
         null,
         OffsetLimit.limit(Integer.MAX_VALUE),
         null,
-        null
+        null,
+        (int) frameWriterFactory.allocatorCapacity()

Review Comment:
   What's the benefit of casting it to `int`?
   



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java:
##########
@@ -95,6 +110,7 @@ public LazilyDecoratedRowsAndColumns(
     this.limit = limit;
     this.ordering = ordering;
     this.viewableColumns = viewableColumns;
+    this.allocatorCapacity = allocatorCapacity != null ? allocatorCapacity : 200 << 20;

Review Comment:
   I have a feeling that we will have to pass serialization stuff later; so expand the integer into a class named like FrameSerializationConfig or something; and make this capacity a part of it.

   also make it mandatory; there are just a handful of callsites

   and remove the new constructor...
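To make the suggestion concrete, a hypothetical sketch of such a config class (the name `FrameSerializationConfig` and its shape are the reviewer's proposal, not existing Druid API):

```java
// Hypothetical sketch: wrap the allocator capacity in a small value class so
// future serialization knobs can be added without touching every call site.
class FrameSerializationConfig {
  private final int allocatorCapacity;

  FrameSerializationConfig(int allocatorCapacity) {
    // Making the parameter mandatory removes the need for a nullable Integer
    // and the hard-coded 200 << 20 fallback at the call site.
    if (allocatorCapacity <= 0) {
      throw new IllegalArgumentException("allocatorCapacity must be positive");
    }
    this.allocatorCapacity = allocatorCapacity;
  }

  int getAllocatorCapacity() {
    return allocatorCapacity;
  }
}
```

LazilyDecoratedRowsAndColumns would then accept a `FrameSerializationConfig` instead of a nullable capacity.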
   



##########
processing/src/main/java/org/apache/druid/query/operator/GlueingPartitioningOperator.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LimitedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
+import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This glueing partitioning operator is supposed to continuously receive data, and output batches of partitioned RACs.
+ * It maintains a last-partitioning-boundary of the last-pushed-RAC, and attempts to glue it with the next RAC it receives,
+ * ensuring that partitions are handled correctly, even across multiple RACs.
+ * <p>
+ * Additionally, this assumes that data has been pre-sorted according to the partitioning columns.
+ */
+public class GlueingPartitioningOperator extends AbstractPartitioningOperator
+{
+  private final int maxRowsMaterialized;
+  private RowsAndColumns previousRac;
+
+  private static final int MAX_ROWS_MATERIALIZED_NO_LIMIT = -1;
+
+  public GlueingPartitioningOperator(
+      Operator child,
+      List<String> partitionColumns
+  )
+  {
+    this(child, partitionColumns, MAX_ROWS_MATERIALIZED_NO_LIMIT);
+  }
+
+  public GlueingPartitioningOperator(
+      Operator child,
+      List<String> partitionColumns,
+      int maxRowsMaterialized

Review Comment:
   why use `int`?



##########
processing/src/main/java/org/apache/druid/query/operator/GlueingPartitioningOperator.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LimitedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
+import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This glueing partitioning operator is supposed to continuously receive data, and output batches of partitioned RACs.
+ * It maintains a last-partitioning-boundary of the last-pushed-RAC, and attempts to glue it with the next RAC it receives,
+ * ensuring that partitions are handled correctly, even across multiple RACs.
+ * <p>
+ * Additionally, this assumes that data has been pre-sorted according to the partitioning columns.
+ */
+public class GlueingPartitioningOperator extends AbstractPartitioningOperator
+{
+  private final int maxRowsMaterialized;
+  private RowsAndColumns previousRac;
+
+  private static final int MAX_ROWS_MATERIALIZED_NO_LIMIT = -1;
+
+  public GlueingPartitioningOperator(
+      Operator child,
+      List<String> partitionColumns
+  )
+  {
+    this(child, partitionColumns, MAX_ROWS_MATERIALIZED_NO_LIMIT);
+  }
+
+  public GlueingPartitioningOperator(
+      Operator child,
+      List<String> partitionColumns,
+      int maxRowsMaterialized
+  )
+  {
+    super(partitionColumns, child);
+    this.maxRowsMaterialized = maxRowsMaterialized;
+  }
+
+  @Override
+  public Closeable goOrContinue(Closeable continuation, Receiver receiver)
+  {
+    if (continuation != null) {
+      Continuation cont = (Continuation) continuation;
+
+      if (cont.iter != null) {
+        while (cont.iter.hasNext()) {
+          RowsAndColumns next = cont.iter.next();
+
+          if (!cont.iter.hasNext()) {
+            // We are at the last RAC. Process it only if subContinuation is null, otherwise save it in previousRac.
+            if (cont.subContinuation == null) {
+              receiver.push(next);
+              receiver.completed();
+              return null;
+            } else {
+              previousRac = next;
+              break;
+            }
+          }
+
+          final Signal signal = receiver.push(next);
+          if (signal != Signal.GO) {
+            return handleNonGoCases(signal, cont.iter, receiver, cont);
+          }
+        }
+
+        if (cont.subContinuation == null) {
+          receiver.completed();
+          return null;
+        }
+      }
+
+      continuation = cont.subContinuation;
+    }
+
+    AtomicReference<Iterator<RowsAndColumns>> iterHolder = new AtomicReference<>();
+
+    final Closeable retVal = child.goOrContinue(
+        continuation,
+        new Receiver()
+        {
+          @Override
+          public Signal push(RowsAndColumns rac)
+          {
+            ensureMaxRowsMaterializedConstraint(rac.numRows());
+            return handlePush(rac, receiver, iterHolder);
+          }
+
+          @Override
+          public void completed()
+          {
+            if (previousRac != null) {
+              receiver.push(previousRac);
+              previousRac = null;
+            }
+            if (iterHolder.get() == null) {
+              receiver.completed();
+            }
+          }
+        }
+    );
+
+    if (iterHolder.get() != null || retVal != null) {
+      return new Continuation(
+          iterHolder.get(),
+          retVal
+      );
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Iterator implementation for gluing partitioned RowsAndColumns.
+   * It handles the boundaries of partitions within a single RAC and potentially glues
+   * the first partition of the current RAC with the previous RAC if needed.
+   */
+  private class GluedRACsIterator implements Iterator<RowsAndColumns>
+  {
+    private final RowsAndColumns rac;
+    private final int[] boundaries;
+    private int currentIndex = 0;
+    private boolean firstPartitionHandled = false;
+
+    public GluedRACsIterator(RowsAndColumns rac)
+    {
+      this.rac = rac;
+      ClusteredGroupPartitioner groupPartitioner = rac.as(ClusteredGroupPartitioner.class);
+      if (groupPartitioner == null) {
+        groupPartitioner = new DefaultClusteredGroupPartitioner(rac);
+      }
+      this.boundaries = groupPartitioner.computeBoundaries(partitionColumns);
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return currentIndex < boundaries.length - 1;
+    }
+
+    /**
+     * Retrieves the next partition in the RowsAndColumns. If the first partition has not been handled yet,
+     * it may be glued with the previous RowsAndColumns if the partition columns match.
+     *
+     * @return The next RowsAndColumns partition, potentially glued with the previous one.
+     * @throws NoSuchElementException if there are no more partitions.
+     */
+    @Override
+    public RowsAndColumns next()
+    {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      if (!firstPartitionHandled) {
+        firstPartitionHandled = true;
+        int start = boundaries[currentIndex];
+        int end = boundaries[currentIndex + 1];
+        LimitedRowsAndColumns limitedRAC = new LimitedRowsAndColumns(rac, start, end);
+
+        if (isGlueingNeeded(previousRac, limitedRAC)) {
+          RowsAndColumns gluedRAC = getConcatRacForFirstPartition(previousRac, limitedRAC);
+          ensureMaxRowsMaterializedConstraint(gluedRAC.numRows());
+          previousRac = null;
+          currentIndex++;
+          return gluedRAC;
+        } else {
+          if (previousRac != null) {
+            RowsAndColumns temp = previousRac;
+            previousRac = null;
+            return temp;
+          }
+        }
+      }
+
+      int start = boundaries[currentIndex];
+      int end = boundaries[currentIndex + 1];
+      currentIndex++;
+      return new LimitedRowsAndColumns(rac, start, end);
+    }
+
+    /**
+     * Determines whether gluing is needed between the previous RowsAndColumns and the first partition
+     * of the current RowsAndColumns based on the partition columns. If the columns match, the two RACs
+     * will be glued together.
+     *
+     * @param previousRac The previous RowsAndColumns.
+     * @param firstPartitionOfCurrentRac The first partition of the current RowsAndColumns.
+     * @return true if gluing is needed, false otherwise.
+     */
+    private boolean isGlueingNeeded(RowsAndColumns previousRac, RowsAndColumns firstPartitionOfCurrentRac)
+    {
+      if (previousRac == null) {
+        return false;
+      }
+
+      final ConcatRowsAndColumns concatRac = getConcatRacForFirstPartition(previousRac, firstPartitionOfCurrentRac);
+      for (String column : partitionColumns) {
+        final Column theCol = concatRac.findColumn(column);
+        if (theCol == null) {
+          continue;
+        }
+        final ColumnAccessor accessor = theCol.toAccessor();
+        // Compare 1st row of previousRac and firstPartitionOfCurrentRac in [previousRac, firstPartitionOfCurrentRac] form.
+        int comparison = accessor.compareRows(0, previousRac.numRows());
+        if (comparison != 0) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private ConcatRowsAndColumns getConcatRacForFirstPartition(RowsAndColumns previousRac, RowsAndColumns firstPartitionOfCurrentRac)
+    {
+      return new ConcatRowsAndColumns(new ArrayList<>(Arrays.asList(previousRac, firstPartitionOfCurrentRac)));
+    }
+  }
+
+  private void ensureMaxRowsMaterializedConstraint(int numRows)
+  {
+    if (maxRowsMaterialized != MAX_ROWS_MATERIALIZED_NO_LIMIT && numRows > maxRowsMaterialized) {

Review Comment:
   why the need for an extra comparison with `-1`? why not use `Integer.MAX_VALUE` as the default; and then it will be business as usual...
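A minimal illustration of the reviewer's point (a simplified stand-in for the operator's guard, not the PR's code): with `Integer.MAX_VALUE` as the "no limit" default, the sentinel branch disappears and a single comparison covers both cases.

```java
// Simplified stand-in for the operator's row-limit guard; illustrative only.
class RowLimitGuard {
  // "No limit" is simply the largest representable limit; no -1 sentinel needed.
  static final int NO_LIMIT = Integer.MAX_VALUE;

  private final int maxRowsMaterialized;

  RowLimitGuard(int maxRowsMaterialized) {
    this.maxRowsMaterialized = maxRowsMaterialized;
  }

  void ensureMaxRowsMaterializedConstraint(int numRows) {
    // One comparison handles both the limited and the unlimited case:
    // no int can exceed Integer.MAX_VALUE, so NO_LIMIT never throws.
    if (numRows > maxRowsMaterialized) {
      throw new IllegalStateException("Too many rows materialized: " + numRows);
    }
  }
}
```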



##########
processing/src/main/java/org/apache/druid/query/operator/GlueingPartitioningOperator.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LimitedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
+import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This glueing partitioning operator is supposed to continuously receive data, and output batches of partitioned RACs.
+ * It maintains a last-partitioning-boundary of the last-pushed-RAC, and attempts to glue it with the next RAC it receives,
+ * ensuring that partitions are handled correctly, even across multiple RACs.
+ * <p>
+ * Additionally, this assumes that data has been pre-sorted according to the partitioning columns.
+ */
+public class GlueingPartitioningOperator extends AbstractPartitioningOperator
+{
+  private final int maxRowsMaterialized;
+  private RowsAndColumns previousRac;
+
+  private static final int MAX_ROWS_MATERIALIZED_NO_LIMIT = -1;

Review Comment:
   please don't use `-1` 



##########
processing/src/main/java/org/apache/druid/query/operator/GlueingPartitioningOperator.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LimitedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
+import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This glueing partitioning operator is supposed to continuously receive data, and output batches of partitioned RACs.
+ * It maintains a last-partitioning-boundary of the last-pushed-RAC, and attempts to glue it with the next RAC it receives,
+ * ensuring that partitions are handled correctly, even across multiple RACs.
+ * <p>
+ * Additionally, this assumes that data has been pre-sorted according to the partitioning columns.
+ */
+public class GlueingPartitioningOperator extends AbstractPartitioningOperator
+{
+  private final int maxRowsMaterialized;
+  private RowsAndColumns previousRac;
+
+  private static final int MAX_ROWS_MATERIALIZED_NO_LIMIT = -1;
+
+  public GlueingPartitioningOperator(
+      Operator child,
+      List<String> partitionColumns
+  )
+  {
+    this(child, partitionColumns, MAX_ROWS_MATERIALIZED_NO_LIMIT);
+  }
+
+  public GlueingPartitioningOperator(
+      Operator child,
+      List<String> partitionColumns,
+      int maxRowsMaterialized
+  )
+  {
+    super(partitionColumns, child);
+    this.maxRowsMaterialized = maxRowsMaterialized;
+  }
+
+  @Override
+  public Closeable goOrContinue(Closeable continuation, Receiver receiver)
+  {
+    if (continuation != null) {
+      Continuation cont = (Continuation) continuation;
+
+      if (cont.iter != null) {
+        while (cont.iter.hasNext()) {
+          RowsAndColumns next = cont.iter.next();
+
+          if (!cont.iter.hasNext()) {
+            // We are at the last RAC. Process it only if subContinuation is null, otherwise save it in previousRac.
+            if (cont.subContinuation == null) {
+              receiver.push(next);
+              receiver.completed();
+              return null;
+            } else {
+              previousRac = next;
+              break;
+            }
+          }
+
+          final Signal signal = receiver.push(next);
+          if (signal != Signal.GO) {
+            return handleNonGoCases(signal, cont.iter, receiver, cont);
+          }
+        }
+
+        if (cont.subContinuation == null) {
+          receiver.completed();
+          return null;
+        }
+      }
+
+      continuation = cont.subContinuation;
+    }
+
+    AtomicReference<Iterator<RowsAndColumns>> iterHolder = new AtomicReference<>();
+
+    final Closeable retVal = child.goOrContinue(
+        continuation,
+        new Receiver()
+        {
+          @Override
+          public Signal push(RowsAndColumns rac)
+          {
+            ensureMaxRowsMaterializedConstraint(rac.numRows());
+            return handlePush(rac, receiver, iterHolder);
+          }
+
+          @Override
+          public void completed()
+          {
+            if (previousRac != null) {
+              receiver.push(previousRac);
+              previousRac = null;
+            }
+            if (iterHolder.get() == null) {
+              receiver.completed();
+            }
+          }
+        }
+    );
+
+    if (iterHolder.get() != null || retVal != null) {
+      return new Continuation(
+          iterHolder.get(),
+          retVal
+      );
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Iterator implementation for gluing partitioned RowsAndColumns.
+   * It handles the boundaries of partitions within a single RAC and potentially glues
+   * the first partition of the current RAC with the previous RAC if needed.
+   */
+  private class GluedRACsIterator implements Iterator<RowsAndColumns>
+  {
+    private final RowsAndColumns rac;
+    private final int[] boundaries;
+    private int currentIndex = 0;
+    private boolean firstPartitionHandled = false;
+
+    public GluedRACsIterator(RowsAndColumns rac)
+    {
+      this.rac = rac;
+      ClusteredGroupPartitioner groupPartitioner = rac.as(ClusteredGroupPartitioner.class);
+      if (groupPartitioner == null) {
+        groupPartitioner = new DefaultClusteredGroupPartitioner(rac);
+      }
+      this.boundaries = groupPartitioner.computeBoundaries(partitionColumns);
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return currentIndex < boundaries.length - 1;
+    }
+
+    /**
+     * Retrieves the next partition in the RowsAndColumns. If the first partition has not been handled yet,
+     * it may be glued with the previous RowsAndColumns if the partition columns match.
+     *
+     * @return The next RowsAndColumns partition, potentially glued with the previous one.
+     * @throws NoSuchElementException if there are no more partitions.
+     */
+    @Override
+    public RowsAndColumns next()
+    {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      if (!firstPartitionHandled) {
+        firstPartitionHandled = true;
+        int start = boundaries[currentIndex];
+        int end = boundaries[currentIndex + 1];
+        LimitedRowsAndColumns limitedRAC = new LimitedRowsAndColumns(rac, start, end);
+
+        if (isGlueingNeeded(previousRac, limitedRAC)) {
+          RowsAndColumns gluedRAC = getConcatRacForFirstPartition(previousRac, limitedRAC);
+          ensureMaxRowsMaterializedConstraint(gluedRAC.numRows());
+          previousRac = null;
+          currentIndex++;
+          return gluedRAC;
+        } else {
+          if (previousRac != null) {
+            RowsAndColumns temp = previousRac;
+            previousRac = null;
+            return temp;
+          }
+        }
+      }
+
+      int start = boundaries[currentIndex];
+      int end = boundaries[currentIndex + 1];
+      currentIndex++;
+      return new LimitedRowsAndColumns(rac, start, end);
+    }
+
+    /**
+     * Determines whether gluing is needed between the previous RowsAndColumns and the first partition
+     * of the current RowsAndColumns based on the partition columns. If the columns match, the two RACs
+     * will be glued together.
+     *
+     * @param previousRac The previous RowsAndColumns.
+     * @param firstPartitionOfCurrentRac The first partition of the current RowsAndColumns.
+     * @return true if gluing is needed, false otherwise.
+     */
+    private boolean isGlueingNeeded(RowsAndColumns previousRac, RowsAndColumns firstPartitionOfCurrentRac)
+    {
+      if (previousRac == null) {
+        return false;
+      }
+
+      final ConcatRowsAndColumns concatRac = getConcatRacForFirstPartition(previousRac, firstPartitionOfCurrentRac);
+      for (String column : partitionColumns) {
+        final Column theCol = concatRac.findColumn(column);
+        if (theCol == null) {
+          continue;
+        }
+        final ColumnAccessor accessor = theCol.toAccessor();
+        // Compare 1st row of previousRac and firstPartitionOfCurrentRac in [previousRac, firstPartitionOfCurrentRac] form.
+        int comparison = accessor.compareRows(0, previousRac.numRows());
+        if (comparison != 0) {
+          return false;
+        }
+      }
+      return true;

Review Comment:
   `return true` as the default?



##########
processing/src/main/java/org/apache/druid/query/operator/GlueingPartitioningOperator.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LimitedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
+import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This glueing partitioning operator is supposed to continuously receive data, and output batches of partitioned RACs.
+ * It maintains a last-partitioning-boundary of the last-pushed-RAC, and attempts to glue it with the next RAC it receives,
+ * ensuring that partitions are handled correctly, even across multiple RACs.
+ * <p>
+ * Additionally, this assumes that data has been pre-sorted according to the partitioning columns.
+ */
+public class GlueingPartitioningOperator extends AbstractPartitioningOperator
+{
+  private final int maxRowsMaterialized;
+  private RowsAndColumns previousRac;
+
+  private static final int MAX_ROWS_MATERIALIZED_NO_LIMIT = -1;
+
+  public GlueingPartitioningOperator(
+      Operator child,
+      List<String> partitionColumns
+  )
+  {
+    this(child, partitionColumns, MAX_ROWS_MATERIALIZED_NO_LIMIT);
+  }
+
+  public GlueingPartitioningOperator(
+      Operator child,
+      List<String> partitionColumns,
+      int maxRowsMaterialized
+  )
+  {
+    super(partitionColumns, child);
+    this.maxRowsMaterialized = maxRowsMaterialized;
+  }
+
+  @Override
+  public Closeable goOrContinue(Closeable continuation, Receiver receiver)
+  {
+    if (continuation != null) {
+      Continuation cont = (Continuation) continuation;
+
+      if (cont.iter != null) {
+        while (cont.iter.hasNext()) {
+          RowsAndColumns next = cont.iter.next();
+
+          if (!cont.iter.hasNext()) {
+            // We are at the last RAC. Process it only if subContinuation is null, otherwise save it in previousRac.
+            if (cont.subContinuation == null) {
+              receiver.push(next);
+              receiver.completed();
+              return null;
+            } else {
+              previousRac = next;
+              break;
+            }
+          }
+
+          final Signal signal = receiver.push(next);
+          if (signal != Signal.GO) {
+            return handleNonGoCases(signal, cont.iter, receiver, cont);
+          }
+        }
+
+        if (cont.subContinuation == null) {
+          receiver.completed();
+          return null;
+        }
+      }
+
+      continuation = cont.subContinuation;
+    }
+
+    AtomicReference<Iterator<RowsAndColumns>> iterHolder = new AtomicReference<>();
+
+    final Closeable retVal = child.goOrContinue(
+        continuation,
+        new Receiver()
+        {
+          @Override
+          public Signal push(RowsAndColumns rac)
+          {
+            ensureMaxRowsMaterializedConstraint(rac.numRows());
+            return handlePush(rac, receiver, iterHolder);
+          }
+
+          @Override
+          public void completed()
+          {
+            if (previousRac != null) {
+              receiver.push(previousRac);
+              previousRac = null;
+            }
+            if (iterHolder.get() == null) {
+              receiver.completed();
+            }
+          }
+        }
+    );
+
+    if (iterHolder.get() != null || retVal != null) {
+      return new Continuation(
+          iterHolder.get(),
+          retVal
+      );
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Iterator implementation for gluing partitioned RowsAndColumns.
+   * It handles the boundaries of partitions within a single RAC and potentially glues
+   * the first partition of the current RAC with the previous RAC if needed.
+   */
+  private class GluedRACsIterator implements Iterator<RowsAndColumns>
+  {
+    private final RowsAndColumns rac;
+    private final int[] boundaries;
+    private int currentIndex = 0;
+    private boolean firstPartitionHandled = false;
+
+    public GluedRACsIterator(RowsAndColumns rac)
+    {
+      this.rac = rac;
+      ClusteredGroupPartitioner groupPartitioner = rac.as(ClusteredGroupPartitioner.class);
+      if (groupPartitioner == null) {
+        groupPartitioner = new DefaultClusteredGroupPartitioner(rac);
+      }
+      this.boundaries = groupPartitioner.computeBoundaries(partitionColumns);
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return currentIndex < boundaries.length - 1;
+    }
+
+    /**
+     * Retrieves the next partition in the RowsAndColumns. If the first partition has not been handled yet,
+     * it may be glued with the previous RowsAndColumns if the partition columns match.
+     *
+     * @return The next RowsAndColumns partition, potentially glued with the previous one.
+     * @throws NoSuchElementException if there are no more partitions.
+     */
+    @Override
+    public RowsAndColumns next()
+    {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      if (!firstPartitionHandled) {
+        firstPartitionHandled = true;
+        int start = boundaries[currentIndex];
+        int end = boundaries[currentIndex + 1];
+        LimitedRowsAndColumns limitedRAC = new LimitedRowsAndColumns(rac, start, end);
+
+        if (isGlueingNeeded(previousRac, limitedRAC)) {
+          RowsAndColumns gluedRAC = getConcatRacForFirstPartition(previousRac, limitedRAC);
+          ensureMaxRowsMaterializedConstraint(gluedRAC.numRows());
+          previousRac = null;
+          currentIndex++;
+          return gluedRAC;
+        } else {
+          if (previousRac != null) {
+            RowsAndColumns temp = previousRac;
+            previousRac = null;
+            return temp;
+          }
+        }
+      }
+
+      int start = boundaries[currentIndex];
+      int end = boundaries[currentIndex + 1];
+      currentIndex++;
+      return new LimitedRowsAndColumns(rac, start, end);
+    }
+
+    /**
+     * Determines whether gluing is needed between the previous RowsAndColumns and the first partition
+     * of the current RowsAndColumns based on the partition columns. If the columns match, the two RACs
+     * will be glued together.
+     *
+     * @param previousRac The previous RowsAndColumns.
+     * @param firstPartitionOfCurrentRac The first partition of the current RowsAndColumns.
+     * @return true if gluing is needed, false otherwise.
+     */
+    private boolean isGlueingNeeded(RowsAndColumns previousRac, RowsAndColumns firstPartitionOfCurrentRac)
+    {
+      if (previousRac == null) {
+        return false;
+      }
+
+      final ConcatRowsAndColumns concatRac = getConcatRacForFirstPartition(previousRac, firstPartitionOfCurrentRac);
+      for (String column : partitionColumns) {
+        final Column theCol = concatRac.findColumn(column);
+        if (theCol == null) {
+          continue;

Review Comment:
   why not error or `true`?



##########
processing/src/main/java/org/apache/druid/query/operator/GlueingPartitioningOperator.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LimitedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
+import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This glueing partitioning operator is supposed to continuously receive data, and output batches of partitioned RACs.
+ * It maintains a last-partitioning-boundary of the last-pushed-RAC, and attempts to glue it with the next RAC it receives,
+ * ensuring that partitions are handled correctly, even across multiple RACs.
+ * <p>
+ * Additionally, this assumes that data has been pre-sorted according to the partitioning columns.
+ */
+public class GlueingPartitioningOperator extends AbstractPartitioningOperator
+{
+  private final int maxRowsMaterialized;
+  private RowsAndColumns previousRac;
+
+  private static final int MAX_ROWS_MATERIALIZED_NO_LIMIT = -1;
+
+  public GlueingPartitioningOperator(
+      Operator child,
+      List<String> partitionColumns
+  )
+  {
+    this(child, partitionColumns, MAX_ROWS_MATERIALIZED_NO_LIMIT);
+  }
+
+  public GlueingPartitioningOperator(
+      Operator child,
+      List<String> partitionColumns,
+      int maxRowsMaterialized
+  )
+  {
+    super(partitionColumns, child);
+    this.maxRowsMaterialized = maxRowsMaterialized;
+  }
+
+  @Override
+  public Closeable goOrContinue(Closeable continuation, Receiver receiver)
+  {
+    if (continuation != null) {
+      Continuation cont = (Continuation) continuation;
+
+      if (cont.iter != null) {
+        while (cont.iter.hasNext()) {
+          RowsAndColumns next = cont.iter.next();
+
+          if (!cont.iter.hasNext()) {
+            // We are at the last RAC. Process it only if subContinuation is null, otherwise save it in previousRac.
+            if (cont.subContinuation == null) {
+              receiver.push(next);
+              receiver.completed();
+              return null;
+            } else {
+              previousRac = next;
+              break;
+            }
+          }
+
+          final Signal signal = receiver.push(next);
+          if (signal != Signal.GO) {
+            return handleNonGoCases(signal, cont.iter, receiver, cont);
+          }
+        }
+
+        if (cont.subContinuation == null) {
+          receiver.completed();
+          return null;
+        }
+      }
+
+      continuation = cont.subContinuation;
+    }
+
+    AtomicReference<Iterator<RowsAndColumns>> iterHolder = new AtomicReference<>();
+
+    final Closeable retVal = child.goOrContinue(
+        continuation,
+        new Receiver()
+        {
+          @Override
+          public Signal push(RowsAndColumns rac)
+          {
+            ensureMaxRowsMaterializedConstraint(rac.numRows());
+            return handlePush(rac, receiver, iterHolder);
+          }
+
+          @Override
+          public void completed()
+          {
+            if (previousRac != null) {
+              receiver.push(previousRac);
+              previousRac = null;
+            }
+            if (iterHolder.get() == null) {
+              receiver.completed();
+            }
+          }
+        }
+    );
+
+    if (iterHolder.get() != null || retVal != null) {
+      return new Continuation(
+          iterHolder.get(),
+          retVal
+      );
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Iterator implementation for gluing partitioned RowsAndColumns.
+   * It handles the boundaries of partitions within a single RAC and potentially glues
+   * the first partition of the current RAC with the previous RAC if needed.
+   */
+  private class GluedRACsIterator implements Iterator<RowsAndColumns>
+  {
+    private final RowsAndColumns rac;
+    private final int[] boundaries;
+    private int currentIndex = 0;
+    private boolean firstPartitionHandled = false;
+
+    public GluedRACsIterator(RowsAndColumns rac)
+    {
+      this.rac = rac;
+      ClusteredGroupPartitioner groupPartitioner = rac.as(ClusteredGroupPartitioner.class);
+      if (groupPartitioner == null) {
+        groupPartitioner = new DefaultClusteredGroupPartitioner(rac);
+      }
+      this.boundaries = groupPartitioner.computeBoundaries(partitionColumns);
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return currentIndex < boundaries.length - 1;
+    }
+
+    /**
+     * Retrieves the next partition in the RowsAndColumns. If the first partition has not been handled yet,
+     * it may be glued with the previous RowsAndColumns if the partition columns match.
+     *
+     * @return The next RowsAndColumns partition, potentially glued with the previous one.
+     * @throws NoSuchElementException if there are no more partitions.
+     */
+    @Override
+    public RowsAndColumns next()
+    {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      if (!firstPartitionHandled) {
+        firstPartitionHandled = true;
+        int start = boundaries[currentIndex];
+        int end = boundaries[currentIndex + 1];
+        LimitedRowsAndColumns limitedRAC = new LimitedRowsAndColumns(rac, start, end);
+
+        if (isGlueingNeeded(previousRac, limitedRAC)) {
+          RowsAndColumns gluedRAC = getConcatRacForFirstPartition(previousRac, limitedRAC);
+          ensureMaxRowsMaterializedConstraint(gluedRAC.numRows());
+          previousRac = null;
+          currentIndex++;
+          return gluedRAC;
+        } else {
+          if (previousRac != null) {
+            RowsAndColumns temp = previousRac;
+            previousRac = null;
+            return temp;
+          }
+        }
+      }
+
+      int start = boundaries[currentIndex];
+      int end = boundaries[currentIndex + 1];
+      currentIndex++;
+      return new LimitedRowsAndColumns(rac, start, end);
+    }
+
+    /**
+     * Determines whether gluing is needed between the previous RowsAndColumns and the first partition
+     * of the current RowsAndColumns based on the partition columns. If the columns match, the two RACs
+     * will be glued together.
+     *
+     * @param previousRac The previous RowsAndColumns.
+     * @param firstPartitionOfCurrentRac The first partition of the current RowsAndColumns.
+     * @return true if gluing is needed, false otherwise.
+     */
+    private boolean isGlueingNeeded(RowsAndColumns previousRac, RowsAndColumns firstPartitionOfCurrentRac)
+    {
+      if (previousRac == null) {
+        return false;
+      }
+
+      final ConcatRowsAndColumns concatRac = getConcatRacForFirstPartition(previousRac, firstPartitionOfCurrentRac);
+      for (String column : partitionColumns) {
+        final Column theCol = concatRac.findColumn(column);
+        if (theCol == null) {
+          continue;
+        }
+        final ColumnAccessor accessor = theCol.toAccessor();
+        // Compare 1st row of previousRac and firstPartitionOfCurrentRac in [previousRac, firstPartitionOfCurrentRac] form.
+        int comparison = accessor.compareRows(0, previousRac.numRows());
+        if (comparison != 0) {
+          return false;
+        }

Review Comment:
   isn't this the `return true` case?
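To make the question concrete, here is a hypothetical stand-alone sketch (simplified names and types, not Druid's actual `RowsAndColumns`/`ColumnAccessor` APIs) of what the loop in `isGlueingNeeded` checks: in the concatenated `[previousRac, firstPartitionOfCurrentRac]` view, a nonzero comparison on any partition column means the keys differ, so the method returns `false` (no glue); only when every column matches does it fall through and return `true`.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical simplified sketch: previousKeys holds the partition-column
// values of the buffered previous RAC (all equal, since it is one partition),
// firstPartitionKeys those of the first partition of the incoming RAC.
public class GlueCheckSketch {
  static boolean isGlueingNeeded(List<String> previousKeys, List<String> firstPartitionKeys) {
    if (previousKeys.isEmpty()) {
      return false; // nothing buffered, nothing to glue
    }
    // comparison != 0 means the partition keys differ -> no gluing needed
    return previousKeys.get(0).compareTo(firstPartitionKeys.get(0)) == 0;
  }

  public static void main(String[] args) {
    // previous batch ends with key "b"; next batch starts with "b" -> glue
    System.out.println(isGlueingNeeded(Arrays.asList("b", "b"), Arrays.asList("b")));
    // next batch starts with a different key -> treat as separate partitions
    System.out.println(isGlueingNeeded(Arrays.asList("b", "b"), Arrays.asList("c")));
  }
}
```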



##########
processing/src/main/java/org/apache/druid/query/operator/AbstractPartitioningOperator.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractPartitioningOperator implements Operator
+{
+  protected final List<String> partitionColumns;
+  protected final Operator child;
+
+  public AbstractPartitioningOperator(
+      List<String> partitionColumns,
+      Operator child
+  )
+  {
+    this.partitionColumns = partitionColumns;
+    this.child = child;
+  }
+
+  protected static class Continuation implements Closeable
+  {
+    Iterator<RowsAndColumns> iter;
+    Closeable subContinuation;
+
+    public Continuation(Iterator<RowsAndColumns> iter, Closeable subContinuation)
+    {
+      this.iter = iter;
+      this.subContinuation = subContinuation;
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+      if (subContinuation != null) {
+        subContinuation.close();
+      }
+    }
+  }
+
+  protected Signal handlePush(RowsAndColumns rac, Receiver receiver, AtomicReference<Iterator<RowsAndColumns>> iterHolder)
+  {
+    if (rac == null) {
+      throw DruidException.defensive("Should never get a null rac here.");
+    }
+
+    Iterator<RowsAndColumns> partitionsIter = getIteratorForRAC(rac);
+
+    AtomicReference<Signal> keepItGoing = new AtomicReference<>(Signal.GO);

Review Comment:
   why use an `AtomicReference`? The `handleKeepItGoing` method is `void`; why not use the return value instead?
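A minimal sketch of the alternative being suggested, using a made-up `Signal` enum and push handler in place of Druid's actual interfaces: the per-partition handler returns the signal directly, so the driving loop needs no `AtomicReference`.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical sketch: plain return values instead of an AtomicReference.
public class ReturnValueSketch {
  enum Signal { GO, PAUSE, STOP }

  // Stand-in for receiver.push(...); always asks to continue in this sketch.
  static Signal pushOne(String partition) {
    return Signal.GO;
  }

  // Drive the iterator until the handler signals anything other than GO.
  static Signal drain(Iterator<String> partitions) {
    Signal signal = Signal.GO;
    while (signal == Signal.GO && partitions.hasNext()) {
      signal = pushOne(partitions.next()); // return value carries the state
    }
    return signal;
  }

  public static void main(String[] args) {
    System.out.println(drain(Arrays.asList("p1", "p2").iterator()));
  }
}
```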



##########
processing/src/main/java/org/apache/druid/query/operator/GlueingPartitioningOperator.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LimitedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
+import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This glueing partitioning operator is supposed to continuously receive data, and output batches of partitioned RACs.
+ * It maintains a last-partitioning-boundary of the last-pushed-RAC, and attempts to glue it with the next RAC it receives,
+ * ensuring that partitions are handled correctly, even across multiple RACs.
+ * <p>
+ * Additionally, this assumes that data has been pre-sorted according to the partitioning columns.
+ */
+public class GlueingPartitioningOperator extends AbstractPartitioningOperator
+{
+  private final int maxRowsMaterialized;
+  private RowsAndColumns previousRac;
+
+  private static final int MAX_ROWS_MATERIALIZED_NO_LIMIT = -1;
+
+  public GlueingPartitioningOperator(
+      Operator child,
+      List<String> partitionColumns
+  )
+  {
+    this(child, partitionColumns, MAX_ROWS_MATERIALIZED_NO_LIMIT);
+  }
+
+  public GlueingPartitioningOperator(
+      Operator child,
+      List<String> partitionColumns,
+      int maxRowsMaterialized
+  )
+  {
+    super(partitionColumns, child);
+    this.maxRowsMaterialized = maxRowsMaterialized;
+  }
+
+  @Override
+  public Closeable goOrContinue(Closeable continuation, Receiver receiver)
+  {
+    if (continuation != null) {
+      Continuation cont = (Continuation) continuation;
+
+      if (cont.iter != null) {
+        while (cont.iter.hasNext()) {
+          RowsAndColumns next = cont.iter.next();
+
+          if (!cont.iter.hasNext()) {
+            // We are at the last RAC. Process it only if subContinuation is null, otherwise save it in previousRac.
+            if (cont.subContinuation == null) {
+              receiver.push(next);
+              receiver.completed();
+              return null;
+            } else {
+              previousRac = next;
+              break;
+            }
+          }
+
+          final Signal signal = receiver.push(next);
+          if (signal != Signal.GO) {
+            return handleNonGoCases(signal, cont.iter, receiver, cont);
+          }
+        }
+
+        if (cont.subContinuation == null) {
+          receiver.completed();
+          return null;
+        }
+      }
+
+      continuation = cont.subContinuation;
+    }
+
+    AtomicReference<Iterator<RowsAndColumns>> iterHolder = new AtomicReference<>();
+
+    final Closeable retVal = child.goOrContinue(
+        continuation,
+        new Receiver()
+        {
+          @Override
+          public Signal push(RowsAndColumns rac)
+          {
+            ensureMaxRowsMaterializedConstraint(rac.numRows());
+            return handlePush(rac, receiver, iterHolder);
+          }
+
+          @Override
+          public void completed()
+          {
+            if (previousRac != null) {
+              receiver.push(previousRac);
+              previousRac = null;
+            }
+            if (iterHolder.get() == null) {
+              receiver.completed();
+            }
+          }
+        }
+    );
+
+    if (iterHolder.get() != null || retVal != null) {
+      return new Continuation(
+          iterHolder.get(),
+          retVal
+      );
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Iterator implementation for gluing partitioned RowsAndColumns.
+   * It handles the boundaries of partitions within a single RAC and potentially glues
+   * the first partition of the current RAC with the previous RAC if needed.
+   */
+  private class GluedRACsIterator implements Iterator<RowsAndColumns>
+  {
+    private final RowsAndColumns rac;
+    private final int[] boundaries;
+    private int currentIndex = 0;
+    private boolean firstPartitionHandled = false;
+
+    public GluedRACsIterator(RowsAndColumns rac)
+    {
+      this.rac = rac;
+      ClusteredGroupPartitioner groupPartitioner = rac.as(ClusteredGroupPartitioner.class);
+      if (groupPartitioner == null) {
+        groupPartitioner = new DefaultClusteredGroupPartitioner(rac);
+      }
+      this.boundaries = groupPartitioner.computeBoundaries(partitionColumns);
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return currentIndex < boundaries.length - 1;
+    }
+
+    /**
+     * Retrieves the next partition in the RowsAndColumns. If the first partition has not been handled yet,
+     * it may be glued with the previous RowsAndColumns if the partition columns match.
+     *
+     * @return The next RowsAndColumns partition, potentially glued with the previous one.
+     * @throws NoSuchElementException if there are no more partitions.
+     */
+    @Override
+    public RowsAndColumns next()
+    {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      if (!firstPartitionHandled) {
+        firstPartitionHandled = true;
+        int start = boundaries[currentIndex];
+        int end = boundaries[currentIndex + 1];
+        LimitedRowsAndColumns limitedRAC = new LimitedRowsAndColumns(rac, start, end);
+
+        if (isGlueingNeeded(previousRac, limitedRAC)) {
+          RowsAndColumns gluedRAC = getConcatRacForFirstPartition(previousRac, limitedRAC);
+          ensureMaxRowsMaterializedConstraint(gluedRAC.numRows());
+          previousRac = null;
+          currentIndex++;
+          return gluedRAC;
+        } else {
+          if (previousRac != null) {
+            RowsAndColumns temp = previousRac;
+            previousRac = null;
+            return temp;
+          }
+        }
+      }
+
+      int start = boundaries[currentIndex];
+      int end = boundaries[currentIndex + 1];
+      currentIndex++;
+      return new LimitedRowsAndColumns(rac, start, end);
+    }
+
+    /**
+     * Determines whether gluing is needed between the previous RowsAndColumns and the first partition
+     * of the current RowsAndColumns based on the partition columns. If the columns match, the two RACs
+     * will be glued together.
+     *
+     * @param previousRac The previous RowsAndColumns.
+     * @param firstPartitionOfCurrentRac The first partition of the current RowsAndColumns.
+     * @return true if gluing is needed, false otherwise.
+     */
+    private boolean isGlueingNeeded(RowsAndColumns previousRac, RowsAndColumns firstPartitionOfCurrentRac)

Review Comment:
   I wonder if this is worth the extra hassle... why not just assume `true` in all cases?
   The RACs should be pretty big already, so the extra concat will be amortized away.



##########
processing/src/main/java/org/apache/druid/query/operator/AbstractPartitioningOperator.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractPartitioningOperator implements Operator
+{
+  protected final List<String> partitionColumns;
+  protected final Operator child;
+
+  public AbstractPartitioningOperator(
+      List<String> partitionColumns,
+      Operator child
+  )
+  {
+    this.partitionColumns = partitionColumns;
+    this.child = child;
+  }
+
+  protected static class Continuation implements Closeable
+  {
+    Iterator<RowsAndColumns> iter;
+    Closeable subContinuation;
+
+    public Continuation(Iterator<RowsAndColumns> iter, Closeable subContinuation)
+    {
+      this.iter = iter;
+      this.subContinuation = subContinuation;
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+      if (subContinuation != null) {
+        subContinuation.close();
+      }
+    }
+  }
+
+  protected Signal handlePush(RowsAndColumns rac, Receiver receiver, AtomicReference<Iterator<RowsAndColumns>> iterHolder)
+  {
+    if (rac == null) {
+      throw DruidException.defensive("Should never get a null rac here.");
+    }
+
+    Iterator<RowsAndColumns> partitionsIter = getIteratorForRAC(rac);
+
+    AtomicReference<Signal> keepItGoing = new AtomicReference<>(Signal.GO);
+    while (keepItGoing.get() == Signal.GO && partitionsIter.hasNext()) {
+      handleKeepItGoing(keepItGoing, partitionsIter, receiver);
+    }
+
+    if (keepItGoing.get() == Signal.PAUSE && partitionsIter.hasNext()) {
+      iterHolder.set(partitionsIter);
+      return Signal.PAUSE;
+    }
+
+    return keepItGoing.get();
+  }
+
+  protected abstract Iterator<RowsAndColumns> getIteratorForRAC(RowsAndColumns rac);
+
+  protected abstract void handleKeepItGoing(AtomicReference<Signal> signalRef, Iterator<RowsAndColumns> iterator, Receiver receiver);

Review Comment:
   I don't really see how these methods make the implementing classes simpler... they don't really hide away much complexity...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
