This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a627623c [lake] Support ADD COLUMN AT LAST for dataLake enabled 
tables (#2189)
4a627623c is described below

commit 4a627623c9f9d056aa5e4a3991ca1b2b503453ff
Author: Knock.Code <[email protected]>
AuthorDate: Sun Dec 28 15:05:06 2025 +0800

    [lake] Support ADD COLUMN AT LAST for dataLake enabled tables (#2189)
---
 .../org/apache/fluss/metadata/TableChange.java     |  49 ++++++++
 .../fluss/lake/paimon/PaimonLakeCatalog.java       |  93 ++++++++++++++-
 .../paimon/tiering/FlussRecordAsPaimonRow.java     |  60 ++++++++--
 .../fluss/lake/paimon/utils/PaimonConversions.java |  25 ++++
 .../lake/paimon/LakeEnabledTableCreateITCase.java  |  26 ++++-
 .../fluss/lake/paimon/PaimonLakeCatalogTest.java   | 130 +++++++++++++++++++++
 .../paimon/tiering/FlussRecordAsPaimonRowTest.java |  61 ++++++++++
 .../lake/paimon/tiering/PaimonTieringITCase.java   |  74 +++++++++++-
 .../server/coordinator/CoordinatorService.java     |  15 ++-
 .../fluss/server/coordinator/MetadataManager.java  |  57 +++++++--
 .../fluss/server/coordinator/SchemaUpdate.java     |  14 ++-
 .../coordinator/CoordinatorEventProcessorTest.java |   2 +-
 .../event/watcher/TableChangeWatcherTest.java      |   4 +-
 13 files changed, 566 insertions(+), 44 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java
index fcc8462d9..bb027d690 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java
@@ -257,6 +257,22 @@ public interface TableChange {
         public ColumnPosition getPosition() {
             return position;
         }
+
+        @Override
+        public String toString() {
+            return "AddColumn{"
+                    + "name='"
+                    + name
+                    + '\''
+                    + ", dataType="
+                    + dataType
+                    + ", comment='"
+                    + comment
+                    + '\''
+                    + ", position="
+                    + position
+                    + '}';
+        }
     }
 
     /** A table change to drop a column. */
@@ -270,6 +286,11 @@ public interface TableChange {
         public String getName() {
             return name;
         }
+
+        @Override
+        public String toString() {
+            return "DropColumn{" + "name='" + name + '\'' + '}';
+        }
     }
 
     /** A table change to modify a column. */
@@ -308,6 +329,22 @@ public interface TableChange {
         public ColumnPosition getNewPosition() {
             return newPosition;
         }
+
+        @Override
+        public String toString() {
+            return "ModifyColumn{"
+                    + "name='"
+                    + name
+                    + '\''
+                    + ", dataType="
+                    + dataType
+                    + ", comment='"
+                    + comment
+                    + '\''
+                    + ", newPosition="
+                    + newPosition
+                    + '}';
+        }
     }
 
     /** A table change to modify a column's name. */
@@ -327,6 +364,18 @@ public interface TableChange {
         public String getNewColumnName() {
             return newColumnName;
         }
+
+        @Override
+        public String toString() {
+            return "RenameColumn{"
+                    + "oldColumnName='"
+                    + oldColumnName
+                    + '\''
+                    + ", newColumnName='"
+                    + newColumnName
+                    + '\''
+                    + '}';
+        }
     }
 
     /** The position of the modified or added column. */
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index 42bc01942..f8cb46ce4 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -19,6 +19,7 @@ package org.apache.fluss.lake.paimon;
 
 import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidAlterTableException;
 import org.apache.fluss.exception.TableAlreadyExistException;
 import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.lake.lakestorage.LakeCatalog;
@@ -38,6 +39,8 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -54,6 +57,7 @@ import static 
org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 /** A Paimon implementation of {@link LakeCatalog}. */
 public class PaimonLakeCatalog implements LakeCatalog {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(PaimonLakeCatalog.class);
     public static final LinkedHashMap<String, DataType> SYSTEM_COLUMNS = new 
LinkedHashMap<>();
 
     static {
@@ -109,13 +113,92 @@ public class PaimonLakeCatalog implements LakeCatalog {
             throws TableNotExistException {
         try {
             List<SchemaChange> paimonSchemaChanges = 
toPaimonSchemaChanges(tableChanges);
-            alterTable(tablePath, paimonSchemaChanges);
-        } catch (Catalog.ColumnAlreadyExistException | 
Catalog.ColumnNotExistException e) {
-            // shouldn't happen before we support schema change
-            throw new RuntimeException(e);
+
+            // Compare current Paimon table schema with expected target schema 
before altering
+            if (shouldAlterTable(tablePath, tableChanges)) {
+                alterTable(tablePath, paimonSchemaChanges);
+            } else {
+                // If schemas already match, treat as idempotent success
+                LOG.info(
+                        "Skipping schema evolution for Paimon table {} because 
the column(s) to add {} already exist.",
+                        tablePath,
+                        tableChanges);
+            }
+        } catch (Catalog.ColumnAlreadyExistException e) {
+            // This shouldn't happen if shouldAlterTable works correctly, but 
keep as safeguard
+            throw new InvalidAlterTableException(e.getMessage());
+        } catch (Catalog.ColumnNotExistException e) {
+            // This shouldn't happen for AddColumn operations
+            throw new InvalidAlterTableException(e.getMessage());
         }
     }
 
+    private boolean shouldAlterTable(TablePath tablePath, List<TableChange> 
tableChanges)
+            throws TableNotExistException {
+        try {
+            Table table = paimonCatalog.getTable(toPaimon(tablePath));
+            FileStoreTable fileStoreTable = (FileStoreTable) table;
+            Schema currentSchema = fileStoreTable.schema().toSchema();
+
+            for (TableChange change : tableChanges) {
+                if (change instanceof TableChange.AddColumn) {
+                    TableChange.AddColumn addColumn = (TableChange.AddColumn) 
change;
+                    if (!isColumnAlreadyExists(currentSchema, addColumn)) {
+                        return true;
+                    }
+                } else {
+                    return true;
+                }
+            }
+
+            return false;
+        } catch (Catalog.TableNotExistException e) {
+            throw new TableNotExistException("Table " + tablePath + " does not 
exist.");
+        }
+    }
+
+    private boolean isColumnAlreadyExists(Schema currentSchema, 
TableChange.AddColumn addColumn) {
+        String columnName = addColumn.getName();
+
+        for (org.apache.paimon.types.DataField field : currentSchema.fields()) 
{
+            if (field.name().equals(columnName)) {
+                org.apache.paimon.types.DataType expectedType =
+                        addColumn
+                                .getDataType()
+                                .accept(
+                                        org.apache.fluss.lake.paimon.utils
+                                                
.FlussDataTypeToPaimonDataType.INSTANCE);
+
+                if (!field.type().equals(expectedType)) {
+                    throw new InvalidAlterTableException(
+                            String.format(
+                                    "Column '%s' already exists but with 
different type. "
+                                            + "Existing: %s, Expected: %s",
+                                    columnName, field.type(), expectedType));
+                }
+                String existingComment = field.description();
+                String expectedComment = addColumn.getComment();
+
+                boolean commentsMatch =
+                        (existingComment == null && expectedComment == null)
+                                || (existingComment != null
+                                        && 
existingComment.equals(expectedComment));
+
+                if (!commentsMatch) {
+                    throw new InvalidAlterTableException(
+                            String.format(
+                                    "Column %s already exists but with 
different comment. "
+                                            + "Existing: %s, Expected: %s",
+                                    columnName, existingComment, 
expectedComment));
+                }
+
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     private void createTable(TablePath tablePath, Schema schema, boolean 
isCreatingFlussTable)
             throws Catalog.DatabaseNotExistException {
         Identifier paimonPath = toPaimon(tablePath);
@@ -134,7 +217,7 @@ public class PaimonLakeCatalog implements LakeCatalog {
                 }
             } catch (Catalog.TableNotExistException tableNotExistException) {
                 // shouldn't happen in normal cases
-                throw new RuntimeException(
+                throw new InvalidAlterTableException(
                         String.format(
                                 "Failed to create table %s in Paimon. The 
table already existed "
                                         + "during the initial creation 
attempt, but subsequently "
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
index 24304d0e4..c092fbdab 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
@@ -27,7 +27,6 @@ import org.apache.paimon.types.RowType;
 
 import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
 import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
-import static org.apache.fluss.utils.Preconditions.checkState;
 
 /** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
 public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
@@ -35,27 +34,42 @@ public class FlussRecordAsPaimonRow extends 
FlussRowAsPaimonRow {
     private final int bucket;
     private LogRecord logRecord;
     private int originRowFieldCount;
+    private final int businessFieldCount;
+    private final int bucketFieldIndex;
+    private final int offsetFieldIndex;
+    private final int timestampFieldIndex;
 
     public FlussRecordAsPaimonRow(int bucket, RowType tableTowType) {
         super(tableTowType);
         this.bucket = bucket;
+        this.businessFieldCount = tableRowType.getFieldCount() - 
SYSTEM_COLUMNS.size();
+        this.bucketFieldIndex = businessFieldCount;
+        this.offsetFieldIndex = businessFieldCount + 1;
+        this.timestampFieldIndex = businessFieldCount + 2;
     }
 
     public void setFlussRecord(LogRecord logRecord) {
         this.logRecord = logRecord;
         this.internalRow = logRecord.getRow();
-        this.originRowFieldCount = internalRow.getFieldCount();
-        checkState(
-                originRowFieldCount == tableRowType.getFieldCount() - 
SYSTEM_COLUMNS.size(),
-                "The paimon table fields count must equals to LogRecord's 
fields count.");
+        int flussFieldCount = internalRow.getFieldCount();
+        if (flussFieldCount > businessFieldCount) {
+            // Fluss record is wider than Paimon schema, which means Lake 
schema is not yet
+            // synchronized. With "Lake First" strategy, this should not 
happen in normal cases.
+            throw new IllegalStateException(
+                    String.format(
+                            "Fluss record has %d fields but Paimon schema only 
has %d business fields. "
+                                    + "This indicates the lake schema is not 
yet synchronized. "
+                                    + "Please retry the schema change 
operation.",
+                            flussFieldCount, businessFieldCount));
+        }
+        this.originRowFieldCount = flussFieldCount;
     }
 
     @Override
     public int getFieldCount() {
-        return
-        //  business (including partitions) + system (three system fields: 
bucket, offset,
+        // business (including partitions) + system (three system fields: 
bucket, offset,
         // timestamp)
-        originRowFieldCount + SYSTEM_COLUMNS.size();
+        return tableRowType.getFieldCount();
     }
 
     @Override
@@ -68,28 +82,44 @@ public class FlussRecordAsPaimonRow extends 
FlussRowAsPaimonRow {
         if (pos < originRowFieldCount) {
             return super.isNullAt(pos);
         }
+        if (pos < businessFieldCount) {
+            // Padding NULL for missing business fields when Paimon schema is 
wider than Fluss
+            return true;
+        }
         // is the last three system fields: bucket, offset, timestamp which 
are never null
         return false;
     }
 
     @Override
     public int getInt(int pos) {
-        if (pos == originRowFieldCount) {
+        if (pos == bucketFieldIndex) {
             // bucket system column
             return bucket;
         }
+        if (pos >= originRowFieldCount) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Field %s is NULL because Paimon schema is wider 
than Fluss record.",
+                            pos));
+        }
         return super.getInt(pos);
     }
 
     @Override
     public long getLong(int pos) {
-        if (pos == originRowFieldCount + 1) {
+        if (pos == offsetFieldIndex) {
             //  offset system column
             return logRecord.logOffset();
-        } else if (pos == originRowFieldCount + 2) {
+        } else if (pos == timestampFieldIndex) {
             //  timestamp system column
             return logRecord.timestamp();
         }
+        if (pos >= originRowFieldCount) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Field %s is NULL because Paimon schema is wider 
than Fluss record.",
+                            pos));
+        }
         //  the origin RowData
         return super.getLong(pos);
     }
@@ -97,9 +127,15 @@ public class FlussRecordAsPaimonRow extends 
FlussRowAsPaimonRow {
     @Override
     public Timestamp getTimestamp(int pos, int precision) {
         // it's timestamp system column
-        if (pos == originRowFieldCount + 2) {
+        if (pos == timestampFieldIndex) {
             return Timestamp.fromEpochMillis(logRecord.timestamp());
         }
+        if (pos >= originRowFieldCount) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Field %s is NULL because Paimon schema is wider 
than Fluss record.",
+                            pos));
+        }
         return super.getTimestamp(pos, precision);
     }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
index 8d561957a..ded40ac59 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
@@ -128,6 +128,31 @@ public class PaimonConversions {
                 schemaChanges.add(
                         SchemaChange.removeOption(
                                 
convertFlussPropertyKeyToPaimon(resetOption.getKey())));
+            } else if (tableChange instanceof TableChange.AddColumn) {
+                TableChange.AddColumn addColumn = (TableChange.AddColumn) 
tableChange;
+
+                if (!(addColumn.getPosition() instanceof TableChange.Last)) {
+                    throw new UnsupportedOperationException(
+                            "Only support to add column at last for paimon 
table.");
+                }
+
+                org.apache.fluss.types.DataType flussDataType = 
addColumn.getDataType();
+                if (!flussDataType.isNullable()) {
+                    throw new UnsupportedOperationException(
+                            "Only support to add nullable column for paimon 
table.");
+                }
+
+                org.apache.paimon.types.DataType paimonDataType =
+                        
flussDataType.accept(FlussDataTypeToPaimonDataType.INSTANCE);
+
+                String firstSystemColumnName = 
SYSTEM_COLUMNS.keySet().iterator().next();
+                schemaChanges.add(
+                        SchemaChange.addColumn(
+                                addColumn.getName(),
+                                paimonDataType,
+                                addColumn.getComment(),
+                                SchemaChange.Move.before(
+                                        addColumn.getName(), 
firstSystemColumnName)));
             } else {
                 throw new UnsupportedOperationException(
                         "Unsupported table change: " + tableChange.getClass());
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 699553db6..9491918b4 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -23,7 +23,6 @@ import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.FlussRuntimeException;
-import org.apache.fluss.exception.InvalidAlterTableException;
 import org.apache.fluss.exception.InvalidConfigException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.exception.LakeTableAlreadyExistException;
@@ -906,11 +905,26 @@ class LakeEnabledTableCreateITCase {
                                 DataTypes.INT(),
                                 "c3 comment",
                                 TableChange.ColumnPosition.last()));
-        assertThatThrownBy(() -> admin.alterTable(tablePath, tableChanges, 
false).get())
-                .cause()
-                .isInstanceOf(InvalidAlterTableException.class)
-                .hasMessage(
-                        "Schema evolution is currently not supported for 
tables with datalake enabled.");
+
+        admin.alterTable(tablePath, tableChanges, false).get();
+
+        Table alteredPaimonTable =
+                paimonCatalog.getTable(Identifier.create(DATABASE, 
tablePath.getTableName()));
+        // Verify the new column c3 with comment was added to Paimon table
+        RowType alteredRowType = alteredPaimonTable.rowType();
+        assertThat(alteredRowType.getFieldCount()).isEqualTo(6);
+        assertThat(alteredRowType.getFieldNames())
+                .containsExactly(
+                        "c1",
+                        "c2",
+                        "c3",
+                        BUCKET_COLUMN_NAME,
+                        OFFSET_COLUMN_NAME,
+                        TIMESTAMP_COLUMN_NAME);
+        // Verify c3 column has the correct type and comment
+        assertThat(alteredRowType.getField("c3").type())
+                .isEqualTo(org.apache.paimon.types.DataTypes.INT());
+        assertThat(alteredRowType.getField("c3").description()).isEqualTo("c3 
comment");
     }
 
     private void verifyPaimonTable(
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
index 4c99afdf7..ea18b85f6 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
@@ -18,6 +18,7 @@
 package org.apache.fluss.lake.paimon;
 
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidAlterTableException;
 import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext;
 import org.apache.fluss.metadata.Schema;
@@ -34,6 +35,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.util.Collections;
+import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -113,6 +115,134 @@ class PaimonLakeCatalogTest {
                 .hasMessage("Table alter_props_db.non_existing_table does not 
exist.");
     }
 
+    @Test
+    void testAlterTableAddColumnLastNullable() throws Exception {
+        String database = "test_alter_table_add_column_db";
+        String tableName = "test_alter_table_add_column_table";
+        TablePath tablePath = TablePath.of(database, tableName);
+        Identifier identifier = Identifier.create(database, tableName);
+        createTable(database, tableName);
+
+        List<TableChange> changes =
+                Collections.singletonList(
+                        TableChange.addColumn(
+                                "new_col",
+                                DataTypes.INT(),
+                                "new_col comment",
+                                TableChange.ColumnPosition.last()));
+
+        flussPaimonCatalog.alterTable(tablePath, changes, new 
TestingLakeCatalogContext());
+
+        Table table = 
flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
+        assertThat(table.rowType().getFieldNames())
+                .containsSequence(
+                        "id",
+                        "name",
+                        "amount",
+                        "address",
+                        "new_col",
+                        "__bucket",
+                        "__offset",
+                        "__timestamp");
+    }
+
+    @Test
+    void testAlterTableAddColumnNotLast() {
+        String database = "test_alter_table_add_column_not_last_db";
+        String tableName = "test_alter_table_add_column_not_last_table";
+        TablePath tablePath = TablePath.of(database, tableName);
+        createTable(database, tableName);
+
+        List<TableChange> changes =
+                Collections.singletonList(
+                        TableChange.addColumn(
+                                "new_col",
+                                DataTypes.INT(),
+                                null,
+                                TableChange.ColumnPosition.first()));
+
+        assertThatThrownBy(
+                        () ->
+                                flussPaimonCatalog.alterTable(
+                                        tablePath, changes, new 
TestingLakeCatalogContext()))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage("Only support to add column at last for paimon 
table.");
+    }
+
+    @Test
+    void testAlterTableAddColumnNotNullable() {
+        String database = "test_alter_table_add_column_not_nullable_db";
+        String tableName = "test_alter_table_add_column_not_nullable_table";
+        TablePath tablePath = TablePath.of(database, tableName);
+        createTable(database, tableName);
+
+        List<TableChange> changes =
+                Collections.singletonList(
+                        TableChange.addColumn(
+                                "new_col",
+                                DataTypes.INT().copy(false),
+                                null,
+                                TableChange.ColumnPosition.last()));
+
+        assertThatThrownBy(
+                        () ->
+                                flussPaimonCatalog.alterTable(
+                                        tablePath, changes, new 
TestingLakeCatalogContext()))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage("Only support to add nullable column for paimon 
table.");
+    }
+
+    @Test
+    void testAlterTableAddExistingColumn() {
+        String database = "test_alter_table_add_existing_column_db";
+        String tableName = "test_alter_table_add_existing_column_table";
+        TablePath tablePath = TablePath.of(database, tableName);
+        createTable(database, tableName);
+
+        List<TableChange> changes =
+                Collections.singletonList(
+                        TableChange.addColumn(
+                                "address",
+                                DataTypes.STRING(),
+                                null,
+                                TableChange.ColumnPosition.last()));
+
+        // no exception thrown when adding existing column
+        flussPaimonCatalog.alterTable(tablePath, changes, new 
TestingLakeCatalogContext());
+
+        List<TableChange> changes2 =
+                Collections.singletonList(
+                        TableChange.addColumn(
+                                "address",
+                                DataTypes.INT(),
+                                null,
+                                TableChange.ColumnPosition.last()));
+
+        assertThatThrownBy(
+                        () ->
+                                flussPaimonCatalog.alterTable(
+                                        tablePath, changes2, new 
TestingLakeCatalogContext()))
+                .isInstanceOf(InvalidAlterTableException.class)
+                .hasMessage(
+                        "Column 'address' already exists but with different 
type. Existing: STRING, Expected: INT");
+
+        List<TableChange> changes3 =
+                Collections.singletonList(
+                        TableChange.addColumn(
+                                "address",
+                                DataTypes.STRING(),
+                                "the address comment",
+                                TableChange.ColumnPosition.last()));
+
+        assertThatThrownBy(
+                        () ->
+                                flussPaimonCatalog.alterTable(
+                                        tablePath, changes3, new 
TestingLakeCatalogContext()))
+                .isInstanceOf(InvalidAlterTableException.class)
+                .hasMessage(
+                        "Column address already exists but with different 
comment. Existing: null, Expected: the address comment");
+    }
+
     private void createTable(String database, String tableName) {
         Schema flussSchema =
                 Schema.newBuilder()
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java
index 79b548966..369fc7498 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java
@@ -40,6 +40,7 @@ import static org.apache.fluss.record.ChangeType.INSERT;
 import static org.apache.fluss.record.ChangeType.UPDATE_AFTER;
 import static org.apache.fluss.record.ChangeType.UPDATE_BEFORE;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link FlussRecordAsPaimonRow}. */
 class FlussRecordAsPaimonRowTest {
@@ -545,4 +546,64 @@ class FlussRecordAsPaimonRowTest {
         assertThat(array).isNotNull();
         assertThat(array.size()).isEqualTo(0);
     }
+
+    @Test
+    void testPaimonSchemaWiderThanFlussRecord() {
+        int tableBucket = 0;
+        RowType tableRowType =
+                RowType.of(
+                        new org.apache.paimon.types.BooleanType(),
+                        new org.apache.paimon.types.VarCharType(),
+                        // append three system columns: __bucket, 
__offset,__timestamp
+                        new org.apache.paimon.types.IntType(),
+                        new org.apache.paimon.types.BigIntType(),
+                        new 
org.apache.paimon.types.LocalZonedTimestampType(3));
+
+        FlussRecordAsPaimonRow flussRecordAsPaimonRow =
+                new FlussRecordAsPaimonRow(tableBucket, tableRowType);
+
+        long logOffset = 7L;
+        long timeStamp = System.currentTimeMillis();
+        GenericRow genericRow = new GenericRow(1);
+        genericRow.setField(0, true);
+        LogRecord logRecord = new GenericRecord(logOffset, timeStamp, 
APPEND_ONLY, genericRow);
+        flussRecordAsPaimonRow.setFlussRecord(logRecord);
+
+        assertThat(flussRecordAsPaimonRow.getFieldCount()).isEqualTo(5);
+
+        assertThat(flussRecordAsPaimonRow.getBoolean(0)).isTrue();
+        assertThat(flussRecordAsPaimonRow.isNullAt(1)).isTrue();
+        assertThat(flussRecordAsPaimonRow.getInt(2)).isEqualTo(tableBucket);
+        assertThat(flussRecordAsPaimonRow.getLong(3)).isEqualTo(logOffset);
+        assertThat(flussRecordAsPaimonRow.getTimestamp(4, 3))
+                .isEqualTo(Timestamp.fromEpochMillis(timeStamp));
+    }
+
+    @Test
+    void testFlussRecordWiderThanPaimonSchema() {
+        int tableBucket = 0;
+        RowType tableRowType =
+                RowType.of(
+                        new org.apache.paimon.types.BooleanType(),
+                        // append three system columns: __bucket, 
__offset,__timestamp
+                        new org.apache.paimon.types.IntType(),
+                        new org.apache.paimon.types.BigIntType(),
+                        new 
org.apache.paimon.types.LocalZonedTimestampType(3));
+
+        FlussRecordAsPaimonRow flussRecordAsPaimonRow =
+                new FlussRecordAsPaimonRow(tableBucket, tableRowType);
+
+        long logOffset = 7L;
+        long timeStamp = System.currentTimeMillis();
+        GenericRow genericRow = new GenericRow(2);
+        genericRow.setField(0, true);
+        genericRow.setField(1, BinaryString.fromString("extra"));
+        LogRecord logRecord = new GenericRecord(logOffset, timeStamp, 
APPEND_ONLY, genericRow);
+
+        // Should throw exception instead of silently truncating data
+        assertThatThrownBy(() -> 
flussRecordAsPaimonRow.setFlussRecord(logRecord))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "Fluss record has 2 fields but Paimon schema only has 
1 business fields");
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index d725038d6..73aba60cb 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -559,8 +559,9 @@ class PaimonTieringITCase extends 
FlinkPaimonTieringTestBase {
             InternalRow flussRow = flussRowIterator.next();
             assertThat(row.getInt(0)).isEqualTo(flussRow.getInt(0));
             
assertThat(row.getString(1).toString()).isEqualTo(flussRow.getString(1).toString());
-            // the idx 2 is __bucket, so use 3
-            assertThat(row.getLong(3)).isEqualTo(startingOffset++);
+            // system columns are always the last three: __bucket, __offset, 
__timestamp
+            int offsetIndex = row.getFieldCount() - 2;
+            assertThat(row.getLong(offsetIndex)).isEqualTo(startingOffset++);
         }
         assertThat(flussRowIterator.hasNext()).isFalse();
     }
@@ -603,6 +604,75 @@ class PaimonTieringITCase extends 
FlinkPaimonTieringTestBase {
         return reader.toCloseableIterator();
     }
 
+    @Test
+    void testTieringWithAddColumn() throws Exception {
+        // Test ADD COLUMN during tiering with "Lake First" strategy
+
+        // 1. Create a datalake enabled table with initial schema (a: INT, b: STRING)
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "addColumnTable");
+        long tableId = createLogTable(tablePath);
+        TableBucket tableBucket = new TableBucket(tableId, 0);
+
+        // 2. Write initial data before ADD COLUMN
+        List<InternalRow> initialRows = Arrays.asList(row(1, "v1"), row(2, 
"v2"), row(3, "v3"));
+        writeRows(tablePath, initialRows, true);
+
+        // 3. Start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        try {
+            // 4. Wait for initial data to be tiered
+            assertReplicaStatus(tableBucket, 3);
+
+            // 5. Execute ADD COLUMN (c3: INT, nullable)
+            List<TableChange> addColumnChanges =
+                    Collections.singletonList(
+                            TableChange.addColumn(
+                                    "c3",
+                                    DataTypes.INT(),
+                                    "new column",
+                                    TableChange.ColumnPosition.last()));
+            admin.alterTable(tablePath, addColumnChanges, false).get();
+
+            // 6. Write more data after ADD COLUMN (with new column value)
+            // schema now has 3 business columns (a, b, c3), so provide value for the new column
+            List<InternalRow> newRows =
+                    Arrays.asList(row(4, "v4", 40), row(5, "v5", 50), row(6, 
"v6", 60));
+            writeRows(tablePath, newRows, true);
+
+            // 7. Wait for new data to be tiered
+            assertReplicaStatus(tableBucket, 6);
+
+            // 8. Verify Paimon table has the new column with exact field 
names and order
+            Identifier tableIdentifier =
+                    Identifier.create(tablePath.getDatabaseName(), 
tablePath.getTableName());
+            FileStoreTable paimonTable = (FileStoreTable) 
paimonCatalog.getTable(tableIdentifier);
+            List<String> fieldNames = paimonTable.rowType().getFieldNames();
+
+            // Should have exact fields in order: a, b, c3, __bucket, 
__offset, __timestamp
+            assertThat(fieldNames)
+                    .containsExactly("a", "b", "c3", "__bucket", "__offset", 
"__timestamp");
+
+            // 9. Verify both schema evolution and data correctness
+            // For initial rows (before ADD COLUMN), c3 should be NULL
+            // For new rows (after ADD COLUMN), c3 should have the provided 
values
+            List<InternalRow> expectedRows = new ArrayList<>();
+            // Initial rows with NULL for c3
+            expectedRows.add(row(1, "v1", null));
+            expectedRows.add(row(2, "v2", null));
+            expectedRows.add(row(3, "v3", null));
+            // New rows with c3 values
+            expectedRows.add(row(4, "v4", 40));
+            expectedRows.add(row(5, "v5", 50));
+            expectedRows.add(row(6, "v6", 60));
+
+            checkDataInPaimonAppendOnlyTable(tablePath, expectedRows, 0);
+
+        } finally {
+            jobClient.cancel().get();
+        }
+    }
+
     @Override
     protected FlussClusterExtension getFlussClusterExtension() {
         return FLUSS_CLUSTER_EXTENSION;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 3d5811207..4239b6167 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -328,14 +328,21 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                             + "table properties or table schema.");
         }
 
+        LakeCatalogDynamicLoader.LakeCatalogContainer lakeCatalogContainer =
+                lakeCatalogDynamicLoader.getLakeCatalogContainer();
+        LakeCatalog.Context lakeCatalogContext =
+                new DefaultLakeCatalogContext(false, 
currentSession().getPrincipal());
+
         if (!alterSchemaChanges.isEmpty()) {
             metadataManager.alterTableSchema(
-                    tablePath, alterSchemaChanges, 
request.isIgnoreIfNotExists());
+                    tablePath,
+                    alterSchemaChanges,
+                    request.isIgnoreIfNotExists(),
+                    lakeCatalogContainer.getLakeCatalog(),
+                    lakeCatalogContext);
         }
 
         if (!alterTableConfigChanges.isEmpty()) {
-            LakeCatalogDynamicLoader.LakeCatalogContainer lakeCatalogContainer 
=
-                    lakeCatalogDynamicLoader.getLakeCatalogContainer();
             metadataManager.alterTableProperties(
                     tablePath,
                     alterTableConfigChanges,
@@ -343,7 +350,7 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                     request.isIgnoreIfNotExists(),
                     lakeCatalogContainer.getLakeCatalog(),
                     lakeTableTieringManager,
-                    new DefaultLakeCatalogContext(false, 
currentSession().getPrincipal()));
+                    lakeCatalogContext);
         }
 
         return CompletableFuture.completedFuture(new AlterTableResponse());
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index a2fc6cf3d..9d5dc3479 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -322,24 +322,33 @@ public class MetadataManager {
     }
 
     public void alterTableSchema(
-            TablePath tablePath, List<TableChange> schemaChanges, boolean 
ignoreIfNotExists)
+            TablePath tablePath,
+            List<TableChange> schemaChanges,
+            boolean ignoreIfNotExists,
+            @Nullable LakeCatalog lakeCatalog,
+            LakeCatalog.Context lakeCatalogContext)
             throws TableNotExistException, TableNotPartitionedException {
         try {
 
             TableInfo table = getTable(tablePath);
 
-            // TODO: remote this after lake enable table support schema 
evolution, track by
-            // https://github.com/apache/fluss/issues/2128
-            if (table.getTableConfig().isDataLakeEnabled()) {
-                throw new InvalidAlterTableException(
-                        "Schema evolution is currently not supported for 
tables with datalake enabled.");
-            }
-
             // validate the table column changes
             if (!schemaChanges.isEmpty()) {
                 Schema newSchema = SchemaUpdate.applySchemaChanges(table, 
schemaChanges);
-                // update the schema
-                zookeeperClient.registerSchema(tablePath, newSchema, 
table.getSchemaId() + 1);
+
+                // Lake First: sync to Lake before updating Fluss schema
+                syncSchemaChangesToLake(
+                        tablePath, table, schemaChanges, lakeCatalog, 
lakeCatalogContext);
+
+                // Update Fluss schema (ZK) after Lake sync succeeds
+                if (!newSchema.equals(table.getSchema())) {
+                    zookeeperClient.registerSchema(tablePath, newSchema, 
table.getSchemaId() + 1);
+                } else {
+                    LOG.info(
+                            "Skipping schema evolution for table {} because 
the column(s) to add {} already exist.",
+                            tablePath,
+                            schemaChanges);
+                }
             }
         } catch (Exception e) {
             if (e instanceof TableNotExistException) {
@@ -355,6 +364,34 @@ public class MetadataManager {
         }
     }
 
+    private void syncSchemaChangesToLake(
+            TablePath tablePath,
+            TableInfo tableInfo,
+            List<TableChange> schemaChanges,
+            @Nullable LakeCatalog lakeCatalog,
+            LakeCatalog.Context lakeCatalogContext) {
+        if (!isDataLakeEnabled(tableInfo.toTableDescriptor())) {
+            return;
+        }
+
+        if (lakeCatalog == null) {
+            throw new InvalidAlterTableException(
+                    "Cannot alter schema for datalake enabled table "
+                            + tablePath
+                            + ", because the Fluss cluster doesn't enable 
datalake tables.");
+        }
+
+        try {
+            lakeCatalog.alterTable(tablePath, schemaChanges, 
lakeCatalogContext);
+        } catch (TableNotExistException e) {
+            throw new FlussRuntimeException(
+                    "Lake table doesn't exist for lake-enabled table "
+                            + tablePath
+                            + ", which shouldn't happen. Please check if the 
lake table was deleted manually.",
+                    e);
+        }
+    }
+
     public void alterTableProperties(
             TablePath tablePath,
             List<TableChange> tableChanges,
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
index 9efab2a51..3162e4bed 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /** Schema update. */
@@ -83,9 +84,16 @@ public class SchemaUpdate {
     }
 
     private SchemaUpdate addColumn(TableChange.AddColumn addColumn) {
-        if (existedColumns.containsKey(addColumn.getName())) {
-            throw new IllegalArgumentException(
-                    "Column " + addColumn.getName() + " already exists.");
+        Schema.Column existingColumn = existedColumns.get(addColumn.getName());
+        if (existingColumn != null) {
+            // Allow idempotent retries: if column name/type/comment match 
existing, treat as no-op
+            if (!existingColumn.getDataType().equals(addColumn.getDataType())
+                    || !Objects.equals(
+                            existingColumn.getComment().orElse(null), 
addColumn.getComment())) {
+                throw new IllegalArgumentException(
+                        "Column " + addColumn.getName() + " already exists.");
+            }
+            return this;
         }
 
         TableChange.ColumnPosition position = addColumn.getPosition();
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
index 5e99b0023..dcfa6b5b3 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
@@ -1230,7 +1230,7 @@ class CoordinatorEventProcessorTest {
     }
 
     private void alterTable(TablePath tablePath, List<TableChange> 
schemaChanges) {
-        metadataManager.alterTableSchema(tablePath, schemaChanges, true);
+        metadataManager.alterTableSchema(tablePath, schemaChanges, true, null, 
null);
     }
 
     private TableDescriptor getPartitionedTable() {
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java
index 0ed0c9d95..bb9a25d1d 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java
@@ -301,7 +301,9 @@ class TableChangeWatcherTest {
                                     DataTypes.INT(),
                                     null,
                                     TableChange.ColumnPosition.last())),
-                    false);
+                    false,
+                    null,
+                    null);
             Schema newSchema =
                     Schema.newBuilder()
                             .fromSchema(tableInfo.getSchema())


Reply via email to