This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 96faf08cb [lake/paimon] Compare paimon schema and Fluss schema before 
alter table. (#2331)
96faf08cb is described below

commit 96faf08cbd84d1d43a2ef610c189829fa9c34e76
Author: Hongshun Wang <[email protected]>
AuthorDate: Thu Jan 29 14:30:22 2026 +0800

    [lake/paimon] Compare paimon schema and Fluss schema before alter table. 
(#2331)
---
 .../fluss/client/admin/FlussAdminITCase.java       |  16 ++
 .../apache/fluss/lake/lakestorage/LakeCatalog.java |  14 ++
 .../lakestorage/TestingLakeCatalogContext.java     |  27 +++
 .../fluss/lake/paimon/PaimonLakeCatalog.java       | 136 +++++---------
 .../fluss/lake/paimon/utils/PaimonConversions.java |   3 +-
 .../lake/paimon/utils/PaimonTableValidation.java   |  43 +++--
 .../fluss/lake/paimon/PaimonLakeCatalogTest.java   | 202 +++++++++++++++++----
 .../server/coordinator/CoordinatorService.java     |  36 +++-
 .../fluss/server/coordinator/MetadataManager.java  |  27 ++-
 .../fluss/server/coordinator/SchemaUpdate.java     |  22 +--
 10 files changed, 355 insertions(+), 171 deletions(-)

diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index 5560e14ee..5dff27b58 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -436,6 +436,22 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
                         DataTypeChecks.equalsWithFieldId(
                                 schemaInfo.getSchema().getRowType(), 
expectedSchema.getRowType()))
                 .isTrue();
+
+        assertThatThrownBy(
+                        () ->
+                                admin.alterTable(
+                                                tablePath,
+                                                Collections.singletonList(
+                                                        TableChange.addColumn(
+                                                                "nested_row",
+                                                                DataTypes.ROW(
+                                                                        
DataTypes.STRING(),
+                                                                        
DataTypes.INT()),
+                                                                "new nested 
column",
+                                                                
TableChange.ColumnPosition.last())),
+                                                false)
+                                        .get())
+                .hasMessageContaining("Column nested_row already exists");
     }
 
     @Test
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java 
b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
index 4cbccb6c1..7fa2f3ea1 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
@@ -25,6 +25,8 @@ import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.security.acl.FlussPrincipal;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 /**
@@ -84,5 +86,17 @@ public interface LakeCatalog extends AutoCloseable {
 
         /** Get the fluss principal currently accessing the catalog. */
         FlussPrincipal getFlussPrincipal();
+
+        /**
+         * Get the current table info of fluss.
+         *
+         * @return the current table info of fluss. Null if the table does not 
exist.
+         * @since 0.10
+         */
+        @Nullable
+        TableDescriptor getCurrentTable();
+
+        /** Get the expected table info of fluss. */
+        TableDescriptor getExpectedTable();
     }
 }
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java
 
b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java
index 7406b13b6..d3efd712e 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java
@@ -17,11 +17,28 @@
 
 package org.apache.fluss.lake.lakestorage;
 
