This is an automated email from the ASF dual-hosted git repository.
zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 04fb75719e Fail query planning if a `CLUSTERED BY` column contains
descending order (#14436)
04fb75719e is described below
commit 04fb75719e7dd76d051dbd4cf8f9c2712b227793
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Fri Jun 16 15:10:12 2023 -0700
Fail query planning if a `CLUSTERED BY` column contains descending order
(#14436)
* Throw ValidationException if a CLUSTERED BY column specifies descending
order.
- Fails query planning
* Some more tests.
* fixup existing comment
* Update comment
* checkstyle fix: remove unused imports
* Remove InsertCannotOrderByDescendingFault and deprecate the fault in
readme.
* move deprecated field to the bottom
---
docs/multi-stage-query/reference.md | 2 +-
.../org/apache/druid/msq/exec/ControllerImpl.java | 5 --
.../apache/druid/msq/guice/MSQIndexingModule.java | 2 -
.../error/InsertCannotOrderByDescendingFault.java | 72 ---------------------
.../org/apache/druid/msq/exec/MSQFaultsTest.java | 18 ------
.../org/apache/druid/msq/exec/MSQInsertTest.java | 19 ++++++
.../org/apache/druid/msq/exec/MSQReplaceTest.java | 21 ++++++-
.../msq/indexing/error/MSQFaultSerdeTest.java | 1 -
.../sql/calcite/parser/DruidSqlParserUtils.java | 33 +++++++++-
.../druid/sql/calcite/CalciteInsertDmlTest.java | 38 ++++++++---
.../druid/sql/calcite/CalciteReplaceDmlTest.java | 20 ++++++
.../calcite/parser/DruidSqlParserUtilsTest.java | 73 +++++++++++++++++++++-
.../ingest/insertWithClusteredBy-logicalPlan.txt | 4 +-
13 files changed, 192 insertions(+), 116 deletions(-)
diff --git a/docs/multi-stage-query/reference.md
b/docs/multi-stage-query/reference.md
index b28b3a2f5b..3e54abfce8 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -425,7 +425,6 @@ The following table describes error codes you may encounter
in the `multiStageQu
| <a name="error_ColumnTypeNotSupported">`ColumnTypeNotSupported`</a> | The
column type is not supported. This can be because:<br /> <br /><ul><li>Support
for writing or reading from a particular column type is not
supported.</li><li>The query attempted to use a column type that is not
supported by the frame format. This occurs with ARRAY types, which are not yet
implemented for frames.</li></ul> | `columnName`: The column name with an
unsupported type.<br /> <br />`columnType`: The unkn [...]
| <a
name="error_InsertCannotAllocateSegment">`InsertCannotAllocateSegment`</a> |
The controller task could not allocate a new segment ID due to conflict with
existing segments or pending segments. Common reasons for such conflicts:<br />
<br /><ul><li>Attempting to mix different granularities in the same intervals
of the same datasource.</li><li>Prior ingestions that used non-extendable shard
specs.</li></ul>| `dataSource`<br /> <br />`interval`: The interval for the
attempted new segme [...]
| <a name="error_InsertCannotBeEmpty">`InsertCannotBeEmpty`</a> | An INSERT or
REPLACE query did not generate any output rows in a situation where output rows
are required for success. This can happen for INSERT or REPLACE queries with
`PARTITIONED BY` set to something other than `ALL` or `ALL TIME`. |
`dataSource` |
-| <a
name="error_InsertCannotOrderByDescending">`InsertCannotOrderByDescending`</a>
| An INSERT query contained a `CLUSTERED BY` expression in descending order.
Druid's segment generation code only supports ascending order. | `columnName` |
| <a name="error_InsertLockPreempted">`InsertLockPreempted`</a> | An INSERT or
REPLACE query was canceled by a higher-priority ingestion job, such as a
real-time ingestion task. | |
| <a name="error_InsertTimeNull">`InsertTimeNull`</a> | An INSERT or REPLACE
query encountered a null timestamp in the `__time` field.<br /><br />This can
happen due to using an expression like `TIME_PARSE(timestamp) AS __time` with a
timestamp that cannot be parsed.
([`TIME_PARSE`](../querying/sql-scalar.md#date-and-time-functions) returns null
when it cannot parse a timestamp.) In this case, try parsing your timestamps
using a different function or pattern. Or, if your timestamps may g [...]
| <a name="error_InsertTimeOutOfBounds">`InsertTimeOutOfBounds`</a> | A
REPLACE query generated a timestamp outside the bounds of the TIMESTAMP
parameter for your OVERWRITE WHERE clause.<br /> <br />To avoid this error,
verify that the you specified is valid. | `interval`: time chunk interval
corresponding to the out-of-bounds timestamp |
@@ -449,3 +448,4 @@ The following table describes error codes you may encounter
in the `multiStageQu
| <a name="error_WorkerFailed">`WorkerFailed`</a> | A worker task failed
unexpectedly. | `errorMsg`<br /><br />`workerTaskId`: The ID of the worker
task. |
| <a name="error_WorkerRpcFailed">`WorkerRpcFailed`</a> | A remote procedure
call to a worker task failed and could not recover. | `workerTaskId`: the id of
the worker task |
| <a name="error_UnknownError">`UnknownError`</a> | All other errors. |
`message` |
+| <a
name="error_InsertCannotOrderByDescending">`InsertCannotOrderByDescending`</a>
| Deprecated. An INSERT query contained a `CLUSTERED BY` expression in
descending order. Druid's segment generation code only supports ascending
order. The query returns a `ValidationException` instead of the fault. |
`columnName` |
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index a5ff643e8b..dd163d406d 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -101,7 +101,6 @@ import
org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.msq.indexing.error.FaultsExceededChecker;
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
-import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
@@ -1854,10 +1853,6 @@ public class ControllerImpl implements Controller
// Such fields in CLUSTERED BY still control partitioning as expected, but
do not affect sort order of rows
// within an individual segment.
for (final KeyColumn clusterByColumn : queryClusterBy.getColumns()) {
- if (clusterByColumn.order() == KeyOrder.DESCENDING) {
- throw new MSQException(new
InsertCannotOrderByDescendingFault(clusterByColumn.columnName()));
- }
-
final IntList outputColumns =
columnMappings.getOutputColumnsForQueryColumn(clusterByColumn.columnName());
for (final int outputColumn : outputColumns) {
outputColumnsInOrder.add(columnMappings.getOutputColumnName(outputColumn));
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
index 8cc7ab35bd..59300316d5 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
@@ -45,7 +45,6 @@ import
org.apache.druid.msq.indexing.error.ColumnTypeNotSupportedFault;
import org.apache.druid.msq.indexing.error.DurableStorageConfigurationFault;
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
-import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
@@ -113,7 +112,6 @@ public class MSQIndexingModule implements DruidModule
DurableStorageConfigurationFault.class,
InsertCannotAllocateSegmentFault.class,
InsertCannotBeEmptyFault.class,
- InsertCannotOrderByDescendingFault.class,
InsertLockPreemptedFault.class,
InsertTimeNullFault.class,
InsertTimeOutOfBoundsFault.class,
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotOrderByDescendingFault.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotOrderByDescendingFault.java
deleted file mode 100644
index 43b50e8782..0000000000
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotOrderByDescendingFault.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.indexing.error;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-
-import java.util.Objects;
-
-@JsonTypeName(InsertCannotOrderByDescendingFault.CODE)
-public class InsertCannotOrderByDescendingFault extends BaseMSQFault
-{
- static final String CODE = "InsertCannotOrderByDescending";
-
- private final String columnName;
-
- @JsonCreator
- public InsertCannotOrderByDescendingFault(
- @JsonProperty("columnName") final String columnName
- )
- {
- super(CODE, "Cannot ingest column [%s] in descending order", columnName);
- this.columnName = Preconditions.checkNotNull(columnName, "columnName");
- }
-
- @JsonProperty
- public String getColumnName()
- {
- return columnName;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- InsertCannotOrderByDescendingFault that =
(InsertCannotOrderByDescendingFault) o;
- return Objects.equals(columnName, that.columnName);
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(super.hashCode(), columnName);
- }
-}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
index c1da6ebc8e..646286acaf 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
@@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
-import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
@@ -92,23 +91,6 @@ public class MSQFaultsTest extends MSQTestBase
.verifyResults();
}
- @Test
- public void testInsertCannotOrderByDescendingFault()
- {
- RowSignature rowSignature = RowSignature.builder()
- .add("__time", ColumnType.LONG)
- .add("dim1", ColumnType.STRING)
- .add("cnt",
ColumnType.LONG).build();
-
- // Add an DESC clustered by column, which should not be allowed
- testIngestQuery().setSql(
- "insert into foo1 select __time, dim1 , count(*) as
cnt from foo where dim1 is not null and __time < TIMESTAMP '2000-01-02
00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1 DESC")
- .setExpectedDataSource("foo1")
- .setExpectedRowSignature(rowSignature)
- .setExpectedMSQFault(new
InsertCannotOrderByDescendingFault("d1"))
- .verifyResults();
- }
-
@Test
public void testInsertTimeOutOfBoundsFault()
{
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index 548e5e0166..b55de6c165 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -736,6 +736,25 @@ public class MSQInsertTest extends MSQTestBase
}
+ @Test
+ public void testInsertWithClusteredByDescendingThrowsException()
+ {
+ // Add a DESC clustered by column, which should not be allowed
+ testIngestQuery().setSql("INSERT INTO foo1 "
+ + "SELECT __time, dim1 , count(*) as cnt "
+ + "FROM foo "
+ + "GROUP BY 1, 2 "
+ + "PARTITIONED BY DAY "
+ + "CLUSTERED BY dim1 DESC"
+ )
+ .setExpectedValidationErrorMatcher(CoreMatchers.allOf(
+ CoreMatchers.instanceOf(SqlPlanningException.class),
+
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
+ "[`dim1` DESC] is invalid. CLUSTERED BY columns
cannot be sorted in descending order."))
+ ))
+ .verifyPlanningErrors();
+ }
+
@Test
public void testRollUpOnFoo1WithTimeFunctionComplexCol()
{
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index eb1c459e66..1dfd774214 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -678,7 +678,7 @@ public class MSQReplaceTest extends MSQTestBase
}
@Test
- public void testInsertOnFoo1Range()
+ public void testReplaceOnFoo1Range()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@@ -731,6 +731,25 @@ public class MSQReplaceTest extends MSQTestBase
.verifyResults();
}
+ @Test
+ public void testReplaceWithClusteredByDescendingThrowsException()
+ {
+ // Add a DESC clustered by column, which should not be allowed
+ testIngestQuery().setSql(" REPLACE INTO foobar "
+ + "OVERWRITE ALL "
+ + "SELECT __time, m1, m2 "
+ + "FROM foo "
+ + "PARTITIONED BY ALL TIME "
+ + "CLUSTERED BY m2, m1 DESC"
+ )
+ .setExpectedValidationErrorMatcher(CoreMatchers.allOf(
+ CoreMatchers.instanceOf(SqlPlanningException.class),
+
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
+ "[`m1` DESC] is invalid. CLUSTERED BY columns
cannot be sorted in descending order."))
+ ))
+ .verifyPlanningErrors();
+ }
+
@Test
public void testReplaceTombstonesOverPartiallyOverlappingSegments()
{
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
index 256397e9a2..484989e7ba 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
@@ -56,7 +56,6 @@ public class MSQFaultSerdeTest
assertFaultSerde(new ColumnNameRestrictedFault("the column"));
assertFaultSerde(new InsertCannotAllocateSegmentFault("the datasource",
Intervals.ETERNITY));
assertFaultSerde(new InsertCannotBeEmptyFault("the datasource"));
- assertFaultSerde(new InsertCannotOrderByDescendingFault("the column"));
assertFaultSerde(InsertLockPreemptedFault.INSTANCE);
assertFaultSerde(InsertTimeNullFault.INSTANCE);
assertFaultSerde(new InsertTimeOutOfBoundsFault(Intervals.ETERNITY));
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java
b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java
index cc67cff119..9009237b78 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java
@@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.parser;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
@@ -256,9 +257,12 @@ public class DruidSqlParserUtils
* @param query sql query
* @param clusteredByList List of clustered by columns
* @return SqlOrderBy node containing the clusteredByList information
+ * @throws ValidationException if any of the clustered by columns contain
DESCENDING order.
*/
public static SqlOrderBy convertClusterByToOrderBy(SqlNode query,
SqlNodeList clusteredByList)
+ throws ValidationException
{
+ validateClusteredByColumns(clusteredByList);
// If we have a CLUSTERED BY clause, extract the information in that
CLUSTERED BY and create a new
// SqlOrderBy node
SqlNode offset = null;
@@ -266,9 +270,9 @@ public class DruidSqlParserUtils
if (query instanceof SqlOrderBy) {
SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
- // This represents the underlying query free of OFFSET, FETCH and ORDER
BY clauses
- // For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo OFFSET
10 FETCH 30 ORDER BY dim1 GROUP
- // BY dim1 this would represent the "SELECT dim1, sum(dim2) from foo
GROUP BY dim1
+ // query represents the underlying query free of OFFSET, FETCH and ORDER
BY clauses
+ // For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo GROUP BY
dim1 ORDER BY dim1 FETCH 30 OFFSET 10",
+ // this would represent "SELECT dim1, sum(dim2) FROM foo GROUP BY
dim1"
query = sqlOrderBy.query;
offset = sqlOrderBy.offset;
fetch = sqlOrderBy.fetch;
@@ -283,6 +287,29 @@ public class DruidSqlParserUtils
);
}
+ /**
+ * Validates the clustered by columns to ensure that none of them specifies
DESCENDING order.
+ *
+ * @param clusteredByNodes List of SqlNodes representing columns to be
clustered by.
+ * @throws ValidationException if any of the clustered by columns contain
DESCENDING order.
+ */
+ public static void validateClusteredByColumns(final SqlNodeList
clusteredByNodes) throws ValidationException
+ {
+ if (clusteredByNodes == null) {
+ return;
+ }
+
+ for (final SqlNode clusteredByNode : clusteredByNodes.getList()) {
+ if (clusteredByNode.isA(ImmutableSet.of(SqlKind.DESCENDING))) {
+ throw new ValidationException(
+ StringUtils.format("[%s] is invalid."
+ + " CLUSTERED BY columns cannot be sorted in
descending order.", clusteredByNode.toString()
+ )
+ );
+ }
+ }
+ }
+
/**
* This method is used to convert an {@link SqlNode} representing a query
into a {@link DimFilter} for the same query.
* It takes the timezone as a separate parameter, as Sql timestamps don't
contain that information. Supported functions
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index 641b0ea49b..aeabf5241a 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -653,11 +653,11 @@ public class CalciteInsertDmlTest extends
CalciteIngestionDmlTest
skipVectorize();
final String resources =
"[{\"name\":\"dst\",\"type\":\"DATASOURCE\"},{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]";
- final String attributes =
"{\"statementType\":\"INSERT\",\"targetDataSource\":\"druid.dst\",\"partitionedBy\":\"DAY\",\"clusteredBy\":\"2,
`dim1` DESC, CEIL(`m2`)\"}";
+ final String attributes =
"{\"statementType\":\"INSERT\",\"targetDataSource\":\"druid.dst\",\"partitionedBy\":\"DAY\",\"clusteredBy\":\"2,
`dim1`, CEIL(`m2`)\"}";
final String sql = "EXPLAIN PLAN FOR INSERT INTO druid.dst "
+ "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2)
as ceil_m2 FROM foo "
- + "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2,
dim1 DESC, CEIL(m2)";
+ + "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2,
dim1, CEIL(m2)";
ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper();
final ScanQuery expectedQuery = newScanQueryBuilder()
@@ -671,7 +671,7 @@ public class CalciteInsertDmlTest extends
CalciteIngestionDmlTest
.orderBy(
ImmutableList.of(
new ScanQuery.OrderBy("v0", ScanQuery.Order.ASCENDING),
- new ScanQuery.OrderBy("dim1", ScanQuery.Order.DESCENDING),
+ new ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING),
new ScanQuery.OrderBy("v1", ScanQuery.Order.ASCENDING)
)
)
@@ -718,7 +718,7 @@ public class CalciteInsertDmlTest extends
CalciteIngestionDmlTest
+
"\"virtualColumns\":[{\"type\":\"expression\",\"name\":\"v0\",\"expression\":\"floor(\\\"m1\\\")\",\"outputType\":\"FLOAT\"},"
+
"{\"type\":\"expression\",\"name\":\"v1\",\"expression\":\"ceil(\\\"m2\\\")\",\"outputType\":\"DOUBLE\"}],"
+ "\"resultFormat\":\"compactedList\","
- +
"\"orderBy\":[{\"columnName\":\"v0\",\"order\":\"ascending\"},{\"columnName\":\"dim1\",\"order\":\"descending\"},"
+ +
"\"orderBy\":[{\"columnName\":\"v0\",\"order\":\"ascending\"},{\"columnName\":\"dim1\",\"order\":\"ascending\"},"
+
"{\"columnName\":\"v1\",\"order\":\"ascending\"}],\"columns\":[\"__time\",\"dim1\",\"v0\",\"v1\"],\"legacy\":false,"
+
"\"context\":{\"sqlInsertSegmentGranularity\":\"\\\"DAY\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"granularity\":{\"type\":\"all\"}},"
+
"\"signature\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"v0\",\"type\":\"FLOAT\"},{\"name\":\"dim1\",\"type\":\"STRING\"},"
@@ -751,6 +751,24 @@ public class CalciteInsertDmlTest extends
CalciteIngestionDmlTest
didTest = true;
}
+ @Test
+ public void testExplainPlanInsertWithClusteredByDescThrowsException()
+ {
+ skipVectorize();
+
+ final String sql = "EXPLAIN PLAN FOR INSERT INTO druid.dst "
+ + "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2)
as ceil_m2 FROM foo "
+ + "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2,
dim1 DESC, CEIL(m2)";
+
+ testIngestionQuery()
+ .sql(sql)
+ .expectValidationError(
+ SqlPlanningException.class,
+ "[`dim1` DESC] is invalid. CLUSTERED BY columns cannot be sorted
in descending order."
+ )
+ .verify();
+ }
+
@Test
public void testInsertWithClusteredBy()
{
@@ -765,7 +783,7 @@ public class CalciteInsertDmlTest extends
CalciteIngestionDmlTest
.sql(
"INSERT INTO druid.dst "
+ "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2) as ceil_m2
FROM foo "
- + "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2, dim1 DESC,
CEIL(m2)"
+ + "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2, dim1,
CEIL(m2)"
)
.expectTarget("dst", targetRowSignature)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
@@ -781,7 +799,7 @@ public class CalciteInsertDmlTest extends
CalciteIngestionDmlTest
.orderBy(
ImmutableList.of(
new ScanQuery.OrderBy("v0", ScanQuery.Order.ASCENDING),
- new ScanQuery.OrderBy("dim1",
ScanQuery.Order.DESCENDING),
+ new ScanQuery.OrderBy("dim1",
ScanQuery.Order.ASCENDING),
new ScanQuery.OrderBy("v1", ScanQuery.Order.ASCENDING)
)
)
@@ -1052,7 +1070,7 @@ public class CalciteInsertDmlTest extends
CalciteIngestionDmlTest
final String query = "EXPLAIN PLAN FOR INSERT INTO druid.dst "
+ "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2)
as ceil_m2 FROM foo "
- + "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2,
dim1 DESC, CEIL(m2)";
+ + "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2,
dim1 ASC, CEIL(m2)";
ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper();
final ScanQuery expectedQuery = newScanQueryBuilder()
@@ -1066,7 +1084,7 @@ public class CalciteInsertDmlTest extends
CalciteIngestionDmlTest
.orderBy(
ImmutableList.of(
new ScanQuery.OrderBy("v0", ScanQuery.Order.ASCENDING),
- new ScanQuery.OrderBy("dim1", ScanQuery.Order.DESCENDING),
+ new ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING),
new ScanQuery.OrderBy("v1", ScanQuery.Order.ASCENDING)
)
)
@@ -1091,7 +1109,7 @@ public class CalciteInsertDmlTest extends
CalciteIngestionDmlTest
+
"\"virtualColumns\":[{\"type\":\"expression\",\"name\":\"v0\",\"expression\":\"floor(\\\"m1\\\")\",\"outputType\":\"FLOAT\"},"
+
"{\"type\":\"expression\",\"name\":\"v1\",\"expression\":\"ceil(\\\"m2\\\")\",\"outputType\":\"DOUBLE\"}],"
+ "\"resultFormat\":\"compactedList\","
- +
"\"orderBy\":[{\"columnName\":\"v0\",\"order\":\"ascending\"},{\"columnName\":\"dim1\",\"order\":\"descending\"},"
+ +
"\"orderBy\":[{\"columnName\":\"v0\",\"order\":\"ascending\"},{\"columnName\":\"dim1\",\"order\":\"ascending\"},"
+
"{\"columnName\":\"v1\",\"order\":\"ascending\"}],\"columns\":[\"__time\",\"dim1\",\"v0\",\"v1\"],\"legacy\":false,"
+
"\"context\":{\"sqlInsertSegmentGranularity\":\"\\\"DAY\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"granularity\":{\"type\":\"all\"}},"
+
"\"signature\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"v0\",\"type\":\"FLOAT\"},{\"name\":\"dim1\",\"type\":\"STRING\"},"
@@ -1101,7 +1119,7 @@ public class CalciteInsertDmlTest extends
CalciteIngestionDmlTest
+ "}]";
final String resources =
"[{\"name\":\"dst\",\"type\":\"DATASOURCE\"},{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]";
- final String attributes =
"{\"statementType\":\"INSERT\",\"targetDataSource\":\"druid.dst\",\"partitionedBy\":\"DAY\",\"clusteredBy\":\"2,
`dim1` DESC, CEIL(`m2`)\"}";
+ final String attributes =
"{\"statementType\":\"INSERT\",\"targetDataSource\":\"druid.dst\",\"partitionedBy\":\"DAY\",\"clusteredBy\":\"2,
`dim1`, CEIL(`m2`)\"}";
// Use testQuery for EXPLAIN (not testIngestionQuery).
testQuery(
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
index fb922ffe77..0c1f016600 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
@@ -765,6 +765,26 @@ public class CalciteReplaceDmlTest extends
CalciteIngestionDmlTest
didTest = true;
}
+
+ @Test
+ public void testExplainPlanReplaceWithClusteredByDescThrowsException()
+ {
+ skipVectorize();
+
+ final String sql = "EXPLAIN PLAN FOR"
+ + " REPLACE INTO dst"
+ + " OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01
00:00:00' AND __time < TIMESTAMP '2000-01-02 00:00:00' "
+ + "SELECT * FROM foo PARTITIONED BY DAY CLUSTERED BY
dim1 DESC";
+
+ testIngestionQuery()
+ .sql(sql)
+ .expectValidationError(
+ SqlPlanningException.class,
+ "[`dim1` DESC] is invalid. CLUSTERED BY columns cannot be sorted
in descending order."
+ )
+ .verify();
+ }
+
@Test
public void testExplainReplaceFromExternalUnauthorized()
{
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtilsTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtilsTest.java
index 75c7f03994..1f295ea358 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtilsTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtilsTest.java
@@ -21,13 +21,17 @@ package org.apache.druid.sql.calcite.parser;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlPostfixOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.tools.ValidationException;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import
org.apache.druid.sql.calcite.expression.builtin.TimeFloorOperatorConversion;
@@ -119,7 +123,74 @@ public class DruidSqlParserUtilsTest
}
}
- public static class FloorToGranularityConversionTestErrors
+ public static class ClusteredByColumnsValidationTest
+ {
+ /**
+ * Tests an empty CLUSTERED BY clause
+ */
+ @Test
+ public void testEmptyClusteredByColumnsValid()
+ {
+ final SqlNodeList clusteredByArgs = new SqlNodeList(SqlParserPos.ZERO);
+ try {
+ DruidSqlParserUtils.validateClusteredByColumns(clusteredByArgs);
+ }
+ catch (ValidationException e) {
+ Assert.fail("Did not expect an exception: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Tests clause "CLUSTERED BY DIM1, DIM2 ASC, 3"
+ */
+ @Test
+ public void testClusteredByColumnsValid()
+ {
+ final SqlNodeList clusteredByArgs = new SqlNodeList(SqlParserPos.ZERO);
+ clusteredByArgs.add(new SqlIdentifier("DIM1", SqlParserPos.ZERO));
+ clusteredByArgs.add(new SqlIdentifier("DIM2 ASC", SqlParserPos.ZERO));
+ clusteredByArgs.add(SqlLiteral.createExactNumeric("3",
SqlParserPos.ZERO));
+
+ try {
+ DruidSqlParserUtils.validateClusteredByColumns(clusteredByArgs);
+ }
+ catch (ValidationException e) {
+ Assert.fail("Did not expect an exception: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Tests clause "CLUSTERED BY DIM1, DIM2 ASC, 3, DIM4 DESC"
+ */
+ @Test
+ public void testClusteredByColumnsWithDescThrowsException()
+ {
+ final SqlNodeList clusteredByArgs = new SqlNodeList(SqlParserPos.ZERO);
+ clusteredByArgs.add(new SqlIdentifier("DIM1", SqlParserPos.ZERO));
+ clusteredByArgs.add(new SqlIdentifier("DIM2 ASC", SqlParserPos.ZERO));
+ clusteredByArgs.add(SqlLiteral.createExactNumeric("3",
SqlParserPos.ZERO));
+
+ final SqlBasicCall sqlBasicCall = new SqlBasicCall(
+ new SqlPostfixOperator("DESC", SqlKind.DESCENDING, 2, null, null,
null),
+ new SqlNode[]{
+ new SqlIdentifier("DIM4", SqlParserPos.ZERO)
+ },
+ new SqlParserPos(0, 3)
+ );
+ clusteredByArgs.add(sqlBasicCall);
+
+ ValidationException e = Assert.assertThrows(
+ ValidationException.class,
+ () -> DruidSqlParserUtils.validateClusteredByColumns(clusteredByArgs)
+ );
+ Assert.assertEquals(
+ "[`DIM4` DESC] is invalid. CLUSTERED BY columns cannot be sorted in
descending order.",
+ e.getMessage()
+ );
+ }
+ }
+
+ public static class FloorToGranularityConversionErrorsTest
{
/**
* Tests clause like "PARTITIONED BY 'day'"
diff --git
a/sql/src/test/resources/calcite/expected/ingest/insertWithClusteredBy-logicalPlan.txt
b/sql/src/test/resources/calcite/expected/ingest/insertWithClusteredBy-logicalPlan.txt
index eb2d8501d5..9eb25a81ec 100644
---
a/sql/src/test/resources/calcite/expected/ingest/insertWithClusteredBy-logicalPlan.txt
+++
b/sql/src/test/resources/calcite/expected/ingest/insertWithClusteredBy-logicalPlan.txt
@@ -1,4 +1,4 @@
-LogicalInsert(target=[druid.dst], partitionedBy=[{type=period, period=P1D,
timeZone=UTC, origin=null}], clusteredBy=[2, `dim1` DESC, CEIL(`m2`)])
- LogicalSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[ASC], dir1=[DESC],
dir2=[ASC])
+LogicalInsert(target=[druid.dst], partitionedBy=[{type=period, period=P1D,
timeZone=UTC, origin=null}], clusteredBy=[2, `dim1`, CEIL(`m2`)])
+ LogicalSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[ASC], dir1=[ASC],
dir2=[ASC])
LogicalProject(__time=[$0], floor_m1=[FLOOR($5)], dim1=[$1],
ceil_m2=[CEIL($6)])
LogicalTableScan(table=[[druid, foo]])
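
Editorial illustration (not part of the commit): the planning-time check added in DruidSqlParserUtils.validateClusteredByColumns can be approximated without Calcite as in the sketch below. The class `ClusteredByCheckSketch`, the `ClusterKey` and `Direction` types, and the `firstViolation` helper are all hypothetical stand-ins invented for this example. The real code inspects parsed SqlNode instances for SqlKind.DESCENDING and throws a ValidationException, whereas this sketch simply returns the message so that it stays self-contained.

```java
import java.util.Arrays;
import java.util.List;

public class ClusteredByCheckSketch
{
  /** Hypothetical stand-in for the ordering carried by a parsed CLUSTERED BY entry. */
  enum Direction
  {
    ASCENDING,
    DESCENDING
  }

  /** Hypothetical stand-in for one parsed CLUSTERED BY entry (a SqlNode in the real code). */
  static final class ClusterKey
  {
    final String column;
    final Direction direction;

    ClusterKey(String column, Direction direction)
    {
      this.column = column;
      this.direction = direction;
    }
  }

  /**
   * Returns the error message for the first descending key, or null if every key is
   * acceptable. A null list is treated as "no CLUSTERED BY clause", as in the commit.
   */
  static String firstViolation(List<ClusterKey> keys)
  {
    if (keys == null) {
      return null;
    }
    for (ClusterKey key : keys) {
      if (key.direction == Direction.DESCENDING) {
        // Message format copied from the commit; the real code formats the SqlNode itself.
        return String.format(
            "[`%s` DESC] is invalid. CLUSTERED BY columns cannot be sorted in descending order.",
            key.column
        );
      }
    }
    return null;
  }

  public static void main(String[] args)
  {
    // Models "CLUSTERED BY m2, m1 DESC" from the REPLACE test in this commit.
    List<ClusterKey> keys = Arrays.asList(
        new ClusterKey("m2", Direction.ASCENDING),
        new ClusterKey("m1", Direction.DESCENDING)
    );
    System.out.println(firstViolation(keys));
  }
}
```

Running `main` prints the same message that the MSQReplaceTest case in this commit expects for `m1`. Because the check runs before planning, the query is rejected without starting an ingestion task, which is why the runtime InsertCannotOrderByDescending fault is no longer needed.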