snuyanzin commented on code in PR #27488:
URL: https://github.com/apache/flink/pull/27488#discussion_r2741547425


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java:
##########
@@ -77,24 +158,343 @@ public String asSummaryString() {
     }
 
     private static String toString(TableChange tableChange) {
-        if (tableChange instanceof TableChange.ModifyRefreshStatus) {
-            TableChange.ModifyRefreshStatus refreshStatus =
-                    (TableChange.ModifyRefreshStatus) tableChange;
+        if (tableChange instanceof ModifyRefreshStatus) {
+            ModifyRefreshStatus refreshStatus = (ModifyRefreshStatus) 
tableChange;
             return String.format(
                     "  MODIFY REFRESH STATUS TO '%s'", 
refreshStatus.getRefreshStatus());
-        } else if (tableChange instanceof TableChange.ModifyRefreshHandler) {
-            TableChange.ModifyRefreshHandler refreshHandler =
-                    (TableChange.ModifyRefreshHandler) tableChange;
+        } else if (tableChange instanceof ModifyRefreshHandler) {
+            ModifyRefreshHandler refreshHandler = (ModifyRefreshHandler) 
tableChange;
             return String.format(
                     "  MODIFY REFRESH HANDLER DESCRIPTION TO '%s'",
                     refreshHandler.getRefreshHandlerDesc());
-        } else if (tableChange instanceof TableChange.ModifyDefinitionQuery) {
-            TableChange.ModifyDefinitionQuery definitionQuery =
-                    (TableChange.ModifyDefinitionQuery) tableChange;
+        } else if (tableChange instanceof ModifyDefinitionQuery) {
+            ModifyDefinitionQuery definitionQuery = (ModifyDefinitionQuery) 
tableChange;
             return String.format(
                     " MODIFY DEFINITION QUERY TO '%s'", 
definitionQuery.getDefinitionQuery());
         } else {
             return AlterTableChangeOperation.toString(tableChange);
         }
     }
+
+    private static class MTContext {
+        private boolean isQueryChange;
+        private @Nullable TableDistribution distribution;
+        private RefreshStatus refreshStatus;
+        private @Nullable String refreshHandlerDesc;
+        private byte[] refreshHandlerBytes;
+        private List<UnresolvedColumn> columns;
+        private List<UnresolvedWatermarkSpec> watermarkSpecs;
+        private String primaryKeyName = null;
+        private List<String> primaryKeyColumns = null;
+        private int droppedPersistedCnt = 0;
+        private String originalQuery;
+        private String expandedQuery;
+        private CatalogMaterializedTable oldTable;
+
+        public MTContext(CatalogMaterializedTable oldTable) {
+            this.distribution = oldTable.getDistribution().orElse(null);
+            this.refreshStatus = oldTable.getRefreshStatus();
+            this.refreshHandlerDesc = 
oldTable.getRefreshHandlerDescription().orElse(null);
+            this.refreshHandlerBytes = oldTable.getSerializedRefreshHandler();
+            this.watermarkSpecs = 
oldTable.getUnresolvedSchema().getWatermarkSpecs();
+            this.columns = new 
LinkedList<>(oldTable.getUnresolvedSchema().getColumns());
+            Schema.UnresolvedPrimaryKey primaryKey =
+                    
oldTable.getUnresolvedSchema().getPrimaryKey().orElse(null);
+            if (primaryKey != null) {
+                this.primaryKeyName = primaryKey.getConstraintName();
+                this.primaryKeyColumns = primaryKey.getColumnNames();
+            }
+            originalQuery = oldTable.getOriginalQuery();
+            expandedQuery = oldTable.getExpandedQuery();
+            this.oldTable = oldTable;
+        }
+
+        private void setColumnAtPosition(UnresolvedColumn column, 
ColumnPosition position) {
+            if (position == null) {
+                columns.add(column);
+            } else if (position == ColumnPosition.first()) {
+                columns.add(0, column);
+            } else {
+                String after = ((After) position).column();
+                int index = getColumnIndex(after);
+
+                columns.add(index + 1, column);
+            }
+        }
+
+        private int getColumnIndex(String name) {
+            for (int i = 0; i < columns.size(); i++) {
+                if (Objects.equals(name, columns.get(i).getName())) {
+                    return i;
+                }
+            }
+            return -1;
+        }
+
+        private void applyTableChanges(List<TableChange> tableChanges) {
+            isQueryChange = tableChanges.stream().anyMatch(t -> t instanceof 
ModifyDefinitionQuery);
+            Schema oldSchema = oldTable.getUnresolvedSchema();
+            if (isQueryChange) {
+                checkForChangedPositionByQuery(tableChanges, oldSchema);
+            }
+
+            for (TableChange tableChange : tableChanges) {
+                handleColumns(tableChange);
+                handleDistribution(tableChange);
+                handleWatermark(tableChange);
+                handleUniqueConstraint(tableChange);
+                handleRefreshStatus(tableChange);
+                handleRefreshHandler(tableChange);
+                handleModifyDefinitionQuery(tableChange);
+            }
+
+            if (droppedPersistedCnt > 0 && isQueryChange) {
+                final int schemaSize = oldSchema.getColumns().size();
+                throw new ValidationException(
+                        String.format(
+                                "Failed to modify query because drop column is 
unsupported. "
+                                        + "When modifying a query, you can 
only append new columns at the end of original schema. "
+                                        + "The original schema has %d columns, 
but the newly derived schema from the query has %d columns.",
+                                schemaSize, schemaSize - droppedPersistedCnt));
+            }
+        }
+
+        private Schema retrieveSchema() {
+            Schema.Builder schemaToApply = 
Schema.newBuilder().fromColumns(columns);
+            if (primaryKeyColumns != null) {
+                if (primaryKeyName == null) {
+                    schemaToApply.primaryKey(primaryKeyColumns);
+                } else {
+                    schemaToApply.primaryKeyNamed(primaryKeyName, 
primaryKeyColumns);
+                }
+            }
+
+            for (UnresolvedWatermarkSpec spec : watermarkSpecs) {
+                schemaToApply.watermark(spec.getColumnName(), 
spec.getWatermarkExpression());
+            }
+            return schemaToApply.build();
+        }
+
+        private void handleColumns(TableChange tableChange) {
+            if (tableChange instanceof AddColumn) {
+                AddColumn addColumn = (AddColumn) tableChange;
+                Column column = addColumn.getColumn();
+                ColumnPosition position = addColumn.getPosition();
+                UnresolvedColumn columnToAdd = toUnresolvedColumn(column);
+                setColumnAtPosition(columnToAdd, position);
+            } else if (tableChange instanceof ModifyColumnPosition) {
+                // ModifyColumnPosition extends ModifyColumn so it should be 
before ModifyColumn
+                ModifyColumnPosition columnWithChangedPosition = 
(ModifyColumnPosition) tableChange;
+                Column column = columnWithChangedPosition.getOldColumn();
+                int oldPosition = getColumnIndex(column.getName());
+                if (isQueryChange) {
+                    throw new ValidationException(
+                            String.format(
+                                    "When modifying the query of a 
materialized table, "
+                                            + "currently only support 
appending columns at the end of original schema, dropping, renaming, and 
reordering columns are not supported.\n"
+                                            + "Column mismatch at position %d: 
Original column is [%s], but new column is [%s].",
+                                    oldPosition, column, column));
+                }
+
+                ColumnPosition position = 
columnWithChangedPosition.getNewPosition();
+                UnresolvedColumn changedPositionColumn = 
columns.get(oldPosition);
+                columns.remove(oldPosition);
+                setColumnAtPosition(changedPositionColumn, position);
+            } else if (tableChange instanceof DropColumn) {
+                DropColumn dropColumn = (DropColumn) tableChange;
+                String droppedColumnName = dropColumn.getColumnName();
+                int index = getColumnIndex(droppedColumnName);
+                UnresolvedColumn column = columns.get(index);
+                if (isQueryChange && isNonPersistedColumn(column)) {
+                    // noop
+                } else {
+                    columns.remove(index);
+                    droppedPersistedCnt++;
+                }
+            } else if (tableChange instanceof ModifyColumn) {
+                ModifyColumn type = (ModifyColumn) tableChange;
+                Column column = type.getOldColumn();
+                Column newColumn = type.getNewColumn();
+                int index = getColumnIndex(column.getName());
+                UnresolvedColumn newColumn1 = toUnresolvedColumn(newColumn);
+                columns.set(index, newColumn1);
+            }
+        }
+
+        private void handleModifyDefinitionQuery(TableChange tableChange) {
+            if (tableChange instanceof ModifyDefinitionQuery) {
+                ModifyDefinitionQuery queryChange = (ModifyDefinitionQuery) 
tableChange;
+                expandedQuery = queryChange.getDefinitionQuery();
+                originalQuery = queryChange.getOriginalQuery();
+            }
+        }
+
+        private boolean isNonPersistedColumn(UnresolvedColumn column) {
+            return column instanceof UnresolvedComputedColumn
+                    || column instanceof UnresolvedMetadataColumn
+                            && ((UnresolvedMetadataColumn) column).isVirtual();
+        }
+
+        private void handleUniqueConstraint(TableChange tableChange) {
+            if (tableChange instanceof DropConstraint) {
+                primaryKeyName = null;
+                primaryKeyColumns = null;

Review Comment:
   > Do we need to sequence the changes somehow? If I change the PK from pk1 to
   > pk2, do I get a DropConstraint and an AddUniqueConstraint? If so, we need to
   > drop first before re-adding, right?
   > Can this also happen for other changes?
   > Since the changes are a list, do we already have the sequence? If so, please
   > document that.
   
   Maybe in the future we will have such cases.
   Right now I don't know how I could express this to Flink with only a
   change-query operation (what would I put in the SQL?).
   
   Anyway, I will add a comment about the ordering, just in case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to