Copilot commented on code in PR #2331:
URL: https://github.com/apache/fluss/pull/2331#discussion_r2672377900
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java:
##########
@@ -133,69 +138,118 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
}
}
- private boolean shouldAlterTable(TablePath tablePath, List<TableChange> tableChanges)
+ private List<TableChange> checkAndFilterDuplicateTableChanges(
+ TablePath tablePath,
+ List<TableChange> tableChanges,
+ org.apache.fluss.metadata.Schema flussSchema)
throws TableNotExistException {
+ if (tableChanges.isEmpty()) {
+ return tableChanges;
+ }
+
try {
+ // Get current Paimon table schema
Table table = paimonCatalog.getTable(toPaimon(tablePath));
FileStoreTable fileStoreTable = (FileStoreTable) table;
- Schema currentSchema = fileStoreTable.schema().toSchema();
-
- for (TableChange change : tableChanges) {
- if (change instanceof TableChange.AddColumn) {
- TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
- if (!isColumnAlreadyExists(currentSchema, addColumn)) {
- return true;
- }
- } else {
- return true;
+ List<DataField> paimonFields =
+ fileStoreTable.schema().toSchema().fields().stream()
+ .filter(field -> !SYSTEM_COLUMNS.containsKey(field.name()))
+ .collect(Collectors.toList());
+ List<org.apache.fluss.metadata.Schema.Column> flussColumns = flussSchema.getColumns();
+
+ if (paimonFields.size() < flussColumns.size()) {
+ throw new InvalidAlterTableException(
+ String.format(
+ "Paimon table has less columns (%d) than Fluss schema (%d)",
+ paimonFields.size(), flussColumns.size()));
+ }
+
+ // Validate schema compatibility
+ validateExistingColumns(paimonFields, flussColumns);
+
+ if (paimonFields.size() == flussColumns.size()) {
+ return tableChanges;
+ }
+
+ // if paimon column size is greater than expected fluss column size, meaning last add
+ // columns are failed.
+ // Thus, this time must be retried to keep the schema same, only then can add new
+ // columns or other operations next time.
+ String errorMsg =
+ String.format(
+ "Paimon table has more columns (%d) than Fluss schema (%d), thus need to add the diff columns at once rather than other table changes %s.",
Review Comment:
The error message has a grammatical issue: "thus need to add" should be
"thus needs to add" or, better yet, be rephrased as "therefore you need to add
the diff columns all at once, rather than applying other table changes" for
clarity.
```suggestion
"Paimon table has more columns (%d) than Fluss schema (%d); therefore you need to add the diff columns all at once, rather than applying other table changes: %s.",
```
##########
fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java:
##########
@@ -84,5 +85,12 @@ interface Context {
/** Get the fluss principal currently accessing the catalog. */
FlussPrincipal getFlussPrincipal();
+
+ /**
+ * Get the current schema of fluss.
+ *
+ * @return the current schema of fluss,
Review Comment:
The new method `getFlussSchema()` should include a `@since` tag documenting
the version in which it was added to the API, consistent with the other methods
in the interface that carry `@since` tags.
```suggestion
* @return the current schema of fluss,
* @since 0.10
```
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java:
##########
@@ -133,69 +138,118 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
}
}
- private boolean shouldAlterTable(TablePath tablePath, List<TableChange> tableChanges)
+ private List<TableChange> checkAndFilterDuplicateTableChanges(
+ TablePath tablePath,
+ List<TableChange> tableChanges,
+ org.apache.fluss.metadata.Schema flussSchema)
throws TableNotExistException {
+ if (tableChanges.isEmpty()) {
+ return tableChanges;
+ }
+
try {
+ // Get current Paimon table schema
Table table = paimonCatalog.getTable(toPaimon(tablePath));
FileStoreTable fileStoreTable = (FileStoreTable) table;
- Schema currentSchema = fileStoreTable.schema().toSchema();
-
- for (TableChange change : tableChanges) {
- if (change instanceof TableChange.AddColumn) {
- TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
- if (!isColumnAlreadyExists(currentSchema, addColumn)) {
- return true;
- }
- } else {
- return true;
+ List<DataField> paimonFields =
+ fileStoreTable.schema().toSchema().fields().stream()
+ .filter(field -> !SYSTEM_COLUMNS.containsKey(field.name()))
+ .collect(Collectors.toList());
+ List<org.apache.fluss.metadata.Schema.Column> flussColumns = flussSchema.getColumns();
+
+ if (paimonFields.size() < flussColumns.size()) {
+ throw new InvalidAlterTableException(
+ String.format(
+ "Paimon table has less columns (%d) than Fluss schema (%d)",
+ paimonFields.size(), flussColumns.size()));
+ }
+
+ // Validate schema compatibility
+ validateExistingColumns(paimonFields, flussColumns);
+
+ if (paimonFields.size() == flussColumns.size()) {
+ return tableChanges;
+ }
+
+ // if paimon column size is greater than expected fluss column size, meaning last add
+ // columns are failed.
+ // Thus, this time must be retried to keep the schema same, only then can add new
+ // columns or other operations next time.
+ String errorMsg =
+ String.format(
+ "Paimon table has more columns (%d) than Fluss schema (%d), thus need to add the diff columns at once rather than other table changes %s.",
+ paimonFields.size(), flussColumns.size(), tableChanges);
+ if (flussColumns.size() + tableChanges.size() != paimonFields.size()) {
+ throw new InvalidAlterTableException(errorMsg);
+ }
+
+ for (int i = 0; i < paimonFields.size() - flussColumns.size(); i++) {
+ DataField paimonDataField = paimonFields.get(i + flussColumns.size());
+ TableChange tableChange = tableChanges.get(i);
+ if (!(tableChange instanceof TableChange.AddColumn
+ && ((TableChange.AddColumn) tableChange).getPosition()
+ == TableChange.ColumnPosition.last()
+ && isColumnAlreadyExists(
+ paimonDataField, (TableChange.AddColumn) tableChange))) {
+ throw new InvalidAlterTableException(errorMsg);
}
}
Review Comment:
The method `checkAndFilterDuplicateTableChanges` only handles
`TableChange.AddColumn` operations in its retry logic (lines 186-196). If it is
called with property changes (such as `TableChange.SetOption` or
`TableChange.ResetOption`) while Paimon has more columns than Fluss, it will
incorrectly throw an error at line 194. Consider checking whether all
`tableChanges` are `AddColumn` operations before entering the retry logic, or
handling non-`AddColumn` changes appropriately.
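One possible shape for such a guard, as a minimal sketch only: `TableChange`,
`AddColumn`, and `SetOption` below are simplified stand-ins declared locally,
not the real Fluss classes, and the helper names `allAddColumns` and
`addColumnsOnly` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AddColumnGuardSketch {

    // Simplified stand-ins for the real TableChange hierarchy (assumption, not the Fluss API).
    interface TableChange {}

    static final class AddColumn implements TableChange {
        final String name;

        AddColumn(String name) {
            this.name = name;
        }
    }

    static final class SetOption implements TableChange {
        final String key;
        final String value;

        SetOption(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Returns true only if every requested change is a column addition. */
    static boolean allAddColumns(List<TableChange> changes) {
        for (TableChange change : changes) {
            if (!(change instanceof AddColumn)) {
                return false;
            }
        }
        return true;
    }

    /** Collects the column additions, the only changes a retry comparison would look at. */
    static List<AddColumn> addColumnsOnly(List<TableChange> changes) {
        List<AddColumn> result = new ArrayList<>();
        for (TableChange change : changes) {
            if (change instanceof AddColumn) {
                result.add((AddColumn) change);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<TableChange> mixed = Arrays.asList(new AddColumn("c3"), new SetOption("k", "v"));
        System.out.println("all AddColumn: " + allAddColumns(mixed));
        System.out.println("AddColumn count: " + addColumnsOnly(mixed).size());
    }
}
```

With a check like `allAddColumns` in front of the retry loop, a request that
contains property changes could be rejected with a dedicated message or routed
around the column comparison, instead of failing inside it.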
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java:
##########
@@ -133,69 +138,118 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
}
}
- private boolean shouldAlterTable(TablePath tablePath, List<TableChange> tableChanges)
+ private List<TableChange> checkAndFilterDuplicateTableChanges(
+ TablePath tablePath,
+ List<TableChange> tableChanges,
+ org.apache.fluss.metadata.Schema flussSchema)
throws TableNotExistException {
+ if (tableChanges.isEmpty()) {
+ return tableChanges;
+ }
+
try {
+ // Get current Paimon table schema
Table table = paimonCatalog.getTable(toPaimon(tablePath));
FileStoreTable fileStoreTable = (FileStoreTable) table;
- Schema currentSchema = fileStoreTable.schema().toSchema();
-
- for (TableChange change : tableChanges) {
- if (change instanceof TableChange.AddColumn) {
- TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
- if (!isColumnAlreadyExists(currentSchema, addColumn)) {
- return true;
- }
- } else {
- return true;
+ List<DataField> paimonFields =
+ fileStoreTable.schema().toSchema().fields().stream()
+ .filter(field -> !SYSTEM_COLUMNS.containsKey(field.name()))
+ .collect(Collectors.toList());
+ List<org.apache.fluss.metadata.Schema.Column> flussColumns = flussSchema.getColumns();
+
+ if (paimonFields.size() < flussColumns.size()) {
+ throw new InvalidAlterTableException(
+ String.format(
+ "Paimon table has less columns (%d) than Fluss schema (%d)",
+ paimonFields.size(), flussColumns.size()));
+ }
+
+ // Validate schema compatibility
Review Comment:
There's an extra space in the comment. It should read "Validate schema
compatibility", with a single space between words.
```suggestion
// Validate schema compatibility
```
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java:
##########
@@ -133,69 +138,118 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
}
}
- private boolean shouldAlterTable(TablePath tablePath, List<TableChange> tableChanges)
+ private List<TableChange> checkAndFilterDuplicateTableChanges(
+ TablePath tablePath,
+ List<TableChange> tableChanges,
+ org.apache.fluss.metadata.Schema flussSchema)
throws TableNotExistException {
+ if (tableChanges.isEmpty()) {
+ return tableChanges;
+ }
+
try {
+ // Get current Paimon table schema
Table table = paimonCatalog.getTable(toPaimon(tablePath));
FileStoreTable fileStoreTable = (FileStoreTable) table;
- Schema currentSchema = fileStoreTable.schema().toSchema();
-
- for (TableChange change : tableChanges) {
- if (change instanceof TableChange.AddColumn) {
- TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
- if (!isColumnAlreadyExists(currentSchema, addColumn)) {
- return true;
- }
- } else {
- return true;
+ List<DataField> paimonFields =
+ fileStoreTable.schema().toSchema().fields().stream()
+ .filter(field -> !SYSTEM_COLUMNS.containsKey(field.name()))
+ .collect(Collectors.toList());
+ List<org.apache.fluss.metadata.Schema.Column> flussColumns = flussSchema.getColumns();
+
+ if (paimonFields.size() < flussColumns.size()) {
+ throw new InvalidAlterTableException(
+ String.format(
+ "Paimon table has less columns (%d) than Fluss schema (%d)",
+ paimonFields.size(), flussColumns.size()));
+ }
+
+ // Validate schema compatibility
+ validateExistingColumns(paimonFields, flussColumns);
+
+ if (paimonFields.size() == flussColumns.size()) {
+ return tableChanges;
+ }
+
+ // if paimon column size is greater than expected fluss column size, meaning last add
+ // columns are failed.
+ // Thus, this time must be retried to keep the schema same, only then can add new
+ // columns or other operations next time.
Review Comment:
The comment states "meaning last add columns are failed", which should be
"meaning the last add columns operation failed" for better grammar. Also, "this
time must be retried" would be clearer as "this operation must retry adding the
same columns".
```suggestion
// If Paimon column size is greater than the expected Fluss column size, it means the
// last add columns operation failed.
// Thus, this operation must retry adding the same columns to keep the schemas the
// same before adding new columns or performing other operations.
```
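The invariant that the code comment describes can be illustrated with a small
sketch. It is an assumption-laden simplification: columns are reduced to plain
names, and `isRetryOfFailedAdd` is a hypothetical helper, not a method from the
PR. The real check may compare more than names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RetryCheckSketch {

    /**
     * Returns true when the lake table already holds exactly the Fluss columns followed by
     * the requested additions, in the same order. Columns are reduced to names here; this
     * is a simplification for illustration (assumption).
     */
    static boolean isRetryOfFailedAdd(
            List<String> flussColumns, List<String> paimonColumns, List<String> requestedAdds) {
        // The extra Paimon columns must account for every requested addition, no more, no less.
        if (flussColumns.size() + requestedAdds.size() != paimonColumns.size()) {
            return false;
        }
        // Build the schema the retry is expected to converge on and compare position by position.
        List<String> expected = new ArrayList<>(flussColumns);
        expected.addAll(requestedAdds);
        return expected.equals(paimonColumns);
    }

    public static void main(String[] args) {
        List<String> fluss = Arrays.asList("id", "name");
        List<String> paimon = Arrays.asList("id", "name", "age");
        System.out.println(isRetryOfFailedAdd(fluss, paimon, Arrays.asList("age")));
        System.out.println(isRetryOfFailedAdd(fluss, paimon, Arrays.asList("email")));
    }
}
```

In this sketch, a request that re-adds `age` is recognized as a retry, while a
request that adds a different column is not, which mirrors the rule that the
schemas must be brought back in sync before any other change is applied.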
##########
fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java:
##########
@@ -84,5 +85,12 @@ interface Context {
/** Get the fluss principal currently accessing the catalog. */
FlussPrincipal getFlussPrincipal();
+
+ /**
+ * Get the current schema of fluss.
+ *
+ * @return the current schema of fluss,
Review Comment:
The Javadoc comment is incomplete: the "@return the current schema of fluss,"
line ends with a trailing comma and the sentence is never finished. Consider
completing the description, for example: "Get the current schema of Fluss.
@return the current schema of Fluss".
```suggestion
* Get the current schema of Fluss.
*
* @return the current schema of Fluss.
```