adarshsanjeev commented on code in PR #12386:
URL: https://github.com/apache/druid/pull/12386#discussion_r858275848


##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java:
##########
@@ -761,84 +787,204 @@ public T next()
     private final SqlExplain explain;
 
     @Nullable
-    private final DruidSqlInsert insert;
+    private final SqlInsert insertOrReplace;
 
     private final SqlNode query;
 
     @Nullable
     private final Granularity ingestionGranularity;
 
+    @Nullable
+    private final List<String> replaceIntervals;
+
     private ParsedNodes(
         @Nullable SqlExplain explain,
-        @Nullable DruidSqlInsert insert,
+        @Nullable SqlInsert insertOrReplace,
         SqlNode query,
-        @Nullable Granularity ingestionGranularity
+        @Nullable Granularity ingestionGranularity,
+        @Nullable List<String> replaceIntervals
     )
     {
       this.explain = explain;
-      this.insert = insert;
+      this.insertOrReplace = insertOrReplace;
       this.query = query;
       this.ingestionGranularity = ingestionGranularity;
+      this.replaceIntervals = replaceIntervals;
     }
 
-    static ParsedNodes create(final SqlNode node) throws ValidationException
+    static ParsedNodes create(final SqlNode node, DateTimeZone dateTimeZone) 
throws ValidationException
     {
       SqlExplain explain = null;
       DruidSqlInsert druidSqlInsert = null;
+      DruidSqlReplace druidSqlReplace = null;
       SqlNode query = node;
       Granularity ingestionGranularity = null;
+      List<String> replaceIntervals = null;
 
       if (query.getKind() == SqlKind.EXPLAIN) {
         explain = (SqlExplain) query;
         query = explain.getExplicandum();
       }
 
       if (query.getKind() == SqlKind.INSERT) {
-        druidSqlInsert = (DruidSqlInsert) query;
-        query = druidSqlInsert.getSource();
-
-        // Check if ORDER BY clause is not provided to the underlying query
-        if (query instanceof SqlOrderBy) {
-          SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
-          SqlNodeList orderByList = sqlOrderBy.orderList;
-          if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) 
{
-            throw new ValidationException("Cannot have ORDER BY on an INSERT 
query, use CLUSTERED BY instead.");
-          }
-        }
+        if (query instanceof DruidSqlInsert) {
+          druidSqlInsert = (DruidSqlInsert) query;
+          query = druidSqlInsert.getSource();
 
-        ingestionGranularity = druidSqlInsert.getPartitionedBy();
+          // Check if ORDER BY clause is not provided to the underlying query
+          if (query instanceof SqlOrderBy) {
+            SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
+            SqlNodeList orderByList = sqlOrderBy.orderList;
+            if (!(orderByList == null || 
orderByList.equals(SqlNodeList.EMPTY))) {
+              throw new ValidationException("Cannot have ORDER BY on an INSERT 
query, use CLUSTERED BY instead.");
+            }
+          }
 
-        if (druidSqlInsert.getClusteredBy() != null) {
-          // If we have a CLUSTERED BY clause, extract the information in that 
CLUSTERED BY and create a new SqlOrderBy
-          // node
-          SqlNode offset = null;
-          SqlNode fetch = null;
+          ingestionGranularity = druidSqlInsert.getPartitionedBy();
+
+          if (druidSqlInsert.getClusteredBy() != null) {
+            // If we have a CLUSTERED BY clause, extract the information in 
that CLUSTERED BY and create a new
+            // SqlOrderBy node
+            SqlNode offset = null;
+            SqlNode fetch = null;
+
+            if (query instanceof SqlOrderBy) {
+              SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
+              // This represents the underlying query free of OFFSET, FETCH 
and ORDER BY clauses.
+              // For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo 
GROUP BY dim1 ORDER BY dim1 OFFSET 10
+              // FETCH 30", this would represent "SELECT dim1, sum(dim2) FROM 
foo GROUP BY dim1".
+              query = sqlOrderBy.query;
+              offset = sqlOrderBy.offset;
+              fetch = sqlOrderBy.fetch;
+            }
+            // Creates a new SqlOrderBy query, which may have our CLUSTERED BY 
overwritten
+            query = new SqlOrderBy(
+                query.getParserPosition(),
+                query,
+                druidSqlInsert.getClusteredBy(),
+                offset,
+                fetch
+            );
+          }
+        } else if (query instanceof DruidSqlReplace) {
+          druidSqlReplace = (DruidSqlReplace) query;
+          query = druidSqlReplace.getSource();
 
+          // Check if ORDER BY clause is not provided to the underlying query
           if (query instanceof SqlOrderBy) {
             SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
-            // This represents the underlying query free of OFFSET, FETCH and 
ORDER BY clauses
-            // For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo 
OFFSET 10 FETCH 30 ORDER BY dim1 GROUP BY dim1
-            // this would represent the "SELECT dim1, sum(dim2) from foo GROUP 
BY dim1
-            query = sqlOrderBy.query;
-            offset = sqlOrderBy.offset;
-            fetch = sqlOrderBy.fetch;
+            SqlNodeList orderByList = sqlOrderBy.orderList;
+            if (!(orderByList == null || 
orderByList.equals(SqlNodeList.EMPTY))) {
+              throw new ValidationException("Cannot have ORDER BY on a REPLACE 
query.");
+            }
           }
-          // Creates a new SqlOrderBy query, which may have our CLUSTERED BY 
overwritten
-          query = new SqlOrderBy(
-              query.getParserPosition(),
-              query,
-              druidSqlInsert.getClusteredBy(),
-              offset,
-              fetch
-          );
+
+          SqlNode replaceTimeQuery = druidSqlReplace.getReplaceTimeQuery();
+          if (replaceTimeQuery == null) {
+            throw new ValidationException("Missing time chunk information in 
DELETE WHERE clause for replace.");
+          }
+
+          ingestionGranularity = druidSqlReplace.getPartitionedBy();
+          replaceIntervals = 
validateQueryAndConvertToIntervals(replaceTimeQuery, ingestionGranularity, 
dateTimeZone);
         }
       }
 
       if (!query.isA(SqlKind.QUERY)) {
         throw new ValidationException(StringUtils.format("Cannot execute 
[%s].", query.getKind()));
       }
 
-      return new ParsedNodes(explain, druidSqlInsert, query, 
ingestionGranularity);
+      if (druidSqlInsert != null) {
+        return new ParsedNodes(explain, druidSqlInsert, query, 
ingestionGranularity, null);
+      } else {
+        return new ParsedNodes(explain, druidSqlReplace, query, 
ingestionGranularity, replaceIntervals);
+      }
+    }
+
+    private static List<String> validateQueryAndConvertToIntervals(
+        SqlNode replaceTimeQuery,
+        Granularity granularity,
+        DateTimeZone dateTimeZone
+    ) throws ValidationException
+    {
+      if (replaceTimeQuery instanceof SqlLiteral && 
"all".equalsIgnoreCase(((SqlLiteral) replaceTimeQuery).toValue())) {
+        return ImmutableList.of("all");
+      }
+
+      DimFilter dimFilter = convertQueryToDimFilter(replaceTimeQuery, 
dateTimeZone);
+      if 
(!ImmutableSet.of(ColumnHolder.TIME_COLUMN_NAME).equals(dimFilter.getRequiredColumns()))
 {
+        throw new ValidationException("Only " + ColumnHolder.TIME_COLUMN_NAME 
+ " column is supported in DELETE WHERE clause");
+      }
+
+      Filtration filtration = Filtration.create(dimFilter);
+      filtration = MoveTimeFiltersToIntervals.instance().apply(filtration);
+      List<Interval> intervals = filtration.getIntervals();
+
+      for (Interval interval : intervals) {
+        DateTime intervalStart = interval.getStart();
+        DateTime intervalEnd = interval.getEnd();
+        // The start of each interval should be aligned with the start of a 
bucket, and the end of each interval
+        // should be aligned with the start of the next bucket.
+        if (!granularity.bucketStart(intervalStart).equals(intervalStart) || 
!granularity.bucketStart(intervalEnd).equals(intervalEnd)) {
+          throw new ValidationException("DELETE WHERE clause contains an 
interval which is not aligned with PARTITIONED BY granularity");
+        }
+      }
+      return intervals
+          .stream()
+          .map(AbstractInterval::toString)
+          .collect(Collectors.toList());
+    }
+
+    private static DimFilter convertQueryToDimFilter(SqlNode replaceTimeQuery, 
DateTimeZone dateTimeZone)

Review Comment:
   Moved



##########
sql/src/main/codegen/includes/replace.ftl:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Taken from syntax of SqlInsert statement from calcite parser, edited for 
replace syntax
+SqlNode DruidSqlReplaceEof() :
+{
+    SqlNode table;
+    SqlNode source;
+    SqlNodeList columnList = null;
+    final Span s;
+    SqlInsert sqlInsert;
+    // Using the fully qualified name for this Pair class, since Calcite's 
Parser.jj also uses a class named Pair
+    org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy 
= new org.apache.druid.java.util.common.Pair(null, null);
+    final Pair<SqlNodeList, SqlNodeList> p;
+    final SqlNode replaceTimeQuery;
+}
+{
+    <REPLACE> { s = span(); }
+    table = CompoundIdentifier()
+    [
+        p = ParenthesizedCompoundIdentifierList() {
+            if (p.left.size() > 0) {
+                columnList = p.left;
+            }
+        }
+    ]
+    <DELETE>
+    (
+      <ALL> { replaceTimeQuery = SqlLiteral.createCharString("ALL", getPos()); 
}

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to