Copilot commented on code in PR #2331:
URL: https://github.com/apache/fluss/pull/2331#discussion_r2672377900


##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java:
##########
@@ -133,69 +138,118 @@ public void alterTable(TablePath tablePath, 
List<TableChange> tableChanges, Cont
         }
     }
 
-    private boolean shouldAlterTable(TablePath tablePath, List<TableChange> 
tableChanges)
+    private List<TableChange> checkAndFilterDuplicateTableChanges(
+            TablePath tablePath,
+            List<TableChange> tableChanges,
+            org.apache.fluss.metadata.Schema flussSchema)
             throws TableNotExistException {
+        if (tableChanges.isEmpty()) {
+            return tableChanges;
+        }
+
         try {
+            // Get current Paimon table schema
             Table table = paimonCatalog.getTable(toPaimon(tablePath));
             FileStoreTable fileStoreTable = (FileStoreTable) table;
-            Schema currentSchema = fileStoreTable.schema().toSchema();
-
-            for (TableChange change : tableChanges) {
-                if (change instanceof TableChange.AddColumn) {
-                    TableChange.AddColumn addColumn = (TableChange.AddColumn) 
change;
-                    if (!isColumnAlreadyExists(currentSchema, addColumn)) {
-                        return true;
-                    }
-                } else {
-                    return true;
+            List<DataField> paimonFields =
+                    fileStoreTable.schema().toSchema().fields().stream()
+                            .filter(field -> 
!SYSTEM_COLUMNS.containsKey(field.name()))
+                            .collect(Collectors.toList());
+            List<org.apache.fluss.metadata.Schema.Column> flussColumns = 
flussSchema.getColumns();
+
+            if (paimonFields.size() < flussColumns.size()) {
+                throw new InvalidAlterTableException(
+                        String.format(
+                                "Paimon table has less columns (%d) than Fluss 
schema (%d)",
+                                paimonFields.size(), flussColumns.size()));
+            }
+
+            // Validate  schema compatibility
+            validateExistingColumns(paimonFields, flussColumns);
+
+            if (paimonFields.size() == flussColumns.size()) {
+                return tableChanges;
+            }
+
+            // if paimon column size is greater than expected fluss column 
size, meaning last add
+            // columns are failed.
+            // Thus, this time must be retried to keep the schema same, only 
then can add new
+            // columns or other operations next time.
+            String errorMsg =
+                    String.format(
+                            "Paimon table has more columns (%d) than Fluss 
schema (%d), thus need to add the diff columns at once rather than other table 
changes %s.",

Review Comment:
   The error message has a grammatical issue. "thus need to add" should be 
"thus needs to add" or better yet, rephrase to "therefore you need to add the 
diff columns all at once, rather than applying other table changes" for better 
clarity.
   ```suggestion
                               "Paimon table has more columns (%d) than Fluss 
schema (%d); therefore you need to add the diff columns all at once, rather 
than applying other table changes: %s.",
   ```



##########
fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java:
##########
@@ -84,5 +85,12 @@ interface Context {
 
         /** Get the fluss principal currently accessing the catalog. */
         FlussPrincipal getFlussPrincipal();
+
+        /**
+         * Get the current schema of fluss.
+         *
+         * @return the current schema of fluss,

Review Comment:
   The new method `getFlussSchema()` should include a `@since` annotation to 
document when it was added to the API, consistent with other methods in the 
interface that have `@since` annotations.
   ```suggestion
            * @return the current schema of fluss,
            * @since 0.10
   ```



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java:
##########
@@ -133,69 +138,118 @@ public void alterTable(TablePath tablePath, 
List<TableChange> tableChanges, Cont
         }
     }
 
-    private boolean shouldAlterTable(TablePath tablePath, List<TableChange> 
tableChanges)
+    private List<TableChange> checkAndFilterDuplicateTableChanges(
+            TablePath tablePath,
+            List<TableChange> tableChanges,
+            org.apache.fluss.metadata.Schema flussSchema)
             throws TableNotExistException {
+        if (tableChanges.isEmpty()) {
+            return tableChanges;
+        }
+
         try {
+            // Get current Paimon table schema
             Table table = paimonCatalog.getTable(toPaimon(tablePath));
             FileStoreTable fileStoreTable = (FileStoreTable) table;
-            Schema currentSchema = fileStoreTable.schema().toSchema();
-
-            for (TableChange change : tableChanges) {
-                if (change instanceof TableChange.AddColumn) {
-                    TableChange.AddColumn addColumn = (TableChange.AddColumn) 
change;
-                    if (!isColumnAlreadyExists(currentSchema, addColumn)) {
-                        return true;
-                    }
-                } else {
-                    return true;
+            List<DataField> paimonFields =
+                    fileStoreTable.schema().toSchema().fields().stream()
+                            .filter(field -> 
!SYSTEM_COLUMNS.containsKey(field.name()))
+                            .collect(Collectors.toList());
+            List<org.apache.fluss.metadata.Schema.Column> flussColumns = 
flussSchema.getColumns();
+
+            if (paimonFields.size() < flussColumns.size()) {
+                throw new InvalidAlterTableException(
+                        String.format(
+                                "Paimon table has less columns (%d) than Fluss 
schema (%d)",
+                                paimonFields.size(), flussColumns.size()));
+            }
+
+            // Validate  schema compatibility
+            validateExistingColumns(paimonFields, flussColumns);
+
+            if (paimonFields.size() == flussColumns.size()) {
+                return tableChanges;
+            }
+
+            // if paimon column size is greater than expected fluss column 
size, meaning last add
+            // columns are failed.
+            // Thus, this time must be retried to keep the schema same, only 
then can add new
+            // columns or other operations next time.
+            String errorMsg =
+                    String.format(
+                            "Paimon table has more columns (%d) than Fluss 
schema (%d), thus need to add the diff columns at once rather than other table 
changes %s.",
+                            paimonFields.size(), flussColumns.size(), 
tableChanges);
+            if (flussColumns.size() + tableChanges.size() != 
paimonFields.size()) {
+                throw new InvalidAlterTableException(errorMsg);
+            }
+
+            for (int i = 0; i < paimonFields.size() - flussColumns.size(); 
i++) {
+                DataField paimonDataField = paimonFields.get(i + 
flussColumns.size());
+                TableChange tableChange = tableChanges.get(i);
+                if (!(tableChange instanceof TableChange.AddColumn
+                        && ((TableChange.AddColumn) tableChange).getPosition()
+                                == TableChange.ColumnPosition.last()
+                        && isColumnAlreadyExists(
+                                paimonDataField, (TableChange.AddColumn) 
tableChange))) {
+                    throw new InvalidAlterTableException(errorMsg);
                 }
             }

Review Comment:
   The method `checkAndFilterDuplicateTableChanges` only handles 
`TableChange.AddColumn` operations in its retry logic (lines 186-196). If the 
method is called with property changes (like `TableChange.SetOption` or 
`TableChange.ResetOption`) when Paimon has more columns than Fluss, it will 
incorrectly throw an error at line 194. Consider checking if all tableChanges 
are AddColumn operations before entering the retry logic, or handle 
non-AddColumn changes appropriately.



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java:
##########
@@ -133,69 +138,118 @@ public void alterTable(TablePath tablePath, 
List<TableChange> tableChanges, Cont
         }
     }
 
-    private boolean shouldAlterTable(TablePath tablePath, List<TableChange> 
tableChanges)
+    private List<TableChange> checkAndFilterDuplicateTableChanges(
+            TablePath tablePath,
+            List<TableChange> tableChanges,
+            org.apache.fluss.metadata.Schema flussSchema)
             throws TableNotExistException {
+        if (tableChanges.isEmpty()) {
+            return tableChanges;
+        }
+
         try {
+            // Get current Paimon table schema
             Table table = paimonCatalog.getTable(toPaimon(tablePath));
             FileStoreTable fileStoreTable = (FileStoreTable) table;
-            Schema currentSchema = fileStoreTable.schema().toSchema();
-
-            for (TableChange change : tableChanges) {
-                if (change instanceof TableChange.AddColumn) {
-                    TableChange.AddColumn addColumn = (TableChange.AddColumn) 
change;
-                    if (!isColumnAlreadyExists(currentSchema, addColumn)) {
-                        return true;
-                    }
-                } else {
-                    return true;
+            List<DataField> paimonFields =
+                    fileStoreTable.schema().toSchema().fields().stream()
+                            .filter(field -> 
!SYSTEM_COLUMNS.containsKey(field.name()))
+                            .collect(Collectors.toList());
+            List<org.apache.fluss.metadata.Schema.Column> flussColumns = 
flussSchema.getColumns();
+
+            if (paimonFields.size() < flussColumns.size()) {
+                throw new InvalidAlterTableException(
+                        String.format(
+                                "Paimon table has less columns (%d) than Fluss 
schema (%d)",
+                                paimonFields.size(), flussColumns.size()));
+            }
+
+            // Validate  schema compatibility

Review Comment:
   There's an extra space in the comment. Should be "Validate schema 
compatibility" instead of "Validate  schema compatibility".
   ```suggestion
               // Validate schema compatibility
   ```



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java:
##########
@@ -133,69 +138,118 @@ public void alterTable(TablePath tablePath, 
List<TableChange> tableChanges, Cont
         }
     }
 
-    private boolean shouldAlterTable(TablePath tablePath, List<TableChange> 
tableChanges)
+    private List<TableChange> checkAndFilterDuplicateTableChanges(
+            TablePath tablePath,
+            List<TableChange> tableChanges,
+            org.apache.fluss.metadata.Schema flussSchema)
             throws TableNotExistException {
+        if (tableChanges.isEmpty()) {
+            return tableChanges;
+        }
+
         try {
+            // Get current Paimon table schema
             Table table = paimonCatalog.getTable(toPaimon(tablePath));
             FileStoreTable fileStoreTable = (FileStoreTable) table;
-            Schema currentSchema = fileStoreTable.schema().toSchema();
-
-            for (TableChange change : tableChanges) {
-                if (change instanceof TableChange.AddColumn) {
-                    TableChange.AddColumn addColumn = (TableChange.AddColumn) 
change;
-                    if (!isColumnAlreadyExists(currentSchema, addColumn)) {
-                        return true;
-                    }
-                } else {
-                    return true;
+            List<DataField> paimonFields =
+                    fileStoreTable.schema().toSchema().fields().stream()
+                            .filter(field -> 
!SYSTEM_COLUMNS.containsKey(field.name()))
+                            .collect(Collectors.toList());
+            List<org.apache.fluss.metadata.Schema.Column> flussColumns = 
flussSchema.getColumns();
+
+            if (paimonFields.size() < flussColumns.size()) {
+                throw new InvalidAlterTableException(
+                        String.format(
+                                "Paimon table has less columns (%d) than Fluss 
schema (%d)",
+                                paimonFields.size(), flussColumns.size()));
+            }
+
+            // Validate  schema compatibility
+            validateExistingColumns(paimonFields, flussColumns);
+
+            if (paimonFields.size() == flussColumns.size()) {
+                return tableChanges;
+            }
+
+            // if paimon column size is greater than expected fluss column 
size, meaning last add
+            // columns are failed.
+            // Thus, this time must be retried to keep the schema same, only 
then can add new
+            // columns or other operations next time.

Review Comment:
   The comment states "meaning last add columns are failed" which should be 
"meaning the last add columns operation failed" for better grammar. Also, "This 
time must be retried" could be clearer as "This operation must retry adding the 
same columns".
   ```suggestion
               // If Paimon column size is greater than the expected Fluss 
column size, it means the
               // last add columns operation failed.
               // Thus, this operation must retry adding the same columns to 
keep the schemas the
               // same before adding new columns or performing other operations.
   ```



##########
fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java:
##########
@@ -84,5 +85,12 @@ interface Context {
 
         /** Get the fluss principal currently accessing the catalog. */
         FlussPrincipal getFlussPrincipal();
+
+        /**
+         * Get the current schema of fluss.
+         *
+         * @return the current schema of fluss,

Review Comment:
   The JavaDoc comment is incomplete. It has "@return the current schema of 
fluss," but the sentence is not finished and has a trailing comma. Consider 
completing the description, for example: "Get the current schema of Fluss. 
@return the current schema of Fluss".
   ```suggestion
            * Get the current schema of Fluss.
            *
            * @return the current schema of Fluss.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to