This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ac664859004 additional validation for projection schema (#18524)
ac664859004 is described below

commit ac66485900448ace4ff280ab60e4098c1fc9619a
Author: Clint Wylie <[email protected]>
AuthorDate: Thu Sep 18 13:36:55 2025 -0700

    additional validation for projection schema (#18524)
---
 .../data/input/impl/AggregateProjectionSpec.java   |  31 ++++--
 .../input/impl/AggregateProjectionSpecTest.java    |  65 ++++++++++--
 .../druid/segment/CursorFactoryProjectionTest.java |  22 +++-
 .../apache/druid/segment/indexing/DataSchema.java  | 114 +++++++++++++++------
 .../druid/segment/indexing/DataSchemaTest.java     |  74 +++++++++++++
 5 files changed, 254 insertions(+), 52 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java b/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java
index 42fc58eb15f..ff742b88f0e 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.collect.Lists;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
@@ -37,6 +38,7 @@ import org.apache.druid.segment.AggregateProjectionMetadata;
 import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.DateTimeZone;
 
@@ -225,19 +227,34 @@ public class AggregateProjectionSpec
     Granularity granularity = null;
 
     // determine the granularity and time column name for the projection, based on the finest time-like grouping column.
-    for (final DimensionSchema dimension : groupingColumns) {
-      ordering.add(OrderBy.ascending(dimension.getName()));
-      if (ColumnHolder.TIME_COLUMN_NAME.equals(dimension.getName())) {
-        timeColumnName = dimension.getName();
+    for (final DimensionSchema groupingColumn : groupingColumns) {
+      ordering.add(OrderBy.ascending(groupingColumn.getName()));
+      if (ColumnHolder.TIME_COLUMN_NAME.equals(groupingColumn.getName())) {
+        // time must be a LONG type
+        if (!groupingColumn.getColumnType().is(ValueType.LONG)) {
+          throw DruidException
+              .forPersona(DruidException.Persona.USER)
+              .ofCategory(DruidException.Category.INVALID_INPUT)
+              .build(
+                  "Encountered grouping column[%s] with incorrect type[%s]. Type must be 'long'.",
+                  groupingColumn.getName(),
+                  groupingColumn.getColumnType()
+              );
+        }
+        timeColumnName = groupingColumn.getName();
         // already found exact __time grouping, skip assigning, granularity = Granularities.NONE;
         break;
       } else {
-        final VirtualColumn vc = virtualColumns.getVirtualColumn(dimension.getName());
+        // time must be a LONG type
+        if (!groupingColumn.getColumnType().is(ValueType.LONG)) {
+          continue;
+        }
+        final VirtualColumn vc = virtualColumns.getVirtualColumn(groupingColumn.getName());
         final Granularity maybeGranularity = Granularities.fromVirtualColumn(vc);
         if (maybeGranularity == null || maybeGranularity.equals(Granularities.ALL)) {
           // no __time in inputs or not supported, skip
         } else if (Granularities.NONE.equals(maybeGranularity)) {
-          timeColumnName = dimension.getName();
+          timeColumnName = groupingColumn.getName();
           // already found exact __time grouping, skip assigning, granularity = Granularities.NONE;
           break;
         } else if (maybeGranularity.getClass().equals(PeriodGranularity.class)
@@ -245,7 +262,7 @@ public class AggregateProjectionSpec
             && ((PeriodGranularity) maybeGranularity).getOrigin() == null
             && (granularity == null || maybeGranularity.isFinerThan(granularity))) {
           // found a finer period granularity than the existing granularity, or it's the first one
-          timeColumnName = dimension.getName();
+          timeColumnName = groupingColumn.getName();
           granularity = maybeGranularity;
         }
       }
diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/AggregateProjectionSpecTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/AggregateProjectionSpecTest.java
index 24f7d2741a4..6c33ec57a11 100644
--- a/processing/src/test/java/org/apache/druid/data/input/impl/AggregateProjectionSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/data/input/impl/AggregateProjectionSpecTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.filter.EqualityFilter;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -112,12 +113,18 @@ class AggregateProjectionSpecTest extends InitializedNullHandlingTest
         ColumnType.LONG,
         TestExprMacroTable.INSTANCE
     );
-    ExpressionVirtualColumn ptEvery10Min = new ExpressionVirtualColumn(
-        "ptEvery10Min",
+    ExpressionVirtualColumn every10MinLA = new ExpressionVirtualColumn(
+        "every10MinLA",
         "timestamp_floor(__time, 'PT10M', null, 'America/Los_Angeles')",
         ColumnType.LONG,
         TestExprMacroTable.INSTANCE
     );
+    ExpressionVirtualColumn every10Min = new ExpressionVirtualColumn(
+        "every10Min",
+        "timestamp_floor(__time, 'PT10M', null, null)",
+        ColumnType.LONG,
+        TestExprMacroTable.INSTANCE
+    );
     ExpressionVirtualColumn every90Min = new ExpressionVirtualColumn(
         "every90Min",
         "timestamp_floor(__time, 'PT1H30M', null, null)",
@@ -128,11 +135,11 @@ class AggregateProjectionSpecTest extends InitializedNullHandlingTest
     Assertions.assertEquals("hourly", new AggregateProjectionSpec(
         "some_projection",
         null,
-        VirtualColumns.create(daily, hourly, ptEvery10Min),
+        VirtualColumns.create(daily, hourly, every10MinLA),
         List.of(
             new LongDimensionSchema("daily"),
             new LongDimensionSchema("hourly"),
-            new LongDimensionSchema("ptEvery10Min")
+            new LongDimensionSchema("every10MinLA")
         ),
         new AggregatorFactory[]{}
     ).toMetadataSchema().getTimeColumnName());
@@ -140,16 +147,39 @@ class AggregateProjectionSpecTest extends InitializedNullHandlingTest
     Assertions.assertNull(new AggregateProjectionSpec(
         "some_projection",
         null,
-        VirtualColumns.create(ptEvery10Min),
-        List.of(new LongDimensionSchema("ptEvery10Min")),
+        VirtualColumns.create(every10MinLA),
+        List.of(new LongDimensionSchema("every10MinLA")),
         new AggregatorFactory[]{}
     ).toMetadataSchema().getTimeColumnName());
 
     Assertions.assertEquals("every90Min", new AggregateProjectionSpec(
         "some_projection",
         null,
-        VirtualColumns.create(every90Min, ptEvery10Min),
-        List.of(new LongDimensionSchema("every90Min"), new LongDimensionSchema("ptEvery10Min")),
+        VirtualColumns.create(every90Min, every10MinLA),
+        List.of(new LongDimensionSchema("every90Min"), new LongDimensionSchema("every10MinLA")),
+        new AggregatorFactory[]{}
+    ).toMetadataSchema().getTimeColumnName());
+
+    Assertions.assertEquals("every10Min", new AggregateProjectionSpec(
+        "some_projection",
+        null,
+        VirtualColumns.create(daily, hourly, every10Min),
+        List.of(
+            new LongDimensionSchema("daily"),
+            new LongDimensionSchema("hourly"),
+            new LongDimensionSchema("every10Min")
+        ),
+        new AggregatorFactory[]{}
+    ).toMetadataSchema().getTimeColumnName());
+    Assertions.assertEquals("hourly", new AggregateProjectionSpec(
+        "some_projection",
+        null,
+        VirtualColumns.create(daily, hourly, every10Min),
+        List.of(
+            new LongDimensionSchema("daily"),
+            new LongDimensionSchema("hourly"),
+            new StringDimensionSchema("every10Min")
+        ),
         new AggregatorFactory[]{}
     ).toMetadataSchema().getTimeColumnName());
   }
@@ -217,6 +247,25 @@ class AggregateProjectionSpecTest extends InitializedNullHandlingTest
     );
   }
 
+  @Test
+  void testInvalidTimeColumnType()
+  {
+    Throwable t = Assertions.assertThrows(
+        DruidException.class,
+        () -> new AggregateProjectionSpec(
+            "projection",
+            null,
+            VirtualColumns.EMPTY,
+            List.of(new StringDimensionSchema(ColumnHolder.TIME_COLUMN_NAME)),
+            null
+        )
+    );
+    Assertions.assertEquals(
+        "Encountered grouping column[__time] with incorrect type[STRING]. Type must be 'long'.",
+        t.getMessage()
+    );
+  }
+
   @Test
   void testEqualsAndHashcode()
   {
diff --git a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
index ba8506ac5d6..23798992e50 100644
--- a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
@@ -130,6 +130,12 @@ public class CursorFactoryProjectionTest extends InitializedNullHandlingTest
                                                         .add("f", 
ColumnType.NESTED_DATA)
                                                         .build();
 
+  private static final Set<String> PROJECTION_TIME_COLUMNS = Set.of(
+      ColumnHolder.TIME_COLUMN_NAME,
+      Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME,
+      "__gran"
+  );
+
   public static List<InputRow> makeRows(List<String> dimensions)
   {
     return Arrays.asList(
@@ -388,7 +394,7 @@ public class CursorFactoryProjectionTest extends InitializedNullHandlingTest
                                                 .groupingColumns(
                                                     
projection.getGroupingColumns()
                                                               .stream()
-                                                              .map(x -> new 
AutoTypeColumnSchema(x.getName(), null))
+                                                              
.map(CursorFactoryProjectionTest::toAutoColumn)
                                                               
.collect(Collectors.toList())
                                                 )
                                                 .build()
@@ -403,7 +409,7 @@ public class CursorFactoryProjectionTest extends InitializedNullHandlingTest
                                     .builder(projection)
                                     
.groupingColumns(projection.getGroupingColumns()
                                                                .stream()
-                                                               .map(x -> new 
AutoTypeColumnSchema(x.getName(), null))
+                                                               
.map(CursorFactoryProjectionTest::toAutoColumn)
                                                                
.collect(Collectors.toList()))
                                     .build()
                         )
@@ -451,12 +457,12 @@ public class CursorFactoryProjectionTest extends InitializedNullHandlingTest
 
     List<DimensionSchema> autoDims = dimsOrdered.getDimensions()
                                                 .stream()
-                                                .map(x -> new AutoTypeColumnSchema(x.getName(), null))
+                                                .map(CursorFactoryProjectionTest::toAutoColumn)
                                                 .collect(Collectors.toList());
 
     List<DimensionSchema> rollupAutoDims = rollupDimsOrdered.getDimensions()
                                                             .stream()
-                                                            .map(x -> new AutoTypeColumnSchema(x.getName(), null))
+                                                            .map(CursorFactoryProjectionTest::toAutoColumn)
                                                             .collect(Collectors.toList());
 
     for (boolean incremental : new boolean[]{true, false}) {
@@ -2069,6 +2075,14 @@ public class CursorFactoryProjectionTest extends InitializedNullHandlingTest
       Assert.assertEquals(expectedRowCount, rowCount);
     }
   }
+  
+  private static AutoTypeColumnSchema toAutoColumn(DimensionSchema x)
+  {
+    if (PROJECTION_TIME_COLUMNS.contains(x.getName())) {
+      return new AutoTypeColumnSchema(x.getName(), ColumnType.LONG);
+    }
+    return new AutoTypeColumnSchema(x.getName(), null);
+  }
 
   private static IndexBuilder makeBuilder(DimensionsSpec dimensionsSpec, boolean autoSchema, boolean writeNullColumns)
   {
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
index 782ed8580b7..6e9728d1b55 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
@@ -415,9 +415,88 @@ public class DataSchema
       }
     }
 
+    return getFieldsOrThrowIfErrors(fields);
+  }
+
+  /**
+   * Validates that each {@link AggregateProjectionSpec} does not have duplicate column names in
+   * {@link AggregateProjectionSpec#groupingColumns} and {@link AggregateProjectionSpec#aggregators} and that segment
+   * {@link Granularity} is at least as coarse as {@link AggregateProjectionMetadata.Schema#effectiveGranularity}
+   */
+  public static void validateProjections(
+      @Nullable List<AggregateProjectionSpec> projections,
+      @Nullable Granularity segmentGranularity
+  )
+  {
+    if (projections != null) {
+      final Set<String> names = Sets.newHashSetWithExpectedSize(projections.size());
+      for (AggregateProjectionSpec projection : projections) {
+        if (names.contains(projection.getName())) {
+          throw InvalidInput.exception("projection[%s] is already defined, projection names must be unique", projection.getName());
+        }
+        names.add(projection.getName());
+        final AggregateProjectionMetadata.Schema schema = projection.toMetadataSchema();
+
+        if (schema.getTimeColumnName() != null) {
+          final Granularity projectionGranularity = schema.getEffectiveGranularity();
+          if (segmentGranularity != null) {
+            if (segmentGranularity.isFinerThan(projectionGranularity)) {
+              throw InvalidInput.exception(
+                  "projection[%s] has granularity[%s] which must be finer than or equal to segment granularity[%s]",
+                  projection.getName(),
+                  projectionGranularity,
+                  segmentGranularity
+              );
+            }
+          }
+        }
+
+        final Map<String, Multiset<String>> fields = new TreeMap<>();
+        int position = 0;
+        for (DimensionSchema grouping : projection.getGroupingColumns()) {
+          final String field = grouping.getName();
+          if (Strings.isNullOrEmpty(field)) {
+            throw DruidException
+                .forPersona(DruidException.Persona.USER)
+                .ofCategory(DruidException.Category.INVALID_INPUT)
+                .build("Encountered grouping column with null or empty name at position[%d]", position);
+          }
+          fields.computeIfAbsent(field, k -> TreeMultiset.create()).add("projection[" + projection.getName() + "] grouping column list");
+          position++;
+        }
+        for (AggregatorFactory aggregator : projection.getAggregators()) {
+          final String field = aggregator.getName();
+          if (Strings.isNullOrEmpty(field)) {
+            throw DruidException
+                .forPersona(DruidException.Persona.USER)
+                .ofCategory(DruidException.Category.INVALID_INPUT)
+                .build("Encountered aggregator with null or empty name at position[%d]", position);
+          }
+
+          fields.computeIfAbsent(field, k -> TreeMultiset.create()).add("projection[" + projection.getName() + "] aggregators list");
+          position++;
+        }
+
+        getFieldsOrThrowIfErrors(fields);
+      }
+    }
+  }
+
+  /**
+   * Helper method that processes a validation result stored as a {@link Map} of field names to {@link Multiset} of
+   * where they were defined. An error is indicated by the multi-set having more than a single entry
+   * (such as if a field is defined as both a dimension and an aggregator). If all fields have only a single entry, this
+   * method returns the list of output field names. If there are duplicates, this method throws a {@link DruidException}
+   * collecting all validation errors to help indicate where a field is defined
+   *
+   * @see #computeAndValidateOutputFieldNames
+   * @see #validateProjections(List, Granularity)
+   */
+  private static Set<String> getFieldsOrThrowIfErrors(Map<String, Multiset<String>> validatedFields)
+  {
     final List<String> errors = new ArrayList<>();
 
-    for (Map.Entry<String, Multiset<String>> fieldEntry : fields.entrySet()) {
+    for (Map.Entry<String, Multiset<String>> fieldEntry : validatedFields.entrySet()) {
       if (fieldEntry.getValue().entrySet().stream().mapToInt(Multiset.Entry::getCount).sum() > 1) {
         errors.add(
             StringUtils.format(
@@ -440,7 +519,7 @@ public class DataSchema
     }
 
     if (errors.isEmpty()) {
-      return fields.keySet();
+      return validatedFields.keySet();
     } else {
       throw DruidException.forPersona(DruidException.Persona.USER)
                           .ofCategory(DruidException.Category.INVALID_INPUT)
@@ -448,37 +527,6 @@ public class DataSchema
     }
   }
 
-  public static void validateProjections(
-      @Nullable List<AggregateProjectionSpec> projections,
-      @Nullable Granularity segmentGranularity
-  )
-  {
-    if (projections != null) {
-      final Set<String> names = Sets.newHashSetWithExpectedSize(projections.size());
-      for (AggregateProjectionSpec projection : projections) {
-        if (names.contains(projection.getName())) {
-          throw InvalidInput.exception("projection[%s] is already defined, projection names must be unique", projection.getName());
-        }
-        names.add(projection.getName());
-        final AggregateProjectionMetadata.Schema schema = projection.toMetadataSchema();
-        if (schema.getTimeColumnName() == null) {
-          continue;
-        }
-        final Granularity projectionGranularity = schema.getEffectiveGranularity();
-        if (segmentGranularity != null) {
-          if (segmentGranularity.isFinerThan(projectionGranularity)) {
-            throw InvalidInput.exception(
-                "projection[%s] has granularity[%s] which must be finer than or equal to segment granularity[%s]",
-                projection.getName(),
-                projectionGranularity,
-                segmentGranularity
-            );
-          }
-        }
-      }
-    }
-  }
-
   public static class Builder
   {
     private String dataSource;
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
index 64f864a26fd..d810b9c95db 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
@@ -49,6 +49,7 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.segment.TestHelper;
@@ -944,4 +945,77 @@ class DataSchemaTest extends InitializedNullHandlingTest
         t.getMessage()
     );
   }
+
+  @Test
+  void testInvalidProjectionDupeGroupingNames()
+  {
+    Throwable t = Assertions.assertThrows(
+        DruidException.class,
+        () -> DataSchema.builder()
+                        .withDataSource("dataSource")
+                        .withGranularity(
+                            new UniformGranularitySpec(
+                                Granularities.HOUR,
+                                Granularities.NONE,
+                                false,
+                                List.of(Intervals.of("2014/2015"))
+                            )
+                        )
+                        .withProjections(
+                            List.of(
+                                AggregateProjectionSpec.builder("some projection")
+                                                       .virtualColumns(
+                                                           Granularities.toVirtualColumn(Granularities.HOUR, "g")
+                                                       )
+                                                       .groupingColumns(new LongDimensionSchema("g"), new StringDimensionSchema("g"))
+                                                       .aggregators(new CountAggregatorFactory("count"))
+                                                       .build()
+                            )
+                        )
+                        .build()
+    );
+
+    Assertions.assertEquals(
+        "Cannot specify a column more than once: [g] seen in projection[some projection] grouping column list (2 occurrences)",
+        t.getMessage()
+    );
+  }
+
+  @Test
+  void testInvalidProjectionDupeAggNames()
+  {
+    Throwable t = Assertions.assertThrows(
+        DruidException.class,
+        () -> DataSchema.builder()
+                        .withDataSource("dataSource")
+                        .withGranularity(
+                            new UniformGranularitySpec(
+                                Granularities.HOUR,
+                                Granularities.NONE,
+                                false,
+                                List.of(Intervals.of("2014/2015"))
+                            )
+                        )
+                        .withProjections(
+                            List.of(
+                                AggregateProjectionSpec.builder("some projection")
+                                                       .virtualColumns(
+                                                           Granularities.toVirtualColumn(Granularities.HOUR, "g")
+                                                       )
+                                                       .groupingColumns(new LongDimensionSchema("g"))
+                                                       .aggregators(
+                                                           new LongSumAggregatorFactory("a0", "added"),
+                                                           new DoubleSumAggregatorFactory("a0", "added")
+                                                       )
+                                                       .build()
+                            )
+                        )
+                        .build()
+    );
+
+    Assertions.assertEquals(
+        "Cannot specify a column more than once: [a0] seen in projection[some projection] aggregators list (2 occurrences)",
+        t.getMessage()
+    );
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to