This is an automated email from the ASF dual-hosted git repository.

zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 365cd7e8e7e INSERT/REPLACE can omit clustering when catalog has 
default (#16260)
365cd7e8e7e is described below

commit 365cd7e8e7e2a828cd924e729b1a78df9fe390aa
Author: zachjsh <[email protected]>
AuthorDate: Fri Apr 26 10:19:45 2024 -0400

    INSERT/REPLACE can omit clustering when catalog has default (#16260)
    
    * * fix
    
    * * fix
    
    * * address review comments
    
    * * fix
    
    * * simplify tests
    
    * * fix complex type nullability issue
    
    * * implement and add tests
    
    * * address review comments
    
    * * address test review comments
    
    * * fix checkstyle
    
    * * fix dependencies
    
    * * all tests passing
    
    * * cleanup
    
    * * remove unneeded code
    
    * * remove unused dependency
    
    * * fix checkstyle
---
 .../sql/calcite/planner/DruidSqlValidator.java     |  90 +++++++
 .../calcite/CalciteCatalogIngestionDmlTest.java    | 286 ++++++++++++++++++++-
 2 files changed, 370 insertions(+), 6 deletions(-)

diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
index 03dbbd4b7a7..aa04a55eb50 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
@@ -40,10 +40,12 @@ import org.apache.calcite.sql.SqlUpdate;
 import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.SqlWindow;
 import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.SelectNamespace;
 import org.apache.calcite.sql.validate.SqlNonNullableAccessors;
 import org.apache.calcite.sql.validate.SqlValidatorException;
 import org.apache.calcite.sql.validate.SqlValidatorNamespace;
@@ -53,6 +55,7 @@ import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Static;
 import org.apache.calcite.util.Util;
 import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.error.InvalidSqlInput;
 import org.apache.druid.java.util.common.StringUtils;
@@ -68,9 +71,11 @@ import 
org.apache.druid.sql.calcite.parser.ExternalDestinationSqlIdentifier;
 import org.apache.druid.sql.calcite.run.EngineFeature;
 import org.apache.druid.sql.calcite.table.DatasourceTable;
 import org.apache.druid.sql.calcite.table.RowSignatures;
+import org.apache.druid.utils.CollectionUtils;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -281,6 +286,37 @@ public class DruidSqlValidator extends 
BaseDruidSqlValidator
     }
   }
 
