cryptoe commented on code in PR #15470: URL: https://github.com/apache/druid/pull/15470#discussion_r1538813795
########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyRowsInAWindowFault.java: ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.indexing.error; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +import java.util.Objects; + +@JsonTypeName(TooManyRowsInAWindowFault.CODE) +public class TooManyRowsInAWindowFault extends BaseMSQFault +{ + + static final String CODE = "TooManyRowsInAWindow"; + + private final int numRows; + private final int maxRows; + + @JsonCreator + public TooManyRowsInAWindowFault( + @JsonProperty("numRows") final int numRows, + @JsonProperty("maxRows") final int maxRows + ) + { + super(CODE, "Too many rows in a window (requested = %d, max = %d)", numRows, maxRows); Review Comment: ```suggestion super(CODE, "Too many rows in a window (requested = %d, max = %d). 
Try creating a window with a higher cardinality column or change the query shape.", numRows, maxRows); ``` ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java: ########## @@ -154,6 +154,10 @@ public class MultiStageQueryContext public static final String CTX_ARRAY_INGEST_MODE = "arrayIngestMode"; public static final ArrayIngestMode DEFAULT_ARRAY_INGEST_MODE = ArrayIngestMode.MVD; + public static final String NEXT_WINDOW_SHUFFLE_COL = "__windowShuffleCol"; + + public static final String MAX_ROWS_MATERIALIZED_IN_WINDOW = "__maxRowsMaterializedInWindow"; Review Comment: Can users set this limit. Seems like it. We would probably want to keep it undocumented since we are going to remove it eventually. https://github.com/apache/druid/blob/c72e69a8c8aff192a22f5f198e61722380832380/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java#L59 Please dev document it here. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java: ########## @@ -164,39 +168,80 @@ public QueryDefinition makeQueryDefinition( partitionBoost ); - queryDefBuilder.add( - StageDefinition.builder(firstStageNumber + 1) - .inputs(new StageInputSpec(firstStageNumber)) - .signature(resultSignature) - .maxWorkerCount(maxWorkerCount) - .shuffleSpec( - shuffleSpecFactoryPostAggregation != null - ? 
shuffleSpecFactoryPostAggregation.build(resultClusterBy, false) - : null - ) - .processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun)) - ); + final ShuffleSpec nextShuffleWindowSpec = getShuffleSpecForNextWindow(originalQuery, maxWorkerCount); - if (doLimitOrOffset) { - final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec(); + if (nextShuffleWindowSpec == null) { queryDefBuilder.add( - StageDefinition.builder(firstStageNumber + 2) - .inputs(new StageInputSpec(firstStageNumber + 1)) + StageDefinition.builder(firstStageNumber + 1) + .inputs(new StageInputSpec(firstStageNumber)) .signature(resultSignature) - .maxWorkerCount(1) - .shuffleSpec(null) // no shuffling should be required after a limit processor. - .processorFactory( - new OffsetLimitFrameProcessorFactory( - limitSpec.getOffset(), - limitSpec.isLimited() ? (long) limitSpec.getLimit() : null - ) + .maxWorkerCount(maxWorkerCount) + .shuffleSpec( + shuffleSpecFactoryPostAggregation != null + ? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false) + : null ) + .processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun)) + ); + + if (doLimitOrOffset) { + final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec(); + queryDefBuilder.add( + StageDefinition.builder(firstStageNumber + 2) + .inputs(new StageInputSpec(firstStageNumber + 1)) + .signature(resultSignature) + .maxWorkerCount(1) + .shuffleSpec(null) // no shuffling should be required after a limit processor. + .processorFactory( + new OffsetLimitFrameProcessorFactory( + limitSpec.getOffset(), + limitSpec.isLimited() ? 
(long) limitSpec.getLimit() : null + ) + ) + ); + } + } else { + final RowSignature stageSignature; + // sort the signature to make sure the prefix is aligned + stageSignature = QueryKitUtils.sortableSignature( + resultSignature, + nextShuffleWindowSpec.clusterBy().getColumns() + ); + + queryDefBuilder.add( Review Comment: Have we missed the case where there is a doLimitOrOffset? ########## sql/src/test/java/org/apache/druid/sql/calcite/CalciteSysQueryTest.java: ########## @@ -22,8 +22,8 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.sql.calcite.NotYetSupported.Modes; import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor; -import org.junit.Rule; -import org.junit.Test; +import org.junit.jupiter.api.Test; Review Comment: I think these changes need to be reverted, no? ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java: ########## @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.msq.querykit; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterables; +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.channel.FrameWithPartition; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.FrameProcessors; +import org.apache.druid.frame.processor.FrameRowTooLargeException; +import org.apache.druid.frame.processor.ReturnOrAwait; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.util.SettableLongVirtualColumn; +import org.apache.druid.frame.write.FrameWriter; +import org.apache.druid.frame.write.FrameWriterFactory; +import org.apache.druid.java.util.common.Unit; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.exec.Limits; +import org.apache.druid.msq.indexing.error.MSQException; +import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; +import org.apache.druid.query.operator.OffsetLimit; +import org.apache.druid.query.operator.Operator; +import org.apache.druid.query.operator.OperatorFactory; +import org.apache.druid.query.operator.WindowOperatorQuery; +import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns; +import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker; +import 
org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object> +{ + private static final Logger log = new Logger(WindowOperatorQueryFrameProcessor.class); + private final WindowOperatorQuery query; + + private final List<OperatorFactory> operatorFactoryList; + private final ObjectMapper jsonMapper; + private final RowSignature outputStageSignature; + private final ArrayList<RowsAndColumns> frameRowsAndCols; + private final ArrayList<RowsAndColumns> resultRowAndCols; + private final ReadableFrameChannel inputChannel; + private final WritableFrameChannel outputChannel; + private final FrameWriterFactory frameWriterFactory; + private final FrameReader frameReader; + private final SettableLongVirtualColumn partitionBoostVirtualColumn; + private final ArrayList<ResultRow> objectsOfASingleRac; + private final int maxRowsMaterialized; + List<Integer> partitionColsIndex; + private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed + private Cursor frameCursor = null; + private Supplier<ResultRow> rowSupplierFromFrameCursor; + private ResultRow outputRow = null; + private FrameWriter frameWriter = null; + private boolean isOverEmpty; + + public WindowOperatorQueryFrameProcessor( + WindowOperatorQuery query, + ReadableFrameChannel inputChannel, + WritableFrameChannel outputChannel, + FrameWriterFactory frameWriterFactory, + FrameReader 
frameReader, + ObjectMapper jsonMapper, + final List<OperatorFactory> operatorFactoryList, + final RowSignature rowSignature, + final boolean isOverEmpty + ) + { + this.inputChannel = inputChannel; + this.outputChannel = outputChannel; + this.frameWriterFactory = frameWriterFactory; + this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN); Review Comment: Are we ever boosting? ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java: ########## @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.msq.querykit; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterables; +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.channel.FrameWithPartition; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.FrameProcessors; +import org.apache.druid.frame.processor.FrameRowTooLargeException; +import org.apache.druid.frame.processor.ReturnOrAwait; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.util.SettableLongVirtualColumn; +import org.apache.druid.frame.write.FrameWriter; +import org.apache.druid.frame.write.FrameWriterFactory; +import org.apache.druid.java.util.common.Unit; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.exec.Limits; +import org.apache.druid.msq.indexing.error.MSQException; +import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; +import org.apache.druid.query.operator.OffsetLimit; +import org.apache.druid.query.operator.Operator; +import org.apache.druid.query.operator.OperatorFactory; +import org.apache.druid.query.operator.WindowOperatorQuery; +import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns; +import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker; +import 
org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object> +{ + private static final Logger log = new Logger(WindowOperatorQueryFrameProcessor.class); + private final WindowOperatorQuery query; + + private final List<OperatorFactory> operatorFactoryList; + private final ObjectMapper jsonMapper; + private final RowSignature outputStageSignature; + private final ArrayList<RowsAndColumns> frameRowsAndCols; + private final ArrayList<RowsAndColumns> resultRowAndCols; + private final ReadableFrameChannel inputChannel; + private final WritableFrameChannel outputChannel; + private final FrameWriterFactory frameWriterFactory; + private final FrameReader frameReader; + private final SettableLongVirtualColumn partitionBoostVirtualColumn; + private final ArrayList<ResultRow> objectsOfASingleRac; + private final int maxRowsMaterialized; + List<Integer> partitionColsIndex; + private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed + private Cursor frameCursor = null; + private Supplier<ResultRow> rowSupplierFromFrameCursor; + private ResultRow outputRow = null; + private FrameWriter frameWriter = null; + private boolean isOverEmpty; + + public WindowOperatorQueryFrameProcessor( + WindowOperatorQuery query, + ReadableFrameChannel inputChannel, + WritableFrameChannel outputChannel, + FrameWriterFactory frameWriterFactory, + FrameReader 
frameReader, + ObjectMapper jsonMapper, + final List<OperatorFactory> operatorFactoryList, + final RowSignature rowSignature, + final boolean isOverEmpty + ) + { + this.inputChannel = inputChannel; + this.outputChannel = outputChannel; + this.frameWriterFactory = frameWriterFactory; + this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN); + this.operatorFactoryList = operatorFactoryList; + this.outputStageSignature = rowSignature; + this.jsonMapper = jsonMapper; + this.frameReader = frameReader; + this.query = query; + this.frameRowsAndCols = new ArrayList<>(); + this.resultRowAndCols = new ArrayList<>(); + this.objectsOfASingleRac = new ArrayList<>(); + this.partitionColsIndex = new ArrayList<>(); + this.isOverEmpty = isOverEmpty; + if (query.context() != null && query.context() + .containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW)) { Review Comment: Since this will run on the worker, the query context will not be supplied to the workers. Hence this will always be default. IMHO the flow should be: get the value from the context in the query kit (since that runs on the controller, where the context is available), then create a factory out of it, and then that factory uses the parameter and passes it to the frame processor. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java: ########## @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.querykit; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterables; +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.channel.FrameWithPartition; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.FrameProcessors; +import org.apache.druid.frame.processor.FrameRowTooLargeException; +import org.apache.druid.frame.processor.ReturnOrAwait; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.util.SettableLongVirtualColumn; +import org.apache.druid.frame.write.FrameWriter; +import org.apache.druid.frame.write.FrameWriterFactory; +import org.apache.druid.java.util.common.Unit; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.exec.Limits; +import org.apache.druid.msq.indexing.error.MSQException; +import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; +import org.apache.druid.query.operator.OffsetLimit; +import org.apache.druid.query.operator.Operator; +import org.apache.druid.query.operator.OperatorFactory; +import org.apache.druid.query.operator.WindowOperatorQuery; +import 
org.apache.druid.query.rowsandcols.ConcatRowsAndColumns; +import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object> +{ + private static final Logger log = new Logger(WindowOperatorQueryFrameProcessor.class); + private final WindowOperatorQuery query; + + private final List<OperatorFactory> operatorFactoryList; + private final ObjectMapper jsonMapper; + private final RowSignature outputStageSignature; + private final ArrayList<RowsAndColumns> frameRowsAndCols; + private final ArrayList<RowsAndColumns> resultRowAndCols; + private final ReadableFrameChannel inputChannel; + private final WritableFrameChannel outputChannel; + private final FrameWriterFactory frameWriterFactory; + private final FrameReader frameReader; + private final SettableLongVirtualColumn partitionBoostVirtualColumn; + private final ArrayList<ResultRow> objectsOfASingleRac; + private final int maxRowsMaterialized; + List<Integer> partitionColsIndex; + private long currentAllocatorCapacity; // Used for generating 
FrameRowTooLargeException if needed + private Cursor frameCursor = null; + private Supplier<ResultRow> rowSupplierFromFrameCursor; + private ResultRow outputRow = null; + private FrameWriter frameWriter = null; + private boolean isOverEmpty; + + public WindowOperatorQueryFrameProcessor( + WindowOperatorQuery query, + ReadableFrameChannel inputChannel, + WritableFrameChannel outputChannel, + FrameWriterFactory frameWriterFactory, + FrameReader frameReader, + ObjectMapper jsonMapper, + final List<OperatorFactory> operatorFactoryList, + final RowSignature rowSignature, + final boolean isOverEmpty + ) + { + this.inputChannel = inputChannel; + this.outputChannel = outputChannel; + this.frameWriterFactory = frameWriterFactory; + this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN); + this.operatorFactoryList = operatorFactoryList; + this.outputStageSignature = rowSignature; + this.jsonMapper = jsonMapper; + this.frameReader = frameReader; + this.query = query; + this.frameRowsAndCols = new ArrayList<>(); + this.resultRowAndCols = new ArrayList<>(); + this.objectsOfASingleRac = new ArrayList<>(); + this.partitionColsIndex = new ArrayList<>(); + this.isOverEmpty = isOverEmpty; + if (query.context() != null && query.context() + .containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW)) { + maxRowsMaterialized = (int) query.context().get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW); + } else { + maxRowsMaterialized = Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW; + } + } + + private static VirtualColumns makeVirtualColumnsForFrameWriter( + @Nullable final VirtualColumn partitionBoostVirtualColumn, + final ObjectMapper jsonMapper, + final WindowOperatorQuery query + ) + { + List<VirtualColumn> virtualColumns = new ArrayList<>(); + + virtualColumns.add(partitionBoostVirtualColumn); + final VirtualColumn segmentGranularityVirtualColumn = + QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query); + if 
(segmentGranularityVirtualColumn != null) { + virtualColumns.add(segmentGranularityVirtualColumn); + } + + return VirtualColumns.create(virtualColumns); + } + + @Override + public List<ReadableFrameChannel> inputChannels() + { + return Collections.singletonList(inputChannel); + } + + @Override + public List<WritableFrameChannel> outputChannels() + { + return Collections.singletonList(outputChannel); + } + + @Override + public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) + { + /* + * + * PARTITION BY A ORDER BY B + * + * Frame 1 -> rac1 + * A B + * 1, 2 + * 1, 3 + * 2, 1 --> key changed + * 2, 2 + * + * + * Frame 2 -> rac2 + * 3, 1 --> key changed + * 3, 2 + * 3, 3 + * 3, 4 + * + * Frame 3 -> rac3 + * + * 3, 5 + * 3, 6 + * 4, 1 --> key changed + * 4, 2 + * + * In case of empty OVER clause, all these racs need to be added to a single rows and columns + * to be processed. The way we can do this is to use a ConcatRowsAndColumns + * ConcatRC [rac1, rac2, rac3] + * Run all ops on this + * + * + * The flow would look like: + * 1. Validate if the operator has an empty OVER clause + * 2. If 1 is true make a giant rows and columns (R&C) using concat as shown above + * Let all operators run amok on that R&C + * 3. If 1 is false + * Read a frame + * keep the older row in a class variable + * check row by row and compare current with older row to check if partition boundary is reached + * when frame partition by changes + * create R&C for those particular set of columns, they would have the same partition key + * output will be a single R&C + * write to output channel + * + * + * Future thoughts: {@link https://github.com/apache/druid/issues/16126} + * + * 1. We are writing 1 partition to each frame in this way. In case of low cardinality data + * we will me making a large number of small frames. We can have a check to keep size of frame to a value + * say 20k rows and keep on adding to the same pending frame and not create a new frame + * + * 2. 
Current approach with R&C and operators materialize a single R&C for processing. In case of data + * with high cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause + * Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes of the data. + * We might think to reimplement them in the MSQ way so that we do not have to materialize so much data + */ + + // Phase 1 of the execution + // eagerly validate presence of empty OVER() clause + if (isOverEmpty) { + // if OVER() found + // have to bring all data to a single executor for processing + // convert each frame to rac + // concat all the racs to make a giant rac + // let all operators run on the giant rac when channel is finished + if (inputChannel.canRead()) { + final Frame frame = inputChannel.read(); + convertRowFrameToRowsAndColumns(frame); Review Comment: Shoudn't the limit be enforced when we are converting the rowFrame to RowsAndCol's? ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyRowsInAWindowFault.java: ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.indexing.error; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +import java.util.Objects; + +@JsonTypeName(TooManyRowsInAWindowFault.CODE) +public class TooManyRowsInAWindowFault extends BaseMSQFault +{ + + static final String CODE = "TooManyRowsInAWindow"; + + private final int numRows; + private final int maxRows; + + @JsonCreator + public TooManyRowsInAWindowFault( + @JsonProperty("numRows") final int numRows, + @JsonProperty("maxRows") final int maxRows + ) + { + super(CODE, "Too many rows in a window (requested = %d, max = %d)", numRows, maxRows); Review Comment: Can we also add the clustering column details in the error message ? ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyRowsInAWindowFault.java: ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.indexing.error; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +import java.util.Objects; + +@JsonTypeName(TooManyRowsInAWindowFault.CODE) +public class TooManyRowsInAWindowFault extends BaseMSQFault +{ + + static final String CODE = "TooManyRowsInAWindow"; + + private final int numRows; + private final int maxRows; + + @JsonCreator + public TooManyRowsInAWindowFault( + @JsonProperty("numRows") final int numRows, + @JsonProperty("maxRows") final int maxRows + ) + { + super(CODE, "Too many rows in a window (requested = %d, max = %d)", numRows, maxRows); Review Comment: Effectively what it means an empty partition over can have 100K rows at the max ? Lets document it in https://github.com/apache/druid/blob/c72e69a8c8aff192a22f5f198e61722380832380/docs/multi-stage-query/known-issues.md#L2 ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyRowsInAWindowFault.java: ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.indexing.error; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +import java.util.Objects; + +@JsonTypeName(TooManyRowsInAWindowFault.CODE) +public class TooManyRowsInAWindowFault extends BaseMSQFault +{ + + static final String CODE = "TooManyRowsInAWindow"; + + private final int numRows; + private final int maxRows; + + @JsonCreator + public TooManyRowsInAWindowFault( + @JsonProperty("numRows") final int numRows, + @JsonProperty("maxRows") final int maxRows + ) + { + super(CODE, "Too many rows in a window (requested = %d, max = %d)", numRows, maxRows); Review Comment: Also we should also say that user can set `MAX_ROWS_MATERIALIZED_IN_WINDOW` context parameter to increase the limit per query. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java: ########## @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.querykit; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterables; +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.channel.FrameWithPartition; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.FrameProcessors; +import org.apache.druid.frame.processor.FrameRowTooLargeException; +import org.apache.druid.frame.processor.ReturnOrAwait; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.util.SettableLongVirtualColumn; +import org.apache.druid.frame.write.FrameWriter; +import org.apache.druid.frame.write.FrameWriterFactory; +import org.apache.druid.java.util.common.Unit; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.exec.Limits; +import org.apache.druid.msq.indexing.error.MSQException; +import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; +import org.apache.druid.query.operator.OffsetLimit; +import org.apache.druid.query.operator.Operator; +import org.apache.druid.query.operator.OperatorFactory; +import org.apache.druid.query.operator.WindowOperatorQuery; +import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns; +import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker; +import 
org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object> +{ + private static final Logger log = new Logger(WindowOperatorQueryFrameProcessor.class); + private final WindowOperatorQuery query; + + private final List<OperatorFactory> operatorFactoryList; + private final ObjectMapper jsonMapper; + private final RowSignature outputStageSignature; + private final ArrayList<RowsAndColumns> frameRowsAndCols; + private final ArrayList<RowsAndColumns> resultRowAndCols; + private final ReadableFrameChannel inputChannel; + private final WritableFrameChannel outputChannel; + private final FrameWriterFactory frameWriterFactory; + private final FrameReader frameReader; + private final SettableLongVirtualColumn partitionBoostVirtualColumn; + private final ArrayList<ResultRow> objectsOfASingleRac; + private final int maxRowsMaterialized; + List<Integer> partitionColsIndex; + private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed + private Cursor frameCursor = null; + private Supplier<ResultRow> rowSupplierFromFrameCursor; + private ResultRow outputRow = null; + private FrameWriter frameWriter = null; + private boolean isOverEmpty; + + public WindowOperatorQueryFrameProcessor( + WindowOperatorQuery query, + ReadableFrameChannel inputChannel, + WritableFrameChannel outputChannel, + FrameWriterFactory frameWriterFactory, + FrameReader 
frameReader, + ObjectMapper jsonMapper, + final List<OperatorFactory> operatorFactoryList, + final RowSignature rowSignature, + final boolean isOverEmpty + ) + { + this.inputChannel = inputChannel; + this.outputChannel = outputChannel; + this.frameWriterFactory = frameWriterFactory; + this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN); + this.operatorFactoryList = operatorFactoryList; + this.outputStageSignature = rowSignature; + this.jsonMapper = jsonMapper; + this.frameReader = frameReader; + this.query = query; + this.frameRowsAndCols = new ArrayList<>(); + this.resultRowAndCols = new ArrayList<>(); + this.objectsOfASingleRac = new ArrayList<>(); + this.partitionColsIndex = new ArrayList<>(); + this.isOverEmpty = isOverEmpty; + if (query.context() != null && query.context() + .containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW)) { + maxRowsMaterialized = (int) query.context().get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW); + } else { + maxRowsMaterialized = Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW; + } + } + + private static VirtualColumns makeVirtualColumnsForFrameWriter( + @Nullable final VirtualColumn partitionBoostVirtualColumn, + final ObjectMapper jsonMapper, + final WindowOperatorQuery query + ) + { + List<VirtualColumn> virtualColumns = new ArrayList<>(); + + virtualColumns.add(partitionBoostVirtualColumn); + final VirtualColumn segmentGranularityVirtualColumn = + QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query); + if (segmentGranularityVirtualColumn != null) { + virtualColumns.add(segmentGranularityVirtualColumn); + } + + return VirtualColumns.create(virtualColumns); + } + + @Override + public List<ReadableFrameChannel> inputChannels() + { + return Collections.singletonList(inputChannel); + } + + @Override + public List<WritableFrameChannel> outputChannels() + { + return Collections.singletonList(outputChannel); + } + + @Override + public 
ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) + { + /* + * + * PARTITION BY A ORDER BY B + * + * Frame 1 -> rac1 + * A B + * 1, 2 + * 1, 3 + * 2, 1 --> key changed + * 2, 2 + * + * + * Frame 2 -> rac2 + * 3, 1 --> key changed + * 3, 2 + * 3, 3 + * 3, 4 + * + * Frame 3 -> rac3 + * + * 3, 5 + * 3, 6 + * 4, 1 --> key changed + * 4, 2 + * + * In case of empty OVER clause, all these racs need to be added to a single rows and columns + * to be processed. The way we can do this is to use a ConcatRowsAndColumns + * ConcatRC [rac1, rac2, rac3] + * Run all ops on this + * + * + * The flow would look like: + * 1. Validate if the operator has an empty OVER clause + * 2. If 1 is true make a giant rows and columns (R&C) using concat as shown above + * Let all operators run amok on that R&C + * 3. If 1 is false + * Read a frame + * keep the older row in a class variable + * check row by row and compare current with older row to check if partition boundary is reached + * when frame partition by changes + * create R&C for those particular set of columns, they would have the same partition key + * output will be a single R&C + * write to output channel + * + * + * Future thoughts: {@link https://github.com/apache/druid/issues/16126} + * + * 1. We are writing 1 partition to each frame in this way. In case of low cardinality data + * we will me making a large number of small frames. We can have a check to keep size of frame to a value + * say 20k rows and keep on adding to the same pending frame and not create a new frame + * + * 2. Current approach with R&C and operators materialize a single R&C for processing. In case of data + * with high cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause + * Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes of the data. 
+ * We might think to reimplement them in the MSQ way so that we do not have to materialize so much data + */ + + // Phase 1 of the execution + // eagerly validate presence of empty OVER() clause + if (isOverEmpty) { + // if OVER() found + // have to bring all data to a single executor for processing + // convert each frame to rac + // concat all the racs to make a giant rac + // let all operators run on the giant rac when channel is finished + if (inputChannel.canRead()) { + final Frame frame = inputChannel.read(); + convertRowFrameToRowsAndColumns(frame); + } else if (inputChannel.isFinished()) { + runAllOpsOnMultipleRac(frameRowsAndCols); + return ReturnOrAwait.returnObject(Unit.instance()); + } else { + return ReturnOrAwait.awaitAll(inputChannels().size()); + } + return ReturnOrAwait.runAgain(); + } else { + // Aha, you found a PARTITION BY and maybe ORDER BY TO + // PARTITION BY can also be on multiple keys + // typically the last stage would already partition and sort for you + // figure out frame boundaries and convert each distinct group to a rac + // then run the windowing operator only on each rac + if (frameCursor == null || frameCursor.isDone()) { + if (readableInputs.isEmpty()) { + return ReturnOrAwait.awaitAll(1); + } else if (inputChannel.canRead()) { + final Frame frame = inputChannel.read(); + frameCursor = FrameProcessors.makeCursor(frame, frameReader); + final ColumnSelectorFactory frameColumnSelectorFactory = frameCursor.getColumnSelectorFactory(); + partitionColsIndex = findPartitionColumns(frameReader.signature()); + final Supplier<Object>[] fieldSuppliers = new Supplier[frameReader.signature().size()]; + for (int i = 0; i < fieldSuppliers.length; i++) { + final ColumnValueSelector<?> selector = + frameColumnSelectorFactory.makeColumnValueSelector(frameReader.signature().getColumnName(i)); + fieldSuppliers[i] = selector::getObject; + + } + rowSupplierFromFrameCursor = () -> { + final ResultRow row = ResultRow.create(fieldSuppliers.length); + 
for (int i = 0; i < fieldSuppliers.length; i++) { + row.set(i, fieldSuppliers[i].get()); + } + return row; + }; + } else if (inputChannel.isFinished()) { + // reaached end of channel + // if there is data remaining + // write it into a rac + // and run operators on it + if (!objectsOfASingleRac.isEmpty()) { + if (objectsOfASingleRac.size() > maxRowsMaterialized) { Review Comment: The reason I say this is that if we have a big RAC, we would probably OOM before we hit this guard rail. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java: ########## @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.querykit; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterables; +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.channel.FrameWithPartition; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.FrameProcessors; +import org.apache.druid.frame.processor.FrameRowTooLargeException; +import org.apache.druid.frame.processor.ReturnOrAwait; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.util.SettableLongVirtualColumn; +import org.apache.druid.frame.write.FrameWriter; +import org.apache.druid.frame.write.FrameWriterFactory; +import org.apache.druid.java.util.common.Unit; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.exec.Limits; +import org.apache.druid.msq.indexing.error.MSQException; +import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; +import org.apache.druid.query.operator.OffsetLimit; +import org.apache.druid.query.operator.Operator; +import org.apache.druid.query.operator.OperatorFactory; +import org.apache.druid.query.operator.WindowOperatorQuery; +import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns; +import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker; +import 
org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object> +{ + private static final Logger log = new Logger(WindowOperatorQueryFrameProcessor.class); + private final WindowOperatorQuery query; + + private final List<OperatorFactory> operatorFactoryList; + private final ObjectMapper jsonMapper; + private final RowSignature outputStageSignature; + private final ArrayList<RowsAndColumns> frameRowsAndCols; + private final ArrayList<RowsAndColumns> resultRowAndCols; + private final ReadableFrameChannel inputChannel; + private final WritableFrameChannel outputChannel; + private final FrameWriterFactory frameWriterFactory; + private final FrameReader frameReader; + private final SettableLongVirtualColumn partitionBoostVirtualColumn; + private final ArrayList<ResultRow> objectsOfASingleRac; + private final int maxRowsMaterialized; + List<Integer> partitionColsIndex; + private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed + private Cursor frameCursor = null; + private Supplier<ResultRow> rowSupplierFromFrameCursor; + private ResultRow outputRow = null; + private FrameWriter frameWriter = null; + private boolean isOverEmpty; + + public WindowOperatorQueryFrameProcessor( + WindowOperatorQuery query, + ReadableFrameChannel inputChannel, + WritableFrameChannel outputChannel, + FrameWriterFactory frameWriterFactory, + FrameReader 
frameReader, + ObjectMapper jsonMapper, + final List<OperatorFactory> operatorFactoryList, + final RowSignature rowSignature, + final boolean isOverEmpty + ) + { + this.inputChannel = inputChannel; + this.outputChannel = outputChannel; + this.frameWriterFactory = frameWriterFactory; + this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN); + this.operatorFactoryList = operatorFactoryList; + this.outputStageSignature = rowSignature; + this.jsonMapper = jsonMapper; + this.frameReader = frameReader; + this.query = query; + this.frameRowsAndCols = new ArrayList<>(); + this.resultRowAndCols = new ArrayList<>(); + this.objectsOfASingleRac = new ArrayList<>(); + this.partitionColsIndex = new ArrayList<>(); + this.isOverEmpty = isOverEmpty; + if (query.context() != null && query.context() + .containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW)) { + maxRowsMaterialized = (int) query.context().get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW); + } else { + maxRowsMaterialized = Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW; + } + } + + private static VirtualColumns makeVirtualColumnsForFrameWriter( + @Nullable final VirtualColumn partitionBoostVirtualColumn, + final ObjectMapper jsonMapper, + final WindowOperatorQuery query + ) + { + List<VirtualColumn> virtualColumns = new ArrayList<>(); + + virtualColumns.add(partitionBoostVirtualColumn); + final VirtualColumn segmentGranularityVirtualColumn = + QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query); + if (segmentGranularityVirtualColumn != null) { + virtualColumns.add(segmentGranularityVirtualColumn); + } + + return VirtualColumns.create(virtualColumns); + } + + @Override + public List<ReadableFrameChannel> inputChannels() + { + return Collections.singletonList(inputChannel); + } + + @Override + public List<WritableFrameChannel> outputChannels() + { + return Collections.singletonList(outputChannel); + } + + @Override + public 
ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) + { + /* + * + * PARTITION BY A ORDER BY B + * + * Frame 1 -> rac1 + * A B + * 1, 2 + * 1, 3 + * 2, 1 --> key changed + * 2, 2 + * + * + * Frame 2 -> rac2 + * 3, 1 --> key changed + * 3, 2 + * 3, 3 + * 3, 4 + * + * Frame 3 -> rac3 + * + * 3, 5 + * 3, 6 + * 4, 1 --> key changed + * 4, 2 + * + * In case of empty OVER clause, all these racs need to be added to a single rows and columns + * to be processed. The way we can do this is to use a ConcatRowsAndColumns + * ConcatRC [rac1, rac2, rac3] + * Run all ops on this + * + * + * The flow would look like: + * 1. Validate if the operator has an empty OVER clause + * 2. If 1 is true make a giant rows and columns (R&C) using concat as shown above + * Let all operators run amok on that R&C + * 3. If 1 is false + * Read a frame + * keep the older row in a class variable + * check row by row and compare current with older row to check if partition boundary is reached + * when frame partition by changes + * create R&C for those particular set of columns, they would have the same partition key + * output will be a single R&C + * write to output channel + * + * + * Future thoughts: {@link https://github.com/apache/druid/issues/16126} + * + * 1. We are writing 1 partition to each frame in this way. In case of low cardinality data + * we will me making a large number of small frames. We can have a check to keep size of frame to a value + * say 20k rows and keep on adding to the same pending frame and not create a new frame + * + * 2. Current approach with R&C and operators materialize a single R&C for processing. In case of data + * with high cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause + * Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes of the data. 
+ * We might think to reimplement them in the MSQ way so that we do not have to materialize so much data + */ + + // Phase 1 of the execution + // eagerly validate presence of empty OVER() clause + if (isOverEmpty) { + // if OVER() found + // have to bring all data to a single executor for processing + // convert each frame to rac + // concat all the racs to make a giant rac + // let all operators run on the giant rac when channel is finished + if (inputChannel.canRead()) { + final Frame frame = inputChannel.read(); + convertRowFrameToRowsAndColumns(frame); + } else if (inputChannel.isFinished()) { + runAllOpsOnMultipleRac(frameRowsAndCols); + return ReturnOrAwait.returnObject(Unit.instance()); + } else { + return ReturnOrAwait.awaitAll(inputChannels().size()); + } + return ReturnOrAwait.runAgain(); + } else { + // Aha, you found a PARTITION BY and maybe ORDER BY TO + // PARTITION BY can also be on multiple keys + // typically the last stage would already partition and sort for you + // figure out frame boundaries and convert each distinct group to a rac + // then run the windowing operator only on each rac + if (frameCursor == null || frameCursor.isDone()) { + if (readableInputs.isEmpty()) { + return ReturnOrAwait.awaitAll(1); + } else if (inputChannel.canRead()) { + final Frame frame = inputChannel.read(); + frameCursor = FrameProcessors.makeCursor(frame, frameReader); + final ColumnSelectorFactory frameColumnSelectorFactory = frameCursor.getColumnSelectorFactory(); + partitionColsIndex = findPartitionColumns(frameReader.signature()); + final Supplier<Object>[] fieldSuppliers = new Supplier[frameReader.signature().size()]; + for (int i = 0; i < fieldSuppliers.length; i++) { + final ColumnValueSelector<?> selector = + frameColumnSelectorFactory.makeColumnValueSelector(frameReader.signature().getColumnName(i)); + fieldSuppliers[i] = selector::getObject; + + } + rowSupplierFromFrameCursor = () -> { + final ResultRow row = ResultRow.create(fieldSuppliers.length); + 
for (int i = 0; i < fieldSuppliers.length; i++) { + row.set(i, fieldSuppliers[i].get()); + } + return row; + }; + } else if (inputChannel.isFinished()) { + // reaached end of channel + // if there is data remaining + // write it into a rac + // and run operators on it + if (!objectsOfASingleRac.isEmpty()) { + if (objectsOfASingleRac.size() > maxRowsMaterialized) { Review Comment: The limit should be enforced as we are populating the RAC's no ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
