This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d70714cdaef [FLINK-39199][table] `CREATE OR ALTER MATERIALIZED TABLE` 
doesn't support table options changes
d70714cdaef is described below

commit d70714cdaefc5393abae0c46e2a7701943adba29
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Mar 5 15:46:10 2026 +0100

    [FLINK-39199][table] `CREATE OR ALTER MATERIALIZED TABLE` doesn't support 
table options changes
---
 .../AlterMaterializedTableChangeOperation.java     | 16 +++++-
 .../apache/flink/table/catalog/TableChange.java    |  4 +-
 ...SqlCreateOrAlterMaterializedTableConverter.java | 67 ++++++++--------------
 ...erializedTableNodeToOperationConverterTest.java | 34 +++++------
 4 files changed, 56 insertions(+), 65 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
index 0b4a877e438..b480cc00083 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
@@ -130,7 +130,7 @@ public class AlterMaterializedTableChangeOperation extends 
AlterMaterializedTabl
                         .schema(changeContext.retrieveSchema())
                         .comment(oldTable.getComment())
                         .partitionKeys(oldTable.getPartitionKeys())
-                        .options(oldTable.getOptions())
+                        .options(changeContext.options)
                         .originalQuery(changeContext.originalQuery)
                         .expandedQuery(changeContext.expandedQuery)
                         .distribution(changeContext.distribution)
@@ -201,6 +201,7 @@ public class AlterMaterializedTableChangeOperation extends 
AlterMaterializedTabl
         private int droppedPersistedCnt = 0;
         private String originalQuery;
         private String expandedQuery;
+        private final Map<String, String> options;
 
         public ChangeContext(CatalogMaterializedTable oldTable) {
             this.distribution = oldTable.getDistribution().orElse(null);
@@ -218,6 +219,7 @@ public class AlterMaterializedTableChangeOperation extends 
AlterMaterializedTabl
             originalQuery = oldTable.getOriginalQuery();
             expandedQuery = oldTable.getExpandedQuery();
             this.oldTable = oldTable;
+            this.options = new HashMap<>(oldTable.getOptions());
         }
 
         private static final class HandlerRegistry {
@@ -284,6 +286,10 @@ public class AlterMaterializedTableChangeOperation extends 
AlterMaterializedTabl
             registry.register(ModifyDistribution.class, 
ChangeContext::modifyDistribution);
             registry.register(DropDistribution.class, 
ChangeContext::dropDistribution);
 
+            // Options
+            registry.register(TableChange.SetOption.class, 
ChangeContext::setTableOption);
+            registry.register(TableChange.ResetOption.class, 
ChangeContext::resetTableOption);
+
             return registry;
         }
 
@@ -449,6 +455,14 @@ public class AlterMaterializedTableChangeOperation extends 
AlterMaterializedTabl
             distribution = null;
         }
 
