This is an automated email from the ASF dual-hosted git repository.
zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new fb7c84fb5d8 Catalog clustering keys fixes (#16351)
fb7c84fb5d8 is described below
commit fb7c84fb5d8c87f05298629839c6bc8fd442b776
Author: zachjsh <[email protected]>
AuthorDate: Fri May 3 14:02:56 2024 -0400
Catalog clustering keys fixes (#16351)
* * add another catalog clustering columns unit test
* * disallow clusterKeys with descending order
* * make it clearer that clustering is rewritten into the ingest node
whether or not the target is a catalog table
* * when partitionedBy is stored in catalog, user shouldn't need to specify
it in order to specify clustering
* * fix intellij inspection failure
---
.../druid/server/http/catalog/EditorTest.java | 37 +++++-
.../druid/catalog/model/table/DatasourceDefn.java | 34 +++++-
.../catalog/model/table/DatasourceTableTest.java | 25 ++++
sql/src/main/codegen/includes/insert.ftl | 6 +-
sql/src/main/codegen/includes/replace.ftl | 6 +-
.../sql/calcite/planner/DruidSqlValidator.java | 20 ++-
.../calcite/CalciteCatalogIngestionDmlTest.java | 136 ++++++++++++++++++++-
.../druid/sql/calcite/CalciteInsertDmlTest.java | 2 +-
8 files changed, 240 insertions(+), 26 deletions(-)
diff --git
a/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/EditorTest.java
b/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/EditorTest.java
index 8686414cc0b..2556ce74cf0 100644
---
a/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/EditorTest.java
+++
b/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/EditorTest.java
@@ -19,6 +19,8 @@
package org.apache.druid.server.http.catalog;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
import org.apache.curator.shaded.com.google.common.collect.ImmutableMap;
import org.apache.druid.catalog.CatalogException;
import org.apache.druid.catalog.http.TableEditRequest;
@@ -34,10 +36,12 @@ import org.apache.druid.catalog.model.ColumnSpec;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.TableId;
import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
import org.apache.druid.catalog.model.table.DatasourceDefn;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.catalog.storage.CatalogStorage;
import org.apache.druid.catalog.storage.CatalogTests;
+import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.metadata.TestDerbyConnector;
import org.junit.After;
import org.junit.Before;
@@ -56,6 +60,7 @@ import static org.junit.Assert.assertThrows;
public class EditorTest
{
+ private static final ObjectMapper MAPPER = new DefaultObjectMapper();
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new
TestDerbyConnector.DerbyConnectorRule();
@@ -326,7 +331,6 @@ public class EditorTest
// Can't test an empty property set: no table type allows empty
// properties.
- // Remove a required property
Map<String, Object> updates = new HashMap<>();
updates.put(DatasourceDefn.SEGMENT_GRANULARITY_PROPERTY, null);
cmd = new UpdateProperties(updates);
@@ -374,6 +378,37 @@ public class EditorTest
expected,
doEdit(tableName, cmd).spec().properties()
);
+
+ // Add a DESC cluster key - should fail
+ Map<String, Object> updates1 = new HashMap<>();
+ updates1.put(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new
ClusterKeySpec("clusterKeyA", true)));
+
+ assertThrows(
+ CatalogException.class,
+ () -> new TableEditor(
+ catalog,
+ table.id(),
+ new UpdateProperties(updates1)
+ ).go()
+ );
+
+ // Add an ASC cluster key - should succeed
+ updates = new HashMap<>();
+ updates.put(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new
ClusterKeySpec("clusterKeyA", false)));
+ cmd = new UpdateProperties(updates);
+ expected = ImmutableMap.of(
+ DatasourceDefn.SEGMENT_GRANULARITY_PROPERTY, "PT1H",
+ DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new
ClusterKeySpec("clusterKeyA", false))
+ );
+ Map<String, Object> actual = doEdit(tableName, cmd).spec().properties();
+ actual.put(
+ DatasourceDefn.CLUSTER_KEYS_PROPERTY,
+ MAPPER.convertValue(actual.get(DatasourceDefn.CLUSTER_KEYS_PROPERTY),
ClusterKeySpec.CLUSTER_KEY_LIST_TYPE_REF)
+ );
+ assertEquals(
+ expected,
+ actual
+ );
}
@Test
diff --git
a/server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java
b/server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java
index 46d21bf1279..b3dc953cf8e 100644
---
a/server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java
+++
b/server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java
@@ -110,6 +110,34 @@ public class DatasourceDefn extends TableDefn
}
}
+ public static class ClusterKeysDefn extends
ModelProperties.ListPropertyDefn<ClusterKeySpec>
+ {
+ public ClusterKeysDefn()
+ {
+ super(
+ CLUSTER_KEYS_PROPERTY,
+ "ClusterKeySpec list",
+ new TypeReference<List<ClusterKeySpec>>() {}
+ );
+ }
+
+ @Override
+ public void validate(Object value, ObjectMapper jsonMapper)
+ {
+ if (value == null) {
+ return;
+ }
+ List<ClusterKeySpec> clusterKeys = decode(value, jsonMapper);
+ for (ClusterKeySpec clusterKey : clusterKeys) {
+ if (clusterKey.desc()) {
+ throw new IAE(
+ StringUtils.format("Cannot specify DESC clustering key [%s].
Only ASC is supported.", clusterKey)
+ );
+ }
+ }
+ }
+ }
+
public DatasourceDefn()
{
super(
@@ -118,11 +146,7 @@ public class DatasourceDefn extends TableDefn
Arrays.asList(
new SegmentGranularityFieldDefn(),
new ModelProperties.IntPropertyDefn(TARGET_SEGMENT_ROWS_PROPERTY),
- new ModelProperties.ListPropertyDefn<ClusterKeySpec>(
- CLUSTER_KEYS_PROPERTY,
- "cluster keys",
- new TypeReference<List<ClusterKeySpec>>() { }
- ),
+ new ClusterKeysDefn(),
new HiddenColumnsDefn(),
new ModelProperties.BooleanPropertyDefn(SEALED_PROPERTY)
),
diff --git
a/server/src/test/java/org/apache/druid/catalog/model/table/DatasourceTableTest.java
b/server/src/test/java/org/apache/druid/catalog/model/table/DatasourceTableTest.java
index 1bccf580eb3..b93db97acc6 100644
---
a/server/src/test/java/org/apache/druid/catalog/model/table/DatasourceTableTest.java
+++
b/server/src/test/java/org/apache/druid/catalog/model/table/DatasourceTableTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.catalog.model.table;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.catalog.CatalogTest;
@@ -116,6 +117,28 @@ public class DatasourceTableTest
}
}
+ @Test
+ public void testSpecWithClusterKeyProp()
+ {
+ {
+ TableSpec spec = new TableSpec(
+ DatasourceDefn.TABLE_TYPE,
+ ImmutableMap.of(DatasourceDefn.CLUSTER_KEYS_PROPERTY,
ImmutableList.of(new ClusterKeySpec("clusterKeyA", true))),
+ null
+ );
+ expectValidationFails(spec);
+ }
+
+ {
+ TableSpec spec = new TableSpec(
+ DatasourceDefn.TABLE_TYPE,
+ ImmutableMap.of(DatasourceDefn.CLUSTER_KEYS_PROPERTY,
ImmutableList.of(new ClusterKeySpec("clusterKeyA", false))),
+ null
+ );
+ expectValidationSucceeds(spec);
+ }
+ }
+
@Test
public void testAllProperties()
{
@@ -125,6 +148,7 @@ public class DatasourceTableTest
.put(DatasourceDefn.TARGET_SEGMENT_ROWS_PROPERTY, 1_000_000)
.put(DatasourceDefn.HIDDEN_COLUMNS_PROPERTY, Arrays.asList("foo",
"bar"))
.put(DatasourceDefn.SEALED_PROPERTY, true)
+ .put(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new
ClusterKeySpec("clusterKeyA", false)))
.build();
TableSpec spec = new TableSpec(DatasourceDefn.TABLE_TYPE, props, null);
@@ -132,6 +156,7 @@ public class DatasourceTableTest
assertEquals("P1D", facade.segmentGranularityString());
assertEquals(1_000_000, (int) facade.targetSegmentRows());
assertEquals(Arrays.asList("foo", "bar"), facade.hiddenColumns());
+ assertEquals(Collections.singletonList(new ClusterKeySpec("clusterKeyA",
false)), facade.clusterKeys());
assertTrue(facade.isSealed());
}
diff --git a/sql/src/main/codegen/includes/insert.ftl
b/sql/src/main/codegen/includes/insert.ftl
index 323757dd78d..0f053a4f655 100644
--- a/sql/src/main/codegen/includes/insert.ftl
+++ b/sql/src/main/codegen/includes/insert.ftl
@@ -93,11 +93,7 @@ SqlNode DruidSqlInsertEof() :
clusteredBy = ClusteredBy()
]
{
- if (clusteredBy != null && partitionedBy == null) {
- throw
org.apache.druid.sql.calcite.parser.DruidSqlParserUtils.problemParsing(
- "CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come
after the PARTITIONED BY clause"
- );
- }
+
}
// EOF is also present in SqlStmtEof but EOF is a special case and a single
EOF can be consumed multiple times.
// The reason for adding EOF here is to ensure that we create a
DruidSqlInsert node after the syntax has been
diff --git a/sql/src/main/codegen/includes/replace.ftl
b/sql/src/main/codegen/includes/replace.ftl
index b8851167036..10449c3fccb 100644
--- a/sql/src/main/codegen/includes/replace.ftl
+++ b/sql/src/main/codegen/includes/replace.ftl
@@ -77,11 +77,7 @@ SqlNode DruidSqlReplaceEof() :
clusteredBy = ClusteredBy()
]
{
- if (clusteredBy != null && partitionedBy == null) {
- throw
org.apache.druid.sql.calcite.parser.DruidSqlParserUtils.problemParsing(
- "CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come
after the PARTITIONED BY clause"
- );
- }
+
}
// EOF is also present in SqlStmtEof but EOF is a special case and a
single EOF can be consumed multiple times.
// The reason for adding EOF here is to ensure that we create a
DruidSqlReplace node after the syntax has been
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
index aa04a55eb50..b4f006ce97e 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
@@ -291,6 +291,7 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
SqlSelect select,
SqlNode enclosingNode)
{
+ SqlNodeList catalogClustering = null;
if (enclosingNode instanceof DruidSqlIngest) {
// The target is a new or existing datasource.
// The target namespace is both the target table ID and the row type for
that table.
@@ -308,11 +309,10 @@ public class DruidSqlValidator extends
BaseDruidSqlValidator
final DatasourceFacade tableMetadata = table == null
? null
: table.effectiveMetadata().catalogMetadata();
- // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY
clause
- final SqlNodeList catalogClustering =
convertCatalogClustering(tableMetadata);
- rewriteClusteringToOrderBy(select, (DruidSqlIngest) enclosingNode,
catalogClustering);
- return new SelectNamespace(this, select, enclosingNode);
+ catalogClustering = convertCatalogClustering(tableMetadata);
}
+ // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+ rewriteClusteringToOrderBy(select, (DruidSqlIngest) enclosingNode,
catalogClustering);
}
return super.createSelectNamespace(select, enclosingNode);
}
@@ -408,6 +408,7 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
}
final SqlSelect select = (SqlSelect) source;
+ DruidSqlParserUtils.validateClusteredByColumns(clusteredBy);
select.setOrderBy(clusteredBy);
}
@@ -451,6 +452,17 @@ public class DruidSqlValidator extends
BaseDruidSqlValidator
}
if (effectiveGranularity == null) {
+ SqlNode source = ingestNode.getSource();
+ while (source instanceof SqlWith) {
+ source = ((SqlWith) source).getOperandList().get(1);
+ }
+ final SqlSelect select = (SqlSelect) source;
+
+ if (select.getOrderList() != null) {
+ throw DruidSqlParserUtils.problemParsing(
+ "CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come
after the PARTITIONED BY clause"
+ );
+ }
throw InvalidSqlInput.exception(
"Operation [%s] requires a PARTITIONED BY to be explicitly defined,
but none was found.",
operationName);
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
index 45b4b40e41c..c85e6336de5 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
@@ -264,6 +264,55 @@ public abstract class CalciteCatalogIngestionDmlTest
extends CalciteIngestionDml
null,
null
),
+ new TableSpec(
+ DatasourceDefn.TABLE_TYPE,
+ ImmutableMap.of(
+ DatasourceDefn.SEGMENT_GRANULARITY_PROPERTY, "ALL",
+ DatasourceDefn.CLUSTER_KEYS_PROPERTY,
+ ImmutableList.of(
+ new ClusterKeySpec("dim1", false),
+ new ClusterKeySpec("dim2", false)
+ )
+ ),
+ ImmutableList.of(
+ new ColumnSpec("__time", Columns.TIME_COLUMN,
null),
+ new ColumnSpec("dim1", Columns.STRING, null),
+ new ColumnSpec("dim2", Columns.STRING, null),
+ new ColumnSpec("cnt", Columns.LONG, null)
+ )
+ ),
+ MAPPER
+ )),
+
DatasourceTable.EffectiveMetadata.toEffectiveColumns(RowSignature.builder()
+ .addTimeColumn()
+ .add("dim1", ColumnType.STRING)
+ .add("dim2", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .build()),
+ false
+ )
+ ),
+ "tableWithClusteringDesc", new DatasourceTable(
+ FOO_TABLE_SIGNATURE,
+ new DatasourceTable.PhysicalDatasourceMetadata(
+ new TableDataSource("tableWithClusteringDesc"),
+ RowSignature.builder()
+ .addTimeColumn()
+ .add("dim1", ColumnType.STRING)
+ .add("dim2", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .build(),
+ false,
+ false
+ ),
+ new DatasourceTable.EffectiveMetadata(
+ new DatasourceFacade(new ResolvedTable(
+ new TableDefn(
+ "tableWithClusteringDesc",
+ DatasourceDefn.TABLE_TYPE,
+ null,
+ null
+ ),
new TableSpec(
DatasourceDefn.TABLE_TYPE,
ImmutableMap.of(
@@ -523,7 +572,7 @@ public abstract class CalciteCatalogIngestionDmlTest
extends CalciteIngestionDml
.orderBy(
ImmutableList.of(
new ScanQuery.OrderBy("b", ScanQuery.Order.ASCENDING),
- new ScanQuery.OrderBy("d", ScanQuery.Order.DESCENDING)
+ new ScanQuery.OrderBy("d", ScanQuery.Order.ASCENDING)
)
)
// Scan query lists columns in alphabetical order independent
of the
@@ -570,7 +619,6 @@ public abstract class CalciteCatalogIngestionDmlTest
extends CalciteIngestionDml
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
" (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
- "PARTITIONED BY ALL TIME\n" +
"CLUSTERED BY dim1")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("tableWithClustering", signature)
@@ -664,11 +712,11 @@ public abstract class CalciteCatalogIngestionDmlTest
extends CalciteIngestionDml
/**
* Insert into a catalog table that has clustering defined on the table
definition, but user specifies
- * clustering on the ingest query on column that has not been defined in the
table catalog definition.
- * or in the select clause. Should fail with validation error.
+ * clustering on the ingest query on column that has not been specified in
the select clause. Should fail with
+ * validation error.
*/
@Test
- public void testInsertTableWithClusteringWithClusteringOnBadColumn()
+ public void testInsertTableWithQueryDefinedClusteringOnNonSelectedColumn()
{
testIngestionQuery()
.sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") +
"\n" +
@@ -690,6 +738,84 @@ public abstract class CalciteCatalogIngestionDmlTest
extends CalciteIngestionDml
.verify();
}
+ /**
+ * Insert into a catalog table that has clustering defined on the table
definition, but one of the clustering
+ * columns specified has not been specified in the select clause. Should
fail with validation error.
+ */
+ @Test
+ public void testInsertTableWithCatalogDefinedClusteringOnNonSelectedColumn()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") +
"\n" +
+ "SELECT\n" +
+ " TIME_PARSE(a) AS __time,\n" +
+ " b AS dim1,\n" +
+ " e AS dim3,\n" +
+ " 1 AS cnt\n" +
+ "FROM TABLE(inline(\n" +
+ " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+ " format => 'csv'))\n" +
+ " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+ "PARTITIONED BY ALL TIME")
+ .expectValidationError(
+ DruidException.class,
+ "Column 'dim2' not found in any table (line [0], column [0])")
+ .verify();
+ }
+
+ /**
+ * Insert into a catalog table whose catalog definition declares a
descending (DESC) clustering key.
+ * Descending clustering is not supported, so this should
fail with validation error.
+ */
+ @Test
+ public void testInsertTableWithCatalogDefinedClusteringDesc()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "tableWithClusteringDesc") +
"\n" +
+ "SELECT\n" +
+ " TIME_PARSE(a) AS __time,\n" +
+ " b AS dim1,\n" +
+ " d AS dim2,\n" +
+ " e AS dim3,\n" +
+ " 1 AS cnt\n" +
+ "FROM TABLE(inline(\n" +
+ " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+ " format => 'csv'))\n" +
+ " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+ "PARTITIONED BY ALL TIME")
+ .expectValidationError(
+ DruidException.class,
+ "Invalid CLUSTERED BY clause [`dim2` DESC]: cannot sort in
descending order.")
+ .verify();
+ }
+
+ /**
+ * Insert into a catalog table where the ingest query itself specifies a
descending (DESC) CLUSTERED BY column.
+ * Descending clustering is not supported, so this should
fail with validation error.
+ */
+ @Test
+ public void testInsertTableWithQueryDefinedClusteringDesc()
+ {
+ testIngestionQuery()
+ .sql(StringUtils.format(dmlPrefixPattern, "tableWithClusteringDesc") +
"\n" +
+ "SELECT\n" +
+ " TIME_PARSE(a) AS __time,\n" +
+ " b AS dim1,\n" +
+ " d AS dim2,\n" +
+ " e AS dim3,\n" +
+ " 1 AS cnt\n" +
+ "FROM TABLE(inline(\n" +
+ " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+ " format => 'csv'))\n" +
+ " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+ "PARTITIONED BY ALL TIME\n" +
+ "CLUSTERED BY dim1 DESC")
+ .expectValidationError(
+ DruidException.class,
+ "Invalid CLUSTERED BY clause [`dim1` DESC]: cannot sort in
descending order.")
+ .verify();
+ }
+
/**
* Adding a new column during group by ingestion that is not defined in a
non-sealed table should succeed.
*/
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index 8b62bd66805..63e12aec2c3 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -1113,7 +1113,7 @@ public class CalciteInsertDmlTest extends
CalciteIngestionDmlTest
.sql(
"INSERT INTO druid.dst "
+ "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2) as ceil_m2
FROM foo "
- + "CLUSTERED BY 2, dim1 DESC, CEIL(m2)"
+ + "CLUSTERED BY 2, dim1, CEIL(m2)"
)
.expectValidationError(invalidSqlIs(
"CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come
after the PARTITIONED BY clause"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]