github-advanced-security[bot] commented on code in PR #16260:
URL: https://github.com/apache/druid/pull/16260#discussion_r1560032789
##########
sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java:
##########
@@ -210,11 +287,727 @@
final DatasourceTable.PhysicalDatasourceMetadata dsMetadata
)
{
- if (resolvedTables.get(tableName) != null) {
- return resolvedTables.get(tableName);
+ if (RESOLVED_TABLES.get(tableName) != null) {
+ return RESOLVED_TABLES.get(tableName);
}
return dsMetadata == null ? null : new DatasourceTable(dsMetadata);
}
};
}
+
+ /**
+ * If the segment grain is given in the catalog and absent in the
PARTITIONED BY clause in the query, then use the
+ * value from the catalog.
+ */
+ @Test
+ public void testInsertHourGrainPartitonedByFromCatalog()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "hourDs") + "\n" +
+ "SELECT * FROM foo")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("hourDs", FOO_TABLE_SIGNATURE)
+ .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ // Scan query lists columns in alphabetical order independent
of the
+ // SQL project list or the defined schema.
+ .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2",
"unique_dim1")
+ .context(queryContextWithGranularity(Granularities.HOUR))
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * If the segment grain is given in the catalog, and also by PARTITIONED BY,
then
+ * the query value is used.
+ */
+ @Test
+ public void testInsertHourGrainWithDayPartitonedByFromQuery()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "hourDs") + "\n" +
+ "SELECT * FROM foo\n" +
+ "PARTITIONED BY day")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("hourDs", FOO_TABLE_SIGNATURE)
+ .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ // Scan query lists columns in alphabetical order independent
of the
+ // SQL project list or the defined schema.
+ .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2",
"unique_dim1")
+ .context(queryContextWithGranularity(Granularities.DAY))
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * If the segment grain is absent in the catalog and absent in the
PARTITIONED BY clause in the query, then
+ * validation error.
+ */
+ @Test
+ public void testInsertNoPartitonedByFromCatalog()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "noPartitonedBy") + "\n" +
+ "SELECT * FROM foo")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectValidationError(
+ DruidException.class,
+ StringUtils.format("Operation [%s] requires a PARTITIONED BY to be
explicitly defined, but none was found.", operationName)
+ )
+ .verify();
+ }
+
+ /**
+ * If the segment grain is absent in the catalog, but given by PARTITIONED
BY, then
+ * the query value is used.
+ */
+ @Test
+ public void testInsertNoPartitonedByWithDayPartitonedByFromQuery()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "noPartitonedBy") + "\n" +
+ "SELECT * FROM foo\n" +
+ "PARTITIONED BY day")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE)
+ .expectResources(dataSourceWrite("noPartitonedBy"),
dataSourceRead("foo"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ // Scan query lists columns in alphabetical order independent
of the
+ // SQL project list or the defined schema.
+ .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2",
"unique_dim1")
+ .context(queryContextWithGranularity(Granularities.DAY))
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * Adding a new column during ingestion that is not defined in a non-sealed
table should succeed.
+ */
+ @Test
+ public void testInsertAddNonDefinedColumnIntoNonSealedCatalogTable()
+ {
+ ExternalDataSource externalDataSource = new ExternalDataSource(
+ new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+ new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null,
false, false, 0),
+ RowSignature.builder()
+ .add("a", ColumnType.STRING)
+ .add("b", ColumnType.STRING)
+ .add("c", ColumnType.LONG)
+ .add("d", ColumnType.STRING)
+ .add("e", ColumnType.STRING)
+ .build()
+ );
+ final RowSignature signature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .add("m2", ColumnType.DOUBLE)
+ .add("extra2", ColumnType.LONG)
+ .add("extra3", ColumnType.STRING)
+ .build();
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" +
+ "SELECT\n" +
+ " TIME_PARSE(a) AS __time,\n" +
+ " b AS dim1,\n" +
+ " 1 AS cnt,\n" +
+ " c AS m2,\n" +
+ " CAST(d AS BIGINT) AS extra2,\n" +
+ " e AS extra3\n" +
+ "FROM TABLE(inline(\n" +
+ " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+ " format => 'csv'))\n" +
+ " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+ "PARTITIONED BY ALL TIME")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("foo", signature)
+ .expectResources(dataSourceWrite("foo"),
Externals.externalRead("EXTERNAL"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource(externalDataSource)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .virtualColumns(
+ expressionVirtualColumn("v0",
"timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+ expressionVirtualColumn("v1", "1", ColumnType.LONG),
+ expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')",
ColumnType.DOUBLE),
+ expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')",
ColumnType.LONG)
+ )
+ // Scan query lists columns in alphabetical order independent
of the
+ // SQL project list or the defined schema.
+ .columns("b", "e", "v0", "v1", "v2", "v3")
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * Insert into a catalog table that has clustering defined on the table
definition. Should use
+ * the catalog defined clustering
+ */
+ @Test
+ public void testInsertTableWithClusteringWithClusteringFromCatalog()
+ {
+ ExternalDataSource externalDataSource = new ExternalDataSource(
+ new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+ new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null,
false, false, 0),
+ RowSignature.builder()
+ .add("a", ColumnType.STRING)
+ .add("b", ColumnType.STRING)
+ .add("c", ColumnType.LONG)
+ .add("d", ColumnType.STRING)
+ .add("e", ColumnType.STRING)
+ .build()
+ );
+ final RowSignature signature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("dim2", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .build();
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") +
"\n" +
+ "SELECT\n" +
+ " TIME_PARSE(a) AS __time,\n" +
+ " b AS dim1,\n" +
+ " d AS dim2,\n" +
+ " 1 AS cnt\n" +
+ "FROM TABLE(inline(\n" +
+ " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+ " format => 'csv'))\n" +
+ " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+ "PARTITIONED BY ALL TIME")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("tableWithClustering", signature)
+ .expectResources(dataSourceWrite("tableWithClustering"),
Externals.externalRead("EXTERNAL"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource(externalDataSource)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .virtualColumns(
+ expressionVirtualColumn("v0",
"timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+ expressionVirtualColumn("v1", "1", ColumnType.LONG)
+ )
+ .orderBy(
+ ImmutableList.of(
+ new ScanQuery.OrderBy("b", ScanQuery.Order.ASCENDING),
+ new ScanQuery.OrderBy("d", ScanQuery.Order.DESCENDING)
+ )
+ )
+ // Scan query lists columns in alphabetical order independent
of the
+ // SQL project list or the defined schema.
+ .columns("b", "d", "v0", "v1")
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * Insert into a catalog table that has clustering defined on the table
definition, but user specifies
+ * clustering on the ingest query. Should use the query defined clustering
+ */
+ @Test
+ public void testInsertTableWithClusteringWithClusteringFromQuery()
+ {
+ ExternalDataSource externalDataSource = new ExternalDataSource(
+ new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+ new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null,
false, false, 0),
+ RowSignature.builder()
+ .add("a", ColumnType.STRING)
+ .add("b", ColumnType.STRING)
+ .add("c", ColumnType.LONG)
+ .add("d", ColumnType.STRING)
+ .add("e", ColumnType.STRING)
+ .build()
+ );
+ final RowSignature signature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("dim2", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .build();
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") +
"\n" +
+ "SELECT\n" +
+ " TIME_PARSE(a) AS __time,\n" +
+ " b AS dim1,\n" +
+ " d AS dim2,\n" +
+ " 1 AS cnt\n" +
+ "FROM TABLE(inline(\n" +
+ " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+ " format => 'csv'))\n" +
+ " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+ "PARTITIONED BY ALL TIME\n" +
+ "CLUSTERED BY dim1")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("tableWithClustering", signature)
+ .expectResources(dataSourceWrite("tableWithClustering"),
Externals.externalRead("EXTERNAL"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource(externalDataSource)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .virtualColumns(
+ expressionVirtualColumn("v0",
"timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+ expressionVirtualColumn("v1", "1", ColumnType.LONG)
+ )
+ .orderBy(
+ ImmutableList.of(
+ new ScanQuery.OrderBy("b", ScanQuery.Order.ASCENDING)
+ )
+ )
+ // Scan query lists columns in alphabetical order independent
of the
+ // SQL project list or the defined schema.
+ .columns("b", "d", "v0", "v1")
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * Insert into a catalog table that has clustering defined on the table
definition, but user specifies
+ * clustering on the ingest query on column that has not been defined in the
table catalog definition.
+ * Should use the query defined clustering
+ */
+ @Test
+ public void testInsertTableWithClusteringWithClusteringOnNewColumnFromQuery()
+ {
+ ExternalDataSource externalDataSource = new ExternalDataSource(
+ new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+ new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null,
false, false, 0),
+ RowSignature.builder()
+ .add("a", ColumnType.STRING)
+ .add("b", ColumnType.STRING)
+ .add("c", ColumnType.LONG)
+ .add("d", ColumnType.STRING)
+ .add("e", ColumnType.STRING)
+ .build()
+ );
+ final RowSignature signature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("dim2", ColumnType.STRING)
+ .add("dim3", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .build();
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") +
"\n" +
+ "SELECT\n" +
+ " TIME_PARSE(a) AS __time,\n" +
+ " b AS dim1,\n" +
+ " d AS dim2,\n" +
+ " e AS dim3,\n" +
+ " 1 AS cnt\n" +
+ "FROM TABLE(inline(\n" +
+ " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+ " format => 'csv'))\n" +
+ " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+ "PARTITIONED BY ALL TIME\n" +
+ "CLUSTERED BY dim3")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("tableWithClustering", signature)
+ .expectResources(dataSourceWrite("tableWithClustering"),
Externals.externalRead("EXTERNAL"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource(externalDataSource)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .virtualColumns(
+ expressionVirtualColumn("v0",
"timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+ expressionVirtualColumn("v1", "1", ColumnType.LONG)
+ )
+ .orderBy(
+ ImmutableList.of(
+ new ScanQuery.OrderBy("e", ScanQuery.Order.ASCENDING)
+ )
+ )
+ // Scan query lists columns in alphabetical order independent
of the
+ // SQL project list or the defined schema.
+ .columns("b", "d", "e", "v0", "v1")
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * Insert into a catalog table that has clustering defined on the table
definition, but user specifies
+ * clustering on the ingest query on column that has not been defined in the
table catalog definition.
+ * or in the select clause. Should fail with validation error.
+ */
+ @Test
+ public void testInsertTableWithClusteringWithClusteringOnBadColumn()
+ {
+ ExternalDataSource externalDataSource = new ExternalDataSource(
+ new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+ new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null,
false, false, 0),
+ RowSignature.builder()
+ .add("a", ColumnType.STRING)
+ .add("b", ColumnType.STRING)
+ .add("c", ColumnType.LONG)
+ .add("d", ColumnType.STRING)
+ .add("e", ColumnType.STRING)
+ .build()
+ );
Review Comment:
## Unread local variable
Variable 'ExternalDataSource externalDataSource' is never read in testInsertTableWithClusteringWithClusteringOnBadColumn.
[Show more details](https://github.com/apache/druid/security/code-scanning/7257)
##########
sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java:
##########
@@ -210,11 +287,727 @@
final DatasourceTable.PhysicalDatasourceMetadata dsMetadata
)
{
- if (resolvedTables.get(tableName) != null) {
- return resolvedTables.get(tableName);
+ if (RESOLVED_TABLES.get(tableName) != null) {
+ return RESOLVED_TABLES.get(tableName);
}
return dsMetadata == null ? null : new DatasourceTable(dsMetadata);
}
};
}
+
+ /**
+ * If the segment grain is given in the catalog and absent in the
PARTITIONED BY clause in the query, then use the
+ * value from the catalog.
+ */
+ @Test
+ public void testInsertHourGrainPartitonedByFromCatalog()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "hourDs") + "\n" +
+ "SELECT * FROM foo")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("hourDs", FOO_TABLE_SIGNATURE)
+ .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ // Scan query lists columns in alphabetical order independent
of the
+ // SQL project list or the defined schema.
+ .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2",
"unique_dim1")
+ .context(queryContextWithGranularity(Granularities.HOUR))
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * If the segment grain is given in the catalog, and also by PARTITIONED BY,
then
+ * the query value is used.
+ */
+ @Test
+ public void testInsertHourGrainWithDayPartitonedByFromQuery()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "hourDs") + "\n" +
+ "SELECT * FROM foo\n" +
+ "PARTITIONED BY day")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("hourDs", FOO_TABLE_SIGNATURE)
+ .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ // Scan query lists columns in alphabetical order independent
of the
+ // SQL project list or the defined schema.
+ .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2",
"unique_dim1")
+ .context(queryContextWithGranularity(Granularities.DAY))
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * If the segment grain is absent in the catalog and absent in the
PARTITIONED BY clause in the query, then
+ * validation error.
+ */
+ @Test
+ public void testInsertNoPartitonedByFromCatalog()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "noPartitonedBy") + "\n" +
+ "SELECT * FROM foo")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectValidationError(
+ DruidException.class,
+ StringUtils.format("Operation [%s] requires a PARTITIONED BY to be
explicitly defined, but none was found.", operationName)
+ )
+ .verify();
+ }
+
+ /**
+ * If the segment grain is absent in the catalog, but given by PARTITIONED
BY, then
+ * the query value is used.
+ */
+ @Test
+ public void testInsertNoPartitonedByWithDayPartitonedByFromQuery()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "noPartitonedBy") + "\n" +
+ "SELECT * FROM foo\n" +
+ "PARTITIONED BY day")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE)
+ .expectResources(dataSourceWrite("noPartitonedBy"),
dataSourceRead("foo"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ // Scan query lists columns in alphabetical order independent
of the
+ // SQL project list or the defined schema.
+ .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2",
"unique_dim1")
+ .context(queryContextWithGranularity(Granularities.DAY))
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * Adding a new column during ingestion that is not defined in a non-sealed
table should succeed.
+ */
+ @Test
+ public void testInsertAddNonDefinedColumnIntoNonSealedCatalogTable()
+ {
+ ExternalDataSource externalDataSource = new ExternalDataSource(
+ new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+ new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null,
false, false, 0),
+ RowSignature.builder()
+ .add("a", ColumnType.STRING)
+ .add("b", ColumnType.STRING)
+ .add("c", ColumnType.LONG)
+ .add("d", ColumnType.STRING)
+ .add("e", ColumnType.STRING)
+ .build()
+ );
+ final RowSignature signature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .add("m2", ColumnType.DOUBLE)
+ .add("extra2", ColumnType.LONG)
+ .add("extra3", ColumnType.STRING)
+ .build();
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" +
+ "SELECT\n" +
+ " TIME_PARSE(a) AS __time,\n" +
+ " b AS dim1,\n" +
+ " 1 AS cnt,\n" +
+ " c AS m2,\n" +
+ " CAST(d AS BIGINT) AS extra2,\n" +
+ " e AS extra3\n" +
+ "FROM TABLE(inline(\n" +
+ " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+ " format => 'csv'))\n" +
+ " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+ "PARTITIONED BY ALL TIME")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("foo", signature)
+ .expectResources(dataSourceWrite("foo"),
Externals.externalRead("EXTERNAL"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource(externalDataSource)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .virtualColumns(
+ expressionVirtualColumn("v0",
"timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+ expressionVirtualColumn("v1", "1", ColumnType.LONG),
+ expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')",
ColumnType.DOUBLE),
+ expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')",
ColumnType.LONG)
+ )
+ // Scan query lists columns in alphabetical order independent
of the
+ // SQL project list or the defined schema.
+ .columns("b", "e", "v0", "v1", "v2", "v3")
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * Insert into a catalog table that has clustering defined on the table
definition. Should use
+ * the catalog defined clustering
+ */
+ @Test
+ public void testInsertTableWithClusteringWithClusteringFromCatalog()
+ {
+ ExternalDataSource externalDataSource = new ExternalDataSource(
+ new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+ new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null,
false, false, 0),
+ RowSignature.builder()
+ .add("a", ColumnType.STRING)
+ .add("b", ColumnType.STRING)
+ .add("c", ColumnType.LONG)
+ .add("d", ColumnType.STRING)
+ .add("e", ColumnType.STRING)
+ .build()
+ );
+ final RowSignature signature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("dim2", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .build();
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") +
"\n" +
+ "SELECT\n" +
+ " TIME_PARSE(a) AS __time,\n" +
+ " b AS dim1,\n" +
+ " d AS dim2,\n" +
+ " 1 AS cnt\n" +
+ "FROM TABLE(inline(\n" +
+ " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+ " format => 'csv'))\n" +
+ " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+ "PARTITIONED BY ALL TIME")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("tableWithClustering", signature)
+ .expectResources(dataSourceWrite("tableWithClustering"),
Externals.externalRead("EXTERNAL"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource(externalDataSource)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .virtualColumns(
+ expressionVirtualColumn("v0",
"timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+ expressionVirtualColumn("v1", "1", ColumnType.LONG)
+ )
+ .orderBy(
+ ImmutableList.of(
+ new ScanQuery.OrderBy("b", ScanQuery.Order.ASCENDING),
+ new ScanQuery.OrderBy("d", ScanQuery.Order.DESCENDING)
+ )
+ )
+ // Scan query lists columns in alphabetical order independent
of the
+ // SQL project list or the defined schema.
+ .columns("b", "d", "v0", "v1")
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * Insert into a catalog table that has clustering defined on the table
definition, but user specifies
+ * clustering on the ingest query. Should use the query defined clustering
+ */
+ @Test
+ public void testInsertTableWithClusteringWithClusteringFromQuery()
+ {
+ ExternalDataSource externalDataSource = new ExternalDataSource(
+ new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+ new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null,
false, false, 0),
+ RowSignature.builder()
+ .add("a", ColumnType.STRING)
+ .add("b", ColumnType.STRING)
+ .add("c", ColumnType.LONG)
+ .add("d", ColumnType.STRING)
+ .add("e", ColumnType.STRING)
+ .build()
+ );
+ final RowSignature signature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("dim2", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .build();
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") +
"\n" +
+ "SELECT\n" +
+ " TIME_PARSE(a) AS __time,\n" +
+ " b AS dim1,\n" +
+ " d AS dim2,\n" +
+ " 1 AS cnt\n" +
+ "FROM TABLE(inline(\n" +
+ " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+ " format => 'csv'))\n" +
+ " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+ "PARTITIONED BY ALL TIME\n" +
+ "CLUSTERED BY dim1")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("tableWithClustering", signature)
+ .expectResources(dataSourceWrite("tableWithClustering"),
Externals.externalRead("EXTERNAL"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource(externalDataSource)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .virtualColumns(
+ expressionVirtualColumn("v0",
"timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+ expressionVirtualColumn("v1", "1", ColumnType.LONG)
+ )
+ .orderBy(
+ ImmutableList.of(
+ new ScanQuery.OrderBy("b", ScanQuery.Order.ASCENDING)
+ )
+ )
+ // Scan query lists columns in alphabetical order independent
of the
+ // SQL project list or the defined schema.
+ .columns("b", "d", "v0", "v1")
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * Insert into a catalog table that has clustering defined on the table
definition, but user specifies
+ * clustering on the ingest query on column that has not been defined in the
table catalog definition.
+ * Should use the query defined clustering
+ */
+ @Test
+ public void testInsertTableWithClusteringWithClusteringOnNewColumnFromQuery()
+ {
+ ExternalDataSource externalDataSource = new ExternalDataSource(
+ new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+ new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null,
false, false, 0),
+ RowSignature.builder()
+ .add("a", ColumnType.STRING)
+ .add("b", ColumnType.STRING)
+ .add("c", ColumnType.LONG)
+ .add("d", ColumnType.STRING)
+ .add("e", ColumnType.STRING)
+ .build()
+ );
+ final RowSignature signature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("dim2", ColumnType.STRING)
+ .add("dim3", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .build();
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") +
"\n" +
+ "SELECT\n" +
+ " TIME_PARSE(a) AS __time,\n" +
+ " b AS dim1,\n" +
+ " d AS dim2,\n" +
+ " e AS dim3,\n" +
+ " 1 AS cnt\n" +
+ "FROM TABLE(inline(\n" +
+ " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+ " format => 'csv'))\n" +
+ " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+ "PARTITIONED BY ALL TIME\n" +
+ "CLUSTERED BY dim3")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("tableWithClustering", signature)
+ .expectResources(dataSourceWrite("tableWithClustering"),
Externals.externalRead("EXTERNAL"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource(externalDataSource)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .virtualColumns(
+ expressionVirtualColumn("v0",
"timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+ expressionVirtualColumn("v1", "1", ColumnType.LONG)
+ )
+ .orderBy(
+ ImmutableList.of(
+ new ScanQuery.OrderBy("e", ScanQuery.Order.ASCENDING)
+ )
+ )
+ // Scan query lists columns in alphabetical order independent
of the
+ // SQL project list or the defined schema.
+ .columns("b", "d", "e", "v0", "v1")
+
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+ .build()
+ )
+ .verify();
+ }
+
+ /**
+ * Insert into a catalog table that has clustering defined on the table
definition, but user specifies
+ * clustering on the ingest query on column that has not been defined in the
table catalog definition.
+ * or in the select clause. Should fail with validation error.
+ */
+ @Test
+ public void testInsertTableWithClusteringWithClusteringOnBadColumn()
+ {
+ ExternalDataSource externalDataSource = new ExternalDataSource(
+ new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+ new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null,
false, false, 0),
+ RowSignature.builder()
+ .add("a", ColumnType.STRING)
+ .add("b", ColumnType.STRING)
+ .add("c", ColumnType.LONG)
+ .add("d", ColumnType.STRING)
+ .add("e", ColumnType.STRING)
+ .build()
+ );
+ final RowSignature signature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("dim2", ColumnType.STRING)
+ .add("dim3", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .build();
Review Comment:
## Unread local variable
Variable 'RowSignature signature' is never read in testInsertTableWithClusteringWithClusteringOnBadColumn.
[Show more details](https://github.com/apache/druid/security/code-scanning/7258)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]