This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 70acbc5  [FLINK-25171][table-planner] Validation of duplicate fields in ddl sql
70acbc5 is described below

commit 70acbc591c8c66e82e5fb140f3d49afb0cc107bc
Author: shouzuo1 <[email protected]>
AuthorDate: Mon Dec 6 15:10:33 2021 +0800

    [FLINK-25171][table-planner] Validation of duplicate fields in ddl sql
    
    This closes #18017
    
    (cherry picked from commit 34de3989a613cf7124f9e301cb8284080f4df4ac)
---
 .../planner/operations/MergeTableLikeUtil.java     | 22 ++++++-
 .../planner/operations/MergeTableLikeUtilTest.java | 73 +++++++++++++++++++++-
 .../table/planner/plan/stream/sql/UnionTest.scala  |  3 +-
 3 files changed, 93 insertions(+), 5 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
index 4fe1fb4..29c7059 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
@@ -407,6 +407,13 @@ class MergeTableLikeUtil {
                                     name, 
TypeConversions.fromLogicalToDataType(logicalType));
                 } else if (derivedColumn instanceof SqlComputedColumn) {
                     final SqlComputedColumn computedColumn = 
(SqlComputedColumn) derivedColumn;
+                    if (physicalFieldNamesToTypes.containsKey(name)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "A column named '%s' already exists in 
the table. "
+                                                + "Duplicate columns exist in 
the compute column and regular column. ",
+                                        name));
+                    }
                     if (columns.containsKey(name)) {
                         if (!(columns.get(name) instanceof ComputedColumn)) {
                             throw new ValidationException(
@@ -443,6 +450,13 @@ class MergeTableLikeUtil {
                     computedFieldNamesToTypes.put(name, validatedType);
                 } else if (derivedColumn instanceof SqlMetadataColumn) {
                     final SqlMetadataColumn metadataColumn = 
(SqlMetadataColumn) derivedColumn;
+                    if (physicalFieldNamesToTypes.containsKey(name)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "A column named '%s' already exists in 
the table. "
+                                                + "Duplicate columns exist in 
the metadata column and regular column. ",
+                                        name));
+                    }
                     if (columns.containsKey(name)) {
                         if (!(columns.get(name) instanceof MetadataColumn)) {
                             throw new ValidationException(
@@ -494,7 +508,13 @@ class MergeTableLikeUtil {
                     boolean nullable = type.getNullable() == null ? true : 
type.getNullable();
                     RelDataType relType = type.deriveType(sqlValidator, 
nullable);
                     // add field name and field type to physical field list
-                    physicalFieldNamesToTypes.put(name, relType);
+                    RelDataType oldType = physicalFieldNamesToTypes.put(name, 
relType);
+                    if (oldType != null) {
+                        throw new ValidationException(
+                                String.format(
+                                        "A regular Column named '%s' already 
exists in the table.",
+                                        name));
+                    }
                 }
             }
         }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
index cbf1238..a5738d1 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
@@ -127,6 +127,75 @@ public class MergeTableLikeUtilTest {
     }
 
     @Test
+    public void mergeWithIncludeFailsOnDuplicateRegularColumn() {
+        TableSchema sourceSchema =
+                TableSchema.builder().add(TableColumn.physical("one", 
DataTypes.INT())).build();
+
+        List<SqlNode> derivedColumns =
+                Arrays.asList(
+                        regularColumn("two", DataTypes.INT()),
+                        regularColumn("two", DataTypes.INT()),
+                        regularColumn("four", DataTypes.STRING()));
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("A regular Column named 'two' already exists in 
the table.");
+        util.mergeTables(
+                getDefaultMergingStrategies(),
+                sourceSchema,
+                derivedColumns,
+                Collections.emptyList(),
+                null);
+    }
+
+    @Test
+    public void 
mergeWithIncludeFailsOnDuplicateRegularColumnAndComputeColumn() {
+        TableSchema sourceSchema =
+                TableSchema.builder().add(TableColumn.physical("one", 
DataTypes.INT())).build();
+
+        List<SqlNode> derivedColumns =
+                Arrays.asList(
+                        regularColumn("two", DataTypes.INT()),
+                        computedColumn("three", plus("two", "3")),
+                        regularColumn("three", DataTypes.INT()),
+                        regularColumn("four", DataTypes.STRING()));
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage(
+                "A column named 'three' already exists in the table. "
+                        + "Duplicate columns exist in the compute column and 
regular column. ");
+        util.mergeTables(
+                getDefaultMergingStrategies(),
+                sourceSchema,
+                derivedColumns,
+                Collections.emptyList(),
+                null);
+    }
+
+    @Test
+    public void 
mergeWithIncludeFailsOnDuplicateRegularColumnAndMetadataColumn() {
+        TableSchema sourceSchema =
+                TableSchema.builder().add(TableColumn.physical("one", 
DataTypes.INT())).build();
+
+        List<SqlNode> derivedColumns =
+                Arrays.asList(
+                        metadataColumn("two", DataTypes.INT(), true),
+                        computedColumn("three", plus("two", "3")),
+                        regularColumn("two", DataTypes.INT()),
+                        regularColumn("four", DataTypes.STRING()));
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage(
+                "A column named 'two' already exists in the table. "
+                        + "Duplicate columns exist in the metadata column and 
regular column. ");
+        util.mergeTables(
+                getDefaultMergingStrategies(),
+                sourceSchema,
+                derivedColumns,
+                Collections.emptyList(),
+                null);
+    }
+
+    @Test
     public void mergeGeneratedColumns() {
         TableSchema sourceSchema =
                 TableSchema.builder()
@@ -378,8 +447,8 @@ public class MergeTableLikeUtilTest {
 
         thrown.expect(ValidationException.class);
         thrown.expectMessage(
-                "A column named 'two' already exists in the base table."
-                        + " Computed columns can only overwrite other computed 
columns.");
+                "A column named 'two' already exists in the table. "
+                        + "Duplicate columns exist in the compute column and 
regular column. ");
         util.mergeTables(
                 mergingStrategies, sourceSchema, derivedColumns, 
Collections.emptyList(), null);
     }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala
index 946f1d0..6b1a6ba 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala
@@ -40,10 +40,9 @@ class UnionTest extends TableTestBase {
          |CREATE TABLE t1 (
          |  id int,
          |  ts bigint,
-         |  name string,
+         |  name varchar(32),
          |  timestamp_col timestamp(3),
          |  val bigint,
-         |  name varchar(32),
          |  timestamp_ltz_col as TO_TIMESTAMP_LTZ(ts, 3),
          |  watermark for timestamp_col as timestamp_col
          |) WITH (

Reply via email to