+import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.security.acl.FlussPrincipal;
 
 /** A testing implementation of {@link LakeCatalog.Context}. */
 public class TestingLakeCatalogContext implements LakeCatalog.Context {
 
+    private final TableDescriptor currentTable;
+    private final TableDescriptor expectedTable;
+
+    public TestingLakeCatalogContext(TableDescriptor tableDescriptor) {
+        this(tableDescriptor, tableDescriptor);
+    }
+
+    public TestingLakeCatalogContext(TableDescriptor currentTable, 
TableDescriptor expectedTable) {
+        this.currentTable = currentTable;
+        this.expectedTable = expectedTable;
+    }
+
+    public TestingLakeCatalogContext() {
+        this(null);
+    }
+
     @Override
     public boolean isCreatingFlussTable() {
         return false;
@@ -31,4 +48,14 @@ public class TestingLakeCatalogContext implements 
LakeCatalog.Context {
     public FlussPrincipal getFlussPrincipal() {
         return null;
     }
+
+    @Override
+    public TableDescriptor getCurrentTable() {
+        return currentTable;
+    }
+
+    @Override
+    public TableDescriptor getExpectedTable() {
+        return expectedTable;
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index 500546e64..6548b7e26 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -44,12 +44,13 @@ import org.slf4j.LoggerFactory;
 
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
 import static 
org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema;
 import static 
org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges;
 import static 
org.apache.fluss.lake.paimon.utils.PaimonTableValidation.checkTableIsEmpty;
-import static 
org.apache.fluss.lake.paimon.utils.PaimonTableValidation.validatePaimonSchemaCompatible;
+import static 
org.apache.fluss.lake.paimon.utils.PaimonTableValidation.isPaimonSchemaCompatible;
 import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
@@ -111,94 +112,51 @@ public class PaimonLakeCatalog implements LakeCatalog {
     @Override
     public void alterTable(TablePath tablePath, List<TableChange> 
tableChanges, Context context)
             throws TableNotExistException {
-        try {
-            List<SchemaChange> paimonSchemaChanges = 
toPaimonSchemaChanges(tableChanges);
-
-            // Compare current Paimon table schema with expected target schema 
before altering
-            if (shouldAlterTable(tablePath, tableChanges)) {
-                alterTable(tablePath, paimonSchemaChanges);
-            } else {
-                // If schemas already match, treat as idempotent success
-                LOG.info(
-                        "Skipping schema evolution for Paimon table {} because 
the column(s) to add {} already exist.",
-                        tablePath,
-                        tableChanges);
-            }
-        } catch (Catalog.ColumnAlreadyExistException e) {
-            // This shouldn't happen if shouldAlterTable works correctly, but 
keep as safeguard
-            throw new InvalidAlterTableException(e.getMessage());
-        } catch (Catalog.ColumnNotExistException e) {
-            // This shouldn't happen for AddColumn operations
-            throw new InvalidAlterTableException(e.getMessage());
-        }
-    }
-
-    private boolean shouldAlterTable(TablePath tablePath, List<TableChange> 
tableChanges)
-            throws TableNotExistException {
         try {
             Table table = paimonCatalog.getTable(toPaimon(tablePath));
             FileStoreTable fileStoreTable = (FileStoreTable) table;
-            Schema currentSchema = fileStoreTable.schema().toSchema();
-
-            for (TableChange change : tableChanges) {
-                if (change instanceof TableChange.AddColumn) {
-                    TableChange.AddColumn addColumn = (TableChange.AddColumn) 
change;
-                    if (!isColumnAlreadyExists(currentSchema, addColumn)) {
-                        return true;
-                    }
-                } else {
-                    return true;
-                }
+            Schema currentPaimonSchema = fileStoreTable.schema().toSchema();
+
+            List<SchemaChange> paimonSchemaChanges;
+            if (isPaimonSchemaCompatible(
+                    currentPaimonSchema, 
toPaimonSchema(context.getCurrentTable()))) {
+                // if the paimon schema is the same as the current fluss schema, directly apply
+                // all the changes.
+            } else if (isPaimonSchemaCompatible(
+                    currentPaimonSchema, 
toPaimonSchema(context.getExpectedTable()))) {
+                // if the schema is the same as the expected fluss schema, skip adding columns.
+                paimonSchemaChanges =
+                        toPaimonSchemaChanges(
+                                tableChanges.stream()
+                                        .filter(
+                                                tableChange ->
+                                                        !(tableChange
+                                                                instanceof 
TableChange.AddColumn))
+                                        .collect(Collectors.toList()));
+            } else {
+                throw new InvalidAlterTableException(
+                        String.format(
+                                "Paimon schema is not compatible with Fluss 
schema: "
+                                        + "Paimon schema: %s, Fluss schema: 
%s. "
+                                        + "therefore you need to add the diff 
columns all at once, "
+                                        + "rather than applying other table 
changes: %s.",
+                                currentPaimonSchema,
+                                context.getCurrentTable().getSchema(),
+                                tableChanges));
             }
 
-            return false;
+            if (!paimonSchemaChanges.isEmpty()) {
+                paimonCatalog.alterTable(toPaimon(tablePath), 
paimonSchemaChanges, false);
+            }
+        } catch (Catalog.ColumnAlreadyExistException | 
Catalog.ColumnNotExistException e) {
+            // This shouldn't happen for AddColumn operations
+            throw new InvalidAlterTableException(e.getMessage());
         } catch (Catalog.TableNotExistException e) {
             throw new TableNotExistException("Table " + tablePath + " does not 
exist.");
         }
     }
 
-    private boolean isColumnAlreadyExists(Schema currentSchema, 
TableChange.AddColumn addColumn) {
-        String columnName = addColumn.getName();
-
-        for (org.apache.paimon.types.DataField field : currentSchema.fields()) 
{
-            if (field.name().equals(columnName)) {
-                org.apache.paimon.types.DataType expectedType =
-                        addColumn
-                                .getDataType()
-                                .accept(
-                                        org.apache.fluss.lake.paimon.utils
-                                                
.FlussDataTypeToPaimonDataType.INSTANCE);
-
-                if (!field.type().equals(expectedType)) {
-                    throw new InvalidAlterTableException(
-                            String.format(
-                                    "Column '%s' already exists but with 
different type. "
-                                            + "Existing: %s, Expected: %s",
-                                    columnName, field.type(), expectedType));
-                }
-                String existingComment = field.description();
-                String expectedComment = addColumn.getComment();
-
-                boolean commentsMatch =
-                        (existingComment == null && expectedComment == null)
-                                || (existingComment != null
-                                        && 
existingComment.equals(expectedComment));
-
-                if (!commentsMatch) {
-                    throw new InvalidAlterTableException(
-                            String.format(
-                                    "Column %s already exists but with 
different comment. "
-                                            + "Existing: %s, Expected: %s",
-                                    columnName, existingComment, 
expectedComment));
-                }
-
-                return true;
-            }
-        }
-
-        return false;
-    }
-
     private void createTable(TablePath tablePath, Schema schema, boolean 
isCreatingFlussTable)
             throws Catalog.DatabaseNotExistException {
         Identifier paimonPath = toPaimon(tablePath);
@@ -209,8 +167,15 @@ public class PaimonLakeCatalog implements LakeCatalog {
             try {
                 Table table = paimonCatalog.getTable(paimonPath);
                 FileStoreTable fileStoreTable = (FileStoreTable) table;
-                validatePaimonSchemaCompatible(
-                        paimonPath, fileStoreTable.schema().toSchema(), 
schema);
+                Schema existingSchema = fileStoreTable.schema().toSchema();
+                if (!isPaimonSchemaCompatible(existingSchema, schema)) {
+                    throw new TableAlreadyExistException(
+                            String.format(
+                                    "The table %s already exists in Paimon 
catalog, but the table schema is not compatible. "
+                                            + "Existing schema: %s, new 
schema: %s. "
+                                            + "Please first drop the table in 
Paimon catalog or use a new table name.",
+                                    paimonPath.getEscapedFullName(), 
existingSchema, schema));
+                }
                 // if creating a new fluss table, we should ensure the lake 
table is empty
                 if (isCreatingFlussTable) {
                     checkTableIsEmpty(tablePath, fileStoreTable);
@@ -237,15 +202,6 @@ public class PaimonLakeCatalog implements LakeCatalog {
         }
     }
 
-    private void alterTable(TablePath tablePath, List<SchemaChange> 
tableChanges)
-            throws Catalog.ColumnAlreadyExistException, 
Catalog.ColumnNotExistException {
-        try {
-            paimonCatalog.alterTable(toPaimon(tablePath), tableChanges, false);
-        } catch (Catalog.TableNotExistException e) {
-            throw new TableNotExistException("Table " + tablePath + " does not 
exist.");
-        }
-    }
-
     @Override
     public void close() {
         IOUtils.closeQuietly(paimonCatalog, "paimon catalog");
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
index ded40ac59..ee6d0f0d7 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
@@ -57,6 +57,7 @@ public class PaimonConversions {
 
     // for fluss config
     public static final String FLUSS_CONF_PREFIX = "fluss.";
+    public static final String TABLE_DATALAKE_PAIMON_PREFIX = 
"table.datalake.paimon.";
     // for paimon config
     private static final String PAIMON_CONF_PREFIX = "paimon.";
 
@@ -261,7 +262,7 @@ public class PaimonConversions {
     private static void setFlussPropertyToPaimon(String key, String value, 
Options options) {
         if (key.startsWith(PAIMON_CONF_PREFIX)) {
             options.set(key.substring(PAIMON_CONF_PREFIX.length()), value);
-        } else {
+        } else if (!key.startsWith(TABLE_DATALAKE_PAIMON_PREFIX)) {
             options.set(FLUSS_CONF_PREFIX + key, value);
         }
     }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
index 94580df17..a15feefeb 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
@@ -21,7 +21,6 @@ import org.apache.fluss.exception.TableAlreadyExistException;
 import org.apache.fluss.metadata.TablePath;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
@@ -34,6 +33,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 import static 
org.apache.fluss.lake.paimon.utils.PaimonConversions.FLUSS_CONF_PREFIX;
 import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
@@ -43,8 +43,7 @@ public class PaimonTableValidation {
 
     private static final Map<String, ConfigOption<?>> PAIMON_CONFIGS = 
extractPaimonConfigs();
 
-    public static void validatePaimonSchemaCompatible(
-            Identifier tablePath, Schema existingSchema, Schema newSchema) {
+    public static boolean isPaimonSchemaCompatible(Schema existingSchema, 
Schema newSchema) {
         // Adjust options for comparison
         Map<String, String> existingOptions = existingSchema.options();
         Map<String, String> newOptions = newSchema.options();
@@ -66,21 +65,16 @@ public class PaimonTableValidation {
         // ignore the existing options that are not in new options
         existingOptions.entrySet().removeIf(entry -> 
!newOptions.containsKey(entry.getKey()));
 
-        if (!existingSchema.equals(newSchema)) {
+        // ignore the field ids because newSchema is derived from the fluss schema,
+        // whose field ids may differ from those of existingSchema.
+        if (!equalIgnoreFieldId(existingSchema, newSchema)) {
             // Allow different precisions for __timestamp column for backward 
compatibility,
             // old cluster will use precision 6, but new cluster will use 
precision 3,
             // we allow such precision difference
-            if (equalIgnoreSystemColumnTimestampPrecision(existingSchema, 
newSchema)) {
-                return;
-            }
-
-            throw new TableAlreadyExistException(
-                    String.format(
-                            "The table %s already exists in Paimon catalog, 
but the table schema is not compatible. "
-                                    + "Existing schema: %s, new schema: %s. "
-                                    + "Please first drop the table in Paimon 
catalog or use a new table name.",
-                            tablePath.getEscapedFullName(), existingSchema, 
newSchema));
+            return equalIgnoreSystemColumnTimestampPrecision(existingSchema, 
newSchema);
         }
+
+        return true;
     }
 
     /**
@@ -96,7 +90,7 @@ public class PaimonTableValidation {
      * @param newSchema the new schema descriptor generated by the current 
Fluss cluster
      * @return true if the schemas are identical, disregarding the precision 
of the system timestamp
      */
-    private static boolean equalIgnoreSystemColumnTimestampPrecision(
+    public static boolean equalIgnoreSystemColumnTimestampPrecision(
             Schema existingSchema, Schema newSchema) {
         List<DataField> existingFields = new 
ArrayList<>(existingSchema.fields());
         DataField systemTimestampField = 
existingFields.get(existingFields.size() - 1);
@@ -113,7 +107,24 @@ public class PaimonTableValidation {
                             systemTimestampField.description()));
         }
         existingSchema = 
existingSchema.copy(RowType.of(existingFields.toArray(new DataField[0])));
-        return existingSchema.equals(newSchema);
+        return equalIgnoreFieldId(existingSchema, newSchema);
+    }
+
+    private static boolean equalIgnoreFieldId(Schema existingSchema, Schema 
newSchema) {
+        List<DataField> existingFields = existingSchema.fields();
+        List<DataField> newFields = newSchema.fields();
+        for (int i = 0; i < existingFields.size(); i++) {
+            DataField existingField = existingFields.get(i);
+            DataField newField = newFields.get(i);
+            if (!existingField.equalsIgnoreFieldId(newField)) {
+                return false;
+            }
+        }
+
+        return Objects.equals(existingSchema.partitionKeys(), 
newSchema.partitionKeys())
+                && Objects.equals(existingSchema.primaryKeys(), 
newSchema.primaryKeys())
+                && Objects.equals(existingSchema.options(), 
newSchema.options())
+                && Objects.equals(existingSchema.comment(), 
newSchema.comment());
     }
 
     private static void removeChangeableOptions(Map<String, String> options) {
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
index ea18b85f6..b56ba17cb 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
@@ -25,35 +25,57 @@ import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.coordinator.SchemaUpdate;
 import org.apache.fluss.types.DataTypes;
 
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
+import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Unit test for {@link PaimonLakeCatalog}. */
 class PaimonLakeCatalogTest {
+    private static final Schema FLUSS_SCHEMA =
+            Schema.newBuilder()
+                    .column("id", DataTypes.BIGINT())
+                    .column("name", DataTypes.STRING())
+                    .column("amount", DataTypes.INT())
+                    .column("address", DataTypes.STRING())
+                    .build();
+    private static final TestingLakeCatalogContext LAKE_CATALOG_CONTEXT =
+            new 
TestingLakeCatalogContext(TableDescriptor.builder().schema(FLUSS_SCHEMA).build());
 
     @TempDir private File tempWarehouseDir;
 
     private PaimonLakeCatalog flussPaimonCatalog;
 
     @BeforeEach
-    public void setUp() {
+    void setUp() {
         Configuration configuration = new Configuration();
         configuration.setString("warehouse", 
tempWarehouseDir.toURI().toString());
         flussPaimonCatalog = new PaimonLakeCatalog(configuration);
     }
 
+    @AfterEach
+    void cleanup() {
+        flussPaimonCatalog.close();
+        setUp();
+    }
+
     @Test
     void testAlterTableProperties() throws Exception {
         String database = "test_alter_table_properties_db";
@@ -70,7 +92,7 @@ class PaimonLakeCatalogTest {
         flussPaimonCatalog.alterTable(
                 tablePath,
                 Collections.singletonList(TableChange.set("key", "value")),
-                new TestingLakeCatalogContext());
+                LAKE_CATALOG_CONTEXT);
 
         table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
         // we have set the value for key
@@ -80,7 +102,7 @@ class PaimonLakeCatalogTest {
         flussPaimonCatalog.alterTable(
                 tablePath,
                 Collections.singletonList(TableChange.reset("key")),
-                new TestingLakeCatalogContext());
+                LAKE_CATALOG_CONTEXT);
 
         table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
         // we have reset the value for key
@@ -89,14 +111,13 @@ class PaimonLakeCatalogTest {
 
     @Test
     void alterTablePropertiesWithNonExistentTable() {
-        TestingLakeCatalogContext context = new TestingLakeCatalogContext();
         // db & table don't exist
         assertThatThrownBy(
                         () ->
                                 flussPaimonCatalog.alterTable(
                                         TablePath.of("non_existing_db", 
"non_existing_table"),
                                         
Collections.singletonList(TableChange.set("key", "value")),
-                                        context))
+                                        LAKE_CATALOG_CONTEXT))
                 .isInstanceOf(TableNotExistException.class)
                 .hasMessage("Table non_existing_db.non_existing_table does not 
exist.");
 
@@ -110,7 +131,7 @@ class PaimonLakeCatalogTest {
                                 flussPaimonCatalog.alterTable(
                                         TablePath.of(database, 
"non_existing_table"),
                                         
Collections.singletonList(TableChange.set("key", "value")),
-                                        context))
+                                        LAKE_CATALOG_CONTEXT))
                 .isInstanceOf(TableNotExistException.class)
                 .hasMessage("Table alter_props_db.non_existing_table does not 
exist.");
     }
@@ -131,7 +152,7 @@ class PaimonLakeCatalogTest {
                                 "new_col comment",
                                 TableChange.ColumnPosition.last()));
 
-        flussPaimonCatalog.alterTable(tablePath, changes, new 
TestingLakeCatalogContext());
+        flussPaimonCatalog.alterTable(tablePath, changes, 
LAKE_CATALOG_CONTEXT);
 
         Table table = 
flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
         assertThat(table.rowType().getFieldNames())
@@ -164,7 +185,7 @@ class PaimonLakeCatalogTest {
         assertThatThrownBy(
                         () ->
                                 flussPaimonCatalog.alterTable(
-                                        tablePath, changes, new 
TestingLakeCatalogContext()))
+                                        tablePath, changes, 
LAKE_CATALOG_CONTEXT))
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessage("Only support to add column at last for paimon 
table.");
     }
@@ -187,18 +208,17 @@ class PaimonLakeCatalogTest {
         assertThatThrownBy(
                         () ->
                                 flussPaimonCatalog.alterTable(
-                                        tablePath, changes, new 
TestingLakeCatalogContext()))
+                                        tablePath, changes, 
LAKE_CATALOG_CONTEXT))
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessage("Only support to add nullable column for paimon 
table.");
     }
 
     @Test
-    void testAlterTableAddExistingColumn() {
+    void testAlterTableAddExistingColumns() throws Exception {
         String database = "test_alter_table_add_existing_column_db";
         String tableName = "test_alter_table_add_existing_column_table";
         TablePath tablePath = TablePath.of(database, tableName);
         createTable(database, tableName);
-
         List<TableChange> changes =
                 Collections.singletonList(
                         TableChange.addColumn(
@@ -207,13 +227,40 @@ class PaimonLakeCatalogTest {
                                 null,
                                 TableChange.ColumnPosition.last()));
 
-        // no exception thrown when adding existing column
-        flussPaimonCatalog.alterTable(tablePath, changes, new 
TestingLakeCatalogContext());
+        assertThatThrownBy(
+                        () ->
+                                flussPaimonCatalog.alterTable(
+                                        tablePath,
+                                        changes,
+                                        getLakeCatalogContext(FLUSS_SCHEMA, 
changes)))
+                .isInstanceOf(InvalidAlterTableException.class)
+                .hasMessageContaining("Column address already exists");
 
         List<TableChange> changes2 =
-                Collections.singletonList(
+                Arrays.asList(
                         TableChange.addColumn(
-                                "address",
+                                "new_column",
+                                DataTypes.INT(),
+                                null,
+                                TableChange.ColumnPosition.last()),
+                        TableChange.addColumn(
+                                "new_column2",
+                                DataTypes.STRING(),
+                                null,
+                                TableChange.ColumnPosition.last()));
+
+        // mock add columns to paimon successfully but fail to add columns to 
fluss.
+        flussPaimonCatalog.alterTable(
+                tablePath, changes2, getLakeCatalogContext(FLUSS_SCHEMA, 
changes2));
+        List<TableChange> changes3 =
+                Arrays.asList(
+                        TableChange.addColumn(
+                                "new_column",
+                                DataTypes.INT(),
+                                null,
+                                TableChange.ColumnPosition.last()),
+                        TableChange.addColumn(
+                                "new_column2",
                                 DataTypes.INT(),
                                 null,
                                 TableChange.ColumnPosition.last()));
@@ -221,15 +268,25 @@ class PaimonLakeCatalogTest {
         assertThatThrownBy(
                         () ->
                                 flussPaimonCatalog.alterTable(
-                                        tablePath, changes2, new 
TestingLakeCatalogContext()))
+                                        tablePath,
+                                        changes3,
+                                        getLakeCatalogContext(FLUSS_SCHEMA, 
changes3)))
                 .isInstanceOf(InvalidAlterTableException.class)
-                .hasMessage(
-                        "Column 'address' already exists but with different 
type. Existing: STRING, Expected: INT");
-
-        List<TableChange> changes3 =
-                Collections.singletonList(
+                .hasMessageContaining("Paimon schema is not compatible with 
Fluss schema")
+                .hasMessageContaining(
+                        String.format(
+                                "therefore you need to add the diff columns 
all at once, rather than applying other table changes: %s.",
+                                changes3));
+
+        List<TableChange> changes4 =
+                Arrays.asList(
                         TableChange.addColumn(
-                                "address",
+                                "new_column",
+                                DataTypes.INT(),
+                                null,
+                                TableChange.ColumnPosition.last()),
+                        TableChange.addColumn(
+                                "new_column2",
                                 DataTypes.STRING(),
                                 "the address comment",
                                 TableChange.ColumnPosition.last()));
@@ -237,29 +294,108 @@ class PaimonLakeCatalogTest {
         assertThatThrownBy(
                         () ->
                                 flussPaimonCatalog.alterTable(
-                                        tablePath, changes3, new 
TestingLakeCatalogContext()))
+                                        tablePath,
+                                        changes4,
+                                        getLakeCatalogContext(FLUSS_SCHEMA, 
changes4)))
                 .isInstanceOf(InvalidAlterTableException.class)
-                .hasMessage(
-                        "Column address already exists but with different 
comment. Existing: null, Expected: the address comment");
+                .hasMessageContaining("Paimon schema is not compatible with 
Fluss schema")
+                .hasMessageContaining(
+                        String.format(
+                                "therefore you need to add the diff columns 
all at once, rather than applying other table changes: %s.",
+                                changes4));
+
+        // No exception is thrown only when the added columns exactly match those in Paimon.
+        flussPaimonCatalog.alterTable(
+                tablePath, changes2, getLakeCatalogContext(FLUSS_SCHEMA, 
changes2));
     }
 
-    private void createTable(String database, String tableName) {
-        Schema flussSchema =
+    @Test
+    void testAlterTableAddColumnWhenPaimonSchemaNotMatch() throws Exception {
+        // This rarely happens: only when a new Fluss lake table is created over an
+        // existing Paimon table, or when the table is altered directly on the Paimon side.
+        String database = "test_alter_table_add_column_fluss_wider";
+        String tableName = "test_alter_table_add_column_fluss_wider";
+        createTable(database, tableName);
+        TablePath tablePath = TablePath.of(database, tableName);
+        org.apache.paimon.schema.Schema paimonSchema =
+                ((FileStoreTable)
+                                
flussPaimonCatalog.getPaimonCatalog().getTable(toPaimon(tablePath)))
+                        .schema()
+                        .toSchema();
+
+        List<TableChange> changes =
+                Collections.singletonList(
+                        TableChange.addColumn(
+                                "new_col",
+                                DataTypes.INT(),
+                                "new_col comment",
+                                TableChange.ColumnPosition.last()));
+
+        // test column number mismatch.
+        Schema widerFlussSchema =
                 Schema.newBuilder()
                         .column("id", DataTypes.BIGINT())
                         .column("name", DataTypes.STRING())
                         .column("amount", DataTypes.INT())
                         .column("address", DataTypes.STRING())
+                        .column("phone", DataTypes.INT())
                         .build();
-
-        TableDescriptor td =
-                TableDescriptor.builder()
-                        .schema(flussSchema)
-                        .distributedBy(3) // no bucket key
+        assertThatThrownBy(
+                        () ->
+                                flussPaimonCatalog.alterTable(
+                                        tablePath,
+                                        changes,
+                                        
getLakeCatalogContext(widerFlussSchema, changes)))
+                .isInstanceOf(InvalidAlterTableException.class)
+                .hasMessageContaining("Paimon schema is not compatible with 
Fluss schema")
+                .hasMessageContaining(
+                        String.format(
+                                "therefore you need to add the diff columns 
all at once, rather than applying other table changes: %s.",
+                                changes));
+
+        // test column order mismatch.
+        Schema disorderflussSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.BIGINT())
+                        .column("amount", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column("address", DataTypes.STRING())
                         .build();
+        assertThatThrownBy(
+                        () ->
+                                flussPaimonCatalog.alterTable(
+                                        tablePath,
+                                        changes,
+                                        
getLakeCatalogContext(disorderflussSchema, changes)))
+                .isInstanceOf(InvalidAlterTableException.class)
+                .hasMessageContaining("Paimon schema is not compatible with 
Fluss schema")
+                .hasMessageContaining(
+                        String.format(
+                                "therefore you need to add the diff columns 
all at once, rather than applying other table changes: %s.",
+                                changes));
+    }
 
+    private void createTable(String database, String tableName) {
+        TableDescriptor td = getTableDescriptor(FLUSS_SCHEMA);
         TablePath tablePath = TablePath.of(database, tableName);
 
-        flussPaimonCatalog.createTable(tablePath, td, new 
TestingLakeCatalogContext());
+        flussPaimonCatalog.createTable(tablePath, td, LAKE_CATALOG_CONTEXT);
+    }
+
+    private TestingLakeCatalogContext getLakeCatalogContext(
+            Schema schema, List<TableChange> schemaChanges) {
+        Schema expectedSchema = SchemaUpdate.applySchemaChanges(schema, 
schemaChanges);
+        return new TestingLakeCatalogContext(
+                getTableDescriptor(schema), 
getTableDescriptor(expectedSchema));
+    }
+
+    private TableDescriptor getTableDescriptor(Schema schema) {
+        return TableDescriptor.builder()
+                .schema(schema)
+                .property(TABLE_DATALAKE_ENABLED.key(), "true")
+                .property(TABLE_DATALAKE_FORMAT.key(), "paimon")
+                .property("table.datalake.paimon.warehouse", 
tempWarehouseDir.toURI().toString())
+                .distributedBy(3) // no bucket key
+                .build();
     }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 6a846076a..88b1d5b08 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -389,7 +389,10 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                                 tablePath,
                                 tableDescriptor,
                                 new DefaultLakeCatalogContext(
-                                        true, 
currentSession().getPrincipal()));
+                                        true,
+                                        currentSession().getPrincipal(),
+                                        null,
+                                        tableDescriptor));
             } catch (TableAlreadyExistException e) {
                 throw new LakeTableAlreadyExistException(e.getMessage(), e);
             }
@@ -420,15 +423,12 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                             + "table properties or table schema.");
         }
 
-        LakeCatalog.Context lakeCatalogContext =
-                new DefaultLakeCatalogContext(false, 
currentSession().getPrincipal());
-
         if (!alterSchemaChanges.isEmpty()) {
             metadataManager.alterTableSchema(
                     tablePath,
                     alterSchemaChanges,
                     request.isIgnoreIfNotExists(),
-                    lakeCatalogContext);
+                    currentSession().getPrincipal());
         }
 
         if (!alterTableConfigChanges.isEmpty()) {
@@ -437,7 +437,7 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                     alterTableConfigChanges,
                     tablePropertyChanges,
                     request.isIgnoreIfNotExists(),
-                    lakeCatalogContext);
+                    currentSession().getPrincipal());
         }
 
         return CompletableFuture.completedFuture(new AlterTableResponse());
@@ -1011,11 +1011,22 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
 
         private final boolean isCreatingFlussTable;
         private final FlussPrincipal flussPrincipal;
+        @Nullable private final TableDescriptor currentTable;
+        private final TableDescriptor expectedTable;
 
         public DefaultLakeCatalogContext(
-                boolean isCreatingFlussTable, FlussPrincipal flussPrincipal) {
+                boolean isCreatingFlussTable,
+                FlussPrincipal flussPrincipal,
+                @Nullable TableDescriptor currentTable,
+                TableDescriptor expectedTable) {
             this.isCreatingFlussTable = isCreatingFlussTable;
             this.flussPrincipal = flussPrincipal;
+            if (!isCreatingFlussTable) {
+                checkNotNull(
+                        currentTable, "currentTable must be provided when 
altering a Fluss table.");
+            }
+            this.currentTable = currentTable;
+            this.expectedTable = expectedTable;
         }
 
         @Override
@@ -1027,6 +1038,17 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         public FlussPrincipal getFlussPrincipal() {
             return flussPrincipal;
         }
+
+        @Nullable
+        @Override
+        public TableDescriptor getCurrentTable() {
+            return currentTable;
+        }
+
+        @Override
+        public TableDescriptor getExpectedTable() {
+            return expectedTable;
+        }
     }
 
     // 
==================================================================================
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index 8d7b61a08..02a02cc44 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -45,6 +45,7 @@ import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePartition;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.security.acl.FlussPrincipal;
 import org.apache.fluss.server.entity.TablePropertyChanges;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.data.DatabaseRegistration;
@@ -325,16 +326,23 @@ public class MetadataManager {
             TablePath tablePath,
             List<TableChange> schemaChanges,
             boolean ignoreIfNotExists,
-            LakeCatalog.Context lakeCatalogContext)
+            FlussPrincipal flussPrincipal)
             throws TableNotExistException, TableNotPartitionedException {
         try {
 
             TableInfo table = getTable(tablePath);
+            TableDescriptor tableDescriptor = table.toTableDescriptor();
 
             // validate the table column changes
             if (!schemaChanges.isEmpty()) {
-                Schema newSchema = SchemaUpdate.applySchemaChanges(table, 
schemaChanges);
-
+                Schema newSchema =
+                        SchemaUpdate.applySchemaChanges(table.getSchema(), 
schemaChanges);
+                LakeCatalog.Context lakeCatalogContext =
+                        new CoordinatorService.DefaultLakeCatalogContext(
+                                false,
+                                flussPrincipal,
+                                tableDescriptor,
+                                
TableDescriptor.builder(tableDescriptor).schema(newSchema).build());
                 // Lake First: sync to Lake before updating Fluss schema
                 syncSchemaChangesToLake(tablePath, table, schemaChanges, 
lakeCatalogContext);
 
@@ -396,7 +404,7 @@ public class MetadataManager {
             List<TableChange> tableChanges,
             TablePropertyChanges tablePropertyChanges,
             boolean ignoreIfNotExists,
-            LakeCatalog.Context lakeCatalogContext) {
+            FlussPrincipal flussPrincipal) {
         try {
             // it throws TableNotExistException if the table or database not 
exists
             TableRegistration tableReg = getTableRegistration(tablePath);
@@ -422,11 +430,7 @@ public class MetadataManager {
                 // pre alter table properties, e.g. create lake table in lake 
storage if it's to
                 // enable datalake for the table
                 preAlterTableProperties(
-                        tablePath,
-                        tableDescriptor,
-                        newDescriptor,
-                        tableChanges,
-                        lakeCatalogContext);
+                        tablePath, tableDescriptor, newDescriptor, 
tableChanges, flussPrincipal);
                 // update the table to zk
                 TableRegistration updatedTableRegistration =
                         tableReg.newProperties(
@@ -456,7 +460,10 @@ public class MetadataManager {
             TableDescriptor tableDescriptor,
             TableDescriptor newDescriptor,
             List<TableChange> tableChanges,
-            LakeCatalog.Context lakeCatalogContext) {
+            FlussPrincipal flussPrincipal) {
+        LakeCatalog.Context lakeCatalogContext =
+                new CoordinatorService.DefaultLakeCatalogContext(
+                        false, flussPrincipal, tableDescriptor, newDescriptor);
         LakeCatalog lakeCatalog =
                 
lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog();
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
index 011871a0b..d1ad0c16c 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
@@ -17,20 +17,19 @@
 
 package org.apache.fluss.server.coordinator;
 
+import org.apache.fluss.exception.InvalidAlterTableException;
 import org.apache.fluss.exception.SchemaChangeException;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableChange;
-import org.apache.fluss.metadata.TableInfo;
 
 import java.util.List;
-import java.util.Objects;
 
 /** Schema update. */
 public class SchemaUpdate {
 
     /** Apply schema changes to the given table info and return the updated 
schema. */
-    public static Schema applySchemaChanges(TableInfo tableInfo, 
List<TableChange> changes) {
-        SchemaUpdate schemaUpdate = new SchemaUpdate(tableInfo);
+    public static Schema applySchemaChanges(Schema initialSchema, 
List<TableChange> changes) {
+        SchemaUpdate schemaUpdate = new SchemaUpdate(initialSchema);
         for (TableChange change : changes) {
             schemaUpdate = schemaUpdate.applySchemaChange(change);
         }
@@ -40,9 +39,9 @@ public class SchemaUpdate {
     // Now we only maintain the Builder
     private final Schema.Builder builder;
 
-    public SchemaUpdate(TableInfo tableInfo) {
+    public SchemaUpdate(Schema initialSchema) {
         // Initialize builder from the current table schema
-        this.builder = Schema.newBuilder().fromSchema(tableInfo.getSchema());
+        this.builder = Schema.newBuilder().fromSchema(initialSchema);
     }
 
     public Schema getSchema() {
@@ -50,7 +49,7 @@ public class SchemaUpdate {
         return builder.build();
     }
 
-    public SchemaUpdate applySchemaChange(TableChange columnChange) {
+    private SchemaUpdate applySchemaChange(TableChange columnChange) {
         if (columnChange instanceof TableChange.AddColumn) {
             return addColumn((TableChange.AddColumn) columnChange);
         } else if (columnChange instanceof TableChange.ModifyColumn) {
@@ -69,13 +68,8 @@ public class SchemaUpdate {
         Schema.Column existingColumn = 
builder.getColumn(addColumn.getName()).orElse(null);
 
         if (existingColumn != null) {
-            if (!existingColumn.getDataType().equals(addColumn.getDataType())
-                    || !Objects.equals(
-                            existingColumn.getComment().orElse(null), 
addColumn.getComment())) {
-                throw new IllegalArgumentException(
-                        "Column " + addColumn.getName() + " already exists.");
-            }
-            return this;
+            throw new InvalidAlterTableException(
+                    "Column " + addColumn.getName() + " already exists.");
         }
 
         if (addColumn.getPosition() != TableChange.ColumnPosition.last()) {

Reply via email to