This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ed3dbd62424 MSQ: Fix validation of time position in collations.
(#16961)
ed3dbd62424 is described below
commit ed3dbd62424ef49fa69316e50c907e6891fbd3e8
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Aug 27 00:02:32 2024 -0700
MSQ: Fix validation of time position in collations. (#16961)
* MSQ: Fix validation of time position in collations.
It is possible for the collation to refer to a field that isn't mapped,
such as when the DML includes "CLUSTERED BY some_function(some_field)".
In this case, the collation refers to a projected column that is not
part of the field mappings. Prior to this patch, that would lead to an
out-of-bounds list access on fieldMappings.
This patch fixes the problem by identifying the position of __time in
the fieldMappings first, rather than retrieving each collation field
from fieldMappings.
Fixes a bug introduced in #16849.
* Fix test. Improve warning message.
---
.../org/apache/druid/msq/sql/MSQTaskSqlEngine.java | 21 ++++--
.../org/apache/druid/msq/exec/MSQReplaceTest.java | 80 ++++++++++++++++++++++
.../druid/data/input/impl/DimensionsSpec.java | 6 +-
3 files changed, 98 insertions(+), 9 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
index 24fbd373c95..01d87805f78 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
@@ -479,22 +479,29 @@ public class MSQTaskSqlEngine implements SqlEngine
);
}
} else if (!rootCollation.getFieldCollations().isEmpty()) {
- int timePosition = -1;
+ int timePositionInRow = -1;
+ for (int i = 0; i < fieldMappings.size(); i++) {
+ Entry<Integer, String> entry = fieldMappings.get(i);
+ if (ColumnHolder.TIME_COLUMN_NAME.equals(entry.getValue())) {
+ timePositionInRow = i;
+ break;
+ }
+ }
+
+ int timePositionInCollation = -1;
for (int i = 0; i < rootCollation.getFieldCollations().size(); i++) {
- final String fieldCollationName =
-
fieldMappings.get(rootCollation.getFieldCollations().get(i).getFieldIndex()).getValue();
- if (ColumnHolder.TIME_COLUMN_NAME.equals(fieldCollationName)) {
- timePosition = i;
+ if (rootCollation.getFieldCollations().get(i).getFieldIndex() ==
timePositionInRow) {
+ timePositionInCollation = i;
break;
}
}
- if (timePosition > 0) {
+ if (timePositionInCollation > 0) {
throw InvalidSqlInput.exception(
"Sort order (CLUSTERED BY) cannot include[%s] in position[%d]
unless context parameter[%s] "
+ "is set to[false]. %s",
ColumnHolder.TIME_COLUMN_NAME,
- timePosition,
+ timePositionInCollation,
MultiStageQueryContext.CTX_FORCE_TIME_SORT,
DimensionsSpec.WARNING_NON_TIME_SORT_ORDER
);
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 72541046913..798a27a95e4 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -277,6 +277,86 @@ public class MSQReplaceTest extends MSQTestBase
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceOnFooWithAllClusteredByDimExplicitSort(String
contextName, Map<String, Object> context)
+ {
+ // Tests [CLUSTERED BY LOWER(dim1)], i.e. an expression that is not
actually stored.
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("m1", ColumnType.FLOAT)
+ .build();
+
+ DataSegment existingDataSegment0 = DataSegment.builder()
+
.interval(Intervals.of("2000-01-01T/2000-01-04T"))
+ .size(50)
+
.version(MSQTestTaskActionClient.VERSION)
+ .dataSource("foo")
+ .build();
+
+ DataSegment existingDataSegment1 = DataSegment.builder()
+
.interval(Intervals.of("2001-01-01T/2001-01-04T"))
+ .size(50)
+
.version(MSQTestTaskActionClient.VERSION)
+ .dataSource("foo")
+ .build();
+
+ Mockito.doCallRealMethod()
+ .doReturn(ImmutableSet.of(existingDataSegment0,
existingDataSegment1))
+ .when(testTaskActionClient)
+ .submit(new RetrieveUsedSegmentsAction("foo",
ImmutableList.of(Intervals.ETERNITY)));
+
+ testIngestQuery().setSql(" REPLACE INTO foo OVERWRITE ALL "
+ + "SELECT __time, dim1, m1 "
+ + "FROM foo "
+ + "PARTITIONED BY ALL "
+ + "CLUSTERED BY LOWER(dim1)")
+ .setExpectedDataSource("foo")
+ .setExpectedRowSignature(rowSignature)
+ .setQueryContext(context)
+ .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
+ .setExpectedSegments(
+ ImmutableSet.of(
+ SegmentId.of("foo", Intervals.ETERNITY, "test", 0)
+ )
+ )
+ .setExpectedShardSpec(NumberedShardSpec.class)
+ .setExpectedResultRows(
+ ImmutableList.of(
+ new Object[]{946684800000L,
NullHandling.sqlCompatible() ? "" : null, 1.0f},
+ new Object[]{946771200000L, "10.1", 2.0f},
+ new Object[]{946857600000L, "2", 3.0f},
+ new Object[]{978307200000L, "1", 4.0f},
+ new Object[]{978393600000L, "def", 5.0f},
+ new Object[]{978480000000L, "abc", 6.0f}
+ )
+ )
+
.setExpectedSegmentGenerationProgressCountersForStageWorker(
+ CounterSnapshotMatcher
+ .with().segmentRowsProcessed(6),
+ 1, 0
+ )
+ .setExpectedLastCompactionState(
+ expectedCompactionState(
+ context,
+ Collections.emptyList(),
+ DimensionsSpec.builder()
+ .setDimensions(
+ ImmutableList.of(
+ new
StringDimensionSchema("dim1"),
+ new
FloatDimensionSchema("m1")
+ )
+ )
+
.setDimensionExclusions(Collections.singletonList("__time"))
+ .build(),
+ GranularityType.ALL,
+ Intervals.ETERNITY
+ )
+ )
+ .verifyResults();
+ }
+
+ @MethodSource("data")
+ @ParameterizedTest(name = "{index}:with context {0}")
+ public void testReplaceOnFooWithAllClusteredByExpression(String contextName,
Map<String, Object> context)
{
RowSignature rowSignature = RowSignature.builder()
.add("dim1", ColumnType.STRING)
diff --git
a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java
b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java
index e529aa2d218..efc3718d4ff 100644
---
a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java
+++
b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java
@@ -60,8 +60,10 @@ public class DimensionsSpec
public static final String WARNING_NON_TIME_SORT_ORDER = StringUtils.format(
"Warning: support for segments not sorted by[%s] is experimental. Such
segments are not readable by older "
+ "version of Druid, and certain queries cannot run on them. See "
- + "https://druid.apache.org/docs/latest/ingestion/partitioning#sorting
for details before using this option.",
- ColumnHolder.TIME_COLUMN_NAME
+ + "https://druid.apache.org/docs/latest/ingestion/partitioning#sorting
for details before setting "
+ + "%s to[false].",
+ ColumnHolder.TIME_COLUMN_NAME,
+ PARAMETER_FORCE_TIME_SORT
);
public static final boolean DEFAULT_FORCE_TIME_SORT = true;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]