cryptoe commented on code in PR #15470:
URL: https://github.com/apache/druid/pull/15470#discussion_r1515945562


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.FrameRowTooLargeException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.util.SettableLongVirtualColumn;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.OffsetLimit;
+import org.apache.druid.query.operator.Operator;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
+import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
+{
+  private static final Logger log = new 
Logger(WindowOperatorQueryFrameProcessor.class);
+  private final WindowOperatorQuery query;
+
+  private final List<OperatorFactory> operatorFactoryList;
+  private final ObjectMapper jsonMapper;
+  private final RowSignature outputStageSignature;
+  private final ArrayList<RowsAndColumns> frameRowsAndCols;
+  private final ArrayList<RowsAndColumns> resultRowAndCols;
+  private final ReadableFrameChannel inputChannel;
+  private final WritableFrameChannel outputChannel;
+  private final FrameWriterFactory frameWriterFactory;
+  private final FrameReader frameReader;
+  private final SettableLongVirtualColumn partitionBoostVirtualColumn;
+  ArrayList<ResultRow> objectsOfASingleRac;
+  List<Integer> partitionColsIndex;
+  private long currentAllocatorCapacity; // Used for generating 
FrameRowTooLargeException if needed
+  private Cursor frameCursor = null;
+  private Supplier<ResultRow> rowSupplierFromFrameCursor;
+  private ResultRow outputRow = null;
+  private FrameWriter frameWriter = null;
+
+  public WindowOperatorQueryFrameProcessor(
+      WindowOperatorQuery query,
+      ReadableFrameChannel inputChannel,
+      WritableFrameChannel outputChannel,
+      FrameWriterFactory frameWriterFactory,
+      FrameReader frameReader,
+      ObjectMapper jsonMapper,
+      final List<OperatorFactory> operatorFactoryList,
+      final RowSignature rowSignature
+  )
+  {
+    this.inputChannel = inputChannel;
+    this.outputChannel = outputChannel;
+    this.frameWriterFactory = frameWriterFactory;
+    this.partitionBoostVirtualColumn = new 
SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
+    this.operatorFactoryList = operatorFactoryList;
+    this.outputStageSignature = rowSignature;
+    this.jsonMapper = jsonMapper;
+    this.frameReader = frameReader;
+    this.query = query;
+    this.frameRowsAndCols = new ArrayList<>();
+    this.resultRowAndCols = new ArrayList<>();
+    this.objectsOfASingleRac = new ArrayList<>();
+    this.partitionColsIndex = new ArrayList<>();
+  }
+
+  private static VirtualColumns makeVirtualColumnsForFrameWriter(
+      @Nullable final VirtualColumn partitionBoostVirtualColumn,
+      final ObjectMapper jsonMapper,
+      final WindowOperatorQuery query
+  )
+  {
+    List<VirtualColumn> virtualColumns = new ArrayList<>();
+
+    virtualColumns.add(partitionBoostVirtualColumn);
+    final VirtualColumn segmentGranularityVirtualColumn =
+        QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
+    if (segmentGranularityVirtualColumn != null) {
+      virtualColumns.add(segmentGranularityVirtualColumn);
+    }
+
+    return VirtualColumns.create(virtualColumns);
+  }
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return Collections.singletonList(inputChannel);
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return Collections.singletonList(outputChannel);
+  }
+
+  @Override
+  public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
+  {
+    /*
+     *
+     * PARTITION BY A ORDER BY B
+     *
+     * Frame 1   -> rac1
+     * A  B
+     * 1, 2
+     * 1, 3
+     * 2, 1 --> key changed
+     * 2, 2
+     *
+     *
+     * Frame 2 -> rac2
+     * 3, 1 --> key changed
+     * 3, 2
+     * 3, 3
+     * 3, 4
+     *
+     * Frame 3 -> rac3
+     *
+     * 3, 5
+     * 3, 6
+     * 4, 1 --> key changed
+     * 4, 2
+     *
+     * In case of empty OVER clause, all these racs need to be added to a 
single rows and columns
+     * to be processed. The way we can do this is to use a ConcatRowsAndColumns
+     * ConcatRC [rac1, rac2, rac3]
+     * Run all ops on this
+     *
+     *
+     * The flow would look like:
+     * 1. Validate if the operator has an empty OVER clause
+     * 2. If 1 is true make a giant rows and columns (R&C) using concat as 
shown above
+     *    Let all operators run amok on that R&C
+     * 3. If 1 is false
+     *    Read a frame
+     *    keep the older row in a class variable
+     *    check row by row and compare current with older row to check if 
partition boundary is reached
+     *    when frame partition by changes
+     *    create R&C for those particular set of columns, they would have the 
same partition key
+     *    output will be a single R&C
+     *    write to output channel
+     *
+     *
+     *  Future thoughts:
+     *
+     *  1. We are writing 1 partition to each frame in this way. In case of 
high cardinality data

Review Comment:
   Let's create follow-up GitHub issues for these things.
   The way to do this might be to go over the data once more.
   
   MSQ has a limit on the number of partitions. I feel that with high-cardinality
data, we would easily overwhelm the stage below in case we are not shuffling. WDYT?
   Local developer testing would go a long way in building confidence here.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.FrameRowTooLargeException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.util.SettableLongVirtualColumn;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.OffsetLimit;
+import org.apache.druid.query.operator.Operator;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
+import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
+{
+  private static final Logger log = new 
Logger(WindowOperatorQueryFrameProcessor.class);
+  private final WindowOperatorQuery query;
+
+  private final List<OperatorFactory> operatorFactoryList;
+  private final ObjectMapper jsonMapper;

Review Comment:
   Can the object mapper be declared static?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.FrameRowTooLargeException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.util.SettableLongVirtualColumn;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.OffsetLimit;
+import org.apache.druid.query.operator.Operator;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
+import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
+{
+  private static final Logger log = new 
Logger(WindowOperatorQueryFrameProcessor.class);
+  private final WindowOperatorQuery query;
+
+  private final List<OperatorFactory> operatorFactoryList;
+  private final ObjectMapper jsonMapper;
+  private final RowSignature outputStageSignature;
+  private final ArrayList<RowsAndColumns> frameRowsAndCols;
+  private final ArrayList<RowsAndColumns> resultRowAndCols;
+  private final ReadableFrameChannel inputChannel;
+  private final WritableFrameChannel outputChannel;
+  private final FrameWriterFactory frameWriterFactory;
+  private final FrameReader frameReader;
+  private final SettableLongVirtualColumn partitionBoostVirtualColumn;
+  ArrayList<ResultRow> objectsOfASingleRac;
+  List<Integer> partitionColsIndex;
+  private long currentAllocatorCapacity; // Used for generating 
FrameRowTooLargeException if needed
+  private Cursor frameCursor = null;
+  private Supplier<ResultRow> rowSupplierFromFrameCursor;
+  private ResultRow outputRow = null;
+  private FrameWriter frameWriter = null;
+
+  public WindowOperatorQueryFrameProcessor(
+      WindowOperatorQuery query,
+      ReadableFrameChannel inputChannel,
+      WritableFrameChannel outputChannel,
+      FrameWriterFactory frameWriterFactory,
+      FrameReader frameReader,
+      ObjectMapper jsonMapper,
+      final List<OperatorFactory> operatorFactoryList,
+      final RowSignature rowSignature
+  )
+  {
+    this.inputChannel = inputChannel;
+    this.outputChannel = outputChannel;
+    this.frameWriterFactory = frameWriterFactory;
+    this.partitionBoostVirtualColumn = new 
SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
+    this.operatorFactoryList = operatorFactoryList;
+    this.outputStageSignature = rowSignature;
+    this.jsonMapper = jsonMapper;
+    this.frameReader = frameReader;
+    this.query = query;
+    this.frameRowsAndCols = new ArrayList<>();
+    this.resultRowAndCols = new ArrayList<>();
+    this.objectsOfASingleRac = new ArrayList<>();
+    this.partitionColsIndex = new ArrayList<>();
+  }
+
+  private static VirtualColumns makeVirtualColumnsForFrameWriter(
+      @Nullable final VirtualColumn partitionBoostVirtualColumn,
+      final ObjectMapper jsonMapper,
+      final WindowOperatorQuery query
+  )
+  {
+    List<VirtualColumn> virtualColumns = new ArrayList<>();
+
+    virtualColumns.add(partitionBoostVirtualColumn);
+    final VirtualColumn segmentGranularityVirtualColumn =
+        QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
+    if (segmentGranularityVirtualColumn != null) {
+      virtualColumns.add(segmentGranularityVirtualColumn);
+    }
+
+    return VirtualColumns.create(virtualColumns);
+  }
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return Collections.singletonList(inputChannel);
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return Collections.singletonList(outputChannel);
+  }
+
+  @Override
+  public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
+  {
+    /*
+     *
+     * PARTITION BY A ORDER BY B
+     *
+     * Frame 1   -> rac1
+     * A  B
+     * 1, 2
+     * 1, 3
+     * 2, 1 --> key changed
+     * 2, 2
+     *
+     *
+     * Frame 2 -> rac2
+     * 3, 1 --> key changed
+     * 3, 2
+     * 3, 3
+     * 3, 4
+     *
+     * Frame 3 -> rac3
+     *
+     * 3, 5
+     * 3, 6
+     * 4, 1 --> key changed
+     * 4, 2
+     *
+     * In case of empty OVER clause, all these racs need to be added to a 
single rows and columns
+     * to be processed. The way we can do this is to use a ConcatRowsAndColumns

Review Comment:
   
https://github.com/apache/druid/blob/5c7512c54db85f1db6e550777a6cdad04bd5a10e/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java#L46
 has a materialized ArrayList. Wouldn't that cause memory issues?
   



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java:
##########
@@ -0,0 +1,1401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.msq.indexing.MSQSpec;
+import org.apache.druid.msq.indexing.MSQTuningConfig;
+import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
+import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
+import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
+import org.apache.druid.msq.test.CounterSnapshotMatcher;
+import org.apache.druid.msq.test.MSQTestBase;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnnestDataSource;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.operator.ColumnWithDirection;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.NaiveSortOperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.operator.window.WindowFrame;
+import org.apache.druid.query.operator.window.WindowFramedAggregateProcessor;
+import org.apache.druid.query.operator.window.WindowOperatorFactory;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.spec.LegacySegmentSpec;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.join.JoinType;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.filtration.Filtration;
+import org.apache.druid.sql.calcite.planner.ColumnMapping;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
+import org.apache.druid.sql.calcite.rel.DruidQuery;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+@RunWith(Parameterized.class)
+public class MSQWindowTest extends MSQTestBase

Review Comment:
   It looks like we only have select tests. It would be good to also add
insert/replace tests here.
   Also, I think you do not need all of the contexts below, since they were
targeted more at the MSQSelect tests.
   ```
     public static final String QUERY_RESULTS_WITH_DURABLE_STORAGE = 
"query_results_with_durable_storage";
   
     public static final String QUERY_RESULTS_WITH_DEFAULT = 
"query_results_with_default_storage";
   
     public static final Map<String, Object> 
QUERY_RESULTS_WITH_DURABLE_STORAGE_CONTEXT =
         ImmutableMap.<String, Object>builder()
                     .putAll(DURABLE_STORAGE_MSQ_CONTEXT)
                     .put(MultiStageQueryContext.CTX_ROWS_PER_PAGE, 2)
                     .put(
                         MultiStageQueryContext.CTX_SELECT_DESTINATION,
                         
StringUtils.toLowerCase(MSQSelectDestination.DURABLESTORAGE.getName())
                     )
                     .build();
   
   
     public static final Map<String, Object> QUERY_RESULTS_WITH_DEFAULT_CONTEXT 
=
         ImmutableMap.<String, Object>builder()
                     .putAll(DEFAULT_MSQ_CONTEXT)
                     .put(
                         MultiStageQueryContext.CTX_SELECT_DESTINATION,
                         
StringUtils.toLowerCase(MSQSelectDestination.DURABLESTORAGE.getName())
                     )
                     .build();
                     
   ```
                     
   
   You can add one window function test in the MSQSelect test for sanity, though.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.frame.key.KeyOrder;
+import org.apache.druid.msq.input.stage.StageInputSpec;
+import org.apache.druid.msq.kernel.HashShuffleSpec;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
+import org.apache.druid.msq.kernel.ShuffleSpec;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.operator.ColumnWithDirection;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.NaiveSortOperatorFactory;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.operator.window.WindowOperatorFactory;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
+{
+  private final ObjectMapper jsonMapper;
+
+  public WindowOperatorQueryKit(ObjectMapper jsonMapper)
+  {
+    this.jsonMapper = jsonMapper;
+  }
+
+  @Override
+  public QueryDefinition makeQueryDefinition(
+      String queryId,
+      WindowOperatorQuery originalQuery,
+      QueryKit<Query<?>> queryKit,
+      ShuffleSpecFactory resultShuffleSpecFactory,
+      int maxWorkerCount,
+      int minStageNumber
+  )
+  {
+    // need to validate query first
+    // populate the group of operators to be processed as each stage
+    // the size of the operators is the number of serialized stages
+    // later we should also check if these can be parallelized
+    // check there is an empty over clause or not
+    List<List<OperatorFactory>> operatorList = new ArrayList<>();
+    boolean status = validateAndReturnOperatorList(originalQuery, 
operatorList);
+
+
+    ShuffleSpec nextShuffleSpec = 
findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount);
+    // add this shuffle spec to the last stage of the inner query
+
+    final QueryDefinitionBuilder queryDefBuilder = 
QueryDefinition.builder().queryId(queryId);
+    if (nextShuffleSpec != null) {
+      final ClusterBy windowClusterBy = nextShuffleSpec.clusterBy();
+      originalQuery = (WindowOperatorQuery) 
originalQuery.withOverriddenContext(ImmutableMap.of(
+          MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL,
+          windowClusterBy
+      ));
+    } else {
+      nextShuffleSpec = ShuffleSpecFactories.singlePartition()
+                                            .build(ClusterBy.none(), false);
+    }
+    final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource(
+        queryKit,
+        queryId,
+        originalQuery.context(),
+        originalQuery.getDataSource(),
+        originalQuery.getQuerySegmentSpec(),
+        originalQuery.getFilter(),
+        null,
+        maxWorkerCount,
+        minStageNumber,
+        false
+    );
+
+    dataSourcePlan.getSubQueryDefBuilder().ifPresent(queryDefBuilder::addAll);
+
+    final int firstStageNumber = Math.max(minStageNumber, 
queryDefBuilder.getNextStageNumber());
+    final WindowOperatorQuery queryToRun = (WindowOperatorQuery) 
originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
+    RowSignature rowSignature = queryToRun.getRowSignature();
+
+    if (status) {
+      // empty over clause found
+      // moving everything to a single partition
+      queryDefBuilder.add(
+          StageDefinition.builder(firstStageNumber)
+                         .inputs(new StageInputSpec(firstStageNumber - 1))
+                         .signature(rowSignature)
+                         .maxWorkerCount(maxWorkerCount)
+                         .shuffleSpec(nextShuffleSpec)
+                         .processorFactory(new 
WindowOperatorQueryFrameProcessorFactory(
+                             queryToRun,
+                             queryToRun.getOperators(),
+                             rowSignature
+                         ))
+      );
+    } else {
+      // there are multiple windows present in the query
+      // Create stages for each window in the query
+      // These stages will be serialized
+      // the partition by clause of the next window will be the shuffle key 
for the previous window
+      RowSignature.Builder bob = RowSignature.builder();
+      final int numberOfWindows = operatorList.size();
+      final int baseSize = rowSignature.size() - numberOfWindows;
+      for (int i = 0; i < baseSize; i++) {
+        bob.add(rowSignature.getColumnName(i), 
rowSignature.getColumnType(i).get());
+      }
+
+      for (int i = 0; i < numberOfWindows; i++) {
+        bob.add(rowSignature.getColumnName(baseSize + i), 
rowSignature.getColumnType(baseSize + i).get()).build();
+        // find the shuffle spec of the next stage
+        // if it is the last stage set the next shuffle spec to single 
partition
+        if (i + 1 == numberOfWindows) {
+          nextShuffleSpec = ShuffleSpecFactories.singlePartition()
+                                                .build(ClusterBy.none(), 
false);
+        } else {
+          nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 
1), maxWorkerCount);
+        }
+
+        final RowSignature intermediateSignature = bob.build();
+        final RowSignature stageRowSignature;
+        if (nextShuffleSpec == null) {
+          stageRowSignature = intermediateSignature;
+        } else {
+          stageRowSignature = QueryKitUtils.sortableSignature(
+              intermediateSignature,
+              nextShuffleSpec.clusterBy().getColumns()
+          );
+        }
+
+        queryDefBuilder.add(
+            StageDefinition.builder(firstStageNumber + i)
+                           .inputs(new StageInputSpec(firstStageNumber + i - 
1))
+                           .signature(stageRowSignature)
+                           .maxWorkerCount(maxWorkerCount)
+                           .shuffleSpec(nextShuffleSpec)
+                           .processorFactory(new 
WindowOperatorQueryFrameProcessorFactory(
+                               queryToRun,
+                               operatorList.get(i),
+                               stageRowSignature
+                           ))
+        );
+      }
+    }
+    return queryDefBuilder.queryId(queryId).build();
+  }
+
+  private boolean validateAndReturnOperatorList(
+      WindowOperatorQuery originalQuery,
+      List<List<OperatorFactory>> operatorList
+  )
+  {
+    final List<OperatorFactory> operators = originalQuery.getOperators();
+    List<OperatorFactory> operatorFactoryList = new ArrayList<>();
+    for (OperatorFactory of : operators) {
+      operatorFactoryList.add(of);
+      if (of instanceof WindowOperatorFactory) {
+        operatorList.add(operatorFactoryList);
+        operatorFactoryList = new ArrayList<>();
+      } else if (of instanceof NaivePartitioningOperatorFactory) {
+        if (((NaivePartitioningOperatorFactory) 
of).getPartitionColumns().isEmpty()) {
+          operatorList.clear();
+          operatorList.add(originalQuery.getOperators());
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory> 
operatorFactories, int maxWorkerCount)
+  {
+    NaivePartitioningOperatorFactory partition = null;
+    NaiveSortOperatorFactory sort = null;
+    List<KeyColumn> keyColsOfWindow = new ArrayList<>();
+    for (OperatorFactory of : operatorFactories) {
+      if (of instanceof NaivePartitioningOperatorFactory) {
+        partition = (NaivePartitioningOperatorFactory) of;
+      } else if (of instanceof NaiveSortOperatorFactory) {
+        sort = (NaiveSortOperatorFactory) of;
+      }
+    }
+    Map<String, ColumnWithDirection.Direction> colMap = new HashMap<>();
+    if (sort != null) {
+      for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
+        colMap.put(sortColumn.getColumn(), sortColumn.getDirection());
+      }
+    }
+    assert partition != null;
+    if (partition.getPartitionColumns().isEmpty()) {
+      return null;
+    }
+    for (String partitionColumn : partition.getPartitionColumns()) {
+      KeyColumn kc;
+      if (colMap.containsKey(partitionColumn)) {
+        if (colMap.get(partitionColumn) == ColumnWithDirection.Direction.ASC) {
+          kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
+        } else {
+          kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING);
+        }
+      } else {
+        kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
+      }
+      keyColsOfWindow.add(kc);
+    }
+    return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), 
maxWorkerCount);

Review Comment:
   Or we need `HASH_LOCAL_SORT` 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.FrameRowTooLargeException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.util.SettableLongVirtualColumn;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.OffsetLimit;
+import org.apache.druid.query.operator.Operator;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
+import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
+{
+  // Class-wide logger.
+  private static final Logger log = new 
Logger(WindowOperatorQueryFrameProcessor.class);
+  // Query being processed; handed to QueryKitUtils when building virtual columns for the frame writer.
+  private final WindowOperatorQuery query;
+
+  // Operator factories wrapped, in list order, around the base operator before it is run.
+  private final List<OperatorFactory> operatorFactoryList;
+  private final ObjectMapper jsonMapper;
+  // Row signature supplied at construction; not read by the methods visible in this section.
+  private final RowSignature outputStageSignature;
+  // Empty-OVER() path: one RowsAndColumns per input frame, concatenated once the channel is finished.
+  private final ArrayList<RowsAndColumns> frameRowsAndCols;
+  // Operator output collected by the receiver; flushed to a frame and cleared on completion.
+  private final ArrayList<RowsAndColumns> resultRowAndCols;
+  private final ReadableFrameChannel inputChannel;
+  private final WritableFrameChannel outputChannel;
+  private final FrameWriterFactory frameWriterFactory;
+  private final FrameReader frameReader;
+  private final SettableLongVirtualColumn partitionBoostVirtualColumn;
+  // PARTITION BY path: rows gathered so far for the partition currently being read.
+  ArrayList<ResultRow> objectsOfASingleRac;
+  // Positions of the partition columns in the input signature; recomputed whenever a new frame is read.
+  List<Integer> partitionColsIndex;
+  private long currentAllocatorCapacity; // Used for generating 
FrameRowTooLargeException if needed
+  // Cursor over the frame currently being scanned; null until the first frame has been read.
+  private Cursor frameCursor = null;
+  // Builds a ResultRow from the selectors positioned at the current row of frameCursor.
+  private Supplier<ResultRow> rowSupplierFromFrameCursor;
+  // Copy of the first row of the current partition; later rows are compared against it to detect a key change.
+  private ResultRow outputRow = null;
+  // Writer for the frame under construction; null when no frame is in progress.
+  private FrameWriter frameWriter = null;
+
+  /**
+   * Builds a processor that reads frames from {@code inputChannel}, runs the given operator
+   * factories over them and writes the resulting frames to {@code outputChannel}.
+   *
+   * @param query               window query being executed
+   * @param inputChannel        channel the input frames are read from
+   * @param outputChannel       channel the result frames are written to
+   * @param frameWriterFactory  factory for the writers that build output frames
+   * @param frameReader         reader (and signature) for the input frames
+   * @param jsonMapper          mapper passed on when building frame-writer virtual columns
+   * @param operatorFactoryList operator factories to wrap around the base operator, in order
+   * @param rowSignature        signature stored as the output stage signature
+   */
+  public WindowOperatorQueryFrameProcessor(
+      WindowOperatorQuery query,
+      ReadableFrameChannel inputChannel,
+      WritableFrameChannel outputChannel,
+      FrameWriterFactory frameWriterFactory,
+      FrameReader frameReader,
+      ObjectMapper jsonMapper,
+      final List<OperatorFactory> operatorFactoryList,
+      final RowSignature rowSignature
+  )
+  {
+    // Collaborators supplied by the caller.
+    this.query = query;
+    this.inputChannel = inputChannel;
+    this.outputChannel = outputChannel;
+    this.frameReader = frameReader;
+    this.frameWriterFactory = frameWriterFactory;
+    this.jsonMapper = jsonMapper;
+    this.operatorFactoryList = operatorFactoryList;
+    this.outputStageSignature = rowSignature;
+
+    // State owned by this processor; every buffer starts out empty.
+    this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
+    this.frameRowsAndCols = new ArrayList<>();
+    this.resultRowAndCols = new ArrayList<>();
+    this.objectsOfASingleRac = new ArrayList<>();
+    this.partitionColsIndex = new ArrayList<>();
+  }
+
+  /**
+   * Assembles the virtual columns that are layered over the column selector factory used by the
+   * frame writer.
+   *
+   * @param partitionBoostVirtualColumn boost column to expose, or null to leave it out
+   * @param jsonMapper                  mapper passed to {@link QueryKitUtils#makeSegmentGranularityVirtualColumn}
+   * @param query                       query passed to {@link QueryKitUtils#makeSegmentGranularityVirtualColumn}
+   * @return the virtual columns to wrap around the writer's column selector factory
+   */
+  private static VirtualColumns makeVirtualColumnsForFrameWriter(
+      @Nullable final VirtualColumn partitionBoostVirtualColumn,
+      final ObjectMapper jsonMapper,
+      final WindowOperatorQuery query
+  )
+  {
+    List<VirtualColumn> virtualColumns = new ArrayList<>();
+
+    // The parameter is declared @Nullable, so never hand a null element to VirtualColumns.create.
+    if (partitionBoostVirtualColumn != null) {
+      virtualColumns.add(partitionBoostVirtualColumn);
+    }
+
+    final VirtualColumn segmentGranularityVirtualColumn =
+        QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
+    if (segmentGranularityVirtualColumn != null) {
+      virtualColumns.add(segmentGranularityVirtualColumn);
+    }
+
+    return VirtualColumns.create(virtualColumns);
+  }
+
+  /**
+   * @return a one-element list holding the single input channel of this processor
+   */
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    final List<ReadableFrameChannel> onlyInput = Collections.singletonList(inputChannel);
+    return onlyInput;
+  }
+
+  /**
+   * @return a one-element list holding the single output channel of this processor
+   */
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    final List<WritableFrameChannel> onlyOutput = Collections.singletonList(outputChannel);
+    return onlyOutput;
+  }
+
+  /**
+   * Performs one increment of window processing over the single input channel.
+   *
+   * If any partitioning operator has no partition columns (empty OVER()), every input frame is
+   * buffered as a RowsAndColumns and the operators run once, over the concatenation, when the
+   * channel is finished. Otherwise rows are read one at a time, grouped while their partition key
+   * matches the first row of the group, and the operators run once per group.
+   *
+   * @param readableInputs inputs reported as readable; only consulted on the PARTITION BY path
+   * @return {@code returnObject(Unit.instance())} once the input channel is finished and all
+   *         buffered rows have been processed; otherwise {@code runAgain()} or {@code awaitAll(...)}
+   */
+  @Override
+  public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
+  {
+    /*
+     *
+     * PARTITION BY A ORDER BY B
+     *
+     * Frame 1   -> rac1
+     * A  B
+     * 1, 2
+     * 1, 3
+     * 2, 1 --> key changed
+     * 2, 2
+     *
+     *
+     * Frame 2 -> rac2
+     * 3, 1 --> key changed
+     * 3, 2
+     * 3, 3
+     * 3, 4
+     *
+     * Frame 3 -> rac3
+     *
+     * 3, 5
+     * 3, 6
+     * 4, 1 --> key changed
+     * 4, 2
+     *
+     * In case of an empty OVER clause, all these racs need to be added to a single rows and columns
+     * to be processed. The way we can do this is to use a ConcatRowsAndColumns
+     * ConcatRC [rac1, rac2, rac3]
+     * Run all ops on this
+     *
+     *
+     * The flow would look like:
+     * 1. Validate if the operator has an empty OVER clause
+     * 2. If 1 is true, make a giant rows and columns (R&C) using concat as shown above
+     *    Let all operators run amok on that R&C
+     * 3. If 1 is false
+     *    Read a frame
+     *    keep the older row in a class variable
+     *    check row by row and compare the current row with the older row to check if a
+     *    partition boundary is reached
+     *    when the partition-by key changes
+     *    create an R&C for that particular set of rows; they would have the same partition key
+     *    output will be a single R&C
+     *    write to output channel
+     *
+     *
+     *  Future thoughts:
+     *
+     *  1. We are writing 1 partition to each frame in this way. In case of high cardinality data
+     *      we will be making a large number of small frames. We can have a check to keep the size
+     *      of a frame to a value, say 20k rows, and keep on adding to the same pending frame
+     *      instead of creating a new frame
+     *
+     *  2. The current approach with R&C and operators materializes a single R&C for processing.
+     *     In case of data with low cardinality a single R&C might be too big to consume. Same for
+     *     the case of an empty OVER() clause.
+     *     Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes
+     *     of the data. We might think to reimplement them in the MSQ way so that we do not have to
+     *     materialize so much data
+     */
+
+    // Phase 1 of the execution
+    // eagerly validate presence of empty OVER() clause
+    boolean status = checkEagerlyForEmptyWindow(operatorFactoryList);
+    if (status) {
+      // if OVER() found
+      // have to bring all data to a single executor for processing
+      // convert each frame to rac
+      // concat all the racs to make a giant rac
+      // let all operators run on the giant rac when channel is finished
+      if (inputChannel.canRead()) {
+        final Frame frame = inputChannel.read();
+        convertRowFrameToRowsAndColumns(frame);
+      } else if (inputChannel.isFinished()) {
+        runAllOpsOnMultipleRac(frameRowsAndCols);
+        return ReturnOrAwait.returnObject(Unit.instance());
+      } else {
+        return ReturnOrAwait.awaitAll(inputChannels().size());
+      }
+      // a frame was buffered above; come back for the next one
+      return ReturnOrAwait.runAgain();
+    } else {
+      // Aha, you found a PARTITION BY and maybe an ORDER BY too
+      // PARTITION BY can also be on multiple keys
+      // typically the last stage would already partition and sort for you
+      // figure out frame boundaries and convert each distinct group to a rac
+      // then run the windowing operator only on each rac
+      if (frameCursor == null || frameCursor.isDone()) {
+        // no frame in progress: fetch the next one, finish up, or wait
+        if (readableInputs.isEmpty()) {
+          return ReturnOrAwait.awaitAll(1);
+        } else if (inputChannel.canRead()) {
+          final Frame frame = inputChannel.read();
+          frameCursor = FrameProcessors.makeCursor(frame, frameReader);
+          final ColumnSelectorFactory frameColumnSelectorFactory = 
frameCursor.getColumnSelectorFactory();
+          partitionColsIndex = findPartitionColumns(frameReader.signature());
+          // one value supplier per column of the input signature, bound to the new cursor
+          final Supplier<Object>[] fieldSuppliers = new 
Supplier[frameReader.signature().size()];
+          for (int i = 0; i < fieldSuppliers.length; i++) {
+            final ColumnValueSelector<?> selector =
+                
frameColumnSelectorFactory.makeColumnValueSelector(frameReader.signature().getColumnName(i));
+            fieldSuppliers[i] = selector::getObject;
+
+          }
+          // materializes the row the cursor currently points at as a fresh ResultRow
+          rowSupplierFromFrameCursor = () -> {
+            final ResultRow row = ResultRow.create(fieldSuppliers.length);
+            for (int i = 0; i < fieldSuppliers.length; i++) {
+              row.set(i, fieldSuppliers[i].get());
+            }
+            return row;
+          };
+        } else if (inputChannel.isFinished()) {
+          // reached end of channel
+          // if there is data remaining
+          // write it into a rac
+          // and run operators on it
+          if (!objectsOfASingleRac.isEmpty()) {
+            RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow(
+                objectsOfASingleRac,
+                frameReader.signature()
+            );
+            runAllOpsOnSingleRac(rac);
+            objectsOfASingleRac.clear();
+          }
+          return ReturnOrAwait.returnObject(Unit.instance());
+        } else {
+          return ReturnOrAwait.runAgain();
+        }
+      }
+      while (!frameCursor.isDone()) {
+        final ResultRow currentRow = rowSupplierFromFrameCursor.get();
+        if (outputRow == null) {
+          // very first row seen: it starts the first partition
+          outputRow = currentRow.copy();
+          objectsOfASingleRac.add(currentRow);
+        } else if (comparePartitionKeys(outputRow, currentRow, 
partitionColsIndex)) {
+          // if they have the same partition key
+          // keep adding them
+          objectsOfASingleRac.add(currentRow);
+        } else {
+          // key change noted
+          // create rac from the rows seen before
+          // run the operators on these rows and columns
+          // clean up the object to hold the new rows only
+          RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow(
+              objectsOfASingleRac,
+              frameReader.signature()
+          );
+          runAllOpsOnSingleRac(rac);
+          objectsOfASingleRac.clear();
+          outputRow = currentRow.copy();
+          // the cursor is deliberately not advanced: the next call re-reads this row,
+          // finds it equal to outputRow and adds it to the new partition
+          return ReturnOrAwait.runAgain();
+        }
+        frameCursor.advance();
+      }
+    }
+    // current frame exhausted; come back to read the next frame or to finish
+    return ReturnOrAwait.runAgain();
+  }
+
+  /**
+   * @param operatorFactoryList the list of operators to check for empty window
+   * @return true is there is a single OVER() clause across all the operators, 
false otherwise
+   */
+  private boolean checkEagerlyForEmptyWindow(List<OperatorFactory> 
operatorFactoryList)
+  {
+    for (OperatorFactory of : operatorFactoryList) {
+      if (of instanceof NaivePartitioningOperatorFactory) {
+        if (((NaivePartitioningOperatorFactory) 
of).getPartitionColumns().isEmpty()) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Runs the configured operators with a single {@link RowsAndColumns} as their only input.
+   *
+   * @param singleRac the {@link RowsAndColumns} pushed to the operators before completion is signalled
+   */
+  private void runAllOpsOnSingleRac(RowsAndColumns singleRac)
+  {
+    // Base operator: emits the given rac exactly once, then signals completion.
+    runOperatorsAfterThis(
+        new Operator()
+        {
+          @Nullable
+          @Override
+          public Closeable goOrContinue(Closeable continuation, Receiver downstream)
+          {
+            downstream.push(singleRac);
+            downstream.completed();
+            return null;
+          }
+        }
+    );
+  }
+
+  /**
+   * Runs the configured operators over several {@link RowsAndColumns} presented as one input.
+   *
+   * @param listOfRacs list that is wrapped in a {@link ConcatRowsAndColumns} when the base operator runs
+   */
+  private void runAllOpsOnMultipleRac(ArrayList<RowsAndColumns> listOfRacs)
+  {
+    // Base operator: the concatenation is only built at the moment the operator is driven.
+    runOperatorsAfterThis(
+        new Operator()
+        {
+          @Nullable
+          @Override
+          public Closeable goOrContinue(Closeable continuation, Receiver downstream)
+          {
+            final RowsAndColumns combined = new ConcatRowsAndColumns(listOfRacs);
+            downstream.push(combined);
+            downstream.completed();
+            return null;
+          }
+        }
+    );
+  }
+
+  /**
+   * Wraps every configured operator factory, in list order, around {@code op} and runs the
+   * resulting chain. Each pushed result is buffered; on completion the buffer is flushed to the
+   * output and then cleared, whether or not the flush succeeded.
+   *
+   * @param op base operator that feeds the chain
+   */
+  private void runOperatorsAfterThis(Operator op)
+  {
+    Operator chain = op;
+    for (OperatorFactory factory : operatorFactoryList) {
+      chain = factory.wrap(chain);
+    }
+
+    final Operator.Receiver collector = new Operator.Receiver()
+    {
+      @Override
+      public Operator.Signal push(RowsAndColumns rac)
+      {
+        resultRowAndCols.add(rac);
+        return Operator.Signal.GO;
+      }
+
+      @Override
+      public void completed()
+      {
+        try {
+          flushAllRowsAndCols(resultRowAndCols);
+        }
+        catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        finally {
+          resultRowAndCols.clear();
+        }
+      }
+    };
+
+    Operator.go(chain, collector);
+  }
+
+  /**
+   * @param resultRowAndCols Flush the list of {@link RowsAndColumns} to a 
frame
+   * @throws IOException
+   */
+  private void flushAllRowsAndCols(ArrayList<RowsAndColumns> resultRowAndCols) 
throws IOException
+  {
+    RowsAndColumns rac = new ConcatRowsAndColumns(resultRowAndCols);
+    AtomicInteger rowId = new AtomicInteger(0);
+    createFrameWriterIfNeeded(rac, rowId);
+    writeRacToFrame(rac, rowId);
+  }
+
+  /**
+   * Creates {@code frameWriter} if none is in progress; does nothing otherwise.
+   *
+   * @param rac   {@link RowsAndColumns} the new writer's column selector factory reads from
+   * @param rowId row pointer handed to the {@link ColumnSelectorFactoryMaker} built from {@code rac}
+   */
+  private void createFrameWriterIfNeeded(RowsAndColumns rac, AtomicInteger rowId)
+  {
+    if (frameWriter != null) {
+      return;
+    }
+
+    final VirtualColumns writerVirtualColumns =
+        makeVirtualColumnsForFrameWriter(partitionBoostVirtualColumn, jsonMapper, query);
+    final ColumnSelectorFactory racSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowId);
+
+    frameWriter = frameWriterFactory.newFrameWriter(writerVirtualColumns.wrap(racSelectorFactory));
+    currentAllocatorCapacity = frameWriterFactory.allocatorCapacity();
+  }
+
+  /**
+   * Writes every row of {@code rac} to the output channel, using as many frames as are needed.
+   *
+   * Expects {@code frameWriter} to have been created over {@code rac} and {@code rowId}, for
+   * example via {@code createFrameWriterIfNeeded(rac, rowId)}.
+   *
+   * @param rac   {@link RowsAndColumns} to be written to frame
+   * @param rowId row pointer shared with the writer's selectors; reset to 0 here and advanced
+   *              once per row written
+   * @throws IOException               if flushing a frame to the output channel fails
+   * @throws FrameRowTooLargeException if a row cannot be added even to an empty frame
+   */
+  public void writeRacToFrame(RowsAndColumns rac, AtomicInteger rowId) throws IOException
+  {
+    final int numRows = rac.numRows();
+    rowId.set(0);
+    while (rowId.get() < numRows) {
+      final boolean didAddToFrame = frameWriter.addSelection();
+      if (didAddToFrame) {
+        incrementBoostColumn();
+        rowId.incrementAndGet();
+      } else if (frameWriter.getNumRows() == 0) {
+        throw new FrameRowTooLargeException(currentAllocatorCapacity);
+      } else {
+        // The current frame is full. Emit it and continue with a fresh writer bound to the same
+        // rac and row pointer. rowId is not advanced, so the row that did not fit is retried.
+        // Returning here instead would silently drop the remaining rows.
+        flushFrameWriter();
+        createFrameWriterIfNeeded(rac, rowId);
+      }
+    }
+    flushFrameWriter();
+  }
+
+  /**
+   * Closes the input channel, the output channel and any frame writer still in progress.
+   *
+   * @throws IOException if closing fails
+   */
+  @Override
+  public void cleanup() throws IOException
+  {
+    final List<ReadableFrameChannel> inputs = inputChannels();
+    final List<WritableFrameChannel> outputs = outputChannels();
+    FrameProcessors.closeAll(inputs, outputs, frameWriter);
+  }
+
+  /**
+   * Emits the frame under construction, if it holds any rows, and discards the writer.
+   *
+   * @return number of rows flushed to the output channel; 0 when there was nothing to write
+   * @throws IOException if writing to the output channel fails
+   */
+  private long flushFrameWriter() throws IOException
+  {
+    if (frameWriter == null) {
+      return 0;
+    }
+
+    if (frameWriter.getNumRows() <= 0) {
+      // An empty writer is simply released without emitting a frame.
+      frameWriter.close();
+      frameWriter = null;
+      return 0;
+    }
+
+    final Frame frame = Frame.wrap(frameWriter.toByteArray());
+    final WritableFrameChannel sink = Iterables.getOnlyElement(outputChannels());
+    sink.write(new FrameWithPartition(frame, FrameWithPartition.NO_PARTITION));
+    frameWriter.close();
+    frameWriter = null;
+    return frame.numRows();
+  }
+
+  /**
+   * Wraps a frame as a {@link RowsAndColumns} and appends it to {@code frameRowsAndCols}.
+   *
+   * @param frame row based frame to be converted to a {@link RowsAndColumns} object
+   */
+  private void convertRowFrameToRowsAndColumns(Frame frame)
+  {
+    final RowBasedFrameRowsAndColumns frameRac =
+        new RowBasedFrameRowsAndColumns(frame, frameReader.signature());
+
+    // Decorated with only a limit of Integer.MAX_VALUE; every other decoration argument is null.
+    frameRowsAndCols.add(
+        new LazilyDecoratedRowsAndColumns(
+            frameRac,
+            null,
+            null,
+            null,
+            OffsetLimit.limit(Integer.MAX_VALUE),
+            null,
+            null
+        )
+    );
+  }
+
+  private List<Integer> findPartitionColumns(RowSignature rowSignature)
+  {
+    List<Integer> indexList = new ArrayList<>();
+    for (OperatorFactory of : operatorFactoryList) {
+      if (of instanceof NaivePartitioningOperatorFactory) {
+        for (String s : ((NaivePartitioningOperatorFactory) 
of).getPartitionColumns()) {
+          indexList.add(rowSignature.indexOf(s));
+        }
+      }
+    }
+    return indexList;
+  }
+
+  private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, 
List<Integer> partitionIndices)

Review Comment:
   Also, are you skipping boosted columns in the input?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.FrameRowTooLargeException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.util.SettableLongVirtualColumn;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.OffsetLimit;
+import org.apache.druid.query.operator.Operator;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
+import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
+{
+  private static final Logger log = new 
Logger(WindowOperatorQueryFrameProcessor.class);
+  private final WindowOperatorQuery query;
+
+  private final List<OperatorFactory> operatorFactoryList;
+  private final ObjectMapper jsonMapper;
+  private final RowSignature outputStageSignature;
+  private final ArrayList<RowsAndColumns> frameRowsAndCols;
+  private final ArrayList<RowsAndColumns> resultRowAndCols;
+  private final ReadableFrameChannel inputChannel;
+  private final WritableFrameChannel outputChannel;
+  private final FrameWriterFactory frameWriterFactory;
+  private final FrameReader frameReader;
+  private final SettableLongVirtualColumn partitionBoostVirtualColumn;
+  ArrayList<ResultRow> objectsOfASingleRac;

Review Comment:
   Nit: Why are these fields not private final?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.frame.key.KeyOrder;
+import org.apache.druid.msq.input.stage.StageInputSpec;
+import org.apache.druid.msq.kernel.HashShuffleSpec;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
+import org.apache.druid.msq.kernel.ShuffleSpec;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.operator.ColumnWithDirection;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.NaiveSortOperatorFactory;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.operator.window.WindowOperatorFactory;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
+{
+  private final ObjectMapper jsonMapper;
+
+  /**
+   * @param jsonMapper mapper retained by this query kit
+   */
+  public WindowOperatorQueryKit(final ObjectMapper jsonMapper)
+  {
+    this.jsonMapper = jsonMapper;
+  }
+
+  @Override
+  public QueryDefinition makeQueryDefinition(
+      String queryId,
+      WindowOperatorQuery originalQuery,
+      QueryKit<Query<?>> queryKit,
+      ShuffleSpecFactory resultShuffleSpecFactory,
+      int maxWorkerCount,
+      int minStageNumber
+  )
+  {
+    // need to validate query first
+    // populate the group of operators to be processed as each stage
+    // the size of the operators is the number of serialized stages
+    // later we should also check if these can be parallelized
+    // check there is an empty over clause or not
+    List<List<OperatorFactory>> operatorList = new ArrayList<>();

Review Comment:
   I think the empty OVER() information should be passed from the query kit to
the factory.



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java:
##########
@@ -80,6 +84,35 @@ public static MapOfColumnsRowsAndColumns fromMap(Map<String, 
? extends Column> m
     );
   }
 
+  public static MapOfColumnsRowsAndColumns fromResultRow(ArrayList<ResultRow> 
objs, RowSignature signature)

Review Comment:
   This change means we materialize everything in memory. Is that okay?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.FrameRowTooLargeException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.util.SettableLongVirtualColumn;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.OffsetLimit;
+import org.apache.druid.query.operator.Operator;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
+import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
+{
+  // Class-wide logger.
+  private static final Logger log = new 
Logger(WindowOperatorQueryFrameProcessor.class);
+  // Query being processed; handed to QueryKitUtils when building virtual columns for the frame writer.
+  private final WindowOperatorQuery query;
+
+  // Operator factories wrapped, in list order, around the base operator before it is run.
+  private final List<OperatorFactory> operatorFactoryList;
+  private final ObjectMapper jsonMapper;
+  // Row signature supplied at construction; not read by the methods visible in this section.
+  private final RowSignature outputStageSignature;
+  // Empty-OVER() path: one RowsAndColumns per input frame, concatenated once the channel is finished.
+  private final ArrayList<RowsAndColumns> frameRowsAndCols;
+  // Operator output collected by the receiver; flushed to a frame and cleared on completion.
+  private final ArrayList<RowsAndColumns> resultRowAndCols;
+  private final ReadableFrameChannel inputChannel;
+  private final WritableFrameChannel outputChannel;
+  private final FrameWriterFactory frameWriterFactory;
+  private final FrameReader frameReader;
+  private final SettableLongVirtualColumn partitionBoostVirtualColumn;
+  // PARTITION BY path: rows gathered so far for the partition currently being read.
+  ArrayList<ResultRow> objectsOfASingleRac;
+  // Positions of the partition columns in the input signature; recomputed whenever a new frame is read.
+  List<Integer> partitionColsIndex;
+  private long currentAllocatorCapacity; // Used for generating 
FrameRowTooLargeException if needed
+  // Cursor over the frame currently being scanned; null until the first frame has been read.
+  private Cursor frameCursor = null;
+  // Builds a ResultRow from the selectors positioned at the current row of frameCursor.
+  private Supplier<ResultRow> rowSupplierFromFrameCursor;
+  // Copy of the first row of the current partition; later rows are compared against it to detect a key change.
+  private ResultRow outputRow = null;
+  // Writer for the frame under construction; null when no frame is in progress.
+  private FrameWriter frameWriter = null;
+
+  /**
+   * Builds a processor that reads frames from {@code inputChannel}, runs the given operator
+   * factories over them and writes the resulting frames to {@code outputChannel}.
+   *
+   * @param query               window query being executed
+   * @param inputChannel        channel the input frames are read from
+   * @param outputChannel       channel the result frames are written to
+   * @param frameWriterFactory  factory for the writers that build output frames
+   * @param frameReader         reader (and signature) for the input frames
+   * @param jsonMapper          mapper passed on when building frame-writer virtual columns
+   * @param operatorFactoryList operator factories to wrap around the base operator, in order
+   * @param rowSignature        signature stored as the output stage signature
+   */
+  public WindowOperatorQueryFrameProcessor(
+      WindowOperatorQuery query,
+      ReadableFrameChannel inputChannel,
+      WritableFrameChannel outputChannel,
+      FrameWriterFactory frameWriterFactory,
+      FrameReader frameReader,
+      ObjectMapper jsonMapper,
+      final List<OperatorFactory> operatorFactoryList,
+      final RowSignature rowSignature
+  )
+  {
+    // Collaborators supplied by the caller.
+    this.query = query;
+    this.inputChannel = inputChannel;
+    this.outputChannel = outputChannel;
+    this.frameReader = frameReader;
+    this.frameWriterFactory = frameWriterFactory;
+    this.jsonMapper = jsonMapper;
+    this.operatorFactoryList = operatorFactoryList;
+    this.outputStageSignature = rowSignature;
+
+    // State owned by this processor; every buffer starts out empty.
+    this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
+    this.frameRowsAndCols = new ArrayList<>();
+    this.resultRowAndCols = new ArrayList<>();
+    this.objectsOfASingleRac = new ArrayList<>();
+    this.partitionColsIndex = new ArrayList<>();
+  }
+
+  /**
+   * Assembles the virtual columns that are layered over the column selector factory used by the
+   * frame writer.
+   *
+   * @param partitionBoostVirtualColumn boost column to expose, or null to leave it out
+   * @param jsonMapper                  mapper passed to {@link QueryKitUtils#makeSegmentGranularityVirtualColumn}
+   * @param query                       query passed to {@link QueryKitUtils#makeSegmentGranularityVirtualColumn}
+   * @return the virtual columns to wrap around the writer's column selector factory
+   */
+  private static VirtualColumns makeVirtualColumnsForFrameWriter(
+      @Nullable final VirtualColumn partitionBoostVirtualColumn,
+      final ObjectMapper jsonMapper,
+      final WindowOperatorQuery query
+  )
+  {
+    List<VirtualColumn> virtualColumns = new ArrayList<>();
+
+    // The parameter is declared @Nullable, so never hand a null element to VirtualColumns.create.
+    if (partitionBoostVirtualColumn != null) {
+      virtualColumns.add(partitionBoostVirtualColumn);
+    }
+
+    final VirtualColumn segmentGranularityVirtualColumn =
+        QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
+    if (segmentGranularityVirtualColumn != null) {
+      virtualColumns.add(segmentGranularityVirtualColumn);
+    }
+
+    return VirtualColumns.create(virtualColumns);
+  }
+
+  /**
+   * @return a one-element list holding the single input channel of this processor
+   */
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    final List<ReadableFrameChannel> onlyInput = Collections.singletonList(inputChannel);
+    return onlyInput;
+  }
+
+  /**
+   * @return a one-element list holding the single output channel of this processor
+   */
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    final List<WritableFrameChannel> onlyOutput = Collections.singletonList(outputChannel);
+    return onlyOutput;
+  }
+
+  @Override
+  public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
+  {
+    /*
+     *
+     * PARTITION BY A ORDER BY B
+     *
+     * Frame 1   -> rac1
+     * A  B
+     * 1, 2
+     * 1, 3
+     * 2, 1 --> key changed
+     * 2, 2
+     *
+     *
+     * Frame 2 -> rac2
+     * 3, 1 --> key changed
+     * 3, 2
+     * 3, 3
+     * 3, 4
+     *
+     * Frame 3 -> rac3
+     *
+     * 3, 5
+     * 3, 6
+     * 4, 1 --> key changed
+     * 4, 2
+     *
+     * In case of empty OVER clause, all these racs need to be added to a 
single rows and columns
+     * to be processed. The way we can do this is to use a ConcatRowsAndColumns
+     * ConcatRC [rac1, rac2, rac3]
+     * Run all ops on this
+     *
+     *
+     * The flow would look like:
+     * 1. Validate if the operator has an empty OVER clause
+     * 2. If 1 is true make a giant rows and columns (R&C) using concat as 
shown above
+     *    Let all operators run amok on that R&C
+     * 3. If 1 is false
+     *    Read a frame
+     *    keep the older row in a class variable
+     *    check row by row and compare current with older row to check if 
partition boundary is reached
+     *    when frame partition by changes
+     *    create R&C for those particular set of columns, they would have the 
same partition key
+     *    output will be a single R&C

Review Comment:
   I feel this would run out of memory. I am okay with pushing this as a v1, but
we should create GitHub issues for the remaining work here.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.frame.key.KeyOrder;
+import org.apache.druid.msq.input.stage.StageInputSpec;
+import org.apache.druid.msq.kernel.HashShuffleSpec;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
+import org.apache.druid.msq.kernel.ShuffleSpec;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.operator.ColumnWithDirection;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.NaiveSortOperatorFactory;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.operator.window.WindowOperatorFactory;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
+{
+  private final ObjectMapper jsonMapper;
+
+  public WindowOperatorQueryKit(ObjectMapper jsonMapper)
+  {
+    this.jsonMapper = jsonMapper;
+  }
+
+  @Override
+  public QueryDefinition makeQueryDefinition(
+      String queryId,
+      WindowOperatorQuery originalQuery,
+      QueryKit<Query<?>> queryKit,
+      ShuffleSpecFactory resultShuffleSpecFactory,
+      int maxWorkerCount,
+      int minStageNumber
+  )
+  {
+    // need to validate query first
+    // populate the group of operators to be processed as each stage
+    // the size of the operators is the number of serialized stages
+    // later we should also check if these can be parallelized
+    // check there is an empty over clause or not
+    List<List<OperatorFactory>> operatorList = new ArrayList<>();
+    boolean status = validateAndReturnOperatorList(originalQuery, 
operatorList);
+
+
+    ShuffleSpec nextShuffleSpec = 
findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount);
+    // add this shuffle spec to the last stage of the inner query
+
+    final QueryDefinitionBuilder queryDefBuilder = 
QueryDefinition.builder().queryId(queryId);
+    if (nextShuffleSpec != null) {
+      final ClusterBy windowClusterBy = nextShuffleSpec.clusterBy();
+      originalQuery = (WindowOperatorQuery) 
originalQuery.withOverriddenContext(ImmutableMap.of(
+          MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL,
+          windowClusterBy
+      ));
+    } else {
+      nextShuffleSpec = ShuffleSpecFactories.singlePartition()

Review Comment:
   Shouldn't this be a null partition if the next shuffleSpec is null? Why would
you put it in a single partition?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java:
##########
@@ -166,34 +170,83 @@ public QueryDefinition makeQueryDefinition(
         partitionBoost
     );
 
-    queryDefBuilder.add(
-        StageDefinition.builder(firstStageNumber + 1)
-                       .inputs(new StageInputSpec(firstStageNumber))
-                       .signature(resultSignature)
-                       .maxWorkerCount(maxWorkerCount)
-                       .shuffleSpec(
-                           shuffleSpecFactoryPostAggregation != null
-                           ? 
shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
-                           : null
-                       )
-                       .processorFactory(new 
GroupByPostShuffleFrameProcessorFactory(queryToRun))
-    );
+    // the result signature might change
+    // if window shufle spec is added
+    // say the output signature was d0, d1
+    // But shuffle spec for window was d1
+    // create the shufflespec from the column in the context
+    // and sort after wards to ensure prefix of shuffle is in row signature
+    final ShuffleSpec nextShuffleWindowSpec;

Review Comment:
   I feel this complexity should be added in 
   ```
    ClusterBy resultClusterBy = computeResultClusterBy(
           queryToRun,
           segmentGranularity,
           partitionBoost
       );
       RowSignature resultSignature = computeResultSignature(
           queryToRun,
           segmentGranularity,
           resultClusterBy,
           partitionBoost
       );
       ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.frame.key.KeyOrder;
+import org.apache.druid.msq.input.stage.StageInputSpec;
+import org.apache.druid.msq.kernel.HashShuffleSpec;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
+import org.apache.druid.msq.kernel.ShuffleSpec;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.operator.ColumnWithDirection;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.NaiveSortOperatorFactory;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.operator.window.WindowOperatorFactory;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
+{
+  private final ObjectMapper jsonMapper;
+
+  public WindowOperatorQueryKit(ObjectMapper jsonMapper)
+  {
+    this.jsonMapper = jsonMapper;
+  }
+
+  @Override
+  public QueryDefinition makeQueryDefinition(
+      String queryId,
+      WindowOperatorQuery originalQuery,
+      QueryKit<Query<?>> queryKit,
+      ShuffleSpecFactory resultShuffleSpecFactory,
+      int maxWorkerCount,
+      int minStageNumber
+  )
+  {
+    // need to validate query first
+    // populate the group of operators to be processed as each stage
+    // the size of the operators is the number of serialized stages
+    // later we should also check if these can be parallelized
+    // check there is an empty over clause or not
+    List<List<OperatorFactory>> operatorList = new ArrayList<>();
+    boolean status = validateAndReturnOperatorList(originalQuery, 
operatorList);
+
+
+    ShuffleSpec nextShuffleSpec = 
findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount);
+    // add this shuffle spec to the last stage of the inner query
+
+    final QueryDefinitionBuilder queryDefBuilder = 
QueryDefinition.builder().queryId(queryId);
+    if (nextShuffleSpec != null) {
+      final ClusterBy windowClusterBy = nextShuffleSpec.clusterBy();
+      originalQuery = (WindowOperatorQuery) 
originalQuery.withOverriddenContext(ImmutableMap.of(
+          MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL,
+          windowClusterBy
+      ));
+    } else {
+      nextShuffleSpec = ShuffleSpecFactories.singlePartition()
+                                            .build(ClusterBy.none(), false);
+    }
+    final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource(
+        queryKit,
+        queryId,
+        originalQuery.context(),
+        originalQuery.getDataSource(),
+        originalQuery.getQuerySegmentSpec(),
+        originalQuery.getFilter(),
+        null,
+        maxWorkerCount,
+        minStageNumber,
+        false
+    );
+
+    dataSourcePlan.getSubQueryDefBuilder().ifPresent(queryDefBuilder::addAll);
+
+    final int firstStageNumber = Math.max(minStageNumber, 
queryDefBuilder.getNextStageNumber());
+    final WindowOperatorQuery queryToRun = (WindowOperatorQuery) 
originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
+    RowSignature rowSignature = queryToRun.getRowSignature();
+
+    if (status) {
+      // empty over clause found
+      // moving everything to a single partition
+      queryDefBuilder.add(
+          StageDefinition.builder(firstStageNumber)
+                         .inputs(new StageInputSpec(firstStageNumber - 1))
+                         .signature(rowSignature)
+                         .maxWorkerCount(maxWorkerCount)
+                         .shuffleSpec(nextShuffleSpec)

Review Comment:
   Shouldn't this be a single partition?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.FrameRowTooLargeException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.util.SettableLongVirtualColumn;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.OffsetLimit;
+import org.apache.druid.query.operator.Operator;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
+import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
+{
+  private static final Logger log = new 
Logger(WindowOperatorQueryFrameProcessor.class);
+  private final WindowOperatorQuery query;
+
+  private final List<OperatorFactory> operatorFactoryList;
+  private final ObjectMapper jsonMapper;
+  private final RowSignature outputStageSignature;
+  private final ArrayList<RowsAndColumns> frameRowsAndCols;
+  private final ArrayList<RowsAndColumns> resultRowAndCols;
+  private final ReadableFrameChannel inputChannel;
+  private final WritableFrameChannel outputChannel;
+  private final FrameWriterFactory frameWriterFactory;
+  private final FrameReader frameReader;
+  private final SettableLongVirtualColumn partitionBoostVirtualColumn;
+  ArrayList<ResultRow> objectsOfASingleRac;
+  List<Integer> partitionColsIndex;
+  private long currentAllocatorCapacity; // Used for generating 
FrameRowTooLargeException if needed
+  private Cursor frameCursor = null;
+  private Supplier<ResultRow> rowSupplierFromFrameCursor;
+  private ResultRow outputRow = null;
+  private FrameWriter frameWriter = null;
+
+  public WindowOperatorQueryFrameProcessor(
+      WindowOperatorQuery query,
+      ReadableFrameChannel inputChannel,
+      WritableFrameChannel outputChannel,
+      FrameWriterFactory frameWriterFactory,
+      FrameReader frameReader,
+      ObjectMapper jsonMapper,
+      final List<OperatorFactory> operatorFactoryList,
+      final RowSignature rowSignature
+  )
+  {
+    this.inputChannel = inputChannel;
+    this.outputChannel = outputChannel;
+    this.frameWriterFactory = frameWriterFactory;
+    this.partitionBoostVirtualColumn = new 
SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
+    this.operatorFactoryList = operatorFactoryList;
+    this.outputStageSignature = rowSignature;
+    this.jsonMapper = jsonMapper;
+    this.frameReader = frameReader;
+    this.query = query;
+    this.frameRowsAndCols = new ArrayList<>();
+    this.resultRowAndCols = new ArrayList<>();
+    this.objectsOfASingleRac = new ArrayList<>();
+    this.partitionColsIndex = new ArrayList<>();
+  }
+
+  private static VirtualColumns makeVirtualColumnsForFrameWriter(
+      @Nullable final VirtualColumn partitionBoostVirtualColumn,
+      final ObjectMapper jsonMapper,
+      final WindowOperatorQuery query
+  )
+  {
+    List<VirtualColumn> virtualColumns = new ArrayList<>();
+
+    virtualColumns.add(partitionBoostVirtualColumn);
+    final VirtualColumn segmentGranularityVirtualColumn =
+        QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
+    if (segmentGranularityVirtualColumn != null) {
+      virtualColumns.add(segmentGranularityVirtualColumn);
+    }
+
+    return VirtualColumns.create(virtualColumns);
+  }
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return Collections.singletonList(inputChannel);
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return Collections.singletonList(outputChannel);
+  }
+
+  @Override
+  public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
+  {
+    /*
+     *
+     * PARTITION BY A ORDER BY B
+     *
+     * Frame 1   -> rac1
+     * A  B
+     * 1, 2
+     * 1, 3
+     * 2, 1 --> key changed
+     * 2, 2
+     *
+     *
+     * Frame 2 -> rac2
+     * 3, 1 --> key changed
+     * 3, 2
+     * 3, 3
+     * 3, 4
+     *
+     * Frame 3 -> rac3
+     *
+     * 3, 5
+     * 3, 6
+     * 4, 1 --> key changed
+     * 4, 2
+     *
+     * In case of empty OVER clause, all these racs need to be added to a 
single rows and columns
+     * to be processed. The way we can do this is to use a ConcatRowsAndColumns
+     * ConcatRC [rac1, rac2, rac3]
+     * Run all ops on this
+     *
+     *
+     * The flow would look like:
+     * 1. Validate if the operator has an empty OVER clause
+     * 2. If 1 is true make a giant rows and columns (R&C) using concat as 
shown above
+     *    Let all operators run amok on that R&C
+     * 3. If 1 is false
+     *    Read a frame
+     *    keep the older row in a class variable
+     *    check row by row and compare current with older row to check if 
partition boundary is reached
+     *    when frame partition by changes
+     *    create R&C for those particular set of columns, they would have the 
same partition key
+     *    output will be a single R&C
+     *    write to output channel
+     *
+     *
+     *  Future thoughts:
+     *
+     *  1. We are writing 1 partition to each frame in this way. In case of 
high cardinality data
+     *      we will me making a large number of small frames. We can have a 
check to keep size of frame to a value
+     *      say 20k rows and keep on adding to the same pending frame and not 
create a new frame
+     *
+     *  2. Current approach with R&C and operators materialize a single R&C 
for processing. In case of data
+     *     with low cardinality a single R&C might be too big to consume. Same 
for the case of empty OVER() clause
+     *     Most of the window operations like SUM(), RANK(), RANGE() etc. can 
be made with 2 passes of the data.
+     *     We might think to reimplement them in the MSQ way so that we do not 
have to materialize so much data
+     */
+
+    // Phase 1 of the execution
+    // eagerly validate presence of empty OVER() clause
+    boolean status = checkEagerlyForEmptyWindow(operatorFactoryList);
+    if (status) {
+      // if OVER() found
+      // have to bring all data to a single executor for processing
+      // convert each frame to rac
+      // concat all the racs to make a giant rac
+      // let all operators run on the giant rac when channel is finished
+      if (inputChannel.canRead()) {
+        final Frame frame = inputChannel.read();
+        convertRowFrameToRowsAndColumns(frame);
+      } else if (inputChannel.isFinished()) {
+        runAllOpsOnMultipleRac(frameRowsAndCols);
+        return ReturnOrAwait.returnObject(Unit.instance());
+      } else {
+        return ReturnOrAwait.awaitAll(inputChannels().size());
+      }
+      return ReturnOrAwait.runAgain();
+    } else {
+      // Aha, you found a PARTITION BY and maybe ORDER BY TO
+      // PARTITION BY can also be on multiple keys
+      // typically the last stage would already partition and sort for you
+      // figure out frame boundaries and convert each distinct group to a rac
+      // then run the windowing operator only on each rac
+      if (frameCursor == null || frameCursor.isDone()) {
+        if (readableInputs.isEmpty()) {
+          return ReturnOrAwait.awaitAll(1);
+        } else if (inputChannel.canRead()) {
+          final Frame frame = inputChannel.read();
+          frameCursor = FrameProcessors.makeCursor(frame, frameReader);
+          final ColumnSelectorFactory frameColumnSelectorFactory = 
frameCursor.getColumnSelectorFactory();
+          partitionColsIndex = findPartitionColumns(frameReader.signature());
+          final Supplier<Object>[] fieldSuppliers = new 
Supplier[frameReader.signature().size()];
+          for (int i = 0; i < fieldSuppliers.length; i++) {
+            final ColumnValueSelector<?> selector =
+                
frameColumnSelectorFactory.makeColumnValueSelector(frameReader.signature().getColumnName(i));
+            fieldSuppliers[i] = selector::getObject;
+
+          }
+          rowSupplierFromFrameCursor = () -> {
+            final ResultRow row = ResultRow.create(fieldSuppliers.length);
+            for (int i = 0; i < fieldSuppliers.length; i++) {
+              row.set(i, fieldSuppliers[i].get());
+            }
+            return row;
+          };
+        } else if (inputChannel.isFinished()) {
+          // reaached end of channel
+          // if there is data remaining
+          // write it into a rac
+          // and run operators on it
+          if (!objectsOfASingleRac.isEmpty()) {
+            RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow(
+                objectsOfASingleRac,
+                frameReader.signature()
+            );
+            runAllOpsOnSingleRac(rac);
+            objectsOfASingleRac.clear();
+          }
+          return ReturnOrAwait.returnObject(Unit.instance());
+        } else {
+          return ReturnOrAwait.runAgain();
+        }
+      }
+      while (!frameCursor.isDone()) {
+        final ResultRow currentRow = rowSupplierFromFrameCursor.get();
+        if (outputRow == null) {
+          outputRow = currentRow.copy();

Review Comment:
   Why is there a copy stored here?
   Shouldn't a reference be sufficient?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.FrameRowTooLargeException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.util.SettableLongVirtualColumn;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.OffsetLimit;
+import org.apache.druid.query.operator.Operator;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
+import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
+{
+  private static final Logger log = new 
Logger(WindowOperatorQueryFrameProcessor.class);
+  private final WindowOperatorQuery query;
+
+  private final List<OperatorFactory> operatorFactoryList;
+  private final ObjectMapper jsonMapper;
+  private final RowSignature outputStageSignature;
+  private final ArrayList<RowsAndColumns> frameRowsAndCols;
+  private final ArrayList<RowsAndColumns> resultRowAndCols;
+  private final ReadableFrameChannel inputChannel;
+  private final WritableFrameChannel outputChannel;
+  private final FrameWriterFactory frameWriterFactory;
+  private final FrameReader frameReader;
+  private final SettableLongVirtualColumn partitionBoostVirtualColumn;
+  ArrayList<ResultRow> objectsOfASingleRac;
+  List<Integer> partitionColsIndex;
+  private long currentAllocatorCapacity; // Used for generating 
FrameRowTooLargeException if needed
+  private Cursor frameCursor = null;
+  private Supplier<ResultRow> rowSupplierFromFrameCursor;
+  private ResultRow outputRow = null;
+  private FrameWriter frameWriter = null;
+
+  public WindowOperatorQueryFrameProcessor(
+      WindowOperatorQuery query,
+      ReadableFrameChannel inputChannel,
+      WritableFrameChannel outputChannel,
+      FrameWriterFactory frameWriterFactory,
+      FrameReader frameReader,
+      ObjectMapper jsonMapper,
+      final List<OperatorFactory> operatorFactoryList,
+      final RowSignature rowSignature
+  )
+  {
+    this.inputChannel = inputChannel;
+    this.outputChannel = outputChannel;
+    this.frameWriterFactory = frameWriterFactory;
+    this.partitionBoostVirtualColumn = new 
SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
+    this.operatorFactoryList = operatorFactoryList;
+    this.outputStageSignature = rowSignature;
+    this.jsonMapper = jsonMapper;
+    this.frameReader = frameReader;
+    this.query = query;
+    this.frameRowsAndCols = new ArrayList<>();
+    this.resultRowAndCols = new ArrayList<>();
+    this.objectsOfASingleRac = new ArrayList<>();
+    this.partitionColsIndex = new ArrayList<>();
+  }
+
+  private static VirtualColumns makeVirtualColumnsForFrameWriter(
+      @Nullable final VirtualColumn partitionBoostVirtualColumn,
+      final ObjectMapper jsonMapper,
+      final WindowOperatorQuery query
+  )
+  {
+    List<VirtualColumn> virtualColumns = new ArrayList<>();
+
+    virtualColumns.add(partitionBoostVirtualColumn);
+    final VirtualColumn segmentGranularityVirtualColumn =
+        QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
+    if (segmentGranularityVirtualColumn != null) {
+      virtualColumns.add(segmentGranularityVirtualColumn);
+    }
+
+    return VirtualColumns.create(virtualColumns);
+  }
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return Collections.singletonList(inputChannel);
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return Collections.singletonList(outputChannel);
+  }
+
+  @Override
+  public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
+  {
+    /*
+     *
+     * PARTITION BY A ORDER BY B
+     *
+     * Frame 1   -> rac1
+     * A  B
+     * 1, 2
+     * 1, 3
+     * 2, 1 --> key changed
+     * 2, 2
+     *
+     *
+     * Frame 2 -> rac2
+     * 3, 1 --> key changed
+     * 3, 2
+     * 3, 3
+     * 3, 4
+     *
+     * Frame 3 -> rac3
+     *
+     * 3, 5
+     * 3, 6
+     * 4, 1 --> key changed
+     * 4, 2
+     *
+     * In case of empty OVER clause, all these racs need to be added to a 
single rows and columns
+     * to be processed. The way we can do this is to use a ConcatRowsAndColumns
+     * ConcatRC [rac1, rac2, rac3]
+     * Run all ops on this
+     *
+     *
+     * The flow would look like:
+     * 1. Validate if the operator has an empty OVER clause
+     * 2. If 1 is true make a giant rows and columns (R&C) using concat as 
shown above
+     *    Let all operators run amok on that R&C
+     * 3. If 1 is false
+     *    Read a frame
+     *    keep the older row in a class variable
+     *    check row by row and compare current with older row to check if 
partition boundary is reached
+     *    when frame partition by changes
+     *    create R&C for those particular set of columns, they would have the 
same partition key
+     *    output will be a single R&C
+     *    write to output channel
+     *
+     *
+     *  Future thoughts:
+     *
+     *  1. We are writing 1 partition to each frame in this way. In case of 
high cardinality data
+     *      we will me making a large number of small frames. We can have a 
check to keep size of frame to a value
+     *      say 20k rows and keep on adding to the same pending frame and not 
create a new frame
+     *
+     *  2. Current approach with R&C and operators materialize a single R&C 
for processing. In case of data
+     *     with low cardinality a single R&C might be too big to consume. Same 
for the case of empty OVER() clause
+     *     Most of the window operations like SUM(), RANK(), RANGE() etc. can 
be made with 2 passes of the data.
+     *     We might think to reimplement them in the MSQ way so that we do not 
have to materialize so much data
+     */
+
+    // Phase 1 of the execution
+    // eagerly validate presence of empty OVER() clause
+    boolean status = checkEagerlyForEmptyWindow(operatorFactoryList);

Review Comment:
   The empty OVER clause should be checked as part of the factory, no?
   



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.FrameRowTooLargeException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.util.SettableLongVirtualColumn;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.OffsetLimit;
+import org.apache.druid.query.operator.Operator;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
+import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
+{
+  private static final Logger log = new 
Logger(WindowOperatorQueryFrameProcessor.class);
+  private final WindowOperatorQuery query;
+
+  private final List<OperatorFactory> operatorFactoryList;
+  private final ObjectMapper jsonMapper;
+  private final RowSignature outputStageSignature;
+  private final ArrayList<RowsAndColumns> frameRowsAndCols;
+  private final ArrayList<RowsAndColumns> resultRowAndCols;
+  private final ReadableFrameChannel inputChannel;
+  private final WritableFrameChannel outputChannel;
+  private final FrameWriterFactory frameWriterFactory;
+  private final FrameReader frameReader;
+  private final SettableLongVirtualColumn partitionBoostVirtualColumn;
+  ArrayList<ResultRow> objectsOfASingleRac;
+  List<Integer> partitionColsIndex;
+  private long currentAllocatorCapacity; // Used for generating 
FrameRowTooLargeException if needed
+  private Cursor frameCursor = null;
+  private Supplier<ResultRow> rowSupplierFromFrameCursor;
+  private ResultRow outputRow = null;
+  private FrameWriter frameWriter = null;
+
+  public WindowOperatorQueryFrameProcessor(
+      WindowOperatorQuery query,
+      ReadableFrameChannel inputChannel,
+      WritableFrameChannel outputChannel,
+      FrameWriterFactory frameWriterFactory,
+      FrameReader frameReader,
+      ObjectMapper jsonMapper,
+      final List<OperatorFactory> operatorFactoryList,
+      final RowSignature rowSignature
+  )
+  {
+    this.inputChannel = inputChannel;
+    this.outputChannel = outputChannel;
+    this.frameWriterFactory = frameWriterFactory;
+    this.partitionBoostVirtualColumn = new 
SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
+    this.operatorFactoryList = operatorFactoryList;
+    this.outputStageSignature = rowSignature;
+    this.jsonMapper = jsonMapper;
+    this.frameReader = frameReader;
+    this.query = query;
+    this.frameRowsAndCols = new ArrayList<>();
+    this.resultRowAndCols = new ArrayList<>();
+    this.objectsOfASingleRac = new ArrayList<>();
+    this.partitionColsIndex = new ArrayList<>();
+  }
+
+  private static VirtualColumns makeVirtualColumnsForFrameWriter(
+      @Nullable final VirtualColumn partitionBoostVirtualColumn,
+      final ObjectMapper jsonMapper,
+      final WindowOperatorQuery query
+  )
+  {
+    List<VirtualColumn> virtualColumns = new ArrayList<>();
+
+    virtualColumns.add(partitionBoostVirtualColumn);
+    final VirtualColumn segmentGranularityVirtualColumn =
+        QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
+    if (segmentGranularityVirtualColumn != null) {
+      virtualColumns.add(segmentGranularityVirtualColumn);
+    }
+
+    return VirtualColumns.create(virtualColumns);
+  }
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return Collections.singletonList(inputChannel);
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return Collections.singletonList(outputChannel);
+  }
+
+  @Override
+  public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
+  {
+    /*
+     *
+     * PARTITION BY A ORDER BY B
+     *
+     * Frame 1   -> rac1
+     * A  B
+     * 1, 2
+     * 1, 3
+     * 2, 1 --> key changed
+     * 2, 2
+     *
+     *
+     * Frame 2 -> rac2
+     * 3, 1 --> key changed
+     * 3, 2
+     * 3, 3
+     * 3, 4
+     *
+     * Frame 3 -> rac3
+     *
+     * 3, 5
+     * 3, 6
+     * 4, 1 --> key changed
+     * 4, 2
+     *
+     * In case of empty OVER clause, all these racs need to be added to a 
single rows and columns
+     * to be processed. The way we can do this is to use a ConcatRowsAndColumns
+     * ConcatRC [rac1, rac2, rac3]
+     * Run all ops on this
+     *
+     *
+     * The flow would look like:
+     * 1. Validate if the operator has an empty OVER clause
+     * 2. If 1 is true make a giant rows and columns (R&C) using concat as 
shown above
+     *    Let all operators run amok on that R&C
+     * 3. If 1 is false
+     *    Read a frame
+     *    keep the older row in a class variable
+     *    check row by row and compare current with older row to check if 
partition boundary is reached
+     *    when frame partition by changes
+     *    create R&C for those particular set of columns, they would have the 
same partition key
+     *    output will be a single R&C
+     *    write to output channel
+     *
+     *
+     *  Future thoughts:
+     *
+     *  1. We are writing 1 partition to each frame in this way. In case of 
high cardinality data
+     *      we will me making a large number of small frames. We can have a 
check to keep size of frame to a value
+     *      say 20k rows and keep on adding to the same pending frame and not 
create a new frame
+     *
+     *  2. Current approach with R&C and operators materialize a single R&C 
for processing. In case of data
+     *     with low cardinality a single R&C might be too big to consume. Same 
for the case of empty OVER() clause
+     *     Most of the window operations like SUM(), RANK(), RANGE() etc. can 
be made with 2 passes of the data.
+     *     We might think to reimplement them in the MSQ way so that we do not 
have to materialize so much data
+     */
+
+    // Phase 1 of the execution
+    // eagerly validate presence of empty OVER() clause
+    boolean status = checkEagerlyForEmptyWindow(operatorFactoryList);
+    if (status) {
+      // if OVER() found
+      // have to bring all data to a single executor for processing
+      // convert each frame to rac
+      // concat all the racs to make a giant rac
+      // let all operators run on the giant rac when channel is finished
+      if (inputChannel.canRead()) {
+        final Frame frame = inputChannel.read();
+        convertRowFrameToRowsAndColumns(frame);
+      } else if (inputChannel.isFinished()) {
+        runAllOpsOnMultipleRac(frameRowsAndCols);
+        return ReturnOrAwait.returnObject(Unit.instance());
+      } else {
+        return ReturnOrAwait.awaitAll(inputChannels().size());
+      }
+      return ReturnOrAwait.runAgain();
+    } else {
+      // Aha, you found a PARTITION BY and maybe ORDER BY TO
+      // PARTITION BY can also be on multiple keys
+      // typically the last stage would already partition and sort for you
+      // figure out frame boundaries and convert each distinct group to a rac
+      // then run the windowing operator only on each rac
+      if (frameCursor == null || frameCursor.isDone()) {
+        if (readableInputs.isEmpty()) {
+          return ReturnOrAwait.awaitAll(1);
+        } else if (inputChannel.canRead()) {
+          final Frame frame = inputChannel.read();
+          frameCursor = FrameProcessors.makeCursor(frame, frameReader);
+          final ColumnSelectorFactory frameColumnSelectorFactory = 
frameCursor.getColumnSelectorFactory();
+          partitionColsIndex = findPartitionColumns(frameReader.signature());
+          final Supplier<Object>[] fieldSuppliers = new 
Supplier[frameReader.signature().size()];
+          for (int i = 0; i < fieldSuppliers.length; i++) {
+            final ColumnValueSelector<?> selector =
+                
frameColumnSelectorFactory.makeColumnValueSelector(frameReader.signature().getColumnName(i));
+            fieldSuppliers[i] = selector::getObject;
+
+          }
+          rowSupplierFromFrameCursor = () -> {
+            final ResultRow row = ResultRow.create(fieldSuppliers.length);
+            for (int i = 0; i < fieldSuppliers.length; i++) {
+              row.set(i, fieldSuppliers[i].get());
+            }
+            return row;
+          };
+        } else if (inputChannel.isFinished()) {
+          // reaached end of channel
+          // if there is data remaining
+          // write it into a rac
+          // and run operators on it
+          if (!objectsOfASingleRac.isEmpty()) {
+            RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow(
+                objectsOfASingleRac,
+                frameReader.signature()
+            );
+            runAllOpsOnSingleRac(rac);
+            objectsOfASingleRac.clear();
+          }
+          return ReturnOrAwait.returnObject(Unit.instance());
+        } else {
+          return ReturnOrAwait.runAgain();
+        }
+      }
+      while (!frameCursor.isDone()) {
+        final ResultRow currentRow = rowSupplierFromFrameCursor.get();
+        if (outputRow == null) {
+          outputRow = currentRow.copy();
+          objectsOfASingleRac.add(currentRow);
+        } else if (comparePartitionKeys(outputRow, currentRow, 
partitionColsIndex)) {
+          // if they have the same partition key
+          // keep adding them
+          objectsOfASingleRac.add(currentRow);
+        } else {
+          // key change noted
+          // create rac from the rows seen before
+          // run the operators on these rows and columns
+          // clean up the object to hold the new rows only
+          RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow(
+              objectsOfASingleRac,
+              frameReader.signature()
+          );
+          runAllOpsOnSingleRac(rac);
+          objectsOfASingleRac.clear();
+          outputRow = currentRow.copy();
+          return ReturnOrAwait.runAgain();
+        }
+        frameCursor.advance();
+      }
+    }
+    return ReturnOrAwait.runAgain();
+  }
+
+  /**
+   * @param operatorFactoryList the list of operators to check for empty window
+   * @return true is there is a single OVER() clause across all the operators, 
false otherwise
+   */
+  private boolean checkEagerlyForEmptyWindow(List<OperatorFactory> 
operatorFactoryList)
+  {
+    for (OperatorFactory of : operatorFactoryList) {
+      if (of instanceof NaivePartitioningOperatorFactory) {
+        if (((NaivePartitioningOperatorFactory) 
of).getPartitionColumns().isEmpty()) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * @param singleRac Use this {@link RowsAndColumns} as a single input for 
the operators to be run
+   */
+  private void runAllOpsOnSingleRac(RowsAndColumns singleRac)
+  {
+    Operator op = new Operator()
+    {
+      @Nullable
+      @Override
+      public Closeable goOrContinue(Closeable continuationObject, Receiver 
receiver)
+      {
+        receiver.push(singleRac);
+        receiver.completed();
+        return null;
+      }
+    };
+    runOperatorsAfterThis(op);
+  }
+
+  /**
+   * @param listOfRacs Concat this list of {@link RowsAndColumns} to a {@link 
ConcatRowsAndColumns} to use as a single input for the operators to be run
+   */
+  private void runAllOpsOnMultipleRac(ArrayList<RowsAndColumns> listOfRacs)
+  {
+    Operator op = new Operator()
+    {
+      @Nullable
+      @Override
+      public Closeable goOrContinue(Closeable continuationObject, Receiver 
receiver)
+      {
+        RowsAndColumns rac = new ConcatRowsAndColumns(listOfRacs);
+        receiver.push(rac);
+        receiver.completed();
+        return null;
+      }
+    };
+    runOperatorsAfterThis(op);
+  }
+
+  /**
+   * @param op Base operator for the operators to be run. Other operators are 
wrapped under this to run
+   */
+  private void runOperatorsAfterThis(Operator op)
+  {
+    for (OperatorFactory of : operatorFactoryList) {
+      op = of.wrap(op);
+    }
+    Operator.go(op, new Operator.Receiver()
+    {
+      @Override
+      public Operator.Signal push(RowsAndColumns rac)
+      {
+        resultRowAndCols.add(rac);
+        return Operator.Signal.GO;
+      }
+
+      @Override
+      public void completed()
+      {
+        try {
+          flushAllRowsAndCols(resultRowAndCols);
+        }
+        catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        finally {
+          resultRowAndCols.clear();
+        }
+      }
+    });
+  }
+
+  /**
+   * @param resultRowAndCols Flush the list of {@link RowsAndColumns} to a 
frame
+   * @throws IOException
+   */
+  private void flushAllRowsAndCols(ArrayList<RowsAndColumns> resultRowAndCols) 
throws IOException
+  {
+    RowsAndColumns rac = new ConcatRowsAndColumns(resultRowAndCols);
+    AtomicInteger rowId = new AtomicInteger(0);
+    createFrameWriterIfNeeded(rac, rowId);
+    writeRacToFrame(rac, rowId);
+  }
+
+  /**
+   * @param rac   The frame writer to write this {@link RowsAndColumns} object
+   * @param rowId RowId to get the column selector factory from the {@link 
RowsAndColumns} object
+   */
+  private void createFrameWriterIfNeeded(RowsAndColumns rac, AtomicInteger 
rowId)
+  {
+    if (frameWriter == null) {
+      final ColumnSelectorFactoryMaker csfm = 
ColumnSelectorFactoryMaker.fromRAC(rac);
+      final ColumnSelectorFactory frameWriterColumnSelectorFactory = 
makeVirtualColumnsForFrameWriter(
+          partitionBoostVirtualColumn,
+          jsonMapper,
+          query
+      ).wrap(csfm.make(rowId));
+      frameWriter = 
frameWriterFactory.newFrameWriter(frameWriterColumnSelectorFactory);
+      currentAllocatorCapacity = frameWriterFactory.allocatorCapacity();
+    }
+  }
+
+  /**
+   * @param rac   {@link RowsAndColumns} to be written to frame
+   * @param rowId Counter to keep track of how many rows are added
+   * @throws IOException
+   */
+  public void writeRacToFrame(RowsAndColumns rac, AtomicInteger rowId) throws 
IOException
+  {
+    final int numRows = rac.numRows();
+    rowId.set(0);
+    while (rowId.get() < numRows) {
+      final boolean didAddToFrame = frameWriter.addSelection();
+      if (didAddToFrame) {
+        incrementBoostColumn();
+        rowId.incrementAndGet();
+      } else if (frameWriter.getNumRows() == 0) {
+        throw new FrameRowTooLargeException(currentAllocatorCapacity);
+      } else {
+        flushFrameWriter();
+        return;
+      }
+    }
+    flushFrameWriter();
+  }
+
+  @Override
+  public void cleanup() throws IOException
+  {
+    FrameProcessors.closeAll(inputChannels(), outputChannels(), frameWriter);
+  }
+
+  /**
+   * @return Number of rows flushed to the output channel
+   * @throws IOException
+   */
+  private long flushFrameWriter() throws IOException
+  {
+    if (frameWriter == null || frameWriter.getNumRows() <= 0) {
+      if (frameWriter != null) {
+        frameWriter.close();
+        frameWriter = null;
+      }
+      return 0;
+    } else {
+      final Frame frame = Frame.wrap(frameWriter.toByteArray());
+      Iterables.getOnlyElement(outputChannels()).write(new 
FrameWithPartition(frame, FrameWithPartition.NO_PARTITION));
+      frameWriter.close();
+      frameWriter = null;
+      return frame.numRows();
+    }
+  }
+
+  /**
+   * @param frame Row based frame to be converted to a {@link RowsAndColumns} 
object
+   */
+  private void convertRowFrameToRowsAndColumns(Frame frame)
+  {
+    final RowSignature signature = frameReader.signature();
+    RowBasedFrameRowsAndColumns frameRowsAndColumns = new 
RowBasedFrameRowsAndColumns(frame, signature);
+    LazilyDecoratedRowsAndColumns ldrc = new LazilyDecoratedRowsAndColumns(
+        frameRowsAndColumns,
+        null,
+        null,
+        null,
+        OffsetLimit.limit(Integer.MAX_VALUE),
+        null,
+        null
+    );
+    frameRowsAndCols.add(ldrc);
+  }
+
+  private List<Integer> findPartitionColumns(RowSignature rowSignature)
+  {
+    List<Integer> indexList = new ArrayList<>();
+    for (OperatorFactory of : operatorFactoryList) {
+      if (of instanceof NaivePartitioningOperatorFactory) {
+        for (String s : ((NaivePartitioningOperatorFactory) 
of).getPartitionColumns()) {
+          indexList.add(rowSignature.indexOf(s));
+        }
+      }
+    }
+    return indexList;
+  }
+
+  private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, 
List<Integer> partitionIndices)

Review Comment:
   We can use the same trick that the grouping engine uses, no?
   
https://github.com/apache/druid/blob/5c7512c54db85f1db6e550777a6cdad04bd5a10e/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java#L180-L182



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java:
##########
@@ -166,34 +170,83 @@ public QueryDefinition makeQueryDefinition(
         partitionBoost
     );
 
-    queryDefBuilder.add(
-        StageDefinition.builder(firstStageNumber + 1)
-                       .inputs(new StageInputSpec(firstStageNumber))
-                       .signature(resultSignature)
-                       .maxWorkerCount(maxWorkerCount)
-                       .shuffleSpec(
-                           shuffleSpecFactoryPostAggregation != null
-                           ? 
shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
-                           : null
-                       )
-                       .processorFactory(new 
GroupByPostShuffleFrameProcessorFactory(queryToRun))
-    );
+    // the result signature might change
+    // if window shufle spec is added
+    // say the output signature was d0, d1
+    // But shuffle spec for window was d1
+    // create the shufflespec from the column in the context
+    // and sort after wards to ensure prefix of shuffle is in row signature
+    final ShuffleSpec nextShuffleWindowSpec;
+    if 
(originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL))
 {

Review Comment:
   @LakshSingla Can you take a look at this code as well?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.frame.key.KeyOrder;
+import org.apache.druid.msq.input.stage.StageInputSpec;
+import org.apache.druid.msq.kernel.HashShuffleSpec;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
+import org.apache.druid.msq.kernel.ShuffleSpec;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.operator.ColumnWithDirection;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.NaiveSortOperatorFactory;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.operator.window.WindowOperatorFactory;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
+{
+  private final ObjectMapper jsonMapper;
+
+  public WindowOperatorQueryKit(ObjectMapper jsonMapper)
+  {
+    this.jsonMapper = jsonMapper;
+  }
+
+  @Override
+  public QueryDefinition makeQueryDefinition(
+      String queryId,
+      WindowOperatorQuery originalQuery,
+      QueryKit<Query<?>> queryKit,
+      ShuffleSpecFactory resultShuffleSpecFactory,
+      int maxWorkerCount,
+      int minStageNumber
+  )
+  {
+    // need to validate query first
+    // populate the group of operators to be processed as each stage
+    // the size of the operators is the number of serialized stages
+    // later we should also check if these can be parallelized
+    // check there is an empty over clause or not
+    List<List<OperatorFactory>> operatorList = new ArrayList<>();
+    boolean status = validateAndReturnOperatorList(originalQuery, 
operatorList);
+
+
+    ShuffleSpec nextShuffleSpec = 
findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount);
+    // add this shuffle spec to the last stage of the inner query
+
+    final QueryDefinitionBuilder queryDefBuilder = 
QueryDefinition.builder().queryId(queryId);
+    if (nextShuffleSpec != null) {
+      final ClusterBy windowClusterBy = nextShuffleSpec.clusterBy();
+      originalQuery = (WindowOperatorQuery) 
originalQuery.withOverriddenContext(ImmutableMap.of(
+          MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL,
+          windowClusterBy
+      ));
+    } else {
+      nextShuffleSpec = ShuffleSpecFactories.singlePartition()
+                                            .build(ClusterBy.none(), false);
+    }
+    final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource(
+        queryKit,
+        queryId,
+        originalQuery.context(),
+        originalQuery.getDataSource(),
+        originalQuery.getQuerySegmentSpec(),
+        originalQuery.getFilter(),
+        null,
+        maxWorkerCount,
+        minStageNumber,
+        false
+    );
+
+    dataSourcePlan.getSubQueryDefBuilder().ifPresent(queryDefBuilder::addAll);
+
+    final int firstStageNumber = Math.max(minStageNumber, 
queryDefBuilder.getNextStageNumber());
+    final WindowOperatorQuery queryToRun = (WindowOperatorQuery) 
originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
+    RowSignature rowSignature = queryToRun.getRowSignature();
+
+    if (status) {
+      // empty over clause found
+      // moving everything to a single partition
+      queryDefBuilder.add(
+          StageDefinition.builder(firstStageNumber)
+                         .inputs(new StageInputSpec(firstStageNumber - 1))
+                         .signature(rowSignature)
+                         .maxWorkerCount(maxWorkerCount)
+                         .shuffleSpec(nextShuffleSpec)
+                         .processorFactory(new 
WindowOperatorQueryFrameProcessorFactory(
+                             queryToRun,
+                             queryToRun.getOperators(),
+                             rowSignature
+                         ))
+      );
+    } else {
+      // there are multiple windows present in the query
+      // Create stages for each window in the query
+      // These stages will be serialized
+      // the partition by clause of the next window will be the shuffle key 
for the previous window
+      RowSignature.Builder bob = RowSignature.builder();
+      final int numberOfWindows = operatorList.size();
+      final int baseSize = rowSignature.size() - numberOfWindows;
+      for (int i = 0; i < baseSize; i++) {
+        bob.add(rowSignature.getColumnName(i), 
rowSignature.getColumnType(i).get());
+      }
+
+      for (int i = 0; i < numberOfWindows; i++) {
+        bob.add(rowSignature.getColumnName(baseSize + i), 
rowSignature.getColumnType(baseSize + i).get()).build();
+        // find the shuffle spec of the next stage
+        // if it is the last stage set the next shuffle spec to single 
partition
+        if (i + 1 == numberOfWindows) {
+          nextShuffleSpec = ShuffleSpecFactories.singlePartition()
+                                                .build(ClusterBy.none(), 
false);
+        } else {
+          nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 
1), maxWorkerCount);
+        }
+
+        final RowSignature intermediateSignature = bob.build();
+        final RowSignature stageRowSignature;
+        if (nextShuffleSpec == null) {
+          stageRowSignature = intermediateSignature;
+        } else {
+          stageRowSignature = QueryKitUtils.sortableSignature(
+              intermediateSignature,
+              nextShuffleSpec.clusterBy().getColumns()
+          );
+        }
+
+        queryDefBuilder.add(
+            StageDefinition.builder(firstStageNumber + i)
+                           .inputs(new StageInputSpec(firstStageNumber + i - 
1))
+                           .signature(stageRowSignature)
+                           .maxWorkerCount(maxWorkerCount)
+                           .shuffleSpec(nextShuffleSpec)
+                           .processorFactory(new 
WindowOperatorQueryFrameProcessorFactory(
+                               queryToRun,
+                               operatorList.get(i),
+                               stageRowSignature
+                           ))
+        );
+      }
+    }
+    return queryDefBuilder.queryId(queryId).build();
+  }
+
+  private boolean validateAndReturnOperatorList(
+      WindowOperatorQuery originalQuery,
+      List<List<OperatorFactory>> operatorList
+  )
+  {
+    final List<OperatorFactory> operators = originalQuery.getOperators();
+    List<OperatorFactory> operatorFactoryList = new ArrayList<>();
+    for (OperatorFactory of : operators) {
+      operatorFactoryList.add(of);
+      if (of instanceof WindowOperatorFactory) {
+        operatorList.add(operatorFactoryList);
+        operatorFactoryList = new ArrayList<>();
+      } else if (of instanceof NaivePartitioningOperatorFactory) {
+        if (((NaivePartitioningOperatorFactory) 
of).getPartitionColumns().isEmpty()) {
+          operatorList.clear();
+          operatorList.add(originalQuery.getOperators());
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory> 
operatorFactories, int maxWorkerCount)
+  {
+    NaivePartitioningOperatorFactory partition = null;
+    NaiveSortOperatorFactory sort = null;
+    List<KeyColumn> keyColsOfWindow = new ArrayList<>();
+    for (OperatorFactory of : operatorFactories) {
+      if (of instanceof NaivePartitioningOperatorFactory) {
+        partition = (NaivePartitioningOperatorFactory) of;
+      } else if (of instanceof NaiveSortOperatorFactory) {
+        sort = (NaiveSortOperatorFactory) of;
+      }
+    }
+    Map<String, ColumnWithDirection.Direction> colMap = new HashMap<>();
+    if (sort != null) {
+      for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
+        colMap.put(sortColumn.getColumn(), sortColumn.getDirection());
+      }
+    }
+    assert partition != null;
+    if (partition.getPartitionColumns().isEmpty()) {
+      return null;
+    }
+    for (String partitionColumn : partition.getPartitionColumns()) {
+      KeyColumn kc;
+      if (colMap.containsKey(partitionColumn)) {
+        if (colMap.get(partitionColumn) == ColumnWithDirection.Direction.ASC) {
+          kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
+        } else {
+          kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING);
+        }
+      } else {
+        kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
+      }
+      keyColsOfWindow.add(kc);
+    }
+    return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), 
maxWorkerCount);

Review Comment:
   Why are we doing hash shuffling? Shouldn't it be a sort-merge shuffle spec,
   since we would require keys to be sorted within a window for the RAC calculation?



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java:
##########
@@ -251,6 +251,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
                   .put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 2)
                   .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 0)
                   .put(MSQTaskQueryMaker.USER_KEY, "allowAll")
+                  .put("enableWindowing", true)

Review Comment:
   I thought we removed that context, no?
   Also, we should put:
https://github.com/apache/druid/blob/5c7512c54db85f1db6e550777a6cdad04bd5a10e/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java#L90
 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.frame.key.KeyOrder;
+import org.apache.druid.msq.input.stage.StageInputSpec;
+import org.apache.druid.msq.kernel.HashShuffleSpec;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
+import org.apache.druid.msq.kernel.ShuffleSpec;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.operator.ColumnWithDirection;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.NaiveSortOperatorFactory;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.operator.window.WindowOperatorFactory;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
+{
+  private final ObjectMapper jsonMapper;
+
+  public WindowOperatorQueryKit(ObjectMapper jsonMapper)
+  {
+    this.jsonMapper = jsonMapper;
+  }
+
+  @Override
+  public QueryDefinition makeQueryDefinition(
+      String queryId,
+      WindowOperatorQuery originalQuery,
+      QueryKit<Query<?>> queryKit,
+      ShuffleSpecFactory resultShuffleSpecFactory,
+      int maxWorkerCount,
+      int minStageNumber
+  )
+  {
+    // need to validate query first
+    // populate the group of operators to be processed as each stage
+    // the size of the operators is the number of serialized stages
+    // later we should also check if these can be parallelized
+    // check there is an empty over clause or not
+    List<List<OperatorFactory>> operatorList = new ArrayList<>();
+    boolean status = validateAndReturnOperatorList(originalQuery, 
operatorList);
+
+
+    ShuffleSpec nextShuffleSpec = 
findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount);
+    // add this shuffle spec to the last stage of the inner query
+
+    final QueryDefinitionBuilder queryDefBuilder = 
QueryDefinition.builder().queryId(queryId);
+    if (nextShuffleSpec != null) {
+      final ClusterBy windowClusterBy = nextShuffleSpec.clusterBy();
+      originalQuery = (WindowOperatorQuery) 
originalQuery.withOverriddenContext(ImmutableMap.of(
+          MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL,
+          windowClusterBy
+      ));
+    } else {
+      nextShuffleSpec = ShuffleSpecFactories.singlePartition()
+                                            .build(ClusterBy.none(), false);
+    }
+    final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource(
+        queryKit,
+        queryId,
+        originalQuery.context(),
+        originalQuery.getDataSource(),
+        originalQuery.getQuerySegmentSpec(),
+        originalQuery.getFilter(),
+        null,
+        maxWorkerCount,
+        minStageNumber,
+        false
+    );
+
+    dataSourcePlan.getSubQueryDefBuilder().ifPresent(queryDefBuilder::addAll);
+
+    final int firstStageNumber = Math.max(minStageNumber, 
queryDefBuilder.getNextStageNumber());
+    final WindowOperatorQuery queryToRun = (WindowOperatorQuery) 
originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
+    RowSignature rowSignature = queryToRun.getRowSignature();
+
+    if (status) {
+      // empty over clause found
+      // moving everything to a single partition
+      queryDefBuilder.add(
+          StageDefinition.builder(firstStageNumber)
+                         .inputs(new StageInputSpec(firstStageNumber - 1))
+                         .signature(rowSignature)
+                         .maxWorkerCount(maxWorkerCount)
+                         .shuffleSpec(nextShuffleSpec)
+                         .processorFactory(new 
WindowOperatorQueryFrameProcessorFactory(
+                             queryToRun,
+                             queryToRun.getOperators(),
+                             rowSignature
+                         ))
+      );
+    } else {
+      // there are multiple windows present in the query
+      // Create stages for each window in the query
+      // These stages will be serialized
+      // the partition by clause of the next window will be the shuffle key 
for the previous window
+      RowSignature.Builder bob = RowSignature.builder();
+      final int numberOfWindows = operatorList.size();
+      final int baseSize = rowSignature.size() - numberOfWindows;
+      for (int i = 0; i < baseSize; i++) {
+        bob.add(rowSignature.getColumnName(i), 
rowSignature.getColumnType(i).get());
+      }
+
+      for (int i = 0; i < numberOfWindows; i++) {
+        bob.add(rowSignature.getColumnName(baseSize + i), 
rowSignature.getColumnType(baseSize + i).get()).build();
+        // find the shuffle spec of the next stage
+        // if it is the last stage set the next shuffle spec to single 
partition
+        if (i + 1 == numberOfWindows) {
+          nextShuffleSpec = ShuffleSpecFactories.singlePartition()
+                                                .build(ClusterBy.none(), 
false);
+        } else {
+          nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 
1), maxWorkerCount);
+        }
+
+        final RowSignature intermediateSignature = bob.build();
+        final RowSignature stageRowSignature;
+        if (nextShuffleSpec == null) {
+          stageRowSignature = intermediateSignature;
+        } else {
+          stageRowSignature = QueryKitUtils.sortableSignature(
+              intermediateSignature,
+              nextShuffleSpec.clusterBy().getColumns()
+          );
+        }
+
+        queryDefBuilder.add(
+            StageDefinition.builder(firstStageNumber + i)
+                           .inputs(new StageInputSpec(firstStageNumber + i - 
1))
+                           .signature(stageRowSignature)
+                           .maxWorkerCount(maxWorkerCount)
+                           .shuffleSpec(nextShuffleSpec)
+                           .processorFactory(new 
WindowOperatorQueryFrameProcessorFactory(
+                               queryToRun,
+                               operatorList.get(i),
+                               stageRowSignature
+                           ))
+        );
+      }
+    }
+    return queryDefBuilder.queryId(queryId).build();
+  }
+
+  private boolean validateAndReturnOperatorList(
+      WindowOperatorQuery originalQuery,
+      List<List<OperatorFactory>> operatorList
+  )
+  {
+    final List<OperatorFactory> operators = originalQuery.getOperators();
+    List<OperatorFactory> operatorFactoryList = new ArrayList<>();
+    for (OperatorFactory of : operators) {
+      operatorFactoryList.add(of);
+      if (of instanceof WindowOperatorFactory) {
+        operatorList.add(operatorFactoryList);
+        operatorFactoryList = new ArrayList<>();
+      } else if (of instanceof NaivePartitioningOperatorFactory) {
+        if (((NaivePartitioningOperatorFactory) 
of).getPartitionColumns().isEmpty()) {
+          operatorList.clear();
+          operatorList.add(originalQuery.getOperators());
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory> 
operatorFactories, int maxWorkerCount)
+  {
+    NaivePartitioningOperatorFactory partition = null;
+    NaiveSortOperatorFactory sort = null;
+    List<KeyColumn> keyColsOfWindow = new ArrayList<>();
+    for (OperatorFactory of : operatorFactories) {
+      if (of instanceof NaivePartitioningOperatorFactory) {
+        partition = (NaivePartitioningOperatorFactory) of;
+      } else if (of instanceof NaiveSortOperatorFactory) {
+        sort = (NaiveSortOperatorFactory) of;
+      }
+    }
+    Map<String, ColumnWithDirection.Direction> colMap = new HashMap<>();
+    if (sort != null) {
+      for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
+        colMap.put(sortColumn.getColumn(), sortColumn.getDirection());
+      }
+    }
+    assert partition != null;
+    if (partition.getPartitionColumns().isEmpty()) {
+      return null;
+    }
+    for (String partitionColumn : partition.getPartitionColumns()) {
+      KeyColumn kc;
+      if (colMap.containsKey(partitionColumn)) {
+        if (colMap.get(partitionColumn) == ColumnWithDirection.Direction.ASC) {
+          kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
+        } else {
+          kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING);
+        }
+      } else {
+        kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
+      }
+      keyColsOfWindow.add(kc);
+    }
+    return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), 
maxWorkerCount);

Review Comment:
   I see, that's why you are adding the keyOrder while generating the
clusterByCol.
   @LakshSingla We can also add partition boosting, but that can be done as a
follow-up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to