LadyForest commented on code in PR #20652:
URL: https://github.com/apache/flink/pull/20652#discussion_r990580442


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java:
##########
@@ -1466,6 +1477,404 @@ public void testAlterTableCompactOnManagedPartitionedTable() throws Exception {
                 parse("alter table tb1 compact", SqlDialect.DEFAULT), 
staticPartitions);
     }
 
+    @Test
+    public void testAlterTableAddColumn() throws Exception {
+        prepareNonManagedTable(false);
+        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", 
"tb1");
+        Schema originalSchema =
+                
catalogManager.getTable(tableIdentifier).get().getTable().getUnresolvedSchema();
+
+        // test duplicated column name
+        assertThatThrownBy(() -> parse("alter table tb1 add a bigint", 
SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Try to add a column 'a' which already 
exists in the table.");
+
+        // test reference nonexistent column name
+        assertThatThrownBy(() -> parse("alter table tb1 add x bigint after y", 
SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Referenced column 'y' by 'AFTER' does not exist in 
the table.");
+
+        // test add a single column
+        Operation operation =
+                parse(
+                        "alter table tb1 add d double not null comment 'd is 
double not null'",
+                        SqlDialect.DEFAULT);
+        assertAlterTableSchema(
+                operation,
+                tableIdentifier,
+                Schema.newBuilder()
+                        .fromSchema(originalSchema)
+                        .column("d", DataTypes.DOUBLE().notNull())
+                        .withComment("d is double not null")
+                        .build());
+
+        // test add multiple columns with pk
+        operation =
+                parse(
+                        "alter table tb1 add (\n"
+                                + " e as upper(a) first,\n"
+                                + " f as b*2 after e,\n"
+                                + " g int metadata from 'mk1' virtual comment 
'comment_metadata' first,\n"
+                                + " h string primary key not enforced after 
a)",
+                        SqlDialect.DEFAULT);
+
+        List<Schema.UnresolvedColumn> unresolvedColumns =
+                new ArrayList<>(originalSchema.getColumns());
+        unresolvedColumns.add(
+                0,
+                new Schema.UnresolvedMetadataColumn(
+                        "g", DataTypes.INT(), "mk1", true, 
"comment_metadata"));
+        unresolvedColumns.add(
+                1, new Schema.UnresolvedComputedColumn("e", new 
SqlCallExpression("UPPER(`a`)")));
+        unresolvedColumns.add(
+                2, new Schema.UnresolvedComputedColumn("f", new 
SqlCallExpression("`b` * 2")));
+        unresolvedColumns.add(
+                4, new Schema.UnresolvedPhysicalColumn("h", 
DataTypes.STRING().notNull()));
+        assertAlterTableSchema(
+                operation,
+                tableIdentifier,
+                
Schema.newBuilder().fromColumns(unresolvedColumns).primaryKey("h").build());
+
+        // test add nested type
+        operation =
+                parse(
+                        "alter table tb1 add (\n"
+                                + " r row<r1 bigint, r2 string, r3 
array<double> not null> not null comment 'add composite type',\n"
+                                + " m map<string not null, int not null>,\n"
+                                + " g as r.r1 * 2 after r,\n"
+                                + " ts as to_timestamp(r.r2) comment 'rowtime' 
after g,\n"
+                                + " na as r.r3 after ts)",
+                        SqlDialect.DEFAULT);
+        assertAlterTableSchema(
+                operation,
+                tableIdentifier,
+                Schema.newBuilder()
+                        .fromSchema(originalSchema)
+                        .column(
+                                "r",
+                                DataTypes.ROW(
+                                                DataTypes.FIELD("r1", 
DataTypes.BIGINT()),
+                                                DataTypes.FIELD("r2", 
DataTypes.STRING()),
+                                                DataTypes.FIELD(
+                                                        "r3",
+                                                        
DataTypes.ARRAY(DataTypes.DOUBLE())
+                                                                .notNull()))
+                                        .notNull())
+                        .withComment("add composite type")
+                        .columnByExpression("g", "`r`.`r1` * 2")
+                        .columnByExpression("ts", "TO_TIMESTAMP(`r`.`r2`)")
+                        .withComment("rowtime")
+                        .columnByExpression("na", "`r`.`r3`")
+                        .column(
+                                "m",
+                                DataTypes.MAP(
+                                        DataTypes.STRING().notNull(), 
DataTypes.INT().notNull()))
+                        .build());
+    }
+
+    @Test
+    public void testAlterTableAddConstraintOnPrimaryKeyedTable() throws 
Exception {
+        prepareNonManagedTable(true);
+
+        assertThatThrownBy(() -> parse("alter table tb1 add unique(a)", 
SqlDialect.DEFAULT))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("UNIQUE constraint is not supported 
yet");
+
+        assertThatThrownBy(() -> parse("alter table tb1 add primary key(c)", 
SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Flink doesn't support ENFORCED mode for PRIMARY KEY 
constraint");
+
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 add primary key(c) 
not enforced",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The base table already has a primary key [a, b]. You 
might want to drop it before adding a new one");
+
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 add d string not null 
primary key not enforced",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The base table already has a primary key [a, b]. You 
might want to drop it before adding a new one");
+    }
+
+    @Test
+    public void testAlterTableSchemaAddPrimaryKey() throws Exception {
+        prepareNonManagedTable(false);
+
+        // test add a composite pk which contains computed column
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 add (\n"
+                                                + "  d as upper(a),\n"
+                                                + "  primary key (b, d) not 
enforced)",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Could not create a PRIMARY KEY with column 'd' at 
line 3, column 19.\n"
+                                + "A PRIMARY KEY constraint must be declared 
on physical columns.");
+
+        // test add a pk which is metadata column
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 add (e int metadata, 
primary key (e) not enforced)",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Could not create a PRIMARY KEY with column 'e' at 
line 1, column 51.\n"
+                                + "A PRIMARY KEY constraint must be declared 
on physical columns.");
+
+        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", 
"tb1");
+        Schema originalSchema =
+                
catalogManager.getTable(tableIdentifier).get().getTable().getUnresolvedSchema();
+        Operation operation =
+                parse(
+                        "alter table tb1 add constraint my_pk primary key (a, 
b) not enforced",
+                        SqlDialect.DEFAULT);
+
+        assertAlterTableSchema(
+                operation,
+                tableIdentifier,
+                Schema.newBuilder()
+                        .fromSchema(originalSchema)
+                        .primaryKeyNamed("my_pk", "a", "b")
+                        .build());
+
+        operation =
+                parse(
+                        "alter table tb1 add d bigint not null primary key not 
enforced",
+                        SqlDialect.DEFAULT);
+        assertAlterTableSchema(
+                operation,
+                tableIdentifier,
+                Schema.newBuilder()
+                        .fromSchema(originalSchema)
+                        .column("d", DataTypes.BIGINT().notNull())
+                        .primaryKey("d")
+                        .build());
+    }
+
+    @Test
+    public void testAlterTableSchemaAddWatermark() throws Exception {
+        prepareNonManagedTable(false);
+
+        // test add watermark with an undefined column as rowtime
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 add watermark for ts 
as ts",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The rowtime attribute field 'ts' is not defined in 
the table schema, at line 1, column 35\n"
+                                + "Available fields: ['a', 'b', 'c']");
+
+        // test add watermark with an undefined nested column as rowtime
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "alter table tb1 add (d row<d1 string, 
ts timestamp(3)>, watermark for d.ts as d.ts)",
+                                        SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The nested rowtime attribute field 'd.ts' cannot 
define a watermark.");
+
+        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", 
"tb1");
+        Schema originalSchema =
+                
catalogManager.getTable(tableIdentifier).get().getTable().getUnresolvedSchema();
+
+        // test add watermark with new added physical column as rowtime

Review Comment:
   > Test add watermark on existed column?
   
   Adding a watermark on an existing column is already covered by `TableEnvironmentTest#testAlterTableAddWatermark`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to