This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1b269e86a2 enable iceberg schema change test (#6711)
1b269e86a2 is described below

commit 1b269e86a25111926ed0396709ccfb59fc39f697
Author: Xiaojian Sun <[email protected]>
AuthorDate: Mon Apr 15 18:48:49 2024 +0800

    enable iceberg schema change test (#6711)
---
 .../e2e/connector/iceberg/IcebergSinkCDCIT.java    | 57 +++++++++++++++++-----
 1 file changed, 46 insertions(+), 11 deletions(-)

diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
index afe6d3d43b..111c7e0dfb 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
@@ -44,7 +44,6 @@ import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
@@ -64,6 +63,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
@@ -165,7 +165,6 @@ public class IcebergSinkCDCIT extends TestSuiteBase 
implements TestResource {
                             "sh", "-c", "cd " + CATALOG_DIR + " && tar -zxvf " 
+ NAMESPACE_TAR);
                     try {
                         Process process = processBuilder.start();
-                        // 等待命令执行完成
                         int exitCode = process.waitFor();
                         if (exitCode == 0) {
                             log.info("Extract files successful.");
@@ -210,7 +209,6 @@ public class IcebergSinkCDCIT extends TestSuiteBase 
implements TestResource {
     }
 
     @TestTemplate
-    @Disabled
     public void testMysqlCdcCheckSchemaChangeE2e(TestContainer container)
             throws IOException, InterruptedException {
         // Clear related content to ensure that multiple operations are not 
affected
@@ -232,9 +230,10 @@ public class IcebergSinkCDCIT extends TestSuiteBase 
implements TestResource {
 
     private void alterSchemaAndCheckIcebergSchema(TestContainer container)
             throws InterruptedException, IOException {
-        String dropField = "f_binary";
+        String addField = "f_string_add";
         // Init table data
-        dropTableColumn(MYSQL_DATABASE, SOURCE_TABLE, dropField);
+        addTableColumn(MYSQL_DATABASE, SOURCE_TABLE, addField);
+        insertAddColumnData(MYSQL_DATABASE, SOURCE_TABLE);
         // Waiting 30s for source capture data
         sleep(30000);
 
@@ -247,10 +246,21 @@ public class IcebergSinkCDCIT extends TestSuiteBase 
implements TestResource {
                             // copy iceberg to local
                             
container.executeExtraCommands(containerExtendedFactory);
                             Schema schema = loadIcebergSchema();
-                            Types.NestedField nestedField = 
schema.findField(dropField);
-                            Assertions.assertEquals(true, nestedField == null);
-                            // for next test
-                            addTableColumn(MYSQL_DATABASE, SOURCE_TABLE, 
dropField);
+                            Types.NestedField nestedField = 
schema.findField(addField);
+                            Assertions.assertEquals(true, 
Objects.nonNull(nestedField));
+
+                            List<Record> records = loadIcebergTable();
+                            Assertions.assertEquals(4, records.size());
+                            for (Record record : records) {
+                                Integer id = (Integer) record.getField("id");
+                                String f_string_add = (String) 
record.getField("f_string_add");
+                                if (id == 100) {
+                                    Assertions.assertEquals("add column 
field", f_string_add);
+                                }
+                            }
+
+                            // for next test.
+                            dropTableColumn(MYSQL_DATABASE, SOURCE_TABLE, 
addField);
                         });
     }
 
@@ -342,8 +352,9 @@ public class IcebergSinkCDCIT extends TestSuiteBase 
implements TestResource {
         executeSql("ALTER TABLE " + database + "." + tableName + " DROP COLUMN 
" + dropField);
     }
 
-    private void addTableColumn(String database, String tableName, String 
dropField) {
-        executeSql("ALTER TABLE " + database + "." + tableName + " ADD COLUMN 
" + dropField);
+    private void addTableColumn(String database, String tableName, String 
addField) {
+        executeSql(
+                "ALTER TABLE " + database + "." + tableName + " ADD COLUMN " + 
addField + " text");
     }
 
     private void clearTable(String database, String tableName) {
@@ -456,4 +467,28 @@ public class IcebergSinkCDCIT extends TestSuiteBase 
implements TestResource {
 
         executeSql("UPDATE " + database + "." + tableName + " SET f_bigint = 
10000 where id = 3");
     }
+
+    private void insertAddColumnData(String database, String tableName) {
+        executeSql(
+                "INSERT INTO "
+                        + database
+                        + "."
+                        + tableName
+                        + " ( id, f_binary, f_blob, f_long_varbinary, 
f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+                        + "                                         
f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, 
f_integer,\n"
+                        + "                                         
f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, 
f_double,\n"
+                        + "                                         
f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, 
f_date, f_datetime,\n"
+                        + "                                         
f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, 
f_real, f_time,\n"
+                        + "                                         f_tinyint, 
f_tinyint_unsigned, f_json, f_year, f_string_add)\n"
+                        + "VALUES ( 100, "
+                        + 
"0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+                        + "         0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+                        + "         0x74696E79626C6F62, 
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 
1234567, 7654321,\n"
+                        + "         123456789, 987654321, 123, 789, 12.34, 
56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+                        + "         'This is a text field', 'This is a tiny 
text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n"
+                        + "         '2023-04-27 11:08:40', 1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2',\n"
+                        + "         
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field',\n"
+                        + "         12.345, '14:30:00', -128, 255, '{ \"key\": 
\"value\" }', 1992 , 'add column "
+                        + "field')");
+    }
 }

Reply via email to