This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new fcb1c0b7bf Add cluster by support for replace syntax (#12524)
fcb1c0b7bf is described below
commit fcb1c0b7bfc737d76e7dd956b5f4ff49055007c7
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Tue May 17 15:15:29 2022 +0530
Add cluster by support for replace syntax (#12524)
* Add cluster by support for replace syntax
* Add unit test for with list
---
sql/src/main/codegen/includes/common.ftl | 19 ++++++
sql/src/main/codegen/includes/insert.ftl | 19 ------
sql/src/main/codegen/includes/replace.ftl | 7 ++-
.../sql/calcite/parser/DruidSqlParserUtils.java | 34 +++++++++++
.../druid/sql/calcite/parser/DruidSqlReplace.java | 21 +++++++
.../druid/sql/calcite/planner/DruidPlanner.java | 29 ++--------
.../druid/sql/calcite/CalciteReplaceDmlTest.java | 67 +++++++++++++++++++++-
.../sql/calcite/parser/DruidSqlUnparseTest.java | 10 ++--
8 files changed, 156 insertions(+), 50 deletions(-)
diff --git a/sql/src/main/codegen/includes/common.ftl b/sql/src/main/codegen/includes/common.ftl
index 136492482c..5b74df6332 100644
--- a/sql/src/main/codegen/includes/common.ftl
+++ b/sql/src/main/codegen/includes/common.ftl
@@ -72,3 +72,22 @@ org.apache.druid.java.util.common.Pair<Granularity, String> PartitionGranularity
return new org.apache.druid.java.util.common.Pair(granularity, unparseString);
}
}
+
+SqlNodeList ClusterItems() :
+{
+ List<SqlNode> list;
+ final Span s;
+ SqlNode e;
+}
+{
+ e = OrderItem() {
+ s = span();
+ list = startList(e);
+ }
+ (
+ LOOKAHEAD(2) <COMMA> e = OrderItem() { list.add(e); }
+ )*
+ {
+ return new SqlNodeList(list, s.addAll(list).pos());
+ }
+}
diff --git a/sql/src/main/codegen/includes/insert.ftl b/sql/src/main/codegen/includes/insert.ftl
index 6f7c1148f5..3eac7d19b2 100644
--- a/sql/src/main/codegen/includes/insert.ftl
+++ b/sql/src/main/codegen/includes/insert.ftl
@@ -51,22 +51,3 @@ SqlNode DruidSqlInsertEof() :
return new DruidSqlInsert(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy);
}
}
-
-SqlNodeList ClusterItems() :
-{
- List<SqlNode> list;
- final Span s;
- SqlNode e;
-}
-{
- e = OrderItem() {
- s = span();
- list = startList(e);
- }
- (
- LOOKAHEAD(2) <COMMA> e = OrderItem() { list.add(e); }
- )*
- {
- return new SqlNodeList(list, s.addAll(list).pos());
- }
-}
diff --git a/sql/src/main/codegen/includes/replace.ftl b/sql/src/main/codegen/includes/replace.ftl
index b8b0199d70..20c9aac9a8 100644
--- a/sql/src/main/codegen/includes/replace.ftl
+++ b/sql/src/main/codegen/includes/replace.ftl
@@ -27,6 +27,7 @@ SqlNode DruidSqlReplaceEof() :
SqlInsert sqlInsert;
// Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj
org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
+ SqlNodeList clusteredBy = null;
final Pair<SqlNodeList, SqlNodeList> p;
SqlNode replaceTimeQuery = null;
}
@@ -51,6 +52,10 @@ SqlNode DruidSqlReplaceEof() :
<PARTITIONED> <BY>
partitionedBy = PartitionGranularity()
]
+ [
+ <CLUSTERED> <BY>
+ clusteredBy = ClusterItems()
+ ]
// EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.
// The reason for adding EOF here is to ensure that we create a DruidSqlReplace node after the syntax has been
// validated and throw SQL syntax errors before performing validations in the DruidSqlReplace which can overshadow the
@@ -58,7 +63,7 @@ SqlNode DruidSqlReplaceEof() :
<EOF>
{
sqlInsert = new SqlInsert(s.end(source), SqlNodeList.EMPTY, table, source, columnList);
- return new DruidSqlReplace(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, replaceTimeQuery);
+ return new DruidSqlReplace(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy, replaceTimeQuery);
}
}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java
index 2ef587e3b0..3100fbb880 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java
@@ -28,6 +28,8 @@ import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOrderBy;
import org.apache.calcite.sql.SqlTimestampLiteral;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.java.util.common.StringUtils;
@@ -245,6 +247,38 @@ public class DruidSqlParserUtils
.collect(Collectors.toList());
}
+ /**
+ * Extracts and converts the information in the CLUSTERED BY clause to a new SqlOrderBy node.
+ *
+ * @param query sql query
+ * @param clusteredByList List of clustered by columns
+ * @return SqlOrderBy node containing the clusteredByList information
+ */
+ public static SqlOrderBy convertClusterByToOrderBy(SqlNode query, SqlNodeList clusteredByList)
+ {
+ // If we have a CLUSTERED BY clause, extract the information in that CLUSTERED BY and create a new
+ // SqlOrderBy node
+ SqlNode offset = null;
+ SqlNode fetch = null;
+
+ if (query instanceof SqlOrderBy) {
+ SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
+ // This represents the underlying query free of OFFSET, FETCH and ORDER BY clauses
+ // For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo OFFSET 10 FETCH 30 ORDER BY dim1 GROUP
+ // BY dim1 this would represent the "SELECT dim1, sum(dim2) from foo GROUP BY dim1
+ query = sqlOrderBy.query;
+ offset = sqlOrderBy.offset;
+ fetch = sqlOrderBy.fetch;
+ }
+ // Creates a new SqlOrderBy query, which may have our CLUSTERED BY overwritten
+ return new SqlOrderBy(
+ query.getParserPosition(),
+ query,
+ clusteredByList,
+ offset,
+ fetch
+ );
+ }
/**
* This method is used to convert an {@link SqlNode} representing a query into a {@link DimFilter} for the same query.
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
index d1860600e8..fb41bf7656 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
@@ -51,6 +51,9 @@ public class DruidSqlReplace extends SqlInsert
private final SqlNode replaceTimeQuery;
+ @Nullable
+ private final SqlNodeList clusteredBy;
+
/**
* While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is
* disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly
@@ -61,6 +64,7 @@ public class DruidSqlReplace extends SqlInsert
@Nonnull SqlInsert insertNode,
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
+ @Nullable SqlNodeList clusteredBy,
@Nullable SqlNode replaceTimeQuery
) throws ParseException
{
@@ -82,6 +86,8 @@ public class DruidSqlReplace extends SqlInsert
this.partitionedByStringForUnparse = Preconditions.checkNotNull(partitionedByStringForUnparse);
this.replaceTimeQuery = replaceTimeQuery;
+
+ this.clusteredBy = clusteredBy;
}
public SqlNode getReplaceTimeQuery()
@@ -94,6 +100,12 @@ public class DruidSqlReplace extends SqlInsert
return partitionedBy;
}
+ @Nullable
+ public SqlNodeList getClusteredBy()
+ {
+ return clusteredBy;
+ }
+
@Nonnull
@Override
public SqlOperator getOperator()
@@ -128,5 +140,14 @@ public class DruidSqlReplace extends SqlInsert
writer.keyword("PARTITIONED BY");
writer.keyword(partitionedByStringForUnparse);
+
+ if (getClusteredBy() != null) {
+ writer.keyword("CLUSTERED BY");
+ SqlWriter.Frame frame = writer.startList("", "");
+ for (SqlNode clusterByOpts : getClusteredBy().getList()) {
+ clusterByOpts.unparse(writer, leftPrec, rightPrec);
+ }
+ writer.endList(frame);
+ }
}
}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
index 7680c7a335..01b7b03d57 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
@@ -852,28 +852,7 @@ public class DruidPlanner implements Closeable
Granularity ingestionGranularity = druidSqlInsert.getPartitionedBy();
if (druidSqlInsert.getClusteredBy() != null) {
- // If we have a CLUSTERED BY clause, extract the information in that CLUSTERED BY and create a new
- // SqlOrderBy node
- SqlNode offset = null;
- SqlNode fetch = null;
-
- if (query instanceof SqlOrderBy) {
- SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
- // This represents the underlying query free of OFFSET, FETCH and ORDER BY clauses
- // For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo OFFSET 10 FETCH 30 ORDER BY dim1 GROUP
- // BY dim1 this would represent the "SELECT dim1, sum(dim2) from foo GROUP BY dim1
- query = sqlOrderBy.query;
- offset = sqlOrderBy.offset;
- fetch = sqlOrderBy.fetch;
- }
- // Creates a new SqlOrderBy query, which may have our CLUSTERED BY overwritten
- query = new SqlOrderBy(
- query.getParserPosition(),
- query,
- druidSqlInsert.getClusteredBy(),
- offset,
- fetch
- );
+ query = DruidSqlParserUtils.convertClusterByToOrderBy(query, druidSqlInsert.getClusteredBy());
}
if (!query.isA(SqlKind.QUERY)) {
@@ -893,7 +872,7 @@ public class DruidPlanner implements Closeable
SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
SqlNodeList orderByList = sqlOrderBy.orderList;
if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
- throw new ValidationException("Cannot have ORDER BY on a REPLACE query.");
+ throw new ValidationException("Cannot have ORDER BY on a REPLACE query, use CLUSTERED BY instead.");
}
}
@@ -905,6 +884,10 @@ public class DruidPlanner implements Closeable
Granularity ingestionGranularity = druidSqlReplace.getPartitionedBy();
List<String> replaceIntervals =
DruidSqlParserUtils.validateQueryAndConvertToIntervals(replaceTimeQuery,
ingestionGranularity, dateTimeZone);
+ if (druidSqlReplace.getClusteredBy() != null) {
+ query = DruidSqlParserUtils.convertClusterByToOrderBy(query, druidSqlReplace.getClusteredBy());
+ }
+
if (!query.isA(SqlKind.QUERY)) {
throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
index b3b0f97556..6877ce8042 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.sql.SqlPlanningException;
import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
@@ -54,7 +55,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
"{\"type\":\"all\"}",
DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS,
- "all"
+ DruidSqlParserUtils.ALL
);
protected Map<String, Object> addReplaceTimeChunkToQueryContext(Map<String, Object> context, String replaceTimeChunks)
@@ -252,7 +253,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo ORDER BY dim1 PARTITIONED BY ALL TIME")
- .expectValidationError(SqlPlanningException.class, "Cannot have ORDER BY on a REPLACE query.")
+ .expectValidationError(SqlPlanningException.class, "Cannot have ORDER BY on a REPLACE query, use CLUSTERED BY instead.")
.verify();
}
@@ -350,6 +351,29 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
.verify();
}
+ @Test
+ public void testReplaceContainingWithList()
+ {
+ testIngestionQuery()
+ .sql("REPLACE INTO dst OVERWRITE ALL WITH foo_data AS (SELECT * FROM foo) SELECT dim1, dim3 FROM foo_data PARTITIONED BY ALL TIME")
+ .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+ .expectTarget("dst", RowSignature.builder()
+ .add("dim1", ColumnType.STRING)
+ .add("dim3", ColumnType.STRING)
+ .build()
+ )
+ .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("dim1", "dim3")
+ .context(REPLACE_ALL_TIME_CHUNKS)
+ .build()
+ )
+ .verify();
+ }
+
@Test
public void testReplaceIntoInvalidDataSourceName()
{
@@ -484,7 +508,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
.context(
addReplaceTimeChunkToQueryContext(
queryContextWithGranularity(Granularities.DAY),
- "all"
+ DruidSqlParserUtils.ALL
)
)
.build()
@@ -492,6 +516,43 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
.verify();
}
+ @Test
+ public void testReplaceWithClusteredBy()
+ {
+ // Test correctness of the query when CLUSTERED BY clause is present
+ RowSignature targetRowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("floor_m1", ColumnType.FLOAT)
+ .add("dim1", ColumnType.STRING)
+ .build();
+
+ testIngestionQuery()
+ .sql(
+ "REPLACE INTO druid.dst OVERWRITE ALL SELECT __time, FLOOR(m1) as floor_m1, dim1 FROM foo PARTITIONED BY DAY CLUSTERED BY 2, dim1")
+ .expectTarget("dst", targetRowSignature)
+ .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+ .expectQuery(
+ newScanQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("__time", "dim1", "v0")
+ .virtualColumns(expressionVirtualColumn("v0", "floor(\"m1\")", ColumnType.FLOAT))
+ .orderBy(
+ ImmutableList.of(
+ new ScanQuery.OrderBy("v0", ScanQuery.Order.ASCENDING),
+ new ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING)
+ )
+ )
+ .context(
+ addReplaceTimeChunkToQueryContext(
+ queryContextWithGranularity(Granularities.DAY),
+ DruidSqlParserUtils.ALL)
+ )
+ .build()
+ )
+ .verify();
+ }
+
@Test
public void testReplaceWithPartitionedByContainingInvalidGranularity() throws Exception
{
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlUnparseTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlUnparseTest.java
index 96a992f0dd..20ea22c67a 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlUnparseTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlUnparseTest.java
@@ -53,12 +53,13 @@ public class DruidSqlUnparseTest
@Test
public void testUnparseReplaceAll() throws ParseException
{
- String sqlQuery = "REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME";
+ String sqlQuery = "REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME CLUSTERED BY dim1";
String prettySqlQuery = "REPLACE INTO \"dst\"\n"
+ "OVERWRITE ALL\n"
+ "(SELECT *\n"
+ " FROM \"foo\")\n"
- + "PARTITIONED BY ALL TIME";
+ + "PARTITIONED BY ALL TIME "
+ + "CLUSTERED BY \"dim1\"";
DruidSqlParserImpl druidSqlParser = createTestParser(sqlQuery);
DruidSqlReplace druidSqlReplace = (DruidSqlReplace) druidSqlParser.DruidSqlReplaceEof();
@@ -70,12 +71,13 @@ public class DruidSqlUnparseTest
@Test
public void testUnparseReplaceWhere() throws ParseException
{
- String sqlQuery = "REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' AND __time < TIMESTAMP '2000-01-02 00:00:00' SELECT * FROM foo PARTITIONED BY DAY";
+ String sqlQuery = "REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' AND __time < TIMESTAMP '2000-01-02 00:00:00' SELECT * FROM foo PARTITIONED BY DAY CLUSTERED BY dim1";
String prettySqlQuery = "REPLACE INTO \"dst\"\n"
+ "OVERWRITE \"__time\" >= TIMESTAMP '2000-01-01 00:00:00' AND \"__time\" < TIMESTAMP '2000-01-02 00:00:00'\n"
+ "(SELECT *\n"
+ " FROM \"foo\")\n"
- + "PARTITIONED BY DAY";
+ + "PARTITIONED BY DAY "
+ + "CLUSTERED BY \"dim1\"";
DruidSqlParserImpl druidSqlParser = createTestParser(sqlQuery);
DruidSqlReplace druidSqlReplace = (DruidSqlReplace) druidSqlParser.DruidSqlReplaceEof();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]