beryllw commented on code in PR #2331:
URL: https://github.com/apache/fluss/pull/2331#discussion_r2675507496
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java:
##########
@@ -133,69 +138,118 @@ public void alterTable(TablePath tablePath,
List<TableChange> tableChanges, Cont
}
}
- private boolean shouldAlterTable(TablePath tablePath, List<TableChange>
tableChanges)
+ private List<TableChange> checkAndFilterDuplicateTableChanges(
+ TablePath tablePath,
+ List<TableChange> tableChanges,
+ org.apache.fluss.metadata.Schema flussSchema)
throws TableNotExistException {
+ if (tableChanges.isEmpty()) {
+ return tableChanges;
+ }
+
try {
+ // Get current Paimon table schema
Table table = paimonCatalog.getTable(toPaimon(tablePath));
FileStoreTable fileStoreTable = (FileStoreTable) table;
- Schema currentSchema = fileStoreTable.schema().toSchema();
-
- for (TableChange change : tableChanges) {
- if (change instanceof TableChange.AddColumn) {
- TableChange.AddColumn addColumn = (TableChange.AddColumn)
change;
- if (!isColumnAlreadyExists(currentSchema, addColumn)) {
- return true;
- }
- } else {
- return true;
+ List<DataField> paimonFields =
+ fileStoreTable.schema().toSchema().fields().stream()
+ .filter(field ->
!SYSTEM_COLUMNS.containsKey(field.name()))
+ .collect(Collectors.toList());
+ List<org.apache.fluss.metadata.Schema.Column> flussColumns =
flussSchema.getColumns();
+
+ if (paimonFields.size() < flussColumns.size()) {
+ throw new InvalidAlterTableException(
+ String.format(
+ "Paimon table has less columns (%d) than Fluss
schema (%d)",
+ paimonFields.size(), flussColumns.size()));
+ }
+
+ // Validate schema compatibility
+ validateExistingColumns(paimonFields, flussColumns);
+
+ if (paimonFields.size() == flussColumns.size()) {
+ return tableChanges;
+ }
+
+ // if paimon column size is greater than expected fluss column
size, meaning last add
+ // columns are failed.
+ // Thus, this time must be retried to keep the schema same, only
then can add new
+ // columns or other operations next time.
+ String errorMsg =
+ String.format(
+ "Paimon table has more columns (%d) than Fluss
schema (%d), thus need to add the diff columns at once rather than other table
changes %s.",
+ paimonFields.size(), flussColumns.size(),
tableChanges);
+ if (flussColumns.size() + tableChanges.size() !=
paimonFields.size()) {
+ throw new InvalidAlterTableException(errorMsg);
+ }
+
+ for (int i = 0; i < paimonFields.size() - flussColumns.size();
i++) {
+ DataField paimonDataField = paimonFields.get(i +
flussColumns.size());
+ TableChange tableChange = tableChanges.get(i);
+ if (!(tableChange instanceof TableChange.AddColumn
+ && ((TableChange.AddColumn) tableChange).getPosition()
+ == TableChange.ColumnPosition.last()
+ && isColumnAlreadyExists(
+ paimonDataField, (TableChange.AddColumn)
tableChange))) {
+ throw new InvalidAlterTableException(errorMsg);
Review Comment:
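Maybe we could split this combined condition into separate checks (the change is
a `TableChange.AddColumn`, its position is `ColumnPosition.last()`, and
`isColumnAlreadyExists` passes for the corresponding Paimon field) and provide a
distinct error message for each, to help users more easily identify issues
related to the Paimon schema.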
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]