This is an automated email from the ASF dual-hosted git repository.

hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 63215867a [test] Add test for recovering coordinator server with schema evolution (#2176)
63215867a is described below

commit 63215867a8fffee66b5ca207510e147cf5d973c3
Author: Jackeyzhe <[email protected]>
AuthorDate: Wed Dec 17 14:06:18 2025 +0800

    [test] Add test for recovering coordinator server with schema evolution (#2176)
---
 .../server/coordinator/TableManagerITCase.java     | 61 ++++++++++++++++++++++
 .../server/testutils/RpcMessageTestUtils.java      |  7 ++-
 2 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
index b8b8891c7..8f12e43c4 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
@@ -36,6 +36,7 @@ import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.metrics.registry.MetricRegistry;
 import org.apache.fluss.rpc.GatewayClientProxy;
@@ -49,6 +50,7 @@ import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
 import org.apache.fluss.rpc.messages.ListDatabasesRequest;
 import org.apache.fluss.rpc.messages.MetadataRequest;
 import org.apache.fluss.rpc.messages.MetadataResponse;
+import org.apache.fluss.rpc.messages.PbAddColumn;
 import org.apache.fluss.rpc.messages.PbAlterConfig;
 import org.apache.fluss.rpc.messages.PbBucketMetadata;
 import org.apache.fluss.rpc.messages.PbPartitionMetadata;
@@ -62,6 +64,8 @@ import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.data.BucketAssignment;
 import org.apache.fluss.server.zk.data.TableAssignment;
 import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.utils.json.DataTypeJsonSerde;
+import org.apache.fluss.utils.json.JsonSerdeUtils;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -298,6 +302,7 @@ class TableManagerITCase {
                         newAlterTableRequest(
                                 tablePath,
                                 alterTableProperties(setProperties, resetProperties),
+                                Collections.emptyList(),
                                 false))
                 .get();
         // get the table and check it
@@ -655,6 +660,48 @@ class TableManagerITCase {
                         FLUSS_CLUSTER_EXTENSION.getTabletServerNodes());
     }
 
+    @Test
+    void testSchemaEvolution() throws Exception {
+        AdminReadOnlyGateway gateway = getAdminOnlyGateway(true);
+        AdminGateway adminGateway = getAdminGateway();
+
+        // create database and table
+        String db1 = "db1";
+        String tb1 = "tb1";
+        TablePath tablePath = TablePath.of(db1, tb1);
+        adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get();
+        TableDescriptor tableDescriptor = newPkTable();
+        adminGateway.createTable(newCreateTableRequest(tablePath, tableDescriptor, false)).get();
+
+        // add column
+        adminGateway
+                .alterTable(
+                        newAlterTableRequest(
+                                tablePath, Collections.emptyList(), alterTableAddColumns(), false))
+                .get();
+
+        // restart coordinatorServer
+        FLUSS_CLUSTER_EXTENSION.stopCoordinatorServer();
+        FLUSS_CLUSTER_EXTENSION.startCoordinatorServer();
+
+        // check metadata response
+        MetadataResponse metadataResponse =
+                gateway.metadata(newMetadataRequest(Collections.singletonList(tablePath))).get();
+        assertThat(metadataResponse.getTableMetadatasCount()).isEqualTo(1);
+        PbTableMetadata pbTableMetadata = metadataResponse.getTableMetadataAt(0);
+        assertThat(pbTableMetadata.getSchemaId()).isEqualTo(2);
+        TableInfo tableInfo =
+                TableInfo.of(
+                        tablePath,
+                        pbTableMetadata.getTableId(),
+                        pbTableMetadata.getSchemaId(),
+                        TableDescriptor.fromJsonBytes(pbTableMetadata.getTableJson()),
+                        pbTableMetadata.getCreatedTime(),
+                        pbTableMetadata.getModifiedTime());
+        List<Schema.Column> columns = tableInfo.getSchema().getColumns();
+        assertThat(columns.size()).isEqualTo(3);
+    }
+
     private void checkBucketMetadata(int expectBucketCount, List<PbBucketMetadata> bucketMetadata) {
         Set<Integer> liveServers =
                 FLUSS_CLUSTER_EXTENSION.getTabletServerNodes().stream()
@@ -781,6 +828,20 @@ class TableManagerITCase {
         return res;
     }
 
+    private static List<PbAddColumn> alterTableAddColumns() {
+        List<PbAddColumn> addColumns = new ArrayList<>();
+        PbAddColumn newColumn = new PbAddColumn();
+        newColumn
+                .setColumnName("new_column")
+                .setDataTypeJson(
+                        JsonSerdeUtils.writeValueAsBytes(
+                                DataTypes.STRING(), DataTypeJsonSerde.INSTANCE))
+                .setComment("new_column")
+                .setColumnPositionType(0);
+        addColumns.add(newColumn);
+        return addColumns;
+    }
+
     private static Schema newPkSchema() {
         return Schema.newBuilder()
                 .column("a", DataTypes.INT())
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
index c9a2f6871..b567eff1f 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
@@ -48,6 +48,7 @@ import org.apache.fluss.rpc.messages.ListPartitionInfosRequest;
 import org.apache.fluss.rpc.messages.ListTablesRequest;
 import org.apache.fluss.rpc.messages.LookupRequest;
 import org.apache.fluss.rpc.messages.MetadataRequest;
+import org.apache.fluss.rpc.messages.PbAddColumn;
 import org.apache.fluss.rpc.messages.PbAlterConfig;
 import org.apache.fluss.rpc.messages.PbFetchLogReqForBucket;
 import org.apache.fluss.rpc.messages.PbFetchLogReqForTable;
@@ -145,9 +146,13 @@ public class RpcMessageTestUtils {
     }
 
     public static AlterTableRequest newAlterTableRequest(
-            TablePath tablePath, List<PbAlterConfig> alterConfigs, boolean ignoreIfExists) {
+            TablePath tablePath,
+            List<PbAlterConfig> alterConfigs,
+            List<PbAddColumn> addColumns,
+            boolean ignoreIfExists) {
         AlterTableRequest request = new AlterTableRequest();
         request.addAllConfigChanges(alterConfigs)
+                .addAllAddColumns(addColumns)
                 .setIgnoreIfNotExists(ignoreIfExists)
                 .setTablePath()
                 .setDatabaseName(tablePath.getDatabaseName())

Reply via email to