This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 39b3487aa9 Add replace statement to sql parser (#12386)
39b3487aa9 is described below

commit 39b3487aa9829d1d5b19733ef76319d8133dcb38
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Fri May 13 10:56:40 2022 +0530

    Add replace statement to sql parser (#12386)
    
    Relevant Issue: #11929
    
    - Add custom replace statement to Druid SQL parser.
    - Edit DruidPlanner to convert relevant fields to Query Context.
    - Refactor common code with INSERT statements to reuse them for REPLACE 
where possible.
---
 sql/src/main/codegen/config.fmpp                   |   6 +
 .../codegen/includes/{insert.ftl => common.ftl}    |  59 +-
 sql/src/main/codegen/includes/explain.ftl          |   2 +
 sql/src/main/codegen/includes/insert.ftl           |  55 --
 sql/src/main/codegen/includes/replace.ftl          |  78 +++
 .../druid/sql/calcite/parser/DruidSqlInsert.java   |   2 +
 .../sql/calcite/parser/DruidSqlParserUtils.java    | 212 +++++++
 .../{DruidSqlInsert.java => DruidSqlReplace.java}  |  64 +-
 .../druid/sql/calcite/planner/DruidPlanner.java    | 195 +++---
 .../druid/sql/calcite/CalciteIngestionDmlTest.java | 339 +++++++++++
 .../druid/sql/calcite/CalciteInsertDmlTest.java    | 372 ++----------
 .../druid/sql/calcite/CalciteReplaceDmlTest.java   | 656 +++++++++++++++++++++
 .../sql/calcite/parser/DruidSqlUnparseTest.java    |  97 +++
 13 files changed, 1598 insertions(+), 539 deletions(-)

diff --git a/sql/src/main/codegen/config.fmpp b/sql/src/main/codegen/config.fmpp
index 817bd4e79e..31fe812bbe 100644
--- a/sql/src/main/codegen/config.fmpp
+++ b/sql/src/main/codegen/config.fmpp
@@ -51,6 +51,7 @@ data: {
     # List of additional classes and packages to import.
     # Example. "org.apache.calcite.sql.*", "java.util.List".
     imports: [
+      "java.util.List"
       "org.apache.calcite.sql.SqlNode"
       "org.apache.calcite.sql.SqlInsert"
       "org.apache.druid.java.util.common.granularity.Granularity"
@@ -63,6 +64,7 @@ data: {
     # keyword add it to 'nonReservedKeywords' section.
     keywords: [
       "CLUSTERED"
+      "OVERWRITE"
       "PARTITIONED"
     ]
 
@@ -218,6 +220,7 @@ data: {
         "OTHERS"
         "OUTPUT"
         "OVERRIDING"
+        "OVERWRITE"
         "PAD"
         "PARAMETER_MODE"
         "PARAMETER_NAME"
@@ -384,6 +387,7 @@ data: {
     statementParserMethods: [
       "DruidSqlInsertEof()"
       "DruidSqlExplain()"
+      "DruidSqlReplaceEof()"
     ]
 
     # List of methods for parsing custom literals.
@@ -433,8 +437,10 @@ data: {
     # given as part of "statementParserMethods", "literalParserMethods" or
     # "dataTypeParserMethods".
     implementationFiles: [
+      "common.ftl"
       "insert.ftl"
       "explain.ftl"
+      "replace.ftl"
     ]
 
     includePosixOperators: false
diff --git a/sql/src/main/codegen/includes/insert.ftl 
b/sql/src/main/codegen/includes/common.ftl
similarity index 52%
copy from sql/src/main/codegen/includes/insert.ftl
copy to sql/src/main/codegen/includes/common.ftl
index ced692759f..136492482c 100644
--- a/sql/src/main/codegen/includes/insert.ftl
+++ b/sql/src/main/codegen/includes/common.ftl
@@ -18,64 +18,11 @@
  */
 
 // Using fully qualified name for Pair class, since Calcite also has a same 
class name being used in the Parser.jj
-SqlNode DruidSqlInsertEof() :
-{
-  SqlNode insertNode;
-  org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = 
new org.apache.druid.java.util.common.Pair(null, null);
-  SqlNodeList clusteredBy = null;
-}
-{
-  insertNode = SqlInsert()
-  // PARTITIONED BY is necessary, but is kept optional in the grammar. It is 
asserted that it is not missing in the
-  // DruidSqlInsert constructor so that we can return a custom error message.
-  [
-    <PARTITIONED> <BY>
-    partitionedBy = PartitionGranularity()
-  ]
-  [
-    <CLUSTERED> <BY>
-    clusteredBy = ClusterItems()
-  ]
-  // EOF is also present in SqlStmtEof but EOF is a special case and a single 
EOF can be consumed multiple times.
-  // The reason for adding EOF here is to ensure that we create a 
DruidSqlInsert node after the syntax has been
-  // validated and throw SQL syntax errors before performing validations in 
the DruidSqlInsert which can overshadow the
-  // actual error message.
-  <EOF>
-  {
-    if (!(insertNode instanceof SqlInsert)) {
-      // This shouldn't be encountered, but done as a defensive practice. 
SqlInsert() always returns a node of type
-      // SqlInsert
-      return insertNode;
-    }
-    SqlInsert sqlInsert = (SqlInsert) insertNode;
-    return new DruidSqlInsert(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, 
clusteredBy);
-  }
-}
-
-SqlNodeList ClusterItems() :
-{
-  List<SqlNode> list;
-  final Span s;
-  SqlNode e;
-}
-{
-  e = OrderItem() {
-    s = span();
-    list = startList(e);
-  }
-  (
-    LOOKAHEAD(2) <COMMA> e = OrderItem() { list.add(e); }
-  )*
-  {
-    return new SqlNodeList(list, s.addAll(list).pos());
-  }
-}
-
 org.apache.druid.java.util.common.Pair<Granularity, String> 
PartitionGranularity() :
 {
-  SqlNode e = null;
-  Granularity granularity = null;
-  String unparseString = null;
+  SqlNode e;
+  Granularity granularity;
+  String unparseString;
 }
 {
   (
diff --git a/sql/src/main/codegen/includes/explain.ftl 
b/sql/src/main/codegen/includes/explain.ftl
index b24927e500..b6f3d926a1 100644
--- a/sql/src/main/codegen/includes/explain.ftl
+++ b/sql/src/main/codegen/includes/explain.ftl
@@ -62,6 +62,8 @@ SqlNode DruidQueryOrSqlQueryOrDml() :
 {
   (
     stmt = DruidSqlInsertEof()
+  |
+    stmt = DruidSqlReplaceEof()
   |
     stmt = SqlQueryOrDml()
   )
diff --git a/sql/src/main/codegen/includes/insert.ftl 
b/sql/src/main/codegen/includes/insert.ftl
index ced692759f..6f7c1148f5 100644
--- a/sql/src/main/codegen/includes/insert.ftl
+++ b/sql/src/main/codegen/includes/insert.ftl
@@ -70,58 +70,3 @@ SqlNodeList ClusterItems() :
     return new SqlNodeList(list, s.addAll(list).pos());
   }
 }
-
-org.apache.druid.java.util.common.Pair<Granularity, String> 
PartitionGranularity() :
-{
-  SqlNode e = null;
-  Granularity granularity = null;
-  String unparseString = null;
-}
-{
-  (
-    <HOUR>
-    {
-      granularity = Granularities.HOUR;
-      unparseString = "HOUR";
-    }
-  |
-    <DAY>
-    {
-      granularity = Granularities.DAY;
-      unparseString = "DAY";
-    }
-  |
-    <MONTH>
-    {
-      granularity = Granularities.MONTH;
-      unparseString = "MONTH";
-    }
-  |
-    <YEAR>
-    {
-      granularity = Granularities.YEAR;
-      unparseString = "YEAR";
-    }
-  |
-    <ALL>
-    {
-      granularity = Granularities.ALL;
-      unparseString = "ALL";
-    }
-    [
-      <TIME>
-      {
-        unparseString += " TIME";
-      }
-    ]
-  |
-    e = Expression(ExprContext.ACCEPT_SUB_QUERY)
-    {
-      granularity = 
DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(e);
-      unparseString = e.toString();
-    }
-  )
-  {
-    return new org.apache.druid.java.util.common.Pair(granularity, 
unparseString);
-  }
-}
diff --git a/sql/src/main/codegen/includes/replace.ftl 
b/sql/src/main/codegen/includes/replace.ftl
new file mode 100644
index 0000000000..741c59c0ad
--- /dev/null
+++ b/sql/src/main/codegen/includes/replace.ftl
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Taken from syntax of SqlInsert statement from calcite parser, edited for 
replace syntax
+SqlNode DruidSqlReplaceEof() :
+{
+    SqlNode table;
+    SqlNode source;
+    SqlNodeList columnList = null;
+    final Span s;
+    SqlInsert sqlInsert;
+    // Using fully qualified name for Pair class, since Calcite also has a 
same class name being used in the Parser.jj
+    org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy 
= new org.apache.druid.java.util.common.Pair(null, null);
+    final Pair<SqlNodeList, SqlNodeList> p;
+    final SqlNode replaceTimeQuery;
+}
+{
+    <REPLACE> { s = span(); }
+    <INTO>
+    table = CompoundIdentifier()
+    [
+        p = ParenthesizedCompoundIdentifierList() {
+            if (p.left.size() > 0) {
+                columnList = p.left;
+            }
+        }
+    ]
+    <OVERWRITE>
+    replaceTimeQuery = ReplaceTimeQuery()
+    source = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
+    // PARTITIONED BY is necessary, but is kept optional in the grammar. It is 
asserted that it is not missing in the
+    // DruidSqlReplace constructor so that we can return a custom error message.
+    [
+      <PARTITIONED> <BY>
+      partitionedBy = PartitionGranularity()
+    ]
+    // EOF is also present in SqlStmtEof but EOF is a special case and a 
single EOF can be consumed multiple times.
+    // The reason for adding EOF here is to ensure that we create a 
DruidSqlReplace node after the syntax has been
+    // validated and throw SQL syntax errors before performing validations in 
the DruidSqlReplace which can overshadow the
+    // actual error message.
+    <EOF>
+    {
+        sqlInsert = new SqlInsert(s.end(source), SqlNodeList.EMPTY, table, 
source, columnList);
+        return new DruidSqlReplace(sqlInsert, partitionedBy.lhs, 
partitionedBy.rhs, replaceTimeQuery);
+    }
+}
+
+SqlNode ReplaceTimeQuery() :
+{
+    SqlNode replaceQuery;
+}
+{
+    (
+      <ALL> { replaceQuery = SqlLiteral.createCharString("ALL", getPos()); }
+    |
+      // We parse all types of conditions and throw an exception if it is not 
supported to keep the parsing simple
+      replaceQuery = WhereOpt()
+    )
+    {
+      return replaceQuery;
+    }
+}
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
index c941da3e59..dbaace1d93 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
@@ -43,6 +43,8 @@ public class DruidSqlInsert extends SqlInsert
   public static final SqlOperator OPERATOR = SqlInsert.OPERATOR;
 
   private final Granularity partitionedBy;
+
+  // Used in the unparse function to generate the original query since we 
convert the string to an enum
   private final String partitionedByStringForUnparse;
 
   @Nullable
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java
index d81ce2ca5b..2ef587e3b0 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java
@@ -20,27 +20,48 @@
 package org.apache.druid.sql.calcite.parser;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlIntervalQualifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlTimestampLiteral;
+import org.apache.calcite.tools.ValidationException;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.granularity.PeriodGranularity;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.filter.AndDimFilter;
+import org.apache.druid.query.filter.BoundDimFilter;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.filter.NotDimFilter;
+import org.apache.druid.query.filter.OrDimFilter;
+import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.sql.calcite.expression.TimeUnits;
 import 
org.apache.druid.sql.calcite.expression.builtin.TimeFloorOperatorConversion;
+import org.apache.druid.sql.calcite.filtration.Filtration;
+import org.apache.druid.sql.calcite.filtration.MoveTimeFiltersToIntervals;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
 import org.joda.time.Period;
+import org.joda.time.base.AbstractInterval;
 
+import java.sql.Timestamp;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class DruidSqlParserUtils
 {
 
   private static final Logger log = new Logger(DruidSqlParserUtils.class);
+  public static final String ALL = "all";
 
   /**
    * Delegates to {@code convertSqlNodeToGranularity} and converts the 
exceptions to {@link ParseException}
@@ -167,4 +188,195 @@ public class DruidSqlParserUtils
         TimeFloorOperatorConversion.SQL_FUNCTION_NAME
     ));
   }
+
+  /**
+   * This method validates and converts a {@link SqlNode} representing a query 
into an optimized list of intervals to
+   * be used in creating an ingestion spec. If the sqlNode is an SqlLiteral of 
{@link #ALL}, returns a singleton list of
+   * "ALL". Otherwise, it converts and optimizes the query using {@link 
MoveTimeFiltersToIntervals} into a list of
+   * intervals which contain all valid values of time as per the query.
+   *
+   * The following validations are performed
+   * 1. Only __time column and timestamp literals are present in the query
+   * 2. The interval after optimization is not empty
+   * 3. The operands in the expression are supported
+   * 4. The intervals after adjusting for timezone are aligned with the 
granularity parameter
+   *
+   * @param replaceTimeQuery Sql node representing the query
+   * @param granularity granularity of the query for validation
+   * @param dateTimeZone timezone
+   * @return List of string representation of intervals
+   * @throws ValidationException if the SqlNode cannot be converted to a list 
of intervals
+   */
+  public static List<String> validateQueryAndConvertToIntervals(
+      SqlNode replaceTimeQuery,
+      Granularity granularity,
+      DateTimeZone dateTimeZone
+  ) throws ValidationException
+  {
+    if (replaceTimeQuery instanceof SqlLiteral && 
ALL.equalsIgnoreCase(((SqlLiteral) replaceTimeQuery).toValue())) {
+      return ImmutableList.of(ALL);
+    }
+
+    DimFilter dimFilter = convertQueryToDimFilter(replaceTimeQuery, 
dateTimeZone);
+
+    Filtration filtration = Filtration.create(dimFilter);
+    filtration = MoveTimeFiltersToIntervals.instance().apply(filtration);
+    List<Interval> intervals = filtration.getIntervals();
+
+    if (filtration.getDimFilter() != null) {
+      throw new ValidationException("Only " + ColumnHolder.TIME_COLUMN_NAME + 
" column is supported in OVERWRITE WHERE clause");
+    }
+
+    if (intervals.isEmpty()) {
+      throw new ValidationException("Intervals for replace are empty");
+    }
+
+    for (Interval interval : intervals) {
+      DateTime intervalStart = interval.getStart();
+      DateTime intervalEnd = interval.getEnd();
+      if (!granularity.bucketStart(intervalStart).equals(intervalStart) || 
!granularity.bucketStart(intervalEnd).equals(intervalEnd)) {
+        throw new ValidationException("OVERWRITE WHERE clause contains an 
interval " + intervals +
+                                      " which is not aligned with PARTITIONED 
BY granularity " + granularity);
+      }
+    }
+    return intervals
+        .stream()
+        .map(AbstractInterval::toString)
+        .collect(Collectors.toList());
+  }
+
+
+  /**
+   * This method is used to convert an {@link SqlNode} representing a query 
into a {@link DimFilter} for the same query.
+   * It takes the timezone as a separate parameter, as Sql timestamps don't 
contain that information. Supported functions
+   * are AND, OR, NOT, >, <, >=, <= and BETWEEN operators in the sql query.
+   *
+   * @param replaceTimeQuery Sql node representing the query
+   * @param dateTimeZone timezone
+   * @return Dimfilter for the query
+   * @throws ValidationException if the SqlNode cannot be converted to a DimFilter
+   */
+  public static DimFilter convertQueryToDimFilter(SqlNode replaceTimeQuery, 
DateTimeZone dateTimeZone)
+      throws ValidationException
+  {
+    if (!(replaceTimeQuery instanceof SqlBasicCall)) {
+      log.error("Expected SqlBasicCall during parsing, but found " + 
replaceTimeQuery.getClass().getName());
+      throw new ValidationException("Invalid OVERWRITE WHERE clause");
+    }
+    String columnName;
+    SqlBasicCall sqlBasicCall = (SqlBasicCall) replaceTimeQuery;
+    List<SqlNode> operandList = sqlBasicCall.getOperandList();
+    switch (sqlBasicCall.getOperator().getKind()) {
+      case AND:
+        List<DimFilter> dimFilters = new ArrayList<>();
+        for (SqlNode sqlNode : sqlBasicCall.getOperandList()) {
+          dimFilters.add(convertQueryToDimFilter(sqlNode, dateTimeZone));
+        }
+        return new AndDimFilter(dimFilters);
+      case OR:
+        dimFilters = new ArrayList<>();
+        for (SqlNode sqlNode : sqlBasicCall.getOperandList()) {
+          dimFilters.add(convertQueryToDimFilter(sqlNode, dateTimeZone));
+        }
+        return new OrDimFilter(dimFilters);
+      case NOT:
+        return new 
NotDimFilter(convertQueryToDimFilter(sqlBasicCall.getOperandList().get(0), 
dateTimeZone));
+      case GREATER_THAN_OR_EQUAL:
+        columnName = parseColumnName(operandList.get(0));
+        return new BoundDimFilter(
+            columnName,
+            parseTimeStampWithTimeZone(operandList.get(1), dateTimeZone),
+            null,
+            false,
+            null,
+            null,
+            null,
+            StringComparators.NUMERIC
+        );
+      case LESS_THAN_OR_EQUAL:
+        columnName = parseColumnName(operandList.get(0));
+        return new BoundDimFilter(
+            columnName,
+            null,
+            parseTimeStampWithTimeZone(operandList.get(1), dateTimeZone),
+            null,
+            false,
+            null,
+            null,
+            StringComparators.NUMERIC
+        );
+      case GREATER_THAN:
+        columnName = parseColumnName(operandList.get(0));
+        return new BoundDimFilter(
+            columnName,
+            parseTimeStampWithTimeZone(operandList.get(1), dateTimeZone),
+            null,
+            true,
+            null,
+            null,
+            null,
+            StringComparators.NUMERIC
+        );
+      case LESS_THAN:
+        columnName = parseColumnName(operandList.get(0));
+        return new BoundDimFilter(
+            columnName,
+            null,
+            parseTimeStampWithTimeZone(operandList.get(1), dateTimeZone),
+            null,
+            true,
+            null,
+            null,
+            StringComparators.NUMERIC
+        );
+      case BETWEEN:
+        columnName = parseColumnName(operandList.get(0));
+        return new BoundDimFilter(
+            columnName,
+            parseTimeStampWithTimeZone(operandList.get(1), dateTimeZone),
+            parseTimeStampWithTimeZone(operandList.get(2), dateTimeZone),
+            false,
+            false,
+            null,
+            null,
+            StringComparators.NUMERIC
+        );
+      default:
+        throw new ValidationException("Unsupported operation in OVERWRITE 
WHERE clause: " + sqlBasicCall.getOperator().getName());
+    }
+  }
+
+  /**
+   * Converts a {@link SqlNode} identifier into a string representation
+   *
+   * @param sqlNode the sql node
+   * @return string representing the column name
+   * @throws ValidationException if the sql node is not an SqlIdentifier
+   */
+  public static String parseColumnName(SqlNode sqlNode) throws 
ValidationException
+  {
+    if (!(sqlNode instanceof SqlIdentifier)) {
+      throw new ValidationException("Expressions must be of the form __time 
<operator> TIMESTAMP");
+    }
+    return ((SqlIdentifier) sqlNode).getSimple();
+  }
+
+  /**
+   * Converts a {@link SqlNode} into a timestamp, taking into account the 
timezone
+   *
+   * @param sqlNode the sql node
+   * @param timeZone timezone
+   * @return the timestamp string as milliseconds from epoch
+   * @throws ValidationException if the sql node is not a SqlTimestampLiteral
+   */
+  public static String parseTimeStampWithTimeZone(SqlNode sqlNode, 
DateTimeZone timeZone) throws ValidationException
+  {
+    if (!(sqlNode instanceof SqlTimestampLiteral)) {
+      throw new ValidationException("Expressions must be of the form __time 
<operator> TIMESTAMP");
+    }
+
+    Timestamp sqlTimestamp = Timestamp.valueOf(((SqlTimestampLiteral) 
sqlNode).toFormattedString());
+    ZonedDateTime zonedTimestamp = 
sqlTimestamp.toLocalDateTime().atZone(timeZone.toTimeZone().toZoneId());
+    return String.valueOf(zonedTimestamp.toInstant().toEpochMilli());
+  }
 }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
similarity index 63%
copy from 
sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
copy to 
sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
index c941da3e59..a56fd69251 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
@@ -21,9 +21,12 @@ package org.apache.druid.sql.calcite.parser;
 
 import com.google.common.base.Preconditions;
 import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.druid.java.util.common.granularity.Granularity;
 
@@ -31,22 +34,22 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 /**
- * Extends the 'insert' call to hold custom parameters specific to Druid i.e. 
PARTITIONED BY and CLUSTERED BY
+ * Extends the 'replace' call to hold custom parameters specific to Druid i.e. 
PARTITIONED BY and the PARTITION SPECS
  * This class extends the {@link SqlInsert} so that this SqlNode can be used in
  * {@link org.apache.calcite.sql2rel.SqlToRelConverter} for getting converted 
into RelNode, and further processing
  */
-public class DruidSqlInsert extends SqlInsert
+public class DruidSqlReplace extends SqlInsert
 {
-  public static final String SQL_INSERT_SEGMENT_GRANULARITY = 
"sqlInsertSegmentGranularity";
+  public static final String SQL_REPLACE_TIME_CHUNKS = "sqlReplaceTimeChunks";
 
-  // This allows reusing super.unparse
-  public static final SqlOperator OPERATOR = SqlInsert.OPERATOR;
+  public static final SqlOperator OPERATOR = new SqlSpecialOperator("REPLACE", 
SqlKind.OTHER);
 
   private final Granularity partitionedBy;
+
+  // Used in the unparse function to generate the original query since we 
convert the string to an enum
   private final String partitionedByStringForUnparse;
 
-  @Nullable
-  private final SqlNodeList clusteredBy;
+  private final SqlNode replaceTimeQuery;
 
   /**
    * While partitionedBy and partitionedByStringForUnparse can be null as 
arguments to the constructor, this is
@@ -54,11 +57,11 @@ public class DruidSqlInsert extends SqlInsert
    * errors when the PARTITIONED BY custom clause is not present, and keeps 
its error separate from JavaCC/Calcite's
    * custom errors which can be cryptic when someone accidentally forgets to 
explicitly specify the PARTITIONED BY clause
    */
-  public DruidSqlInsert(
+  public DruidSqlReplace(
       @Nonnull SqlInsert insertNode,
       @Nullable Granularity partitionedBy,
       @Nullable String partitionedByStringForUnparse,
-      @Nullable SqlNodeList clusteredBy
+      @Nonnull SqlNode replaceTimeQuery
   ) throws ParseException
   {
     super(
@@ -69,20 +72,18 @@ public class DruidSqlInsert extends SqlInsert
         insertNode.getTargetColumnList()
     );
     if (partitionedBy == null) {
-      throw new ParseException("INSERT statements must specify PARTITIONED BY 
clause explicitly");
+      throw new ParseException("REPLACE statements must specify PARTITIONED BY 
clause explictly");
     }
     this.partitionedBy = partitionedBy;
 
-    Preconditions.checkNotNull(partitionedByStringForUnparse);
-    this.partitionedByStringForUnparse = partitionedByStringForUnparse;
+    this.partitionedByStringForUnparse = 
Preconditions.checkNotNull(partitionedByStringForUnparse);
 
-    this.clusteredBy = clusteredBy;
+    this.replaceTimeQuery = replaceTimeQuery;
   }
 
-  @Nullable
-  public SqlNodeList getClusteredBy()
+  public SqlNode getReplaceTimeQuery()
   {
-    return clusteredBy;
+    return replaceTimeQuery;
   }
 
   public Granularity getPartitionedBy()
@@ -100,16 +101,29 @@ public class DruidSqlInsert extends SqlInsert
   @Override
   public void unparse(SqlWriter writer, int leftPrec, int rightPrec)
   {
-    super.unparse(writer, leftPrec, rightPrec);
+    writer.startList(SqlWriter.FrameTypeEnum.SELECT);
+    writer.sep("REPLACE INTO");
+    final int opLeft = getOperator().getLeftPrec();
+    final int opRight = getOperator().getRightPrec();
+    getTargetTable().unparse(writer, opLeft, opRight);
+
+    if (getTargetColumnList() != null) {
+      getTargetColumnList().unparse(writer, opLeft, opRight);
+    }
+    writer.newlineAndIndent();
+
+    writer.keyword("OVERWRITE");
+    if (replaceTimeQuery instanceof SqlLiteral) {
+      writer.keyword("ALL");
+    } else {
+      replaceTimeQuery.unparse(writer, leftPrec, rightPrec);
+    }
+    writer.newlineAndIndent();
+
+    getSource().unparse(writer, 0, 0);
+    writer.newlineAndIndent();
+
     writer.keyword("PARTITIONED BY");
     writer.keyword(partitionedByStringForUnparse);
-    if (getClusteredBy() != null) {
-      writer.keyword("CLUSTERED BY");
-      SqlWriter.Frame frame = writer.startList("", "");
-      for (SqlNode clusterByOpts : getClusteredBy().getList()) {
-        clusterByOpts.unparse(writer, leftPrec, rightPrec);
-      }
-      writer.endList(frame);
-    }
   }
 }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
index b9ae4a353b..c552356efd 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
@@ -55,6 +55,7 @@ import org.apache.calcite.sql.SqlExplain;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlOrderBy;
@@ -83,6 +84,8 @@ import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
 import org.apache.druid.sql.calcite.rel.DruidConvention;
 import org.apache.druid.sql.calcite.rel.DruidQuery;
 import org.apache.druid.sql.calcite.rel.DruidRel;
@@ -90,6 +93,7 @@ import org.apache.druid.sql.calcite.rel.DruidUnionRel;
 import org.apache.druid.sql.calcite.run.QueryMaker;
 import org.apache.druid.sql.calcite.run.QueryMakerFactory;
 import org.apache.druid.utils.Throwables;
+import org.joda.time.DateTimeZone;
 
 import javax.annotation.Nullable;
 import java.io.Closeable;
@@ -134,7 +138,7 @@ public class DruidPlanner implements Closeable
   public ValidationResult validate(boolean authorizeContextParams) throws 
SqlParseException, ValidationException
   {
     resetPlanner();
-    final ParsedNodes parsed = 
ParsedNodes.create(planner.parse(plannerContext.getSql()));
+    final ParsedNodes parsed = 
ParsedNodes.create(planner.parse(plannerContext.getSql()), 
plannerContext.getTimeZone());
     final SqlValidator validator = getValidator();
     final SqlNode validatedQueryNode;
 
@@ -150,8 +154,8 @@ public class DruidPlanner implements Closeable
 
     final Set<ResourceAction> resourceActions = new 
HashSet<>(resourceCollectorShuttle.getResourceActions());
 
-    if (parsed.getInsertNode() != null) {
-      final String targetDataSource = 
validateAndGetDataSourceForInsert(parsed.getInsertNode());
+    if (parsed.getInsertOrReplace() != null) {
+      final String targetDataSource = 
validateAndGetDataSourceForIngest(parsed.getInsertOrReplace());
       resourceActions.add(new ResourceAction(new Resource(targetDataSource, 
ResourceType.DATASOURCE), Action.WRITE));
     }
     if (authorizeContextParams) {
@@ -175,7 +179,7 @@ public class DruidPlanner implements Closeable
   {
     resetPlanner();
 
-    final ParsedNodes parsed = 
ParsedNodes.create(planner.parse(plannerContext.getSql()));
+    final ParsedNodes parsed = 
ParsedNodes.create(planner.parse(plannerContext.getSql()), 
plannerContext.getTimeZone());
     final SqlNode validatedQueryNode = planner.validate(parsed.getQueryNode());
     final RelRoot rootQueryRel = planner.rel(validatedQueryNode);
 
@@ -187,7 +191,7 @@ public class DruidPlanner implements Closeable
     if (parsed.getExplainNode() != null) {
       returnedRowType = getExplainStructType(typeFactory);
     } else {
-      returnedRowType = buildQueryMaker(rootQueryRel, 
parsed.getInsertNode()).getResultType();
+      returnedRowType = buildQueryMaker(rootQueryRel, 
parsed.getInsertOrReplace()).getResultType();
     }
 
     return new PrepareResult(returnedRowType, parameterTypes);
@@ -206,7 +210,7 @@ public class DruidPlanner implements Closeable
   {
     resetPlanner();
 
-    final ParsedNodes parsed = 
ParsedNodes.create(planner.parse(plannerContext.getSql()));
+    final ParsedNodes parsed = 
ParsedNodes.create(planner.parse(plannerContext.getSql()), 
plannerContext.getTimeZone());
 
     try {
       if (parsed.getIngestionGranularity() != null) {
@@ -220,6 +224,13 @@ public class DruidPlanner implements Closeable
       throw new ValidationException("Unable to serialize partition 
granularity.");
     }
 
+    if (parsed.getReplaceIntervals() != null) {
+      plannerContext.getQueryContext().addSystemParam(
+          DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS,
+          String.join(",", parsed.getReplaceIntervals())
+      );
+    }
+
     // the planner's type factory is not available until after parsing
     this.rexBuilder = new RexBuilder(planner.getTypeFactory());
     final SqlNode parameterizedQueryNode = 
rewriteDynamicParameters(parsed.getQueryNode());
@@ -227,7 +238,7 @@ public class DruidPlanner implements Closeable
     final RelRoot rootQueryRel = planner.rel(validatedQueryNode);
 
     try {
-      return planWithDruidConvention(rootQueryRel, parsed.getExplainNode(), 
parsed.getInsertNode());
+      return planWithDruidConvention(rootQueryRel, parsed.getExplainNode(), 
parsed.getInsertOrReplace());
     }
     catch (Exception e) {
       Throwable cannotPlanException = Throwables.getCauseOfType(e, 
RelOptPlanner.CannotPlanException.class);
@@ -236,9 +247,9 @@ public class DruidPlanner implements Closeable
         throw e;
       }
 
-      // If there isn't any INSERT clause, then we should try again with 
BINDABLE convention. And return without
+      // If there isn't any ingestion clause, then we should try again with 
BINDABLE convention. And return without
       // any error, if it is plannable by the bindable convention
-      if (parsed.getInsertNode() == null) {
+      if (parsed.getInsertOrReplace() == null) {
         // Try again with BINDABLE convention. Used for querying Values and 
metadata tables.
         try {
           return planWithBindableConvention(rootQueryRel, 
parsed.getExplainNode());
@@ -295,12 +306,11 @@ public class DruidPlanner implements Closeable
   private PlannerResult planWithDruidConvention(
       final RelRoot root,
       @Nullable final SqlExplain explain,
-      @Nullable final SqlInsert insert
+      @Nullable final SqlInsert insertOrReplace
   ) throws ValidationException, RelConversionException
   {
     final RelRoot possiblyLimitedRoot = 
possiblyWrapRootWithOuterLimitFromContext(root);
-
-    final QueryMaker queryMaker = buildQueryMaker(root, insert);
+    final QueryMaker queryMaker = buildQueryMaker(root, insertOrReplace);
     plannerContext.setQueryMaker(queryMaker);
 
     RelNode parameterized = 
rewriteRelDynamicParameters(possiblyLimitedRoot.rel);
@@ -625,7 +635,7 @@ public class DruidPlanner implements Closeable
 
   /**
    * Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap 
out any
-   * {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link 
org.apache.calcite.sql.SqlLiteral}
+   * {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link 
SqlLiteral}
    * replacement
    */
   private SqlNode rewriteDynamicParameters(SqlNode parsed)
@@ -650,11 +660,11 @@ public class DruidPlanner implements Closeable
 
   private QueryMaker buildQueryMaker(
       final RelRoot rootQueryRel,
-      @Nullable final SqlInsert insert
+      @Nullable final SqlInsert insertOrReplace
   ) throws ValidationException
   {
-    if (insert != null) {
-      final String targetDataSource = 
validateAndGetDataSourceForInsert(insert);
+    if (insertOrReplace != null) {
+      final String targetDataSource = 
validateAndGetDataSourceForIngest(insertOrReplace);
       validateColumnsForIngestion(rootQueryRel);
       return queryMakerFactory.buildForInsert(targetDataSource, rootQueryRel, 
plannerContext);
     } else {
@@ -674,17 +684,17 @@ public class DruidPlanner implements Closeable
   }
 
   /**
-   * Extract target datasource from a {@link SqlInsert}, and also validate 
that the INSERT is of a form we support.
-   * Expects the INSERT target to be either an unqualified name, or a name 
qualified by the default schema.
+   * Extract target datasource from a {@link SqlInsert}, and also validate 
that the ingestion is of a form we support.
+   * Expects the target datasource to be either an unqualified name, or a name 
qualified by the default schema.
    */
-  private String validateAndGetDataSourceForInsert(final SqlInsert insert) 
throws ValidationException
+  private String validateAndGetDataSourceForIngest(final SqlInsert insert) 
throws ValidationException
   {
     if (insert.isUpsert()) {
       throw new ValidationException("UPSERT is not supported.");
     }
 
     if (insert.getTargetColumnList() != null) {
-      throw new ValidationException("INSERT with target column list is not 
supported.");
+      throw new ValidationException("Ingestion with target column list is not 
supported.");
     }
 
     final SqlIdentifier tableIdentifier = (SqlIdentifier) 
insert.getTargetTable();
@@ -692,7 +702,7 @@ public class DruidPlanner implements Closeable
 
     if (tableIdentifier.names.isEmpty()) {
       // I don't think this can happen, but include a branch for it just in 
case.
-      throw new ValidationException("INSERT requires target table.");
+      throw new ValidationException("Ingestion requires target table.");
     } else if (tableIdentifier.names.size() == 1) {
       // Unqualified name.
       dataSource = Iterables.getOnlyElement(tableIdentifier.names);
@@ -705,13 +715,13 @@ public class DruidPlanner implements Closeable
         dataSource = tableIdentifier.names.get(1);
       } else {
         throw new ValidationException(
-            StringUtils.format("Cannot INSERT into [%s] because it is not a 
Druid datasource.", tableIdentifier)
+            StringUtils.format("Cannot ingest into [%s] because it is not a 
Druid datasource.", tableIdentifier)
         );
       }
     }
 
     try {
-      IdUtils.validateId("INSERT dataSource", dataSource);
+      IdUtils.validateId("Ingestion dataSource", dataSource);
     }
     catch (IllegalArgumentException e) {
       throw new ValidationException(e.getMessage());
@@ -777,84 +787,129 @@ public class DruidPlanner implements Closeable
     private final SqlExplain explain;
 
     @Nullable
-    private final DruidSqlInsert insert;
+    private final SqlInsert insertOrReplace;
 
     private final SqlNode query;
 
     @Nullable
     private final Granularity ingestionGranularity;
 
+    @Nullable
+    private final List<String> replaceIntervals;
+
     private ParsedNodes(
         @Nullable SqlExplain explain,
-        @Nullable DruidSqlInsert insert,
+        @Nullable SqlInsert insertOrReplace,
         SqlNode query,
-        @Nullable Granularity ingestionGranularity
+        @Nullable Granularity ingestionGranularity,
+        @Nullable List<String> replaceIntervals
     )
     {
       this.explain = explain;
-      this.insert = insert;
+      this.insertOrReplace = insertOrReplace;
       this.query = query;
       this.ingestionGranularity = ingestionGranularity;
+      this.replaceIntervals = replaceIntervals;
     }
 
-    static ParsedNodes create(final SqlNode node) throws ValidationException
+    static ParsedNodes create(final SqlNode node, DateTimeZone dateTimeZone) 
throws ValidationException
     {
-      SqlExplain explain = null;
-      DruidSqlInsert druidSqlInsert = null;
       SqlNode query = node;
-      Granularity ingestionGranularity = null;
-
+      SqlExplain explain = null;
       if (query.getKind() == SqlKind.EXPLAIN) {
         explain = (SqlExplain) query;
         query = explain.getExplicandum();
       }
 
       if (query.getKind() == SqlKind.INSERT) {
-        druidSqlInsert = (DruidSqlInsert) query;
-        query = druidSqlInsert.getSource();
+        if (query instanceof DruidSqlInsert) {
+          return handleInsert(explain, (DruidSqlInsert) query);
+        } else if (query instanceof DruidSqlReplace) {
+          return handleReplace(explain, (DruidSqlReplace) query, dateTimeZone);
+        }
+      }
+
+      if (!query.isA(SqlKind.QUERY)) {
+        throw new ValidationException(StringUtils.format("Cannot execute 
[%s].", query.getKind()));
+      }
+
+      return new ParsedNodes(explain, null, query, null, null);
+    }
+
+    static ParsedNodes handleInsert(SqlExplain explain, DruidSqlInsert 
druidSqlInsert) throws ValidationException
+    {
+      SqlNode query = druidSqlInsert.getSource();
+
+      // Verify that the underlying query does not carry an ORDER BY clause
+      if (query instanceof SqlOrderBy) {
+        SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
+        SqlNodeList orderByList = sqlOrderBy.orderList;
+        if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
+          throw new ValidationException("Cannot have ORDER BY on an INSERT 
query, use CLUSTERED BY instead.");
+        }
+      }
+
+      Granularity ingestionGranularity = druidSqlInsert.getPartitionedBy();
+
+      if (druidSqlInsert.getClusteredBy() != null) {
+        // If we have a CLUSTERED BY clause, extract the information in that 
CLUSTERED BY and create a new
+        // SqlOrderBy node
+        SqlNode offset = null;
+        SqlNode fetch = null;
 
-        // Check if ORDER BY clause is not provided to the underlying query
         if (query instanceof SqlOrderBy) {
           SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
-          SqlNodeList orderByList = sqlOrderBy.orderList;
-          if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) 
{
-            throw new ValidationException("Cannot have ORDER BY on an INSERT 
query, use CLUSTERED BY instead.");
-          }
+          // sqlOrderBy.query is the underlying query free of OFFSET, FETCH and 
ORDER BY clauses.
+          // For example, for "SELECT dim1, sum(dim2) FROM foo GROUP BY dim1 
ORDER BY dim1 OFFSET 10 FETCH 30",
+          // it is "SELECT dim1, sum(dim2) FROM foo GROUP BY dim1".
+          query = sqlOrderBy.query;
+          offset = sqlOrderBy.offset;
+          fetch = sqlOrderBy.fetch;
         }
+        // Creates a new SqlOrderBy query that uses our CLUSTERED BY list as 
its order list, keeping any OFFSET and FETCH
+        query = new SqlOrderBy(
+            query.getParserPosition(),
+            query,
+            druidSqlInsert.getClusteredBy(),
+            offset,
+            fetch
+        );
+      }
 
-        ingestionGranularity = druidSqlInsert.getPartitionedBy();
-
-        if (druidSqlInsert.getClusteredBy() != null) {
-          // If we have a CLUSTERED BY clause, extract the information in that 
CLUSTERED BY and create a new SqlOrderBy
-          // node
-          SqlNode offset = null;
-          SqlNode fetch = null;
-
-          if (query instanceof SqlOrderBy) {
-            SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
-            // This represents the underlying query free of OFFSET, FETCH and 
ORDER BY clauses
-            // For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo 
OFFSET 10 FETCH 30 ORDER BY dim1 GROUP BY dim1
-            // this would represent the "SELECT dim1, sum(dim2) from foo GROUP 
BY dim1
-            query = sqlOrderBy.query;
-            offset = sqlOrderBy.offset;
-            fetch = sqlOrderBy.fetch;
-          }
-          // Creates a new SqlOrderBy query, which may have our CLUSTERED BY 
overwritten
-          query = new SqlOrderBy(
-              query.getParserPosition(),
-              query,
-              druidSqlInsert.getClusteredBy(),
-              offset,
-              fetch
-          );
+      if (!query.isA(SqlKind.QUERY)) {
+        throw new ValidationException(StringUtils.format("Cannot execute 
[%s].", query.getKind()));
+      }
+
+      return new ParsedNodes(explain, druidSqlInsert, query, 
ingestionGranularity, null);
+    }
+
+    static ParsedNodes handleReplace(SqlExplain explain, DruidSqlReplace 
druidSqlReplace, DateTimeZone dateTimeZone)
+        throws ValidationException
+    {
+      SqlNode query = druidSqlReplace.getSource();
+
+      // Verify that the underlying query does not carry an ORDER BY clause
+      if (query instanceof SqlOrderBy) {
+        SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
+        SqlNodeList orderByList = sqlOrderBy.orderList;
+        if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
+          throw new ValidationException("Cannot have ORDER BY on a REPLACE 
query.");
         }
       }
 
+      SqlNode replaceTimeQuery = druidSqlReplace.getReplaceTimeQuery();
+      if (replaceTimeQuery == null) {
+        throw new ValidationException("Missing time chunk information in 
DELETE WHERE clause for replace.");
+      }
+
+      Granularity ingestionGranularity = druidSqlReplace.getPartitionedBy();
+      List<String> replaceIntervals = 
DruidSqlParserUtils.validateQueryAndConvertToIntervals(replaceTimeQuery, 
ingestionGranularity, dateTimeZone);
+
       if (!query.isA(SqlKind.QUERY)) {
         throw new ValidationException(StringUtils.format("Cannot execute 
[%s].", query.getKind()));
       }
 
-      return new ParsedNodes(explain, druidSqlInsert, query, 
ingestionGranularity);
+      return new ParsedNodes(explain, druidSqlReplace, query, 
ingestionGranularity, replaceIntervals);
     }
 
     @Nullable
@@ -864,9 +919,15 @@ public class DruidPlanner implements Closeable
     }
 
     @Nullable
-    public DruidSqlInsert getInsertNode()
+    public SqlInsert getInsertOrReplace()
+    {
+      return insertOrReplace;
+    }
+
+    @Nullable
+    public List<String> getReplaceIntervals()
     {
-      return insert;
+      return replaceIntervals;
     }
 
     public SqlNode getQueryNode()
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteIngestionDmlTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteIngestionDmlTest.java
new file mode 100644
index 0000000000..7040e7fc53
--- /dev/null
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteIngestionDmlTest.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContext;
+import 
org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.sql.SqlLifecycle;
+import org.apache.druid.sql.SqlLifecycleFactory;
+import org.apache.druid.sql.calcite.external.ExternalDataSource;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.Matcher;
+import org.hamcrest.MatcherAssert;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class CalciteIngestionDmlTest extends BaseCalciteQueryTest
+{
+  protected static final Map<String, Object> DEFAULT_CONTEXT =
+      ImmutableMap.<String, Object>builder()
+                  .put(PlannerContext.CTX_SQL_QUERY_ID, DUMMY_SQL_ID)
+                  .build();
+
+  protected static final RowSignature FOO_TABLE_SIGNATURE =
+      RowSignature.builder()
+                  .addTimeColumn()
+                  .add("cnt", ColumnType.LONG)
+                  .add("dim1", ColumnType.STRING)
+                  .add("dim2", ColumnType.STRING)
+                  .add("dim3", ColumnType.STRING)
+                  .add("m1", ColumnType.FLOAT)
+                  .add("m2", ColumnType.DOUBLE)
+                  .add("unique_dim1", HyperUniquesAggregatorFactory.TYPE)
+                  .build();
+
+  protected final ExternalDataSource externalDataSource = new 
ExternalDataSource(
+      new InlineInputSource("a,b,1\nc,d,2\n"),
+      new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 
0),
+      RowSignature.builder()
+                  .add("x", ColumnType.STRING)
+                  .add("y", ColumnType.STRING)
+                  .add("z", ColumnType.LONG)
+                  .build()
+  );
+
+  protected boolean didTest = false;
+
+  @After
+  @Override
+  public void tearDown() throws Exception
+  {
+    super.tearDown();
+
+    // Catch situations where tests forgot to call "verify" on their tester.
+    if (!didTest) {
+      throw new ISE("Test was not run; did you call verify() on a tester?");
+    }
+  }
+
+  protected String externSql(final ExternalDataSource externalDataSource)
+  {
+    try {
+      return StringUtils.format(
+          "TABLE(extern(%s, %s, %s))",
+          
Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getInputSource())),
+          
Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getInputFormat())),
+          
Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getSignature()))
+      );
+    }
+    catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected Map<String, Object> queryContextWithGranularity(Granularity 
granularity)
+  {
+    String granularityString = null;
+    try {
+      granularityString = queryJsonMapper.writeValueAsString(granularity);
+    }
+    catch (JsonProcessingException e) {
+      Assert.fail(e.getMessage());
+    }
+    return ImmutableMap.of(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, 
granularityString);
+  }
+
+  protected IngestionDmlTester testIngestionQuery()
+  {
+    return new IngestionDmlTester();
+  }
+
+  public class IngestionDmlTester
+  {
+    private String sql;
+    private PlannerConfig plannerConfig = new PlannerConfig();
+    private Map<String, Object> queryContext = DEFAULT_CONTEXT;
+    private AuthenticationResult authenticationResult = 
CalciteTests.REGULAR_USER_AUTH_RESULT;
+    private String expectedTargetDataSource;
+    private RowSignature expectedTargetSignature;
+    private List<ResourceAction> expectedResources;
+    private Query expectedQuery;
+    private Matcher<Throwable> validationErrorMatcher;
+
+    private IngestionDmlTester()
+    {
+      // Nothing to do.
+    }
+
+    public IngestionDmlTester sql(final String sql)
+    {
+      this.sql = sql;
+      return this;
+    }
+
+    protected IngestionDmlTester sql(final String sqlPattern, final Object 
arg, final Object... otherArgs)
+    {
+      final Object[] args = new Object[otherArgs.length + 1];
+      args[0] = arg;
+      System.arraycopy(otherArgs, 0, args, 1, otherArgs.length);
+      this.sql = StringUtils.format(sqlPattern, args);
+      return this;
+    }
+
+    public IngestionDmlTester context(final Map<String, Object> context)
+    {
+      this.queryContext = context;
+      return this;
+    }
+
+    public IngestionDmlTester authentication(final AuthenticationResult 
authenticationResult)
+    {
+      this.authenticationResult = authenticationResult;
+      return this;
+    }
+
+    public IngestionDmlTester expectTarget(
+        final String expectedTargetDataSource,
+        final RowSignature expectedTargetSignature
+    )
+    {
+      this.expectedTargetDataSource = 
Preconditions.checkNotNull(expectedTargetDataSource, 
"expectedTargetDataSource");
+      this.expectedTargetSignature = 
Preconditions.checkNotNull(expectedTargetSignature, "expectedTargetSignature");
+      return this;
+    }
+
+    public IngestionDmlTester expectResources(final ResourceAction... 
expectedResources)
+    {
+      this.expectedResources = Arrays.asList(expectedResources);
+      return this;
+    }
+
+    @SuppressWarnings("rawtypes")
+    public IngestionDmlTester expectQuery(final Query expectedQuery)
+    {
+      this.expectedQuery = expectedQuery;
+      return this;
+    }
+
+    public IngestionDmlTester expectValidationError(Matcher<Throwable> 
validationErrorMatcher)
+    {
+      this.validationErrorMatcher = validationErrorMatcher;
+      return this;
+    }
+
+    public IngestionDmlTester expectValidationError(Class<? extends Throwable> 
clazz)
+    {
+      return expectValidationError(CoreMatchers.instanceOf(clazz));
+    }
+
+    public IngestionDmlTester expectValidationError(Class<? extends Throwable> 
clazz, String message)
+    {
+      return expectValidationError(
+          CoreMatchers.allOf(
+              CoreMatchers.instanceOf(clazz),
+              ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(message))
+          )
+      );
+    }
+
+    public void verify()
+    {
+      if (didTest) {
+        // It's good form to only do one test per method.
+        // This also helps us ensure that "verify" actually does get called.
+        throw new ISE("Use one @Test method per tester");
+      }
+
+      didTest = true;
+
+      if (sql == null) {
+        throw new ISE("Test must have SQL statement");
+      }
+
+      try {
+        log.info("SQL: %s", sql);
+        queryLogHook.clearRecordedQueries();
+
+        if (validationErrorMatcher != null) {
+          verifyValidationError();
+        } else {
+          verifySuccess();
+        }
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    private void verifyValidationError()
+    {
+      if (expectedTargetDataSource != null) {
+        throw new ISE("Test must not have expectedTargetDataSource");
+      }
+
+      if (expectedResources != null) {
+        throw new ISE("Test must not have expectedResources");
+      }
+
+      if (expectedQuery != null) {
+        throw new ISE("Test must not have expectedQuery");
+      }
+
+      final SqlLifecycleFactory sqlLifecycleFactory = getSqlLifecycleFactory(
+          plannerConfig,
+          new AuthConfig(),
+          createOperatorTable(),
+          createMacroTable(),
+          CalciteTests.TEST_AUTHORIZER_MAPPER,
+          queryJsonMapper
+      );
+
+      final SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+      sqlLifecycle.initialize(sql, new QueryContext(queryContext));
+
+      final Throwable e = Assert.assertThrows(
+          Throwable.class,
+          () -> {
+            sqlLifecycle.validateAndAuthorize(authenticationResult);
+            sqlLifecycle.plan();
+          }
+      );
+
+      MatcherAssert.assertThat(e, validationErrorMatcher);
+      Assert.assertTrue(queryLogHook.getRecordedQueries().isEmpty());
+    }
+
+    private void verifySuccess() throws Exception
+    {
+      if (expectedTargetDataSource == null) {
+        throw new ISE("Test must have expectedTargetDataSource");
+      }
+
+      if (expectedResources == null) {
+        throw new ISE("Test must have expectedResources");
+      }
+
+      final List<Query> expectedQueries =
+          expectedQuery == null
+          ? Collections.emptyList()
+          : 
Collections.singletonList(recursivelyOverrideContext(expectedQuery, 
queryContext));
+
+      Assert.assertEquals(
+          ImmutableSet.copyOf(expectedResources),
+          analyzeResources(plannerConfig, new AuthConfig(), sql, queryContext, 
authenticationResult)
+      );
+
+      final List<Object[]> results =
+          getResults(plannerConfig, queryContext, Collections.emptyList(), 
sql, authenticationResult);
+
+      verifyResults(
+          sql,
+          expectedQueries,
+          Collections.singletonList(new Object[]{expectedTargetDataSource, 
expectedTargetSignature}),
+          results
+      );
+    }
+  }
+
+  protected static ResourceAction viewRead(final String viewName)
+  {
+    return new ResourceAction(new Resource(viewName, ResourceType.VIEW), 
Action.READ);
+  }
+
+  protected static ResourceAction dataSourceRead(final String dataSource)
+  {
+    return new ResourceAction(new Resource(dataSource, 
ResourceType.DATASOURCE), Action.READ);
+  }
+
+  protected static ResourceAction dataSourceWrite(final String dataSource)
+  {
+    return new ResourceAction(new Resource(dataSource, 
ResourceType.DATASOURCE), Action.WRITE);
+  }
+}
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index caa1ef8486..87c389a722 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -20,110 +20,44 @@
 package org.apache.druid.sql.calcite;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import org.apache.druid.data.input.impl.CsvInputFormat;
-import org.apache.druid.data.input.impl.InlineInputSource;
-import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
-import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
-import 
org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.AuthConfig;
-import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.ForbiddenException;
-import org.apache.druid.server.security.Resource;
-import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.server.security.ResourceType;
-import org.apache.druid.sql.SqlLifecycle;
-import org.apache.druid.sql.SqlLifecycleFactory;
 import org.apache.druid.sql.SqlPlanningException;
-import org.apache.druid.sql.calcite.external.ExternalDataSource;
 import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
 import org.apache.druid.sql.calcite.filtration.Filtration;
 import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
-import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.hamcrest.CoreMatchers;
-import org.hamcrest.Matcher;
-import org.hamcrest.MatcherAssert;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.internal.matchers.ThrowableMessageMatcher;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 
-public class CalciteInsertDmlTest extends BaseCalciteQueryTest
+public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
 {
-  private static final Map<String, Object> DEFAULT_CONTEXT =
-      ImmutableMap.<String, Object>builder()
-                  .put(PlannerContext.CTX_SQL_QUERY_ID, DUMMY_SQL_ID)
-                  .build();
-
-  private static final RowSignature FOO_TABLE_SIGNATURE =
-      RowSignature.builder()
-                  .addTimeColumn()
-                  .add("cnt", ColumnType.LONG)
-                  .add("dim1", ColumnType.STRING)
-                  .add("dim2", ColumnType.STRING)
-                  .add("dim3", ColumnType.STRING)
-                  .add("m1", ColumnType.FLOAT)
-                  .add("m2", ColumnType.DOUBLE)
-                  .add("unique_dim1", HyperUniquesAggregatorFactory.TYPE)
-                  .build();
-
-  private final ExternalDataSource externalDataSource = new ExternalDataSource(
-      new InlineInputSource("a,b,1\nc,d,2\n"),
-      new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 
0),
-      RowSignature.builder()
-                  .add("x", ColumnType.STRING)
-                  .add("y", ColumnType.STRING)
-                  .add("z", ColumnType.LONG)
-                  .build()
-  );
-
   private static final Map<String, Object> 
PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT = ImmutableMap.of(
       DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
       "{\"type\":\"all\"}"
   );
 
-  private boolean didTest = false;
-
-  @After
-  @Override
-  public void tearDown() throws Exception
-  {
-    super.tearDown();
-
-    // Catch situations where tests forgot to call "verify" on their tester.
-    if (!didTest) {
-      throw new ISE("Test was not run; did you call verify() on a tester?");
-    }
-  }
-
   @Test
   public void testInsertFromTable()
   {
-    testInsertQuery()
+    testIngestionQuery()
         .sql("INSERT INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME")
         .expectTarget("dst", FOO_TABLE_SIGNATURE)
         .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
@@ -141,7 +75,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
   @Test
   public void testInsertFromView()
   {
-    testInsertQuery()
+    testIngestionQuery()
         .sql("INSERT INTO dst SELECT * FROM view.aview PARTITIONED BY ALL 
TIME")
         .expectTarget("dst", RowSignature.builder().add("dim1_firstchar", 
ColumnType.STRING).build())
         .expectResources(viewRead("aview"), dataSourceWrite("dst"))
@@ -161,7 +95,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
   @Test
   public void testInsertIntoExistingTable()
   {
-    testInsertQuery()
+    testIngestionQuery()
         .sql("INSERT INTO foo SELECT * FROM foo PARTITIONED BY ALL TIME")
         .expectTarget("foo", FOO_TABLE_SIGNATURE)
         .expectResources(dataSourceRead("foo"), dataSourceWrite("foo"))
@@ -179,7 +113,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
   @Test
   public void testInsertIntoQualifiedTable()
   {
-    testInsertQuery()
+    testIngestionQuery()
         .sql("INSERT INTO druid.dst SELECT * FROM foo PARTITIONED BY ALL TIME")
         .expectTarget("dst", FOO_TABLE_SIGNATURE)
         .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
@@ -197,25 +131,25 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
   @Test
   public void testInsertIntoInvalidDataSourceName()
   {
-    testInsertQuery()
+    testIngestionQuery()
         .sql("INSERT INTO \"in/valid\" SELECT dim1, dim2 FROM foo PARTITIONED 
BY ALL TIME")
-        .expectValidationError(SqlPlanningException.class, "INSERT dataSource 
cannot contain the '/' character.")
+        .expectValidationError(SqlPlanningException.class, "Ingestion 
dataSource cannot contain the '/' character.")
         .verify();
   }
 
   @Test
   public void testInsertUsingColumnList()
   {
-    testInsertQuery()
+    testIngestionQuery()
         .sql("INSERT INTO dst (foo, bar) SELECT dim1, dim2 FROM foo 
PARTITIONED BY ALL TIME")
-        .expectValidationError(SqlPlanningException.class, "INSERT with target 
column list is not supported.")
+        .expectValidationError(SqlPlanningException.class, "Ingestion with 
target column list is not supported.")
         .verify();
   }
 
   @Test
   public void testUpsert()
   {
-    testInsertQuery()
+    testIngestionQuery()
         .sql("UPSERT INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME")
         .expectValidationError(SqlPlanningException.class, "UPSERT is not 
supported.")
         .verify();
@@ -224,11 +158,11 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
   @Test
   public void testInsertIntoSystemTable()
   {
-    testInsertQuery()
+    testIngestionQuery()
         .sql("INSERT INTO INFORMATION_SCHEMA.COLUMNS SELECT * FROM foo 
PARTITIONED BY ALL TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            "Cannot INSERT into [INFORMATION_SCHEMA.COLUMNS] because it is not 
a Druid datasource."
+            "Cannot ingest into [INFORMATION_SCHEMA.COLUMNS] because it is not 
a Druid datasource."
         )
         .verify();
   }
@@ -236,11 +170,11 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
   @Test
   public void testInsertIntoView()
   {
-    testInsertQuery()
+    testIngestionQuery()
         .sql("INSERT INTO view.aview SELECT * FROM foo PARTITIONED BY ALL 
TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            "Cannot INSERT into [view.aview] because it is not a Druid 
datasource."
+            "Cannot ingest into [view.aview] because it is not a Druid 
datasource."
         )
         .verify();
   }
@@ -248,7 +182,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
   @Test
   public void testInsertFromUnauthorizedDataSource()
   {
-    testInsertQuery()
+    testIngestionQuery()
         .sql("INSERT INTO dst SELECT * FROM \"%s\" PARTITIONED BY ALL TIME", 
CalciteTests.FORBIDDEN_DATASOURCE)
         .expectValidationError(ForbiddenException.class)
         .verify();
@@ -257,7 +191,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
   @Test
   public void testInsertIntoUnauthorizedDataSource()
   {
-    testInsertQuery()
+    testIngestionQuery()
         .sql("INSERT INTO \"%s\" SELECT * FROM foo PARTITIONED BY ALL TIME", 
CalciteTests.FORBIDDEN_DATASOURCE)
         .expectValidationError(ForbiddenException.class)
         .verify();
@@ -266,11 +200,11 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
   @Test
   public void testInsertIntoNonexistentSchema()
   {
-    testInsertQuery()
+    testIngestionQuery()
         .sql("INSERT INTO nonexistent.dst SELECT * FROM foo PARTITIONED BY ALL 
TIME")
         .expectValidationError(
             SqlPlanningException.class,
-            "Cannot INSERT into [nonexistent.dst] because it is not a Druid 
datasource."
+            "Cannot ingest into [nonexistent.dst] because it is not a Druid 
datasource."
         )
         .verify();
   }
@@ -278,7 +212,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
   @Test
   public void testInsertFromExternal()
   {
-    testInsertQuery()
+    testIngestionQuery()
         .sql("INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME", 
externSql(externalDataSource))
         .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
         .expectTarget("dst", externalDataSource.getSignature())
@@ -304,7 +238,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
                                                   .add("dim1", 
ColumnType.STRING)
                                                   .build();
 
-    testInsertQuery()
+    testIngestionQuery()
         .sql(
             "INSERT INTO druid.dst SELECT __time, FLOOR(m1) as floor_m1, dim1 
FROM foo PARTITIONED BY TIME_FLOOR(__time, 'PT1H')")
         .expectTarget("dst", targetRowSignature)
@@ -353,7 +287,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
         Assert.fail(e.getMessage());
       }
 
-      testInsertQuery()
+      testIngestionQuery()
           .sql(StringUtils.format(
               "INSERT INTO druid.dst SELECT __time, dim1 FROM foo PARTITIONED 
BY %s",
               partitionedByArgument
@@ -384,7 +318,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
                                                   .add("dim1", 
ColumnType.STRING)
                                                   .add("ceil_m2", 
ColumnType.DOUBLE)
                                                   .build();
-    testInsertQuery()
+    testIngestionQuery()
         .sql(
             "INSERT INTO druid.dst "
             + "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2) as ceil_m2 
FROM foo "
@@ -424,7 +358,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
                                                   .add("dim1", 
ColumnType.STRING)
                                                   .build();
 
-    testInsertQuery()
+    testIngestionQuery()
         .sql(
             "INSERT INTO druid.dst SELECT __time, FLOOR(m1) as floor_m1, dim1 
FROM foo PARTITIONED BY DAY CLUSTERED BY 2, dim1")
         .expectTarget("dst", targetRowSignature)
@@ -456,7 +390,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
                                                   .add("dim1", 
ColumnType.STRING)
                                                   .build();
 
-    testInsertQuery()
+    testIngestionQuery()
         .sql(
             "INSERT INTO druid.dst SELECT __time, FLOOR(m1) as floor_m1, dim1 
FROM foo LIMIT 10 OFFSET 20 PARTITIONED BY DAY")
         .expectTarget("dst", targetRowSignature)
@@ -583,7 +517,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
         + queryJsonMapper.writeValueAsString(expectedQuery)
         + "], signature=[{x:STRING, y:STRING, z:LONG}])\n";
 
-    // Use testQuery for EXPLAIN (not testInsertQuery).
+    // Use testQuery for EXPLAIN (not testIngestionQuery).
     testQuery(
         new PlannerConfig(),
         StringUtils.format(
@@ -600,14 +534,14 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
         )
     );
 
-    // Not using testInsertQuery, so must set didTest manually to satisfy the 
check in tearDown.
+    // Not using testIngestionQuery, so must set didTest manually to satisfy 
the check in tearDown.
     didTest = true;
   }
 
   @Test
   public void testExplainInsertFromExternalUnauthorized()
   {
-    // Use testQuery for EXPLAIN (not testInsertQuery).
+    // Use testQuery for EXPLAIN (not testIngestionQuery).
     Assert.assertThrows(
         ForbiddenException.class,
         () ->
@@ -621,14 +555,14 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
             )
     );
 
-    // Not using testInsertQuery, so must set didTest manually to satisfy the 
check in tearDown.
+    // Not using testIngestionQuery, so must set didTest manually to satisfy 
the check in tearDown.
     didTest = true;
   }
 
   @Test
   public void testInsertFromExternalUnauthorized()
   {
-    testInsertQuery()
+    testIngestionQuery()
         .sql("INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME", 
externSql(externalDataSource))
         .expectValidationError(ForbiddenException.class)
         .verify();
@@ -639,7 +573,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
   {
     // INSERT with a particular column ordering.
 
-    testInsertQuery()
+    testIngestionQuery()
         .sql(
             "INSERT INTO dst SELECT x || y AS xy, z FROM %s PARTITIONED BY ALL 
TIME CLUSTERED BY 1, 2",
             externSql(externalDataSource)
@@ -670,7 +604,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
   {
     // INSERT with rollup.
 
-    testInsertQuery()
+    testIngestionQuery()
         .sql(
             "INSERT INTO dst SELECT x, SUM(z) AS sum_z, COUNT(*) AS cnt FROM 
%s GROUP BY 1 PARTITIONED BY ALL TIME",
             externSql(externalDataSource)
@@ -706,7 +640,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
   {
     // INSERT with rollup into a single row (no GROUP BY exprs).
 
-    testInsertQuery()
+    testIngestionQuery()
         .sql(
             "INSERT INTO dst SELECT COUNT(*) AS cnt FROM %s PARTITIONED BY ALL 
TIME",
             externSql(externalDataSource)
@@ -734,7 +668,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
   @Test
   public void testInsertWithInvalidSelectStatement()
   {
-    testInsertQuery()
+    testIngestionQuery()
         .sql("INSERT INTO t SELECT channel, added as count FROM foo 
PARTITIONED BY ALL") // count is a keyword
         .expectValidationError(
             CoreMatchers.allOf(
@@ -748,7 +682,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
   @Test
   public void testInsertWithUnnamedColumnInSelectStatement()
   {
-    testInsertQuery()
+    testIngestionQuery()
         .sql("INSERT INTO t SELECT dim1, dim2 || '-lol' FROM foo PARTITIONED 
BY ALL")
         .expectValidationError(
             SqlPlanningException.class,
@@ -763,7 +697,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
   @Test
   public void testInsertWithInvalidColumnNameInIngest()
   {
-    testInsertQuery()
+    testIngestionQuery()
         .sql("INSERT INTO t SELECT __time, dim1 AS EXPR$0 FROM foo PARTITIONED 
BY ALL")
         .expectValidationError(
             SqlPlanningException.class,
@@ -778,7 +712,7 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
   @Test
   public void testInsertWithUnnamedColumnInNestedSelectStatement()
   {
-    testInsertQuery()
+    testIngestionQuery()
         .sql("INSERT INTO test "
              + "SELECT __time, * FROM "
              + "(SELECT __time, LOWER(dim1) FROM foo) PARTITIONED BY ALL TIME")
@@ -791,238 +725,4 @@ public class CalciteInsertDmlTest extends 
BaseCalciteQueryTest
         )
         .verify();
   }
-
-  private String externSql(final ExternalDataSource externalDataSource)
-  {
-    try {
-      return StringUtils.format(
-          "TABLE(extern(%s, %s, %s))",
-          
Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getInputSource())),
-          
Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getInputFormat())),
-          
Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getSignature()))
-      );
-    }
-    catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private Map<String, Object> queryContextWithGranularity(Granularity 
granularity)
-  {
-    String granularityString = null;
-    try {
-      granularityString = queryJsonMapper.writeValueAsString(granularity);
-    }
-    catch (JsonProcessingException e) {
-      Assert.fail(e.getMessage());
-    }
-    return ImmutableMap.of(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, 
granularityString);
-  }
-
-  private InsertDmlTester testInsertQuery()
-  {
-    return new InsertDmlTester();
-  }
-
-  public class InsertDmlTester
-  {
-    private String sql;
-    private PlannerConfig plannerConfig = new PlannerConfig();
-    private Map<String, Object> queryContext = DEFAULT_CONTEXT;
-    private AuthenticationResult authenticationResult = 
CalciteTests.REGULAR_USER_AUTH_RESULT;
-    private String expectedTargetDataSource;
-    private RowSignature expectedTargetSignature;
-    private List<ResourceAction> expectedResources;
-    private Query expectedQuery;
-    private Matcher<Throwable> validationErrorMatcher;
-
-    private InsertDmlTester()
-    {
-      // Nothing to do.
-    }
-
-    public InsertDmlTester sql(final String sql)
-    {
-      this.sql = sql;
-      return this;
-    }
-
-    private InsertDmlTester sql(final String sqlPattern, final Object arg, 
final Object... otherArgs)
-    {
-      final Object[] args = new Object[otherArgs.length + 1];
-      args[0] = arg;
-      System.arraycopy(otherArgs, 0, args, 1, otherArgs.length);
-      this.sql = StringUtils.format(sqlPattern, args);
-      return this;
-    }
-
-    public InsertDmlTester context(final Map<String, Object> context)
-    {
-      this.queryContext = context;
-      return this;
-    }
-
-    public InsertDmlTester authentication(final AuthenticationResult 
authenticationResult)
-    {
-      this.authenticationResult = authenticationResult;
-      return this;
-    }
-
-    public InsertDmlTester expectTarget(
-        final String expectedTargetDataSource,
-        final RowSignature expectedTargetSignature
-    )
-    {
-      this.expectedTargetDataSource = 
Preconditions.checkNotNull(expectedTargetDataSource, 
"expectedTargetDataSource");
-      this.expectedTargetSignature = 
Preconditions.checkNotNull(expectedTargetSignature, "expectedTargetSignature");
-      return this;
-    }
-
-    public InsertDmlTester expectResources(final ResourceAction... 
expectedResources)
-    {
-      this.expectedResources = Arrays.asList(expectedResources);
-      return this;
-    }
-
-    @SuppressWarnings("rawtypes")
-    public InsertDmlTester expectQuery(final Query expectedQuery)
-    {
-      this.expectedQuery = expectedQuery;
-      return this;
-    }
-
-    public InsertDmlTester expectValidationError(Matcher<Throwable> 
validationErrorMatcher)
-    {
-      this.validationErrorMatcher = validationErrorMatcher;
-      return this;
-    }
-
-    public InsertDmlTester expectValidationError(Class<? extends Throwable> 
clazz)
-    {
-      return expectValidationError(CoreMatchers.instanceOf(clazz));
-    }
-
-    public InsertDmlTester expectValidationError(Class<? extends Throwable> 
clazz, String message)
-    {
-      return expectValidationError(
-          CoreMatchers.allOf(
-              CoreMatchers.instanceOf(clazz),
-              ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(message))
-          )
-      );
-    }
-
-    public void verify()
-    {
-      if (didTest) {
-        // It's good form to only do one test per method.
-        // This also helps us ensure that "verify" actually does get called.
-        throw new ISE("Use one @Test method per tester");
-      }
-
-      didTest = true;
-
-      if (sql == null) {
-        throw new ISE("Test must have SQL statement");
-      }
-
-      try {
-        log.info("SQL: %s", sql);
-        queryLogHook.clearRecordedQueries();
-
-        if (validationErrorMatcher != null) {
-          verifyValidationError();
-        } else {
-          verifySuccess();
-        }
-      }
-      catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    private void verifyValidationError()
-    {
-      if (expectedTargetDataSource != null) {
-        throw new ISE("Test must not have expectedTargetDataSource");
-      }
-
-      if (expectedResources != null) {
-        throw new ISE("Test must not have expectedResources");
-      }
-
-      if (expectedQuery != null) {
-        throw new ISE("Test must not have expectedQuery");
-      }
-
-      final SqlLifecycleFactory sqlLifecycleFactory = getSqlLifecycleFactory(
-          plannerConfig,
-          new AuthConfig(),
-          createOperatorTable(),
-          createMacroTable(),
-          CalciteTests.TEST_AUTHORIZER_MAPPER,
-          queryJsonMapper
-      );
-
-      final SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
-      sqlLifecycle.initialize(sql, new QueryContext(queryContext));
-
-      final Throwable e = Assert.assertThrows(
-          Throwable.class,
-          () -> {
-            sqlLifecycle.validateAndAuthorize(authenticationResult);
-            sqlLifecycle.plan();
-          }
-      );
-
-      MatcherAssert.assertThat(e, validationErrorMatcher);
-      Assert.assertTrue(queryLogHook.getRecordedQueries().isEmpty());
-    }
-
-    private void verifySuccess() throws Exception
-    {
-      if (expectedTargetDataSource == null) {
-        throw new ISE("Test must have expectedTargetDataSource");
-      }
-
-      if (expectedResources == null) {
-        throw new ISE("Test must have expectedResources");
-      }
-
-      final List<Query> expectedQueries =
-          expectedQuery == null
-          ? Collections.emptyList()
-          : 
Collections.singletonList(recursivelyOverrideContext(expectedQuery, 
queryContext));
-
-      Assert.assertEquals(
-          ImmutableSet.copyOf(expectedResources),
-          analyzeResources(plannerConfig, sql, authenticationResult)
-      );
-
-      final List<Object[]> results =
-          getResults(plannerConfig, queryContext, Collections.emptyList(), 
sql, authenticationResult);
-
-      verifyResults(
-          sql,
-          expectedQueries,
-          Collections.singletonList(new Object[]{expectedTargetDataSource, 
expectedTargetSignature}),
-          results
-      );
-    }
-  }
-
-  private static ResourceAction viewRead(final String viewName)
-  {
-    return new ResourceAction(new Resource(viewName, ResourceType.VIEW), 
Action.READ);
-  }
-
-  private static ResourceAction dataSourceRead(final String dataSource)
-  {
-    return new ResourceAction(new Resource(dataSource, 
ResourceType.DATASOURCE), Action.READ);
-  }
-
-  private static ResourceAction dataSourceWrite(final String dataSource)
-  {
-    return new ResourceAction(new Resource(dataSource, 
ResourceType.DATASOURCE), Action.WRITE);
-  }
 }
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
new file mode 100644
index 0000000000..04354d126f
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
@@ -0,0 +1,656 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.sql.SqlPlanningException;
+import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
+import org.apache.druid.sql.calcite.filtration.Filtration;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
+{
+  private static final Map<String, Object> REPLACE_ALL_TIME_CHUNKS = 
ImmutableMap.of( // Query context the tests below expect for OVERWRITE ALL combined with PARTITIONED BY ALL TIME.
+      DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+      "{\"type\":\"all\"}",
+      DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS,
+      "all"
+  );
+
+  protected Map<String, Object> addReplaceTimeChunkToQueryContext(Map<String, 
Object> context, String replaceTimeChunks)
+  { // Returns a new immutable map holding every entry of context plus SQL_REPLACE_TIME_CHUNKS mapped to replaceTimeChunks.
+    return ImmutableMap.<String, Object>builder()
+                       .putAll(context)
+                       .put(DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS, 
replaceTimeChunks) // NOTE(review): presumably context must not already contain this key (the builder rejects duplicates) - confirm.
+                       .build();
+  }
+
+  @Test // OVERWRITE ALL from a table: expects a scan of foo whose context is REPLACE_ALL_TIME_CHUNKS.
+  public void testReplaceFromTableWithReplaceAll()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY 
ALL TIME")
+        .expectTarget("dst", FOO_TABLE_SIGNATURE)
+        .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", 
"unique_dim1")
+                .context(REPLACE_ALL_TIME_CHUNKS)
+                .build()
+        )
+        .verify();
+  }
+
+  @Test // OVERWRITE WHERE with a one-day __time range and PARTITIONED BY DAY: expects that range as the replace time chunk.
+  public void testReplaceFromTableWithDeleteWhereClause()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 
00:00:00' AND __time < TIMESTAMP '2000-01-02 00:00:00' "
+             + "SELECT * FROM foo PARTITIONED BY DAY")
+        .expectTarget("dst", FOO_TABLE_SIGNATURE)
+        .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity())) // The scan itself is still expected over eternity.
+                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", 
"unique_dim1")
+                .context(
+                    addReplaceTimeChunkToQueryContext(
+                        queryContextWithGranularity(Granularities.DAY),
+                        "2000-01-01T00:00:00.000Z/2000-01-02T00:00:00.000Z"
+                    )
+                )
+                .build()
+        )
+        .verify();
+  }
+
+  @Test // Runs REPLACE with the SQL time zone set to +05:30 in the query context and 05:30:00 timestamps in OVERWRITE WHERE.
+  public void testReplaceFromTableWithTimeZoneInQueryContext()
+  {
+    HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT); // Mutable copy so the time zone can be added.
+    context.put(PlannerContext.CTX_SQL_TIME_ZONE, "+05:30");
+    testIngestionQuery()
+        .context(context)
+        .sql("REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 
05:30:00' AND __time < TIMESTAMP '2000-01-02 05:30:00' "
+             + "SELECT * FROM foo PARTITIONED BY DAY")
+        .expectTarget("dst", FOO_TABLE_SIGNATURE)
+        .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", 
"unique_dim1")
+                .context(
+                    addReplaceTimeChunkToQueryContext(
+                        queryContextWithGranularity(Granularities.DAY),
+                        "2000-01-01T00:00:00.000Z/2000-01-02T00:00:00.000Z" // Expected chunk is in UTC: 05:30 at +05:30 is 00:00Z.
+                    )
+                )
+                .build()
+        )
+        .verify();
+  }
+
+  @Test // A four-month OVERWRITE WHERE range with PARTITIONED BY MONTH is expected to plan, keeping the whole range as one chunk.
+  public void testReplaceFromTableWithIntervalLargerThanOneGranularity()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst OVERWRITE WHERE "
+             + "__time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP 
'2000-05-01' "
+             + "SELECT * FROM foo PARTITIONED BY MONTH")
+        .expectTarget("dst", FOO_TABLE_SIGNATURE)
+        .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", 
"unique_dim1")
+                .context(
+                    addReplaceTimeChunkToQueryContext(
+                        queryContextWithGranularity(Granularities.MONTH),
+                        "2000-01-01T00:00:00.000Z/2000-05-01T00:00:00.000Z"
+                    )
+                )
+                .build()
+        )
+        .verify();
+  }
+
+  @Test // Two month-long ranges joined by OR: expects both intervals, comma-separated, as the replace time chunks.
+  public void testReplaceFromTableWithComplexDeleteWhereClause()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst OVERWRITE WHERE "
+             + "__time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP 
'2000-02-01' "
+             + "OR __time >= TIMESTAMP '2000-03-01' AND __time < TIMESTAMP 
'2000-04-01' "
+             + "SELECT * FROM foo PARTITIONED BY MONTH")
+        .expectTarget("dst", FOO_TABLE_SIGNATURE)
+        .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", 
"unique_dim1")
+                .context(
+                    addReplaceTimeChunkToQueryContext(
+                        queryContextWithGranularity(Granularities.MONTH),
+                        
"2000-01-01T00:00:00.000Z/2000-02-01T00:00:00.000Z,2000-03-01T00:00:00.000Z/2000-04-01T00:00:00.000Z"
+                    )
+                )
+                .build()
+        )
+        .verify();
+  }
+
+  @Test // BETWEEN in OVERWRITE WHERE with an upper bound of 2000-01-31 23:59:59.999 and PARTITIONED BY MONTH.
+  public void testReplaceFromTableWithBetweenClause()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst OVERWRITE WHERE "
+             + "__time BETWEEN TIMESTAMP '2000-01-01' AND TIMESTAMP 
'2000-01-31 23:59:59.999' "
+             + "SELECT * FROM foo PARTITIONED BY MONTH")
+        .expectTarget("dst", FOO_TABLE_SIGNATURE)
+        .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", 
"unique_dim1")
+                .context(
+                    addReplaceTimeChunkToQueryContext(
+                        queryContextWithGranularity(Granularities.MONTH),
+                        "2000-01-01T00:00:00.000Z/2000-02-01T00:00:00.000Z" // Expected end is one millisecond after the BETWEEN upper bound.
+                    )
+                )
+                .build()
+        )
+        .verify();
+  }
+
+  @Test // LIKE in OVERWRITE WHERE is expected to fail validation with a SqlPlanningException naming the operation.
+  public void testReplaceForUnsupportedDeleteWhereClause()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst OVERWRITE WHERE __time LIKE '20__-02-01' SELECT 
* FROM foo PARTITIONED BY MONTH")
+        .expectValidationError(
+            SqlPlanningException.class,
+            "Unsupported operation in OVERWRITE WHERE clause: LIKE"
+        )
+        .verify();
+  }
+
+  @Test // A bare TRUE as the OVERWRITE WHERE condition is expected to be rejected as an invalid clause.
+  public void testReplaceForInvalidDeleteWhereClause()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst OVERWRITE WHERE TRUE SELECT * FROM foo 
PARTITIONED BY MONTH")
+        .expectValidationError(
+            SqlPlanningException.class,
+            "Invalid OVERWRITE WHERE clause"
+        )
+        .verify();
+  }
+
+  @Test // Filtering on dim1 instead of __time in OVERWRITE WHERE is expected to fail validation.
+  public void testReplaceForDeleteWhereClauseOnUnsupportedColumns()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst OVERWRITE WHERE dim1 > TIMESTAMP '2000-01-05 
00:00:00' SELECT * FROM foo PARTITIONED BY ALL TIME")
+        .expectValidationError(
+            SqlPlanningException.class,
+            "Only __time column is supported in OVERWRITE WHERE clause"
+        )
+        .verify();
+  }
+
+
+  @Test // ORDER BY in the SELECT of a REPLACE statement is expected to fail validation.
+  public void testReplaceWithOrderBy()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo ORDER BY dim1 
PARTITIONED BY ALL TIME")
+        .expectValidationError(SqlPlanningException.class, "Cannot have ORDER 
BY on a REPLACE query.")
+        .verify();
+  }
+
+  @Test // A 5-6 January range with PARTITIONED BY MONTH is expected to be reported as misaligned; the "<=" bound appears in the message as an end of 00:00:00.001Z.
+  public void testReplaceForMisalignedPartitionInterval()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-05 
00:00:00' AND __time <= TIMESTAMP '2000-01-06 00:00:00' SELECT * FROM foo 
PARTITIONED BY MONTH")
+        .expectValidationError(
+            SqlPlanningException.class,
+            "OVERWRITE WHERE clause contains an interval 
[2000-01-05T00:00:00.000Z/2000-01-06T00:00:00.001Z] which is not aligned with 
PARTITIONED BY granularity {type=period, period=P1M, timeZone=UTC, origin=null}"
+        )
+        .verify();
+  }
+
+  @Test // A bounded OVERWRITE WHERE range combined with PARTITIONED BY ALL TIME is expected to be reported as misaligned.
+  public void testReplaceForInvalidPartition()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-05 
00:00:00' AND __time <= TIMESTAMP '2000-02-05 00:00:00' SELECT * FROM foo 
PARTITIONED BY ALL TIME")
+        .expectValidationError(
+            SqlPlanningException.class,
+            "OVERWRITE WHERE clause contains an interval 
[2000-01-05T00:00:00.000Z/2000-02-05T00:00:00.001Z] which is not aligned with 
PARTITIONED BY granularity AllGranularity"
+        )
+        .verify();
+  }
+
+  @Test // Contradictory bounds (__time < X AND __time > X for the same X) are expected to fail with an empty-intervals error.
+  public void testReplaceFromTableWithEmptyInterval()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst OVERWRITE WHERE "
+             + "__time < TIMESTAMP '2000-01-01' AND __time > TIMESTAMP 
'2000-01-01' "
+             + "SELECT * FROM foo PARTITIONED BY MONTH")
+        .expectValidationError(
+            SqlPlanningException.class,
+            "Intervals for replace are empty"
+        )
+        .verify();
+  }
+
+  @Test // A malformed TIMESTAMP literal in OVERWRITE WHERE is expected to raise SqlPlanningException; the message is not checked.
+  public void testReplaceForWithInvalidInterval()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP 
'2000-01-INVALID0:00' AND __time <= TIMESTAMP '2000-02-05 00:00:00' SELECT * 
FROM foo PARTITIONED BY ALL TIME")
+        .expectValidationError(SqlPlanningException.class)
+        .verify();
+  }
+
+  @Test // The statement here omits the OVERWRITE clause (PARTITIONED BY is present): expects SqlPlanningException; the message is not checked.
+  public void testReplaceForWithoutPartitionSpec()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME")
+        .expectValidationError(SqlPlanningException.class)
+        .verify();
+  }
+
+  @Test // OVERWRITE ALL selecting from view.aview: expects a view READ resource and a scan of foo with a virtual column and a selector filter on dim2.
+  public void testReplaceFromView()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM view.aview 
PARTITIONED BY ALL TIME")
+        .expectTarget("dst", RowSignature.builder().add("dim1_firstchar", 
ColumnType.STRING).build())
+        .expectResources(viewRead("aview"), dataSourceWrite("dst"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .virtualColumns(expressionVirtualColumn("v0", 
"substring(\"dim1\", 0, 1)", ColumnType.STRING))
+                .filters(selector("dim2", "a", null))
+                .columns("v0")
+                .context(REPLACE_ALL_TIME_CHUNKS)
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * The target is written with an explicit schema qualifier ({@code druid.dst}). The expected target
+   * is the unqualified name "dst" with FOO_TABLE_SIGNATURE, and the expected query is a plain scan
+   * of "foo" carrying the REPLACE_ALL_TIME_CHUNKS context.
+   */
+  @Test
+  public void testReplaceIntoQualifiedTable()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO druid.dst OVERWRITE ALL SELECT * FROM foo 
PARTITIONED BY ALL TIME")
+        .expectTarget("dst", FOO_TABLE_SIGNATURE)
+        .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", 
"unique_dim1")
+                .context(REPLACE_ALL_TIME_CHUNKS)
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * The quoted target name "in/valid" contains a '/' character. Expects a
+   * {@link SqlPlanningException} whose message states that an ingestion dataSource cannot contain it.
+   */
+  @Test
+  public void testReplaceIntoInvalidDataSourceName()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO \"in/valid\" OVERWRITE ALL SELECT dim1, dim2 FROM 
foo PARTITIONED BY ALL TIME")
+        .expectValidationError(SqlPlanningException.class, "Ingestion 
dataSource cannot contain the '/' character.")
+        .verify();
+  }
+
+  /**
+   * The target is followed by an explicit column list {@code (foo, bar)}. Expects a
+   * {@link SqlPlanningException} saying that ingestion with a target column list is not supported.
+   */
+  @Test
+  public void testReplaceUsingColumnList()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst (foo, bar) OVERWRITE ALL SELECT dim1, dim2 FROM 
foo PARTITIONED BY ALL TIME")
+        .expectValidationError(SqlPlanningException.class, "Ingestion with 
target column list is not supported.")
+        .verify();
+  }
+
+  /**
+   * The target is {@code INFORMATION_SCHEMA.COLUMNS}. Expects a {@link SqlPlanningException} whose
+   * message names that table and says it is not a Druid datasource.
+   */
+  @Test
+  public void testReplaceIntoSystemTable()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO INFORMATION_SCHEMA.COLUMNS OVERWRITE ALL SELECT * 
FROM foo PARTITIONED BY ALL TIME")
+        .expectValidationError(
+            SqlPlanningException.class,
+            "Cannot ingest into [INFORMATION_SCHEMA.COLUMNS] because it is not 
a Druid datasource."
+        )
+        .verify();
+  }
+
+  /**
+   * The target is the view {@code view.aview}. Expects a {@link SqlPlanningException} whose message
+   * names the view and says it is not a Druid datasource.
+   */
+  @Test
+  public void testReplaceIntoView()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO view.aview OVERWRITE ALL SELECT * FROM foo 
PARTITIONED BY ALL TIME")
+        .expectValidationError(
+            SqlPlanningException.class,
+            "Cannot ingest into [view.aview] because it is not a Druid 
datasource."
+        )
+        .verify();
+  }
+
+  /**
+   * The SELECT reads from {@code CalciteTests.FORBIDDEN_DATASOURCE}, substituted into the "%s"
+   * placeholder of the SQL. Expects a {@link ForbiddenException}.
+   */
+  @Test
+  public void testReplaceFromUnauthorizedDataSource()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM \"%s\" PARTITIONED 
BY ALL TIME", CalciteTests.FORBIDDEN_DATASOURCE)
+        .expectValidationError(ForbiddenException.class)
+        .verify();
+  }
+
+  /**
+   * The REPLACE target is {@code CalciteTests.FORBIDDEN_DATASOURCE}, substituted into the "%s"
+   * placeholder of the SQL. Expects a {@link ForbiddenException}.
+   */
+  @Test
+  public void testReplaceIntoUnauthorizedDataSource()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO \"%s\" OVERWRITE ALL SELECT * FROM foo PARTITIONED 
BY ALL TIME", CalciteTests.FORBIDDEN_DATASOURCE)
+        .expectValidationError(ForbiddenException.class)
+        .verify();
+  }
+
+  /**
+   * The target is qualified with the schema name {@code nonexistent}. Expects a
+   * {@link SqlPlanningException} whose message names [nonexistent.dst] and says it is not a Druid
+   * datasource.
+   */
+  @Test
+  public void testReplaceIntoNonexistentSchema()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO nonexistent.dst OVERWRITE ALL SELECT * FROM foo 
PARTITIONED BY ALL TIME")
+        .expectValidationError(
+            SqlPlanningException.class,
+            "Cannot ingest into [nonexistent.dst] because it is not a Druid 
datasource."
+        )
+        .verify();
+  }
+
+  /**
+   * REPLACE whose source is the SQL produced by {@code externSql(externalDataSource)}, run with
+   * {@code CalciteTests.SUPER_USER_AUTH_RESULT}. Expects the target signature to equal
+   * {@code externalDataSource.getSignature()}, the resources to be a write on "dst" plus
+   * {@code EXTERNAL_RESOURCE_ACTION}, and a scan over the external datasource selecting x, y and z
+   * with the REPLACE_ALL_TIME_CHUNKS context.
+   */
+  @Test
+  public void testReplaceFromExternal()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM %s PARTITIONED BY 
ALL TIME", externSql(externalDataSource))
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("dst", externalDataSource.getSignature())
+        .expectResources(dataSourceWrite("dst"), 
ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(externalDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("x", "y", "z")
+                .context(REPLACE_ALL_TIME_CHUNKS)
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * REPLACE with LIMIT 10 OFFSET 20 in the SELECT and PARTITIONED BY DAY. Expects the target
+   * signature (__time LONG, floor_m1 FLOAT, dim1 STRING), a scan query carrying limit 10 and
+   * offset 20 with FLOOR(m1) as virtual column v0, and a context built from
+   * {@code queryContextWithGranularity(Granularities.DAY)} extended through
+   * {@code addReplaceTimeChunkToQueryContext(..., "all")}.
+   */
+  @Test
+  public void testReplaceWithPartitionedByAndLimitOffset()
+  {
+    RowSignature targetRowSignature = RowSignature.builder()
+                                                  .add("__time", 
ColumnType.LONG)
+                                                  .add("floor_m1", 
ColumnType.FLOAT)
+                                                  .add("dim1", 
ColumnType.STRING)
+                                                  .build();
+
+    testIngestionQuery()
+        .sql(
+            "REPLACE INTO druid.dst OVERWRITE ALL SELECT __time, FLOOR(m1) as 
floor_m1, dim1 FROM foo LIMIT 10 OFFSET 20 PARTITIONED BY DAY")
+        .expectTarget("dst", targetRowSignature)
+        .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("__time", "dim1", "v0")
+                .virtualColumns(expressionVirtualColumn("v0", "floor(\"m1\")", 
ColumnType.FLOAT))
+                .limit(10)
+                .offset(20)
+                .context(
+                    addReplaceTimeChunkToQueryContext(
+                        queryContextWithGranularity(Granularities.DAY),
+                        "all"
+                    )
+                )
+                .build()
+        )
+        .verify();
+  }
+
+  @Test
+  public void testReplaceWithPartitionedByContainingInvalidGranularity() 
throws Exception
+  {
+    // Throws a ValidationException, which gets converted to a 
SqlPlanningException before throwing to end user
+    try {
+      testQuery(
+          "REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY 
'invalid_granularity'",
+          ImmutableList.of(),
+          ImmutableList.of()
+      );
+      Assert.fail("Exception should be thrown");
+    }
+    catch (SqlPlanningException e) {
+      assertEquals(
+          "Encountered 'invalid_granularity' after PARTITIONED BY. Expected 
HOUR, DAY, MONTH, YEAR, ALL TIME, FLOOR function or TIME_FLOOR function",
+          e.getMessage()
+      );
+    }
+    didTest = true;
+  }
+
+  /**
+   * EXPLAIN PLAN FOR a REPLACE that reads from the external source. The expected scan query is
+   * built with a context parsed from a JSON literal, serialized with queryJsonMapper and embedded in
+   * the expected "DruidQueryRel(...)" explanation text. The single expected result row holds that
+   * explanation and the JSON list of resources (EXTERNAL and the "dst" DATASOURCE).
+   */
+  @Test
+  public void testExplainReplaceFromExternal() throws Exception
+  {
+    // Skip vectorization since otherwise the "context" will change for each 
subtest.
+    skipVectorize();
+
+    final ScanQuery expectedQuery = newScanQueryBuilder()
+        .dataSource(externalDataSource)
+        .intervals(querySegmentSpec(Filtration.eternity()))
+        .columns("x", "y", "z")
+        .context(
+            queryJsonMapper.readValue(
+                
"{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"sqlReplaceTimeChunks\":\"all\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"}",
+                JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+            )
+        )
+        .build();
+
+    final String expectedExplanation =
+        "DruidQueryRel(query=["
+        + queryJsonMapper.writeValueAsString(expectedQuery)
+        + "], signature=[{x:STRING, y:STRING, z:LONG}])\n";
+
+    // Use testQuery for EXPLAIN (not testIngestionQuery).
+    testQuery(
+        new PlannerConfig(),
+        StringUtils.format(
+            "EXPLAIN PLAN FOR REPLACE INTO dst OVERWRITE ALL SELECT * FROM %s 
PARTITIONED BY ALL TIME",
+            externSql(externalDataSource)
+        ),
+        CalciteTests.SUPER_USER_AUTH_RESULT,
+        ImmutableList.of(),
+        ImmutableList.of(
+            new Object[]{
+                expectedExplanation,
+                
"[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]"
+            }
+        )
+    );
+
+    // Not using testIngestionQuery, so must set didTest manually to satisfy 
the check in tearDown.
+    didTest = true;
+  }
+
+  /**
+   * EXPLAIN PLAN FOR a REPLACE that reads from the external source, issued through the testQuery
+   * overload that takes no authentication result (presumably the default, non-superuser identity;
+   * confirm against the base test class). Expects a {@link ForbiddenException}.
+   */
+  @Test
+  public void testExplainReplaceFromExternalUnauthorized()
+  {
+    // Use testQuery for EXPLAIN (not testIngestionQuery).
+    Assert.assertThrows(
+        ForbiddenException.class,
+        () ->
+            testQuery(
+                StringUtils.format(
+                    "EXPLAIN PLAN FOR REPLACE INTO dst OVERWRITE ALL SELECT * 
FROM %s PARTITIONED BY ALL TIME",
+                    externSql(externalDataSource)
+                ),
+                ImmutableList.of(),
+                ImmutableList.of()
+            )
+    );
+
+    // Not using testIngestionQuery, so must set didTest manually to satisfy 
the check in tearDown.
+    didTest = true;
+  }
+
+  /**
+   * REPLACE reading from the external source with no {@code authentication(...)} call on the
+   * ingestion test builder. Expects a {@link ForbiddenException}.
+   */
+  @Test
+  public void testReplaceFromExternalUnauthorized()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM %s PARTITIONED BY 
ALL TIME", externSql(externalDataSource))
+        .expectValidationError(ForbiddenException.class)
+        .verify();
+  }
+
+  /**
+   * REPLACE from the external source with a projection: {@code x || y AS xy} and {@code z}. Expects
+   * the target signature (xy STRING, z LONG) and a scan query where the concatenation is the
+   * virtual column v0.
+   *
+   * NOTE(review): despite "Sort" in the method name, the statement contains no ORDER BY clause.
+   */
+  @Test
+  public void testReplaceFromExternalProjectSort()
+  {
+    testIngestionQuery()
+        .sql(
+            "REPLACE INTO dst OVERWRITE ALL SELECT x || y AS xy, z FROM %s 
PARTITIONED BY ALL TIME",
+            externSql(externalDataSource)
+        )
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("dst", RowSignature.builder().add("xy", 
ColumnType.STRING).add("z", ColumnType.LONG).build())
+        .expectResources(dataSourceWrite("dst"), 
ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(externalDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .virtualColumns(expressionVirtualColumn("v0", 
"concat(\"x\",\"y\")", ColumnType.STRING))
+                .columns("v0", "z")
+                .context(REPLACE_ALL_TIME_CHUNKS)
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * REPLACE from the external source with a grouped aggregation (GROUP BY x with SUM(z) and
+   * COUNT(*)). Expects the target signature (x STRING, sum_z LONG, cnt LONG) and a GroupByQuery with
+   * dimension x -> d0, a long-sum aggregator a0 over z and a count aggregator a1.
+   */
+  @Test
+  public void testReplaceFromExternalAggregate()
+  {
+    testIngestionQuery()
+        .sql(
+            "REPLACE INTO dst OVERWRITE ALL SELECT x, SUM(z) AS sum_z, 
COUNT(*) AS cnt FROM %s GROUP BY 1 PARTITIONED BY ALL TIME",
+            externSql(externalDataSource)
+        )
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget(
+            "dst",
+            RowSignature.builder()
+                        .add("x", ColumnType.STRING)
+                        .add("sum_z", ColumnType.LONG)
+                        .add("cnt", ColumnType.LONG)
+                        .build()
+        )
+        .expectResources(dataSourceWrite("dst"), 
ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
+        .expectQuery(
+            GroupByQuery.builder()
+                        .setDataSource(externalDataSource)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setDimensions(dimensions(new 
DefaultDimensionSpec("x", "d0")))
+                        .setAggregatorSpecs(
+                            new LongSumAggregatorFactory("a0", "z"),
+                            new CountAggregatorFactory("a1")
+                        )
+                        .setContext(REPLACE_ALL_TIME_CHUNKS)
+                        .build()
+        )
+        .verify();
+  }
+
+  /**
+   * REPLACE from the external source with an ungrouped COUNT(*). Expects the target signature
+   * (cnt LONG) and a GroupByQuery that has no dimensions and a single count aggregator a0.
+   */
+  @Test
+  public void testReplaceFromExternalAggregateAll()
+  {
+    testIngestionQuery()
+        .sql(
+            "REPLACE INTO dst OVERWRITE ALL SELECT COUNT(*) AS cnt FROM %s 
PARTITIONED BY ALL TIME",
+            externSql(externalDataSource)
+        )
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget(
+            "dst",
+            RowSignature.builder()
+                        .add("cnt", ColumnType.LONG)
+                        .build()
+        )
+        .expectResources(dataSourceWrite("dst"), 
ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
+        .expectQuery(
+            GroupByQuery.builder()
+                        .setDataSource(externalDataSource)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setAggregatorSpecs(new CountAggregatorFactory("a0"))
+                        .setContext(REPLACE_ALL_TIME_CHUNKS)
+                        .build()
+        )
+        .verify();
+  }
+}
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlUnparseTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlUnparseTest.java
new file mode 100644
index 0000000000..96a992f0dd
--- /dev/null
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlUnparseTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.parser;
+
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.dialect.CalciteSqlDialect;
+import org.apache.calcite.sql.pretty.SqlPrettyWriter;
+import org.junit.Test;
+
+import java.io.StringReader;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for the implementations of {@link org.apache.calcite.sql.SqlNode#unparse(SqlWriter, int, int)}
+ * in custom Druid SqlNode classes, like {@link DruidSqlInsert} and {@link DruidSqlReplace}.
+ *
+ * Each test parses a statement with {@link DruidSqlParserImpl}, unparses the resulting node into a
+ * {@link SqlPrettyWriter} and compares the writer's output with the expected pretty-printed SQL.
+ */
+public class DruidSqlUnparseTest
+{
+  // Writer that receives the unparsed SQL. It is an instance field, not static state.
+  private final SqlWriter sqlWriter = new SqlPrettyWriter(CalciteSqlDialect.DEFAULT);
+
+  /**
+   * INSERT ... PARTITIONED BY ALL TIME: the PARTITIONED BY clause is expected on the same line as
+   * the closing parenthesis of the source query.
+   */
+  @Test
+  public void testUnparseInsert() throws ParseException
+  {
+    String sqlQuery = "INSERT INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME";
+    String prettySqlQuery = "INSERT INTO \"dst\"\n"
+                            + "(SELECT *\n"
+                            + "    FROM \"foo\") PARTITIONED BY ALL TIME";
+
+    DruidSqlParserImpl druidSqlParser = createTestParser(sqlQuery);
+    DruidSqlInsert druidSqlInsert = (DruidSqlInsert) druidSqlParser.DruidSqlInsertEof();
+
+    druidSqlInsert.unparse(sqlWriter, 0, 0);
+    assertEquals(prettySqlQuery, sqlWriter.toSqlString().getSql());
+  }
+
+  /**
+   * REPLACE ... OVERWRITE ALL: the OVERWRITE and PARTITIONED BY clauses are each expected on a line
+   * of their own.
+   */
+  @Test
+  public void testUnparseReplaceAll() throws ParseException
+  {
+    String sqlQuery = "REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME";
+    String prettySqlQuery = "REPLACE INTO \"dst\"\n"
+                            + "OVERWRITE ALL\n"
+                            + "(SELECT *\n"
+                            + "    FROM \"foo\")\n"
+                            + "PARTITIONED BY ALL TIME";
+
+    DruidSqlParserImpl druidSqlParser = createTestParser(sqlQuery);
+    DruidSqlReplace druidSqlReplace = (DruidSqlReplace) druidSqlParser.DruidSqlReplaceEof();
+
+    druidSqlReplace.unparse(sqlWriter, 0, 0);
+    assertEquals(prettySqlQuery, sqlWriter.toSqlString().getSql());
+  }
+
+  /**
+   * REPLACE ... OVERWRITE WHERE &lt;condition&gt;: the expected output prints the condition directly
+   * after OVERWRITE (without the WHERE keyword) and with the identifiers quoted.
+   */
+  @Test
+  public void testUnparseReplaceWhere() throws ParseException
+  {
+    String sqlQuery = "REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' AND __time < TIMESTAMP '2000-01-02 00:00:00' SELECT * FROM foo PARTITIONED BY DAY";
+    String prettySqlQuery = "REPLACE INTO \"dst\"\n"
+                            + "OVERWRITE \"__time\" >= TIMESTAMP '2000-01-01 00:00:00' AND \"__time\" < TIMESTAMP '2000-01-02 00:00:00'\n"
+                            + "(SELECT *\n"
+                            + "    FROM \"foo\")\n"
+                            + "PARTITIONED BY DAY";
+    DruidSqlParserImpl druidSqlParser = createTestParser(sqlQuery);
+    DruidSqlReplace druidSqlReplace = (DruidSqlReplace) druidSqlParser.DruidSqlReplaceEof();
+
+    druidSqlReplace.unparse(sqlWriter, 0, 0);
+    assertEquals(prettySqlQuery, sqlWriter.toSqlString().getSql());
+  }
+
+  /**
+   * Creates a parser over {@code parseString}, configured to lower-case both quoted and unquoted
+   * identifiers and with a maximum identifier length of 20.
+   */
+  private static DruidSqlParserImpl createTestParser(String parseString)
+  {
+    DruidSqlParserImplFactory druidSqlParserImplFactory = new DruidSqlParserImplFactory();
+    DruidSqlParserImpl druidSqlParser = (DruidSqlParserImpl) druidSqlParserImplFactory.getParser(new StringReader(parseString));
+    druidSqlParser.setUnquotedCasing(Casing.TO_LOWER);
+    druidSqlParser.setQuotedCasing(Casing.TO_LOWER);
+    druidSqlParser.setIdentifierMaxLength(20);
+    return druidSqlParser;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to