This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ed3dbd62424 MSQ: Fix validation of time position in collations. (#16961)
ed3dbd62424 is described below

commit ed3dbd62424ef49fa69316e50c907e6891fbd3e8
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Aug 27 00:02:32 2024 -0700

    MSQ: Fix validation of time position in collations. (#16961)
    
    * MSQ: Fix validation of time position in collations.
    
    It is possible for the collation to refer to a field that isn't mapped,
    such as when the DML includes "CLUSTERED BY some_function(some_field)".
    In this case, the collation refers to a projected column that is not
    part of the field mappings. Prior to this patch, that would lead to an
    out-of-bounds list access on fieldMappings.
    
    This patch fixes the problem by identifying the position of __time in
    the fieldMappings first, rather than retrieving each collation field
    from fieldMappings.
    
    Fixes a bug introduced in #16849.
    
    * Fix test. Better warning message.
---
 .../org/apache/druid/msq/sql/MSQTaskSqlEngine.java | 21 ++++--
 .../org/apache/druid/msq/exec/MSQReplaceTest.java  | 80 ++++++++++++++++++++++
 .../druid/data/input/impl/DimensionsSpec.java      |  6 +-
 3 files changed, 98 insertions(+), 9 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
index 24fbd373c95..01d87805f78 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
@@ -479,22 +479,29 @@ public class MSQTaskSqlEngine implements SqlEngine
         );
       }
     } else if (!rootCollation.getFieldCollations().isEmpty()) {
-      int timePosition = -1;
+      int timePositionInRow = -1;
+      for (int i = 0; i < fieldMappings.size(); i++) {
+        Entry<Integer, String> entry = fieldMappings.get(i);
+        if (ColumnHolder.TIME_COLUMN_NAME.equals(entry.getValue())) {
+          timePositionInRow = i;
+          break;
+        }
+      }
+
+      int timePositionInCollation = -1;
       for (int i = 0; i < rootCollation.getFieldCollations().size(); i++) {
-        final String fieldCollationName =
-            
fieldMappings.get(rootCollation.getFieldCollations().get(i).getFieldIndex()).getValue();
-        if (ColumnHolder.TIME_COLUMN_NAME.equals(fieldCollationName)) {
-          timePosition = i;
+        if (rootCollation.getFieldCollations().get(i).getFieldIndex() == 
timePositionInRow) {
+          timePositionInCollation = i;
           break;
         }
       }
 
-      if (timePosition > 0) {
+      if (timePositionInCollation > 0) {
         throw InvalidSqlInput.exception(
             "Sort order (CLUSTERED BY) cannot include[%s] in position[%d] 
unless context parameter[%s] "
             + "is set to[false]. %s",
             ColumnHolder.TIME_COLUMN_NAME,
-            timePosition,
+            timePositionInCollation,
             MultiStageQueryContext.CTX_FORCE_TIME_SORT,
             DimensionsSpec.WARNING_NON_TIME_SORT_ORDER
         );
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 72541046913..798a27a95e4 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -277,6 +277,86 @@ public class MSQReplaceTest extends MSQTestBase
   @MethodSource("data")
   @ParameterizedTest(name = "{index}:with context {0}")
   public void testReplaceOnFooWithAllClusteredByDimExplicitSort(String 
contextName, Map<String, Object> context)
+  {
+    // Tests [CLUSTERED BY LOWER(dim1)], i.e. an expression that is not 
actually stored.
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("dim1", ColumnType.STRING)
+                                            .add("m1", ColumnType.FLOAT)
+                                            .build();
+
+    DataSegment existingDataSegment0 = DataSegment.builder()
+                                                  
.interval(Intervals.of("2000-01-01T/2000-01-04T"))
+                                                  .size(50)
+                                                  
.version(MSQTestTaskActionClient.VERSION)
+                                                  .dataSource("foo")
+                                                  .build();
+
+    DataSegment existingDataSegment1 = DataSegment.builder()
+                                                  
.interval(Intervals.of("2001-01-01T/2001-01-04T"))
+                                                  .size(50)
+                                                  
.version(MSQTestTaskActionClient.VERSION)
+                                                  .dataSource("foo")
+                                                  .build();
+
+    Mockito.doCallRealMethod()
+           .doReturn(ImmutableSet.of(existingDataSegment0, 
existingDataSegment1))
+           .when(testTaskActionClient)
+           .submit(new RetrieveUsedSegmentsAction("foo", 
ImmutableList.of(Intervals.ETERNITY)));
+
+    testIngestQuery().setSql(" REPLACE INTO foo OVERWRITE ALL "
+                             + "SELECT __time, dim1, m1 "
+                             + "FROM foo "
+                             + "PARTITIONED BY ALL "
+                             + "CLUSTERED BY LOWER(dim1)")
+                     .setExpectedDataSource("foo")
+                     .setExpectedRowSignature(rowSignature)
+                     .setQueryContext(context)
+                     .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
+                     .setExpectedSegments(
+                         ImmutableSet.of(
+                             SegmentId.of("foo", Intervals.ETERNITY, "test", 0)
+                         )
+                     )
+                     .setExpectedShardSpec(NumberedShardSpec.class)
+                     .setExpectedResultRows(
+                         ImmutableList.of(
+                             new Object[]{946684800000L, 
NullHandling.sqlCompatible() ? "" : null, 1.0f},
+                             new Object[]{946771200000L, "10.1", 2.0f},
+                             new Object[]{946857600000L, "2", 3.0f},
+                             new Object[]{978307200000L, "1", 4.0f},
+                             new Object[]{978393600000L, "def", 5.0f},
+                             new Object[]{978480000000L, "abc", 6.0f}
+                         )
+                     )
+                     
.setExpectedSegmentGenerationProgressCountersForStageWorker(
+                         CounterSnapshotMatcher
+                             .with().segmentRowsProcessed(6),
+                         1, 0
+                     )
+                     .setExpectedLastCompactionState(
+                         expectedCompactionState(
+                             context,
+                             Collections.emptyList(),
+                             DimensionsSpec.builder()
+                                           .setDimensions(
+                                               ImmutableList.of(
+                                                   new 
StringDimensionSchema("dim1"),
+                                                   new 
FloatDimensionSchema("m1")
+                                               )
+                                           )
+                                           
.setDimensionExclusions(Collections.singletonList("__time"))
+                                           .build(),
+                             GranularityType.ALL,
+                             Intervals.ETERNITY
+                         )
+                     )
+                     .verifyResults();
+  }
+
+  @MethodSource("data")
+  @ParameterizedTest(name = "{index}:with context {0}")
+  public void testReplaceOnFooWithAllClusteredByExpression(String contextName, 
Map<String, Object> context)
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("dim1", ColumnType.STRING)
diff --git 
a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java 
b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java
index e529aa2d218..efc3718d4ff 100644
--- 
a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java
+++ 
b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java
@@ -60,8 +60,10 @@ public class DimensionsSpec
   public static final String WARNING_NON_TIME_SORT_ORDER = StringUtils.format(
       "Warning: support for segments not sorted by[%s] is experimental. Such 
segments are not readable by older "
       + "version of Druid, and certain queries cannot run on them. See "
-      + "https://druid.apache.org/docs/latest/ingestion/partitioning#sorting 
for details before using this option.",
-      ColumnHolder.TIME_COLUMN_NAME
+      + "https://druid.apache.org/docs/latest/ingestion/partitioning#sorting 
for details before setting "
+      + "%s to[false].",
+      ColumnHolder.TIME_COLUMN_NAME,
+      PARAMETER_FORCE_TIME_SORT
   );
 
   public static final boolean DEFAULT_FORCE_TIME_SORT = true;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to