This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 885a0591a [FLINK-39204][pipeline-connector/fluss] Fluss yaml sink support add column at last
885a0591a is described below

commit 885a0591a23e3ccc20c45d1ad638bec577e0d2a5
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Mar 6 18:55:43 2026 +0800

    [FLINK-39204][pipeline-connector/fluss] Fluss yaml sink support add column at last
    
    This closes #4305.
---
 .../fluss/sink/FlussEventSerializationSchema.java  |  55 ++++--
 .../fluss/sink/FlussMetaDataApplier.java           |  73 ++++---
 .../fluss/sink/row/{ => row}/CdcAsFlussRow.java    |   0
 .../connectors/fluss/utils/FlussConversions.java   |  30 ++-
 .../cdc/connectors/fluss/FlussPipelineITCase.java  | 211 ++++++++++++++++++---
 .../sink/FlussEventSerializationSchemaTest.java    |  76 +++++++-
 .../fluss/sink/FlussMetadataApplierTest.java       |  91 ++++++++-
 .../connectors/fluss/sink/v2/FlussSinkITCase.java  |  43 ++++-
 .../fluss/utils/FlussConversionsTest.java          |  12 +-
 9 files changed, 488 insertions(+), 103 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java
index 086da5894..c8ac822c2 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java
@@ -17,13 +17,16 @@
 
 package org.apache.flink.cdc.connectors.fluss.sink;
 
+import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.OperationType;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.cdc.connectors.fluss.sink.row.CdcAsFlussRow;
 import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEvent;
 import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEventSerializer;
@@ -43,19 +46,19 @@ import java.util.Map;
 import static org.apache.flink.cdc.connectors.fluss.sink.v2.FlussOperationType.APPEND;
 import static org.apache.flink.cdc.connectors.fluss.sink.v2.FlussOperationType.DELETE;
 import static org.apache.flink.cdc.connectors.fluss.sink.v2.FlussOperationType.UPSERT;
-import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue;
+import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.sameSchemaIgnoreCommentAndDefaultValue;
 import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussSchema;
 
 /** Serialization schema that converts a CDC data record to a Fluss event. */
 public class FlussEventSerializationSchema implements FlussEventSerializer<Event> {
     private static final long serialVersionUID = 1L;
 
-    private transient Map<TableId, TableSchemaInfo> tableInfoMap;
+    private transient Map<TableId, TableSchemaInfo> schemaMaps;
     private transient Connection connection;
 
     @Override
     public void open(Connection connection) {
-        this.tableInfoMap = new HashMap<>();
+        this.schemaMaps = new HashMap<>();
         this.connection = connection;
     }
 
@@ -82,29 +85,45 @@ public class FlussEventSerializationSchema implements FlussEventSerializer<Event
             org.apache.flink.cdc.common.schema.Schema newSchema =
                     ((CreateTableEvent) event).getSchema();
             // if the table is not exist or the schema is changed, update the table info.
-            if (!tableInfoMap.containsKey(tableId)
-                    || !sameCdcColumnsIgnoreCommentAndDefaultValue(
-                            tableInfoMap.get(tableId).upstreamCdcSchema, newSchema)) {
+            if (!schemaMaps.containsKey(tableId)
+                    || !sameSchemaIgnoreCommentAndDefaultValue(
+                            schemaMaps.get(tableId).upstreamCdcSchema, newSchema)) {
                 Table table = connection.getTable(getTablePath(tableId));
                 TableSchemaInfo newSchemaInfo =
                         new TableSchemaInfo(newSchema, table.getTableInfo().getSchema());
-                tableInfoMap.put(tableId, newSchemaInfo);
+                schemaMaps.put(tableId, newSchemaInfo);
+            }
+        } else if (event instanceof AddColumnEvent) {
+            TableSchemaInfo schemaInfo = schemaMaps.get(event.tableId());
+            if (schemaInfo == null) {
+                throw new IllegalStateException(
+                        "Cannot apply AddColumnEvent for table "
+                                + event.tableId()
+                                + ": table schema not found. Ensure CreateTableEvent is processed before AddColumnEvent.");
+            }
+            Schema schema = schemaInfo.upstreamCdcSchema;
+            if (!SchemaUtils.isSchemaChangeEventRedundant(schema, event)) {
+                Table table = connection.getTable(getTablePath(tableId));
+                TableSchemaInfo newSchemaInfo =
+                        new TableSchemaInfo(
+                                SchemaUtils.applySchemaChangeEvent(schema, event),
+                                table.getTableInfo().getSchema());
+                schemaMaps.put(tableId, newSchemaInfo);
             }
         } else {
-            // TODO: Logics for altering tables are not supported yet.
-            // This is anticipated to be supported in Fluss version 0.8.0.
-            throw new RuntimeException(
-                    "Schema change type not supported. Only CreateTableEvent is allowed at the moment.");
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Schema change type %s not supported. Only CreateTableEvent and AddColumnEvent are allowed at the moment.",
+                            event.getClass()));
         }
     }
 
     private FlussRowWithOp applyDataChangeEvent(DataChangeEvent record) {
         OperationType op = record.op();
-        TableSchemaInfo tableSchemaInfo = tableInfoMap.get(record.tableId());
+        TableSchemaInfo tableSchemaInfo = schemaMaps.get(record.tableId());
         Preconditions.checkNotNull(
                 tableSchemaInfo, "Table schema not found for table " + record.tableId());
-        int flussFieldCount =
-                tableSchemaInfo.downStreamFlusstreamSchema.getRowType().getFieldCount();
+        int flussFieldCount = tableSchemaInfo.downstreamFlussSchema.getRowType().getFieldCount();
         boolean hasPrimaryKey = !tableSchemaInfo.upstreamCdcSchema.primaryKeys().isEmpty();
         switch (op) {
             case INSERT:
@@ -130,17 +149,17 @@ public class FlussEventSerializationSchema implements FlussEventSerializer<Event
 
     private static class TableSchemaInfo {
         org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema;
-        org.apache.fluss.metadata.Schema downStreamFlusstreamSchema;
+        org.apache.fluss.metadata.Schema downstreamFlussSchema;
         Map<Integer, Integer> indexMapping;
 
         private TableSchemaInfo(
                 org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema,
-                org.apache.fluss.metadata.Schema downStreamFlusstreamSchema) {
+                org.apache.fluss.metadata.Schema downstreamFlussSchema) {
             this.upstreamCdcSchema = upstreamCdcSchema;
-            this.downStreamFlusstreamSchema = downStreamFlusstreamSchema;
+            this.downstreamFlussSchema = downstreamFlussSchema;
             this.indexMapping =
                     sanityCheckAndGenerateIndexMapping(
-                            toFlussSchema(upstreamCdcSchema), downStreamFlusstreamSchema);
+                            toFlussSchema(upstreamCdcSchema), downstreamFlussSchema);
         }
     }
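
The serializer above keeps a per-table cache (schemaMaps) and evolves the cached upstream CDC schema in place rather than re-reading it from Fluss on every event. A minimal sketch of that bookkeeping, using only helpers that appear in this commit (table and column names are illustrative):

    // Sketch: evolving a cached CDC schema with an AddColumnEvent.
    Schema before =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .primaryKey("id")
                    .build();
    AddColumnEvent addColumn =
            new AddColumnEvent(
                    TableId.parse("test.tbl1"),
                    Collections.singletonList(
                            AddColumnEvent.last(
                                    Column.physicalColumn("newColumn", DataTypes.STRING()))));
    // Redundant events (e.g. replayed after a restart) are skipped before applying.
    if (!SchemaUtils.isSchemaChangeEventRedundant(before, addColumn)) {
        Schema after = SchemaUtils.applySchemaChangeEvent(before, addColumn);
        // "after" now carries "newColumn" as its last column.
    }
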
 
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
index 4e28623e2..e2cda4bd0 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
@@ -17,12 +17,14 @@
 
 package org.apache.flink.cdc.connectors.fluss.sink;
 
+import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DropTableEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.table.api.ValidationException;
 
@@ -31,12 +33,14 @@ import org.apache.fluss.client.ConnectionFactory;
 import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -47,6 +51,7 @@ import java.util.stream.Collectors;
 import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
 import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
 import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussTable;
+import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussType;
 
 /** {@link MetadataApplier} for fluss. */
 public class FlussMetaDataApplier implements MetadataApplier {
@@ -58,9 +63,6 @@ public class FlussMetaDataApplier implements MetadataApplier {
     private Set<SchemaChangeEventType> enabledEventTypes =
             new HashSet<>(Arrays.asList(CREATE_TABLE, DROP_TABLE));
 
-    private transient Connection connection;
-    private transient Admin admin;
-
     public FlussMetaDataApplier(
             Configuration flussClientConfig,
             Map<String, String> tableProperties,
@@ -92,22 +94,25 @@ public class FlussMetaDataApplier implements MetadataApplier {
     @Override
     public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
         LOG.info("fluss metadata applier receive schemaChangeEvent {}", schemaChangeEvent);
-        Admin admin = getAdmin();
         if (schemaChangeEvent instanceof CreateTableEvent) {
             CreateTableEvent createTableEvent = (CreateTableEvent) schemaChangeEvent;
-            applyCreateTable(admin, createTableEvent);
+            applyCreateTable(createTableEvent);
         } else if (schemaChangeEvent instanceof DropTableEvent) {
             DropTableEvent dropTableEvent = (DropTableEvent) schemaChangeEvent;
-            applyDropTable(admin, dropTableEvent);
+            applyDropTable(dropTableEvent);
+        } else if (schemaChangeEvent instanceof AddColumnEvent) {
+            AddColumnEvent addColumnEvent = (AddColumnEvent) schemaChangeEvent;
+            applyAddColumnTable(addColumnEvent);
         } else {
             throw new IllegalArgumentException(
-                    "fluss metadata applier only support CreateTableEvent now but receives "
+                    "fluss metadata applier only supports CreateTableEvent and AddColumnEvent now but receives "
                             + schemaChangeEvent);
         }
     }
 
-    private void applyCreateTable(Admin admin, CreateTableEvent event) {
-        try {
+    private void applyCreateTable(CreateTableEvent event) {
+        try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
+                Admin admin = connection.getAdmin()) {
             TableId tableId = event.tableId();
             TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
             String tableIdentifier = tablePath.getDatabaseName() + "." + tablePath.getTableName();
@@ -129,8 +134,9 @@ public class FlussMetaDataApplier implements MetadataApplier {
         }
     }
 
-    private void applyDropTable(Admin admin, DropTableEvent event) {
-        try {
+    private void applyDropTable(DropTableEvent event) {
+        try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
+                Admin admin = connection.getAdmin()) {
             TableId tableId = event.tableId();
             TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
             admin.dropTable(tablePath, true).get();
@@ -140,21 +146,36 @@ public class FlussMetaDataApplier implements MetadataApplier {
         }
     }
 
-    private Admin getAdmin() {
-        if (connection == null) {
-            connection = ConnectionFactory.createConnection(flussClientConfig);
-            admin = connection.getAdmin();
-        }
-        return admin;
-    }
-
-    @Override
-    public void close() throws Exception {
-        if (admin != null) {
-            admin.close();
-        }
-        if (connection != null) {
-            connection.close();
+    private void applyAddColumnTable(AddColumnEvent event) {
+        List<TableChange> tableChanges = new ArrayList<>();
+        event.getAddedColumns()
+                .forEach(
+                        columnWithPosition -> {
+                            if (columnWithPosition.getPosition()
+                                    != AddColumnEvent.ColumnPosition.LAST) {
+                                throw new IllegalArgumentException(
+                                        "Fluss metadata applier only supports LAST position for adding columns now but receives "
+                                                + columnWithPosition.getPosition()
+                                                + ". Consider using 'schema.change.behavior' configuration with 'LENIENT' mode to handle schema changes more flexibly.");
+                            }
+
+                            Column column = columnWithPosition.getAddColumn();
+                            tableChanges.add(
+                                    TableChange.addColumn(
+                                            column.getName(),
+                                            toFlussType(column.getType()),
+                                            column.getComment(),
+                                            TableChange.ColumnPosition.last()));
+                        });
+
+        try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
+                Admin admin = connection.getAdmin()) {
+            TableId tableId = event.tableId();
+            TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
+            admin.alterTable(tablePath, tableChanges, true).get();
+        } catch (Exception e) {
+            LOG.error("Failed to apply schema change {}", event, e);
+            throw new RuntimeException(e);
         }
     }
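
Adding a column on the Fluss side reduces to a single admin call. A condensed sketch under the same client configuration (the table path and column name are illustrative; the comment argument is passed through unchanged, so it may be null):

    // Sketch: appending one column at the LAST position through the Fluss admin API.
    List<TableChange> changes =
            Collections.singletonList(
                    TableChange.addColumn(
                            "newColumn",
                            org.apache.fluss.types.DataTypes.STRING(),
                            null, // comment; column.getComment() above may be null too
                            TableChange.ColumnPosition.last()));
    try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
            Admin admin = connection.getAdmin()) {
        admin.alterTable(new TablePath("fluss", "add_column_table"), changes, true).get();
    }
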
 
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussRow.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/row/CdcAsFlussRow.java
similarity index 100%
rename from flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussRow.java
rename to flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/row/CdcAsFlussRow.java
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java
index 25a9f53b6..6858c3e65 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java
@@ -41,7 +41,6 @@ import org.apache.flink.cdc.common.types.ZonedTimestampType;
 import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.util.CollectionUtil;
 
-import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableDescriptor;
 
@@ -94,28 +93,25 @@ public class FlussConversions {
             schemBuilder.primaryKey(cdcSchema.primaryKeys());
         }
 
-        Schema schema =
-                schemBuilder
-                        .fromColumns(
-                                cdcSchema.getColumns().stream()
-                                        .map(
-                                                column ->
-                                                        new Schema.Column(
-                                                                column.getName(),
-                                                                toFlussType(column.getType()),
-                                                                column.getComment()))
-                                        .collect(Collectors.toList()))
-                        .build();
-        return schema;
+        // use schemBuilder.column rather than schemBuilder.fromColumns to reassign nested row id.
+        cdcSchema
+                .getColumns()
+                .forEach(
+                        column ->
+                                schemBuilder
+                                        .column(
+                                                column.getName(),
+                                                column.getType().accept(TO_FLUSS_TYPE_INSTANCE))
+                                        .withComment(column.getComment()));
+        return schemBuilder.build();
     }
 
-    @VisibleForTesting
-    private static org.apache.fluss.types.DataType toFlussType(
+    public static org.apache.fluss.types.DataType toFlussType(
             org.apache.flink.cdc.common.types.DataType flinkDataType) {
         return flinkDataType.accept(TO_FLUSS_TYPE_INSTANCE);
     }
 
-    public static Boolean sameCdcColumnsIgnoreCommentAndDefaultValue(
+    public static Boolean sameSchemaIgnoreCommentAndDefaultValue(
             org.apache.flink.cdc.common.schema.Schema oldSchema,
             org.apache.flink.cdc.common.schema.Schema newSchema) {
         List<org.apache.flink.cdc.common.schema.Column> upstreamColumns = oldSchema.getColumns();
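
With toFlussType now public, the metadata applier reuses the exact type mapping that toFlussSchema applies internally. A short sketch with types drawn from the test matrix below (variable names are illustrative):

    // Sketch: CDC-to-Fluss type conversion, including a nested ROW.
    org.apache.fluss.types.DataType flussArray =
            FlussConversions.toFlussType(DataTypes.ARRAY(DataTypes.INT()));
    org.apache.fluss.types.DataType flussRow =
            FlussConversions.toFlussType(
                    DataTypes.ROW(
                            DataTypes.FIELD("name", DataTypes.STRING()),
                            DataTypes.FIELD("age", DataTypes.INT())));

Building the Fluss schema column by column (rather than via fromColumns) lets the builder assign fresh field ids to nested row fields, which the new equalsWithFieldId assertion in FlussMetadataApplierTest exercises.
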
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
index bfc71a1ad..602b32680 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
@@ -19,12 +19,13 @@ package org.apache.flink.cdc.connectors.fluss;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
-import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.RowType;
@@ -64,6 +65,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR;
+import static org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior.EVOLVE;
+import static org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior.IGNORE;
+import static org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior.LENIENT;
 import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_1;
 import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_2;
 import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
@@ -233,7 +238,7 @@ public class FlussPipelineITCase {
         eventOfSplits.add(Collections.singletonList(updateEvent));
         eventOfSplits.add(Collections.singletonList(deleteEvent));
 
-        composeAndExecute(eventOfSplits);
+        composeAndExecuteInEvolveMode(eventOfSplits);
         checkResult(TABLE_1, Arrays.asList("+I[2, b2]", "+I[3, c]"));
     }
 
@@ -279,16 +284,114 @@ public class FlussPipelineITCase {
         split1.add(insertEvent3);
         eventOfSplits.add(split1);
 
-        composeAndExecute(eventOfSplits);
+        composeAndExecuteInEvolveMode(eventOfSplits);
         checkResult(TABLE_1, Arrays.asList("+I[1, a]", "+I[2, b]", "+I[3, c]"));
     }
 
     @Test
-    void testSingleLogTableWithSchemaChange() {
-        assertThatThrownBy(() -> composeAndExecute(ValuesDataSourceHelper.singleSplitSingleTable()))
+    void testSingleLogTableWithNotSupportedSchemaChange() throws Exception {
+        assertThatThrownBy(
+                        () ->
+                                composeAndExecuteInEvolveMode(
+                                        ValuesDataSourceHelper.singleSplitSingleTable()))
                 .rootCause()
                 .hasMessageContaining(
-                        "fluss metadata applier only support CreateTableEvent now but receives AddColumnEvent");
+                        "fluss metadata applier only supports CreateTableEvent and AddColumnEvent now but receives RenameColumnEvent");
+    }
+
+    @Test
+    void testSingleLogTableInLenientMode() throws Exception {
+        // test add/drop/rename column in lenient mode
+        composeAndExecute(
+                ValuesDataSourceHelper.singleSplitSingleTable(),
+                new Configuration().set(PIPELINE_SCHEMA_CHANGE_BEHAVIOR, LENIENT));
+        checkResult(
+                TABLE_1, Arrays.asList("+I[2, null, null, null, x]", "+I[3, 3, null, null, null]"));
+    }
+
+    @Test
+    void testSingleLogTableInIgnoreMode() throws Exception {
+        // test add/drop/rename column in ignore mode
+        composeAndExecute(
+                ValuesDataSourceHelper.singleSplitSingleTable(),
+                new Configuration().set(PIPELINE_SCHEMA_CHANGE_BEHAVIOR, IGNORE));
+        checkResult(TABLE_1, Arrays.asList("+I[2, null]", "+I[3, 3]"));
+    }
+
+    @Test
+    void testSingleLogTableWithAddColumn() throws Exception {
+        List<List<Event>> eventOfSplits = new ArrayList<>();
+        List<Event> split1 = new ArrayList<>();
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("col1", DataTypes.STRING())
+                        .physicalColumn("col2", DataTypes.STRING())
+                        .build();
+        CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
+        split1.add(createTableEvent);
+        DataChangeEvent insertEvent1 =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        new BinaryRecordDataGenerator(
+                                        RowType.of(DataTypes.STRING(), DataTypes.STRING()))
+                                .generate(
+                                        new Object[] {
+                                            BinaryStringData.fromString("1"),
+                                            BinaryStringData.fromString("a")
+                                        }));
+        split1.add(insertEvent1);
+
+        // add column at last.
+        split1.add(
+                new AddColumnEvent(
+                        TABLE_1,
+                        Collections.singletonList(
+                                AddColumnEvent.last(
+                                        Column.physicalColumn("newColumn", DataTypes.STRING())))));
+        DataChangeEvent insertEvent2 =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        new BinaryRecordDataGenerator(
+                                        RowType.of(
+                                                DataTypes.STRING(),
+                                                DataTypes.STRING(),
+                                                DataTypes.STRING()))
+                                .generate(
+                                        new Object[] {
+                                            BinaryStringData.fromString("2"),
+                                            BinaryStringData.fromString("b"),
+                                            BinaryStringData.fromString("bb")
+                                        }));
+        split1.add(insertEvent2);
+        split1.add(
+                new AddColumnEvent(
+                        TABLE_1,
+                        Collections.singletonList(
+                                AddColumnEvent.last(
+                                        Column.physicalColumn("newColumn2", DataTypes.STRING())))));
+        DataChangeEvent insertEvent3 =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        new BinaryRecordDataGenerator(
+                                        RowType.of(
+                                                DataTypes.STRING(),
+                                                DataTypes.STRING(),
+                                                DataTypes.STRING(),
+                                                DataTypes.STRING()))
+                                .generate(
+                                        new Object[] {
+                                            BinaryStringData.fromString("3"),
+                                            BinaryStringData.fromString("c"),
+                                            BinaryStringData.fromString("cc"),
+                                            BinaryStringData.fromString("ccc")
+                                        }));
+        split1.add(insertEvent3);
+        eventOfSplits.add(split1);
+
+        composeAndExecuteInEvolveMode(eventOfSplits);
+        checkResult(
+                TABLE_1,
+                Arrays.asList("+I[1, a, null, null]", "+I[2, b, bb, null]", "+I[3, c, cc, ccc]"));
     }
 
     @Test
@@ -298,7 +401,8 @@ public class FlussPipelineITCase {
                                 composeAndExecute(
                                         ValuesDataSourceHelper.singleSplitSingleTable(),
                                         Collections.singletonMap(
-                                                BOOTSTRAP_SERVERS.key(), getBootstrapServers())))
+                                                BOOTSTRAP_SERVERS.key(), getBootstrapServers()),
+                                        new Configuration()))
                 .rootCause()
                 .hasMessageContaining(
                         "The connection has not completed authentication yet. 
This may be caused by a missing or incorrect configuration of 
'client.security.protocol' on the client side.");
@@ -318,7 +422,8 @@ public class FlussPipelineITCase {
                         () ->
                                 composeAndExecute(
                                         ValuesDataSourceHelper.singleSplitSingleTable(),
-                                        sinkOption))
+                                        sinkOption,
+                                        new Configuration()))
                 .rootCause()
                 .hasMessageContaining("'table.non-key' is not a recognized 
Fluss table property");
     }
@@ -398,7 +503,7 @@ public class FlussPipelineITCase {
         split1.add(insertTabl2Event3);
         eventOfSplits.add(split1);
 
-        composeAndExecute(eventOfSplits);
+        composeAndExecuteInEvolveMode(eventOfSplits);
         checkResult(TABLE_1, Arrays.asList("+I[1, 1]", "+I[2, 2]", "+I[3, 3]"));
         checkResult(TABLE_2, Arrays.asList("+I[1, 1]", "+I[2, 2]", "+I[3, 3]"));
     }
@@ -438,7 +543,7 @@ public class FlussPipelineITCase {
                                 new Object[] {
                                     BinaryStringData.fromString("1"),
                                     BinaryStringData.fromString("a"),
-                                    BinaryStringData.fromString("1")
+                                    BinaryStringData.fromString("aa")
                                 }));
         split1.add(insertEvent1);
         DataChangeEvent insertEvent2 =
@@ -447,14 +552,39 @@ public class FlussPipelineITCase {
                         generator.generate(
                                 new Object[] {
                                     BinaryStringData.fromString("2"),
-                                    BinaryStringData.fromString("2"),
-                                    BinaryStringData.fromString("2")
+                                    BinaryStringData.fromString("b"),
+                                    BinaryStringData.fromString("bb")
                                 }));
         split1.add(insertEvent2);
         eventOfSplits.add(split1);
 
-        composeAndExecute(eventOfSplits);
-        checkResult(TABLE_1, Arrays.asList("+I[a, 1]", "+I[2, 2]"));
+        // add column at last
+        split1.add(
+                new AddColumnEvent(
+                        TABLE_1,
+                        Collections.singletonList(
+                                AddColumnEvent.last(
+                                        Column.physicalColumn("newColumn", DataTypes.STRING())))));
+        DataChangeEvent insertEvent3 =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        new BinaryRecordDataGenerator(
+                                        RowType.of(
+                                                DataTypes.STRING(),
+                                                DataTypes.STRING(),
+                                                DataTypes.STRING(),
+                                                DataTypes.STRING()))
+                                .generate(
+                                        new Object[] {
+                                            BinaryStringData.fromString("3"),
+                                            BinaryStringData.fromString("c"),
+                                            BinaryStringData.fromString("cc"),
+                                            BinaryStringData.fromString("ccc")
+                                        }));
+        split1.add(insertEvent3);
+
+        composeAndExecuteInEvolveMode(eventOfSplits);
+        checkResult(TABLE_1, Arrays.asList("+I[a, 1, null]", "+I[b, 2, null]", "+I[c, 3, ccc]"));
     }
 
     @Test
@@ -499,27 +629,64 @@ public class FlussPipelineITCase {
                         generator.generate(
                                 new Object[] {
                                     BinaryStringData.fromString("2"),
-                                    BinaryStringData.fromString("2")
+                                    BinaryStringData.fromString("b")
                                 }));
         split1.add(insertEvent2);
         eventOfSplits.add(split1);
 
-        composeAndExecute(eventOfSplits);
-        checkResult(TABLE_1, Arrays.asList("+I[1, a, null]", "+I[2, 2, null]"));
+        // add column at last
+        split1.add(
+                new AddColumnEvent(
+                        TABLE_1,
+                        Collections.singletonList(
+                                AddColumnEvent.last(
+                                        Column.physicalColumn("newColumn", DataTypes.STRING())))));
+        DataChangeEvent insertEvent3 =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        new BinaryRecordDataGenerator(
+                                        RowType.of(
+                                                DataTypes.STRING(),
+                                                DataTypes.STRING(),
+                                                DataTypes.STRING()))
+                                .generate(
+                                        new Object[] {
+                                            BinaryStringData.fromString("3"),
+                                            BinaryStringData.fromString("c"),
+                                            BinaryStringData.fromString("cc")
+                                        }));
+        split1.add(insertEvent3);
+
+        composeAndExecuteInEvolveMode(eventOfSplits);
+        checkResult(
+                TABLE_1,
+                Arrays.asList(
+                        "+I[1, a, null, null]", "+I[2, b, null, null]", "+I[3, c, null, cc]"));
     }
 
-    private void composeAndExecute(List<List<Event>> customSourceEvents) throws Exception {
+    private void composeAndExecuteInEvolveMode(List<List<Event>> customSourceEvents)
+            throws Exception {
+        composeAndExecute(
+                customSourceEvents,
+                new Configuration().set(PIPELINE_SCHEMA_CHANGE_BEHAVIOR, EVOLVE));
+    }
+
+    private void composeAndExecute(
+            List<List<Event>> customSourceEvents, Configuration pipelineConfig) throws Exception {
         Map<String, String> sinkOption = new HashMap<>();
         sinkOption.put(BOOTSTRAP_SERVERS.key(), getBootstrapServers());
         sinkOption.put("properties.client.security.protocol", "sasl");
         sinkOption.put("properties.client.security.sasl.mechanism", "PLAIN");
         sinkOption.put("properties.client.security.sasl.username", "guest");
         sinkOption.put("properties.client.security.sasl.password", 
"password2");
-        composeAndExecute(customSourceEvents, sinkOption);
+        composeAndExecute(customSourceEvents, sinkOption, pipelineConfig);
     }
 
     private void composeAndExecute(
-            List<List<Event>> customSourceEvents, Map<String, String> sinkOption) throws Exception {
+            List<List<Event>> customSourceEvents,
+            Map<String, String> sinkOption,
+            Configuration pipelineConfig)
+            throws Exception {
         FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
 
         // Setup value source
@@ -535,10 +702,8 @@ public class FlussPipelineITCase {
         SinkDef sinkDef = new SinkDef("fluss", "Fluss Sink", Configuration.fromMap(sinkOption));
 
         // Setup pipeline
-        Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 4);
-        pipelineConfig.set(
-                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        pipelineConfig.addAll(pipelineConfig);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
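
The test harness now threads the schema change behavior through as a parameter instead of hard-coding EVOLVE. A minimal sketch of the per-run pipeline configuration (constants as imported at the top of this test):

    // Sketch: selecting the schema change behavior for a single pipeline run.
    Configuration pipelineConfig =
            new Configuration().set(PIPELINE_SCHEMA_CHANGE_BEHAVIOR, LENIENT); // or EVOLVE / IGNORE
    pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 4);
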
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java
index 1af59aa97..663cb0830 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java
@@ -23,12 +23,15 @@ import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.BooleanType;
 import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.DateType;
 import org.apache.flink.cdc.common.types.DecimalType;
 import org.apache.flink.cdc.common.types.FloatType;
@@ -97,7 +100,7 @@ public class FlussEventSerializationSchemaTest {
     }
 
     @Test
-    void testMixedSchemaAndDataChanges() throws Exception {
+    void testSchemaAndDataChangesInMultiTables() throws Exception {
         // 1. create table1, and insert/delete/update data
         TableId table1 = TableId.parse("test.tbl1");
         Schema schema1 =
@@ -223,6 +226,77 @@ public class FlussEventSerializationSchemaTest {
                 Objects.requireNonNull(serializer.serialize(deleteEvent2)));
     }
 
+    @Test
+    void testAddColumn() throws Exception {
+        // 1. create table1, and insert/delete/update data
+        TableId table1 = TableId.parse("test.tbl1");
+        Schema schemaBefore =
+                Schema.newBuilder()
+                        .physicalColumn("col1", new IntType())
+                        .physicalColumn("col2", new BooleanType())
+                        .physicalColumn("col3", new TimestampType())
+                        .primaryKey("col1")
+                        .build();
+        AddColumnEvent addColumnEvent =
+                new AddColumnEvent(
+                        table1,
+                        Collections.singletonList(
+                                AddColumnEvent.last(
+                                        Column.physicalColumn("newColumn", DataTypes.STRING()))));
+        Schema schemaAfter =
+                Schema.newBuilder()
+                        .physicalColumn("col1", new IntType())
+                        .physicalColumn("col2", new BooleanType())
+                        .physicalColumn("col3", new TimestampType())
+                        .physicalColumn("newColumn", new VarCharType())
+                        .primaryKey("col1")
+                        .build();
+        CreateTableEvent createTableEvent1 = new CreateTableEvent(table1, schemaBefore);
+        flussMetaDataApplier.applySchemaChange(createTableEvent1);
+        verifySchemaChangeEvent(table1, serializer.serialize(createTableEvent1));
+
+        BinaryRecordDataGenerator generator1 =
+                new BinaryRecordDataGenerator(
+                        schemaBefore.getColumnDataTypes().toArray(new DataType[0]));
+
+        RecordData record =
+                generator1.generate(
+                        new Object[] {
+                            1,
+                            true,
+                            TimestampData.fromMillis(
+                                    Timestamp.valueOf("2023-11-27 18:00:00").getTime())
+                        });
+        DataChangeEvent insertEvent1 = DataChangeEvent.insertEvent(table1, record);
+
+        verifyDataChangeEvent(
+                table1,
+                new FlussRowWithOp(CdcAsFlussRow.replace(record), FlussOperationType.UPSERT),
+                serializer.serialize(insertEvent1));
+
+        // 2. add column at last.
+        flussMetaDataApplier.applySchemaChange(addColumnEvent);
+        verifySchemaChangeEvent(table1, serializer.serialize(addColumnEvent));
+        BinaryRecordDataGenerator generator2 =
+                new BinaryRecordDataGenerator(
+                        schemaAfter.getColumnDataTypes().toArray(new DataType[0]));
+        RecordData newRecord =
+                generator2.generate(
+                        new Object[] {
+                            3,
+                            false,
+                            TimestampData.fromMillis(
+                                    Timestamp.valueOf("2023-11-27 20:00:00").getTime()),
+                            new BinaryStringData("insert new column")
+                        });
+        DataChangeEvent updateEvent1 = DataChangeEvent.updateEvent(table1, record, newRecord);
+
+        verifyDataChangeEvent(
+                table1,
+                new FlussRowWithOp(CdcAsFlussRow.replace(newRecord), FlussOperationType.UPSERT),
+                serializer.serialize(updateEvent1));
+    }
+
     private void verifySchemaChangeEvent(TableId tableId, FlussEvent flussEvent) throws Exception {
         verifySerializeResult(
                 new FlussEvent(
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java
index c95aa7868..2ff64765c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java
@@ -17,9 +17,11 @@
 
 package org.apache.flink.cdc.connectors.fluss.sink;
 
+import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DropTableEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.IntType;
@@ -48,6 +50,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.fluss.config.ConfigOptions.TABLE_REPLICATION_FACTOR;
+import static org.apache.fluss.types.DataTypeChecks.equalsWithFieldId;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -105,6 +108,9 @@ public class FlussMetadataApplierTest {
                     "time_col",
                     "timestamp_col",
                     "timestamp_ltz_col",
+                    "array_col",
+                    "map_col",
+                    "row_col"
                 };
 
         org.apache.flink.cdc.common.types.DataType[] cdcDataTypes =
@@ -126,7 +132,12 @@ public class FlussMetadataApplierTest {
                     DataTypes.DATE(),
                     DataTypes.TIME(),
                     DataTypes.TIMESTAMP(3),
-                    DataTypes.TIMESTAMP_LTZ(6)
+                    DataTypes.TIMESTAMP_LTZ(6),
+                    DataTypes.ARRAY(DataTypes.INT()),
+                    DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
+                    DataTypes.ROW(
+                            DataTypes.FIELD("name", DataTypes.STRING()),
+                            DataTypes.FIELD("age", DataTypes.INT()))
                 };
 
         org.apache.fluss.types.DataType[] flussDataTypes =
@@ -150,7 +161,16 @@ public class FlussMetadataApplierTest {
                     org.apache.fluss.types.DataTypes.DATE(),
                     org.apache.fluss.types.DataTypes.TIME(),
                     org.apache.fluss.types.DataTypes.TIMESTAMP(3),
-                    org.apache.fluss.types.DataTypes.TIMESTAMP_LTZ(6)
+                    org.apache.fluss.types.DataTypes.TIMESTAMP_LTZ(6),
+                    org.apache.fluss.types.DataTypes.ARRAY(org.apache.fluss.types.DataTypes.INT()),
+                    org.apache.fluss.types.DataTypes.MAP(
+                            org.apache.fluss.types.DataTypes.STRING(),
+                            org.apache.fluss.types.DataTypes.INT()),
+                    org.apache.fluss.types.DataTypes.ROW(
+                            org.apache.fluss.types.DataTypes.FIELD(
+                                    "name", org.apache.fluss.types.DataTypes.STRING()),
+                            org.apache.fluss.types.DataTypes.FIELD(
+                                    "age", org.apache.fluss.types.DataTypes.INT()))
                 };
 
         try (FlussMetaDataApplier applier =
@@ -183,6 +203,21 @@ public class FlussMetadataApplierTest {
             if (primaryKeyTable) {
                 assertThat(tableInfo.getPrimaryKeys()).containsExactly("int_col");
             }
+
+            // check field of nested row.
+            assertThat(
+                            equalsWithFieldId(
+                                    flussRowType.getTypeAt(flussRowType.getFieldCount() - 1),
+                                    org.apache.fluss.types.DataTypes.ROW(
+                                            org.apache.fluss.types.DataTypes.FIELD(
+                                                    "name",
+                                                    org.apache.fluss.types.DataTypes.STRING(),
+                                                    flussRowType.getFieldCount()),
+                                            org.apache.fluss.types.DataTypes.FIELD(
+                                                    "age",
+                                                    org.apache.fluss.types.DataTypes.INT(),
+                                                    flussRowType.getFieldCount() + 1))))
+                    .isTrue();
         }
     }
 
@@ -457,6 +492,58 @@ public class FlussMetadataApplierTest {
         }
     }
 
+    @Test
+    void testAddColumnAtLast() throws Exception {
+        TablePath tablePath = new TablePath("fluss", "add_column_table");
+        TableId tableId = TableId.tableId("default_namespace", "fluss", "add_column_table");
+        admin.createTable(
+                        tablePath,
+                        TableDescriptor.builder()
+                                .schema(
+                                        org.apache.fluss.metadata.Schema.newBuilder()
+                                                .column(
+                                                        "id",
+                                                        org.apache.fluss.types.DataTypes.INT())
+                                                .column(
+                                                        "name",
+                                                        org.apache.fluss.types.DataTypes.INT())
+                                                .build())
+                                .build(),
+                        true)
+                .get();
+
+        Column oldColumn = Column.physicalColumn("name", DataTypes.STRING());
+        Column newColumn = Column.physicalColumn("newColumn", DataTypes.STRING());
+
+        try (FlussMetaDataApplier applier =
+                new FlussMetaDataApplier(
+                        FLUSS_CLUSTER_EXTENSION.getClientConfig(),
+                        Collections.emptyMap(),
+                        Collections.emptyMap(),
+                        Collections.emptyMap())) {
+            assertThatThrownBy(
+                            () ->
+                                    applier.applySchemaChange(
+                                            new AddColumnEvent(
+                                                    tableId,
+                                                    Collections.singletonList(
+                                                            AddColumnEvent.last(oldColumn)))))
+                    .rootCause()
+                    .hasMessageContaining("Column name already exists.");
+            applier.applySchemaChange(
+                    new AddColumnEvent(
+                            tableId, Collections.singletonList(AddColumnEvent.last(newColumn))));
+            TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+            assertThat(tableInfo.getSchema())
+                    .isEqualTo(
+                            org.apache.fluss.metadata.Schema.newBuilder()
+                                    .column("id", org.apache.fluss.types.DataTypes.INT())
+                                    .column("name", org.apache.fluss.types.DataTypes.INT())
+                                    .column("newColumn", org.apache.fluss.types.DataTypes.STRING())
+                                    .build());
+        }
+    }
+
     @Test
     void testRecreateTableWithDifferentSchema() throws Exception {
         TableId tableId = TableId.tableId("default_namespace", DATABASE_NAME, "table1");
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
index 1cd80c6a6..2e408e40a 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
@@ -17,9 +17,11 @@
 
 package org.apache.flink.cdc.connectors.fluss.sink.v2;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -31,6 +33,7 @@ import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.connectors.fluss.sink.FlussEventSerializationSchema;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
@@ -147,7 +150,10 @@ public class FlussSinkITCase extends AbstractTestBase {
                                         + "    date_type DATE,\n"
                                         + "    time_type TIME,\n"
                                         + "    timestamp_type TIMESTAMP,\n"
-                                        + "    timestamp_ltz_type TIMESTAMP_LTZ(8)\n"
+                                        + "    timestamp_ltz_type TIMESTAMP_LTZ(8),\n"
+                                        + "    array_type ARRAY<INT>,\n"
+                                        + "    map_type MAP<STRING, INT>,\n"
+                                        + "    row_type ROW< f0 INT, f1 STRING >\n"
                                         + (primaryKeyTable
                                                 ? " ,PRIMARY KEY (int_type) 
NOT ENFORCED \n"
                                                 : "")
@@ -171,7 +177,10 @@ public class FlussSinkITCase extends AbstractTestBase {
                     "date_type",
                     "time_type",
                     "timestamp_type",
-                    "timestamp_ltz_type"
+                    "timestamp_ltz_type",
+                    "array_type",
+                    "map_type",
+                    "row_type"
                 };
         String[] pkFieldNames = primaryKeyTable ? new String[] {"int_type"} : new String[0];
 
@@ -191,9 +200,20 @@ public class FlussSinkITCase extends AbstractTestBase {
                     DataTypes.DATE(),
                     DataTypes.TIME(),
                     DataTypes.TIMESTAMP(),
-                    DataTypes.TIMESTAMP_LTZ(8)
+                    DataTypes.TIMESTAMP_LTZ(8),
+                    DataTypes.ARRAY(DataTypes.INT()),
+                    DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
+                    DataTypes.ROW(
+                            DataTypes.FIELD("f0", DataTypes.INT()),
+                            DataTypes.FIELD("f1", DataTypes.STRING()))
                 };
 
+        // Currently, AbstractBinaryWriter.writeRecord only support BinaryRecordData but not
+        // GenericRecordData
+        RecordData nestedRowData =
+                new BinaryRecordDataGenerator(new DataType[] {DataTypes.INT(), DataTypes.STRING()})
+                        .generate(new Object[] {1, BinaryStringData.fromString("hello")});
+
         Object[][] insertedValues =
                 new Object[][] {
                     new Object[] {
@@ -216,22 +236,26 @@ public class FlussSinkITCase extends AbstractTestBase {
                         LocalZonedTimestampData.fromInstant(
                                 LocalDateTime.of(2023, 11, 11, 11, 11, 11, 11)
                                         .atZone(ZoneId.of("GMT+05:00"))
-                                        .toInstant())
+                                        .toInstant()),
+                        new GenericArrayData(new Object[] {1, 2, 3}),
+                        new GenericMapData(
+                                Collections.singletonMap(BinaryStringData.fromString("key"), 123)),
+                        nestedRowData
                     },
                     new Object[] {
                         null, null, null, null, null, null, null, 0, null, null, null, null, null,
-                        null, null
+                        null, null, null, null, null
                     }
                 };
         // default timezone is asian/shanghai
         List<String> expectedRows =
                 Arrays.asList(
                         String.format(
-                                "+I[a, test character, test text, false, 8119.21, 1, 32767, 32768, 652482, 20.2007, 8.58965, 2023-11-12, 08:30:15, %s, 2023-11-11T06:11:11.000000011Z]",
+                                "+I[a, test character, test text, false, 8119.21, 1, 32767, 32768, 652482, 20.2007, 8.58965, 2023-11-12, 08:30:15, %s, 2023-11-11T06:11:11.000000011Z, [1, 2, 3], {key=123}, +I[1, hello]]",
                                 primaryKeyTable
                                         ? "2023-11-11T11:11:11.000000011"
                                         : "2023-11-11T11:11:11"),
-                        "+I[null, null, null, null, null, null, null, 0, null, null, null, null, null, null, null]");
+                        "+I[null, null, null, null, null, null, null, 0, null, null, null, null, null, null, null, null, null, null]");
 
         testInsertSingleTable(
                 tableId,
@@ -576,8 +600,7 @@ public class FlussSinkITCase extends AbstractTestBase {
                 StreamExecutionEnvironment.getExecutionEnvironment();
         environment.setParallelism(1);
 
-        DataStreamSource<Event> source =
-                environment.fromData(events, TypeInformation.of(Event.class));
+        DataStreamSource<Event> source = environment.fromData(events, new EventTypeInfo());
 
         FlussEventSerializationSchema flussRecordSerializer = new FlussEventSerializationSchema();
         FlussSink<Event> flussSink =
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
index eddaeb6ff..d4e1e9a33 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
@@ -294,7 +294,7 @@ class FlussConversionsTest {
                         .physicalColumn("name", DataTypes.STRING())
                         .build();
 
-        assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, schema2))
+        assertThat(FlussConversions.sameSchemaIgnoreCommentAndDefaultValue(schema1, schema2))
                 .isTrue();
     }
 
@@ -312,7 +312,7 @@ class FlussConversionsTest {
                         .physicalColumn("age", DataTypes.INT())
                         .build();
 
-        assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, schema2))
+        assertThat(FlussConversions.sameSchemaIgnoreCommentAndDefaultValue(schema1, schema2))
                 .isFalse();
     }
 
@@ -330,7 +330,7 @@ class FlussConversionsTest {
                         .physicalColumn("name", DataTypes.STRING())
                         .build();
 
-        assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, schema2))
+        assertThat(FlussConversions.sameSchemaIgnoreCommentAndDefaultValue(schema1, schema2))
                 .isFalse();
     }
 
@@ -344,7 +344,7 @@ class FlussConversionsTest {
 
         Schema schema2 = Schema.newBuilder().physicalColumn("id", 
DataTypes.INT()).build();
 
-        assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, schema2))
+        assertThat(FlussConversions.sameSchemaIgnoreCommentAndDefaultValue(schema1, schema2))
                 .isFalse();
     }
 
@@ -363,7 +363,7 @@ class FlussConversionsTest {
                         .build();
 
         // Should be true because comments are ignored
-        assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, schema2))
+        assertThat(FlussConversions.sameSchemaIgnoreCommentAndDefaultValue(schema1, schema2))
                 .isTrue();
     }
 
@@ -382,7 +382,7 @@ class FlussConversionsTest {
                         .physicalColumn("name", DataTypes.STRING())
                         .build();
 
-        assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, schema2))
+        assertThat(FlussConversions.sameSchemaIgnoreCommentAndDefaultValue(schema1, schema2))
                 .isTrue();
     }
 }
