beryllw commented on code in PR #2331:
URL: https://github.com/apache/fluss/pull/2331#discussion_r2675489067


##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java:
##########
@@ -133,69 +138,118 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
         }
     }
 
-    private boolean shouldAlterTable(TablePath tablePath, List<TableChange> tableChanges)
+    private List<TableChange> checkAndFilterDuplicateTableChanges(
+            TablePath tablePath,
+            List<TableChange> tableChanges,
+            org.apache.fluss.metadata.Schema flussSchema)
             throws TableNotExistException {
+        if (tableChanges.isEmpty()) {
+            return tableChanges;
+        }
+
         try {
+            // Get current Paimon table schema
             Table table = paimonCatalog.getTable(toPaimon(tablePath));
             FileStoreTable fileStoreTable = (FileStoreTable) table;
-            Schema currentSchema = fileStoreTable.schema().toSchema();
-
-            for (TableChange change : tableChanges) {
-                if (change instanceof TableChange.AddColumn) {
-                    TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
-                    if (!isColumnAlreadyExists(currentSchema, addColumn)) {
-                        return true;
-                    }
-                } else {
-                    return true;
+            List<DataField> paimonFields =
+                    fileStoreTable.schema().toSchema().fields().stream()
+                            .filter(field -> !SYSTEM_COLUMNS.containsKey(field.name()))
+                            .collect(Collectors.toList());
+            List<org.apache.fluss.metadata.Schema.Column> flussColumns = flussSchema.getColumns();
+
+            if (paimonFields.size() < flussColumns.size()) {
+                throw new InvalidAlterTableException(
+                        String.format(
+                                "Paimon table has less columns (%d) than Fluss schema (%d)",
+                                paimonFields.size(), flussColumns.size()));
+            }
+
+            // Validate  schema compatibility
+            validateExistingColumns(paimonFields, flussColumns);
+
+            if (paimonFields.size() == flussColumns.size()) {
+                return tableChanges;
+            }
+
+            // if paimon column size is greater than expected fluss column size, meaning last add
+            // columns are failed.
+            // Thus, this time must be retried to keep the schema same, only then can add new
+            // columns or other operations next time.
+            String errorMsg =
+                    String.format(
+                            "Paimon table has more columns (%d) than Fluss schema (%d), thus need to add the diff columns at once rather than other table changes %s.",
+                            paimonFields.size(), flussColumns.size(), tableChanges);
+            if (flussColumns.size() + tableChanges.size() != paimonFields.size()) {
+                throw new InvalidAlterTableException(errorMsg);
+            }
+
+            for (int i = 0; i < paimonFields.size() - flussColumns.size(); i++) {
+                DataField paimonDataField = paimonFields.get(i + flussColumns.size());
+                TableChange tableChange = tableChanges.get(i);
+                if (!(tableChange instanceof TableChange.AddColumn
+                        && ((TableChange.AddColumn) tableChange).getPosition()
+                                == TableChange.ColumnPosition.last()

Review Comment:
   Maybe we could perform this position check earlier, since it does not depend on the existing Paimon schema. It is also already validated here:
   
https://github.com/apache/fluss/blob/256bd7f3838a72965720eb2a8c6c7c0575f46b07/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java#L101-L104
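   For example, a minimal sketch of hoisting the check ahead of the `paimonCatalog.getTable(...)` call. `TableChange`, `AddColumn`, `SetOption`, `ColumnPosition` and `validateAddColumnPositions` below are hypothetical, simplified stand-ins (not the real `org.apache.fluss.metadata` API), and `IllegalArgumentException` stands in for `InvalidAlterTableException`:

```java
import java.util.List;

// Sketch only: the types below are hypothetical simplified stand-ins,
// not the real org.apache.fluss.metadata.TableChange API.
final class PositionCheckSketch {

    enum ColumnPosition { FIRST, LAST }

    interface TableChange {}

    record AddColumn(String name, ColumnPosition position) implements TableChange {}

    record SetOption(String key, String value) implements TableChange {}

    private PositionCheckSketch() {}

    // Intended to run before any lake catalog lookup: rejects AddColumn changes
    // that do not append at the last position. Other change kinds pass through;
    // the Paimon-dependent checks (column names/types) would stay in the loop.
    static void validateAddColumnPositions(List<? extends TableChange> changes) {
        for (TableChange change : changes) {
            if (change instanceof AddColumn add && add.position() != ColumnPosition.LAST) {
                throw new IllegalArgumentException(
                        "Only adding columns at the last position is supported, but got: " + add);
            }
        }
    }
}
```

   With the position check done up front (or left to `SchemaUpdate` alone), the loop here would only need to compare each pending `AddColumn` against the corresponding extra Paimon field.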



-- 
This is an automated message from the Apache Git Service.