This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 85904f2ff [lake] Paimon lake table support alter table properties 
(#1754)
85904f2ff is described below

commit 85904f2ff7b050ed984829361d69ae981f38c960
Author: Liebing <[email protected]>
AuthorDate: Mon Sep 29 19:07:18 2025 +0800

    [lake] Paimon lake table support alter table properties (#1754)
---
 .../apache/fluss/lake/lakestorage/LakeCatalog.java |  14 +++
 .../lake/lakestorage/PluginLakeStorageWrapper.java |  12 ++
 .../fluss/lake/lakestorage/LakeStorageTest.java    |   7 ++
 .../fluss/lake/iceberg/IcebergLakeCatalog.java     |   9 ++
 .../apache/fluss/lake/lance/LanceLakeCatalog.java  |   9 ++
 .../fluss/lake/paimon/PaimonLakeCatalog.java       |  41 +++++++
 .../fluss/lake/paimon/utils/PaimonConversions.java |  29 +++++
 .../lake/paimon/LakeEnabledTableCreateITCase.java  | 126 ++++++++++++++++++++-
 .../fluss/lake/paimon/PaimonLakeCatalogTest.java   |  98 ++++++++++++++++
 .../server/coordinator/CoordinatorService.java     |  43 ++++++-
 .../fluss/server/coordinator/MetadataManager.java  |  51 ++++++---
 .../fluss/server/utils/ServerRpcMessageUtils.java  |  45 +-------
 .../lakehouse/TestingPaimonStoragePlugin.java      |   9 ++
 13 files changed, 435 insertions(+), 58 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java 
b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
index 813c6629f..2dce05471 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
@@ -19,9 +19,13 @@ package org.apache.fluss.lake.lakestorage;
 
 import org.apache.fluss.annotation.PublicEvolving;
 import org.apache.fluss.exception.TableAlreadyExistException;
+import org.apache.fluss.exception.TableNotExistException;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 
+import java.util.List;
+
 /**
  * A catalog interface to modify metadata in external datalake.
  *
@@ -40,6 +44,16 @@ public interface LakeCatalog extends AutoCloseable {
     void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
             throws TableAlreadyExistException;
 
+    /**
+     * Alter a table in lake.
+     *
+     * @param tablePath path of the table to be altered
+     * @param tableChanges The changes to be applied to the table
+     * @throws TableNotExistException if the table does not exist
+     */
+    void alterTable(TablePath tablePath, List<TableChange> tableChanges)
+            throws TableNotExistException;
+
     @Override
     default void close() throws Exception {
         // default do nothing
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java
 
b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java
index ce4143d88..9c75d3609 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java
@@ -19,13 +19,17 @@ package org.apache.fluss.lake.lakestorage;
 
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.TableAlreadyExistException;
+import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.lake.source.LakeSource;
 import org.apache.fluss.lake.writer.LakeTieringFactory;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.utils.TemporaryClassLoaderContext;
 import org.apache.fluss.utils.WrappingProxy;
 
+import java.util.List;
+
 /**
  * A wrapper around {@link LakeStoragePlugin} that ensures the plugin 
classloader is used for all
  * {@link LakeCatalog} operations.
@@ -78,6 +82,14 @@ public class PluginLakeStorageWrapper implements 
LakeStoragePlugin {
             }
         }
 
+        @Override
+        public void alterTable(TablePath tablePath, List<TableChange> 
tableChanges)
+                throws TableNotExistException {
+            try (TemporaryClassLoaderContext ignored = 
TemporaryClassLoaderContext.of(loader)) {
+                inner.alterTable(tablePath, tableChanges);
+            }
+        }
+
         @Override
         public void close() throws Exception {
             try (TemporaryClassLoaderContext ignored = 
TemporaryClassLoaderContext.of(loader)) {
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java
index f0452812b..5812cc3ca 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java
@@ -19,8 +19,10 @@ package org.apache.fluss.lake.lakestorage;
 
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.TableAlreadyExistException;
+import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.lake.source.LakeSource;
 import org.apache.fluss.lake.writer.LakeTieringFactory;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.plugin.PluginManager;
@@ -30,6 +32,7 @@ import org.junit.jupiter.api.Test;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -145,5 +148,9 @@ class LakeStorageTest {
         @Override
         public void createTable(TablePath tablePath, TableDescriptor 
tableDescriptor)
                 throws TableAlreadyExistException {}
+
+        @Override
+        public void alterTable(TablePath tablePath, List<TableChange> 
tableChanges)
+                throws TableNotExistException {}
     }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
index 842fcddde..7b3a913b8 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -20,8 +20,10 @@ package org.apache.fluss.lake.iceberg;
 import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.TableAlreadyExistException;
+import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
 import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.utils.IOUtils;
@@ -112,6 +114,13 @@ public class IcebergLakeCatalog implements LakeCatalog {
         }
     }
 
+    @Override
+    public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
+            throws TableNotExistException {
+        throw new UnsupportedOperationException(
+                "Alter table is not supported for Iceberg at the moment");
+    }
+
     private TableIdentifier toIcebergTableIdentifier(TablePath tablePath) {
         return TableIdentifier.of(tablePath.getDatabaseName(), 
tablePath.getTableName());
     }
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java
index 16a063499..2a55fc46a 100644
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java
@@ -19,9 +19,11 @@ package org.apache.fluss.lake.lance;
 
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.lake.lakestorage.LakeCatalog;
 import org.apache.fluss.lake.lance.utils.LanceArrowUtils;
 import org.apache.fluss.lake.lance.utils.LanceDatasetAdapter;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 
@@ -68,6 +70,13 @@ public class LanceLakeCatalog implements LakeCatalog {
         }
     }
 
+    @Override
+    public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
+            throws TableNotExistException {
+        throw new UnsupportedOperationException(
+                "Alter table is not supported for Lance at the moment");
+    }
+
     @Override
     public void close() throws Exception {
         LakeCatalog.super.close();
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index 9504ccc51..f84b6e590 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -17,10 +17,13 @@
 
 package org.apache.fluss.lake.paimon;
 
+import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.exception.TableAlreadyExistException;
+import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.utils.IOUtils;
@@ -32,6 +35,7 @@ import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 
@@ -39,6 +43,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges;
 import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
@@ -72,6 +77,11 @@ public class PaimonLakeCatalog implements LakeCatalog {
                         
CatalogContext.create(Options.fromMap(configuration.toMap())));
     }
 
+    @VisibleForTesting
+    protected Catalog getPaimonCatalog() {
+        return paimonCatalog;
+    }
+
     @Override
     public void createTable(TablePath tablePath, TableDescriptor 
tableDescriptor)
             throws TableAlreadyExistException {
@@ -97,6 +107,20 @@ public class PaimonLakeCatalog implements LakeCatalog {
         }
     }
 
+    @Override
+    public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
+            throws TableNotExistException {
+        try {
+            Identifier paimonPath = toPaimonIdentifier(tablePath);
+            List<SchemaChange> paimonSchemaChanges =
+                    toPaimonSchemaChanges(tableChanges, 
this::getFlussPropertyKeyToPaimon);
+            alterTable(paimonPath, paimonSchemaChanges);
+        } catch (Catalog.ColumnAlreadyExistException | 
Catalog.ColumnNotExistException e) {
+            // shouldn't happen before we support schema change
+            throw new RuntimeException(e);
+        }
+    }
+
     private void createTable(Identifier tablePath, Schema schema)
             throws Catalog.DatabaseNotExistException {
         try {
@@ -116,6 +140,15 @@ public class PaimonLakeCatalog implements LakeCatalog {
         }
     }
 
+    private void alterTable(Identifier tablePath, List<SchemaChange> 
tableChanges)
+            throws Catalog.ColumnAlreadyExistException, 
Catalog.ColumnNotExistException {
+        try {
+            paimonCatalog.alterTable(tablePath, tableChanges, false);
+        } catch (Catalog.TableNotExistException e) {
+            throw new TableNotExistException("Table " + tablePath + " not 
exists.");
+        }
+    }
+
     private Identifier toPaimonIdentifier(TablePath tablePath) {
         return Identifier.create(tablePath.getDatabaseName(), 
tablePath.getTableName());
     }
@@ -190,6 +223,14 @@ public class PaimonLakeCatalog implements LakeCatalog {
         }
     }
 
+    private String getFlussPropertyKeyToPaimon(String key) {
+        if (key.startsWith(PAIMON_CONF_PREFIX)) {
+            return key.substring(PAIMON_CONF_PREFIX.length());
+        } else {
+            return FLUSS_CONF_PREFIX + key;
+        }
+    }
+
     @Override
     public void close() {
         IOUtils.closeQuietly(paimonCatalog, "paimon catalog");
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
index 72033f257..c456411e8 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
@@ -19,6 +19,7 @@ package org.apache.fluss.lake.paimon.utils;
 
 import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.row.GenericRow;
@@ -28,13 +29,16 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Function;
 
 /** Utils for conversion between Paimon and Fluss. */
 public class PaimonConversions {
@@ -106,4 +110,29 @@ public class PaimonConversions {
         return org.apache.paimon.data.InternalRow.createFieldGetter(dataType, 
0)
                 .getFieldOrNull(flussRowAsPaimonRow);
     }
+
+    public static List<SchemaChange> toPaimonSchemaChanges(
+            List<TableChange> tableChanges, Function<String, String> 
optionKeyTransformer) {
+        List<SchemaChange> schemaChanges = new 
ArrayList<>(tableChanges.size());
+
+        for (TableChange tableChange : tableChanges) {
+            if (tableChange instanceof TableChange.SetOption) {
+                TableChange.SetOption setOption = (TableChange.SetOption) 
tableChange;
+                schemaChanges.add(
+                        SchemaChange.setOption(
+                                optionKeyTransformer.apply(setOption.getKey()),
+                                setOption.getValue()));
+            } else if (tableChange instanceof TableChange.ResetOption) {
+                TableChange.ResetOption resetOption = 
(TableChange.ResetOption) tableChange;
+                schemaChanges.add(
+                        SchemaChange.removeOption(
+                                
optionKeyTransformer.apply(resetOption.getKey())));
+            } else {
+                throw new UnsupportedOperationException(
+                        "Unsupported table change: " + tableChange.getClass());
+            }
+        }
+
+        return schemaChanges;
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 5e97a635a..99f33b05e 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -23,7 +23,9 @@ import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.InvalidConfigException;
 import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.exception.LakeTableAlreadyExistException;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
@@ -113,6 +115,7 @@ class LakeEnabledTableCreateITCase {
             throw new FlussRuntimeException("Failed to create warehouse path");
         }
         conf.setString("datalake.paimon.warehouse", warehousePath);
+        conf.setString("datalake.paimon.cache-enabled", "false");
         paimonCatalog =
                 CatalogFactory.createCatalog(
                         
CatalogContext.create(Options.fromMap(extractLakeProperties(conf))));
@@ -373,7 +376,7 @@ class LakeEnabledTableCreateITCase {
         customProperties.put("k1", "v1");
         customProperties.put("paimon.file.format", "parquet");
 
-        // log table with lake disabled
+        // create log table with lake disabled
         TableDescriptor logTable =
                 TableDescriptor.builder()
                         .schema(
@@ -429,6 +432,27 @@ class LakeEnabledTableCreateITCase {
                         }),
                 "log_c1,log_c2",
                 BUCKET_NUM);
+
+        // disable lake table
+        TableChange.SetOption disableLake =
+                TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"false");
+        changes = Collections.singletonList(disableLake);
+        admin.alterTable(logTablePath, changes, false).get();
+        // paimon table should still exist although lake is disabled
+        paimonCatalog.getTable(Identifier.create(DATABASE, 
logTablePath.getTableName()));
+
+        // try to enable lake table again
+        enableLake = 
TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+        List<TableChange> finalChanges = Collections.singletonList(enableLake);
+        // TODO: After #846 is implemented, we should remove this exception 
assertion.
+        assertThatThrownBy(() -> admin.alterTable(logTablePath, finalChanges, 
false).get())
+                .cause()
+                .isInstanceOf(LakeTableAlreadyExistException.class)
+                .hasMessage(
+                        String.format(
+                                "The table %s already exists in paimon 
catalog, please "
+                                        + "first drop the table in paimon 
catalog or use a new table name.",
+                                logTablePath));
     }
 
     @Test
@@ -455,6 +479,106 @@ class LakeEnabledTableCreateITCase {
         }
     }
 
+    @Test
+    void testAlterLakeEnabledTableProperties() throws Exception {
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("k1", "v1");
+        customProperties.put("paimon.file.format", "parquet");
+
+        // create table
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("c1", DataTypes.INT())
+                                        .column("c2", DataTypes.STRING())
+                                        .build())
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .customProperties(customProperties)
+                        .distributedBy(BUCKET_NUM, "c1", "c2")
+                        .build();
+        TablePath tablePath = TablePath.of(DATABASE, "alter_table");
+        admin.createTable(tablePath, tableDescriptor, false).get();
+        Table paimonTable =
+                paimonCatalog.getTable(Identifier.create(DATABASE, 
tablePath.getTableName()));
+        verifyPaimonTable(
+                paimonTable,
+                tableDescriptor,
+                RowType.of(
+                        new DataType[] {
+                            org.apache.paimon.types.DataTypes.INT(),
+                            org.apache.paimon.types.DataTypes.STRING(),
+                            // for __bucket, __offset, __timestamp
+                            org.apache.paimon.types.DataTypes.INT(),
+                            org.apache.paimon.types.DataTypes.BIGINT(),
+                            
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                        },
+                        new String[] {
+                            "c1",
+                            "c2",
+                            BUCKET_COLUMN_NAME,
+                            OFFSET_COLUMN_NAME,
+                            TIMESTAMP_COLUMN_NAME
+                        }),
+                "c1,c2",
+                BUCKET_NUM);
+
+        // test alter table properties
+        List<TableChange> tableChanges =
+                Arrays.asList(TableChange.reset("k1"), TableChange.set("k2", 
"v2"));
+        admin.alterTable(tablePath, tableChanges, false).get();
+        paimonTable = paimonCatalog.getTable(Identifier.create(DATABASE, 
tablePath.getTableName()));
+        customProperties.remove("k1");
+        customProperties.put("k2", "v2");
+        tableDescriptor =
+                
tableDescriptor.withProperties(tableDescriptor.getProperties(), 
customProperties);
+        verifyPaimonTable(
+                paimonTable,
+                tableDescriptor,
+                RowType.of(
+                        new DataType[] {
+                            org.apache.paimon.types.DataTypes.INT(),
+                            org.apache.paimon.types.DataTypes.STRING(),
+                            // for __bucket, __offset, __timestamp
+                            org.apache.paimon.types.DataTypes.INT(),
+                            org.apache.paimon.types.DataTypes.BIGINT(),
+                            
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                        },
+                        new String[] {
+                            "c1",
+                            "c2",
+                            BUCKET_COLUMN_NAME,
+                            OFFSET_COLUMN_NAME,
+                            TIMESTAMP_COLUMN_NAME
+                        }),
+                "c1,c2",
+                BUCKET_NUM);
+
+        // test alter paimon properties, should throw exception
+        tableChanges = 
Collections.singletonList(TableChange.set("paimon.bucket", "10"));
+        List<TableChange> finalTableChanges = tableChanges;
+        assertThatThrownBy(() -> admin.alterTable(tablePath, 
finalTableChanges, false).get())
+                .cause()
+                .isInstanceOf(InvalidConfigException.class)
+                .hasMessage(
+                        "Property 'paimon.bucket' is not supported to alter 
which is for datalake table.");
+
+        // test alter table if lake table not exists
+        paimonCatalog.dropTable(Identifier.create(DATABASE, 
tablePath.getTableName()), true);
+        tableChanges = Collections.singletonList(TableChange.set("k3", "v3"));
+        List<TableChange> finalTableChanges1 = tableChanges;
+        assertThatThrownBy(() -> admin.alterTable(tablePath, 
finalTableChanges1, false).get())
+                .cause()
+                .isInstanceOf(FlussRuntimeException.class)
+                .hasMessageContaining(
+                        "Lake table doesn't exists for lake-enabled table "
+                                + tablePath
+                                + ", which shouldn't be happened. Please check 
if the lake table was deleted manually.");
+
+        // altering a non-existent table is fine when ignoreIfNotExists = true
+        admin.alterTable(TablePath.of(DATABASE, "not_exist_table"), 
tableChanges, true).get();
+    }
+
     private void verifyPaimonTable(
             Table paimonTable,
             TableDescriptor flussTable,
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
new file mode 100644
index 000000000..c2275fe21
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableChange;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataTypes;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link PaimonLakeCatalog}. */
+class PaimonLakeCatalogTest {
+
+    @TempDir private File tempWarehouseDir;
+
+    private PaimonLakeCatalog flussPaimonCatalog;
+
+    @BeforeEach
+    public void setUp() {
+        Configuration configuration = new Configuration();
+        configuration.setString("warehouse", 
tempWarehouseDir.toURI().toString());
+        flussPaimonCatalog = new PaimonLakeCatalog(configuration);
+    }
+
+    @Test
+    void testAlterTableConfigs() throws Exception {
+        String database = "test_alter_table_configs_db";
+        String tableName = "test_alter_table_configs_table";
+        TablePath tablePath = TablePath.of(database, tableName);
+        Identifier identifier = Identifier.create(database, tableName);
+        createTable(database, tableName);
+        Table table = 
flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
+
+        // value should be null for key
+        assertThat(table.options().get("key")).isEqualTo(null);
+
+        // set the value for key
+        flussPaimonCatalog.alterTable(tablePath, 
Arrays.asList(TableChange.set("key", "value")));
+
+        table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
+        // we have set the value for key
+        assertThat(table.options().get("fluss.key")).isEqualTo("value");
+
+        // reset the value for key
+        flussPaimonCatalog.alterTable(tablePath, 
Arrays.asList(TableChange.reset("key")));
+
+        table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
+        // we have reset the value for key
+        assertThat(table.options().get("fluss.key")).isEqualTo(null);
+    }
+
+    private void createTable(String database, String tableName) {
+        Schema flussSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.BIGINT())
+                        .column("name", DataTypes.STRING())
+                        .column("amount", DataTypes.INT())
+                        .column("address", DataTypes.STRING())
+                        .build();
+
+        TableDescriptor td =
+                TableDescriptor.builder()
+                        .schema(flussSchema)
+                        .distributedBy(3) // no bucket key
+                        .build();
+
+        TablePath tablePath = TablePath.of(database, tableName);
+
+        flussPaimonCatalog.createTable(tablePath, td);
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 5e0c5dc2a..47cc555d2 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -21,6 +21,7 @@ import org.apache.fluss.cluster.ServerType;
 import org.apache.fluss.cluster.TabletServerInfo;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidAlterTableException;
 import org.apache.fluss.exception.InvalidCoordinatorException;
 import org.apache.fluss.exception.InvalidDatabaseException;
 import org.apache.fluss.exception.InvalidTableException;
@@ -34,6 +35,7 @@ import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.rpc.gateway.CoordinatorGateway;
@@ -112,6 +114,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
+import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
 import static 
org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters;
 import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.fromTablePath;
@@ -121,8 +124,8 @@ import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.getCommitRemot
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.getPartitionSpec;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeCreateAclsResponse;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeDropAclsResponse;
+import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.toTableChanges;
 import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePath;
-import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePropertyChanges;
 import static 
org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment;
 import static org.apache.fluss.utils.PartitionUtils.validatePartitionSpec;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -298,11 +301,12 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
             authorizer.authorize(currentSession(), OperationType.ALTER, 
Resource.table(tablePath));
         }
 
-        TablePropertyChanges tablePropertyChanges =
-                toTablePropertyChanges(request.getConfigChangesList());
+        List<TableChange> tableChanges = 
toTableChanges(request.getConfigChangesList());
+        TablePropertyChanges tablePropertyChanges = 
toTablePropertyChanges(tableChanges);
 
         metadataManager.alterTableProperties(
                 tablePath,
+                tableChanges,
                 tablePropertyChanges,
                 request.isIgnoreIfNotExists(),
                 lakeCatalog,
@@ -312,6 +316,39 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(new 
AlterTablePropertiesResponse());
     }
 
+    public static TablePropertyChanges 
toTablePropertyChanges(List<TableChange> tableChanges) {
+        TablePropertyChanges.Builder builder = TablePropertyChanges.builder();
+        if (tableChanges.isEmpty()) {
+            return builder.build();
+        }
+
+        for (TableChange tableChange : tableChanges) {
+            if (tableChange instanceof TableChange.SetOption) {
+                TableChange.SetOption setOption = (TableChange.SetOption) 
tableChange;
+                String optionKey = setOption.getKey();
+                if (isTableStorageConfig(optionKey)) {
+                    builder.setTableProperty(optionKey, setOption.getValue());
+                } else {
+                    // otherwise, it's considered as custom property
+                    builder.setCustomProperty(optionKey, setOption.getValue());
+                }
+            } else if (tableChange instanceof TableChange.ResetOption) {
+                TableChange.ResetOption resetOption = 
(TableChange.ResetOption) tableChange;
+                String optionKey = resetOption.getKey();
+                if (isTableStorageConfig(optionKey)) {
+                    builder.resetTableProperty(optionKey);
+                } else {
+                    // otherwise, it's considered as custom property
+                    builder.resetCustomProperty(optionKey);
+                }
+            } else {
+                throw new InvalidAlterTableException(
+                        "Unsupported alter table change: " + tableChange);
+            }
+        }
+        return builder.build();
+    }
+
     private TableDescriptor applySystemDefaults(TableDescriptor 
tableDescriptor) {
         TableDescriptor newDescriptor = tableDescriptor;
 
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index 2056068a5..b1127ff39 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -40,6 +40,7 @@ import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.DatabaseInfo;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.SchemaInfo;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePartition;
@@ -310,6 +311,7 @@ public class MetadataManager {
 
     public void alterTableProperties(
             TablePath tablePath,
+            List<TableChange> tableChanges,
             TablePropertyChanges tablePropertyChanges,
             boolean ignoreIfNotExists,
             @Nullable LakeCatalog lakeCatalog,
@@ -340,7 +342,12 @@ public class MetadataManager {
                 // pre alter table properties, e.g. create lake table in lake 
storage if it's to
                 // enable datalake for the table
                 preAlterTableProperties(
-                        tablePath, tableDescriptor, newDescriptor, 
lakeCatalog, dataLakeFormat);
+                        tablePath,
+                        tableDescriptor,
+                        newDescriptor,
+                        tableChanges,
+                        lakeCatalog,
+                        dataLakeFormat);
                 // update the table to zk
                 TableRegistration updatedTableRegistration =
                         tableReg.newProperties(
@@ -377,27 +384,29 @@ public class MetadataManager {
             TablePath tablePath,
             TableDescriptor tableDescriptor,
             TableDescriptor newDescriptor,
+            List<TableChange> tableChanges,
             LakeCatalog lakeCatalog,
             DataLakeFormat dataLakeFormat) {
-
-        boolean toEnableDataLake =
-                !isDataLakeEnabled(tableDescriptor) && 
isDataLakeEnabled(newDescriptor);
-
-        // enable lake table
-        if (toEnableDataLake) {
-            // TODO: should tolerate if the lake exist but matches our schema. 
This ensures
-            // eventually
-            //  consistent by idempotently creating the table multiple times. 
See #846
-            // before create table in fluss, we may create in lake
+        if (isDataLakeEnabled(newDescriptor)) {
             if (lakeCatalog == null) {
                 throw new InvalidAlterTableException(
                         "Cannot alter table "
                                 + tablePath
-                                + " to enable data lake, because the Fluss 
cluster doesn't enable datalake tables.");
-            } else {
+                                + " in data lake, because the Fluss cluster 
doesn't enable datalake tables.");
+            }
+
+            boolean isLakeTableNewlyCreated = false;
+            // to enable lake table
+            if (!isDataLakeEnabled(tableDescriptor)) {
+                // before create table in fluss, we may create in lake
                 try {
                     lakeCatalog.createTable(tablePath, newDescriptor);
+                    // no need to alter lake table if it is newly created
+                    isLakeTableNewlyCreated = true;
                 } catch (TableAlreadyExistException e) {
+                    // TODO: should tolerate if the lake exist but matches our 
schema. This ensures
+                    // eventually consistent by idempotently creating the 
table multiple times. See
+                    // #846
                     throw new LakeTableAlreadyExistException(
                             String.format(
                                     "The table %s already exists in %s 
catalog, please "
@@ -405,8 +414,22 @@ public class MetadataManager {
                                     tablePath, dataLakeFormat, 
dataLakeFormat));
                 }
             }
+
+            // only need to alter lake table if it is not newly created
+            if (!isLakeTableNewlyCreated) {
+                {
+                    try {
+                        lakeCatalog.alterTable(tablePath, tableChanges);
+                    } catch (TableNotExistException e) {
+                        throw new FlussRuntimeException(
+                                "Lake table doesn't exists for lake-enabled 
table "
+                                        + tablePath
+                                        + ", which shouldn't be happened. 
Please check if the lake table was deleted manually.",
+                                e);
+                    }
+                }
+            }
         }
-        // more pre-alter actions can be added here
     }
 
     private void postAlterTableProperties(
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index e9c055ed4..fcf2a6b9d 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -21,7 +21,6 @@ import org.apache.fluss.cluster.Endpoint;
 import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.cluster.ServerType;
 import org.apache.fluss.config.ConfigOptions;
-import org.apache.fluss.exception.InvalidAlterTableException;
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.fs.token.ObtainedSecurityToken;
 import org.apache.fluss.metadata.AlterConfigOpType;
@@ -148,7 +147,6 @@ import org.apache.fluss.server.entity.NotifyLeaderAndIsrResultForBucket;
 import org.apache.fluss.server.entity.NotifyRemoteLogOffsetsData;
 import org.apache.fluss.server.entity.StopReplicaData;
 import org.apache.fluss.server.entity.StopReplicaResultForBucket;
-import org.apache.fluss.server.entity.TablePropertyChanges;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshotJsonSerde;
 import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle;
@@ -179,7 +177,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
 import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toByteBuffer;
 import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toPbAclInfo;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -261,43 +258,11 @@ public class ServerRpcMessageUtils {
         }
     }
 
-    public static TablePropertyChanges 
toTablePropertyChanges(List<PbAlterConfig> alterConfigs) {
-        TablePropertyChanges.Builder builder = TablePropertyChanges.builder();
-        if (alterConfigs.isEmpty()) {
-            return builder.build();
-        }
-
-        List<TableChange> tableChanges =
-                alterConfigs.stream()
-                        .filter(Objects::nonNull)
-                        .map(ServerRpcMessageUtils::toTableChange)
-                        .collect(Collectors.toList());
-
-        for (TableChange tableChange : tableChanges) {
-            if (tableChange instanceof TableChange.SetOption) {
-                TableChange.SetOption setOption = (TableChange.SetOption) 
tableChange;
-                String optionKey = setOption.getKey();
-                if (isTableStorageConfig(optionKey)) {
-                    builder.setTableProperty(optionKey, setOption.getValue());
-                } else {
-                    // otherwise, it's considered as custom property
-                    builder.setCustomProperty(optionKey, setOption.getValue());
-                }
-            } else if (tableChange instanceof TableChange.ResetOption) {
-                TableChange.ResetOption resetOption = 
(TableChange.ResetOption) tableChange;
-                String optionKey = resetOption.getKey();
-                if (isTableStorageConfig(optionKey)) {
-                    builder.resetTableProperty(optionKey);
-                } else {
-                    // otherwise, it's considered as custom property
-                    builder.resetCustomProperty(optionKey);
-                }
-            } else {
-                throw new InvalidAlterTableException(
-                        "Unsupported alter table change: " + tableChange);
-            }
-        }
-        return builder.build();
+    public static List<TableChange> toTableChanges(List<PbAlterConfig> 
alterConfigs) {
+        return alterConfigs.stream()
+                .filter(Objects::nonNull)
+                .map(ServerRpcMessageUtils::toTableChange)
+                .collect(Collectors.toList());
     }
 
     public static MetadataResponse buildMetadataResponse(
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
index 74962c486..deb2cddb0 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
@@ -19,16 +19,19 @@ package org.apache.fluss.server.lakehouse;
 
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.TableAlreadyExistException;
+import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.lake.lakestorage.LakeCatalog;
 import org.apache.fluss.lake.lakestorage.LakeStorage;
 import org.apache.fluss.lake.lakestorage.LakeStoragePlugin;
 import org.apache.fluss.lake.source.LakeSource;
 import org.apache.fluss.lake.writer.LakeTieringFactory;
 import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /** A plugin of paimon just for testing purpose. */
@@ -79,6 +82,12 @@ public class TestingPaimonStoragePlugin implements LakeStoragePlugin {
             tableByPath.put(tablePath, tableDescriptor);
         }
 
+        @Override
+        public void alterTable(TablePath tablePath, List<TableChange> 
tableChanges)
+                throws TableNotExistException {
+            // do nothing
+        }
+
         public TableDescriptor getTable(TablePath tablePath) {
             return tableByPath.get(tablePath);
         }


Reply via email to