loserwang1024 commented on code in PR #2331:
URL: https://github.com/apache/fluss/pull/2331#discussion_r2693500453


##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java:
##########
@@ -133,69 +138,118 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
         }
     }
 
-    private boolean shouldAlterTable(TablePath tablePath, List<TableChange> tableChanges)
+    private List<TableChange> checkAndFilterDuplicateTableChanges(
+            TablePath tablePath,
+            List<TableChange> tableChanges,
+            org.apache.fluss.metadata.Schema flussSchema)
             throws TableNotExistException {
+        if (tableChanges.isEmpty()) {
+            return tableChanges;
+        }
+
         try {
+            // Get current Paimon table schema
             Table table = paimonCatalog.getTable(toPaimon(tablePath));
             FileStoreTable fileStoreTable = (FileStoreTable) table;
-            Schema currentSchema = fileStoreTable.schema().toSchema();
-
-            for (TableChange change : tableChanges) {
-                if (change instanceof TableChange.AddColumn) {
-                    TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
-                    if (!isColumnAlreadyExists(currentSchema, addColumn)) {
-                        return true;
-                    }
-                } else {
-                    return true;
+            List<DataField> paimonFields =
+                    fileStoreTable.schema().toSchema().fields().stream()
+                            .filter(field -> !SYSTEM_COLUMNS.containsKey(field.name()))
+                            .collect(Collectors.toList());
+            List<org.apache.fluss.metadata.Schema.Column> flussColumns = flussSchema.getColumns();
+
+            if (paimonFields.size() < flussColumns.size()) {
+                throw new InvalidAlterTableException(
+                        String.format(
+                                "Paimon table has less columns (%d) than Fluss schema (%d)",
+                                paimonFields.size(), flussColumns.size()));
+            }
+
+            // Validate  schema compatibility
+            validateExistingColumns(paimonFields, flussColumns);
+
+            if (paimonFields.size() == flussColumns.size()) {
+                return tableChanges;
+            }
+
+            // if paimon column size is greater than expected fluss column size, meaning last add
+            // columns are failed.
+            // Thus, this time must be retried to keep the schema same, only then can add new
+            // columns or other operations next time.
+            String errorMsg =
+                    String.format(
+                            "Paimon table has more columns (%d) than Fluss schema (%d), thus need to add the diff columns at once rather than other table changes %s.",
+                            paimonFields.size(), flussColumns.size(), tableChanges);
+            if (flussColumns.size() + tableChanges.size() != paimonFields.size()) {
+                throw new InvalidAlterTableException(errorMsg);
+            }
+
+            for (int i = 0; i < paimonFields.size() - flussColumns.size(); i++) {
+                DataField paimonDataField = paimonFields.get(i + flussColumns.size());
+                TableChange tableChange = tableChanges.get(i);
+                if (!(tableChange instanceof TableChange.AddColumn
+                        && ((TableChange.AddColumn) tableChange).getPosition()
+                                == TableChange.ColumnPosition.last()

Review Comment:
   @beryllw 
   
   > It’s not related to the existing Paimon schema
   
   If the Paimon schema is the same as the Fluss schema, all operations (both
add column and set config) can be accepted. Otherwise, only add-column changes
can be accepted, so that the two schemas become the same again.
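
To make the rule concrete, here is a minimal sketch of the acceptance check as I read it. `AlterRuleSketch`, `ChangeKind` and `isAccepted` are hypothetical names used only for illustration; they are not part of the Fluss or Paimon API. The sketch also leaves out the other checks in this diff (Paimon having fewer columns, the number of changes matching the column difference, and the `last()` position).

```java
import java.util.List;

class AlterRuleSketch {
    // Hypothetical stand-in for the kinds of TableChange; not the Fluss API.
    enum ChangeKind { ADD_COLUMN, SET_OPTION }

    /**
     * Returns true if the batch of changes can be accepted.
     * Both column counts are assumed to exclude Paimon system columns.
     */
    static boolean isAccepted(
            int paimonColumnCount, int flussColumnCount, List<ChangeKind> changes) {
        if (paimonColumnCount == flussColumnCount) {
            // Schemas are the same: add column and set config are both fine.
            return true;
        }
        // Schemas differ: only add-column changes are allowed, so that the
        // retry can bring the two schemas back in line.
        return changes.stream().allMatch(kind -> kind == ChangeKind.ADD_COLUMN);
    }

    public static void main(String[] args) {
        System.out.println(isAccepted(3, 3, List.of(ChangeKind.SET_OPTION)));
        System.out.println(isAccepted(4, 3, List.of(ChangeKind.ADD_COLUMN)));
        System.out.println(isAccepted(4, 3, List.of(ChangeKind.SET_OPTION)));
    }
}
```

With equal column counts any batch passes; with an extra Paimon column a batch containing a set-config change is rejected, while a batch of add-column changes is let through for the retry.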
   


