clintropolis commented on code in PR #15962:
URL: https://github.com/apache/druid/pull/15962#discussion_r1536264246


##########
sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.ResolvedTable;
+import org.apache.druid.catalog.model.TableDefn;
+import org.apache.druid.catalog.model.TableSpec;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.table.DatasourceDefn;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.planner.CatalogResolver;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.sql.calcite.table.DruidTable;
+
+public class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest

Review Comment:
   this seems like a base class; maybe a javadoc comment to explain that would be useful
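   For example, a short class-level javadoc along these lines would make that clear (the wording is only a sketch, not taken from the PR):

   ```java
   /**
    * Base class for tests of ingestion DML (INSERT/REPLACE) against tables
    * that are defined in the catalog. Holds the shared catalog table
    * definitions; subclasses such as CalciteCatalogInsertTest supply the
    * statements under test.
    */
   public class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest
   ```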



##########
sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java:
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.external.ExternalDataSource;
+import org.apache.druid.sql.calcite.external.Externals;
+import org.apache.druid.sql.calcite.filtration.Filtration;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test for INSERT DML statements for tables defined in catalog.
+ */
+public class CalciteCatalogInsertTest extends CalciteCatalogIngestionDmlTest
+{
+  /**
+   * If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the
+   * value from the catalog.
+   */
+  @Test
+  public void testInsertHourGrainPartitonedByFromCatalog()
+  {
+    testIngestionQuery()
+        .sql("INSERT INTO hourDs\n" +
+             "SELECT * FROM foo")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("hourDs", FOO_TABLE_SIGNATURE)
+        .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                // Scan query lists columns in alphabetical order independent of the
+                // SQL project list or the defined schema.
+                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
+                .context(queryContextWithGranularity(Granularities.HOUR))
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * If the segment grain is given in the catalog, and also by PARTITIONED BY, then
+   * the query value is used.
+   */
+  @Test
+  public void testInsertHourGrainWithDayPartitonedByFromQuery()
+  {
+    testIngestionQuery()
+        .sql("INSERT INTO hourDs\n" +
+             "SELECT * FROM foo\n" +
+             "PARTITIONED BY day")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("hourDs", FOO_TABLE_SIGNATURE)
+        .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                // Scan query lists columns in alphabetical order independent of the
+                // SQL project list or the defined schema.
+                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
+                .context(queryContextWithGranularity(Granularities.DAY))
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * If the segment grain is absent in the catalog and absent in the PARTITIONED BY clause in the query, then
+   * a validation error is raised.
+   */
+  @Test
+  public void testInsertNoPartitonedByFromCatalog()
+  {
+    testIngestionQuery()
+        .sql("INSERT INTO noPartitonedBy\n" +
+             "SELECT * FROM foo")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectValidationError(
+            DruidException.class,
+            "Operation [INSERT] requires a PARTITIONED BY to be explicitly 
defined, but none was found."
+        )
+        .verify();
+  }
+
+  /**
+   * If the segment grain is absent in the catalog, but given by PARTITIONED BY, then
+   * the query value is used.
+   */
+  @Test
+  public void testInsertNoPartitonedByWithDayPartitonedByFromQuery()
+  {
+    testIngestionQuery()
+        .sql("INSERT INTO noPartitonedBy\n" +
+             "SELECT * FROM foo\n" +
+             "PARTITIONED BY day")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE)
+        .expectResources(dataSourceWrite("noPartitonedBy"), 
dataSourceRead("foo"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource("foo")
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                // Scan query lists columns in alphabetical order independent of the
+                // SQL project list or the defined schema.
+                .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
+                .context(queryContextWithGranularity(Granularities.DAY))
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * Adding a new column during ingestion that is not defined in a non-sealed table should succeed.
+   */
+  @Test
+  public void testInsertAddNonDefinedColumnIntoNonSealedCatalogTable()
+  {
+    ExternalDataSource externalDataSource = new ExternalDataSource(
+        new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+        new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
+        RowSignature.builder()
+            .add("a", ColumnType.STRING)
+            .add("b", ColumnType.STRING)
+            .add("c", ColumnType.LONG)
+            .add("d", ColumnType.STRING)
+            .add("e", ColumnType.STRING)
+            .build()
+    );
+    final RowSignature signature = RowSignature.builder()
+        .add("__time", ColumnType.LONG)
+        .add("dim1", ColumnType.STRING)
+        .add("cnt", ColumnType.LONG)
+        .add("m2", ColumnType.DOUBLE)
+        .add("extra2", ColumnType.LONG)
+        .add("extra3", ColumnType.STRING)
+        .build();
+    testIngestionQuery()
+        .sql("INSERT INTO foo\n" +
+             "SELECT\n" +
+             "  TIME_PARSE(a) AS __time,\n" +
+             "  b AS dim1,\n" +
+             "  1 AS cnt,\n" +
+             "  c AS m2,\n" +
+             "  CAST(d AS BIGINT) AS extra2,\n" +
+             "  e AS extra3\n" +
+             "FROM TABLE(inline(\n" +
+             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             "  format => 'csv'))\n" +
+             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             "PARTITIONED BY ALL TIME")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("foo", signature)
+        .expectResources(dataSourceWrite("foo"), 
Externals.externalRead("EXTERNAL"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(externalDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .virtualColumns(
+                    expressionVirtualColumn("v0", 
"timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+                    expressionVirtualColumn("v1", "1", ColumnType.LONG),
+                    expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", 
ColumnType.DOUBLE),
+                    expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", 
ColumnType.LONG)
+                )
+                // Scan query lists columns in alphabetical order independent of the
+                // SQL project list or the defined schema.
+                .columns("b", "e", "v0", "v1", "v2", "v3")
+                .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .verify();
+  }
+
+  /**
+   * Adding a new column during ingestion that is not defined in a sealed table should fail with
+   * a proper validation error.
+   */
+  @Test
+  public void testInsertAddNonDefinedColumnIntoSealedCatalogTable()
+  {
+    testIngestionQuery()
+        .sql("INSERT INTO fooSealed\n" +
+             "SELECT\n" +
+             "  TIME_PARSE(a) AS __time,\n" +
+             "  b AS dim1,\n" +
+             "  1 AS cnt,\n" +
+             "  c AS m2,\n" +
+             "  CAST(d AS BIGINT) AS extra2\n" +
+             "FROM TABLE(inline(\n" +
+             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             "  format => 'csv'))\n" +
+             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             "PARTITIONED BY ALL TIME")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectValidationError(
+            DruidException.class,
+            "Column [extra2] is not defined in the target table 
[druid.fooSealed] strict schema"
+        )
+        .verify();
+  }
+
+
+  /**
+   * Inserting into a catalog table with a WITH source succeeds.
+   */
+  @Test
+  public void testInsertWithSourceIntoCatalogTable()
+  {
+    ExternalDataSource externalDataSource = new ExternalDataSource(
+        new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
+        new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
+        RowSignature.builder()
+            .add("a", ColumnType.STRING)
+            .add("b", ColumnType.STRING)
+            .add("c", ColumnType.LONG)
+            .add("d", ColumnType.STRING)
+            .add("e", ColumnType.STRING)
+            .build()
+    );
+    final RowSignature signature = RowSignature.builder()
+        .add("__time", ColumnType.LONG)
+        .add("dim1", ColumnType.STRING)
+        .add("cnt", ColumnType.LONG)
+        .add("m2", ColumnType.DOUBLE)
+        .add("extra2", ColumnType.LONG)
+        .add("extra3", ColumnType.STRING)
+        .build();
+    testIngestionQuery()
+        .sql("INSERT INTO \"foo\"\n" +
+             "WITH \"ext\" AS (\n" +
+             "  SELECT *\n" +
+             "FROM TABLE(inline(\n" +
+             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             "  format => 'csv'))\n" +
+             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             ")\n" +
+             "SELECT\n" +
+             "  TIME_PARSE(a) AS __time,\n" +
+             "  b AS dim1,\n" +
+             "  1 AS cnt,\n" +
+             "  c AS m2,\n" +
+             "  CAST(d AS BIGINT) AS extra2,\n" +
+             "  e AS extra3\n" +
+             "FROM \"ext\"\n" +
+             "PARTITIONED BY ALL TIME")
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("foo", signature)
+        .expectResources(dataSourceWrite("foo"), 
Externals.externalRead("EXTERNAL"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(externalDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .virtualColumns(
+                    expressionVirtualColumn("v0", 
"timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
+                    expressionVirtualColumn("v1", "1", ColumnType.LONG),
+                    expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", 
ColumnType.DOUBLE),
+                    expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", 
ColumnType.LONG)
+                )
+                // Scan query lists columns in alphabetical order independent of the
+                // SQL project list or the defined schema.
+                .columns("b", "e", "v0", "v1", "v2", "v3")
+                .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .verify();
+  }
+
+  @Test
+  public void testInsertIntoExistingStrictNoDefinedSchema()
+  {
+    testIngestionQuery()
+        .sql("INSERT INTO strictTableWithNoDefinedSchema SELECT __time AS 
__time FROM foo PARTITIONED BY ALL TIME")
+        .expectValidationError(
+            DruidException.class,
+            "Column [__time] is not defined in the target table 
[druid.strictTableWithNoDefinedSchema] strict schema")
+        .verify();
+  }
+
+  @Test
+  public void testInsertIntoExistingWithIncompatibleTypeAssignment()

Review Comment:
   nit: it might be nice to add a few more tests like this to encode some of the behaviors in tests
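   For instance, a sibling test in the same style could pin down the string-to-number case. This is only a sketch: the table name `tableWithLongCol`, its assumed BIGINT column, and the expected error message are illustrative, not taken from this PR:

   ```java
   @Test
   public void testInsertIntoExistingWithIncompatibleStringToLongAssignment()
   {
     testIngestionQuery()
         // Assumes a catalog table "tableWithLongCol" whose column "longCol"
         // is defined as BIGINT; selecting the VARCHAR column dim1 into it
         // should be rejected at validation time.
         .sql("INSERT INTO tableWithLongCol\n" +
              "SELECT __time, dim1 AS longCol FROM foo\n" +
              "PARTITIONED BY ALL TIME")
         .expectValidationError(
             DruidException.class,
             // Hypothetical message; the validator's actual wording may differ.
             "Cannot assign to target field 'longCol' of type BIGINT from source field of type VARCHAR")
         .verify();
   }
   ```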



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