+  @Override
+  protected SelectNamespace createSelectNamespace(
+      SqlSelect select,
+      SqlNode enclosingNode)
+  {
+    if (enclosingNode instanceof DruidSqlIngest) {
+      // The target is a new or existing datasource.
+      // The target namespace is both the target table ID and the row type for 
that table.
+      final SqlValidatorNamespace targetNamespace = Objects.requireNonNull(
+          getNamespace(enclosingNode),
+          () -> "namespace for " + enclosingNode
+      );
+      final IdentifierNamespace insertNs = (IdentifierNamespace) 
targetNamespace;
+      SqlIdentifier identifier = insertNs.getId();
+      SqlValidatorTable catalogTable = 
getCatalogReader().getTable(identifier.names);
+      if (catalogTable != null) {
+        final DatasourceTable table = 
catalogTable.unwrap(DatasourceTable.class);
+
+        // An existing datasource may have metadata.
+        final DatasourceFacade tableMetadata = table == null
+            ? null
+            : table.effectiveMetadata().catalogMetadata();
+        // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY 
clause
+        final SqlNodeList catalogClustering = 
convertCatalogClustering(tableMetadata);
+        rewriteClusteringToOrderBy(select, (DruidSqlIngest) enclosingNode, 
catalogClustering);
+        return new SelectNamespace(this, select, enclosingNode);
+      }
+    }
+    return super.createSelectNamespace(select, enclosingNode);
+  }
+
   /**
    * Validate the target table. Druid {@code INSERT/REPLACE} can create a new 
datasource,
    * or insert into an existing one. If the target exists, it must be a 
datasource. If it
@@ -349,6 +385,32 @@ public class DruidSqlValidator extends 
BaseDruidSqlValidator
     }
   }
 
+  /**
+   * Clustering is a kind of ordering. We push the CLUSTERED BY clause into 
the source query as
+   * an ORDER BY clause. In an ideal world, clustering would be outside of 
SELECT: it is an operation
+   * applied after the data is selected. For now, we push it into the SELECT, 
then MSQ pulls it back
+   * out. This is unfortunate as errors will be attributed to ORDER BY, which 
the user did not
+   * actually specify (it is an error to do so.) However, with the current 
hybrid structure, it is
+   * not possible to add the ORDER by later: doing so requires access to the 
order by namespace
+   * which is not visible to subclasses.
+   */
+  private void rewriteClusteringToOrderBy(SqlNode source, DruidSqlIngest 
ingestNode, @Nullable SqlNodeList catalogClustering)
+  {
+    SqlNodeList clusteredBy = ingestNode.getClusteredBy();
+    if (clusteredBy == null || clusteredBy.getList().isEmpty()) {
+      if (catalogClustering == null || catalogClustering.getList().isEmpty()) {
+        return;
+      }
+      clusteredBy = catalogClustering;
+    }
+    while (source instanceof SqlWith) {
+      source = ((SqlWith) source).getOperandList().get(1);
+    }
+    final SqlSelect select = (SqlSelect) source;
+
+    select.setOrderBy(clusteredBy);
+  }
+
   /**
    * Gets the effective PARTITIONED BY granularity. Resolves the granularity 
from the granularity specified on the
    * ingest node, and on the table metadata as stored in catalog, if any. 
Mismatches between the 2 granularities are
@@ -397,6 +459,34 @@ public class DruidSqlValidator extends 
BaseDruidSqlValidator
     return effectiveGranularity;
   }
 
+  @Nullable
+  private SqlNodeList convertCatalogClustering(final DatasourceFacade 
tableMetadata)
+  {
+    if (tableMetadata == null) {
+      return null;
+    }
+    final List<ClusterKeySpec> keyCols = tableMetadata.clusterKeys();
+    if (CollectionUtils.isNullOrEmpty(keyCols)) {
+      return null;
+    }
+    final SqlNodeList keyNodes = new SqlNodeList(SqlParserPos.ZERO);
+    for (ClusterKeySpec keyCol : keyCols) {
+      final SqlIdentifier colIdent = new SqlIdentifier(
+          Collections.singletonList(keyCol.expr()),
+          null, SqlParserPos.ZERO,
+          Collections.singletonList(SqlParserPos.ZERO)
+      );
+      final SqlNode keyNode;
+      if (keyCol.desc()) {
+        keyNode = SqlStdOperatorTable.DESC.createCall(SqlParserPos.ZERO, 
colIdent);
+      } else {
+        keyNode = colIdent;
+      }
+      keyNodes.add(keyNode);
+    }
+    return keyNodes;
+  }
+
   /**
    * Compute and validate the target type. In normal SQL, the engine would 
insert
    * a project operator after the SELECT before the write to cast columns from 
the
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
index d05e618c881..45b4b40e41c 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.catalog.model.ResolvedTable;
 import org.apache.druid.catalog.model.TableDefn;
 import org.apache.druid.catalog.model.TableSpec;
 import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
 import org.apache.druid.catalog.model.table.DatasourceDefn;
 import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.InlineInputSource;
@@ -40,6 +41,7 @@ import 
org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFacto
 import 
org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import 
org.apache.druid.sql.calcite.CalciteCatalogIngestionDmlTest.CatalogIngestionDmlComponentSupplier;
@@ -53,6 +55,8 @@ import org.apache.druid.sql.calcite.util.CalciteTests;
 import 
org.apache.druid.sql.calcite.util.SqlTestFramework.SqlTestFrameWorkModule;
 import org.junit.jupiter.api.Test;
 
+import javax.annotation.Nullable;
+
 @SqlTestFrameWorkModule(CatalogIngestionDmlComponentSupplier.class)
 public abstract class CalciteCatalogIngestionDmlTest extends 
CalciteIngestionDmlTest
 {
@@ -88,7 +92,7 @@ public abstract class CalciteCatalogIngestionDmlTest extends 
CalciteIngestionDml
             new DatasourceTable.EffectiveMetadata(
                 new DatasourceFacade(new ResolvedTable(
                     new TableDefn(
-                        "foo",
+                        "hourDs",
                         DatasourceDefn.TABLE_TYPE,
                         null,
                         null
@@ -112,7 +116,7 @@ public abstract class CalciteCatalogIngestionDmlTest 
extends CalciteIngestionDml
         "noPartitonedBy", new DatasourceTable(
             RowSignature.builder().addTimeColumn().build(),
             new DatasourceTable.PhysicalDatasourceMetadata(
-                new TableDataSource("hourDs"),
+                new TableDataSource("noPartitonedBy"),
                 RowSignature.builder().addTimeColumn().build(),
                 false,
                 false
@@ -120,7 +124,7 @@ public abstract class CalciteCatalogIngestionDmlTest 
extends CalciteIngestionDml
             new DatasourceTable.EffectiveMetadata(
                 new DatasourceFacade(new ResolvedTable(
                     new TableDefn(
-                        "foo",
+                        "noPartitonedBy",
                         DatasourceDefn.TABLE_TYPE,
                         null,
                         null
@@ -157,7 +161,11 @@ public abstract class CalciteCatalogIngestionDmlTest 
extends CalciteIngestionDml
                         null,
                         null
                     ),
-                    new TableSpec(DatasourceDefn.TABLE_TYPE, 
ImmutableMap.of(DatasourceDefn.SEALED_PROPERTY, true), null),
+                    new TableSpec(
+                        DatasourceDefn.TABLE_TYPE,
+                        ImmutableMap.of(DatasourceDefn.SEALED_PROPERTY, true),
+                        null
+                    ),
                     MAPPER
                 )),
                 
DatasourceTable.EffectiveMetadata.toEffectiveColumns(RowSignature.builder().build()),
@@ -203,7 +211,7 @@ public abstract class CalciteCatalogIngestionDmlTest 
extends CalciteIngestionDml
         "fooSealed", new DatasourceTable(
             FOO_TABLE_SIGNATURE,
             new DatasourceTable.PhysicalDatasourceMetadata(
-                new TableDataSource("foo"),
+                new TableDataSource("fooSealed"),
                 FOO_TABLE_SIGNATURE,
                 false,
                 false
@@ -211,7 +219,7 @@ public abstract class CalciteCatalogIngestionDmlTest 
extends CalciteIngestionDml
             new DatasourceTable.EffectiveMetadata(
                 new DatasourceFacade(new ResolvedTable(
                     new TableDefn(
-                        "foo",
+                        "fooSealed",
                         DatasourceDefn.TABLE_TYPE,
                         null,
                         null
@@ -234,6 +242,54 @@ public abstract class CalciteCatalogIngestionDmlTest 
extends CalciteIngestionDml
                 
DatasourceTable.EffectiveMetadata.toEffectiveColumns(FOO_TABLE_SIGNATURE),
                 false
             )
+        ),
+        "tableWithClustering", new DatasourceTable(
+            FOO_TABLE_SIGNATURE,
+            new DatasourceTable.PhysicalDatasourceMetadata(
+                new TableDataSource("tableWithClustering"),
+                RowSignature.builder()
+                    .addTimeColumn()
+                    .add("dim1", ColumnType.STRING)
+                    .add("dim2", ColumnType.STRING)
+                    .add("cnt", ColumnType.LONG)
+                    .build(),
+                false,
+                false
+            ),
+            new DatasourceTable.EffectiveMetadata(
+                new DatasourceFacade(new ResolvedTable(
+                    new TableDefn(
+                        "tableWithClustering",
+                        DatasourceDefn.TABLE_TYPE,
+                        null,
+                        null
+                    ),
+                    new TableSpec(
+                        DatasourceDefn.TABLE_TYPE,
+                        ImmutableMap.of(
+                            DatasourceDefn.CLUSTER_KEYS_PROPERTY,
+                            ImmutableList.of(
+                                new ClusterKeySpec("dim1", false),
+                                new ClusterKeySpec("dim2", true)
+                            )
+                        ),
+                        ImmutableList.of(
+                            new ColumnSpec("__time", Columns.TIME_COLUMN, 
null),
+                            new ColumnSpec("dim1", Columns.STRING, null),
+                            new ColumnSpec("dim2", Columns.STRING, null),
+                            new ColumnSpec("cnt", Columns.LONG, null)
+                        )
+                    ),
+                    MAPPER
+                )),
+                
DatasourceTable.EffectiveMetadata.toEffectiveColumns(RowSignature.builder()
+                    .addTimeColumn()
+                    .add("dim1", ColumnType.STRING)
+                    .add("dim2", ColumnType.STRING)
+                    .add("cnt", ColumnType.LONG)
+                    .build()),
+                false
+            )
         )
     );
 
@@ -241,6 +297,7 @@ public abstract class CalciteCatalogIngestionDmlTest 
extends CalciteIngestionDml
     public CatalogResolver createCatalogResolver()
     {
       return new CatalogResolver.NullCatalogResolver() {
+        @Nullable
         @Override
         public DruidTable resolveDatasource(
             final String tableName,
@@ -416,6 +473,223 @@ public abstract class CalciteCatalogIngestionDmlTest 
extends CalciteIngestionDml
         .verify();
   }
 
+  /**
+   * Insert into a catalog table that has clustering defined on the table 
definition. Should use
+   * the catalog defined clustering
+   */
+  @Test
+  public void testInsertTableWithClusteringWithClusteringFromCatalog()
+  {
+    ExternalDataSource externalDataSource = new ExternalDataSource(
+        new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+        new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, 
false, false, 0),
+        RowSignature.builder()
+            .add("a", ColumnType.STRING)
+            .add("b", ColumnType.STRING)
+            .add("c", ColumnType.LONG)
+            .add("d", ColumnType.STRING)
+            .add("e", ColumnType.STRING)
+            .build()
+    );
+    final RowSignature signature = RowSignature.builder()
+        .add("__time", ColumnType.LONG)
+        .add("dim1", ColumnType.STRING)
+        .add("dim2", ColumnType.STRING)
+        .add("cnt", ColumnType.LONG)
+        .build();
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") + 
"\n" +
+             "SELECT\n" +
+             "  TIME_PARSE(a) AS __time,\n" +
+             "  b AS dim1,\n" +
+             "  d AS dim2,\n" +
+             "  1 AS cnt\n" +
+             "FROM TABLE(inline(\n" +
+             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             "  format => 'csv'))\n" +
+             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             "PARTITIONED BY ALL TIME")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("tableWithClustering", signature)
+        .expectResources(dataSourceWrite("tableWithClustering"), 
Externals.externalRead("EXTERNAL"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(externalDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .virtualColumns(
+                    expressionVirtualColumn("v0", 
"timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+                    expressionVirtualColumn("v1", "1", ColumnType.LONG)
+                )
+                .orderBy(
+                    ImmutableList.of(
+                        new ScanQuery.OrderBy("b", ScanQuery.Order.ASCENDING),
+                        new ScanQuery.OrderBy("d", ScanQuery.Order.DESCENDING)
+                    )
+                )
+                // Scan query lists columns in alphabetical order independent 
of the
+                // SQL project list or the defined schema.
+                .columns("b", "d", "v0", "v1")
+                
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * Insert into a catalog table that has clustering defined on the table 
definition, but user specifies
+   * clustering on the ingest query. Should use the query defined clustering
+   */
+  @Test
+  public void testInsertTableWithClusteringWithClusteringFromQuery()
+  {
+    ExternalDataSource externalDataSource = new ExternalDataSource(
+        new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+        new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, 
false, false, 0),
+        RowSignature.builder()
+            .add("a", ColumnType.STRING)
+            .add("b", ColumnType.STRING)
+            .add("c", ColumnType.LONG)
+            .add("d", ColumnType.STRING)
+            .add("e", ColumnType.STRING)
+            .build()
+    );
+    final RowSignature signature = RowSignature.builder()
+        .add("__time", ColumnType.LONG)
+        .add("dim1", ColumnType.STRING)
+        .add("dim2", ColumnType.STRING)
+        .add("cnt", ColumnType.LONG)
+        .build();
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") + 
"\n" +
+             "SELECT\n" +
+             "  TIME_PARSE(a) AS __time,\n" +
+             "  b AS dim1,\n" +
+             "  d AS dim2,\n" +
+             "  1 AS cnt\n" +
+             "FROM TABLE(inline(\n" +
+             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             "  format => 'csv'))\n" +
+             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             "PARTITIONED BY ALL TIME\n" +
+             "CLUSTERED BY dim1")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("tableWithClustering", signature)
+        .expectResources(dataSourceWrite("tableWithClustering"), 
Externals.externalRead("EXTERNAL"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(externalDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .virtualColumns(
+                    expressionVirtualColumn("v0", 
"timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+                    expressionVirtualColumn("v1", "1", ColumnType.LONG)
+                )
+                .orderBy(
+                    ImmutableList.of(
+                        new ScanQuery.OrderBy("b", ScanQuery.Order.ASCENDING)
+                    )
+                )
+                // Scan query lists columns in alphabetical order independent 
of the
+                // SQL project list or the defined schema.
+                .columns("b", "d", "v0", "v1")
+                
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * Insert into a catalog table that has clustering defined on the table 
definition, but user specifies
+   * clustering on the ingest query on column that has not been defined in the 
table catalog definition.
+   * Should use the query defined clustering
+   */
+  @Test
+  public void testInsertTableWithClusteringWithClusteringOnNewColumnFromQuery()
+  {
+    ExternalDataSource externalDataSource = new ExternalDataSource(
+        new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+        new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, 
false, false, 0),
+        RowSignature.builder()
+            .add("a", ColumnType.STRING)
+            .add("b", ColumnType.STRING)
+            .add("c", ColumnType.LONG)
+            .add("d", ColumnType.STRING)
+            .add("e", ColumnType.STRING)
+            .build()
+    );
+    final RowSignature signature = RowSignature.builder()
+        .add("__time", ColumnType.LONG)
+        .add("dim1", ColumnType.STRING)
+        .add("dim2", ColumnType.STRING)
+        .add("dim3", ColumnType.STRING)
+        .add("cnt", ColumnType.LONG)
+        .build();
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") + 
"\n" +
+             "SELECT\n" +
+             "  TIME_PARSE(a) AS __time,\n" +
+             "  b AS dim1,\n" +
+             "  d AS dim2,\n" +
+             "  e AS dim3,\n" +
+             "  1 AS cnt\n" +
+             "FROM TABLE(inline(\n" +
+             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             "  format => 'csv'))\n" +
+             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             "PARTITIONED BY ALL TIME\n" +
+             "CLUSTERED BY dim3")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("tableWithClustering", signature)
+        .expectResources(dataSourceWrite("tableWithClustering"), 
Externals.externalRead("EXTERNAL"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(externalDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .virtualColumns(
+                    expressionVirtualColumn("v0", 
"timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+                    expressionVirtualColumn("v1", "1", ColumnType.LONG)
+                )
+                .orderBy(
+                    ImmutableList.of(
+                        new ScanQuery.OrderBy("e", ScanQuery.Order.ASCENDING)
+                    )
+                )
+                // Scan query lists columns in alphabetical order independent 
of the
+                // SQL project list or the defined schema.
+                .columns("b", "d", "e", "v0", "v1")
+                
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * Insert into a catalog table that has clustering defined on the table 
definition, but user specifies
+   * clustering on the ingest query on column that has not been defined in the 
table catalog definition.
+   * or in the select clause. Should fail with validation error.
+   */
+  @Test
+  public void testInsertTableWithClusteringWithClusteringOnBadColumn()
+  {
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") + 
"\n" +
+             "SELECT\n" +
+             "  TIME_PARSE(a) AS __time,\n" +
+             "  b AS dim1,\n" +
+             "  d AS dim2,\n" +
+             "  e AS dim3,\n" +
+             "  1 AS cnt\n" +
+             "FROM TABLE(inline(\n" +
+             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             "  format => 'csv'))\n" +
+             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             "PARTITIONED BY ALL TIME\n" +
+             "CLUSTERED BY blah")
+        .expectValidationError(
+            DruidException.class,
+            "Column 'blah' not found in any table (line [13], column [14])")
+        .verify();
+  }
+
   /**
    * Adding a new column during group by ingestion that is not defined in a 
non-sealed table should succeed.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to