This is an automated email from the ASF dual-hosted git repository.
lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0ce1b6b22fb MSQ window function: Take segment granularity into
consideration to fix NPE issues with ingestion (#16854)
0ce1b6b22fb is described below
commit 0ce1b6b22fbb1d6a9ab257b9a8ad4cd8637da933
Author: Akshat Jain <[email protected]>
AuthorDate: Wed Aug 21 10:06:04 2024 +0530
MSQ window function: Take segment granularity into consideration to fix NPE
issues with ingestion (#16854)
This PR changes the logic for window functions to use the
resultShuffleSpecFactory for the last window stage.
---
.../WindowOperatorQueryFrameProcessor.java | 21 ++++++-
.../druid/msq/querykit/WindowOperatorQueryKit.java | 67 ++++++++++++++++------
.../org/apache/druid/msq/exec/MSQWindowTest.java | 64 ++++++++++++++++-----
.../drill/window/queries/lag_func/lag_Fn_18.q | 7 ++-
.../drill/window/queries/lead_func/lead_Fn_18.q | 7 ++-
5 files changed, 132 insertions(+), 34 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
index 5fbfd3119d0..6b28c0263a8 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
@@ -31,6 +31,7 @@ import org.apache.druid.frame.processor.FrameProcessors;
import org.apache.druid.frame.processor.FrameRowTooLargeException;
import org.apache.druid.frame.processor.ReturnOrAwait;
import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.util.SettableLongVirtualColumn;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.Unit;
@@ -51,6 +52,8 @@ import
org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.NullableTypeStrategy;
import org.apache.druid.segment.column.RowSignature;
@@ -85,6 +88,9 @@ public class WindowOperatorQueryFrameProcessor implements
FrameProcessor<Object>
private ResultRow outputRow = null;
private FrameWriter frameWriter = null;
+ private final VirtualColumns frameWriterVirtualColumns;
+ private final SettableLongVirtualColumn partitionBoostVirtualColumn;
+
// List of type strategies to compare the partition columns across rows.
// Type strategies are pushed in the same order as column types in
frameReader.signature()
private final NullableTypeStrategy[] typeStrategies;
@@ -119,6 +125,16 @@ public class WindowOperatorQueryFrameProcessor implements
FrameProcessor<Object>
for (int i = 0; i < frameReader.signature().size(); i++) {
typeStrategies[i] =
frameReader.signature().getColumnType(i).get().getNullableStrategy();
}
+
+ // Get virtual columns to be added to the frame writer.
+ this.partitionBoostVirtualColumn = new
SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
+ final List<VirtualColumn> frameWriterVirtualColumns = new ArrayList<>();
+ final VirtualColumn segmentGranularityVirtualColumn =
+ QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
+ if (segmentGranularityVirtualColumn != null) {
+ frameWriterVirtualColumns.add(segmentGranularityVirtualColumn);
+ }
+ frameWriterVirtualColumns.add(this.partitionBoostVirtualColumn);
+ this.frameWriterVirtualColumns =
VirtualColumns.create(frameWriterVirtualColumns);
}
@Override
@@ -404,7 +420,9 @@ public class WindowOperatorQueryFrameProcessor implements
FrameProcessor<Object>
if (frameWriter == null) {
final ColumnSelectorFactoryMaker csfm =
ColumnSelectorFactoryMaker.fromRAC(rac);
final ColumnSelectorFactory frameWriterColumnSelectorFactory =
csfm.make(rowId);
- frameWriter =
frameWriterFactory.newFrameWriter(frameWriterColumnSelectorFactory);
+ final ColumnSelectorFactory
frameWriterColumnSelectorFactoryWithVirtualColumns =
+ frameWriterVirtualColumns.wrap(frameWriterColumnSelectorFactory);
+ frameWriter =
frameWriterFactory.newFrameWriter(frameWriterColumnSelectorFactoryWithVirtualColumns);
currentAllocatorCapacity = frameWriterFactory.allocatorCapacity();
}
}
@@ -422,6 +440,7 @@ public class WindowOperatorQueryFrameProcessor implements
FrameProcessor<Object>
final boolean didAddToFrame = frameWriter.addSelection();
if (didAddToFrame) {
rowId.incrementAndGet();
+
partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() +
1);
} else if (frameWriter.getNumRows() == 0) {
throw new FrameRowTooLargeException(currentAllocatorCapacity);
} else {
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
index a814640f704..b3686359d2a 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
@@ -24,6 +24,7 @@ import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.stage.StageInputSpec;
@@ -105,8 +106,14 @@ public class WindowOperatorQueryKit implements
QueryKit<WindowOperatorQuery>
final int firstStageNumber = Math.max(minStageNumber,
queryDefBuilder.getNextStageNumber());
final WindowOperatorQuery queryToRun = (WindowOperatorQuery)
originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
- final int maxRowsMaterialized;
+ // Get segment granularity from query context, and create ShuffleSpec and
RowSignature to be used for the final window stage.
+ final Granularity segmentGranularity =
QueryKitUtils.getSegmentGranularityFromContext(jsonMapper,
queryToRun.getContext());
+ final ClusterBy finalWindowClusterBy =
computeClusterByForFinalWindowStage(segmentGranularity);
+ final ShuffleSpec finalWindowStageShuffleSpec =
resultShuffleSpecFactory.build(finalWindowClusterBy, false);
+ final RowSignature finalWindowStageRowSignature =
computeSignatureForFinalWindowStage(rowSignature, finalWindowClusterBy,
segmentGranularity);
+
+ final int maxRowsMaterialized;
if (originalQuery.context() != null &&
originalQuery.context().containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW))
{
maxRowsMaterialized = (int)
originalQuery.context().get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW);
} else {
@@ -122,13 +129,13 @@ public class WindowOperatorQueryKit implements
QueryKit<WindowOperatorQuery>
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber)
.inputs(new StageInputSpec(firstStageNumber - 1))
- .signature(rowSignature)
+ .signature(finalWindowStageRowSignature)
.maxWorkerCount(maxWorkerCount)
- .shuffleSpec(null)
+ .shuffleSpec(finalWindowStageShuffleSpec)
.processorFactory(new
WindowOperatorQueryFrameProcessorFactory(
queryToRun,
queryToRun.getOperators(),
- rowSignature,
+ finalWindowStageRowSignature,
maxRowsMaterialized,
Collections.emptyList()
))
@@ -178,23 +185,22 @@ public class WindowOperatorQueryKit implements
QueryKit<WindowOperatorQuery>
}
}
- // find the shuffle spec of the next stage
- // if it is the last stage set the next shuffle spec to single
partition
- if (i + 1 == operatorList.size()) {
- nextShuffleSpec = MixShuffleSpec.instance();
- } else {
- nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i +
1), maxWorkerCount);
- }
-
final RowSignature intermediateSignature = bob.build();
final RowSignature stageRowSignature;
- if (nextShuffleSpec == null) {
- stageRowSignature = intermediateSignature;
+
+ if (i + 1 == operatorList.size()) {
+ stageRowSignature = finalWindowStageRowSignature;
+ nextShuffleSpec = finalWindowStageShuffleSpec;
} else {
- stageRowSignature = QueryKitUtils.sortableSignature(
- intermediateSignature,
- nextShuffleSpec.clusterBy().getColumns()
- );
+ nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i +
1), maxWorkerCount);
+ if (nextShuffleSpec == null) {
+ stageRowSignature = intermediateSignature;
+ } else {
+ stageRowSignature = QueryKitUtils.sortableSignature(
+ intermediateSignature,
+ nextShuffleSpec.clusterBy().getColumns()
+ );
+ }
}
log.info("Using row signature [%s] for window stage.",
stageRowSignature);
@@ -346,4 +352,29 @@ public class WindowOperatorQueryKit implements
QueryKit<WindowOperatorQuery>
}
return queryDefBuilder;
}
+
+ /**
+ * Computes the ClusterBy for the final window stage. We don't have to take
the CLUSTERED BY columns into account,
+ * as they are handled as {@link
org.apache.druid.query.scan.ScanQuery#orderBys}.
+ */
+ private static ClusterBy computeClusterByForFinalWindowStage(Granularity
segmentGranularity)
+ {
+ final List<KeyColumn> clusterByColumns = Collections.singletonList(new
KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING));
+ return QueryKitUtils.clusterByWithSegmentGranularity(new
ClusterBy(clusterByColumns, 0), segmentGranularity);
+ }
+
+ /**
+ * Computes the signature for the final window stage. The
finalWindowClusterBy will always have the
+ * partition boost column as computed in {@link
#computeClusterByForFinalWindowStage(Granularity)}.
+ */
+ private static RowSignature computeSignatureForFinalWindowStage(RowSignature
rowSignature, ClusterBy finalWindowClusterBy, Granularity segmentGranularity)
+ {
+ final RowSignature.Builder finalWindowStageRowSignatureBuilder =
RowSignature.builder()
+
.addAll(rowSignature)
+
.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG);
+ return QueryKitUtils.sortableSignature(
+
QueryKitUtils.signatureWithSegmentGranularity(finalWindowStageRowSignatureBuilder.build(),
segmentGranularity),
+ finalWindowClusterBy.getColumns()
+ );
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
index ee7121b3678..3a1e3d95f80 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
@@ -1272,20 +1272,20 @@ public class MSQWindowTest extends MSQTestBase
.setExpectedResultRows(
NullHandling.replaceWithDefault() ?
ImmutableList.of(
- new Object[]{"", 11.0},
+ new Object[]{"a", 5.0},
new Object[]{"", 11.0},
new Object[]{"", 11.0},
new Object[]{"a", 5.0},
- new Object[]{"a", 5.0},
- new Object[]{"abc", 5.0}
+ new Object[]{"abc", 5.0},
+ new Object[]{"", 11.0}
) :
ImmutableList.of(
- new Object[]{null, 8.0},
+ new Object[]{"a", 5.0},
new Object[]{null, 8.0},
new Object[]{"", 3.0},
new Object[]{"a", 5.0},
- new Object[]{"a", 5.0},
- new Object[]{"abc", 5.0}
+ new Object[]{"abc", 5.0},
+ new Object[]{null, 8.0}
))
.setQueryContext(context)
.verifyResults();
@@ -1935,11 +1935,11 @@ public class MSQWindowTest extends MSQTestBase
.build())
.setExpectedRowSignature(rowSignature)
.setExpectedResultRows(ImmutableList.of(
- new Object[]{"Al Ain", 8L, 6334L},
- new Object[]{"Dubai", 3L, 6334L},
- new Object[]{"Dubai", 6323L, 6334L},
- new Object[]{"Tirana", 26L, 26L},
- new Object[]{"Benguela", 0L, 0L}
+ new Object[]{"Auburn", 0L, 1698L},
+ new Object[]{"Mexico City", 0L, 6136L},
+ new Object[]{"Seoul", 663L, 5582L},
+ new Object[]{"Tokyo", 0L, 12615L},
+ new Object[]{"Santiago", 161L, 401L}
))
.setQueryContext(context)
.verifyResults();
@@ -2266,13 +2266,13 @@ public class MSQWindowTest extends MSQTestBase
2, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
- CounterSnapshotMatcher.with().rows(13).bytes(1158).frames(1),
+ CounterSnapshotMatcher.with().rows(13).bytes(1379).frames(1),
2, 0, "output"
)
// Stage 3, Worker 0
.setExpectedCountersForStageWorkerChannel(
- CounterSnapshotMatcher.with().rows(13).bytes(1158).frames(1),
+ CounterSnapshotMatcher.with().rows(13).bytes(1327).frames(1),
3, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
@@ -2285,4 +2285,42 @@ public class MSQWindowTest extends MSQTestBase
)
.verifyResults();
}
+
+ @MethodSource("data")
+ @ParameterizedTest(name = "{index}:with context {0}")
+ public void testReplaceWithPartitionedByDayOnWikipedia(String contextName,
Map<String, Object> context)
+ {
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("cityName", ColumnType.STRING)
+ .add("added", ColumnType.LONG)
+ .add("cc", ColumnType.LONG)
+ .build();
+
+ testIngestQuery().setSql(" REPLACE INTO foo1 OVERWRITE ALL\n"
+ + "select __time, cityName, added, SUM(added)
OVER () cc from wikipedia \n"
+ + "where cityName IN ('Ahmedabad',
'Albuquerque')\n"
+ + "GROUP BY __time, cityName, added\n"
+ + "PARTITIONED BY DAY")
+ .setExpectedDataSource("foo1")
+ .setExpectedRowSignature(rowSignature)
+ .setQueryContext(context)
+ .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
+ .setExpectedResultRows(
+ ImmutableList.of(
+ new Object[]{1442055085114L, "Ahmedabad", 0L,
140L},
+ new Object[]{1442061929238L, "Ahmedabad", 0L,
140L},
+ new Object[]{1442069353218L, "Albuquerque", 129L,
140L},
+ new Object[]{1442069411614L, "Albuquerque", 9L,
140L},
+ new Object[]{1442097803851L, "Albuquerque", 2L,
140L}
+ )
+ )
+ .setExpectedSegments(ImmutableSet.of(SegmentId.of(
+ "foo1",
+ Intervals.of("2015-09-12/2015-09-13"),
+ "test",
+ 0
+ )))
+ .verifyResults();
+ }
}
diff --git a/sql/src/test/resources/drill/window/queries/lag_func/lag_Fn_18.q
b/sql/src/test/resources/drill/window/queries/lag_func/lag_Fn_18.q
index e24ed5379df..a1bb3a29536 100644
--- a/sql/src/test/resources/drill/window/queries/lag_func/lag_Fn_18.q
+++ b/sql/src/test/resources/drill/window/queries/lag_func/lag_Fn_18.q
@@ -1 +1,6 @@
-SELECT col2 , col8 , LAG(col8 ) OVER ( PARTITION BY col2 ORDER BY col2 , col8
nulls FIRST ) LAG_col8 FROM "fewRowsAllData.parquet" FETCH FIRST 15 ROWS ONLY
+SELECT
+col2, col8,
+LAG(col8) OVER (PARTITION BY col2 ORDER BY col2, col8 nulls FIRST) LAG_col8
+FROM "fewRowsAllData.parquet"
+ORDER BY col2, col8
+FETCH FIRST 15 ROWS ONLY
diff --git a/sql/src/test/resources/drill/window/queries/lead_func/lead_Fn_18.q
b/sql/src/test/resources/drill/window/queries/lead_func/lead_Fn_18.q
index e8a15695382..5249f290fae 100644
--- a/sql/src/test/resources/drill/window/queries/lead_func/lead_Fn_18.q
+++ b/sql/src/test/resources/drill/window/queries/lead_func/lead_Fn_18.q
@@ -1 +1,6 @@
-SELECT col2 , col8 , LEAD(col8 ) OVER ( PARTITION BY col2 ORDER BY col8 nulls
FIRST ) LEAD_col8 FROM "fewRowsAllData.parquet" limit 10
+SELECT
+col2, col8,
+LEAD(col8) OVER (PARTITION BY col2 ORDER BY col8 nulls FIRST) LEAD_col8
+FROM "fewRowsAllData.parquet"
+ORDER BY col2, col8
+limit 10
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]