AHeise commented on code in PR #27807:
URL: https://github.com/apache/flink/pull/27807#discussion_r2973941433


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java:
##########
@@ -294,29 +295,43 @@ public static List<Column> validateAndExtractNewColumns(
                             originalColumnSize, newColumnSize));
         }
 
+        final List<TableChange> columnChanges = new ArrayList<>();
         for (int i = 0; i < oldColumns.size(); i++) {
             Column oldColumn = oldColumns.get(i);
-            Column newColumn = newColumns.get(i);
+            Column newColumn =
+                    schemaDefinedInQuery
+                            ? newColumns.get(i)
+                            : newColumns.get(i).copy(newColumns.get(i).getDataType().nullable());
             if (!oldColumn.equals(newColumn)) {
-                throw new ValidationException(
-                        String.format(
-                                "When modifying the query of a materialized table, "
-                                        + "currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n"
-                                        + "Column mismatch at position %d: Original column is [%s], but new column is [%s].",
-                                i, oldColumn, newColumn));
+                if (!oldColumn.getName().equals(newColumn.getName())
+                        || !LogicalTypeCasts.supportsImplicitCast(
+                                oldColumn.getDataType().getLogicalType(),
+                                newColumn.getDataType().getLogicalType())) {
+                    throw new ValidationException(

Review Comment:
   Can we also move the exception to the operation and just issue a 
TableChange.ModifyPhysicalColumnType here?
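
   A rough sketch of that split, to make the suggestion concrete. Everything 
here is a simplified stand-in and an assumption on my side: `TypeChange`, 
`collectTypeChanges`, `validate`, and the toy cast rule are hypothetical and 
are not the Flink `TableChange` or `LogicalTypeCasts` APIs. The utility only 
records type differences; the operation decides later whether they are allowed.

```java
import java.util.ArrayList;
import java.util.List;

final class DeferredTypeValidation {
    // Hypothetical stand-in for a column type change; NOT Flink's TableChange API.
    static final class TypeChange {
        final String column;
        final String oldType;
        final String newType;

        TypeChange(String column, String oldType, String newType) {
            this.column = column;
            this.oldType = oldType;
            this.newType = newType;
        }
    }

    // Toy cast rule for illustration only: identical types, or INT widening to BIGINT.
    static boolean supportsImplicitCast(String from, String to) {
        return from.equals(to) || (from.equals("INT") && to.equals("BIGINT"));
    }

    // Utility side: record every type difference without judging it.
    static List<TypeChange> collectTypeChanges(
            String[] names, String[] oldTypes, String[] newTypes) {
        List<TypeChange> changes = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            if (!oldTypes[i].equals(newTypes[i])) {
                changes.add(new TypeChange(names[i], oldTypes[i], newTypes[i]));
            }
        }
        return changes;
    }

    // Operation side: reject unsupported changes when the operation is validated.
    static void validate(List<TypeChange> changes) {
        for (TypeChange change : changes) {
            if (!supportsImplicitCast(change.oldType, change.newType)) {
                throw new IllegalArgumentException(
                        String.format(
                                "Column [%s]: cannot implicitly cast [%s] to [%s].",
                                change.column, change.oldType, change.newType));
            }
        }
    }

    public static void main(String[] args) {
        List<TypeChange> changes =
                collectTypeChanges(
                        new String[] {"id", "t"},
                        new String[] {"INT", "STRING"},
                        new String[] {"BIGINT", "STRING"});
        validate(changes); // passes under the toy rule: INT widens to BIGINT
        System.out.println(changes.size() + " type change(s) collected");
    }
}
```

   With this shape the utility stays free of validation logic, and the error 
message lives next to the code that applies the change.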



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java:
##########
@@ -294,29 +295,43 @@ public static List<Column> validateAndExtractNewColumns(
                             originalColumnSize, newColumnSize));
         }
 
+        final List<TableChange> columnChanges = new ArrayList<>();
         for (int i = 0; i < oldColumns.size(); i++) {
             Column oldColumn = oldColumns.get(i);
-            Column newColumn = newColumns.get(i);
+            Column newColumn =
+                    schemaDefinedInQuery
+                            ? newColumns.get(i)
+                            : newColumns.get(i).copy(newColumns.get(i).getDataType().nullable());
             if (!oldColumn.equals(newColumn)) {
-                throw new ValidationException(
-                        String.format(
-                                "When modifying the query of a materialized table, "
-                                        + "currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n"
-                                        + "Column mismatch at position %d: Original column is [%s], but new column is [%s].",
-                                i, oldColumn, newColumn));
+                if (!oldColumn.getName().equals(newColumn.getName())
+                        || !LogicalTypeCasts.supportsImplicitCast(
+                                oldColumn.getDataType().getLogicalType(),
+                                newColumn.getDataType().getLogicalType())) {
+                    throw new ValidationException(
+                            String.format(
+                                    "When modifying the query of a materialized table, "
+                                            + "currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n"
+                                            + "Column mismatch at position %d: Original column is [%s], but new column is [%s].",
+                                    i + 1, oldColumn, newColumn));
+                }
+                if (!Objects.equals(oldColumn.getComment(), newColumn.getComment())) {

Review Comment:
   Since we often treat an empty comment ('') the same as a null comment, 
shall we normalize here? For example, wrap both comments in something like 
StringUtil.emptyIfNull.
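
   A minimal sketch of what I mean, assuming a small null-to-empty helper. 
`emptyIfNull` and `commentChanged` are hypothetical names used only for 
illustration; if a suitable utility already exists in the code base, prefer 
that one.

```java
import java.util.Objects;

final class CommentNormalization {
    // Hypothetical helper: maps a null comment to the empty string.
    static String emptyIfNull(String s) {
        return s == null ? "" : s;
    }

    // Compares comments after normalization, so null and '' count as equal.
    static boolean commentChanged(String oldComment, String newComment) {
        return !Objects.equals(emptyIfNull(oldComment), emptyIfNull(newComment));
    }

    public static void main(String[] args) {
        System.out.println(commentChanged(null, ""));           // false
        System.out.println(commentChanged("INT comment", null)); // true
    }
}
```

   With this, a column whose comment goes from null to '' would not produce a 
spurious comment change.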



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java:
##########
@@ -97,7 +97,9 @@ public static String toString(TableChange tableChange) {
             return String.format(
                     "  MODIFY %s COMMENT '%s'",
                     EncodingUtils.escapeIdentifier(modifyColumnComment.getNewColumn().getName()),
-                    modifyColumnComment.getNewComment());
+                    modifyColumnComment.getNewComment() == null

Review Comment:
   Two or three commits earlier, we already changed the comment logic. Can 
you double-check that the commits are cut correctly?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java:
##########
@@ -413,20 +413,55 @@ void testCreateOrAlterMaterializedTableWithDroppedWatermark2()
     }
 
     @Test
-    void testCreateOrAlterMaterializedTableWithModifiedWatermark2()
+    void testCreateOrAlterMaterializedTableWithCommentChange()
+            throws TableAlreadyExistException, DatabaseNotExistException {
+        final String prepSql =
+                "CREATE MATERIALIZED TABLE mt1 (\n"
+                        + "   id INT COMMENT 'INT comment', t TIMESTAMP_LTZ(3)\n"
+                        + ")\n"
+                        + "AS SELECT 1 as id, current_timestamp as t";
+        createMaterializedTableInCatalog(prepSql, "mt1");
+
+        final String sql =
+                "CREATE OR ALTER MATERIALIZED TABLE mt1 (\n"
+                        + "   id INT, t TIMESTAMP_LTZ(3) COMMENT 'Timestamp Comment'\n"
+                        + ")\n"
+                        + "AS SELECT 1 as id, current_timestamp as t";
+        Operation operation = parse(sql);
+
+        assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+        FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation;
+        assertThat(op.getTableChanges())
+                .containsExactly(
+                        TableChange.modifyColumnComment(
+                                Column.physical("id", DataTypes.INT()).withComment("INT comment"),
+                                null),
+                        TableChange.modifyColumnComment(
+                                Column.physical("t", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+                                "Timestamp Comment"));
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt1\n"
+                                + "  MODIFY `id` COMMENT '',\n"
+                                + "  MODIFY `t` COMMENT 'Timestamp Comment'");
+    }
+
+    @Test
+    void testCreateOrAlterMaterializedTableWithModifiedWatermark()
             throws TableAlreadyExistException, DatabaseNotExistException {
         final String prepSql =
                 "CREATE MATERIALIZED TABLE mt1 (\n"
                         + "   id INT, t TIMESTAMP_LTZ(3),\n"
-                        + "   WATERMARK FOR t as current_timestamp - INTERVAL '5' SECOND"
+                        + "   WATERMARK FOR t as current_timestamp - INTERVAL '5' SECOND\n"

Review Comment:
   This change should probably go into the previous commit.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java:
##########
@@ -261,6 +261,73 @@ void testCreateOrAlterMaterializedTableWithDistributionForExistingTable()
                                 + "  MODIFY DISTRIBUTED BY HASH(`b`) INTO 4 BUCKETS");
     }
 
+    @Test
+    void testCreateOrAlterMaterializedTableWithDroppedConstraint() {
+        final String sql =
+                "CREATE OR ALTER MATERIALIZED TABLE mt \n"
+                        + "COMMENT 'New materialized table comment'\n"
+                        + "PARTITIONED BY (a, d)\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'filesystem', \n"
+                        + "  'format' = 'json'\n"
+                        + ")\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation = parse(sql);
+
+        assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+        FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation;
+        assertThat(op.getTableChanges())
+                .containsExactly(
+                        TableChange.dropConstraint("ct1"),
+                        TableChange.modifyDefinitionQuery(
+                                "SELECT *\n" + "FROM `t1`",
+                                "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"
+                                        + "FROM `builtin`.`default`.`t1` AS `t1`"));
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt\n"
+                                + "  DROP CONSTRAINT ct1,\n"

Review Comment:
   The extra leading spaces on the first modification look unintended. Can 
you please double-check?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java:
##########
@@ -308,6 +308,37 @@ void testCreateOrAlterMaterializedTableWithChangedConstraint() {
                                 + "  MODIFY CONSTRAINT `new_constraint` PRIMARY KEY (`a`) NOT ENFORCED");
     }
 
+    @Test
+    void testCreateOrAlterMaterializedTableWithNewConstraint()

Review Comment:
   Please squash this into the respective commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.