This is an automated email from the ASF dual-hosted git repository.
hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 63215867a [test] Add test for recovering coordinator server with
schema evolution (#2176)
63215867a is described below
commit 63215867a8fffee66b5ca207510e147cf5d973c3
Author: Jackeyzhe <[email protected]>
AuthorDate: Wed Dec 17 14:06:18 2025 +0800
[test] Add test for recovering coordinator server with schema evolution
(#2176)
---
.../server/coordinator/TableManagerITCase.java | 61 ++++++++++++++++++++++
.../server/testutils/RpcMessageTestUtils.java | 7 ++-
2 files changed, 67 insertions(+), 1 deletion(-)
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
index b8b8891c7..8f12e43c4 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
@@ -36,6 +36,7 @@ import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metrics.registry.MetricRegistry;
import org.apache.fluss.rpc.GatewayClientProxy;
@@ -49,6 +50,7 @@ import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
import org.apache.fluss.rpc.messages.ListDatabasesRequest;
import org.apache.fluss.rpc.messages.MetadataRequest;
import org.apache.fluss.rpc.messages.MetadataResponse;
+import org.apache.fluss.rpc.messages.PbAddColumn;
import org.apache.fluss.rpc.messages.PbAlterConfig;
import org.apache.fluss.rpc.messages.PbBucketMetadata;
import org.apache.fluss.rpc.messages.PbPartitionMetadata;
@@ -62,6 +64,8 @@ import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.BucketAssignment;
import org.apache.fluss.server.zk.data.TableAssignment;
import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.utils.json.DataTypeJsonSerde;
+import org.apache.fluss.utils.json.JsonSerdeUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -298,6 +302,7 @@ class TableManagerITCase {
newAlterTableRequest(
tablePath,
alterTableProperties(setProperties,
resetProperties),
+ Collections.emptyList(),
false))
.get();
// get the table and check it
@@ -655,6 +660,48 @@ class TableManagerITCase {
FLUSS_CLUSTER_EXTENSION.getTabletServerNodes());
}
+ @Test
+ void testSchemaEvolution() throws Exception {
+ AdminReadOnlyGateway gateway = getAdminOnlyGateway(true);
+ AdminGateway adminGateway = getAdminGateway();
+
+ // create database and table
+ String db1 = "db1";
+ String tb1 = "tb1";
+ TablePath tablePath = TablePath.of(db1, tb1);
+ adminGateway.createDatabase(newCreateDatabaseRequest(db1,
false)).get();
+ TableDescriptor tableDescriptor = newPkTable();
+ adminGateway.createTable(newCreateTableRequest(tablePath,
tableDescriptor, false)).get();
+
+ // add column
+ adminGateway
+ .alterTable(
+ newAlterTableRequest(
+ tablePath, Collections.emptyList(),
alterTableAddColumns(), false))
+ .get();
+
+ // restart coordinatorServer
+ FLUSS_CLUSTER_EXTENSION.stopCoordinatorServer();
+ FLUSS_CLUSTER_EXTENSION.startCoordinatorServer();
+
+ // check metadata response
+ MetadataResponse metadataResponse =
+
gateway.metadata(newMetadataRequest(Collections.singletonList(tablePath))).get();
+ assertThat(metadataResponse.getTableMetadatasCount()).isEqualTo(1);
+ PbTableMetadata pbTableMetadata =
metadataResponse.getTableMetadataAt(0);
+ assertThat(pbTableMetadata.getSchemaId()).isEqualTo(2);
+ TableInfo tableInfo =
+ TableInfo.of(
+ tablePath,
+ pbTableMetadata.getTableId(),
+ pbTableMetadata.getSchemaId(),
+
TableDescriptor.fromJsonBytes(pbTableMetadata.getTableJson()),
+ pbTableMetadata.getCreatedTime(),
+ pbTableMetadata.getModifiedTime());
+ List<Schema.Column> columns = tableInfo.getSchema().getColumns();
+ assertThat(columns.size()).isEqualTo(3);
+ }
+
private void checkBucketMetadata(int expectBucketCount,
List<PbBucketMetadata> bucketMetadata) {
Set<Integer> liveServers =
FLUSS_CLUSTER_EXTENSION.getTabletServerNodes().stream()
@@ -781,6 +828,20 @@ class TableManagerITCase {
return res;
}
+ private static List<PbAddColumn> alterTableAddColumns() {
+ List<PbAddColumn> addColumns = new ArrayList<>();
+ PbAddColumn newColumn = new PbAddColumn();
+ newColumn
+ .setColumnName("new_column")
+ .setDataTypeJson(
+ JsonSerdeUtils.writeValueAsBytes(
+ DataTypes.STRING(),
DataTypeJsonSerde.INSTANCE))
+ .setComment("new_column")
+ .setColumnPositionType(0);
+ addColumns.add(newColumn);
+ return addColumns;
+ }
+
private static Schema newPkSchema() {
return Schema.newBuilder()
.column("a", DataTypes.INT())
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
index c9a2f6871..b567eff1f 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
@@ -48,6 +48,7 @@ import
org.apache.fluss.rpc.messages.ListPartitionInfosRequest;
import org.apache.fluss.rpc.messages.ListTablesRequest;
import org.apache.fluss.rpc.messages.LookupRequest;
import org.apache.fluss.rpc.messages.MetadataRequest;
+import org.apache.fluss.rpc.messages.PbAddColumn;
import org.apache.fluss.rpc.messages.PbAlterConfig;
import org.apache.fluss.rpc.messages.PbFetchLogReqForBucket;
import org.apache.fluss.rpc.messages.PbFetchLogReqForTable;
@@ -145,9 +146,13 @@ public class RpcMessageTestUtils {
}
public static AlterTableRequest newAlterTableRequest(
- TablePath tablePath, List<PbAlterConfig> alterConfigs, boolean
ignoreIfExists) {
+ TablePath tablePath,
+ List<PbAlterConfig> alterConfigs,
+ List<PbAddColumn> addColumns,
+ boolean ignoreIfExists) {
AlterTableRequest request = new AlterTableRequest();
request.addAllConfigChanges(alterConfigs)
+ .addAllAddColumns(addColumns)
.setIgnoreIfNotExists(ignoreIfExists)
.setTablePath()
.setDatabaseName(tablePath.getDatabaseName())