This is an automated email from the ASF dual-hosted git repository.

zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ebf237576f Move INSERT & REPLACE validation to the Calcite validator 
(#15908)
8ebf237576f is described below

commit 8ebf237576fff5d1b257d1e70d4034944910a9cf
Author: zachjsh <[email protected]>
AuthorDate: Thu Feb 22 14:01:59 2024 -0500

    Move INSERT & REPLACE validation to the Calcite validator (#15908)
    
    
    This PR contains a portion of the changes from the inactive draft PR for 
integrating the catalog with the Calcite planner 
https://github.com/apache/druid/pull/13686 from @paul-rogers, Refactoring the 
IngestHandler and subclasses to produce a validated SqlInsert instance node 
instead of the previous Insert source node. The SqlInsert node is then 
validated in the calcite validator. The validation that is implemented as part 
of this pr, is only that for the source node, and some of the v [...]
---
 .../druid/catalog/sql/CatalogIngestionTest.java    | 174 +++++++++++
 .../org/apache/druid/msq/test/MSQTestBase.java     |   6 +
 sql/src/main/codegen/includes/common.ftl           |   4 +-
 sql/src/main/codegen/includes/insert.ftl           |   2 +-
 sql/src/main/codegen/includes/replace.ftl          |   4 +-
 .../druid/sql/calcite/parser/DruidSqlIngest.java   |   8 +-
 .../druid/sql/calcite/parser/DruidSqlInsert.java   |  10 +-
 .../druid/sql/calcite/parser/DruidSqlReplace.java  |  20 +-
 .../druid/sql/calcite/planner/CalcitePlanner.java  |   6 +-
 .../calcite/planner/DruidSqlIngestOperator.java    | 112 +++++++
 .../calcite/planner/DruidSqlToRelConverter.java    |  69 +++++
 .../sql/calcite/planner/DruidSqlValidator.java     | 336 +++++++++++++++++++++
 .../druid/sql/calcite/planner/IngestHandler.java   | 147 +++++----
 .../druid/sql/calcite/planner/QueryHandler.java    |  35 ++-
 .../druid/sql/calcite/CalciteExportTest.java       |  56 ++++
 .../druid/sql/calcite/CalciteInsertDmlTest.java    |   4 -
 16 files changed, 876 insertions(+), 117 deletions(-)

diff --git 
a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogIngestionTest.java
 
b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogIngestionTest.java
new file mode 100644
index 00000000000..e4020cc86d0
--- /dev/null
+++ 
b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogIngestionTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.sql;
+
+import org.apache.druid.catalog.CatalogException;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.table.TableBuilder;
+import org.apache.druid.catalog.storage.CatalogStorage;
+import org.apache.druid.catalog.storage.CatalogTests;
+import org.apache.druid.catalog.sync.CachedMetadataCatalog;
+import org.apache.druid.catalog.sync.MetadataCatalog;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.CalciteIngestionDmlTest;
+import org.apache.druid.sql.calcite.filtration.Filtration;
+import org.apache.druid.sql.calcite.planner.CatalogResolver;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.apache.druid.sql.calcite.util.SqlTestFramework;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Test the use of catalog specs to drive MSQ ingestion.
+ */
+public class CatalogIngestionTest extends CalciteIngestionDmlTest
+{
+  @ClassRule
+  public static final TestDerbyConnector.DerbyConnectorRule 
DERBY_CONNECTION_RULE =
+      new TestDerbyConnector.DerbyConnectorRule();
+
+  /**
+   * Signature for the foo datasource after applying catalog metadata.
+   */
+  private static final RowSignature FOO_SIGNATURE = RowSignature.builder()
+      .add("__time", ColumnType.LONG)
+      .add("extra1", ColumnType.STRING)
+      .add("dim2", ColumnType.STRING)
+      .add("dim1", ColumnType.STRING)
+      .add("cnt", ColumnType.LONG)
+      .add("m1", ColumnType.DOUBLE)
+      .add("extra2", ColumnType.LONG)
+      .add("extra3", ColumnType.STRING)
+      .add("m2", ColumnType.DOUBLE)
+      .build();
+
+  private static CatalogStorage storage;
+
+  @Override
+  public CatalogResolver createCatalogResolver()
+  {
+    CatalogTests.DbFixture dbFixture = new 
CatalogTests.DbFixture(DERBY_CONNECTION_RULE);
+    storage = dbFixture.storage;
+    MetadataCatalog catalog = new CachedMetadataCatalog(
+        storage,
+        storage.schemaRegistry(),
+        storage.jsonMapper()
+    );
+    return new LiveCatalogResolver(catalog);
+  }
+
+  @Override
+  public void finalizeTestFramework(SqlTestFramework sqlTestFramework)
+  {
+    super.finalizeTestFramework(sqlTestFramework);
+    buildTargetDatasources();
+    buildFooDatasource();
+  }
+
+  private void buildTargetDatasources()
+  {
+    TableMetadata spec = TableBuilder.datasource("hourDs", "PT1H")
+        .build();
+    createTableMetadata(spec);
+  }
+
+  public void buildFooDatasource()
+  {
+    TableMetadata spec = TableBuilder.datasource("foo", "ALL")
+        .timeColumn()
+        .column("extra1", null)
+        .column("dim2", null)
+        .column("dim1", null)
+        .column("cnt", null)
+        .column("m1", Columns.DOUBLE)
+        .column("extra2", Columns.LONG)
+        .column("extra3", Columns.STRING)
+        .hiddenColumns(Arrays.asList("dim3", "unique_dim1"))
+        .sealed(true)
+        .build();
+    createTableMetadata(spec);
+  }
+
+  private void createTableMetadata(TableMetadata table)
+  {
+    try {
+      storage.tables().create(table);
+    }
+    catch (CatalogException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * If the segment grain is given in the catalog then use this value is used.
+   */
+  @Test
+  public void testInsertHourGrain()
+  {
+    testIngestionQuery()
+        .sql("INSERT INTO hourDs\n" +
+             "SELECT * FROM foo")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("hourDs", FOO_SIGNATURE)
+        .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("__time", "cnt", "dim1", "dim2", "extra1", "extra2", 
"extra3", "m1", "m2")
+                .context(queryContextWithGranularity(Granularities.HOUR))
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * If the segment grain is given in the catalog, and also by PARTITIONED BY, 
then
+   * the query value is used.
+   */
+  @Test
+  public void testInsertHourGrainWithDay()
+  {
+    testIngestionQuery()
+        .sql("INSERT INTO hourDs\n" +
+             "SELECT * FROM foo\n" +
+             "PARTITIONED BY day")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("hourDs", FOO_SIGNATURE)
+        .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("__time", "cnt", "dim1", "dim2", "extra1", "extra2", 
"extra3", "m1", "m2")
+                .context(queryContextWithGranularity(Granularities.DAY))
+                .build()
+        )
+        .verify();
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index f8fc01b9369..8319f0c48b9 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -162,6 +162,9 @@ import 
org.apache.druid.sql.calcite.export.TestExportStorageConnector;
 import org.apache.druid.sql.calcite.export.TestExportStorageConnectorProvider;
 import org.apache.druid.sql.calcite.external.ExternalDataSource;
 import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
+import org.apache.druid.sql.calcite.external.HttpOperatorConversion;
+import org.apache.druid.sql.calcite.external.InlineOperatorConversion;
+import org.apache.druid.sql.calcite.external.LocalOperatorConversion;
 import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
 import org.apache.druid.sql.calcite.planner.CatalogResolver;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -351,6 +354,9 @@ public class MSQTestBase extends BaseCalciteQueryTest
             binder.install(new NestedDataModule());
             NestedDataModule.registerHandlersAndSerde();
             SqlBindings.addOperatorConversion(binder, 
ExternalOperatorConversion.class);
+            SqlBindings.addOperatorConversion(binder, 
HttpOperatorConversion.class);
+            SqlBindings.addOperatorConversion(binder, 
InlineOperatorConversion.class);
+            SqlBindings.addOperatorConversion(binder, 
LocalOperatorConversion.class);
           }
 
           @Override
diff --git a/sql/src/main/codegen/includes/common.ftl 
b/sql/src/main/codegen/includes/common.ftl
index 95138a7dbbf..1edc542dc99 100644
--- a/sql/src/main/codegen/includes/common.ftl
+++ b/sql/src/main/codegen/includes/common.ftl
@@ -107,14 +107,14 @@ SqlTypeNameSpec DruidType() :
 }
 
 // Parses the supported file formats for export.
-String FileFormat() :
+SqlIdentifier FileFormat() :
 {
   SqlNode format;
 }
 {
   format = SimpleIdentifier()
   {
-    return format.toString();
+    return (SqlIdentifier) format;
   }
 }
 
diff --git a/sql/src/main/codegen/includes/insert.ftl 
b/sql/src/main/codegen/includes/insert.ftl
index 81f5ed1253e..0a949aec433 100644
--- a/sql/src/main/codegen/includes/insert.ftl
+++ b/sql/src/main/codegen/includes/insert.ftl
@@ -35,7 +35,7 @@ SqlNode DruidSqlInsertEof() :
   final Pair<SqlNodeList, SqlNodeList> p;
   SqlGranularityLiteral partitionedBy = null;
   SqlNodeList clusteredBy = null;
-  String exportFileFormat = null;
+  SqlIdentifier exportFileFormat = null;
 }
 {
   (
diff --git a/sql/src/main/codegen/includes/replace.ftl 
b/sql/src/main/codegen/includes/replace.ftl
index 15edeaac12e..d067bc450bb 100644
--- a/sql/src/main/codegen/includes/replace.ftl
+++ b/sql/src/main/codegen/includes/replace.ftl
@@ -30,7 +30,7 @@ SqlNode DruidSqlReplaceEof() :
     SqlNodeList clusteredBy = null;
     final Pair<SqlNodeList, SqlNodeList> p;
     SqlNode replaceTimeQuery = null;
-    String exportFileFormat = null;
+    SqlIdentifier exportFileFormat = null;
 }
 {
     <REPLACE> { s = span(); }
@@ -90,7 +90,7 @@ SqlNode DruidSqlReplaceEof() :
     <EOF>
     {
       sqlInsert = new SqlInsert(s.end(source), SqlNodeList.EMPTY, destination, 
source, columnList);
-      return DruidSqlReplace.create(sqlInsert, partitionedBy, clusteredBy, 
replaceTimeQuery, exportFileFormat);
+      return DruidSqlReplace.create(sqlInsert, partitionedBy, clusteredBy, 
exportFileFormat, replaceTimeQuery);
     }
 }
 
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java
index a36ef9b6b96..cce253b4c1e 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.sql.calcite.parser;
 
+import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
@@ -43,7 +44,7 @@ public abstract class DruidSqlIngest extends SqlInsert
   @Nullable
   protected final SqlNodeList clusteredBy;
   @Nullable
-  private final String exportFileFormat;
+  private final SqlIdentifier exportFileFormat;
 
   public DruidSqlIngest(
       SqlParserPos pos,
@@ -53,7 +54,7 @@ public abstract class DruidSqlIngest extends SqlInsert
       SqlNodeList columnList,
       @Nullable SqlGranularityLiteral partitionedBy,
       @Nullable SqlNodeList clusteredBy,
-      @Nullable String exportFileFormat
+      @Nullable SqlIdentifier exportFileFormat
   )
   {
     super(pos, keywords, targetTable, source, columnList);
@@ -76,7 +77,7 @@ public abstract class DruidSqlIngest extends SqlInsert
   }
 
   @Nullable
-  public String getExportFileFormat()
+  public SqlIdentifier getExportFileFormat()
   {
     return exportFileFormat;
   }
@@ -88,6 +89,7 @@ public abstract class DruidSqlIngest extends SqlInsert
         .addAll(super.getOperandList())
         .add(partitionedBy)
         .add(clusteredBy)
+        .add(exportFileFormat)
         .build();
   }
 }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
index 7171a889ae0..e283c9df958 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
@@ -19,12 +19,14 @@
 
 package org.apache.druid.sql.calcite.parser;
 
+import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.druid.sql.calcite.planner.DruidSqlIngestOperator;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -39,13 +41,13 @@ public class DruidSqlInsert extends DruidSqlIngest
   public static final String SQL_INSERT_SEGMENT_GRANULARITY = 
"sqlInsertSegmentGranularity";
 
   // This allows reusing super.unparse
-  public static final SqlOperator OPERATOR = SqlInsert.OPERATOR;
+  public static final SqlOperator OPERATOR = 
DruidSqlIngestOperator.INSERT_OPERATOR;
 
   public static DruidSqlInsert create(
       @Nonnull SqlInsert insertNode,
       @Nullable SqlGranularityLiteral partitionedBy,
       @Nullable SqlNodeList clusteredBy,
-      @Nullable String exportFileFormat
+      @Nullable SqlIdentifier exportFileFormat
   )
   {
     return new DruidSqlInsert(
@@ -74,7 +76,7 @@ public class DruidSqlInsert extends DruidSqlIngest
       SqlNodeList columnList,
       @Nullable SqlGranularityLiteral partitionedBy,
       @Nullable SqlNodeList clusteredBy,
-      @Nullable String exportFileFormat
+      @Nullable SqlIdentifier exportFileFormat
   )
   {
     super(
@@ -110,7 +112,7 @@ public class DruidSqlInsert extends DruidSqlIngest
     writer.newlineAndIndent();
     if (getExportFileFormat() != null) {
       writer.keyword("AS");
-      writer.print(getExportFileFormat());
+      writer.print(getExportFileFormat().toString());
       writer.newlineAndIndent();
     }
     getSource().unparse(writer, 0, 0);
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
index 86f78b4d6d7..45b677631d2 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
@@ -19,16 +19,16 @@
 
 package org.apache.druid.sql.calcite.parser;
 
+import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlInsert;
-import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.druid.sql.calcite.planner.DruidSqlIngestOperator;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -43,7 +43,7 @@ public class DruidSqlReplace extends DruidSqlIngest
 {
   public static final String SQL_REPLACE_TIME_CHUNKS = "sqlReplaceTimeChunks";
 
-  public static final SqlOperator OPERATOR = new SqlSpecialOperator("REPLACE", 
SqlKind.OTHER);
+  public static final SqlOperator OPERATOR = 
DruidSqlIngestOperator.REPLACE_OPERATOR;
 
   private final SqlNode replaceTimeQuery;
 
@@ -51,8 +51,8 @@ public class DruidSqlReplace extends DruidSqlIngest
       @Nonnull SqlInsert insertNode,
       @Nullable SqlGranularityLiteral partitionedBy,
       @Nullable SqlNodeList clusteredBy,
-      @Nullable SqlNode replaceTimeQuery,
-      @Nullable String exportFileFormat
+      @Nullable SqlIdentifier exportFileFormat,
+      @Nullable SqlNode replaceTimeQuery
   )
   {
     return new DruidSqlReplace(
@@ -63,8 +63,8 @@ public class DruidSqlReplace extends DruidSqlIngest
         insertNode.getTargetColumnList(),
         partitionedBy,
         clusteredBy,
-        replaceTimeQuery,
-        exportFileFormat
+        exportFileFormat,
+        replaceTimeQuery
     );
   }
 
@@ -82,8 +82,8 @@ public class DruidSqlReplace extends DruidSqlIngest
       SqlNodeList columnList,
       @Nullable SqlGranularityLiteral partitionedBy,
       @Nullable SqlNodeList clusteredBy,
-      @Nullable SqlNode replaceTimeQuery,
-      @Nullable String exportFileFormat
+      @Nullable SqlIdentifier exportFileFormat,
+      @Nullable SqlNode replaceTimeQuery
   )
   {
     super(
@@ -137,7 +137,7 @@ public class DruidSqlReplace extends DruidSqlIngest
 
     if (getExportFileFormat() != null) {
       writer.keyword("AS");
-      writer.print(getExportFileFormat());
+      writer.print(getExportFileFormat().toString());
       writer.newlineAndIndent();
     }
 
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java
index 2cda011848b..f2d0408b491 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java
@@ -283,7 +283,7 @@ public class CalcitePlanner implements Planner, ViewExpander
   public RelRoot rel(SqlNode sql)
   {
     ensure(CalcitePlanner.State.STATE_4_VALIDATED);
-    SqlNode validatedSqlNode = Objects.requireNonNull(
+    Objects.requireNonNull(
         this.validatedSqlNode,
         "validatedSqlNode is null. Need to call #validate() first"
     );
@@ -295,11 +295,11 @@ public class CalcitePlanner implements Planner, 
ViewExpander
     final SqlToRelConverter.Config config =
         sqlToRelConverterConfig.withTrimUnusedFields(false);
     final SqlToRelConverter sqlToRelConverter =
-        new SqlToRelConverter(this, validator,
+        new DruidSqlToRelConverter(this, validator,
                               createCatalogReader(), cluster, convertletTable, 
config
         );
     RelRoot root =
-        sqlToRelConverter.convertQuery(validatedSqlNode, false, true);
+        sqlToRelConverter.convertQuery(sql, false, true);
     root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
     final RelBuilder relBuilder =
         config.getRelBuilderFactory().create(cluster, null);
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlIngestOperator.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlIngestOperator.java
new file mode 100644
index 00000000000..628e4768663
--- /dev/null
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlIngestOperator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.planner;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
+import org.apache.druid.sql.calcite.parser.SqlGranularityLiteral;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class DruidSqlIngestOperator extends SqlSpecialOperator implements 
AuthorizableOperator
+{
+  public static final SqlSpecialOperator INSERT_OPERATOR =
+      new DruidSqlInsertOperator();
+  public static final SqlSpecialOperator REPLACE_OPERATOR =
+      new DruidSqlReplaceOperator();
+
+  public static class DruidSqlInsertOperator extends DruidSqlIngestOperator
+  {
+    public DruidSqlInsertOperator()
+    {
+      super("INSERT");
+    }
+
+    @Override
+    public SqlCall createCall(
+        SqlLiteral functionQualifier,
+        SqlParserPos pos,
+        SqlNode... operands
+    )
+    {
+      return new DruidSqlInsert(
+          pos,
+          (SqlNodeList) operands[0],
+          operands[1],
+          operands[2],
+          (SqlNodeList) operands[3],
+          (SqlGranularityLiteral) operands[4],
+          (SqlNodeList) operands[5],
+          (SqlIdentifier) operands[6]
+      );
+    }
+  }
+
+  public static class DruidSqlReplaceOperator extends DruidSqlIngestOperator
+  {
+    public DruidSqlReplaceOperator()
+    {
+      super("REPLACE");
+    }
+
+    @Override
+    public SqlCall createCall(
+        SqlLiteral functionQualifier,
+        SqlParserPos pos,
+        SqlNode... operands
+    )
+    {
+      return new DruidSqlReplace(
+          pos,
+          (SqlNodeList) operands[0],
+          operands[1],
+          operands[2],
+          (SqlNodeList) operands[3],
+          (SqlGranularityLiteral) operands[4],
+          (SqlNodeList) operands[5],
+          (SqlIdentifier) operands[6],
+          operands[7]
+      );
+    }
+  }
+
+  public DruidSqlIngestOperator(String name)
+  {
+    super(name, SqlKind.INSERT);
+  }
+
+  @Override
+  public Set<ResourceAction> computeResources(SqlCall call, boolean 
inputSourceTypeSecurityEnabled)
+  {
+    // resource actions are computed in the respective ingest handlers.
+    return new HashSet<>();
+  }
+}
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlToRelConverter.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlToRelConverter.java
new file mode 100644
index 00000000000..d5f68748337
--- /dev/null
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlToRelConverter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.planner;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable.ViewExpander;
+import org.apache.calcite.prepare.Prepare.CatalogReader;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql2rel.SqlRexConvertletTable;
+import org.apache.calcite.sql2rel.SqlToRelConverter;
+
+public class DruidSqlToRelConverter extends SqlToRelConverter
+{
+  public DruidSqlToRelConverter(
+      final ViewExpander viewExpander,
+      final SqlValidator validator,
+      final CatalogReader catalogReader,
+      RelOptCluster cluster,
+      final SqlRexConvertletTable convertletTable,
+      final Config config
+  )
+  {
+    super(viewExpander, validator, catalogReader, cluster, convertletTable, 
config);
+  }
+
+  /**
+   * Convert a Druid {@code INSERT} or {@code REPLACE} statement. The code is 
the same
+   * as the normal conversion, except we don't actually create the final 
modify node.
+   * Druid has its own special way to handle inserts. (This should probably 
change in
+   * some future, but doing so requires changes in the SQL engine and MSQ, 
which is a bit
+   * invasive.)
+   */
+  @Override
+  protected RelNode convertInsert(SqlInsert call)
+  {
+    // Get the target type: the column types we want to write into the target 
datasource.
+    final RelDataType targetRowType = validator.getValidatedNodeType(call);
+    assert targetRowType != null;
+
+    // Convert the underlying SELECT. We pushed the CLUSTERED BY clause into 
the SELECT
+    // as its ORDER BY. We claim this is the top query because MSQ doesn't 
actually
+    // use the Calcite insert node.
+    RelNode sourceRel = convertQueryRecursive(call.getSource(), true, 
targetRowType).project();
+
+    // We omit the column mapping and insert node that Calcite normally 
provides.
+    // Presumably MSQ does these its own way.
+    return sourceRel;
+  }
+}
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
index 390ddf96baf..90aa21907ac 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
@@ -19,32 +19,72 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.BaseDruidSqlValidator;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
 import org.apache.calcite.runtime.CalciteContextException;
 import org.apache.calcite.runtime.CalciteException;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.SqlWindow;
 import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
 import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.error.InvalidSqlInput;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.query.QueryContexts;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.parser.ExternalDestinationSqlIdentifier;
 import org.apache.druid.sql.calcite.run.EngineFeature;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
 /**
  * Druid extended SQL validator. (At present, it doesn't actually
  * have any extensions yet, but it will soon.)
  */
 class DruidSqlValidator extends BaseDruidSqlValidator
 {
+  private static final Pattern UNNAMED_COLUMN_PATTERN = 
Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+
+  // Copied here from MSQE since that extension is not visible here.
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  public interface ValidatorContext
+  {
+    Map<String, Object> queryContextMap();
+    CatalogResolver catalog();
+    String druidSchemaName();
+    ObjectMapper jsonMapper();
+  }
+
   private final PlannerContext plannerContext;
 
   protected DruidSqlValidator(
@@ -113,6 +153,302 @@ class DruidSqlValidator extends BaseDruidSqlValidator
     super.validateWindow(windowOrId, scope, call);
   }
 
+  @Override
+  public void validateInsert(final SqlInsert insert)
+  {
+    final DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw InvalidSqlInput.exception("UPSERT is not supported.");
+    }
+
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    final String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw InvalidSqlInput.exception(
+          "Operation [%s] cannot be run with a target column list, given [%s 
(%s)]",
+          operationName,
+          ingestNode.getTargetTable(), ingestNode.getTargetColumnList()
+      );
+    }
+
+    // The target namespace is both the target table ID and the row type for 
that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    final IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+    // The target is a new or existing datasource.
+    final DatasourceTable table = validateInsertTarget(targetNamespace, 
insertNs, operationName);
+
+    // An existing datasource may have metadata.
+    final DatasourceFacade tableMetadata = table == null ? null : 
table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    if (!(ingestNode.getTargetTable() instanceof 
ExternalDestinationSqlIdentifier)) {
+      Granularity effectiveGranularity = 
getEffectiveGranularity(operationName, ingestNode, tableMetadata);
+      // Note: though this is the validator, we cheat a bit and write the 
target
+      // granularity into the query context. Perhaps this step should be done
+      // during conversion, however, we've just worked out the granularity, so 
we
+      // do it here instead.
+      try {
+        plannerContext.queryContextMap().put(
+            DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+            
plannerContext.getPlannerToolbox().jsonMapper().writeValueAsString(effectiveGranularity)
+        );
+      }
+      catch (JsonProcessingException e) {
+        throw InvalidSqlInput.exception(e, "Invalid partition granularity 
[%s]", effectiveGranularity);
+      }
+    }
+
+    // The source must be a SELECT
+    final SqlNode source = insert.getSource();
+
+    // Validate the source statement.
+    // Because of the non-standard Druid semantics, we can't define the target 
type: we don't know
+    // the target columns yet, and we can't infer types when they must come 
from the SELECT.
+    // Normally, the target type is known, and is pushed into the SELECT. In 
Druid, the SELECT
+    // usually defines the target types, unless the catalog says otherwise. 
Since catalog entries
+    // are optional, we don't know the target type until we validate the 
SELECT. (Also, we won't
+    // know names and we match by name.) Thus, we'd have to validate (to know 
names and types)
+    // to get the target types, but we need the target types to validate. 
Catch-22. So, we punt.
+    final SqlValidatorScope scope;
+    if (source instanceof SqlSelect) {
+      final SqlSelect sqlSelect = (SqlSelect) source;
+      validateSelect(sqlSelect, unknownType);
+      scope = null;
+    } else {
+      scope = scopes.get(source);
+      validateQuery(source, scope, unknownType);
+    }
+
+    final SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+    final RelRecordType sourceType = (RelRecordType) 
sourceNamespace.getRowType();
+
+    // Determine the output (target) schema.
+    final RelDataType targetType = validateTargetType(scope, insertNs, insert, 
sourceType, tableMetadata);
+
+    // Set the type for the INSERT/REPLACE node
+    setValidatedNodeType(insert, targetType);
+
+    // Segment size
+    if (tableMetadata != null && 
!plannerContext.queryContextMap().containsKey(CTX_ROWS_PER_SEGMENT)) {
+      final Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+      if (targetSegmentRows != null) {
+        plannerContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, 
targetSegmentRows);
+      }
+    }
+  }
+
+  /**
+   * Validate the target table. Druid {@code INSERT/REPLACE} can create a new 
datasource,
+   * or insert into an existing one. If the target exists, it must be a 
datasource. If it
+   * does not exist, the target must be in the datasource schema, normally 
"druid".
+   */
+  private DatasourceTable validateInsertTarget(
+      final SqlValidatorNamespace targetNamespace,
+      final IdentifierNamespace insertNs,
+      final String operationName
+  )
+  {
+    // Get the target table ID
+    final SqlIdentifier destId = insertNs.getId();
+    if (destId.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in 
case.
+      throw InvalidSqlInput.exception("Operation [%s] requires a target 
table", operationName);
+    }
+
+    // Druid does not support 3+ part names.
+    final int n = destId.names.size();
+    if (n > 2) {
+      throw InvalidSqlInput.exception("Druid does not support 3+ part names: 
[%s]", destId, operationName);
+    }
+    String tableName = destId.names.get(n - 1);
+
+    // If this is a 2-part name, the first part must be the datasource schema.
+    if (n == 2 && 
!plannerContext.getPlannerToolbox().druidSchemaName().equals(destId.names.get(0)))
 {
+      throw InvalidSqlInput.exception(
+          "Table [%s] does not support operation [%s] because it is not a 
Druid datasource",
+          destId,
+          operationName
+      );
+    }
+    try {
+      // Try to resolve the table. Will fail if this is an INSERT into a new 
table.
+      validateNamespace(targetNamespace, unknownType);
+      SqlValidatorTable target = insertNs.resolve().getTable();
+      try {
+        return target.unwrap(DatasourceTable.class);
+      }
+      catch (Exception e) {
+        throw InvalidSqlInput.exception(
+            "Table [%s] does not support operation [%s] because it is not a 
Druid datasource",
+            destId,
+            operationName
+        );
+      }
+    }
+    catch (CalciteContextException e) {
+      // Something failed. Let's make sure it was the table lookup.
+      // The check is kind of a hack, but its the best we can do given that 
Calcite
+      // didn't expect this non-SQL use case.
+      if (e.getCause() instanceof SqlValidatorException && e.getMessage()
+          .contains(StringUtils.format("Object '%s' not found", tableName))) {
+        // The catalog implementation may be "strict": and require that the 
target
+        // table already exists, rather than the default "lenient" mode that 
can
+        // create a new table.
+        if 
(plannerContext.getPlannerToolbox().catalogResolver().ingestRequiresExistingTable())
 {
+          throw InvalidSqlInput.exception("Cannot %s into [%s] because it does 
not exist", operationName, destId);
+        }
+        // New table. Validate the shape of the name.
+        IdUtils.validateId("table", tableName);
+        return null;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Gets the effective PARTITIONED BY granularity. Resolves the granularity 
from the granularity specified on the
+   * ingest node, and on the table metadata as stored in catalog, if any. 
Mismatches between the 2 granularities are
+   * allowed if both are specified. The granularity specified on the ingest 
node is taken to be the effective
+   * granulartiy if specified. If no granulartiy is specified on either the 
ingestNode or in the table catalog entry
+   * for the table, an error is thrown.
+   *
+   * @param operationName The operation name
+   * @param ingestNode    The ingest node.
+   * @param tableMetadata The table metadata as stored in the catalog, if any.
+   *
+   * @return The effective granularity
+   * @throws org.apache.druid.error.DruidException indicating invalud Sql if 
both the ingest node and table metadata
+   * for the respective target table have no PARTITIONED BY granularity 
defined.
+   */
+  private Granularity getEffectiveGranularity(
+      final String operationName,
+      final DruidSqlIngest ingestNode,
+      @Nullable final DatasourceFacade tableMetadata
+  )
+  {
+    Granularity effectiveGranularity = null;
+    final Granularity ingestionGranularity = ingestNode.getPartitionedBy() != 
null
+        ? ingestNode.getPartitionedBy().getGranularity()
+        : null;
+    if (ingestionGranularity != null) {
+      
DruidSqlParserUtils.validateSupportedGranularityForPartitionedBy(ingestNode, 
ingestionGranularity);
+      effectiveGranularity = ingestionGranularity;
+    } else {
+      final Granularity definedGranularity = tableMetadata == null
+          ? null
+          : tableMetadata.segmentGranularity();
+      if (definedGranularity != null) {
+        // Should already have been checked when creating the catalog entry
+        DruidSqlParserUtils.validateSupportedGranularityForPartitionedBy(null, 
definedGranularity);
+        effectiveGranularity = definedGranularity;
+      }
+    }
+
+    if (effectiveGranularity == null) {
+      throw InvalidSqlInput.exception(
+          "Operation [%s] requires a PARTITIONED BY to be explicitly defined, 
but none was found.",
+          operationName);
+    }
+
+    return effectiveGranularity;
+  }
+
+  /**
+   * Compute and validate the target type. In normal SQL, the engine would 
insert
+   * a project operator after the SELECT before the write to cast columns from 
the
+   * input type to the (compatible) defined output type. Druid doesn't work 
that way.
+   * In MSQ, the output the just is the input type. If the user wants to 
control the
+   * output type, then the user must manually insert any required CAST: Druid 
is not
+   * in the business of changing the type to suit the catalog.
+   * <p>
+   * As a result, we first propagate column names and types using Druid rules: 
the
+   * output is exactly what SELECT says it is. We then apply restrictions from 
the
+   * catalog. If the table is strict, only column names from the catalog can be
+   * used.
+   */
+  private RelDataType validateTargetType(
+      SqlValidatorScope scope,
+      final IdentifierNamespace insertNs,
+      SqlInsert insert,
+      RelRecordType sourceType,
+      DatasourceFacade tableMetadata
+  )
+  {
+    final List<RelDataTypeField> sourceFields = sourceType.getFieldList();
+    for (final RelDataTypeField sourceField : sourceFields) {
+      // Check that there are no unnamed columns in the insert.
+      if (UNNAMED_COLUMN_PATTERN.matcher(sourceField.getName()).matches()) {
+        throw InvalidSqlInput.exception(
+            "Insertion requires columns to be named, but at least one of the 
columns was unnamed.  This is usually "
+            + "the result of applying a function without having an AS clause, 
please ensure that all function calls"
+            + "are named with an AS clause as in \"func(X) as myColumn\"."
+        );
+      }
+    }
+    if (tableMetadata == null) {
+      return sourceType;
+    }
+    final boolean isStrict = tableMetadata.isSealed();
+    final List<Map.Entry<String, RelDataType>> fields = new ArrayList<>();
+    for (RelDataTypeField sourceField : sourceFields) {
+      final String colName = sourceField.getName();
+      final DatasourceFacade.ColumnFacade definedCol = 
tableMetadata.column(colName);
+      if (definedCol == null) {
+        if (isStrict) {
+          throw InvalidSqlInput.exception(
+              "Column [%s] is not defined in the target table [%s] strict 
schema",
+              colName,
+              insert.getTargetTable()
+          );
+        }
+
+        // Table is not strict: add a new column based on the SELECT column.
+        fields.add(Pair.of(colName, sourceField.getType()));
+        continue;
+      }
+
+      // If the column name is defined, but no type is given then, use the
+      // column type from SELECT.
+      if (!definedCol.hasType()) {
+        fields.add(Pair.of(colName, sourceField.getType()));
+        continue;
+      }
+
+      // Both the column name and type are provided. Use the name and type
+      // from the catalog.
+      // Note to future readers: this check is preliminary. It works for the
+      // simple column types and has not yet been extended to complex types, 
aggregates,
+      // types defined in extensions, etc. It may be that SQL
+      // has types that Druid cannot store. This may crop up with types 
defined in
+      // extensions which are not loaded. Those details are not known at the 
time
+      // of this code so we are not yet in a position to make the right 
decision.
+      // This is a task to be revisited when we have more information.
+      final String sqlTypeName = definedCol.sqlStorageType();
+      if (sqlTypeName == null) {
+        // Don't know the storage type. Just skip this one: Druid types are
+        // fluid so let Druid sort out what to store. This is probably not the 
right
+        // answer, but should avoid problems until full type system support is 
completed.
+        fields.add(Pair.of(colName, sourceField.getType()));
+        continue;
+      }
+      RelDataType relType = 
typeFactory.createSqlType(SqlTypeName.get(sqlTypeName));
+      fields.add(Pair.of(
+          colName,
+          typeFactory.createTypeWithNullability(relType, true)
+      ));
+    }
+
+    // Perform the SQL-standard check: that the SELECT column can be
+    // converted to the target type. This check is retained to mimic SQL
+    // behavior, but doesn't do anything because we enforced exact type
+    // matches above.
+    final RelDataType targetType = typeFactory.createStructType(fields);
+    final SqlValidatorTable target = insertNs.resolve().getTable();
+    checkTypeAssignment(scope, target, sourceType, targetType, insert);
+    return targetType;
+  }
+
   private boolean isPrecedingOrFollowing(@Nullable SqlNode bound)
   {
     if (bound == null) {
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java
index 0448a7245f8..4862f282182 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java
@@ -33,7 +33,6 @@ import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlOrderBy;
 import org.apache.calcite.tools.ValidationException;
-import org.apache.calcite.util.Pair;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.error.InvalidSqlInput;
@@ -55,28 +54,23 @@ import org.apache.druid.sql.destination.TableDestination;
 import org.apache.druid.storage.ExportStorageProvider;
 
 import java.util.List;
-import java.util.regex.Pattern;
 
 public abstract class IngestHandler extends QueryHandler
 {
-  private static final Pattern UNNAMED_COLUMN_PATTERN = 
Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
-
-  protected final Granularity ingestionGranularity;
+  protected Granularity ingestionGranularity;
   protected IngestDestination targetDatasource;
 
+  private SqlNode validatedQueryNode;
+
   IngestHandler(
       HandlerContext handlerContext,
-      DruidSqlIngest ingestNode,
-      SqlNode queryNode,
       SqlExplain explain
   )
   {
-    super(handlerContext, queryNode, explain);
-    ingestionGranularity = ingestNode.getPartitionedBy() != null ? 
ingestNode.getPartitionedBy().getGranularity() : null;
-    handlerContext.hook().captureInsert(ingestNode);
+    super(handlerContext, explain);
   }
 
-  protected static SqlNode convertQuery(DruidSqlIngest sqlNode)
+  protected static SqlNode convertSourceQuery(DruidSqlIngest sqlNode)
   {
     SqlNode query = sqlNode.getSource();
 
@@ -98,6 +92,7 @@ public abstract class IngestHandler extends QueryHandler
     if (!query.isA(SqlKind.QUERY)) {
       throw InvalidSqlInput.exception("Unexpected SQL statement type [%s], 
expected it to be a QUERY", query.getKind());
     }
+
     return query;
   }
 
@@ -123,7 +118,7 @@ public abstract class IngestHandler extends QueryHandler
                           .build("Export statements do not support a 
PARTITIONED BY or CLUSTERED BY clause.");
     }
 
-    final String exportFileFormat = ingestNode().getExportFileFormat();
+    final SqlIdentifier exportFileFormat = ingestNode().getExportFileFormat();
     if (exportFileFormat == null) {
       throw InvalidSqlInput.exception(
           "Exporting rows into an EXTERN destination requires an AS clause to 
specify the format, but none was found.",
@@ -132,7 +127,7 @@ public abstract class IngestHandler extends QueryHandler
     } else {
       handlerContext.plannerContext().queryContextMap().put(
           DruidSqlIngest.SQL_EXPORT_FILE_FORMAT,
-          exportFileFormat
+          exportFileFormat.toString()
       );
     }
   }
@@ -143,13 +138,6 @@ public abstract class IngestHandler extends QueryHandler
     if (ingestNode().getTargetTable() instanceof 
ExternalDestinationSqlIdentifier) {
       validateExport();
     } else {
-      if (ingestNode().getPartitionedBy() == null) {
-        throw InvalidSqlInput.exception(
-            "Operation [%s] requires a PARTITIONED BY to be explicitly 
defined, but none was found.",
-            operationName()
-        );
-      }
-
       if (ingestNode().getExportFileFormat() != null) {
         throw InvalidSqlInput.exception(
             "The AS <format> clause should only be specified while exporting 
rows into an EXTERN destination.",
@@ -158,19 +146,6 @@ public abstract class IngestHandler extends QueryHandler
       }
     }
 
-    try {
-      PlannerContext plannerContext = handlerContext.plannerContext();
-      if (ingestionGranularity != null) {
-        plannerContext.queryContextMap().put(
-            DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
-            
plannerContext.getJsonMapper().writeValueAsString(ingestionGranularity)
-        );
-      }
-    }
-    catch (JsonProcessingException e) {
-      throw InvalidSqlInput.exception(e, "Invalid partition granularity [%s]", 
ingestionGranularity);
-    }
-    super.validate();
     // Check if CTX_SQL_OUTER_LIMIT is specified and fail the query if it is. 
CTX_SQL_OUTER_LIMIT being provided causes
     // the number of rows inserted to be limited which is likely to be 
confusing and unintended.
     if 
(handlerContext.queryContextMap().get(PlannerContext.CTX_SQL_OUTER_LIMIT) != 
null) {
@@ -180,9 +155,31 @@ public abstract class IngestHandler extends QueryHandler
           operationName()
       );
     }
+    DruidSqlIngest ingestNode = ingestNode();
+    DruidSqlIngest validatedNode = (DruidSqlIngest) validate(ingestNode);
+    validatedQueryNode = validatedNode.getSource();
+    // This context key is set during validation in
+    // org.apache.druid.sql.calcite.planner.DruidSqlValidator.validateInsert.
+    String effectiveGranularity = (String) handlerContext.queryContextMap()
+        .get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY);
+    try {
+      ingestionGranularity = effectiveGranularity != null
+          ? handlerContext.jsonMapper().readValue(effectiveGranularity, 
Granularity.class)
+          : null;
+    }
+    catch (JsonProcessingException e) {
+      // this should never happen, since the granularity value is validated 
before being written to contextMap.
+      throw InvalidSqlInput.exception(e, "Invalid partition granularity [%s]", 
effectiveGranularity);
+    }
     targetDatasource = validateAndGetDataSourceForIngest();
   }
 
+  @Override
+  protected SqlNode validatedQueryNode()
+  {
+    return validatedQueryNode;
+  }
+
   @Override
   protected RelDataType returnedRowType()
   {
@@ -202,27 +199,11 @@ public abstract class IngestHandler extends QueryHandler
   private IngestDestination validateAndGetDataSourceForIngest()
   {
     final SqlInsert insert = ingestNode();
-    if (insert.isUpsert()) {
-      throw InvalidSqlInput.exception("UPSERT is not supported.");
-    }
-
-    if (insert.getTargetColumnList() != null) {
-      throw InvalidSqlInput.exception(
-          "Operation [%s] cannot be run with a target column list, given [%s 
(%s)]",
-          operationName(),
-          insert.getTargetTable(), insert.getTargetColumnList()
-      );
-    }
 
     final SqlIdentifier tableIdentifier = (SqlIdentifier) 
insert.getTargetTable();
     final IngestDestination dataSource;
 
-    if (tableIdentifier.names.isEmpty()) {
-      // I don't think this can happen, but include a branch for it just in 
case.
-      throw DruidException.forPersona(DruidException.Persona.USER)
-          .ofCategory(DruidException.Category.DEFENSIVE)
-          .build("Operation [%s] requires a target table", operationName());
-    } else if (tableIdentifier instanceof ExternalDestinationSqlIdentifier) {
+    if (tableIdentifier instanceof ExternalDestinationSqlIdentifier) {
       ExternalDestinationSqlIdentifier externalDestination = 
((ExternalDestinationSqlIdentifier) tableIdentifier);
       ExportStorageProvider storageProvider = 
externalDestination.toExportStorageProvider(handlerContext.jsonMapper());
       dataSource = new ExportDestination(storageProvider);
@@ -264,7 +245,6 @@ public abstract class IngestHandler extends QueryHandler
   @Override
   protected QueryMaker buildQueryMaker(final RelRoot rootQueryRel) throws 
ValidationException
   {
-    validateColumnsForIngestion(rootQueryRel);
     return handlerContext.engine().buildQueryMakerForInsert(
         targetDatasource,
         rootQueryRel,
@@ -272,20 +252,6 @@ public abstract class IngestHandler extends QueryHandler
     );
   }
 
-  private void validateColumnsForIngestion(RelRoot rootQueryRel)
-  {
-    // Check that there are no unnamed columns in the insert.
-    for (Pair<Integer, String> field : rootQueryRel.fields) {
-      if (UNNAMED_COLUMN_PATTERN.matcher(field.right).matches()) {
-        throw InvalidSqlInput.exception(
-            "Insertion requires columns to be named, but at least one of the 
columns was unnamed.  This is usually "
-            + "the result of applying a function without having an AS clause, 
please ensure that all function calls"
-            + "are named with an AS clause as in \"func(X) as myColumn\"."
-        );
-      }
-    }
-  }
-
   /**
    * Handler for the INSERT statement.
    */
@@ -299,13 +265,24 @@ public abstract class IngestHandler extends QueryHandler
         SqlExplain explain
     )
     {
-      super(
-          handlerContext,
-          sqlNode,
-          convertQuery(sqlNode),
-          explain
-      );
-      this.sqlNode = sqlNode;
+      super(handlerContext, explain);
+      this.sqlNode = convertQuery(sqlNode);
+      handlerContext.hook().captureInsert(sqlNode);
+    }
+
+    protected static DruidSqlInsert convertQuery(DruidSqlIngest sqlNode)
+    {
+      SqlNode query = convertSourceQuery(sqlNode);
+
+      return DruidSqlInsert.create(new SqlInsert(
+              sqlNode.getParserPosition(),
+              (SqlNodeList) sqlNode.getOperandList().get(0),
+              sqlNode.getOperandList().get(1),
+              query,
+              (SqlNodeList) sqlNode.getOperandList().get(3)),
+          sqlNode.getPartitionedBy(),
+          sqlNode.getClusteredBy(),
+          sqlNode.getExportFileFormat());
     }
 
     @Override
@@ -355,11 +332,29 @@ public abstract class IngestHandler extends QueryHandler
     {
       super(
           handlerContext,
-          sqlNode,
-          convertQuery(sqlNode),
           explain
       );
-      this.sqlNode = sqlNode;
+      this.sqlNode = convertQuery(sqlNode);
+      handlerContext.hook().captureInsert(sqlNode);
+    }
+
+    protected static DruidSqlReplace convertQuery(DruidSqlReplace sqlNode)
+    {
+      SqlNode query = convertSourceQuery(sqlNode);
+
+      return DruidSqlReplace.create(
+          new SqlInsert(
+              sqlNode.getParserPosition(),
+              (SqlNodeList) sqlNode.getOperandList().get(0),
+              sqlNode.getOperandList().get(1),
+              query,
+              (SqlNodeList) sqlNode.getOperandList().get(3)
+          ),
+          sqlNode.getPartitionedBy(),
+          sqlNode.getClusteredBy(),
+          sqlNode.getExportFileFormat(),
+          sqlNode.getReplaceTimeQuery()
+      );
     }
 
     @Override
@@ -390,12 +385,12 @@ public abstract class IngestHandler extends QueryHandler
         );
       }
 
+      super.validate();
       List<String> replaceIntervalsList = 
DruidSqlParserUtils.validateQueryAndConvertToIntervals(
           replaceTimeQuery,
           ingestionGranularity,
           handlerContext.timeZone()
       );
-      super.validate();
       if (replaceIntervalsList != null) {
         replaceIntervals = String.join(",", replaceIntervalsList);
         handlerContext.queryContextMap().put(
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java
index 57d4e08b071..3e7f58ec067 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java
@@ -93,27 +93,24 @@ public abstract class QueryHandler extends 
SqlStatementHandler.BaseStatementHand
 {
   static final EmittingLogger log = new EmittingLogger(QueryHandler.class);
 
-  protected SqlNode queryNode;
   protected SqlExplain explain;
-  protected SqlNode validatedQueryNode;
   private boolean isPrepared;
   protected RelRoot rootQueryRel;
   private PrepareResult prepareResult;
   protected RexBuilder rexBuilder;
 
-  public QueryHandler(SqlStatementHandler.HandlerContext handlerContext, 
SqlNode sqlNode, SqlExplain explain)
+  public QueryHandler(HandlerContext handlerContext, SqlExplain explain)
   {
     super(handlerContext);
-    this.queryNode = sqlNode;
     this.explain = explain;
   }
 
-  @Override
-  public void validate()
+  protected SqlNode validate(SqlNode root)
   {
     CalcitePlanner planner = handlerContext.planner();
+    SqlNode validatedQueryNode;
     try {
-      validatedQueryNode = planner.validate(rewriteParameters());
+      validatedQueryNode = planner.validate(rewriteParameters(root));
     }
     catch (ValidationException e) {
       throw DruidPlanner.translateException(e);
@@ -126,9 +123,10 @@ public abstract class QueryHandler extends 
SqlStatementHandler.BaseStatementHand
     );
     validatedQueryNode.accept(resourceCollectorShuttle);
     resourceActions = resourceCollectorShuttle.getResourceActions();
+    return validatedQueryNode;
   }
 
-  private SqlNode rewriteParameters()
+  private SqlNode rewriteParameters(SqlNode original)
   {
     // Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap 
out any
     // {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link 
SqlLiteral}
@@ -140,9 +138,9 @@ public abstract class QueryHandler extends 
SqlStatementHandler.BaseStatementHand
     // contains parameters, but no values were provided.
     PlannerContext plannerContext = handlerContext.plannerContext();
     if (plannerContext.getParameters().isEmpty()) {
-      return queryNode;
+      return original;
     } else {
-      return queryNode.accept(new SqlParameterizerShuttle(plannerContext));
+      return original.accept(new SqlParameterizerShuttle(plannerContext));
     }
   }
 
@@ -153,6 +151,7 @@ public abstract class QueryHandler extends 
SqlStatementHandler.BaseStatementHand
       return;
     }
     isPrepared = true;
+    SqlNode validatedQueryNode = validatedQueryNode();
     rootQueryRel = handlerContext.planner().rel(validatedQueryNode);
     handlerContext.hook().captureQueryRel(rootQueryRel);
     final RelDataTypeFactory typeFactory = 
rootQueryRel.rel.getCluster().getTypeFactory();
@@ -177,6 +176,8 @@ public abstract class QueryHandler extends 
SqlStatementHandler.BaseStatementHand
     return prepareResult;
   }
 
+  protected abstract SqlNode validatedQueryNode();
+
   protected abstract RelDataType returnedRowType();
 
   private static RelDataType getExplainStructType(RelDataTypeFactory 
typeFactory)
@@ -690,13 +691,17 @@ public abstract class QueryHandler extends 
SqlStatementHandler.BaseStatementHand
 
   public static class SelectHandler extends QueryHandler
   {
+    private final SqlNode queryNode;
+    private SqlNode validatedQueryNode;
+
     public SelectHandler(
         HandlerContext handlerContext,
         SqlNode sqlNode,
         SqlExplain explain
     )
     {
-      super(handlerContext, sqlNode, explain);
+      super(handlerContext, explain);
+      this.queryNode = sqlNode;
     }
 
     @Override
@@ -705,7 +710,13 @@ public abstract class QueryHandler extends 
SqlStatementHandler.BaseStatementHand
       if 
(!handlerContext.plannerContext().featureAvailable(EngineFeature.CAN_SELECT)) {
         throw InvalidSqlInput.exception("Cannot execute SELECT with SQL engine 
[%s]", handlerContext.engine().name());
       }
-      super.validate();
+      validatedQueryNode = validate(queryNode);
+    }
+
+    @Override
+    protected SqlNode validatedQueryNode()
+    {
+      return validatedQueryNode;
     }
 
     @Override
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteExportTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteExportTest.java
index cc4b2a0fec4..4a97367fcd1 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteExportTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteExportTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Binder;
+import org.apache.calcite.avatica.SqlType;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.guice.DruidInjectorBuilder;
 import org.apache.druid.initialization.DruidModule;
@@ -37,6 +38,7 @@ import 
org.apache.druid.sql.calcite.export.TestExportStorageConnector;
 import org.apache.druid.sql.calcite.filtration.Filtration;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.sql.destination.ExportDestination;
+import org.apache.druid.sql.http.SqlParameter;
 import org.apache.druid.storage.StorageConfig;
 import org.apache.druid.storage.StorageConnector;
 import org.apache.druid.storage.local.LocalFileExportStorageProvider;
@@ -46,6 +48,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.internal.matchers.ThrowableMessageMatcher;
 
+import java.util.Collections;
 import java.util.List;
 
 public class CalciteExportTest extends CalciteIngestionDmlTest
@@ -176,6 +179,59 @@ public class CalciteExportTest extends 
CalciteIngestionDmlTest
         .verify();
   }
 
+
+  @Test
+  public void testInsertIntoExternParameterized()
+  {
+    testIngestionQuery()
+        .sql(StringUtils.format("INSERT INTO EXTERN(%s()) "
+                                + "AS CSV "
+                                + "SELECT dim2 FROM foo WHERE dim2=?", 
TestExportStorageConnector.TYPE_NAME))
+        .parameters(Collections.singletonList(new 
SqlParameter(SqlType.VARCHAR, "val")))
+        .expectQuery(
+            Druids.newScanQueryBuilder()
+                .dataSource(
+                    "foo"
+                )
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .filters(equality("dim2", "val", ColumnType.STRING))
+                .columns("dim2")
+                
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                .legacy(false)
+                .build()
+        )
+        .expectResources(dataSourceRead("foo"), 
externalWrite(TestExportStorageConnector.TYPE_NAME))
+        .expectTarget(ExportDestination.TYPE_KEY, 
RowSignature.builder().add("dim2", ColumnType.STRING).build())
+        .verify();
+  }
+
+  // Disabled until replace supports external destinations. To be enabled 
after that point.
+  @Test
+  @Ignore
+  public void testReplaceIntoExternParameterized()
+  {
+    testIngestionQuery()
+        .sql(StringUtils.format("REPLACE INTO EXTERN(%s()) "
+                                + "AS CSV "
+                                + "SELECT dim2 FROM foo WHERE dim2=?", 
TestExportStorageConnector.TYPE_NAME))
+        .parameters(Collections.singletonList(new 
SqlParameter(SqlType.VARCHAR, "val")))
+        .expectQuery(
+            Druids.newScanQueryBuilder()
+                .dataSource(
+                    "foo"
+                )
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .filters(equality("dim2", "val", ColumnType.STRING))
+                .columns("dim2")
+                
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                .legacy(false)
+                .build()
+        )
+        .expectResources(dataSourceRead("foo"), 
externalWrite(TestExportStorageConnector.TYPE_NAME))
+        .expectTarget(ExportDestination.TYPE_KEY, 
RowSignature.builder().add("dim2", ColumnType.STRING).build())
+        .verify();
+  }
+
   @Test
   public void testExportWithoutFormat()
   {
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index 515796948fa..eded1b7fb77 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -1679,7 +1679,6 @@ public class CalciteInsertDmlTest extends 
CalciteIngestionDmlTest
                              + "partitioned by DAY\n"
                              + "clustered by channel";
     HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
-    context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100);
     testIngestionQuery().context(context).sql(sqlString)
                         .expectValidationError(
                             new DruidExceptionMatcher(
@@ -1708,7 +1707,6 @@ public class CalciteInsertDmlTest extends 
CalciteIngestionDmlTest
                              + "partitioned by DAY\n"
                              + "clustered by channel";
     HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
-    context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100);
     testIngestionQuery().context(context).sql(sqlString)
                         .expectValidationError(
                             new DruidExceptionMatcher(
@@ -1736,7 +1734,6 @@ public class CalciteInsertDmlTest extends 
CalciteIngestionDmlTest
                              + "partitioned by DAY\n"
                              + "clustered by channel";
     HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
-    context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100);
     testIngestionQuery().context(context).sql(sqlString)
                         .expectValidationError(
                             new DruidExceptionMatcher(
@@ -1765,7 +1762,6 @@ public class CalciteInsertDmlTest extends 
CalciteIngestionDmlTest
                              + "partitioned by DAY\n"
                              + "clustered by channel";
     HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
-    context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100);
     testIngestionQuery().context(context).sql(sqlString)
                         .expectValidationError(
                             new DruidExceptionMatcher(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to