wuchong commented on code in PR #2189:
URL: https://github.com/apache/fluss/pull/2189#discussion_r2646907238
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java:
##########
@@ -83,9 +84,15 @@ public SchemaUpdate applySchemaChange(TableChange
columnChange) {
}
private SchemaUpdate addColumn(TableChange.AddColumn addColumn) {
- if (existedColumns.containsKey(addColumn.getName())) {
- throw new IllegalArgumentException(
- "Column " + addColumn.getName() + " already exists.");
+ Schema.Column existingColumn = existedColumns.get(addColumn.getName());
+ if (existingColumn != null) {
+ // Allow idempotent retries: if column name/type/comment match
existing, treat as no-op
+ if (!existingColumn.getDataType().equals(addColumn.getDataType())
+ || !Objects.equals(existingColumn.getComment(),
addColumn.getComment())) {
Review Comment:
```suggestion
||
!Objects.equals(existingColumn.getComment().orElse(null),
addColumn.getComment())) {
```
The return type of `existingColumn.getComment()` is `Optional<String>`, which
is not directly comparable with the plain `String` returned by
`addColumn.getComment()`, so the `Objects.equals` check above never matches.
##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java:
##########
@@ -604,6 +605,65 @@ private
CloseableIterator<org.apache.paimon.data.InternalRow> getPaimonRowClosea
return reader.toCloseableIterator();
}
+ @Test
+ void testTieringWithAddColumn() throws Exception {
+ // Test ADD COLUMN during tiering with "Lake First" strategy
+
+ // 1. Create a datalake enabled table with initial schema (c1: INT,
c2: STRING)
+ TablePath tablePath = TablePath.of(DEFAULT_DB, "addColumnTable");
+ long tableId = createLogTable(tablePath);
+ TableBucket tableBucket = new TableBucket(tableId, 0);
+
+ // 2. Write initial data before ADD COLUMN
+ List<InternalRow> initialRows = Arrays.asList(row(1, "v1"), row(2,
"v2"), row(3, "v3"));
+ writeRows(tablePath, initialRows, true);
+
+ // 3. Start tiering job
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ try {
+ // 4. Wait for initial data to be tiered
+ assertReplicaStatus(tableBucket, 3);
+
+ // 5. Execute ADD COLUMN (c3: INT, nullable)
+ List<TableChange> addColumnChanges =
+ Collections.singletonList(
+ TableChange.addColumn(
+ "c3",
+ DataTypes.INT(),
+ "new column",
+ TableChange.ColumnPosition.last()));
+ admin.alterTable(tablePath, addColumnChanges, false).get();
+
+ // 6. Write more data after ADD COLUMN (with new column value)
+ // schema now has 3 business columns (c1, c2, c3), so provide
value for the new column
+ List<InternalRow> newRows =
+ Arrays.asList(row(4, "v4", 40), row(5, "v5", 50), row(6,
"v6", 60));
+ writeRows(tablePath, newRows, true);
+
+ // 7. Wait for new data to be tiered
+ assertReplicaStatus(tableBucket, 6);
+
+ // 8. Verify Paimon table has the new column
+ Identifier tableIdentifier =
+ Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName());
+ FileStoreTable paimonTable = (FileStoreTable)
paimonCatalog.getTable(tableIdentifier);
+ List<String> fieldNames = paimonTable.rowType().getFieldNames();
+
+ // Should have: a, b, c3, __bucket, __offset, __timestamp
+ assertThat(fieldNames).contains("a", "b", "c3");
+
+ // 9. Verify all data is present in Paimon (no data loss)
+ List<InternalRow> allRows = new ArrayList<>();
+ allRows.addAll(initialRows);
+ allRows.addAll(newRows);
+ checkDataInPaimonAppendOnlyTable(tablePath, allRows, 0);
Review Comment:
I suggest constructing an **expected list of rows** that includes all six
columns (the new column, original user columns, and relevant system columns),
and then asserting that this expected list exactly matches the rows read from
Paimon. This will ensure comprehensive validation of both schema evolution and
data correctness.
##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java:
##########
@@ -604,6 +605,65 @@ private
CloseableIterator<org.apache.paimon.data.InternalRow> getPaimonRowClosea
return reader.toCloseableIterator();
}
+ @Test
+ void testTieringWithAddColumn() throws Exception {
+ // Test ADD COLUMN during tiering with "Lake First" strategy
+
+ // 1. Create a datalake enabled table with initial schema (c1: INT,
c2: STRING)
+ TablePath tablePath = TablePath.of(DEFAULT_DB, "addColumnTable");
+ long tableId = createLogTable(tablePath);
+ TableBucket tableBucket = new TableBucket(tableId, 0);
+
+ // 2. Write initial data before ADD COLUMN
+ List<InternalRow> initialRows = Arrays.asList(row(1, "v1"), row(2,
"v2"), row(3, "v3"));
+ writeRows(tablePath, initialRows, true);
+
+ // 3. Start tiering job
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ try {
+ // 4. Wait for initial data to be tiered
+ assertReplicaStatus(tableBucket, 3);
+
+ // 5. Execute ADD COLUMN (c3: INT, nullable)
+ List<TableChange> addColumnChanges =
+ Collections.singletonList(
+ TableChange.addColumn(
+ "c3",
+ DataTypes.INT(),
+ "new column",
+ TableChange.ColumnPosition.last()));
+ admin.alterTable(tablePath, addColumnChanges, false).get();
+
+ // 6. Write more data after ADD COLUMN (with new column value)
+ // schema now has 3 business columns (c1, c2, c3), so provide
value for the new column
+ List<InternalRow> newRows =
+ Arrays.asList(row(4, "v4", 40), row(5, "v5", 50), row(6,
"v6", 60));
+ writeRows(tablePath, newRows, true);
+
+ // 7. Wait for new data to be tiered
+ assertReplicaStatus(tableBucket, 6);
+
+ // 8. Verify Paimon table has the new column
+ Identifier tableIdentifier =
+ Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName());
+ FileStoreTable paimonTable = (FileStoreTable)
paimonCatalog.getTable(tableIdentifier);
+ List<String> fieldNames = paimonTable.rowType().getFieldNames();
+
+ // Should have: a, b, c3, __bucket, __offset, __timestamp
+ assertThat(fieldNames).contains("a", "b", "c3");
Review Comment:
Please assert the exact, complete list of field names and their order (e.g.,
`assertThat(fieldNames).containsExactly(...)`), rather than a partial
`contains` check, so unexpected or misordered columns are also caught.
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java:
##########
@@ -110,8 +110,10 @@ public void alterTable(TablePath tablePath,
List<TableChange> tableChanges, Cont
try {
List<SchemaChange> paimonSchemaChanges =
toPaimonSchemaChanges(tableChanges);
alterTable(tablePath, paimonSchemaChanges);
- } catch (Catalog.ColumnAlreadyExistException |
Catalog.ColumnNotExistException e) {
- // shouldn't happen before we support schema change
+ } catch (Catalog.ColumnAlreadyExistException e) {
+ // Column already exists, treat as idempotent success for retry
scenarios.
Review Comment:
Given that we may execute multiple `TableChange` operations in a single
statement (e.g., adding several columns at once), **blindly ignoring
`ColumnAlreadyExistException` could silently skip the addition of some
columns**, leading to an incomplete schema update.
A simpler and safer approach, in my view, is to **compare the current Paimon
table schema with the expected target schema before performing any `ALTER
TABLE`** (like how you did in
`org.apache.fluss.server.coordinator.MetadataManager#alterTableSchema`):
- If the schemas **differ**, proceed with the `ALTER TABLE` and **report any
errors faithfully to the user** (who can then re-execute if needed).
- If the schemas **already match**, log a clear message (e.g., “Column(s)
already exist—skipping ALTER TABLE”) and skip the operation.
This ensures correctness, avoids silent failures, and provides transparent
feedback to the user.
##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java:
##########
@@ -163,4 +164,64 @@ void testPrimaryKeyTableRecord() {
flussRecordAsPaimonRow.setFlussRecord(logRecord);
assertThat(flussRecordAsPaimonRow.getRowKind()).isEqualTo(RowKind.DELETE);
}
+
+ @Test
+ void testPaimonSchemaWiderThanFlussRecord() {
+ int tableBucket = 0;
+ RowType tableRowType =
+ RowType.of(
+ new org.apache.paimon.types.BooleanType(),
+ new org.apache.paimon.types.VarCharType(),
+ // append three system columns: __bucket,
__offset,__timestamp
+ new org.apache.paimon.types.IntType(),
+ new org.apache.paimon.types.BigIntType(),
+ new
org.apache.paimon.types.LocalZonedTimestampType(3));
+
+ FlussRecordAsPaimonRow flussRecordAsPaimonRow =
+ new FlussRecordAsPaimonRow(tableBucket, tableRowType);
+
+ long logOffset = 7L;
+ long timeStamp = System.currentTimeMillis();
+ GenericRow genericRow = new GenericRow(1);
+ genericRow.setField(0, true);
+ LogRecord logRecord = new GenericRecord(logOffset, timeStamp,
APPEND_ONLY, genericRow);
+ flussRecordAsPaimonRow.setFlussRecord(logRecord);
+
+ assertThat(flussRecordAsPaimonRow.getFieldCount()).isEqualTo(5);
+
+ assertThat(flussRecordAsPaimonRow.getBoolean(0)).isTrue();
+ assertThat(flussRecordAsPaimonRow.isNullAt(1)).isTrue();
+ assertThat(flussRecordAsPaimonRow.getInt(2)).isEqualTo(tableBucket);
+ assertThat(flussRecordAsPaimonRow.getLong(3)).isEqualTo(logOffset);
+ assertThat(flussRecordAsPaimonRow.getLong(4)).isEqualTo(timeStamp);
Review Comment:
This should assert via `getTimestamp(...)` rather than `getLong(...)`, because
the last field is a timestamp type.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java:
##########
@@ -322,24 +322,28 @@ public long createTable(
}
public void alterTableSchema(
- TablePath tablePath, List<TableChange> schemaChanges, boolean
ignoreIfNotExists)
+ TablePath tablePath,
+ List<TableChange> schemaChanges,
+ boolean ignoreIfNotExists,
+ @Nullable LakeCatalog lakeCatalog,
+ LakeCatalog.Context lakeCatalogContext)
throws TableNotExistException, TableNotPartitionedException {
try {
TableInfo table = getTable(tablePath);
- // TODO: remote this after lake enable table support schema
evolution, track by
- // https://github.com/apache/fluss/issues/2128
- if (table.getTableConfig().isDataLakeEnabled()) {
- throw new InvalidAlterTableException(
- "Schema evolution is currently not supported for
tables with datalake enabled.");
- }
-
// validate the table column changes
if (!schemaChanges.isEmpty()) {
Schema newSchema = SchemaUpdate.applySchemaChanges(table,
schemaChanges);
- // update the schema
- zookeeperClient.registerSchema(tablePath, newSchema,
table.getSchemaId() + 1);
+
+ // Lake First: sync to Lake before updating Fluss schema
+ syncSchemaChangesToLake(
+ tablePath, table, schemaChanges, lakeCatalog,
lakeCatalogContext);
+
+ // Update Fluss schema (ZK) after Lake sync succeeds
+ if (!newSchema.equals(table.getSchema())) {
+ zookeeperClient.registerSchema(tablePath, newSchema,
table.getSchemaId() + 1);
+ }
Review Comment:
Same as the catalog-side comment: when the new schema already matches the
existing table schema, log a clear message (e.g., "Column(s) already
exist—skipping ALTER TABLE") and skip the operation instead of silently doing
nothing.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]