AHeise commented on code in PR #27729:
URL: https://github.com/apache/flink/pull/27729#discussion_r2887963644


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -136,48 +131,34 @@ private List<TableChange> buildTableChanges(
                         mergeContext.getMergedOriginalQuery(),
                         mergeContext.getMergedExpandedQuery()));
 
+        changes.addAll(
+                calculateOptionsChange(
+                        oldTable.getOptions(), mergeContext.getMergedTableOptions()));
+
         return changes;
     }
 
-    private CatalogMaterializedTable buildNewCatalogMaterializedTableFromOldTable(
-            final ResolvedCatalogMaterializedTable oldTable,
-            final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterTable,
-            final MergeContext mergeContext) {
-        final Schema.Builder schemaBuilder =
-                Schema.newBuilder().fromResolvedSchema(oldTable.getResolvedSchema());
+    public static List<TableChange> calculateOptionsChange(
+            Map<String, String> oldOptions, Map<String, String> newOptions) {
+        if (oldOptions.equals(newOptions)) {
+            return List.of();
+        }
 
-        // Add new columns if this is an alter operation
-        final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
-        final List<Column> newColumns =
-                MaterializedTableUtils.validateAndExtractNewColumns(
-                        oldSchema, mergeContext.getMergedQuerySchema());
-        newColumns.forEach(col -> schemaBuilder.column(col.getName(), col.getDataType()));
-
-        final String comment = sqlCreateOrAlterTable.getComment();
-        final IntervalFreshness freshness = getDerivedFreshness(sqlCreateOrAlterTable);
-        final LogicalRefreshMode logicalRefreshMode =
-                getDerivedLogicalRefreshMode(sqlCreateOrAlterTable);
-        final RefreshMode refreshMode = getDerivedRefreshMode(logicalRefreshMode);
-
-        CatalogMaterializedTable.Builder builder =
-                CatalogMaterializedTable.newBuilder()
-                        .schema(schemaBuilder.build())
-                        .comment(comment)
-                        .distribution(mergeContext.getMergedTableDistribution().orElse(null))
-                        .partitionKeys(mergeContext.getMergedPartitionKeys())
-                        .options(mergeContext.getMergedTableOptions())
-                        .originalQuery(mergeContext.getMergedOriginalQuery())
-                        .expandedQuery(mergeContext.getMergedExpandedQuery())
-                        .freshness(freshness)
-                        .logicalRefreshMode(logicalRefreshMode)
-                        .refreshMode(refreshMode)
-                        .refreshStatus(RefreshStatus.INITIALIZING);
-
-        // Preserve refresh handler from old materialized table
-        oldTable.getRefreshHandlerDescription().ifPresent(builder::refreshHandlerDescription);
-        builder.serializedRefreshHandler(oldTable.getSerializedRefreshHandler());
-
-        return builder.build();
+        final List<TableChange> changes = new ArrayList<>();
+        for (Map.Entry<String, String> option : newOptions.entrySet()) {
+            if (oldOptions.get(option.getKey()) == null

Review Comment:
   Extract `oldOptions.get(option.getKey())` into a local `oldValue` variable instead of looking it up twice.
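
A minimal sketch of what the extraction could look like. It is not the PR's code: the class name `ExtractOldValueSketch` and the method `changedOrAdded` are hypothetical, and plain strings stand in for `TableChange.set(...)` so that the example runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ExtractOldValueSketch {

    // Hypothetical stand-in for the "set" half of calculateOptionsChange.
    // Returns descriptive strings instead of TableChange objects.
    static List<String> changedOrAdded(
            Map<String, String> oldOptions, Map<String, String> newOptions) {
        final List<String> changes = new ArrayList<>();
        for (Map.Entry<String, String> option : newOptions.entrySet()) {
            // Single lookup; the result is reused for both the null check
            // and the equality check.
            final String oldValue = oldOptions.get(option.getKey());
            if (oldValue == null || !oldValue.equals(option.getValue())) {
                changes.add("set(" + option.getKey() + ", " + option.getValue() + ")");
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<String, String> oldOptions = Map.of("connector", "filesystem", "fs", "hadoop");
        Map<String, String> newOptions = Map.of("connector", "filesystem", "delimiter", ";");
        System.out.println(changedOrAdded(oldOptions, newOptions)); // prints [set(delimiter, ;)]
    }
}
```

Besides avoiding the second map lookup, the local variable keeps the condition on one line, which reads more easily in the diff.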



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -136,48 +131,34 @@ private List<TableChange> buildTableChanges(
                         mergeContext.getMergedOriginalQuery(),
                         mergeContext.getMergedExpandedQuery()));
 
+        changes.addAll(
+                calculateOptionsChange(
+                        oldTable.getOptions(), mergeContext.getMergedTableOptions()));
+
         return changes;
     }
 
-    private CatalogMaterializedTable buildNewCatalogMaterializedTableFromOldTable(
-            final ResolvedCatalogMaterializedTable oldTable,
-            final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterTable,
-            final MergeContext mergeContext) {
-        final Schema.Builder schemaBuilder =
-                Schema.newBuilder().fromResolvedSchema(oldTable.getResolvedSchema());
+    public static List<TableChange> calculateOptionsChange(
+            Map<String, String> oldOptions, Map<String, String> newOptions) {
+        if (oldOptions.equals(newOptions)) {
+            return List.of();
+        }
 
-        // Add new columns if this is an alter operation
-        final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
-        final List<Column> newColumns =
-                MaterializedTableUtils.validateAndExtractNewColumns(
-                        oldSchema, mergeContext.getMergedQuerySchema());
-        newColumns.forEach(col -> schemaBuilder.column(col.getName(), col.getDataType()));
-
-        final String comment = sqlCreateOrAlterTable.getComment();
-        final IntervalFreshness freshness = getDerivedFreshness(sqlCreateOrAlterTable);
-        final LogicalRefreshMode logicalRefreshMode =
-                getDerivedLogicalRefreshMode(sqlCreateOrAlterTable);
-        final RefreshMode refreshMode = getDerivedRefreshMode(logicalRefreshMode);
-
-        CatalogMaterializedTable.Builder builder =
-                CatalogMaterializedTable.newBuilder()
-                        .schema(schemaBuilder.build())
-                        .comment(comment)
-                        .distribution(mergeContext.getMergedTableDistribution().orElse(null))
-                        .partitionKeys(mergeContext.getMergedPartitionKeys())
-                        .options(mergeContext.getMergedTableOptions())
-                        .originalQuery(mergeContext.getMergedOriginalQuery())
-                        .expandedQuery(mergeContext.getMergedExpandedQuery())
-                        .freshness(freshness)
-                        .logicalRefreshMode(logicalRefreshMode)
-                        .refreshMode(refreshMode)
-                        .refreshStatus(RefreshStatus.INITIALIZING);
-
-        // Preserve refresh handler from old materialized table
-        oldTable.getRefreshHandlerDescription().ifPresent(builder::refreshHandlerDescription);
-        builder.serializedRefreshHandler(oldTable.getSerializedRefreshHandler());
-
-        return builder.build();
+        final List<TableChange> changes = new ArrayList<>();
+        for (Map.Entry<String, String> option : newOptions.entrySet()) {
+            if (oldOptions.get(option.getKey()) == null
+                    || !oldOptions.get(option.getKey()).equals(option.getValue())) {
+                changes.add(TableChange.set(option.getKey(), option.getValue()));
+            }
+        }
+
+        for (Map.Entry<String, String> option : oldOptions.entrySet()) {
+            if (newOptions.get(option.getKey()) == null) {
+                changes.add(TableChange.reset(option.getKey()));

Review Comment:
   This will generate a reset for all inferred options. Let's say you have
   
   ```
   CREATE OR ALTER MATERIALIZED TABLE mt
   WITH (
     'connector' = 'filesystem'
   )
   ```
   
   and the catalog enriches the options, so that SHOW CREATE OR ALTER MATERIALIZED TABLE returns
   
   ```
   CREATE OR ALTER MATERIALIZED TABLE mt
   WITH (
     'connector' = 'filesystem',
     'fs' = 'hadoop'
   )
   ```
   
   and now the user adjusts the original settings
   
   ```
   CREATE OR ALTER MATERIALIZED TABLE mt
   WITH (
     'connector' = 'filesystem',
     'delimiter' = ';'
   )
   ```
   
   We would receive `TableChange.set('delimiter', ';')`, but we would also get `TableChange.reset('fs')`. I can see us generating many resets this way.
   
   My main question is whether this is the best way. I think it is, but I still wanted to call it out explicitly. My main concern is options like `changelog.mode`.
   
   Here are my thoughts:
   * The catalog is the one that enriches the options, so it can deal with the resets.
   * The catalog can infer the same options again.
   * If I changed the connector, the catalog might infer completely different options. Old inferred options might not be applicable anymore.
   * If the catalog later infers more options, then a script that does CREATE OR ALTER may need to be adjusted if we don't remove those inferred options.
   
   So options that are implicitly added are also implicitly removed, which is kind of symmetric.
   
   An alternative would be to never generate resets and instead wait for the user to explicitly reset options through other DDL statements. But that would break in the cases mentioned above.
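
To make the scenario above concrete, here is a small sketch that replays it against a simplified version of the diff logic. It assumes the stored options are the enriched ones (`connector` plus the inferred `fs`) and the newly declared options are `connector` plus `delimiter`. The class `InferredOptionResetSketch` and its `diff` method are hypothetical; strings stand in for `TableChange.set(...)` and `TableChange.reset(...)` so the example runs without Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class InferredOptionResetSketch {

    // Simplified stand-in for calculateOptionsChange: one "set" per new or
    // changed option, one "reset" per option missing from the new map.
    static List<String> diff(Map<String, String> oldOptions, Map<String, String> newOptions) {
        final List<String> changes = new ArrayList<>();
        // TreeMap is used only to make the output order deterministic.
        for (Map.Entry<String, String> option : new TreeMap<>(newOptions).entrySet()) {
            final String oldValue = oldOptions.get(option.getKey());
            if (oldValue == null || !oldValue.equals(option.getValue())) {
                changes.add("set(" + option.getKey() + ", " + option.getValue() + ")");
            }
        }
        for (String key : new TreeMap<>(oldOptions).keySet()) {
            if (newOptions.get(key) == null) {
                // Inferred options such as 'fs' end up here because the
                // user's statement never mentions them.
                changes.add("reset(" + key + ")");
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        // Options as stored in the catalog after enrichment.
        Map<String, String> stored = Map.of("connector", "filesystem", "fs", "hadoop");
        // Options declared by the user's updated statement.
        Map<String, String> declared = Map.of("connector", "filesystem", "delimiter", ";");
        System.out.println(diff(stored, declared)); // prints [set(delimiter, ;), reset(fs)]
    }
}
```

Under the alternative of never generating resets, the second loop would simply be dropped, and `fs` would remain in the catalog until the user reset it explicitly.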



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
