This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new fcb1c0b7bf Add cluster by support for replace syntax (#12524)
fcb1c0b7bf is described below

commit fcb1c0b7bfc737d76e7dd956b5f4ff49055007c7
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Tue May 17 15:15:29 2022 +0530

    Add cluster by support for replace syntax (#12524)
    
    * Add cluster by support for replace syntax
    
    * Add unit test for with list
---
 sql/src/main/codegen/includes/common.ftl           | 19 ++++++
 sql/src/main/codegen/includes/insert.ftl           | 19 ------
 sql/src/main/codegen/includes/replace.ftl          |  7 ++-
 .../sql/calcite/parser/DruidSqlParserUtils.java    | 34 +++++++++++
 .../druid/sql/calcite/parser/DruidSqlReplace.java  | 21 +++++++
 .../druid/sql/calcite/planner/DruidPlanner.java    | 29 ++--------
 .../druid/sql/calcite/CalciteReplaceDmlTest.java   | 67 +++++++++++++++++++++-
 .../sql/calcite/parser/DruidSqlUnparseTest.java    | 10 ++--
 8 files changed, 156 insertions(+), 50 deletions(-)

diff --git a/sql/src/main/codegen/includes/common.ftl b/sql/src/main/codegen/includes/common.ftl
index 136492482c..5b74df6332 100644
--- a/sql/src/main/codegen/includes/common.ftl
+++ b/sql/src/main/codegen/includes/common.ftl
@@ -72,3 +72,22 @@ org.apache.druid.java.util.common.Pair<Granularity, String> PartitionGranularity
     return new org.apache.druid.java.util.common.Pair(granularity, unparseString);
   }
 }
+
+SqlNodeList ClusterItems() :
+{
+  List<SqlNode> list;
+  final Span s;
+  SqlNode e;
+}
+{
+  e = OrderItem() {
+    s = span();
+    list = startList(e);
+  }
+  (
+    LOOKAHEAD(2) <COMMA> e = OrderItem() { list.add(e); }
+  )*
+  {
+    return new SqlNodeList(list, s.addAll(list).pos());
+  }
+}
diff --git a/sql/src/main/codegen/includes/insert.ftl b/sql/src/main/codegen/includes/insert.ftl
index 6f7c1148f5..3eac7d19b2 100644
--- a/sql/src/main/codegen/includes/insert.ftl
+++ b/sql/src/main/codegen/includes/insert.ftl
@@ -51,22 +51,3 @@ SqlNode DruidSqlInsertEof() :
     return new DruidSqlInsert(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy);
   }
 }
-
-SqlNodeList ClusterItems() :
-{
-  List<SqlNode> list;
-  final Span s;
-  SqlNode e;
-}
-{
-  e = OrderItem() {
-    s = span();
-    list = startList(e);
-  }
-  (
-    LOOKAHEAD(2) <COMMA> e = OrderItem() { list.add(e); }
-  )*
-  {
-    return new SqlNodeList(list, s.addAll(list).pos());
-  }
-}
diff --git a/sql/src/main/codegen/includes/replace.ftl b/sql/src/main/codegen/includes/replace.ftl
index b8b0199d70..20c9aac9a8 100644
--- a/sql/src/main/codegen/includes/replace.ftl
+++ b/sql/src/main/codegen/includes/replace.ftl
@@ -27,6 +27,7 @@ SqlNode DruidSqlReplaceEof() :
     SqlInsert sqlInsert;
     // Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj
     org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
+    SqlNodeList clusteredBy = null;
     final Pair<SqlNodeList, SqlNodeList> p;
     SqlNode replaceTimeQuery = null;
 }
@@ -51,6 +52,10 @@ SqlNode DruidSqlReplaceEof() :
       <PARTITIONED> <BY>
       partitionedBy = PartitionGranularity()
     ]
+    [
+      <CLUSTERED> <BY>
+      clusteredBy = ClusterItems()
+    ]
     // EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.
     // The reason for adding EOF here is to ensure that we create a DruidSqlReplace node after the syntax has been
     // validated and throw SQL syntax errors before performing validations in the DruidSqlReplace which can overshadow the