+        private void setTableOption(TableChange.SetOption option) {
+            options.put(option.getKey(), option.getValue());
+        }
+
+        private void resetTableOption(TableChange.ResetOption option) {
+            options.remove(option.getKey());
+        }
+
         private UnresolvedColumn toUnresolvedColumn(Column column) {
             final String name = column.getName();
             final String comment = column.getComment().orElse(null);
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
index 98c54980e96..67b4941a48a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
@@ -1162,7 +1162,7 @@ public interface TableChange {
      * </pre>
      */
     @PublicEvolving
-    class SetOption implements CatalogTableChange {
+    class SetOption implements CatalogTableChange, MaterializedTableChange {
 
         private final String key;
         private final String value;
@@ -1215,7 +1215,7 @@ public interface TableChange {
      * </pre>
      */
     @PublicEvolving
-    class ResetOption implements CatalogTableChange {
+    class ResetOption implements CatalogTableChange, MaterializedTableChange {
 
         private final String key;
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
index 7c0beb8afed..4fe78a9a698 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
@@ -23,12 +23,7 @@ import 
org.apache.flink.sql.parser.ddl.materializedtable.SqlCreateOrAlterMateria
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
-import org.apache.flink.table.catalog.CatalogMaterializedTable;
-import 
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
-import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
-import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus;
 import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.IntervalFreshness;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
@@ -136,48 +131,34 @@ public class SqlCreateOrAlterMaterializedTableConverter
                         mergeContext.getMergedOriginalQuery(),
                         mergeContext.getMergedExpandedQuery()));
 
+        changes.addAll(
+                calculateOptionsChange(
+                        oldTable.getOptions(), 
mergeContext.getMergedTableOptions()));
+
         return changes;
     }
 
-    private CatalogMaterializedTable 
buildNewCatalogMaterializedTableFromOldTable(
-            final ResolvedCatalogMaterializedTable oldTable,
-            final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterTable,
-            final MergeContext mergeContext) {
-        final Schema.Builder schemaBuilder =
-                
Schema.newBuilder().fromResolvedSchema(oldTable.getResolvedSchema());
+    public static List<TableChange> calculateOptionsChange(
+            Map<String, String> oldOptions, Map<String, String> newOptions) {
+        if (oldOptions.equals(newOptions)) {
+            return List.of();
+        }
 
-        // Add new columns if this is an alter operation
-        final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
-        final List<Column> newColumns =
-                MaterializedTableUtils.validateAndExtractNewColumns(
-                        oldSchema, mergeContext.getMergedQuerySchema());
-        newColumns.forEach(col -> schemaBuilder.column(col.getName(), 
col.getDataType()));
-
-        final String comment = sqlCreateOrAlterTable.getComment();
-        final IntervalFreshness freshness = 
getDerivedFreshness(sqlCreateOrAlterTable);
-        final LogicalRefreshMode logicalRefreshMode =
-                getDerivedLogicalRefreshMode(sqlCreateOrAlterTable);
-        final RefreshMode refreshMode = 
getDerivedRefreshMode(logicalRefreshMode);
-
-        CatalogMaterializedTable.Builder builder =
-                CatalogMaterializedTable.newBuilder()
-                        .schema(schemaBuilder.build())
-                        .comment(comment)
-                        
.distribution(mergeContext.getMergedTableDistribution().orElse(null))
-                        .partitionKeys(mergeContext.getMergedPartitionKeys())
-                        .options(mergeContext.getMergedTableOptions())
-                        .originalQuery(mergeContext.getMergedOriginalQuery())
-                        .expandedQuery(mergeContext.getMergedExpandedQuery())
-                        .freshness(freshness)
-                        .logicalRefreshMode(logicalRefreshMode)
-                        .refreshMode(refreshMode)
-                        .refreshStatus(RefreshStatus.INITIALIZING);
-
-        // Preserve refresh handler from old materialized table
-        
oldTable.getRefreshHandlerDescription().ifPresent(builder::refreshHandlerDescription);
-        
builder.serializedRefreshHandler(oldTable.getSerializedRefreshHandler());
-
-        return builder.build();
+        final List<TableChange> changes = new ArrayList<>();
+        for (Map.Entry<String, String> option : newOptions.entrySet()) {
+            final String oldValue = oldOptions.get(option.getKey());
+            if (oldValue == null || !oldValue.equals(option.getValue())) {
+                changes.add(TableChange.set(option.getKey(), 
option.getValue()));
+            }
+        }
+
+        for (Map.Entry<String, String> option : oldOptions.entrySet()) {
+            if (newOptions.get(option.getKey()) == null) {
+                changes.add(TableChange.reset(option.getKey()));
+            }
+        }
+
+        return changes;
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 878fab74434..e56180d633c 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.sql.parser.error.SqlValidateException;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.FunctionDescriptor;
 import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Schema.UnresolvedColumn;
+import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import 
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
@@ -50,8 +52,6 @@ import 
org.apache.flink.table.operations.materializedtable.CreateMaterializedTab
 import 
org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
 import org.apache.flink.table.planner.utils.TableFunc0;
 
-import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableMap;
-
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -469,7 +469,8 @@ class SqlMaterializedTableNodeToOperationConverterTest
         AlterMaterializedTableRefreshOperation op =
                 (AlterMaterializedTableRefreshOperation) operation;
         
assertThat(op.getTableIdentifier().toString()).isEqualTo("`builtin`.`default`.`mtbl1`");
-        assertThat(op.getPartitionSpec()).isEqualTo(ImmutableMap.of("ds1", 
"1", "ds2", "2"));
+        assertThat(op.getPartitionSpec())
+                .containsExactly(Map.entry("ds1", "1"), Map.entry("ds2", "2"));
     }
 
     @Test
@@ -552,7 +553,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
         assertThat(oldTable.getSerializedRefreshHandler())
                 .isEqualTo(newTable.getSerializedRefreshHandler());
 
-        List<Schema.UnresolvedColumn> addedColumn =
+        List<UnresolvedColumn> addedColumn =
                 newTable.getUnresolvedSchema().getColumns().stream()
                         .filter(
                                 column ->
@@ -563,10 +564,8 @@ class SqlMaterializedTableNodeToOperationConverterTest
         // added column should be a nullable column.
         assertThat(addedColumn)
                 .containsExactly(
-                        new Schema.UnresolvedPhysicalColumn(
-                                "e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
-                        new Schema.UnresolvedPhysicalColumn(
-                                "f", DataTypes.VARCHAR(Integer.MAX_VALUE)));
+                        new UnresolvedPhysicalColumn("e", 
DataTypes.VARCHAR(Integer.MAX_VALUE)),
+                        new UnresolvedPhysicalColumn("f", 
DataTypes.VARCHAR(Integer.MAX_VALUE)));
     }
 
     @Test
@@ -626,9 +625,6 @@ class SqlMaterializedTableNodeToOperationConverterTest
         ResolvedCatalogMaterializedTable materializedTable = 
op.getCatalogMaterializedTable();
         
assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
 
-        Map<String, String> options = new HashMap<>();
-        options.put("connector", "filesystem");
-        options.put("format", "json");
         final CatalogMaterializedTable expected =
                 getDefaultMaterializedTableBuilder()
                         .freshness(IntervalFreshness.ofSecond("30"))
@@ -649,8 +645,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         + "COMMENT 'materialized table comment'\n"
                         + "PARTITIONED BY (a, d)\n"
                         + "WITH (\n"
-                        + "  'connector' = 'filesystem', \n"
-                        + "  'format' = 'json'\n"
+                        + "  'format' = 'json2'\n"
                         + ")\n"
                         + "FRESHNESS = INTERVAL '30' SECOND\n"
                         + "REFRESH_MODE = FULL\n"
@@ -668,7 +663,9 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         TableChange.modifyDefinitionQuery(
                                 "SELECT `a`, `b`, `c`, `d`, `d` AS `e`, 
CAST('123' AS STRING) AS `f`\nFROM `t3`",
                                 "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
-                                        + "FROM `builtin`.`default`.`t3` AS 
`t3`"));
+                                        + "FROM `builtin`.`default`.`t3` AS 
`t3`"),
+                        TableChange.set("format", "json2"),
+                        TableChange.reset("connector"));
         assertThat(operation.asSummaryString())
                 .isEqualTo(
                         "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS 
SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS 
STRING) AS `f`\n"
@@ -681,6 +678,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
                                 new 
ObjectPath(catalogManager.getCurrentDatabase(), "base_mtbl"));
         CatalogMaterializedTable newTable = 
op.getMaterializedTableWithAppliedChanges();
 
+        assertThat(newTable.getOptions()).containsExactly(Map.entry("format", 
"json2"));
         
assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema());
         assertThat(oldTable.getUnresolvedSchema().getPrimaryKey())
                 .isEqualTo(newTable.getUnresolvedSchema().getPrimaryKey());
@@ -694,7 +692,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
         assertThat(oldTable.getSerializedRefreshHandler())
                 .isEqualTo(newTable.getSerializedRefreshHandler());
 
-        List<Schema.UnresolvedColumn> addedColumn =
+        List<UnresolvedColumn> addedColumn =
                 newTable.getUnresolvedSchema().getColumns().stream()
                         .filter(
                                 column ->
@@ -705,10 +703,8 @@ class SqlMaterializedTableNodeToOperationConverterTest
         // added column should be a nullable column.
         assertThat(addedColumn)
                 .containsExactly(
-                        new Schema.UnresolvedPhysicalColumn(
-                                "e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
-                        new Schema.UnresolvedPhysicalColumn(
-                                "f", DataTypes.VARCHAR(Integer.MAX_VALUE)));
+                        new UnresolvedPhysicalColumn("e", 
DataTypes.VARCHAR(Integer.MAX_VALUE)),
+                        new UnresolvedPhysicalColumn("f", 
DataTypes.VARCHAR(Integer.MAX_VALUE)));
     }
 
     private static Collection<TestSpec> 
testDataForCreateAlterMaterializedTableFailedCase() {

Reply via email to