This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 56669095b7 [Improve][connector-iceberg] fix schema change event (#9217)
56669095b7 is described below

commit 56669095b7e9d663568705f474180ac79f5669d2
Author: CosmosNi <[email protected]>
AuthorDate: Mon Apr 28 10:52:10 2025 +0800

    [Improve][connector-iceberg] fix schema change event (#9217)
---
 .../iceberg/sink/writer/IcebergRecordWriter.java   |  34 +-
 .../e2e/connector/iceberg/IcebergSinkCDCIT.java    | 542 ++++++++++++++++++++-
 2 files changed, 569 insertions(+), 7 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
index 8d3e832ed5..7c30487d52 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
@@ -24,6 +24,8 @@ import 
org.apache.seatunnel.shade.com.google.common.collect.Lists;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
 import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
 import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
 import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
 import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
@@ -38,6 +40,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
 import lombok.extern.slf4j.Slf4j;
@@ -92,18 +95,40 @@ public class IcebergRecordWriter implements RecordWriter {
 
     @Override
     public void applySchemaChange(SeaTunnelRowType afterRowType, 
SchemaChangeEvent event) {
-        log.info("Apply schema change start.");
+        log.info("Apply schema change start. Event type: {}", 
event.getEventType());
         SchemaChangeWrapper updates = new SchemaChangeWrapper();
         // get the latest schema in case another process updated it
         table.refresh();
         Schema schema = table.schema();
-        if (event instanceof AlterTableDropColumnEvent) {
+        if (event instanceof AlterTableColumnsEvent) {
+            AlterTableColumnsEvent columnsEvent = (AlterTableColumnsEvent) 
event;
+            log.info(
+                    "Processing AlterTableColumnsEvent with {} events",
+                    columnsEvent.getEvents().size());
+            for (AlterTableColumnEvent columnEvent : columnsEvent.getEvents()) 
{
+                applySchemaChange(afterRowType, columnEvent);
+            }
+            return;
+        } else if (event instanceof AlterTableDropColumnEvent) {
             AlterTableDropColumnEvent dropColumnEvent = 
(AlterTableDropColumnEvent) event;
             updates.deleteColumn(dropColumnEvent.getColumn());
         } else if (event instanceof AlterTableAddColumnEvent) {
-            // Update column , during data consumption process
+            AlterTableAddColumnEvent addColumnEvent = 
(AlterTableAddColumnEvent) event;
+            Column column = addColumnEvent.getColumn();
+            Type columnType = SchemaUtils.toIcebergType(column.getDataType());
+            updates.addColumn(null, column.getName(), columnType);
         } else if (event instanceof AlterTableModifyColumnEvent) {
-            // Update type , during data consumption process
+            AlterTableModifyColumnEvent modifyColumnEvent = 
(AlterTableModifyColumnEvent) event;
+            Column column = modifyColumnEvent.getColumn();
+            Type columnType = SchemaUtils.toIcebergType(column.getDataType());
+            if (columnType instanceof Type.PrimitiveType) {
+                updates.modifyColumn(column.getName(), (Type.PrimitiveType) 
columnType);
+            } else {
+                log.warn(
+                        "Cannot modify column {} to non-primitive type {}",
+                        column.getName(),
+                        columnType);
+            }
         } else if (event instanceof AlterTableChangeColumnEvent) {
             // rename
             AlterTableChangeColumnEvent changeColumnEvent = 
(AlterTableChangeColumnEvent) event;
@@ -126,7 +151,6 @@ public class IcebergRecordWriter implements RecordWriter {
             updates.changeColumn(oldColumn, column.getName());
         }
     }
-
     /** apply schema update */
     private void applySchemaUpdate(SchemaChangeWrapper updates) {
         // complete the current file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
index a0758b0a1e..45bba927a0 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
@@ -233,6 +233,325 @@ public class IcebergSinkCDCIT extends TestSuiteBase 
implements TestResource {
         alterSchemaAndCheckIcebergSchema(container);
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason =
+                    "Currently SPARK do not support cdc. In addition, 
currently only the zeta engine supports schema evolution for pr 
https://github.com/apache/seatunnel/pull/5125.";)
+    public void testMysqlCdcCheckMultiSchemaChangeE2e(TestContainer container)
+            throws IOException, InterruptedException {
+        // Clear related content to ensure that multiple operations are not 
affected
+        clearTable(MYSQL_DATABASE, SOURCE_TABLE);
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        container.executeJob(
+                                
"/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf");
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                    return null;
+                });
+        initSourceTableData(MYSQL_DATABASE, SOURCE_TABLE);
+        alterMultiSchemaAndCheckIcebergSchema(container);
+    }
+
+    private void alterMultiSchemaAndCheckIcebergSchema(TestContainer container)
+            throws InterruptedException, IOException {
+        log.info("Starting multi-column schema evolution test cases");
+
+        // Case 1: Test adding multiple columns in a single ALTER TABLE 
statement
+        log.info("Case 1: Testing adding multiple columns in a single 
statement");
+        String addField1 = "f_multi_add1";
+        String addField2 = "f_multi_add2";
+        String addField3 = "f_multi_add3";
+
+        // Add multiple columns in a single ALTER TABLE statement
+        String addMultiColumnsSql =
+                String.format(
+                        "ALTER TABLE %s.%s ADD COLUMN %s VARCHAR(255) DEFAULT 
'multi-column-1', "
+                                + "ADD COLUMN %s INT DEFAULT 42, "
+                                + "ADD COLUMN %s FLOAT DEFAULT 3.14",
+                        MYSQL_DATABASE, SOURCE_TABLE, addField1, addField2, 
addField3);
+        executeSql(addMultiColumnsSql);
+
+        // Insert data with the new columns
+        String insertMultiColumnSql =
+                String.format(
+                        "INSERT INTO %s.%s (id, f_binary, f_blob, 
f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, "
+                                + "f_smallint_unsigned, f_mediumint, 
f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, "
+                                + "f_integer_unsigned, f_bigint, 
f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, "
+                                + "f_double_precision, f_longtext, 
f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, "
+                                + "f_timestamp, f_bit1, f_bit64, f_char, 
f_enum, f_mediumblob, f_long_varchar, f_real, f_time, "
+                                + "f_tinyint, f_tinyint_unsigned, f_json, 
f_year, %s, %s, %s) "
+                                + "VALUES (200, 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
 "
+                                + "0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, "
+                                + "0x74696E79626C6F62, 
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 
1234567, 7654321, "
+                                + "123456789, 987654321, 123, 789, 12.34, 
56.78, 90.12, 'This is a long text field', 'This is a medium text field', "
+                                + "'This is a text field', 'This is a tiny 
text field', 'test varchar multi-add', '2022-04-27', '2022-04-27 14:30:00', "
+                                + "'2023-04-27 11:08:40', 1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2', "
+                                + 
"0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field', "
+                                + "12.345, '14:30:00', -128, 255, '{ \"key\": 
\"value\" }', 1992, 'custom multi-column-1', 100, 9.99)",
+                        MYSQL_DATABASE, SOURCE_TABLE, addField1, addField2, 
addField3);
+        executeSql(insertMultiColumnSql);
+
+        sleep(30000); // Wait for source capture data
+
+        // Verify that multiple columns were added and data is correct
+        given().ignoreExceptions()
+                .await()
+                .atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            
container.executeExtraCommands(containerExtendedFactory);
+                            Schema schema = loadIcebergSchema();
+
+                            // Verify all new columns exist
+                            Types.NestedField field1 = 
schema.findField(addField1);
+                            Types.NestedField field2 = 
schema.findField(addField2);
+                            Types.NestedField field3 = 
schema.findField(addField3);
+
+                            Assertions.assertNotNull(
+                                    field1, "Column " + addField1 + " should 
exist");
+                            Assertions.assertNotNull(
+                                    field2, "Column " + addField2 + " should 
exist");
+                            Assertions.assertNotNull(
+                                    field3, "Column " + addField3 + " should 
exist");
+
+                            // Verify data in the new columns
+                            List<Record> records = loadIcebergTable();
+                            boolean foundMultiColumnRecord = false;
+                            for (Record record : records) {
+                                Integer id = (Integer) record.getField("id");
+                                if (id == 200) {
+                                    String stringValue = (String) 
record.getField(addField1);
+                                    Integer intValue = (Integer) 
record.getField(addField2);
+                                    Float floatValue = (Float) 
record.getField(addField3);
+
+                                    Assertions.assertEquals("custom 
multi-column-1", stringValue);
+                                    Assertions.assertEquals(100, intValue);
+                                    Assertions.assertEquals(9.99f, floatValue, 
0.01f);
+                                    foundMultiColumnRecord = true;
+                                }
+                            }
+                            Assertions.assertTrue(
+                                    foundMultiColumnRecord,
+                                    "Should find record with multiple new 
columns");
+                        });
+        // Case 2: Test modifying multiple column types in a single ALTER 
TABLE statement
+        log.info("Case 2: Testing modifying multiple column types in a single 
statement");
+        String modifyTypeField1 = "f_multi_type1";
+        String modifyTypeField2 = "f_multi_type2";
+
+        // Add columns first
+        String addTypeColumnsSql =
+                String.format(
+                        "ALTER TABLE %s.%s ADD COLUMN %s VARCHAR(50) DEFAULT 
'to-be-modified-type-1', "
+                                + "ADD COLUMN %s INT DEFAULT 42",
+                        MYSQL_DATABASE, SOURCE_TABLE, modifyTypeField1, 
modifyTypeField2);
+        executeSql(addTypeColumnsSql);
+
+        // Insert data with the new columns
+        String insertTypeColumnsSql =
+                String.format(
+                        "INSERT INTO %s.%s (id, f_binary, f_blob, 
f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, "
+                                + "f_smallint_unsigned, f_mediumint, 
f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, "
+                                + "f_integer_unsigned, f_bigint, 
f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, "
+                                + "f_double_precision, f_longtext, 
f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, "
+                                + "f_timestamp, f_bit1, f_bit64, f_char, 
f_enum, f_mediumblob, f_long_varchar, f_real, f_time, "
+                                + "f_tinyint, f_tinyint_unsigned, f_json, 
f_year, %s, %s) "
+                                + "VALUES (300, 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
 "
+                                + "0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, "
+                                + "0x74696E79626C6F62, 
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 
1234567, 7654321, "
+                                + "123456789, 987654321, 123, 789, 12.34, 
56.78, 90.12, 'This is a long text field', 'This is a medium text field', "
+                                + "'This is a text field', 'This is a tiny 
text field', 'test varchar for multi-type', '2022-04-27', '2022-04-27 
14:30:00', "
+                                + "'2023-04-27 11:08:40', 1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2', "
+                                + 
"0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field', "
+                                + "12.345, '14:30:00', -128, 255, '{ \"key\": 
\"value\" }', 1992, 'original type value 1', 100)",
+                        MYSQL_DATABASE, SOURCE_TABLE, modifyTypeField1, 
modifyTypeField2);
+        executeSql(insertTypeColumnsSql);
+
+        sleep(30000); // Wait for source capture data
+
+        // Now modify multiple column types in a single ALTER TABLE statement
+        String modifyTypesSql =
+                String.format(
+                        "ALTER TABLE %s.%s MODIFY %s VARCHAR(500) DEFAULT 
'modified-type-column-1', "
+                                + "MODIFY %s BIGINT DEFAULT 1000",
+                        MYSQL_DATABASE, SOURCE_TABLE, modifyTypeField1, 
modifyTypeField2);
+        executeSql(modifyTypesSql);
+
+        // Insert data with the modified columns
+        String insertAfterModifyTypesSql =
+                String.format(
+                        "INSERT INTO %s.%s (id, f_binary, f_blob, 
f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, "
+                                + "f_smallint_unsigned, f_mediumint, 
f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, "
+                                + "f_integer_unsigned, f_bigint, 
f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, "
+                                + "f_double_precision, f_longtext, 
f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, "
+                                + "f_timestamp, f_bit1, f_bit64, f_char, 
f_enum, f_mediumblob, f_long_varchar, f_real, f_time, "
+                                + "f_tinyint, f_tinyint_unsigned, f_json, 
f_year, %s, %s) "
+                                + "VALUES (301, 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
 "
+                                + "0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, "
+                                + "0x74696E79626C6F62, 
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 
1234567, 7654321, "
+                                + "123456789, 987654321, 123, 789, 12.34, 
56.78, 90.12, 'This is a long text field', 'This is a medium text field', "
+                                + "'This is a text field', 'This is a tiny 
text field', 'test varchar after multi-type', '2022-04-27', '2022-04-27 
14:30:00', "
+                                + "'2023-04-27 11:08:40', 1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2', "
+                                + 
"0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field', "
+                                + "12.345, '14:30:00', -128, 255, '{ \"key\": 
\"value\" }', 1992, 'This is a much longer text value that would not fit in the 
original VARCHAR(50)', 2000)",
+                        MYSQL_DATABASE, SOURCE_TABLE, modifyTypeField1, 
modifyTypeField2);
+        executeSql(insertAfterModifyTypesSql);
+
+        sleep(30000); // Wait for source capture data
+
+        // Verify that column types were modified and data is correct
+        given().ignoreExceptions()
+                .await()
+                .atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            
container.executeExtraCommands(containerExtendedFactory);
+                            Schema schema = loadIcebergSchema();
+
+                            // Verify columns exist with correct types
+                            Types.NestedField field1 = 
schema.findField(modifyTypeField1);
+                            Types.NestedField field2 = 
schema.findField(modifyTypeField2);
+
+                            Assertions.assertNotNull(
+                                    field1, "Column " + modifyTypeField1 + " 
should exist");
+                            Assertions.assertNotNull(
+                                    field2, "Column " + modifyTypeField2 + " 
should exist");
+
+                            // Verify data in the modified columns
+                            List<Record> records = loadIcebergTable();
+                            boolean foundModifiedRecord = false;
+                            for (Record record : records) {
+                                Integer id = (Integer) record.getField("id");
+                                if (id == 301) {
+                                    String stringValue = (String) 
record.getField(modifyTypeField1);
+                                    Long longValue = (Long) 
record.getField(modifyTypeField2);
+
+                                    Assertions.assertEquals(
+                                            "This is a much longer text value 
that would not fit in the original VARCHAR(50)",
+                                            stringValue);
+                                    Assertions.assertEquals(2000L, 
longValue.longValue());
+                                    foundModifiedRecord = true;
+                                }
+                            }
+                            Assertions.assertTrue(
+                                    foundModifiedRecord,
+                                    "Should find record with modified column 
types");
+                        });
+        // Case 3: Test modifying multiple columns in a single ALTER TABLE 
statement
+        log.info("Case 3: Testing modifying multiple columns in a single 
statement");
+        String modifyField1 = "f_multi_modify1";
+        String modifyField2 = "f_multi_modify2";
+
+        // Add columns first
+        String addModifyColumnsSql =
+                String.format(
+                        "ALTER TABLE %s.%s ADD COLUMN %s VARCHAR(50) DEFAULT 
'to-be-modified-1', "
+                                + "ADD COLUMN %s INT DEFAULT 42",
+                        MYSQL_DATABASE, SOURCE_TABLE, modifyField1, 
modifyField2);
+        executeSql(addModifyColumnsSql);
+
+        // Insert data with the new columns
+        String insertModifyColumnsSql =
+                String.format(
+                        "INSERT INTO %s.%s (id, f_binary, f_blob, 
f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, "
+                                + "f_smallint_unsigned, f_mediumint, 
f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, "
+                                + "f_integer_unsigned, f_bigint, 
f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, "
+                                + "f_double_precision, f_longtext, 
f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, "
+                                + "f_timestamp, f_bit1, f_bit64, f_char, 
f_enum, f_mediumblob, f_long_varchar, f_real, f_time, "
+                                + "f_tinyint, f_tinyint_unsigned, f_json, 
f_year, %s, %s) "
+                                + "VALUES (400, 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
 "
+                                + "0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, "
+                                + "0x74696E79626C6F62, 
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 
1234567, 7654321, "
+                                + "123456789, 987654321, 123, 789, 12.34, 
56.78, 90.12, 'This is a long text field', 'This is a medium text field', "
+                                + "'This is a text field', 'This is a tiny 
text field', 'test varchar for multi-modify', '2022-04-27', '2022-04-27 
14:30:00', "
+                                + "'2023-04-27 11:08:40', 1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2', "
+                                + 
"0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field', "
+                                + "12.345, '14:30:00', -128, 255, '{ \"key\": 
\"value\" }', 1992, 'original multi-value for modify', 100)",
+                        MYSQL_DATABASE, SOURCE_TABLE, modifyField1, 
modifyField2);
+        executeSql(insertModifyColumnsSql);
+
+        sleep(30000); // Wait for source capture data
+
+        // Now modify multiple columns in a single ALTER TABLE statement
+        String modifyColumnsSql =
+                String.format(
+                        "ALTER TABLE %s.%s MODIFY %s TEXT, " + "MODIFY %s 
BIGINT DEFAULT 1000",
+                        MYSQL_DATABASE, SOURCE_TABLE, modifyField1, 
modifyField2);
+        executeSql(modifyColumnsSql);
+
+        // Insert data with the modified columns
+        String insertAfterModifySql =
+                String.format(
+                        "INSERT INTO %s.%s (id, f_binary, f_blob, 
f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, "
+                                + "f_smallint_unsigned, f_mediumint, 
f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, "
+                                + "f_integer_unsigned, f_bigint, 
f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, "
+                                + "f_double_precision, f_longtext, 
f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, "
+                                + "f_timestamp, f_bit1, f_bit64, f_char, 
f_enum, f_mediumblob, f_long_varchar, f_real, f_time, "
+                                + "f_tinyint, f_tinyint_unsigned, f_json, 
f_year, %s, %s) "
+                                + "VALUES (401, 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
 "
+                                + "0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, "
+                                + "0x74696E79626C6F62, 
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 
1234567, 7654321, "
+                                + "123456789, 987654321, 123, 789, 12.34, 
56.78, 90.12, 'This is a long text field', 'This is a medium text field', "
+                                + "'This is a text field', 'This is a tiny 
text field', 'test varchar after multi-modify', '2022-04-27', '2022-04-27 
14:30:00', "
+                                + "'2023-04-27 11:08:40', 1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2', "
+                                + 
"0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field', "
+                                + "12.345, '14:30:00', -128, 255, '{ \"key\": 
\"value\" }', 1992, 'This is a much longer text value for multi-modify that 
would not fit in the original VARCHAR(50)', 3000)",
+                        MYSQL_DATABASE, SOURCE_TABLE, modifyField1, 
modifyField2);
+        executeSql(insertAfterModifySql);
+
+        sleep(30000); // Wait for source capture data
+
+        // Verify that columns were modified and data is correct
+        given().ignoreExceptions()
+                .await()
+                .atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            
container.executeExtraCommands(containerExtendedFactory);
+                            Schema schema = loadIcebergSchema();
+
+                            // Verify columns exist with correct types
+                            Types.NestedField fieldObj1 = 
schema.findField(modifyField1);
+                            Types.NestedField fieldObj2 = 
schema.findField(modifyField2);
+
+                            Assertions.assertNotNull(
+                                    fieldObj1, "Column " + modifyField1 + " 
should exist");
+                            Assertions.assertNotNull(
+                                    fieldObj2, "Column " + modifyField2 + " 
should exist");
+
+                            // Verify data in the modified columns
+                            List<Record> records = loadIcebergTable();
+                            boolean foundModifiedRecord = false;
+                            for (Record record : records) {
+                                Integer id = (Integer) record.getField("id");
+                                if (id == 401) {
+                                    String stringValue = (String) 
record.getField(modifyField1);
+                                    Long longValue = (Long) 
record.getField(modifyField2);
+
+                                    Assertions.assertEquals(
+                                            "This is a much longer text value 
for multi-modify that would not fit in the original VARCHAR(50)",
+                                            stringValue);
+                                    Assertions.assertEquals(3000L, 
longValue.longValue());
+                                    foundModifiedRecord = true;
+                                }
+                            }
+                            Assertions.assertTrue(
+                                    foundModifiedRecord,
+                                    "Should find record with modified 
columns");
+                        });
+
+        // Case 4: Test dropping multiple columns in a single ALTER TABLE 
statement
+        // (AlterTableColumnsEvent)
+        log.warn(
+                "Case 4: Deleting multiple columns is not 
supported,unsupported table metadata field type 0 ");
+    }
+
     private void alterSchemaAndCheckIcebergSchema(TestContainer container)
             throws InterruptedException, IOException {
         String addField = "f_string_add";
@@ -263,9 +582,168 @@ public class IcebergSinkCDCIT extends TestSuiteBase 
implements TestResource {
                                     Assertions.assertEquals("add column 
field", f_string_add);
                                 }
                             }
+                        });
+
+        String modifyField = "f_varchar";
+        modifyTableColumn(MYSQL_DATABASE, SOURCE_TABLE, modifyField, "text");
+        insertModifyColumnData(MYSQL_DATABASE, SOURCE_TABLE);
+        // Waiting 30s for source capture data
+        sleep(30000);
+
+        given().ignoreExceptions()
+                .await()
+                .atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            // copy iceberg to local
+                            
container.executeExtraCommands(containerExtendedFactory);
+                            List<Record> records = loadIcebergTable();
+                            Assertions.assertEquals(5, records.size());
+                            for (Record record : records) {
+                                Integer id = (Integer) record.getField("id");
+                                if (id == 101) {
+                                    String f_varchar = (String) 
record.getField("f_varchar");
+                                    Assertions.assertEquals(
+                                            "This is a modified varchar field 
with longer text that would exceed the original varchar length",
+                                            f_varchar);
+                                }
+                            }
+                        });
+
+        dropTableColumn(MYSQL_DATABASE, SOURCE_TABLE, addField);
+        insertAfterDropColumnData(MYSQL_DATABASE, SOURCE_TABLE);
+        // Waiting 30s for source capture data
+        sleep(30000);
 
-                            // for next test.
-                            dropTableColumn(MYSQL_DATABASE, SOURCE_TABLE, 
addField);
+        given().ignoreExceptions()
+                .await()
+                .atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            // copy iceberg to local
+                            
container.executeExtraCommands(containerExtendedFactory);
+                            Schema schema = loadIcebergSchema();
+                            Types.NestedField nestedField = 
schema.findField(addField);
+                            // The column should be marked as deleted in 
Iceberg
+                            Assertions.assertEquals(
+                                    true, nestedField == null || 
!nestedField.isRequired());
+
+                            List<Record> records = loadIcebergTable();
+                            Assertions.assertEquals(6, records.size());
+                            for (Record record : records) {
+                                Integer id = (Integer) record.getField("id");
+                                if (id == 102) {
+                                    // The dropped column should not be 
accessible or should be null
+                                    try {
+                                        Object droppedField = 
record.getField(addField);
+                                        Assertions.assertNull(
+                                                droppedField, "Dropped field 
should be null");
+                                    } catch (Exception e) {
+                                        log.info(
+                                                "Field {} is not accessible 
after dropping, which is expected",
+                                                addField);
+                                    }
+                                }
+                            }
+                        });
+
+        // Testing changing a single column name
+        String oldColumnName = "f_column_to_rename";
+        String newColumnName = "f_renamed_column";
+
+        // Add a column first
+        String addColumnSql =
+                String.format(
+                        "ALTER TABLE %s.%s ADD COLUMN %s VARCHAR(255) DEFAULT 
'to-be-renamed'",
+                        MYSQL_DATABASE, SOURCE_TABLE, oldColumnName);
+        executeSql(addColumnSql);
+
+        // Insert data with the new column
+        String insertSql =
+                String.format(
+                        "INSERT INTO %s.%s (id, f_binary, f_blob, 
f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, "
+                                + "f_smallint_unsigned, f_mediumint, 
f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, "
+                                + "f_integer_unsigned, f_bigint, 
f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, "
+                                + "f_double_precision, f_longtext, 
f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, "
+                                + "f_timestamp, f_bit1, f_bit64, f_char, 
f_enum, f_mediumblob, f_long_varchar, f_real, f_time, "
+                                + "f_tinyint, f_tinyint_unsigned, f_json, 
f_year, %s) "
+                                + "VALUES (150, 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
 "
+                                + "0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, "
+                                + "0x74696E79626C6F62, 
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 
1234567, 7654321, "
+                                + "123456789, 987654321, 123, 789, 12.34, 
56.78, 90.12, 'This is a long text field', 'This is a medium text field', "
+                                + "'This is a text field', 'This is a tiny 
text field', 'test varchar', '2022-04-27', '2022-04-27 14:30:00', "
+                                + "'2023-04-27 11:08:40', 1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2', "
+                                + 
"0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field', "
+                                + "12.345, '14:30:00', -128, 255, '{ \"key\": 
\"value\" }', 1992, 'original column value')",
+                        MYSQL_DATABASE, SOURCE_TABLE, oldColumnName);
+        executeSql(insertSql);
+
+        // Now rename the column
+        String renameColumnSql =
+                String.format(
+                        "ALTER TABLE %s.%s CHANGE %s %s VARCHAR(255) DEFAULT 
'renamed-column'",
+                        MYSQL_DATABASE, SOURCE_TABLE, oldColumnName, 
newColumnName);
+        executeSql(renameColumnSql);
+
+        // Insert data with the renamed column
+        String insertAfterRenameSql =
+                String.format(
+                        "INSERT INTO %s.%s (id, f_binary, f_blob, 
f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, "
+                                + "f_smallint_unsigned, f_mediumint, 
f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, "
+                                + "f_integer_unsigned, f_bigint, 
f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, "
+                                + "f_double_precision, f_longtext, 
f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, "
+                                + "f_timestamp, f_bit1, f_bit64, f_char, 
f_enum, f_mediumblob, f_long_varchar, f_real, f_time, "
+                                + "f_tinyint, f_tinyint_unsigned, f_json, 
f_year,  %s) "
+                                + "VALUES (151, 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
 "
+                                + "0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, "
+                                + "0x74696E79626C6F62, 
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 
1234567, 7654321, "
+                                + "123456789, 987654321, 123, 789, 12.34, 
56.78, 90.12, 'This is a long text field', 'This is a medium text field', "
+                                + "'This is a text field', 'This is a tiny 
text field', 'test varchar after rename', '2022-04-27', '2022-04-27 14:30:00', "
+                                + "'2023-04-27 11:08:40', 1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2', "
+                                + 
"0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field', "
+                                + "12.345, '14:30:00', -128, 255, '{ \"key\": 
\"value\" }', 1992,  'renamed column value')",
+                        MYSQL_DATABASE, SOURCE_TABLE, newColumnName);
+        executeSql(insertAfterRenameSql);
+
+        sleep(30000); // Wait for source capture data
+
+        // Verify that column was renamed and data is correct
+        given().ignoreExceptions()
+                .await()
+                .atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            
container.executeExtraCommands(containerExtendedFactory);
+                            Schema schema = loadIcebergSchema();
+
+                            // Verify old column is gone and new column exists
+                            Types.NestedField oldField = 
schema.findField(oldColumnName);
+                            Types.NestedField newField = 
schema.findField(newColumnName);
+
+                            // Old column should be gone or marked as deleted
+                            Assertions.assertTrue(
+                                    oldField == null || !oldField.isRequired(),
+                                    "Column "
+                                            + oldColumnName
+                                            + " should be deleted or marked 
optional");
+
+                            // New column should exist
+                            Assertions.assertNotNull(
+                                    newField, "Column " + newColumnName + " 
should exist");
+
+                            // Verify data in the renamed column
+                            List<Record> records = loadIcebergTable();
+                            boolean foundRenamedValue = false;
+                            for (Record record : records) {
+                                Integer id = (Integer) record.getField("id");
+                                if (id == 151) {
+                                    String renamedValue = (String) 
record.getField(newColumnName);
+                                    Assertions.assertEquals("renamed column 
value", renamedValue);
+                                    foundRenamedValue = true;
+                                }
+                            }
+                            Assertions.assertTrue(
+                                    foundRenamedValue, "Should find record 
with renamed column");
                         });
     }
 
@@ -362,6 +840,19 @@ public class IcebergSinkCDCIT extends TestSuiteBase 
implements TestResource {
                 "ALTER TABLE " + database + "." + tableName + " ADD COLUMN " + 
addField + " text");
     }
 
+    private void modifyTableColumn(
+            String database, String tableName, String columnName, String 
newType) {
+        executeSql(
+                "ALTER TABLE "
+                        + database
+                        + "."
+                        + tableName
+                        + " MODIFY COLUMN "
+                        + columnName
+                        + " "
+                        + newType);
+    }
+
     private void clearTable(String database, String tableName) {
         executeSql("truncate table " + database + "." + tableName);
     }
@@ -496,4 +987,51 @@ public class IcebergSinkCDCIT extends TestSuiteBase 
implements TestResource {
                         + "         12.345, '14:30:00', -128, 255, '{ \"key\": 
\"value\" }', 1992 , 'add column "
                         + "field')");
     }
+
+    private void insertModifyColumnData(String database, String tableName) {
+        executeSql(
+                "INSERT INTO "
+                        + database
+                        + "."
+                        + tableName
+                        + " ( id, f_binary, f_blob, f_long_varbinary, 
f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+                        + "                                         
f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, 
f_integer,\n"
+                        + "                                         
f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, 
f_double,\n"
+                        + "                                         
f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, 
f_date, f_datetime,\n"
+                        + "                                         
f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, 
f_real, f_time,\n"
+                        + "                                         f_tinyint, 
f_tinyint_unsigned, f_json, f_year, f_string_add)\n"
+                        + "VALUES ( 101, "
+                        + 
"0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+                        + "         0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+                        + "         0x74696E79626C6F62, 
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 
1234567, 7654321,\n"
+                        + "         123456789, 987654321, 123, 789, 12.34, 
56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+                        + "         'This is a text field', 'This is a tiny 
text field', 'This is a modified varchar field with longer text that would 
exceed the original varchar length', '2022-04-27', '2022-04-27 14:30:00',\n"
+                        + "         '2023-04-27 11:08:40', 1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2',\n"
+                        + "         
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field',\n"
+                        + "         12.345, '14:30:00', -128, 255, '{ \"key\": 
\"value\" }', 1992 , 'add column "
+                        + "field')");
+    }
+
+    private void insertAfterDropColumnData(String database, String tableName) {
+        executeSql(
+                "INSERT INTO "
+                        + database
+                        + "."
+                        + tableName
+                        + " ( id, f_binary, f_blob, f_long_varbinary, 
f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+                        + "                                         
f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, 
f_integer,\n"
+                        + "                                         
f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, 
f_double,\n"
+                        + "                                         
f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, 
f_date, f_datetime,\n"
+                        + "                                         
f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, 
f_real, f_time,\n"
+                        + "                                         f_tinyint, 
f_tinyint_unsigned, f_json, f_year)\n"
+                        + "VALUES ( 102, "
+                        + 
"0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+                        + "         0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+                        + "         0x74696E79626C6F62, 
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 
1234567, 7654321,\n"
+                        + "         123456789, 987654321, 123, 789, 12.34, 
56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+                        + "         'This is a text field', 'This is a tiny 
text field', 'This is a varchar field after drop column', '2022-04-27', 
'2022-04-27 14:30:00',\n"
+                        + "         '2023-04-27 11:08:40', 1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2',\n"
+                        + "         
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field',\n"
+                        + "         12.345, '14:30:00', -128, 255, '{ \"key\": 
\"value\" }', 1992)");
+    }
 }


Reply via email to