snuyanzin commented on code in PR #27488:
URL: https://github.com/apache/flink/pull/27488#discussion_r2746031827


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java:
##########
@@ -77,24 +168,410 @@ public String asSummaryString() {
     }
 
     private static String toString(TableChange tableChange) {
-        if (tableChange instanceof TableChange.ModifyRefreshStatus) {
-            TableChange.ModifyRefreshStatus refreshStatus =
-                    (TableChange.ModifyRefreshStatus) tableChange;
+        if (tableChange instanceof ModifyRefreshStatus) {
+            ModifyRefreshStatus refreshStatus = (ModifyRefreshStatus) 
tableChange;
             return String.format(
                     "  MODIFY REFRESH STATUS TO '%s'", 
refreshStatus.getRefreshStatus());
-        } else if (tableChange instanceof TableChange.ModifyRefreshHandler) {
-            TableChange.ModifyRefreshHandler refreshHandler =
-                    (TableChange.ModifyRefreshHandler) tableChange;
+        } else if (tableChange instanceof ModifyRefreshHandler) {
+            ModifyRefreshHandler refreshHandler = (ModifyRefreshHandler) 
tableChange;
             return String.format(
                     "  MODIFY REFRESH HANDLER DESCRIPTION TO '%s'",
                     refreshHandler.getRefreshHandlerDesc());
-        } else if (tableChange instanceof TableChange.ModifyDefinitionQuery) {
-            TableChange.ModifyDefinitionQuery definitionQuery =
-                    (TableChange.ModifyDefinitionQuery) tableChange;
+        } else if (tableChange instanceof ModifyDefinitionQuery) {
+            ModifyDefinitionQuery definitionQuery = (ModifyDefinitionQuery) 
tableChange;
             return String.format(
                     " MODIFY DEFINITION QUERY TO '%s'", 
definitionQuery.getDefinitionQuery());
         } else {
             return AlterTableChangeOperation.toString(tableChange);
         }
     }
+
+    private static class ChangeContext {
+        private static final Map<
+                        Class<? extends TableChange>, 
BiConsumer<ChangeContext, TableChange>>
+                HANDLERS_MAP = getHandlersMap();
+
+        private final List<UnresolvedColumn> columns;
+        private final CatalogMaterializedTable oldTable;
+        private boolean isQueryChange;
+        private @Nullable TableDistribution distribution;
+        private RefreshStatus refreshStatus;
+        private @Nullable String refreshHandlerDesc;
+        private byte[] refreshHandlerBytes;
+        private List<UnresolvedWatermarkSpec> watermarkSpecs;
+        private String primaryKeyName = null;
+        private List<String> primaryKeyColumns = null;
+        private int droppedPersistedCnt = 0;
+        private String originalQuery;
+        private String expandedQuery;
+
+        public ChangeContext(CatalogMaterializedTable oldTable) {
+            this.distribution = oldTable.getDistribution().orElse(null);
+            this.refreshStatus = oldTable.getRefreshStatus();
+            this.refreshHandlerDesc = 
oldTable.getRefreshHandlerDescription().orElse(null);
+            this.refreshHandlerBytes = oldTable.getSerializedRefreshHandler();
+            this.watermarkSpecs = 
oldTable.getUnresolvedSchema().getWatermarkSpecs();
+            this.columns = new 
LinkedList<>(oldTable.getUnresolvedSchema().getColumns());
+            Schema.UnresolvedPrimaryKey primaryKey =
+                    
oldTable.getUnresolvedSchema().getPrimaryKey().orElse(null);
+            if (primaryKey != null) {
+                this.primaryKeyName = primaryKey.getConstraintName();
+                this.primaryKeyColumns = primaryKey.getColumnNames();
+            }
+            originalQuery = oldTable.getOriginalQuery();
+            expandedQuery = oldTable.getExpandedQuery();
+            this.oldTable = oldTable;
+        }
+
+        private static Map<Class<? extends TableChange>, 
BiConsumer<ChangeContext, TableChange>>
+                getHandlersMap() {
+            final Map<Class<? extends TableChange>, BiConsumer<ChangeContext, 
TableChange>> map =
+                    new IdentityHashMap<>();
+
+            // Column
+            map.put(AddColumn.class, (c, t) -> c.addColumn((AddColumn) t));

Review Comment:
   I introduced HandlerRegistry since failed to have casting only in one place 
and compilable all these templates...



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java:
##########
@@ -77,24 +168,410 @@ public String asSummaryString() {
     }
 
     private static String toString(TableChange tableChange) {
-        if (tableChange instanceof TableChange.ModifyRefreshStatus) {
-            TableChange.ModifyRefreshStatus refreshStatus =
-                    (TableChange.ModifyRefreshStatus) tableChange;
+        if (tableChange instanceof ModifyRefreshStatus) {
+            ModifyRefreshStatus refreshStatus = (ModifyRefreshStatus) 
tableChange;
             return String.format(
                     "  MODIFY REFRESH STATUS TO '%s'", 
refreshStatus.getRefreshStatus());
-        } else if (tableChange instanceof TableChange.ModifyRefreshHandler) {
-            TableChange.ModifyRefreshHandler refreshHandler =
-                    (TableChange.ModifyRefreshHandler) tableChange;
+        } else if (tableChange instanceof ModifyRefreshHandler) {
+            ModifyRefreshHandler refreshHandler = (ModifyRefreshHandler) 
tableChange;
             return String.format(
                     "  MODIFY REFRESH HANDLER DESCRIPTION TO '%s'",
                     refreshHandler.getRefreshHandlerDesc());
-        } else if (tableChange instanceof TableChange.ModifyDefinitionQuery) {
-            TableChange.ModifyDefinitionQuery definitionQuery =
-                    (TableChange.ModifyDefinitionQuery) tableChange;
+        } else if (tableChange instanceof ModifyDefinitionQuery) {
+            ModifyDefinitionQuery definitionQuery = (ModifyDefinitionQuery) 
tableChange;
             return String.format(
                     " MODIFY DEFINITION QUERY TO '%s'", 
definitionQuery.getDefinitionQuery());
         } else {
             return AlterTableChangeOperation.toString(tableChange);
         }
     }
+
+    private static class ChangeContext {
+        private static final Map<
+                        Class<? extends TableChange>, 
BiConsumer<ChangeContext, TableChange>>
+                HANDLERS_MAP = getHandlersMap();
+
+        private final List<UnresolvedColumn> columns;
+        private final CatalogMaterializedTable oldTable;
+        private boolean isQueryChange;
+        private @Nullable TableDistribution distribution;
+        private RefreshStatus refreshStatus;
+        private @Nullable String refreshHandlerDesc;
+        private byte[] refreshHandlerBytes;
+        private List<UnresolvedWatermarkSpec> watermarkSpecs;
+        private String primaryKeyName = null;
+        private List<String> primaryKeyColumns = null;
+        private int droppedPersistedCnt = 0;
+        private String originalQuery;
+        private String expandedQuery;
+
+        public ChangeContext(CatalogMaterializedTable oldTable) {
+            this.distribution = oldTable.getDistribution().orElse(null);
+            this.refreshStatus = oldTable.getRefreshStatus();
+            this.refreshHandlerDesc = 
oldTable.getRefreshHandlerDescription().orElse(null);
+            this.refreshHandlerBytes = oldTable.getSerializedRefreshHandler();
+            this.watermarkSpecs = 
oldTable.getUnresolvedSchema().getWatermarkSpecs();
+            this.columns = new 
LinkedList<>(oldTable.getUnresolvedSchema().getColumns());
+            Schema.UnresolvedPrimaryKey primaryKey =
+                    
oldTable.getUnresolvedSchema().getPrimaryKey().orElse(null);
+            if (primaryKey != null) {
+                this.primaryKeyName = primaryKey.getConstraintName();
+                this.primaryKeyColumns = primaryKey.getColumnNames();
+            }
+            originalQuery = oldTable.getOriginalQuery();
+            expandedQuery = oldTable.getExpandedQuery();
+            this.oldTable = oldTable;
+        }
+
+        private static Map<Class<? extends TableChange>, 
BiConsumer<ChangeContext, TableChange>>
+                getHandlersMap() {
+            final Map<Class<? extends TableChange>, BiConsumer<ChangeContext, 
TableChange>> map =
+                    new IdentityHashMap<>();
+
+            // Column
+            map.put(AddColumn.class, (c, t) -> c.addColumn((AddColumn) t));

Review Comment:
   I introduced HandlerRegistry since failed to have casting only in one place 
and compilable all these templates...
   hope this is ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to