This is an automated email from the ASF dual-hosted git repository.
zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 365cd7e8e7e INSERT/REPLACE can omit clustering when catalog has
default (#16260)
365cd7e8e7e is described below
commit 365cd7e8e7e2a828cd924e729b1a78df9fe390aa
Author: zachjsh <[email protected]>
AuthorDate: Fri Apr 26 10:19:45 2024 -0400
INSERT/REPLACE can omit clustering when catalog has default (#16260)
* * fix
* * fix
* * address review comments
* * fix
* * simplify tests
* * fix complex type nullability issue
* * implement and add tests
* * address review comments
* * address test review comments
* * fix checkstyle
* * fix dependencies
* * all tests passing
* * cleanup
* * remove unneeded code
* * remove unused dependency
* * fix checkstyle
---
.../sql/calcite/planner/DruidSqlValidator.java | 90 +++++++
.../calcite/CalciteCatalogIngestionDmlTest.java | 286 ++++++++++++++++++++-
2 files changed, 370 insertions(+), 6 deletions(-)
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
index 03dbbd4b7a7..aa04a55eb50 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
@@ -40,10 +40,12 @@ import org.apache.calcite.sql.SqlUpdate;
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.SqlWindow;
import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.SelectNamespace;
import org.apache.calcite.sql.validate.SqlNonNullableAccessors;
import org.apache.calcite.sql.validate.SqlValidatorException;
import org.apache.calcite.sql.validate.SqlValidatorNamespace;
@@ -53,6 +55,7 @@ import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Static;
import org.apache.calcite.util.Util;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.StringUtils;
@@ -68,9 +71,11 @@ import
org.apache.druid.sql.calcite.parser.ExternalDestinationSqlIdentifier;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.table.RowSignatures;
+import org.apache.druid.utils.CollectionUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -281,6 +286,37 @@ public class DruidSqlValidator extends
BaseDruidSqlValidator
}
}
+  /**
+   * When the SELECT is the source of an INSERT/REPLACE whose target the catalog reader can
+   * resolve, pushes the effective clustering (the query's CLUSTERED BY, otherwise the catalog's
+   * cluster keys) into the SELECT as an ORDER BY before building its namespace. Every other
+   * SELECT is delegated to the default implementation.
+   */
+  @Override
+  protected SelectNamespace createSelectNamespace(
+      SqlSelect select,
+      SqlNode enclosingNode)
+  {
+    if (!(enclosingNode instanceof DruidSqlIngest)) {
+      return super.createSelectNamespace(select, enclosingNode);
+    }
+    final DruidSqlIngest ingestNode = (DruidSqlIngest) enclosingNode;
+
+    // The ingest target (a new or existing datasource) is registered as an identifier
+    // namespace carrying both the target table ID and the row type for that table.
+    final IdentifierNamespace targetNs = (IdentifierNamespace) Objects.requireNonNull(
+        getNamespace(enclosingNode),
+        () -> "namespace for " + enclosingNode
+    );
+    final SqlValidatorTable resolvedTarget = getCatalogReader().getTable(targetNs.getId().names);
+    if (resolvedTarget == null) {
+      // Target unknown to the catalog reader: use the default namespace creation.
+      return super.createSelectNamespace(select, enclosingNode);
+    }
+
+    // Catalog metadata is only consulted when the target unwraps to a DatasourceTable.
+    final DatasourceTable datasource = resolvedTarget.unwrap(DatasourceTable.class);
+    final DatasourceFacade catalogMetadata =
+        datasource == null ? null : datasource.effectiveMetadata().catalogMetadata();
+
+    rewriteClusteringToOrderBy(select, ingestNode, convertCatalogClustering(catalogMetadata));
+    return new SelectNamespace(this, select, enclosingNode);
+  }
+
/**
* Validate the target table. Druid {@code INSERT/REPLACE} can create a new
datasource,
* or insert into an existing one. If the target exists, it must be a
datasource. If it
@@ -349,6 +385,32 @@ public class DruidSqlValidator extends
BaseDruidSqlValidator
}
}
+  /**
+   * Pushes the effective clustering into the source query as an ORDER BY clause, since
+   * clustering is a kind of ordering. Ideally clustering would sit outside the SELECT, because it
+   * is applied after the data is selected; for now it is pushed into the SELECT and MSQ pulls it
+   * back out. A drawback is that errors are attributed to ORDER BY, which the user did not write
+   * (and is not allowed to write). With the current hybrid structure the ORDER BY cannot be added
+   * later: doing so requires access to the ORDER BY namespace, which is not visible to subclasses.
+   *
+   * @param source            the ingest source query; enclosing WITH clauses are unwrapped and the
+   *                          innermost body is cast to {@link SqlSelect}
+   * @param ingestNode        the INSERT/REPLACE node; its CLUSTERED BY, when non-empty, wins
+   * @param catalogClustering catalog cluster keys, used only when the query has no CLUSTERED BY
+   */
+  private void rewriteClusteringToOrderBy(
+      SqlNode source,
+      DruidSqlIngest ingestNode,
+      @Nullable SqlNodeList catalogClustering
+  )
+  {
+    final SqlNodeList queryClustering = ingestNode.getClusteredBy();
+    final boolean hasQueryClustering = queryClustering != null && !queryClustering.getList().isEmpty();
+    final boolean hasCatalogClustering = catalogClustering != null && !catalogClustering.getList().isEmpty();
+    if (!hasQueryClustering && !hasCatalogClustering) {
+      // Nothing to push down; leave the source untouched.
+      return;
+    }
+    final SqlNodeList effectiveClustering = hasQueryClustering ? queryClustering : catalogClustering;
+
+    // Operand 1 of a WITH is its body; descend until the underlying query is reached.
+    SqlNode body = source;
+    while (body instanceof SqlWith) {
+      body = ((SqlWith) body).getOperandList().get(1);
+    }
+    ((SqlSelect) body).setOrderBy(effectiveClustering);
+  }
+
/**
* Gets the effective PARTITIONED BY granularity. Resolves the granularity
from the granularity specified on the
* ingest node, and on the table metadata as stored in catalog, if any.
Mismatches between the 2 granularities are
@@ -397,6 +459,34 @@ public class DruidSqlValidator extends
BaseDruidSqlValidator
return effectiveGranularity;
}
+  /**
+   * Converts the cluster keys recorded in a datasource's catalog metadata into ORDER BY items:
+   * the bare column identifier for an ascending key, or a {@code DESC} call wrapping the
+   * identifier for a descending key. Items keep the order of the catalog's key list.
+   *
+   * @param tableMetadata catalog metadata of the target datasource, or null when there is none
+   * @return the ORDER BY items, or null when there is no metadata or no cluster keys
+   */
+  @Nullable
+  private SqlNodeList convertCatalogClustering(@Nullable final DatasourceFacade tableMetadata)
+  {
+    if (tableMetadata == null) {
+      return null;
+    }
+    final List<ClusterKeySpec> keyCols = tableMetadata.clusterKeys();
+    if (CollectionUtils.isNullOrEmpty(keyCols)) {
+      return null;
+    }
+    final SqlNodeList keyNodes = new SqlNodeList(SqlParserPos.ZERO);
+    for (ClusterKeySpec keyCol : keyCols) {
+      final SqlIdentifier colIdent = new SqlIdentifier(
+          Collections.singletonList(keyCol.expr()),
+          null,
+          SqlParserPos.ZERO,
+          Collections.singletonList(SqlParserPos.ZERO)
+      );
+      // Descending keys become DESC(col); ascending keys are the bare identifier.
+      keyNodes.add(
+          keyCol.desc()
+          ? SqlStdOperatorTable.DESC.createCall(SqlParserPos.ZERO, colIdent)
+          : colIdent
+      );
+    }
+    return keyNodes;
+  }
+
/**
* Compute and validate the target type. In normal SQL, the engine would
insert
* a project operator after the SELECT before the write to cast columns from
the
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
index d05e618c881..45b4b40e41c 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.catalog.model.TableDefn;
import org.apache.druid.catalog.model.TableSpec;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
import org.apache.druid.catalog.model.table.DatasourceDefn;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.InlineInputSource;
@@ -40,6 +41,7 @@ import
org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFacto
import
org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import
org.apache.druid.sql.calcite.CalciteCatalogIngestionDmlTest.CatalogIngestionDmlComponentSupplier;
@@ -53,6 +55,8 @@ import org.apache.druid.sql.calcite.util.CalciteTests;
import
org.apache.druid.sql.calcite.util.SqlTestFramework.SqlTestFrameWorkModule;
import org.junit.jupiter.api.Test;
+import javax.annotation.Nullable;
+
@SqlTestFrameWorkModule(CatalogIngestionDmlComponentSupplier.class)
public abstract class CalciteCatalogIngestionDmlTest extends
CalciteIngestionDmlTest
{
@@ -88,7 +92,7 @@ public abstract class CalciteCatalogIngestionDmlTest extends
CalciteIngestionDml
new DatasourceTable.EffectiveMetadata(
new DatasourceFacade(new ResolvedTable(
new TableDefn(
- "foo",
+ "hourDs",
DatasourceDefn.TABLE_TYPE,
null,
null
@@ -112,7 +116,7 @@ public abstract class CalciteCatalogIngestionDmlTest
extends CalciteIngestionDml
"noPartitonedBy", new DatasourceTable(
RowSignature.builder().addTimeColumn().build(),
new DatasourceTable.PhysicalDatasourceMetadata(
- new TableDataSource("hourDs"),
+ new TableDataSource("noPartitonedBy"),
RowSignature.builder().addTimeColumn().build(),
false,
false
@@ -120,7 +124,7 @@ public abstract class CalciteCatalogIngestionDmlTest
extends CalciteIngestionDml
new DatasourceTable.EffectiveMetadata(
new DatasourceFacade(new ResolvedTable(
new TableDefn(
- "foo",
+ "noPartitonedBy",
DatasourceDefn.TABLE_TYPE,
null,
null
@@ -157,7 +161,11 @@ public abstract class CalciteCatalogIngestionDmlTest
extends CalciteIngestionDml
null,
null
),
- new TableSpec(DatasourceDefn.TABLE_TYPE,
ImmutableMap.of(DatasourceDefn.SEALED_PROPERTY, true), null),
+ new TableSpec(
+ DatasourceDefn.TABLE_TYPE,
+ ImmutableMap.of(DatasourceDefn.SEALED_PROPERTY, true),
+ null
+ ),
MAPPER
)),
DatasourceTable.EffectiveMetadata.toEffectiveColumns(RowSignature.builder().build()),
@@ -203,7 +211,7 @@ public abstract class CalciteCatalogIngestionDmlTest
extends CalciteIngestionDml
"fooSealed", new DatasourceTable(
FOO_TABLE_SIGNATURE,
new DatasourceTable.PhysicalDatasourceMetadata(
- new TableDataSource("foo"),
+ new TableDataSource("fooSealed"),
FOO_TABLE_SIGNATURE,
false,
false
@@ -211,7 +219,7 @@ public abstract class CalciteCatalogIngestionDmlTest
extends CalciteIngestionDml
new DatasourceTable.EffectiveMetadata(
new DatasourceFacade(new ResolvedTable(
new TableDefn(
- "foo",
+ "fooSealed",
DatasourceDefn.TABLE_TYPE,
null,
null
@@ -234,6 +242,54 @@ public abstract class CalciteCatalogIngestionDmlTest
extends CalciteIngestionDml
DatasourceTable.EffectiveMetadata.toEffectiveColumns(FOO_TABLE_SIGNATURE),
false
)
+ ),
+ "tableWithClustering", new DatasourceTable(
+ FOO_TABLE_SIGNATURE,
+ new DatasourceTable.PhysicalDatasourceMetadata(
+ new TableDataSource("tableWithClustering"),
+ RowSignature.builder()
+ .addTimeColumn()
+ .add("dim1", ColumnType.STRING)
+ .add("dim2", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .build(),
+ false,
+ false
+ ),
+ new DatasourceTable.EffectiveMetadata(
+ new DatasourceFacade(new ResolvedTable(
+ new TableDefn(
+ "tableWithClustering",
+ DatasourceDefn.TABLE_TYPE,
+ null,
+ null
+ ),
+ new TableSpec(
+ DatasourceDefn.TABLE_TYPE,
+ ImmutableMap.of(
+ DatasourceDefn.CLUSTER_KEYS_PROPERTY,
+ ImmutableList.of(
+ new ClusterKeySpec("dim1", false),
+ new ClusterKeySpec("dim2", true)
+ )
+ ),
+ ImmutableList.of(
+ new ColumnSpec("__time", Columns.TIME_COLUMN,
null),
+ new ColumnSpec("dim1", Columns.STRING, null),
+ new ColumnSpec("dim2", Columns.STRING, null),
+ new ColumnSpec("cnt", Columns.LONG, null)
+ )
+ ),
+ MAPPER
+ )),
+
DatasourceTable.EffectiveMetadata.toEffectiveColumns(RowSignature.builder()
+ .addTimeColumn()
+ .add("dim1", ColumnType.STRING)
+ .add("dim2", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .build()),
+ false
+ )
)
);
@@ -241,6 +297,7 @@ public abstract class CalciteCatalogIngestionDmlTest
extends CalciteIngestionDml
public CatalogResolver createCatalogResolver()
{
return new CatalogResolver.NullCatalogResolver() {
+ @Nullable
@Override
public DruidTable resolveDatasource(
final String tableName,
@@ -416,6 +473,223 @@ public abstract class CalciteCatalogIngestionDmlTest
extends CalciteIngestionDml
.verify();
}
+  /**
+   * Insert into a catalog table that has clustering defined on the table definition. Should use
+   * the catalog defined clustering.
+   */
+  @Test
+  public void testInsertTableWithClusteringWithClusteringFromCatalog()
+  {
+    final ExternalDataSource externalDataSource = clusteringTestInlineCsvDataSource();
+    final RowSignature signature = RowSignature.builder()
+                                               .add("__time", ColumnType.LONG)
+                                               .add("dim1", ColumnType.STRING)
+                                               .add("dim2", ColumnType.STRING)
+                                               .add("cnt", ColumnType.LONG)
+                                               .build();
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") + "\n" +
+             "SELECT\n" +
+             " TIME_PARSE(a) AS __time,\n" +
+             " b AS dim1,\n" +
+             " d AS dim2,\n" +
+             " 1 AS cnt\n" +
+             "FROM TABLE(inline(\n" +
+             " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             " format => 'csv'))\n" +
+             " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             "PARTITIONED BY ALL TIME")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("tableWithClustering", signature)
+        .expectResources(dataSourceWrite("tableWithClustering"), Externals.externalRead("EXTERNAL"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(externalDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .virtualColumns(
+                    expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+                    expressionVirtualColumn("v1", "1", ColumnType.LONG)
+                )
+                .orderBy(
+                    ImmutableList.of(
+                        new ScanQuery.OrderBy("b", ScanQuery.Order.ASCENDING),
+                        new ScanQuery.OrderBy("d", ScanQuery.Order.DESCENDING)
+                    )
+                )
+                // Scan query lists columns in alphabetical order independent of the
+                // SQL project list or the defined schema.
+                .columns("b", "d", "v0", "v1")
+                .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * Insert into a catalog table that has clustering defined on the table definition, but the user
+   * specifies clustering on the ingest query. Should use the query defined clustering.
+   */
+  @Test
+  public void testInsertTableWithClusteringWithClusteringFromQuery()
+  {
+    final ExternalDataSource externalDataSource = clusteringTestInlineCsvDataSource();
+    final RowSignature signature = RowSignature.builder()
+                                               .add("__time", ColumnType.LONG)
+                                               .add("dim1", ColumnType.STRING)
+                                               .add("dim2", ColumnType.STRING)
+                                               .add("cnt", ColumnType.LONG)
+                                               .build();
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") + "\n" +
+             "SELECT\n" +
+             " TIME_PARSE(a) AS __time,\n" +
+             " b AS dim1,\n" +
+             " d AS dim2,\n" +
+             " 1 AS cnt\n" +
+             "FROM TABLE(inline(\n" +
+             " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             " format => 'csv'))\n" +
+             " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             "PARTITIONED BY ALL TIME\n" +
+             "CLUSTERED BY dim1")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("tableWithClustering", signature)
+        .expectResources(dataSourceWrite("tableWithClustering"), Externals.externalRead("EXTERNAL"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(externalDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .virtualColumns(
+                    expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+                    expressionVirtualColumn("v1", "1", ColumnType.LONG)
+                )
+                .orderBy(
+                    ImmutableList.of(
+                        new ScanQuery.OrderBy("b", ScanQuery.Order.ASCENDING)
+                    )
+                )
+                // Scan query lists columns in alphabetical order independent of the
+                // SQL project list or the defined schema.
+                .columns("b", "d", "v0", "v1")
+                .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * Insert into a catalog table that has clustering defined on the table definition, but the user
+   * specifies clustering on the ingest query on a column that is not defined in the table catalog
+   * definition. Should use the query defined clustering.
+   */
+  @Test
+  public void testInsertTableWithClusteringWithClusteringOnNewColumnFromQuery()
+  {
+    final ExternalDataSource externalDataSource = clusteringTestInlineCsvDataSource();
+    final RowSignature signature = RowSignature.builder()
+                                               .add("__time", ColumnType.LONG)
+                                               .add("dim1", ColumnType.STRING)
+                                               .add("dim2", ColumnType.STRING)
+                                               .add("dim3", ColumnType.STRING)
+                                               .add("cnt", ColumnType.LONG)
+                                               .build();
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") + "\n" +
+             "SELECT\n" +
+             " TIME_PARSE(a) AS __time,\n" +
+             " b AS dim1,\n" +
+             " d AS dim2,\n" +
+             " e AS dim3,\n" +
+             " 1 AS cnt\n" +
+             "FROM TABLE(inline(\n" +
+             " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             " format => 'csv'))\n" +
+             " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             "PARTITIONED BY ALL TIME\n" +
+             "CLUSTERED BY dim3")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("tableWithClustering", signature)
+        .expectResources(dataSourceWrite("tableWithClustering"), Externals.externalRead("EXTERNAL"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(externalDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .virtualColumns(
+                    expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+                    expressionVirtualColumn("v1", "1", ColumnType.LONG)
+                )
+                .orderBy(
+                    ImmutableList.of(
+                        new ScanQuery.OrderBy("e", ScanQuery.Order.ASCENDING)
+                    )
+                )
+                // Scan query lists columns in alphabetical order independent of the
+                // SQL project list or the defined schema.
+                .columns("b", "d", "e", "v0", "v1")
+                .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * Insert into a catalog table that has clustering defined on the table definition, but the user
+   * specifies clustering on the ingest query on a column that is defined neither in the table
+   * catalog definition nor in the SELECT list. Should fail with a validation error.
+   */
+  @Test
+  public void testInsertTableWithClusteringWithClusteringOnBadColumn()
+  {
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") + "\n" +
+             "SELECT\n" +
+             " TIME_PARSE(a) AS __time,\n" +
+             " b AS dim1,\n" +
+             " d AS dim2,\n" +
+             " e AS dim3,\n" +
+             " 1 AS cnt\n" +
+             "FROM TABLE(inline(\n" +
+             " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             " format => 'csv'))\n" +
+             " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             "PARTITIONED BY ALL TIME\n" +
+             "CLUSTERED BY blah")
+        .expectValidationError(
+            DruidException.class,
+            "Column 'blah' not found in any table (line [13], column [14])")
+        .verify();
+  }
+
+  /**
+   * Builds the inline CSV external data source shared by the clustering tests above. It mirrors
+   * the {@code TABLE(inline(...))} clause, with columns a..e, that their SQL reads from.
+   */
+  private static ExternalDataSource clusteringTestInlineCsvDataSource()
+  {
+    return new ExternalDataSource(
+        new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+        new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
+        RowSignature.builder()
+                    .add("a", ColumnType.STRING)
+                    .add("b", ColumnType.STRING)
+                    .add("c", ColumnType.LONG)
+                    .add("d", ColumnType.STRING)
+                    .add("e", ColumnType.STRING)
+                    .build()
+    );
+  }
+
/**
* Adding a new column during group by ingestion that is not defined in a
non-sealed table should succeed.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]