somu-imply commented on code in PR #15470:
URL: https://github.com/apache/druid/pull/15470#discussion_r1525474901


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.FrameRowTooLargeException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.util.SettableLongVirtualColumn;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.OffsetLimit;
+import org.apache.druid.query.operator.Operator;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
+import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
{
  private static final Logger log = new Logger(WindowOperatorQueryFrameProcessor.class);

  // The window query being executed by this processor.
  private final WindowOperatorQuery query;

  // Window operator factories applied to the materialized rows-and-columns.
  private final List<OperatorFactory> operatorFactoryList;
  private final ObjectMapper jsonMapper;
  // Row signature of the frames this stage writes to its output channel.
  private final RowSignature outputStageSignature;
  // Rows-and-columns built from input frames, accumulated before operator processing.
  private final ArrayList<RowsAndColumns> frameRowsAndCols;
  // Rows-and-columns produced by the operators, pending write to the output channel.
  private final ArrayList<RowsAndColumns> resultRowAndCols;
  private final ReadableFrameChannel inputChannel;
  private final WritableFrameChannel outputChannel;
  private final FrameWriterFactory frameWriterFactory;
  private final FrameReader frameReader;
  // Virtual column written alongside output rows; NOTE(review): presumably used to
  // subdivide large partitions downstream (see QueryKitUtils.PARTITION_BOOST_COLUMN) — confirm.
  private final SettableLongVirtualColumn partitionBoostVirtualColumn;
  // Rows belonging to a single rows-and-columns currently being accumulated.
  // NOTE(review): package-private; consider making private if no outside access is needed.
  ArrayList<ResultRow> objectsOfASingleRac;
  // Indexes of the partition columns within the row signature.
  List<Integer> partitionColsIndex;
  private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed
  // Cursor over the frame currently being read; null when no frame is in progress.
  private Cursor frameCursor = null;
  private Supplier<ResultRow> rowSupplierFromFrameCursor;
  // Row currently staged for output; null when none is pending.
  private ResultRow outputRow = null;
  private FrameWriter frameWriter = null;
+
+  public WindowOperatorQueryFrameProcessor(
+      WindowOperatorQuery query,
+      ReadableFrameChannel inputChannel,
+      WritableFrameChannel outputChannel,
+      FrameWriterFactory frameWriterFactory,
+      FrameReader frameReader,
+      ObjectMapper jsonMapper,
+      final List<OperatorFactory> operatorFactoryList,
+      final RowSignature rowSignature
+  )
+  {
+    this.inputChannel = inputChannel;
+    this.outputChannel = outputChannel;
+    this.frameWriterFactory = frameWriterFactory;
+    this.partitionBoostVirtualColumn = new 
SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
+    this.operatorFactoryList = operatorFactoryList;
+    this.outputStageSignature = rowSignature;
+    this.jsonMapper = jsonMapper;
+    this.frameReader = frameReader;
+    this.query = query;
+    this.frameRowsAndCols = new ArrayList<>();
+    this.resultRowAndCols = new ArrayList<>();
+    this.objectsOfASingleRac = new ArrayList<>();
+    this.partitionColsIndex = new ArrayList<>();
+  }
+
+  private static VirtualColumns makeVirtualColumnsForFrameWriter(
+      @Nullable final VirtualColumn partitionBoostVirtualColumn,
+      final ObjectMapper jsonMapper,
+      final WindowOperatorQuery query
+  )
+  {
+    List<VirtualColumn> virtualColumns = new ArrayList<>();
+
+    virtualColumns.add(partitionBoostVirtualColumn);
+    final VirtualColumn segmentGranularityVirtualColumn =
+        QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
+    if (segmentGranularityVirtualColumn != null) {
+      virtualColumns.add(segmentGranularityVirtualColumn);
+    }
+
+    return VirtualColumns.create(virtualColumns);
+  }
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return Collections.singletonList(inputChannel);
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return Collections.singletonList(outputChannel);
+  }
+
+  @Override
+  public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
+  {
+    /*
+     *
+     * PARTITION BY A ORDER BY B
+     *
+     * Frame 1   -> rac1
+     * A  B
+     * 1, 2
+     * 1, 3
+     * 2, 1 --> key changed
+     * 2, 2
+     *
+     *
+     * Frame 2 -> rac2
+     * 3, 1 --> key changed
+     * 3, 2
+     * 3, 3
+     * 3, 4
+     *
+     * Frame 3 -> rac3
+     *
+     * 3, 5
+     * 3, 6
+     * 4, 1 --> key changed
+     * 4, 2
+     *
+     * In case of empty OVER clause, all these racs need to be added to a 
single rows and columns
+     * to be processed. The way we can do this is to use a ConcatRowsAndColumns
+     * ConcatRC [rac1, rac2, rac3]
+     * Run all ops on this
+     *
+     *
+     * The flow would look like:
+     * 1. Validate if the operator has an empty OVER clause
+     * 2. If 1 is true make a giant rows and columns (R&C) using concat as 
shown above
+     *    Let all operators run amok on that R&C
+     * 3. If 1 is false
+     *    Read a frame
+     *    keep the older row in a class variable
+     *    check row by row and compare current with older row to check if 
partition boundary is reached
+     *    when frame partition by changes
+     *    create R&C for those particular set of columns, they would have the 
same partition key
+     *    output will be a single R&C
+     *    write to output channel
+     *
+     *
+     *  Future thoughts:
+     *
     *  1. We are writing 1 partition to each frame in this way. In case of 
high cardinality data
+     *      we will be making a large number of small frames. We can have a 
check to keep size of frame to a value
+     *      say 20k rows and keep on adding to the same pending frame and not 
create a new frame
+     *
     *  2. The current approach with R&C and operators materializes a single R&C 
for processing. In case of data
+     *     with low cardinality a single R&C might be too big to consume. Same 
for the case of empty OVER() clause
+     *     Most of the window operations like SUM(), RANK(), RANGE() etc. can 
be made with 2 passes of the data.
+     *     We might think to reimplement them in the MSQ way so that we do not 
have to materialize so much data
+     */
+
+    // Phase 1 of the execution
+    // eagerly validate presence of empty OVER() clause
+    boolean status = checkEagerlyForEmptyWindow(operatorFactoryList);

Review Comment:
   The empty OVER() clause check is not being pushed down from the query kit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to