@@ -58,7 +63,7 @@ SqlNode DruidSqlReplaceEof() :
     <EOF>
     {
         sqlInsert = new SqlInsert(s.end(source), SqlNodeList.EMPTY, table, source, columnList);
-        return new DruidSqlReplace(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, replaceTimeQuery);
+        return new DruidSqlReplace(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy, replaceTimeQuery);
     }
 }
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java
index 2ef587e3b0..3100fbb880 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java
@@ -28,6 +28,8 @@ import org.apache.calcite.sql.SqlIntervalQualifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOrderBy;
 import org.apache.calcite.sql.SqlTimestampLiteral;
 import org.apache.calcite.tools.ValidationException;
 import org.apache.druid.java.util.common.StringUtils;
@@ -245,6 +247,38 @@ public class DruidSqlParserUtils
         .collect(Collectors.toList());
   }
 
+  /**
+   * Extracts and converts the information in the CLUSTERED BY clause to a new SqlOrderBy node.
+   *
+   * @param query sql query
+   * @param clusteredByList List of clustered by columns
+   * @return SqlOrderBy node containing the clusteredByList information
+   */
+  public static SqlOrderBy convertClusterByToOrderBy(SqlNode query, SqlNodeList clusteredByList)
+  {
+    // If we have a CLUSTERED BY clause, extract the information in that CLUSTERED BY and create a new
+    // SqlOrderBy node
+    SqlNode offset = null;
+    SqlNode fetch = null;
+
+    if (query instanceof SqlOrderBy) {
+      SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
+      // This represents the underlying query free of OFFSET, FETCH and ORDER BY clauses
+      // For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo OFFSET 10 FETCH 30 ORDER BY dim1 GROUP
+      // BY dim1 this would represent the "SELECT dim1, sum(dim2) from foo GROUP BY dim1
+      query = sqlOrderBy.query;
+      offset = sqlOrderBy.offset;
+      fetch = sqlOrderBy.fetch;
+    }
+    // Creates a new SqlOrderBy query, which may have our CLUSTERED BY overwritten
+    return new SqlOrderBy(
+        query.getParserPosition(),
+        query,
+        clusteredByList,
+        offset,
+        fetch
+    );
+  }
 
   /**
    * This method is used to convert an {@link SqlNode} representing a query into a {@link DimFilter} for the same query.
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
index d1860600e8..fb41bf7656 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
@@ -51,6 +51,9 @@ public class DruidSqlReplace extends SqlInsert
 
   private final SqlNode replaceTimeQuery;
 
+  @Nullable
+  private final SqlNodeList clusteredBy;
+
   /**
    * While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is
    * disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly
@@ -61,6 +64,7 @@ public class DruidSqlReplace extends SqlInsert
       @Nonnull SqlInsert insertNode,
       @Nullable Granularity partitionedBy,
       @Nullable String partitionedByStringForUnparse,
+      @Nullable SqlNodeList clusteredBy,
       @Nullable SqlNode replaceTimeQuery
   ) throws ParseException
   {
@@ -82,6 +86,8 @@ public class DruidSqlReplace extends SqlInsert
     this.partitionedByStringForUnparse = Preconditions.checkNotNull(partitionedByStringForUnparse);
 
     this.replaceTimeQuery = replaceTimeQuery;
+
+    this.clusteredBy = clusteredBy;
   }
 
   public SqlNode getReplaceTimeQuery()
@@ -94,6 +100,12 @@ public class DruidSqlReplace extends SqlInsert
     return partitionedBy;
   }
 
+  @Nullable
+  public SqlNodeList getClusteredBy()
+  {
+    return clusteredBy;
+  }
+
   @Nonnull
   @Override
   public SqlOperator getOperator()
@@ -128,5 +140,14 @@ public class DruidSqlReplace extends SqlInsert
 
     writer.keyword("PARTITIONED BY");
     writer.keyword(partitionedByStringForUnparse);
+
+    if (getClusteredBy() != null) {
+      writer.keyword("CLUSTERED BY");
+      SqlWriter.Frame frame = writer.startList("", "");
+      for (SqlNode clusterByOpts : getClusteredBy().getList()) {
+        clusterByOpts.unparse(writer, leftPrec, rightPrec);
+      }
+      writer.endList(frame);
+    }
   }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
index 7680c7a335..01b7b03d57 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
@@ -852,28 +852,7 @@ public class DruidPlanner implements Closeable
       Granularity ingestionGranularity = druidSqlInsert.getPartitionedBy();
 
       if (druidSqlInsert.getClusteredBy() != null) {
-        // If we have a CLUSTERED BY clause, extract the information in that CLUSTERED BY and create a new
-        // SqlOrderBy node
-        SqlNode offset = null;
-        SqlNode fetch = null;
-
-        if (query instanceof SqlOrderBy) {
-          SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
-          // This represents the underlying query free of OFFSET, FETCH and ORDER BY clauses
-          // For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo OFFSET 10 FETCH 30 ORDER BY dim1 GROUP
-          // BY dim1 this would represent the "SELECT dim1, sum(dim2) from foo GROUP BY dim1
-          query = sqlOrderBy.query;
-          offset = sqlOrderBy.offset;
-          fetch = sqlOrderBy.fetch;
-        }
-        // Creates a new SqlOrderBy query, which may have our CLUSTERED BY overwritten
-        query = new SqlOrderBy(
-            query.getParserPosition(),
-            query,
-            druidSqlInsert.getClusteredBy(),
-            offset,
-            fetch
-        );
+        query = DruidSqlParserUtils.convertClusterByToOrderBy(query, druidSqlInsert.getClusteredBy());
       }
 
       if (!query.isA(SqlKind.QUERY)) {
@@ -893,7 +872,7 @@ public class DruidPlanner implements Closeable
         SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
         SqlNodeList orderByList = sqlOrderBy.orderList;
         if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
-          throw new ValidationException("Cannot have ORDER BY on a REPLACE query.");
+          throw new ValidationException("Cannot have ORDER BY on a REPLACE query, use CLUSTERED BY instead.");
         }
       }
 
@@ -905,6 +884,10 @@ public class DruidPlanner implements Closeable
       Granularity ingestionGranularity = druidSqlReplace.getPartitionedBy();
       List<String> replaceIntervals = DruidSqlParserUtils.validateQueryAndConvertToIntervals(replaceTimeQuery, ingestionGranularity, dateTimeZone);
 
+      if (druidSqlReplace.getClusteredBy() != null) {
+        query = DruidSqlParserUtils.convertClusterByToOrderBy(query, druidSqlReplace.getClusteredBy());
+      }
+
       if (!query.isA(SqlKind.QUERY)) {
         throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
       }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
index b3b0f97556..6877ce8042 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.sql.SqlPlanningException;
 import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
 import org.apache.druid.sql.calcite.filtration.Filtration;
 import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
 import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
@@ -54,7 +55,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
       DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
       "{\"type\":\"all\"}",
       DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS,
-      "all"
+      DruidSqlParserUtils.ALL
   );
 
   protected Map<String, Object> addReplaceTimeChunkToQueryContext(Map<String, Object> context, String replaceTimeChunks)
@@ -252,7 +253,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
   {
     testIngestionQuery()
         .sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo ORDER BY dim1 PARTITIONED BY ALL TIME")
-        .expectValidationError(SqlPlanningException.class, "Cannot have ORDER BY on a REPLACE query.")
+        .expectValidationError(SqlPlanningException.class, "Cannot have ORDER BY on a REPLACE query, use CLUSTERED BY instead.")
         .verify();
   }
 
@@ -350,6 +351,29 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
         .verify();
   }
 
+  @Test
+  public void testReplaceContainingWithList()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst OVERWRITE ALL WITH foo_data AS (SELECT * FROM foo) SELECT dim1, dim3 FROM foo_data PARTITIONED BY ALL TIME")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("dst", RowSignature.builder()
+                                         .add("dim1", ColumnType.STRING)
+                                         .add("dim3", ColumnType.STRING)
+                                         .build()
+        )
+        .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("dim1", "dim3")
+                .context(REPLACE_ALL_TIME_CHUNKS)
+                .build()
+        )
+        .verify();
+  }
+
   @Test
   public void testReplaceIntoInvalidDataSourceName()
   {
@@ -484,7 +508,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
                 .context(
                     addReplaceTimeChunkToQueryContext(
                         queryContextWithGranularity(Granularities.DAY),
-                        "all"
+                        DruidSqlParserUtils.ALL
                     )
                 )
                 .build()
@@ -492,6 +516,43 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
         .verify();
   }
 
+  @Test
+  public void testReplaceWithClusteredBy()
+  {
+    // Test correctness of the query when CLUSTERED BY clause is present
+    RowSignature targetRowSignature = RowSignature.builder()
+                                                  .add("__time", ColumnType.LONG)
+                                                  .add("floor_m1", ColumnType.FLOAT)
+                                                  .add("dim1", ColumnType.STRING)
+                                                  .build();
+
+    testIngestionQuery()
+        .sql(
+            "REPLACE INTO druid.dst OVERWRITE ALL SELECT __time, FLOOR(m1) as floor_m1, dim1 FROM foo PARTITIONED BY DAY CLUSTERED BY 2, dim1")
+        .expectTarget("dst", targetRowSignature)
+        .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("__time", "dim1", "v0")
+                .virtualColumns(expressionVirtualColumn("v0", "floor(\"m1\")", ColumnType.FLOAT))
+                .orderBy(
+                    ImmutableList.of(
+                        new ScanQuery.OrderBy("v0", ScanQuery.Order.ASCENDING),
+                        new ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING)
+                    )
+                )
+                .context(
+                    addReplaceTimeChunkToQueryContext(
+                        queryContextWithGranularity(Granularities.DAY),
+                        DruidSqlParserUtils.ALL)
+                )
+                .build()
+        )
+        .verify();
+  }
+
   @Test
   public void testReplaceWithPartitionedByContainingInvalidGranularity() throws Exception
   {
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlUnparseTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlUnparseTest.java
index 96a992f0dd..20ea22c67a 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlUnparseTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlUnparseTest.java
@@ -53,12 +53,13 @@ public class DruidSqlUnparseTest
   @Test
   public void testUnparseReplaceAll() throws ParseException
   {
-    String sqlQuery = "REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME";
+    String sqlQuery = "REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME CLUSTERED BY dim1";
     String prettySqlQuery = "REPLACE INTO \"dst\"\n"
                             + "OVERWRITE ALL\n"
                             + "(SELECT *\n"
                             + "    FROM \"foo\")\n"
-                            + "PARTITIONED BY ALL TIME";
+                            + "PARTITIONED BY ALL TIME "
+                            + "CLUSTERED BY \"dim1\"";
 
     DruidSqlParserImpl druidSqlParser = createTestParser(sqlQuery);
     DruidSqlReplace druidSqlReplace = (DruidSqlReplace) druidSqlParser.DruidSqlReplaceEof();
@@ -70,12 +71,13 @@ public class DruidSqlUnparseTest
   @Test
   public void testUnparseReplaceWhere() throws ParseException
   {
-    String sqlQuery = "REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' AND __time < TIMESTAMP '2000-01-02 00:00:00' SELECT * FROM foo PARTITIONED BY DAY";
+    String sqlQuery = "REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' AND __time < TIMESTAMP '2000-01-02 00:00:00' SELECT * FROM foo PARTITIONED BY DAY CLUSTERED BY dim1";
     String prettySqlQuery = "REPLACE INTO \"dst\"\n"
                             + "OVERWRITE \"__time\" >= TIMESTAMP '2000-01-01 00:00:00' AND \"__time\" < TIMESTAMP '2000-01-02 00:00:00'\n"
                             + "(SELECT *\n"
                             + "    FROM \"foo\")\n"
-                            + "PARTITIONED BY DAY";
+                            + "PARTITIONED BY DAY "
+                            + "CLUSTERED BY \"dim1\"";
     DruidSqlParserImpl druidSqlParser = createTestParser(sqlQuery);
     DruidSqlReplace druidSqlReplace = (DruidSqlReplace) druidSqlParser.DruidSqlReplaceEof();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to