This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 9df7b723e [server] Support alter table properties (#1625)
9df7b723e is described below

commit 9df7b723e930d7d33877c5827d7b4986f7c7b8f2
Author: Yang Guo <[email protected]>
AuthorDate: Wed Sep 24 12:46:33 2025 +0800

    [server] Support alter table properties (#1625)
---
 .../java/org/apache/fluss/client/admin/Admin.java  |  23 ++++
 .../org/apache/fluss/client/admin/FlussAdmin.java  |  24 ++++
 .../utils/FlussTableChangeProtoConverter.java      |  44 +++++++
 .../fluss/client/admin/FlussAdminITCase.java       |  67 +++++++++++
 .../org/apache/fluss/config/FlussConfigUtils.java  |   5 +
 .../exception/InvalidAlterTableException.java      |  31 +++++
 .../fluss/metadata/AlterTableConfigsOpType.java    |  52 ++++++++
 .../org/apache/fluss/metadata/TableChange.java     | 129 ++++++++++++++++++++
 .../apache/fluss/flink/catalog/FlinkCatalog.java   |  39 +++++-
 .../utils/FlinkTableChangeToFlussTableChange.java  |  52 ++++++++
 .../fluss/flink/catalog/FlinkCatalogITCase.java    |  55 +++++++++
 .../fluss/flink/catalog/FlinkCatalogTest.java      |   3 -
 .../org/apache/fluss/rpc/gateway/AdminGateway.java |  11 ++
 .../org/apache/fluss/rpc/protocol/ApiKeys.java     |   3 +-
 .../java/org/apache/fluss/rpc/protocol/Errors.java |   5 +-
 fluss-rpc/src/main/proto/FlussApi.proto            |  18 +++
 .../server/coordinator/CoordinatorService.java     |  76 ++++++++++++
 .../fluss/server/coordinator/MetadataManager.java  | 134 +++++++++++++++++++++
 .../fluss/server/utils/ServerRpcMessageUtils.java  |  23 ++++
 .../server/utils/TableDescriptorValidation.java    |  18 +++
 .../fluss/server/zk/data/TableRegistration.java    |  14 +++
 .../server/coordinator/TableManagerITCase.java     |  48 ++++++++
 .../server/coordinator/TestCoordinatorGateway.java |   8 ++
 .../server/testutils/RpcMessageTestUtils.java      |  16 +++
 24 files changed, 891 insertions(+), 7 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java 
b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
index baaab3507..d00094acf 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
@@ -26,6 +26,7 @@ import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.exception.DatabaseAlreadyExistException;
 import org.apache.fluss.exception.DatabaseNotEmptyException;
 import org.apache.fluss.exception.DatabaseNotExistException;
+import org.apache.fluss.exception.InvalidAlterTableException;
 import org.apache.fluss.exception.InvalidDatabaseException;
 import org.apache.fluss.exception.InvalidPartitionException;
 import org.apache.fluss.exception.InvalidReplicationFactorException;
@@ -48,6 +49,7 @@ import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.SchemaInfo;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
@@ -235,6 +237,27 @@ public interface Admin extends AutoCloseable {
      */
     CompletableFuture<List<String>> listTables(String databaseName);
 
+    /**
+     * Alter a table asynchronously by applying the given {@code tableChanges}.
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the
+     * returned future.
+     *
+     * <ul>
+     *   <li>{@link DatabaseNotExistException} when the database does not exist.
+     *   <li>{@link TableNotExistException} when the table does not exist and
+     *       {@code ignoreIfNotExists} is false.
+     *   <li>{@link InvalidAlterTableException} if the alter operation is invalid, such as
+     *       setting a table option that currently cannot be modified.
+     * </ul>
+     *
+     * @param tablePath The table path of the table.
+     * @param tableChanges The changes to apply to the table.
+     * @param ignoreIfNotExists If true, do nothing when the table does not exist; if false,
+     *     the returned future fails with a {@link TableNotExistException}.
+     * @return A future carrying no result value; failures are reported through it as listed
+     *     above.
+     */
+    CompletableFuture<Void> alterTable(
+            TablePath tablePath, List<TableChange> tableChanges, boolean 
ignoreIfNotExists);
+
     /**
      * List all partitions in the given table in fluss cluster asynchronously.
      *
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java 
b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
index 0f9c007aa..d04829acc 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
@@ -22,6 +22,7 @@ import org.apache.fluss.client.metadata.KvSnapshots;
 import org.apache.fluss.client.metadata.LakeSnapshot;
 import org.apache.fluss.client.metadata.MetadataUpdater;
 import org.apache.fluss.client.utils.ClientRpcMessageUtils;
+import org.apache.fluss.client.utils.FlussTableChangeProtoConverter;
 import org.apache.fluss.cluster.Cluster;
 import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.exception.LeaderNotAvailableException;
@@ -33,6 +34,7 @@ import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.SchemaInfo;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
@@ -41,6 +43,7 @@ import org.apache.fluss.rpc.RpcClient;
 import org.apache.fluss.rpc.gateway.AdminGateway;
 import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
 import org.apache.fluss.rpc.gateway.TabletServerGateway;
+import org.apache.fluss.rpc.messages.AlterTableConfigsRequest;
 import org.apache.fluss.rpc.messages.CreateAclsRequest;
 import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
 import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -62,6 +65,7 @@ import org.apache.fluss.rpc.messages.ListOffsetsRequest;
 import org.apache.fluss.rpc.messages.ListPartitionInfosRequest;
 import org.apache.fluss.rpc.messages.ListTablesRequest;
 import org.apache.fluss.rpc.messages.ListTablesResponse;
+import org.apache.fluss.rpc.messages.PbAlterConfigsRequestInfo;
 import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
 import org.apache.fluss.rpc.messages.PbPartitionSpec;
 import org.apache.fluss.rpc.messages.PbTablePath;
@@ -81,6 +85,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 import static 
org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
 import static 
org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
@@ -235,6 +240,25 @@ public class FlussAdmin implements Admin {
         return gateway.createTable(request).thenApply(r -> null);
     }
 
+    @Override
+    public CompletableFuture<Void> alterTable(
+            TablePath tablePath, List<TableChange> tableChanges, boolean 
ignoreIfNotExists) {
+        tablePath.validate(); // check the path before any request is built
+        AlterTableConfigsRequest request = new AlterTableConfigsRequest();
+
+        List<PbAlterConfigsRequestInfo> pbFlussTableChanges =
+                tableChanges.stream()
+                        
.map(FlussTableChangeProtoConverter::toPbAlterConfigsRequestInfo)
+                        .collect(Collectors.toList());
+
+        request.addAllConfigChanges(pbFlussTableChanges)
+                .setIgnoreIfNotExists(ignoreIfNotExists)
+                .setTablePath() // NOTE(review): presumably returns the nested table-path message filled in by the next two setters - confirm against the generated class
+                .setDatabaseName(tablePath.getDatabaseName())
+                .setTableName(tablePath.getTableName());
+        return gateway.alterTableConfigs(request).thenApply(r -> null); // response is dropped; the future completes with null
+    }
+
     @Override
     public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
         GetTableInfoRequest request = new GetTableInfoRequest();
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/utils/FlussTableChangeProtoConverter.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/utils/FlussTableChangeProtoConverter.java
new file mode 100644
index 000000000..2a9f59612
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/utils/FlussTableChangeProtoConverter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.utils;
+
+import org.apache.fluss.metadata.AlterTableConfigsOpType;
+import org.apache.fluss.metadata.TableChange;
+import org.apache.fluss.rpc.messages.PbAlterConfigsRequestInfo;
+
+/**
+ * Converts a {@link TableChange} into the {@link PbAlterConfigsRequestInfo} proto message.
+ *
+ * <p>A {@link TableChange.SetOption} becomes a {@code SET} entry carrying key and value; a
+ * {@link TableChange.ResetOption} becomes a {@code DELETE} entry carrying only the key. Any
+ * other change type is rejected with an {@link IllegalArgumentException}.
+ */
+public class FlussTableChangeProtoConverter {
+
+    public static PbAlterConfigsRequestInfo 
toPbAlterConfigsRequestInfo(TableChange tableChange) {
+        PbAlterConfigsRequestInfo info = new PbAlterConfigsRequestInfo();
+        if (tableChange instanceof TableChange.SetOption) {
+            TableChange.SetOption setOption = (TableChange.SetOption) 
tableChange;
+            info.setConfigKey(setOption.getKey());
+            info.setConfigValue(setOption.getValue());
+            info.setOpType(AlterTableConfigsOpType.SET.value());
+        } else if (tableChange instanceof TableChange.ResetOption) {
+            TableChange.ResetOption resetOption = (TableChange.ResetOption) 
tableChange;
+            info.setConfigKey(resetOption.getKey()); // a reset carries only the key; no config value is set
+            info.setOpType(AlterTableConfigsOpType.DELETE.value());
+        } else {
+            throw new IllegalArgumentException(
+                    "Unsupported table change: " + tableChange.getClass());
+        }
+        return info;
+    }
+}
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index 0422396c1..d4a117bab 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -55,6 +55,7 @@ import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.SchemaInfo;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
@@ -197,6 +198,72 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
                 .isBetween(timestampBeforeCreate, timestampAfterCreate);
     }
 
+    @Test
+    void testAlterTable() throws Exception {
+        // create the table that will be altered
+        TablePath tablePath = TablePath.of("test_db", "alter_table_1");
+        admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
+
+        TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+
+        TableDescriptor existingTableDescriptor = 
tableInfo.toTableDescriptor();
+        Map<String, String> updateProperties =
+                new HashMap<>(existingTableDescriptor.getProperties());
+        Map<String, String> updateCustomProperties =
+                new HashMap<>(existingTableDescriptor.getCustomProperties());
+        updateCustomProperties.put("client.connect-timeout", "240s"); // the one entry the alter is expected to add
+
+        TableDescriptor newTableDescriptor =
+                TableDescriptor.builder()
+                        .schema(existingTableDescriptor.getSchema())
+                        
.comment(existingTableDescriptor.getComment().orElse("test table"))
+                        
.partitionedBy(existingTableDescriptor.getPartitionKeys())
+                        .distributedBy(
+                                existingTableDescriptor
+                                        .getTableDistribution()
+                                        .get()
+                                        .getBucketCount()
+                                        .orElse(3),
+                                existingTableDescriptor.getBucketKeys())
+                        .properties(updateProperties)
+                        .customProperties(updateCustomProperties)
+                        .build();
+
+        List<TableChange> tableChanges = new ArrayList<>();
+        TableChange tableChange = TableChange.set("client.connect-timeout", 
"240s");
+        tableChanges.add(tableChange);
+        // alter the table with the same key/value the expected descriptor was built with
+        admin.alterTable(tablePath, tableChanges, false).get();
+
+        TableInfo alteredTableInfo = admin.getTableInfo(tablePath).get();
+        TableDescriptor alteredTableDescriptor = 
alteredTableInfo.toTableDescriptor();
+        assertThat(alteredTableDescriptor).isEqualTo(newTableDescriptor);
+
+        // altering a table that does not exist (ignoreIfNotExists = false) fails with TableNotExistException
+        assertThatThrownBy(
+                        () ->
+                                admin.alterTable(
+                                                TablePath.of("test_db", 
"alter_table_not_exist"),
+                                                tableChanges,
+                                                false)
+                                        .get())
+                .cause()
+                .isInstanceOf(TableNotExistException.class);
+
+        // altering a table in a database that does not exist fails with DatabaseNotExistException
+        assertThatThrownBy(
+                        () ->
+                                admin.alterTable(
+                                                TablePath.of(
+                                                        "test_db_not_exist",
+                                                        
"alter_table_not_exist"),
+                                                tableChanges,
+                                                false)
+                                        .get())
+                .cause()
+                .isInstanceOf(DatabaseNotExistException.class);
+    }
+
     @Test
     void testCreateInvalidDatabaseAndTable() {
         assertThatThrownBy(
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java 
b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
index 38118397b..18fff5e64 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
@@ -21,7 +21,9 @@ import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.annotation.VisibleForTesting;
 
 import java.lang.reflect.Field;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /** Utilities of Fluss {@link ConfigOptions}. */
@@ -33,9 +35,12 @@ public class FlussConfigUtils {
     public static final String CLIENT_PREFIX = "client.";
     public static final String CLIENT_SECURITY_PREFIX = "client.security.";
 
+    public static final List<String> ALTERABLE_TABLE_CONFIG;
+
     static {
         TABLE_OPTIONS = extractConfigOptions("table.");
         CLIENT_OPTIONS = extractConfigOptions("client.");
+        ALTERABLE_TABLE_CONFIG = Collections.emptyList();
     }
 
     @VisibleForTesting
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/exception/InvalidAlterTableException.java
 
b/fluss-common/src/main/java/org/apache/fluss/exception/InvalidAlterTableException.java
new file mode 100644
index 000000000..411a68d77
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/exception/InvalidAlterTableException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.exception;
+
+/**
+ * Exception thrown when an alter table operation is invalid.
+ *
+ * <p>Offers a message-only constructor and a constructor that also records the underlying
+ * cause.
+ */
+public class InvalidAlterTableException extends ApiException {
+    private static final long serialVersionUID = 1L;
+
+    public InvalidAlterTableException(String message) {
+        this(message, null); // delegate with no underlying cause
+    }
+
+    public InvalidAlterTableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metadata/AlterTableConfigsOpType.java
 
b/fluss-common/src/main/java/org/apache/fluss/metadata/AlterTableConfigsOpType.java
new file mode 100644
index 000000000..9307d38e8
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/metadata/AlterTableConfigsOpType.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+/**
+ * The operation type of altering table configurations.
+ *
+ * <p>Each constant is paired with a fixed integer code, readable through {@link #value()}.
+ * {@link #from(int)} maps a code back to its constant and throws an
+ * {@link IllegalArgumentException} for any code other than 0, 1, 2 or 3.
+ */
+public enum AlterTableConfigsOpType {
+    SET(0),
+    DELETE(1),
+    APPEND(2),
+    SUBTRACT(3);
+
+    public final int value; // integer code of this op type; also returned by value()
+
+    AlterTableConfigsOpType(int value) {
+        this.value = value;
+    }
+
+    public static AlterTableConfigsOpType from(int opType) { // inverse of value(); rejects unknown codes
+        switch (opType) {
+            case 0:
+                return SET;
+            case 1:
+                return DELETE;
+            case 2:
+                return APPEND;
+            case 3:
+                return SUBTRACT;
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported AlterTableConfigsOpType: " + opType);
+        }
+    }
+
+    public int value() {
+        return this.value;
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java
new file mode 100644
index 000000000..98e907e5f
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+import java.util.Objects;
+
+/**
+ * {@link TableChange} represents the modification of the Fluss Table.
+ *
+ * <p>Two kinds of change are defined here: {@link SetOption}, obtained from the static
+ * factory {@code set(key, value)}, and {@link ResetOption}, obtained from the static factory
+ * {@code reset(key)}. Both are immutable value objects with key-based equality.
+ */
+public interface TableChange {
+
+    static SetOption set(String key, String value) { // factory for a SetOption change
+        return new SetOption(key, value);
+    }
+
+    static ResetOption reset(String key) { // factory for a ResetOption change
+        return new ResetOption(key);
+    }
+
+    /**
+     * A table change to set the table option.
+     *
+     * <p>It is equivalent to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; SET '&lt;key&gt;' = '&lt;value&gt;';
+     * </pre>
+     */
+    class SetOption implements TableChange {
+
+        private final String key;
+        private final String value;
+
+        private SetOption(String key, String value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        /** Returns the Option key to set. */
+        public String getKey() {
+            return key;
+        }
+
+        /** Returns the Option value to set. */
+        public String getValue() {
+            return value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof SetOption)) {
+                return false;
+            }
+            SetOption setOption = (SetOption) o;
+            return Objects.equals(key, setOption.key) && Objects.equals(value, 
setOption.value);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key, value);
+        }
+
+        @Override
+        public String toString() {
+            return "SetOption{" + "key='" + key + '\'' + ", value='" + value + 
'\'' + '}';
+        }
+    }
+
+    /**
+     * A table change to reset the table option.
+     *
+     * <p>It is equivalent to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; RESET '&lt;key&gt;';
+     * </pre>
+     */
+    class ResetOption implements TableChange {
+
+        private final String key;
+
+        public ResetOption(String key) {
+            this.key = key;
+        }
+
+        /** Returns the Option key to reset. */
+        public String getKey() {
+            return key;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof ResetOption)) {
+                return false;
+            }
+            ResetOption that = (ResetOption) o;
+            return Objects.equals(key, that.key);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key);
+        }
+
+        @Override
+        public String toString() {
+            return "ResetOption{" + "key='" + key + '\'' + '}';
+        }
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 82a58bd43..051e7249e 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -29,9 +29,11 @@ import org.apache.fluss.flink.procedure.ProcedureManager;
 import org.apache.fluss.flink.utils.CatalogExceptionUtils;
 import org.apache.fluss.flink.utils.DataLakeUtils;
 import org.apache.fluss.flink.utils.FlinkConversions;
+import org.apache.fluss.flink.utils.FlinkTableChangeToFlussTableChange;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.PartitionSpec;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
@@ -78,7 +80,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
@@ -398,9 +402,40 @@ public class FlinkCatalog extends AbstractCatalog {
     }
 
     @Override
-    public void alterTable(ObjectPath objectPath, CatalogBaseTable 
catalogBaseTable, boolean b)
+    public void alterTable(
+            ObjectPath objectPath,
+            CatalogBaseTable newTable, // not read in this method; only tableChanges are forwarded
+            List<org.apache.flink.table.catalog.TableChange> tableChanges,
+            boolean ignoreIfNotExists)
             throws TableNotExistException, CatalogException {
-        throw new UnsupportedOperationException();
+        TablePath tablePath = toTablePath(objectPath);
+
+        List<TableChange> flussTableChanges =
+                tableChanges.stream()
+                        .filter(Objects::nonNull) // drop null entries before conversion
+                        
.map(FlinkTableChangeToFlussTableChange::toFlussTableChange)
+                        .collect(Collectors.toList());
+        try {
+            admin.alterTable(tablePath, flussTableChanges, 
ignoreIfNotExists).get();
+        } catch (Exception e) {
+            Throwable t = ExceptionUtils.stripExecutionException(e); // NOTE(review): presumably unwraps the wrapper thrown by get() - confirm
+            if (CatalogExceptionUtils.isTableNotExist(t)) {
+                throw new TableNotExistException(getName(), objectPath);
+            } else if (isTableInvalid(t)) {
+                throw new InvalidTableException(t.getMessage()); // only the message is carried over; t is not attached as cause
+            } else {
+                throw new CatalogException(
+                        String.format("Failed to alter table %s in %s", 
objectPath, getName()), t);
+            }
+        }
+    }
+
+    @Override
+    public void alterTable(
+            ObjectPath objectPath, CatalogBaseTable newTable, boolean 
ignoreIfNotExist)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException( // this overload always throws; see the message below
+                "alterTable(objectPath, newTable, ignoreIfNotExist) method is 
not supported, please upgrade your Flink to 1.18+. ");
     }
 
     @SuppressWarnings("checkstyle:WhitespaceAround")
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkTableChangeToFlussTableChange.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkTableChangeToFlussTableChange.java
new file mode 100644
index 000000000..9e3548e53
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkTableChangeToFlussTableChange.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.metadata.TableChange;
+
+/**
+ * Converts Flink's TableChange class to {@link TableChange}.
+ *
+ * <p>Only Flink's {@code SetOption} and {@code ResetOption} changes are converted; any other
+ * Flink table change is rejected with an {@link UnsupportedOperationException}.
+ */
+public class FlinkTableChangeToFlussTableChange {
+
+    public static TableChange toFlussTableChange(
+            org.apache.flink.table.catalog.TableChange tableChange) {
+        TableChange flussTableChange;
+        if (tableChange instanceof 
org.apache.flink.table.catalog.TableChange.SetOption) {
+            flussTableChange =
+                    convertSetOption(
+                            
(org.apache.flink.table.catalog.TableChange.SetOption) tableChange);
+        } else if (tableChange instanceof 
org.apache.flink.table.catalog.TableChange.ResetOption) {
+            flussTableChange =
+                    convertResetOption(
+                            
(org.apache.flink.table.catalog.TableChange.ResetOption) tableChange);
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format("Unsupported flink table change: %s.", 
tableChange));
+        }
+        return flussTableChange;
+    }
+
+    private static TableChange.SetOption convertSetOption(
+            org.apache.flink.table.catalog.TableChange.SetOption 
flinkSetOption) {
+        return TableChange.set(flinkSetOption.getKey(), 
flinkSetOption.getValue()); // key and value are copied unchanged
+    }
+
+    private static TableChange.ResetOption convertResetOption(
+            org.apache.flink.table.catalog.TableChange.ResetOption 
flinkResetOption) {
+        return TableChange.reset(flinkResetOption.getKey()); // only the key is carried over
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index 948ec3635..5ee54ac21 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -20,6 +20,7 @@ package org.apache.fluss.flink.catalog;
 import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidAlterTableException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
@@ -183,6 +184,60 @@ abstract class FlinkCatalogITCase {
         assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
     }
 
+    /**
+     * Exercises {@code ALTER TABLE ... SET} through the Flink catalog: setting a key that is not
+     * a Fluss table option shows up in the table options afterwards, while setting a Fluss table
+     * option fails with {@link InvalidAlterTableException} as the root cause.
+     */
+    @Test
+    void testAlterTable() throws Exception {
+        String ddl =
+                "create table test_alter_table_append_only ("
+                        + "a string, "
+                        + "b int) "
+                        + "with ('bucket.num' = '5')";
+        tEnv.executeSql(ddl);
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("a", DataTypes.STRING()).column("b", DataTypes.INT());
+        Schema expectedSchema = schemaBuilder.build();
+        // baseline: the freshly created table has the declared schema and options
+        CatalogTable table =
+                (CatalogTable)
+                        catalog.getTable(
+                                new ObjectPath(DEFAULT_DB, "test_alter_table_append_only"));
+        assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+        Map<String, String> expectedOptions = new HashMap<>();
+        expectedOptions.put("bucket.num", "5");
+        assertOptionsEqual(table.getOptions(), expectedOptions);
+
+        // alter table
+        String dml =
+                "alter table test_alter_table_append_only set ('client.connect-timeout' = '240s')";
+        tEnv.executeSql(dml);
+        // re-read the table from the catalog; the schema must be untouched by the alter
+        table =
+                (CatalogTable)
+                        catalog.getTable(
+                                new ObjectPath(DEFAULT_DB, "test_alter_table_append_only"));
+        assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+        expectedOptions = new HashMap<>();
+
+        // bucket.num is unchanged, but timeout should change
+        expectedOptions.put("bucket.num", "5");
+        expectedOptions.put("client.connect-timeout", "240s"); // updated
+        assertOptionsEqual(table.getOptions(), expectedOptions);
+
+        // alter table set an unsupported modification option should throw exception
+        String unSupportedDml1 =
+                "alter table test_alter_table_append_only set ('table.auto-partition.enabled' = 'true')";
+
+        assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml1))
+                .rootCause()
+                .isInstanceOf(InvalidAlterTableException.class)
+                .hasMessage(
+                        "The option 'table.auto-partition.enabled' is not supported to alter set.");
+
+        // a statement that mixes a custom key with a Fluss table option is rejected as well,
+        // and the error names the Fluss table option
+        String unSupportedDml2 =
+                "alter table test_alter_table_append_only set ('k1' = 'v1', 'table.kv.format' = 'indexed')";
+        assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml2))
+                .rootCause()
+                .isInstanceOf(InvalidAlterTableException.class)
+                .hasMessage("The option 'table.kv.format' is not supported to alter set.");
+    }
+
     @Test
     void testCreateUnSupportedTable() {
         // test invalid property
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
index dec2601a2..0f0621cbf 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
@@ -256,9 +256,6 @@ class FlinkCatalogTest {
         assertThatThrownBy(() -> catalog.renameTable(this.tableInDefaultDb, 
"newName", false))
                 .isInstanceOf(UnsupportedOperationException.class);
 
-        assertThatThrownBy(() -> catalog.alterTable(this.tableInDefaultDb, 
null, false))
-                .isInstanceOf(UnsupportedOperationException.class);
-
         // Test lake table handling - should throw TableNotExistException for 
non-existent lake
         // table
         ObjectPath lakePath = new ObjectPath(DEFAULT_DB, "regularTable$lake");
diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java
index bda096dfe..1ec13ffd7 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java
@@ -17,6 +17,8 @@
 
 package org.apache.fluss.rpc.gateway;
 
+import org.apache.fluss.rpc.messages.AlterTableConfigsRequest;
+import org.apache.fluss.rpc.messages.AlterTableConfigsResponse;
 import org.apache.fluss.rpc.messages.CreateAclsRequest;
 import org.apache.fluss.rpc.messages.CreateAclsResponse;
 import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
@@ -64,6 +66,15 @@ public interface AdminGateway extends AdminReadOnlyGateway {
     @RPC(api = ApiKeys.CREATE_TABLE)
     CompletableFuture<CreateTableResponse> createTable(CreateTableRequest 
request);
 
+    /**
+     * Alter the configs of a table.
+     *
+     * @param request the request to alter the configs of a table.
+     * @return the response future of the alter table configs request.
+     */
+    @RPC(api = ApiKeys.ALTER_TABLE_CONFIGS)
+    CompletableFuture<AlterTableConfigsResponse> alterTableConfigs(
+            AlterTableConfigsRequest request);
+
     /**
      * Drop a table.
      *
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
index 8526581ae..e39dc1b34 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
@@ -71,7 +71,8 @@ public enum ApiKeys {
     LIST_ACLS(1040, 0, 0, PUBLIC),
     DROP_ACLS(1041, 0, 0, PUBLIC),
     LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE),
-    CONTROLLED_SHUTDOWN(1043, 0, 0, PRIVATE);
+    CONTROLLED_SHUTDOWN(1043, 0, 0, PRIVATE),
+    ALTER_TABLE_CONFIGS(1044, 0, 0, PUBLIC);
 
     private static final Map<Integer, ApiKeys> ID_TO_TYPE =
             Arrays.stream(ApiKeys.values())
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
index 5ec892a69..3b899baee 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
@@ -29,6 +29,7 @@ import org.apache.fluss.exception.DuplicateSequenceException;
 import org.apache.fluss.exception.FencedLeaderEpochException;
 import org.apache.fluss.exception.FencedTieringEpochException;
 import org.apache.fluss.exception.IneligibleReplicaException;
+import org.apache.fluss.exception.InvalidAlterTableException;
 import org.apache.fluss.exception.InvalidColumnProjectionException;
 import org.apache.fluss.exception.InvalidConfigException;
 import org.apache.fluss.exception.InvalidCoordinatorException;
@@ -222,7 +223,9 @@ public enum Errors {
     INELIGIBLE_REPLICA_EXCEPTION(
             55,
             "The new ISR contains at least one ineligible replica.",
-            IneligibleReplicaException::new);
+            IneligibleReplicaException::new),
+    INVALID_ALTER_TABLE_EXCEPTION(
+            56, "The alter table is invalid.", 
InvalidAlterTableException::new);
 
     private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
 
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto 
b/fluss-rpc/src/main/proto/FlussApi.proto
index 200b8b520..00797b93a 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -108,6 +108,24 @@ message CreateTableRequest {
 message CreateTableResponse {
 }
 
+// alter table configs request and response
+message AlterTableConfigsRequest {
+  required PbTablePath table_path = 1;
+  // if true, altering a table that does not exist is not reported as an error
+  required bool ignore_if_not_exists = 2;
+  // the individual config changes to apply to the table
+  repeated PbAlterConfigsRequestInfo config_changes = 3;
+}
+
+// a single config change carried by AlterTableConfigsRequest
+message PbAlterConfigsRequestInfo {
+  required string config_key = 1;
+  // optional because not every operation carries a value, e.g. DELETE only needs the key
+  optional string config_value = 2;
+  required int32 op_type = 3; // SET=0, DELETE=1, APPEND=2, SUBTRACT=3
+}
+
+message AlterTableConfigsResponse {
+}
+
+
+
 // get table request and response
 message GetTableInfoRequest {
   required PbTablePath table_path = 1;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index dd3eb62ab..e944105e3 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -21,6 +21,7 @@ import org.apache.fluss.cluster.ServerType;
 import org.apache.fluss.cluster.TabletServerInfo;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidAlterTableException;
 import org.apache.fluss.exception.InvalidCoordinatorException;
 import org.apache.fluss.exception.InvalidDatabaseException;
 import org.apache.fluss.exception.InvalidTableException;
@@ -36,6 +37,7 @@ import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePartition;
@@ -43,6 +45,8 @@ import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.rpc.gateway.CoordinatorGateway;
 import org.apache.fluss.rpc.messages.AdjustIsrRequest;
 import org.apache.fluss.rpc.messages.AdjustIsrResponse;
+import org.apache.fluss.rpc.messages.AlterTableConfigsRequest;
+import org.apache.fluss.rpc.messages.AlterTableConfigsResponse;
 import org.apache.fluss.rpc.messages.CommitKvSnapshotRequest;
 import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse;
 import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
@@ -71,6 +75,7 @@ import 
org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest;
 import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse;
 import org.apache.fluss.rpc.messages.MetadataRequest;
 import org.apache.fluss.rpc.messages.MetadataResponse;
+import org.apache.fluss.rpc.messages.PbAlterConfigsRequestInfo;
 import org.apache.fluss.rpc.messages.PbHeartbeatReqForTable;
 import org.apache.fluss.rpc.messages.PbHeartbeatRespForTable;
 import org.apache.fluss.rpc.netty.server.Session;
@@ -98,6 +103,7 @@ import org.apache.fluss.server.metadata.BucketMetadata;
 import org.apache.fluss.server.metadata.PartitionMetadata;
 import org.apache.fluss.server.metadata.ServerMetadataCache;
 import org.apache.fluss.server.metadata.TableMetadata;
+import org.apache.fluss.server.utils.ServerRpcMessageUtils;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.data.BucketAssignment;
 import org.apache.fluss.server.zk.data.LeaderAndIsr;
@@ -114,10 +120,13 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
+import static org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS;
 import static 
org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters;
 import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.fromTablePath;
@@ -294,6 +303,73 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(new CreateTableResponse());
     }
 
+    /**
+     * Handles an alter-table-configs request: validates the table path, authorizes the ALTER
+     * operation when an authorizer is configured, applies the requested config changes and
+     * answers with an empty response.
+     */
+    @Override
+    public CompletableFuture<AlterTableConfigsResponse> alterTableConfigs(
+            AlterTableConfigsRequest request) {
+        final TablePath path = toTablePath(request.getTablePath());
+        path.validate();
+        if (authorizer != null) {
+            authorizer.authorize(currentSession(), OperationType.ALTER, Resource.table(path));
+        }
+
+        final List<PbAlterConfigsRequestInfo> requestedChanges = request.getConfigChangesList();
+        final boolean ignoreIfNotExists = request.isIgnoreIfNotExists();
+        handleFlussTableChanges(path, requestedChanges, ignoreIfNotExists);
+
+        return CompletableFuture.completedFuture(new AlterTableConfigsResponse());
+    }
+
+    /**
+     * Converts the requested config changes into custom-property changes and applies them to the
+     * table through the metadata manager.
+     *
+     * <p>Only custom properties can be altered at present: a set or reset whose key is contained
+     * in {@code TABLE_OPTIONS} is rejected. An empty change list is a no-op.
+     *
+     * @param tablePath the table to alter
+     * @param configsRequestInfos the config changes as received in the RPC request
+     * @param ignoreIfNotExists handed on unchanged to {@code MetadataManager#alterTableProperties}
+     * @throws InvalidAlterTableException if a change targets a Fluss table option, or is neither
+     *     a set nor a reset
+     */
+    private void handleFlussTableChanges(
+            TablePath tablePath,
+            List<PbAlterConfigsRequestInfo> configsRequestInfos,
+            boolean ignoreIfNotExists) {
+        if (configsRequestInfos.isEmpty()) {
+            return;
+        }
+
+        List<TableChange> tableChanges =
+                configsRequestInfos.stream()
+                        .filter(Objects::nonNull)
+                        .map(ServerRpcMessageUtils::toFlussTableChange)
+                        .collect(Collectors.toList());
+
+        MetadataManager.TablePropertyChanges.Builder propertyChangesBuilder =
+                MetadataManager.TablePropertyChanges.builder();
+
+        for (TableChange tableChange : tableChanges) {
+            if (tableChange instanceof TableChange.SetOption) {
+                TableChange.SetOption setOption = (TableChange.SetOption) tableChange;
+                String optionKey = setOption.getKey();
+                rejectIfFlussTableOption(optionKey, "set");
+                // not a Fluss table option, so it's considered as custom property
+                propertyChangesBuilder.setCustomProperty(optionKey, setOption.getValue());
+            } else if (tableChange instanceof TableChange.ResetOption) {
+                TableChange.ResetOption resetOption = (TableChange.ResetOption) tableChange;
+                String optionKey = resetOption.getKey();
+                rejectIfFlussTableOption(optionKey, "reset");
+                // not a Fluss table option, so it's considered as custom property
+                propertyChangesBuilder.resetCustomProperty(optionKey);
+            } else {
+                throw new InvalidAlterTableException(
+                        "Unsupported alter table change: " + tableChange);
+            }
+        }
+
+        metadataManager.alterTableProperties(
+                tablePath, propertyChangesBuilder.build(), ignoreIfNotExists);
+    }
+
+    /**
+     * Throws if the key is contained in {@code TABLE_OPTIONS}; set and reset share this check so
+     * that both report the rejected option in the same format.
+     *
+     * @param optionKey the key of the option to alter
+     * @param action the kind of alteration, used in the error message ("set" or "reset")
+     */
+    private static void rejectIfFlussTableOption(String optionKey, String action) {
+        if (TABLE_OPTIONS.containsKey(optionKey)) {
+            // currently, we don't support altering any table options understood by Fluss
+            throw new InvalidAlterTableException(
+                    "The option '" + optionKey + "' is not supported to alter " + action + ".");
+        }
+    }
+
     private TableDescriptor applySystemDefaults(TableDescriptor 
tableDescriptor) {
         TableDescriptor newDescriptor = tableDescriptor;
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index a216a7eb1..1c6a18527 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -57,12 +57,14 @@ import javax.annotation.Nullable;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
+import static 
org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableProperties;
 import static 
org.apache.fluss.server.utils.TableDescriptorValidation.validateTableDescriptor;
 
 /** A manager for metadata. */
@@ -302,6 +304,80 @@ public class MetadataManager {
                 "Fail to create table " + tablePath);
     }
 
+    /**
+     * Alters the properties of an existing table and stores the updated registration in
+     * ZooKeeper. Nothing is written when the changes leave the properties as they are.
+     *
+     * @param tablePath the table to alter
+     * @param tablePropertyChanges the property changes to apply
+     * @param ignoreIfNotExists if true, return silently when the table does not exist; a missing
+     *     database is reported regardless of this flag
+     * @throws DatabaseNotExistException if the database of the table does not exist
+     * @throws TableNotExistException if the table does not exist and {@code ignoreIfNotExists}
+     *     is false
+     * @throws FlussRuntimeException if altering the table fails for any other reason
+     */
+    public void alterTableProperties(
+            TablePath tablePath,
+            TablePropertyChanges tablePropertyChanges,
+            boolean ignoreIfNotExists) {
+        if (!databaseExists(tablePath.getDatabaseName())) {
+            throw new DatabaseNotExistException(
+                    "Database " + tablePath.getDatabaseName() + " does not exist.");
+        }
+        if (!tableExists(tablePath)) {
+            if (ignoreIfNotExists) {
+                return;
+            } else {
+                throw new TableNotExistException("Table " + tablePath + " does not exist.");
+            }
+        }
+
+        try {
+            TableRegistration updatedTableRegistration =
+                    getUpdatedTableRegistration(tablePath, tablePropertyChanges);
+            if (updatedTableRegistration != null) {
+                zookeeperClient.updateTable(tablePath, updatedTableRegistration);
+            } else {
+                LOG.debug(
+                        "No properties changed when alter table {}, skip update table.", tablePath);
+            }
+        } catch (Exception e) {
+            // NOTE(review): every other failure raised inside the try block, including one from
+            // computing the updated registration, ends up wrapped in FlussRuntimeException below;
+            // confirm that is intended.
+            if (e instanceof KeeperException.NoNodeException) {
+                // the node is missing although the existence check above passed; treat it the
+                // same way as a table that does not exist
+                if (ignoreIfNotExists) {
+                    return;
+                }
+                throw new TableNotExistException("Table " + tablePath + " does not exist.");
+            } else {
+                throw new FlussRuntimeException("Failed to alter table: " + tablePath, e);
+            }
+        }
+    }
+
+    /**
+     * Applies the given changes to copies of the table's current properties and custom
+     * properties and wraps the outcome in a new registration.
+     *
+     * @param tablePath the table path.
+     * @param tablePropertyChanges the changes for the table properties
+     * @return the updated TableRegistration, or null if the changes leave both property maps
+     *     equal to the current ones.
+     */
+    private @Nullable TableRegistration getUpdatedTableRegistration(
+            TablePath tablePath, TablePropertyChanges tablePropertyChanges) {
+        TableRegistration current = getTableRegistration(tablePath);
+
+        // table properties: sets are applied first, resets afterwards
+        Map<String, String> tableProps = new HashMap<>(current.properties);
+        tableProps.putAll(tablePropertyChanges.tablePropertiesToSet);
+        tableProps.keySet().removeAll(tablePropertyChanges.tablePropertiesToReset);
+
+        // custom properties: same order of application
+        Map<String, String> customProps = new HashMap<>(current.customProperties);
+        customProps.putAll(tablePropertyChanges.customPropertiesToSet);
+        customProps.keySet().removeAll(tablePropertyChanges.customPropertiesToReset);
+
+        boolean changed =
+                !tableProps.equals(current.properties)
+                        || !customProps.equals(current.customProperties);
+        if (!changed) {
+            return null;
+        }
+
+        // only the resulting table properties are validated before building the registration
+        validateAlterTableProperties(tableProps);
+        return current.newProperties(tableProps, customProps);
+    }
+
     public TableInfo getTable(TablePath tablePath) throws 
TableNotExistException {
         Optional<TableRegistration> optionalTable;
         try {
@@ -551,4 +627,62 @@ public class MetadataManager {
             throw new FlussRuntimeException(errorMsg, e);
         }
     }
+
+    /**
+     * To describe the changes of the properties of a table.
+     *
+     * <p>Changes are tracked separately for table properties and custom properties, each as a
+     * map of entries to set and a set of keys to reset. Instances are created through {@link
+     * Builder}.
+     */
+    public static class TablePropertyChanges {
+
+        private final Map<String, String> tablePropertiesToSet;
+        private final Set<String> tablePropertiesToReset;
+
+        private final Map<String, String> customPropertiesToSet;
+        private final Set<String> customPropertiesToReset;
+
+        private TablePropertyChanges(
+                Map<String, String> tablePropertiesToSet,
+                Set<String> tablePropertiesToReset,
+                Map<String, String> customPropertiesToSet,
+                Set<String> customPropertiesToReset) {
+            this.tablePropertiesToSet = tablePropertiesToSet;
+            this.tablePropertiesToReset = tablePropertiesToReset;
+            this.customPropertiesToSet = customPropertiesToSet;
+            this.customPropertiesToReset = customPropertiesToReset;
+        }
+
+        /** Creates an empty builder. */
+        public static Builder builder() {
+            return new Builder();
+        }
+
+        /** The builder for {@link TablePropertyChanges}. */
+        public static class Builder {
+            private final Map<String, String> tablePropertiesToSet = new HashMap<>();
+            private final Set<String> tablePropertiesToReset = new HashSet<>();
+
+            private final Map<String, String> customPropertiesToSet = new HashMap<>();
+            private final Set<String> customPropertiesToReset = new HashSet<>();
+
+            /** Records a table property to set; a later call for the same key wins. */
+            public void setTableProperty(String key, String value) {
+                tablePropertiesToSet.put(key, value);
+            }
+
+            /** Records a table property key to reset. */
+            public void resetTableProperty(String key) {
+                tablePropertiesToReset.add(key);
+            }
+
+            /** Records a custom property to set; a later call for the same key wins. */
+            public void setCustomProperty(String key, String value) {
+                customPropertiesToSet.put(key, value);
+            }
+
+            /** Records a custom property key to reset. */
+            public void resetCustomProperty(String key) {
+                customPropertiesToReset.add(key);
+            }
+
+            /**
+             * Builds the changes recorded so far.
+             *
+             * @return a new instance holding copies of the recorded changes
+             */
+            public TablePropertyChanges build() {
+                // hand out copies so that later calls on this builder cannot alter an
+                // instance that has already been built
+                return new TablePropertyChanges(
+                        new HashMap<>(tablePropertiesToSet),
+                        new HashSet<>(tablePropertiesToReset),
+                        new HashMap<>(customPropertiesToSet),
+                        new HashSet<>(customPropertiesToReset));
+            }
+        }
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index 88126f266..0769e464f 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -23,10 +23,12 @@ import org.apache.fluss.cluster.ServerType;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.fs.token.ObtainedSecurityToken;
+import org.apache.fluss.metadata.AlterTableConfigsOpType;
 import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
@@ -80,6 +82,7 @@ import org.apache.fluss.rpc.messages.PbAdjustIsrReqForBucket;
 import org.apache.fluss.rpc.messages.PbAdjustIsrReqForTable;
 import org.apache.fluss.rpc.messages.PbAdjustIsrRespForBucket;
 import org.apache.fluss.rpc.messages.PbAdjustIsrRespForTable;
+import org.apache.fluss.rpc.messages.PbAlterConfigsRequestInfo;
 import org.apache.fluss.rpc.messages.PbBucketMetadata;
 import org.apache.fluss.rpc.messages.PbCreateAclRespInfo;
 import org.apache.fluss.rpc.messages.PbDropAclsFilterResult;
@@ -238,6 +241,26 @@ public class ServerRpcMessageUtils {
                 pbServerNode.hasRack() ? pbServerNode.getRack() : null);
     }
 
+    /**
+     * Converts a single protobuf alter-configs entry into a Fluss {@link TableChange}.
+     *
+     * @param pbAlterConfigsRequestInfo the config change received in the RPC request
+     * @return a set change for op type SET, a reset change for op type DELETE
+     * @throws IllegalArgumentException if the op type is not supported, or if a SET entry
+     *     carries no config value
+     */
+    public static TableChange toFlussTableChange(
+            PbAlterConfigsRequestInfo pbAlterConfigsRequestInfo) {
+        AlterTableConfigsOpType opType =
+                AlterTableConfigsOpType.from(pbAlterConfigsRequestInfo.getOpType());
+        switch (opType) {
+            case SET: // SET_OPTION
+                // config_value is optional in the protobuf message, but a SET is meaningless
+                // without it, so fail with a clear message instead of reading an unset field
+                if (!pbAlterConfigsRequestInfo.hasConfigValue()) {
+                    throw new IllegalArgumentException(
+                            "Missing config value for SET of config '"
+                                    + pbAlterConfigsRequestInfo.getConfigKey()
+                                    + "'.");
+                }
+                return TableChange.set(
+                        pbAlterConfigsRequestInfo.getConfigKey(),
+                        pbAlterConfigsRequestInfo.getConfigValue());
+            case DELETE: // RESET_OPTION
+                return TableChange.reset(pbAlterConfigsRequestInfo.getConfigKey());
+            case APPEND:
+            case SUBTRACT:
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported alter configs op type "
+                                + pbAlterConfigsRequestInfo.getOpType());
+        }
+    }
+
     public static MetadataResponse buildMetadataResponse(
             @Nullable ServerNode coordinatorServer,
             Set<ServerNode> aliveTabletServers,
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 1f4cd014a..4c11dc0cf 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -39,6 +39,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -93,6 +94,23 @@ public class TableDescriptorValidation {
         checkSystemColumns(schema);
     }
 
+    /**
+     * Validates the table properties that result from an alter table operation.
+     *
+     * <p>Every key must be contained in {@code TABLE_OPTIONS}; the value check is delegated to
+     * {@code validateOptionValue}.
+     *
+     * @param properties the complete table properties after the alteration
+     * @throws InvalidConfigException if a key is not a Fluss table property
+     */
+    public static void validateAlterTableProperties(Map<String, String> properties) {
+        final Configuration conf = Configuration.fromMap(properties);
+        for (String name : conf.keySet()) {
+            // the key has to be a table option this cluster knows
+            if (!TABLE_OPTIONS.containsKey(name)) {
+                final String message =
+                        String.format(
+                                "'%s' is not a Fluss table property. Please use '.customProperty(..)' to set custom properties.",
+                                name);
+                throw new InvalidConfigException(message);
+            }
+            // and its value has to be valid for that option
+            validateOptionValue(conf, TABLE_OPTIONS.get(name));
+        }
+    }
+
     private static void checkSystemColumns(RowType schema) {
         List<String> fieldNames = schema.getFieldNames();
         List<String> unsupportedColumns =
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/TableRegistration.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/TableRegistration.java
index bbbecdc83..547326f6a 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/TableRegistration.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/TableRegistration.java
@@ -132,6 +132,20 @@ public class TableRegistration {
                 currentMillis);
     }
 
+    /**
+     * Returns a new registration that keeps this one's table id, comment, partition keys, bucket
+     * distribution and created time, but carries the given properties and custom properties. The
+     * current wall-clock time is passed as the last constructor argument.
+     *
+     * @param newProperties the table properties of the new registration
+     * @param newCustomProperties the custom properties of the new registration
+     */
+    public TableRegistration newProperties(
+            Map<String, String> newProperties, Map<String, String> newCustomProperties) {
+        final long nowMillis = System.currentTimeMillis();
+        final TableDistribution distribution = new TableDistribution(bucketCount, bucketKeys);
+        return new TableRegistration(
+                tableId,
+                comment,
+                partitionKeys,
+                distribution,
+                newProperties,
+                newCustomProperties,
+                createdTime,
+                nowMillis);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
index 93352b20c..440ef9a64 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
@@ -32,6 +32,7 @@ import org.apache.fluss.exception.PartitionNotExistException;
 import org.apache.fluss.exception.SchemaNotExistException;
 import org.apache.fluss.exception.TableAlreadyExistException;
 import org.apache.fluss.exception.TableNotExistException;
+import org.apache.fluss.metadata.AlterTableConfigsOpType;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableDescriptor;
@@ -48,6 +49,7 @@ import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
 import org.apache.fluss.rpc.messages.ListDatabasesRequest;
 import org.apache.fluss.rpc.messages.MetadataRequest;
 import org.apache.fluss.rpc.messages.MetadataResponse;
+import org.apache.fluss.rpc.messages.PbAlterConfigsRequestInfo;
 import org.apache.fluss.rpc.messages.PbBucketMetadata;
 import org.apache.fluss.rpc.messages.PbPartitionMetadata;
 import org.apache.fluss.rpc.messages.PbServerNode;
@@ -86,6 +88,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.fluss.config.ConfigOptions.DEFAULT_LISTENER_NAME;
+import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.newAlterTableConfigsRequest;
 import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.newCreateDatabaseRequest;
 import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.newCreateTableRequest;
 import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.newDatabaseExistsRequest;
@@ -284,6 +287,29 @@ class TableManagerITCase {
         TableDescriptor gottenTable = 
TableDescriptor.fromJsonBytes(response.getTableJson());
         
assertThat(gottenTable).isEqualTo(tableDescriptor.withReplicationFactor(1));
 
+        // alter table
+        Map<String, String> setProperties = new HashMap<>();
+        setProperties.put("client.connect-timeout", "240s");
+
+        List<String> resetProperties = new ArrayList<>();
+
+        adminGateway
+                .alterTableConfigs(
+                        newAlterTableConfigsRequest(
+                                tablePath,
+                                alterTableProperties(setProperties, 
resetProperties),
+                                false))
+                .get();
+        // get the table and check it
+        GetTableInfoResponse responseAfterAlter =
+                gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+        TableDescriptor gottenTableAfterAlter =
+                
TableDescriptor.fromJsonBytes(responseAfterAlter.getTableJson());
+
+        String valueAfterAlter =
+                
gottenTableAfterAlter.getCustomProperties().get("client.connect-timeout");
+        assertThat(valueAfterAlter).isEqualTo("240s");
+
         // check assignment, just check replica numbers, don't care about 
actual assignment
         checkAssignmentWithReplicaFactor(
                 zkClient.getTableAssignment(response.getTableId()).get(),
@@ -733,6 +759,28 @@ class TableManagerITCase {
                 .build();
     }
 
+    /**
+     * Builds the config change list for an alter table request: one SET entry per entry of
+     * {@code setProperties}, followed by one DELETE entry per key in {@code resetProperties}.
+     */
+    private static List<PbAlterConfigsRequestInfo> alterTableProperties(
+            Map<String, String> setProperties, List<String> resetProperties) {
+        List<PbAlterConfigsRequestInfo> changes = new ArrayList<>();
+
+        setProperties.forEach(
+                (key, value) -> {
+                    PbAlterConfigsRequestInfo setInfo = new PbAlterConfigsRequestInfo();
+                    setInfo.setConfigKey(key);
+                    setInfo.setConfigValue(value);
+                    setInfo.setOpType(AlterTableConfigsOpType.SET.value());
+                    changes.add(setInfo);
+                });
+
+        resetProperties.forEach(
+                key -> {
+                    // a DELETE entry carries the key only, no value
+                    PbAlterConfigsRequestInfo resetInfo = new PbAlterConfigsRequestInfo();
+                    resetInfo.setConfigKey(key);
+                    resetInfo.setOpType(AlterTableConfigsOpType.DELETE.value());
+                    changes.add(resetInfo);
+                });
+
+        return changes;
+    }
+
     private static Schema newPkSchema() {
         return Schema.newBuilder()
                 .column("a", DataTypes.INT())
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
index 5ef93cd71..15222305f 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
@@ -23,6 +23,8 @@ import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.rpc.gateway.CoordinatorGateway;
 import org.apache.fluss.rpc.messages.AdjustIsrRequest;
 import org.apache.fluss.rpc.messages.AdjustIsrResponse;
+import org.apache.fluss.rpc.messages.AlterTableConfigsRequest;
+import org.apache.fluss.rpc.messages.AlterTableConfigsResponse;
 import org.apache.fluss.rpc.messages.ApiVersionsRequest;
 import org.apache.fluss.rpc.messages.ApiVersionsResponse;
 import org.apache.fluss.rpc.messages.CommitKvSnapshotRequest;
@@ -139,6 +141,12 @@ public class TestCoordinatorGateway implements 
CoordinatorGateway {
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * Altering table configs is not supported by this test gateway.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public CompletableFuture<AlterTableConfigsResponse> alterTableConfigs(
+            AlterTableConfigsRequest request) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public CompletableFuture<DropTableResponse> dropTable(DropTableRequest 
request) {
         throw new UnsupportedOperationException();
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
index a3d6d4492..fa0482b10 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
@@ -27,6 +27,7 @@ import org.apache.fluss.record.KvRecordBatch;
 import org.apache.fluss.record.MemoryLogRecords;
 import org.apache.fluss.record.bytesview.MemorySegmentBytesView;
 import org.apache.fluss.rpc.gateway.CoordinatorGateway;
+import org.apache.fluss.rpc.messages.AlterTableConfigsRequest;
 import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
 import org.apache.fluss.rpc.messages.CreatePartitionRequest;
 import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -45,6 +46,7 @@ import 
org.apache.fluss.rpc.messages.ListPartitionInfosRequest;
 import org.apache.fluss.rpc.messages.ListTablesRequest;
 import org.apache.fluss.rpc.messages.LookupRequest;
 import org.apache.fluss.rpc.messages.MetadataRequest;
+import org.apache.fluss.rpc.messages.PbAlterConfigsRequestInfo;
 import org.apache.fluss.rpc.messages.PbFetchLogReqForBucket;
 import org.apache.fluss.rpc.messages.PbFetchLogReqForTable;
 import org.apache.fluss.rpc.messages.PbFetchLogRespForBucket;
@@ -139,6 +141,20 @@ public class RpcMessageTestUtils {
         return createTableRequest;
     }
 
+    /**
+     * Creates an {@link AlterTableConfigsRequest} for the given table.
+     *
+     * @param tablePath supplies the database name and table name written into the request's
+     *     table path
+     * @param pbAlterConfigsRequestInfos config changes added to the request
+     * @param ignoreIfNotExists value stored in the request's {@code ignoreIfNotExists} flag
+     * @return the populated request
+     */
+    public static AlterTableConfigsRequest newAlterTableConfigsRequest(
+            TablePath tablePath,
+            List<PbAlterConfigsRequestInfo> pbAlterConfigsRequestInfos,
+            boolean ignoreIfNotExists) {
+        AlterTableConfigsRequest alterTableConfigsRequest = new AlterTableConfigsRequest();
+        alterTableConfigsRequest
+                .addAllConfigChanges(pbAlterConfigsRequestInfos)
+                // The flag is "ignore if NOT exists"; the parameter name now matches the setter.
+                .setIgnoreIfNotExists(ignoreIfNotExists)
+                .setTablePath()
+                .setDatabaseName(tablePath.getDatabaseName())
+                .setTableName(tablePath.getTableName());
+        return alterTableConfigsRequest;
+    }
+
     public static MetadataRequest newMetadataRequest(List<TablePath> 
tablePaths) {
         MetadataRequest metadataRequest = new MetadataRequest();
         metadataRequest.addAllTablePaths(

Reply via email to