cryptoe commented on code in PR #15470:
URL: https://github.com/apache/druid/pull/15470#discussion_r1464338395
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java:
##########
@@ -88,6 +88,7 @@ public class DataSourcePlan
* of subqueries.
*/
private static final Map<String, Object> CONTEXT_MAP_NO_SEGMENT_GRANULARITY
= new HashMap<>();
+ public static final String NEXT_SHUFFLE_COL = "shuffleCol";
Review Comment:
```suggestion
public static final String NEXT_WINDOW_SHUFFLE_COL = "__windowShuffleCol";
```
Nit: added the `__` prefix since this is an internal, code-generated context value.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.FrameRowTooLargeException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.input.ReadableInput;
+import org.apache.druid.msq.input.table.SegmentWithDescriptor;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.OffsetLimit;
+import org.apache.druid.query.operator.Operator;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowAndColumns;
+import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+public class WindowOperatorQueryFrameProcessor extends BaseLeafFrameProcessor
Review Comment:
Since WindowOperatorQueryFrameProcessor always runs after a previous
stage, it should not extend BaseLeafFrameProcessor.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java:
##########
@@ -423,15 +425,27 @@ private static DataSourcePlan forQuery(
final QueryDataSource dataSource,
final int maxWorkerCount,
final int minStageNumber,
- final boolean broadcast
+ final boolean broadcast,
+ @Nullable final QueryContext parentContext
)
{
+ // check if parentContext has a window operator
+ final Map<String, Object> windowShuffleMap = new HashMap<>();
+ if (parentContext != null) {
Review Comment:
```suggestion
if (parentContext != null && parentContext.containsKey(XXX)) {
```
Nit: code style.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java:
##########
@@ -687,7 +701,8 @@ private static DataSourcePlan forSortMergeJoin(
(QueryDataSource) dataSource.getLeft(),
maxWorkerCount,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
- false
+ false,
+ null
Review Comment:
What happens if there is a window function in a join query on one or both sides?
A unit test (UT) in the MSQ select tests would be good to have.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.FrameRowTooLargeException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.input.ReadableInput;
+import org.apache.druid.msq.input.table.SegmentWithDescriptor;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.OffsetLimit;
+import org.apache.druid.query.operator.Operator;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowAndColumns;
+import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+public class WindowOperatorQueryFrameProcessor extends BaseLeafFrameProcessor
+{
+
+ private static final Logger log = new
Logger(WindowOperatorQueryFrameProcessor.class);
+ private final WindowOperatorQuery query;
+
+ private final List<OperatorFactory> operatorFactoryList;
+ private final ObjectMapper jsonMapper;
+ private final Closer closer = Closer.create();
+ private final RowSignature outputStageSignature;
+ private final ArrayList<RowsAndColumns> frameRowsAndCols;
+ private final ArrayList<RowsAndColumns> resultRowAndCols;
+ ArrayList<ResultRow> objectsOfASingleRac;
+ private long currentAllocatorCapacity; // Used for generating
FrameRowTooLargeException if needed
+ private Cursor frameCursor = null;
+ private Supplier<ResultRow> rowSupplierFromFrameCursor;
+ private ResultRow outputRow = null;
+ private FrameWriter frameWriter = null;
+
+ public WindowOperatorQueryFrameProcessor(
+ final WindowOperatorQuery query,
+ final List<OperatorFactory> operatorFactoryList,
+ final ObjectMapper jsonMapper,
+ final ReadableInput baseInput,
+ final Function<SegmentReference, SegmentReference> segmentMapFn,
+ final ResourceHolder<WritableFrameChannel> outputChannelHolder,
+ final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
+ final RowSignature rowSignature
+ )
+ {
+ super(
+ baseInput,
+ segmentMapFn,
+ outputChannelHolder,
+ frameWriterFactoryHolder
+ );
+ this.query = query;
+ this.jsonMapper = jsonMapper;
+ this.operatorFactoryList = operatorFactoryList;
+ this.outputStageSignature = rowSignature;
+ this.frameRowsAndCols = new ArrayList<>();
+ this.resultRowAndCols = new ArrayList<>();
+ this.objectsOfASingleRac = new ArrayList<>();
+ }
+
+ @Override
+ protected ReturnOrAwait<Unit> runWithSegment(SegmentWithDescriptor segment)
+ {
+ return null;
+ }
+
+ @Override
+ protected ReturnOrAwait<Unit> runWithLoadedSegment(SegmentWithDescriptor
segment)
+ {
+ return null;
Review Comment:
Let's throw an exception here instead of returning null.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -2728,8 +2731,21 @@ private void startStages() throws IOException,
InterruptedException
throw new MSQException(new
InsertCannotBeEmptyFault(task.getDataSource()));
}
- final ClusterByPartitions partitionBoundaries =
- queryKernel.getResultPartitionBoundariesForStage(shuffleStageId);
+ // Q for MSQ folks
+ // I am not sure why can't we switch to universal
+ // if the partition boundary for a stage derived from the last valid
shuffle spec
+ // is not ready, especially when the stage before the final has a
shuffle spec of null
+ final ClusterByPartitions partitionBoundaries;
Review Comment:
We should revert these changes.
##########
.idea/inspectionProfiles/Druid.xml:
##########
@@ -39,9 +39,13 @@
<option name="SUGGEST_NULLABLE_ANNOTATIONS" value="false" />
<option name="DONT_REPORT_TRUE_ASSERT_STATEMENTS" value="true" />
</inspection_tool>
+ <inspection_tool class="ConstantValue" enabled="true" level="WARNING"
enabled_by_default="true">
Review Comment:
I think these changes can be reverted.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -2728,8 +2731,21 @@ private void startStages() throws IOException,
InterruptedException
throw new MSQException(new
InsertCannotBeEmptyFault(task.getDataSource()));
}
- final ClusterByPartitions partitionBoundaries =
- queryKernel.getResultPartitionBoundariesForStage(shuffleStageId);
+ // Q for MSQ folks
Review Comment:
You might be hitting a case where a null shuffle spec is used
throughout the query definition. That case is currently not supported for
inserts, since it leads to unexpected segment sizes.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.FrameRowTooLargeException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.input.ReadableInput;
+import org.apache.druid.msq.input.table.SegmentWithDescriptor;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.OffsetLimit;
+import org.apache.druid.query.operator.Operator;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowAndColumns;
+import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+public class WindowOperatorQueryFrameProcessor extends BaseLeafFrameProcessor
+{
+
+ private static final Logger log = new
Logger(WindowOperatorQueryFrameProcessor.class);
+ private final WindowOperatorQuery query;
+
+ private final List<OperatorFactory> operatorFactoryList;
+ private final ObjectMapper jsonMapper;
+ private final Closer closer = Closer.create();
+ private final RowSignature outputStageSignature;
+ private final ArrayList<RowsAndColumns> frameRowsAndCols;
+ private final ArrayList<RowsAndColumns> resultRowAndCols;
+ ArrayList<ResultRow> objectsOfASingleRac;
+ private long currentAllocatorCapacity; // Used for generating
FrameRowTooLargeException if needed
+ private Cursor frameCursor = null;
+ private Supplier<ResultRow> rowSupplierFromFrameCursor;
+ private ResultRow outputRow = null;
+ private FrameWriter frameWriter = null;
+
+ public WindowOperatorQueryFrameProcessor(
+ final WindowOperatorQuery query,
+ final List<OperatorFactory> operatorFactoryList,
+ final ObjectMapper jsonMapper,
+ final ReadableInput baseInput,
+ final Function<SegmentReference, SegmentReference> segmentMapFn,
+ final ResourceHolder<WritableFrameChannel> outputChannelHolder,
+ final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
+ final RowSignature rowSignature
+ )
+ {
+ super(
+ baseInput,
+ segmentMapFn,
+ outputChannelHolder,
+ frameWriterFactoryHolder
+ );
+ this.query = query;
+ this.jsonMapper = jsonMapper;
+ this.operatorFactoryList = operatorFactoryList;
+ this.outputStageSignature = rowSignature;
+ this.frameRowsAndCols = new ArrayList<>();
+ this.resultRowAndCols = new ArrayList<>();
+ this.objectsOfASingleRac = new ArrayList<>();
+ }
+
+ @Override
+ protected ReturnOrAwait<Unit> runWithSegment(SegmentWithDescriptor segment)
+ {
+ return null;
Review Comment:
Let's throw an exception here instead of returning null.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]