This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d70714cdaef [FLINK-39199][table] `CREATE OR ALTER MATERIALIZED TABLE`
doesn't support table options changes
d70714cdaef is described below
commit d70714cdaefc5393abae0c46e2a7701943adba29
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Mar 5 15:46:10 2026 +0100
[FLINK-39199][table] `CREATE OR ALTER MATERIALIZED TABLE` doesn't support
table options changes
---
.../AlterMaterializedTableChangeOperation.java | 16 +++++-
.../apache/flink/table/catalog/TableChange.java | 4 +-
...SqlCreateOrAlterMaterializedTableConverter.java | 67 ++++++++--------------
...erializedTableNodeToOperationConverterTest.java | 34 +++++------
4 files changed, 56 insertions(+), 65 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
index 0b4a877e438..b480cc00083 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
@@ -130,7 +130,7 @@ public class AlterMaterializedTableChangeOperation extends AlterMaterializedTabl
.schema(changeContext.retrieveSchema())
.comment(oldTable.getComment())
.partitionKeys(oldTable.getPartitionKeys())
- .options(oldTable.getOptions())
+ .options(changeContext.options)
.originalQuery(changeContext.originalQuery)
.expandedQuery(changeContext.expandedQuery)
.distribution(changeContext.distribution)
@@ -201,6 +201,7 @@ public class AlterMaterializedTableChangeOperation extends AlterMaterializedTabl
private int droppedPersistedCnt = 0;
private String originalQuery;
private String expandedQuery;
+ private final Map<String, String> options;
public ChangeContext(CatalogMaterializedTable oldTable) {
this.distribution = oldTable.getDistribution().orElse(null);
@@ -218,6 +219,7 @@ public class AlterMaterializedTableChangeOperation extends AlterMaterializedTabl
originalQuery = oldTable.getOriginalQuery();
expandedQuery = oldTable.getExpandedQuery();
this.oldTable = oldTable;
+ this.options = new HashMap<>(oldTable.getOptions());
}
private static final class HandlerRegistry {
@@ -284,6 +286,10 @@ public class AlterMaterializedTableChangeOperation extends AlterMaterializedTabl
            registry.register(ModifyDistribution.class, ChangeContext::modifyDistribution);
            registry.register(DropDistribution.class, ChangeContext::dropDistribution);
+ // Options
+            registry.register(TableChange.SetOption.class, ChangeContext::setTableOption);
+            registry.register(TableChange.ResetOption.class, ChangeContext::resetTableOption);
+
return registry;
}
@@ -449,6 +455,14 @@ public class AlterMaterializedTableChangeOperation extends AlterMaterializedTabl
distribution = null;
}
+ private void setTableOption(TableChange.SetOption option) {
+ options.put(option.getKey(), option.getValue());
+ }
+
+ private void resetTableOption(TableChange.ResetOption option) {
+ options.remove(option.getKey());
+ }
+
private UnresolvedColumn toUnresolvedColumn(Column column) {
final String name = column.getName();
final String comment = column.getComment().orElse(null);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
index 98c54980e96..67b4941a48a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
@@ -1162,7 +1162,7 @@ public interface TableChange {
* </pre>
*/
@PublicEvolving
- class SetOption implements CatalogTableChange {
+ class SetOption implements CatalogTableChange, MaterializedTableChange {
private final String key;
private final String value;
@@ -1215,7 +1215,7 @@ public interface TableChange {
* </pre>
*/
@PublicEvolving
- class ResetOption implements CatalogTableChange {
+ class ResetOption implements CatalogTableChange, MaterializedTableChange {
private final String key;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
index 7c0beb8afed..4fe78a9a698 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
@@ -23,12 +23,7 @@ import org.apache.flink.sql.parser.ddl.materializedtable.SqlCreateOrAlterMateria
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
-import org.apache.flink.table.catalog.CatalogMaterializedTable;
-import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
-import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
-import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus;
import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.IntervalFreshness;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
@@ -136,48 +131,34 @@ public class SqlCreateOrAlterMaterializedTableConverter
mergeContext.getMergedOriginalQuery(),
mergeContext.getMergedExpandedQuery()));
+        changes.addAll(
+                calculateOptionsChange(
+                        oldTable.getOptions(), mergeContext.getMergedTableOptions()));
+
return changes;
}
-    private CatalogMaterializedTable buildNewCatalogMaterializedTableFromOldTable(
- final ResolvedCatalogMaterializedTable oldTable,
- final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterTable,
- final MergeContext mergeContext) {
- final Schema.Builder schemaBuilder =
-                Schema.newBuilder().fromResolvedSchema(oldTable.getResolvedSchema());
+ public static List<TableChange> calculateOptionsChange(
+ Map<String, String> oldOptions, Map<String, String> newOptions) {
+ if (oldOptions.equals(newOptions)) {
+ return List.of();
+ }
- // Add new columns if this is an alter operation
- final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
- final List<Column> newColumns =
- MaterializedTableUtils.validateAndExtractNewColumns(
- oldSchema, mergeContext.getMergedQuerySchema());
-        newColumns.forEach(col -> schemaBuilder.column(col.getName(), col.getDataType()));
-
- final String comment = sqlCreateOrAlterTable.getComment();
-        final IntervalFreshness freshness = getDerivedFreshness(sqlCreateOrAlterTable);
- final LogicalRefreshMode logicalRefreshMode =
- getDerivedLogicalRefreshMode(sqlCreateOrAlterTable);
-        final RefreshMode refreshMode = getDerivedRefreshMode(logicalRefreshMode);
-
- CatalogMaterializedTable.Builder builder =
- CatalogMaterializedTable.newBuilder()
- .schema(schemaBuilder.build())
- .comment(comment)
-                        .distribution(mergeContext.getMergedTableDistribution().orElse(null))
- .partitionKeys(mergeContext.getMergedPartitionKeys())
- .options(mergeContext.getMergedTableOptions())
- .originalQuery(mergeContext.getMergedOriginalQuery())
- .expandedQuery(mergeContext.getMergedExpandedQuery())
- .freshness(freshness)
- .logicalRefreshMode(logicalRefreshMode)
- .refreshMode(refreshMode)
- .refreshStatus(RefreshStatus.INITIALIZING);
-
- // Preserve refresh handler from old materialized table
-        oldTable.getRefreshHandlerDescription().ifPresent(builder::refreshHandlerDescription);
-        builder.serializedRefreshHandler(oldTable.getSerializedRefreshHandler());
-
- return builder.build();
+ final List<TableChange> changes = new ArrayList<>();
+ for (Map.Entry<String, String> option : newOptions.entrySet()) {
+ final String oldValue = oldOptions.get(option.getKey());
+ if (oldValue == null || !oldValue.equals(option.getValue())) {
+                changes.add(TableChange.set(option.getKey(), option.getValue()));
+ }
+ }
+
+ for (Map.Entry<String, String> option : oldOptions.entrySet()) {
+ if (newOptions.get(option.getKey()) == null) {
+ changes.add(TableChange.reset(option.getKey()));
+ }
+ }
+
+ return changes;
}
@Override
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 878fab74434..e56180d633c 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.sql.parser.error.SqlValidateException;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.FunctionDescriptor;
import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Schema.UnresolvedColumn;
+import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
@@ -50,8 +52,6 @@ import org.apache.flink.table.operations.materializedtable.CreateMaterializedTab
import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
import org.apache.flink.table.planner.utils.TableFunc0;
-import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableMap;
-
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -469,7 +469,8 @@ class SqlMaterializedTableNodeToOperationConverterTest
AlterMaterializedTableRefreshOperation op =
(AlterMaterializedTableRefreshOperation) operation;
assertThat(op.getTableIdentifier().toString()).isEqualTo("`builtin`.`default`.`mtbl1`");
-        assertThat(op.getPartitionSpec()).isEqualTo(ImmutableMap.of("ds1", "1", "ds2", "2"));
+ assertThat(op.getPartitionSpec())
+ .containsExactly(Map.entry("ds1", "1"), Map.entry("ds2", "2"));
}
@Test
@@ -552,7 +553,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
assertThat(oldTable.getSerializedRefreshHandler())
.isEqualTo(newTable.getSerializedRefreshHandler());
- List<Schema.UnresolvedColumn> addedColumn =
+ List<UnresolvedColumn> addedColumn =
newTable.getUnresolvedSchema().getColumns().stream()
.filter(
column ->
@@ -563,10 +564,8 @@ class SqlMaterializedTableNodeToOperationConverterTest
// added column should be a nullable column.
assertThat(addedColumn)
.containsExactly(
- new Schema.UnresolvedPhysicalColumn(
- "e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
- new Schema.UnresolvedPhysicalColumn(
- "f", DataTypes.VARCHAR(Integer.MAX_VALUE)));
+                        new UnresolvedPhysicalColumn("e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
+                        new UnresolvedPhysicalColumn("f", DataTypes.VARCHAR(Integer.MAX_VALUE)));
}
@Test
@@ -626,9 +625,6 @@ class SqlMaterializedTableNodeToOperationConverterTest
ResolvedCatalogMaterializedTable materializedTable =
op.getCatalogMaterializedTable();
assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
- Map<String, String> options = new HashMap<>();
- options.put("connector", "filesystem");
- options.put("format", "json");
final CatalogMaterializedTable expected =
getDefaultMaterializedTableBuilder()
.freshness(IntervalFreshness.ofSecond("30"))
@@ -649,8 +645,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
+ "COMMENT 'materialized table comment'\n"
+ "PARTITIONED BY (a, d)\n"
+ "WITH (\n"
- + " 'connector' = 'filesystem', \n"
- + " 'format' = 'json'\n"
+ + " 'format' = 'json2'\n"
+ ")\n"
+ "FRESHNESS = INTERVAL '30' SECOND\n"
+ "REFRESH_MODE = FULL\n"
@@ -668,7 +663,9 @@ class SqlMaterializedTableNodeToOperationConverterTest
TableChange.modifyDefinitionQuery(
"SELECT `a`, `b`, `c`, `d`, `d` AS `e`,
CAST('123' AS STRING) AS `f`\nFROM `t3`",
"SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`,
`t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
- + "FROM `builtin`.`default`.`t3` AS
`t3`"));
+ + "FROM `builtin`.`default`.`t3` AS
`t3`"),
+ TableChange.set("format", "json2"),
+ TableChange.reset("connector"));
assertThat(operation.asSummaryString())
.isEqualTo(
"ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS
SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS
STRING) AS `f`\n"
@@ -681,6 +678,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
                        new ObjectPath(catalogManager.getCurrentDatabase(), "base_mtbl"));
CatalogMaterializedTable newTable =
op.getMaterializedTableWithAppliedChanges();
+        assertThat(newTable.getOptions()).containsExactly(Map.entry("format", "json2"));
assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema());
assertThat(oldTable.getUnresolvedSchema().getPrimaryKey())
.isEqualTo(newTable.getUnresolvedSchema().getPrimaryKey());
@@ -694,7 +692,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
assertThat(oldTable.getSerializedRefreshHandler())
.isEqualTo(newTable.getSerializedRefreshHandler());
- List<Schema.UnresolvedColumn> addedColumn =
+ List<UnresolvedColumn> addedColumn =
newTable.getUnresolvedSchema().getColumns().stream()
.filter(
column ->
@@ -705,10 +703,8 @@ class SqlMaterializedTableNodeToOperationConverterTest
// added column should be a nullable column.
assertThat(addedColumn)
.containsExactly(
- new Schema.UnresolvedPhysicalColumn(
- "e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
- new Schema.UnresolvedPhysicalColumn(
- "f", DataTypes.VARCHAR(Integer.MAX_VALUE)));
+                        new UnresolvedPhysicalColumn("e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
+                        new UnresolvedPhysicalColumn("f", DataTypes.VARCHAR(Integer.MAX_VALUE)));
}
    private static Collection<TestSpec> testDataForCreateAlterMaterializedTableFailedCase() {