This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 70acbc5 [FLINK-25171][table-planner] Validation of duplicate fields in ddl sql
70acbc5 is described below
commit 70acbc591c8c66e82e5fb140f3d49afb0cc107bc
Author: shouzuo1 <[email protected]>
AuthorDate: Mon Dec 6 15:10:33 2021 +0800
[FLINK-25171][table-planner] Validation of duplicate fields in ddl sql
This closes #18017
(cherry picked from commit 34de3989a613cf7124f9e301cb8284080f4df4ac)
---
.../planner/operations/MergeTableLikeUtil.java | 22 ++++++-
.../planner/operations/MergeTableLikeUtilTest.java | 73 +++++++++++++++++++++-
.../table/planner/plan/stream/sql/UnionTest.scala | 3 +-
3 files changed, 93 insertions(+), 5 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
index 4fe1fb4..29c7059 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
@@ -407,6 +407,13 @@ class MergeTableLikeUtil {
name, TypeConversions.fromLogicalToDataType(logicalType));
} else if (derivedColumn instanceof SqlComputedColumn) {
final SqlComputedColumn computedColumn = (SqlComputedColumn) derivedColumn;
+ if (physicalFieldNamesToTypes.containsKey(name)) {
+ throw new ValidationException(
+ String.format(
+ "A column named '%s' already exists in the table. "
+ + "Duplicate columns exist in the compute column and regular column. ",
+ name));
+ }
if (columns.containsKey(name)) {
if (!(columns.get(name) instanceof ComputedColumn)) {
throw new ValidationException(
@@ -443,6 +450,13 @@ class MergeTableLikeUtil {
computedFieldNamesToTypes.put(name, validatedType);
} else if (derivedColumn instanceof SqlMetadataColumn) {
final SqlMetadataColumn metadataColumn = (SqlMetadataColumn) derivedColumn;
+ if (physicalFieldNamesToTypes.containsKey(name)) {
+ throw new ValidationException(
+ String.format(
+ "A column named '%s' already exists in the table. "
+ + "Duplicate columns exist in the metadata column and regular column. ",
+ name));
+ }
if (columns.containsKey(name)) {
if (!(columns.get(name) instanceof MetadataColumn)) {
throw new ValidationException(
@@ -494,7 +508,13 @@ class MergeTableLikeUtil {
boolean nullable = type.getNullable() == null ? true : type.getNullable();
RelDataType relType = type.deriveType(sqlValidator, nullable);
// add field name and field type to physical field list
- physicalFieldNamesToTypes.put(name, relType);
+ RelDataType oldType = physicalFieldNamesToTypes.put(name, relType);
+ if (oldType != null) {
+ throw new ValidationException(
+ String.format(
+ "A regular Column named '%s' already exists in the table.",
+ name));
+ }
}
}
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
index cbf1238..a5738d1 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
@@ -127,6 +127,75 @@ public class MergeTableLikeUtilTest {
}
@Test
+ public void mergeWithIncludeFailsOnDuplicateRegularColumn() {
+ TableSchema sourceSchema =
+ TableSchema.builder().add(TableColumn.physical("one", DataTypes.INT())).build();
+
+ List<SqlNode> derivedColumns =
+ Arrays.asList(
+ regularColumn("two", DataTypes.INT()),
+ regularColumn("two", DataTypes.INT()),
+ regularColumn("four", DataTypes.STRING()));
+
+ thrown.expect(ValidationException.class);
+ thrown.expectMessage("A regular Column named 'two' already exists in the table.");
+ util.mergeTables(
+ getDefaultMergingStrategies(),
+ sourceSchema,
+ derivedColumns,
+ Collections.emptyList(),
+ null);
+ }
+
+ @Test
+ public void mergeWithIncludeFailsOnDuplicateRegularColumnAndComputeColumn() {
+ TableSchema sourceSchema =
+ TableSchema.builder().add(TableColumn.physical("one", DataTypes.INT())).build();
+
+ List<SqlNode> derivedColumns =
+ Arrays.asList(
+ regularColumn("two", DataTypes.INT()),
+ computedColumn("three", plus("two", "3")),
+ regularColumn("three", DataTypes.INT()),
+ regularColumn("four", DataTypes.STRING()));
+
+ thrown.expect(ValidationException.class);
+ thrown.expectMessage(
+ "A column named 'three' already exists in the table. "
+ + "Duplicate columns exist in the compute column and regular column. ");
+ util.mergeTables(
+ getDefaultMergingStrategies(),
+ sourceSchema,
+ derivedColumns,
+ Collections.emptyList(),
+ null);
+ }
+
+ @Test
+ public void mergeWithIncludeFailsOnDuplicateRegularColumnAndMetadataColumn() {
+ TableSchema sourceSchema =
+ TableSchema.builder().add(TableColumn.physical("one", DataTypes.INT())).build();
+
+ List<SqlNode> derivedColumns =
+ Arrays.asList(
+ metadataColumn("two", DataTypes.INT(), true),
+ computedColumn("three", plus("two", "3")),
+ regularColumn("two", DataTypes.INT()),
+ regularColumn("four", DataTypes.STRING()));
+
+ thrown.expect(ValidationException.class);
+ thrown.expectMessage(
+ "A column named 'two' already exists in the table. "
+ + "Duplicate columns exist in the metadata column and regular column. ");
+ util.mergeTables(
+ getDefaultMergingStrategies(),
+ sourceSchema,
+ derivedColumns,
+ Collections.emptyList(),
+ null);
+ }
+
+ @Test
public void mergeGeneratedColumns() {
TableSchema sourceSchema =
TableSchema.builder()
@@ -378,8 +447,8 @@ public class MergeTableLikeUtilTest {
thrown.expect(ValidationException.class);
thrown.expectMessage(
- "A column named 'two' already exists in the base table."
- + " Computed columns can only overwrite other computed columns.");
+ "A column named 'two' already exists in the table. "
+ + "Duplicate columns exist in the compute column and regular column. ");
util.mergeTables(
mergingStrategies, sourceSchema, derivedColumns, Collections.emptyList(), null);
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala
index 946f1d0..6b1a6ba 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala
@@ -40,10 +40,9 @@ class UnionTest extends TableTestBase {
|CREATE TABLE t1 (
| id int,
| ts bigint,
- | name string,
+ | name varchar(32),
| timestamp_col timestamp(3),
| val bigint,
- | name varchar(32),
| timestamp_ltz_col as TO_TIMESTAMP_LTZ(ts, 3),
| watermark for timestamp_col as timestamp_col
|) WITH (