This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1b269e86a2 enable iceberg schema change test (#6711)
1b269e86a2 is described below
commit 1b269e86a25111926ed0396709ccfb59fc39f697
Author: Xiaojian Sun <[email protected]>
AuthorDate: Mon Apr 15 18:48:49 2024 +0800
enable iceberg schema change test (#6711)
---
.../e2e/connector/iceberg/IcebergSinkCDCIT.java | 57 +++++++++++++++++-----
1 file changed, 46 insertions(+), 11 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
index afe6d3d43b..111c7e0dfb 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
@@ -44,7 +44,6 @@ import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
@@ -64,6 +63,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -165,7 +165,6 @@ public class IcebergSinkCDCIT extends TestSuiteBase
implements TestResource {
"sh", "-c", "cd " + CATALOG_DIR + " && tar -zxvf "
+ NAMESPACE_TAR);
try {
Process process = processBuilder.start();
- // 等待命令执行完成
int exitCode = process.waitFor();
if (exitCode == 0) {
log.info("Extract files successful.");
@@ -210,7 +209,6 @@ public class IcebergSinkCDCIT extends TestSuiteBase
implements TestResource {
}
@TestTemplate
- @Disabled
public void testMysqlCdcCheckSchemaChangeE2e(TestContainer container)
throws IOException, InterruptedException {
// Clear related content to ensure that multiple operations are not
affected
@@ -232,9 +230,10 @@ public class IcebergSinkCDCIT extends TestSuiteBase
implements TestResource {
private void alterSchemaAndCheckIcebergSchema(TestContainer container)
throws InterruptedException, IOException {
- String dropField = "f_binary";
+ String addField = "f_string_add";
// Init table data
- dropTableColumn(MYSQL_DATABASE, SOURCE_TABLE, dropField);
+ addTableColumn(MYSQL_DATABASE, SOURCE_TABLE, addField);
+ insertAddColumnData(MYSQL_DATABASE, SOURCE_TABLE);
// Waiting 30s for source capture data
sleep(30000);
@@ -247,10 +246,21 @@ public class IcebergSinkCDCIT extends TestSuiteBase
implements TestResource {
// copy iceberg to local
container.executeExtraCommands(containerExtendedFactory);
Schema schema = loadIcebergSchema();
- Types.NestedField nestedField =
schema.findField(dropField);
- Assertions.assertEquals(true, nestedField == null);
- // for next test
- addTableColumn(MYSQL_DATABASE, SOURCE_TABLE,
dropField);
+ Types.NestedField nestedField =
schema.findField(addField);
+ Assertions.assertEquals(true,
Objects.nonNull(nestedField));
+
+ List<Record> records = loadIcebergTable();
+ Assertions.assertEquals(4, records.size());
+ for (Record record : records) {
+ Integer id = (Integer) record.getField("id");
+ String f_string_add = (String)
record.getField("f_string_add");
+ if (id == 100) {
+ Assertions.assertEquals("add column
field", f_string_add);
+ }
+ }
+
+ // for next test.
+ dropTableColumn(MYSQL_DATABASE, SOURCE_TABLE,
addField);
});
}
@@ -342,8 +352,9 @@ public class IcebergSinkCDCIT extends TestSuiteBase
implements TestResource {
executeSql("ALTER TABLE " + database + "." + tableName + " DROP COLUMN
" + dropField);
}
- private void addTableColumn(String database, String tableName, String
dropField) {
- executeSql("ALTER TABLE " + database + "." + tableName + " ADD COLUMN
" + dropField);
+ private void addTableColumn(String database, String tableName, String
addField) {
+ executeSql(
+ "ALTER TABLE " + database + "." + tableName + " ADD COLUMN " +
addField + " text");
}
private void clearTable(String database, String tableName) {
@@ -456,4 +467,28 @@ public class IcebergSinkCDCIT extends TestSuiteBase
implements TestResource {
executeSql("UPDATE " + database + "." + tableName + " SET f_bigint =
10000 where id = 3");
}
+
+ private void insertAddColumnData(String database, String tableName) {
+ executeSql(
+ "INSERT INTO "
+ + database
+ + "."
+ + tableName
+ + " ( id, f_binary, f_blob, f_long_varbinary,
f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+ + "
f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned,
f_integer,\n"
+ + "
f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float,
f_double,\n"
+ + "
f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar,
f_date, f_datetime,\n"
+ + "
f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar,
f_real, f_time,\n"
+ + " f_tinyint,
f_tinyint_unsigned, f_json, f_year, f_string_add)\n"
+ + "VALUES ( 100, "
+ +
"0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+ + " 0x68656C6C6F,
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+ + " 0x74696E79626C6F62,
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321,
1234567, 7654321,\n"
+ + " 123456789, 987654321, 123, 789, 12.34,
56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+ + " 'This is a text field', 'This is a tiny
text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n"
+ + " '2023-04-27 11:08:40', 1,
b'0101010101010101010101010101010101010101010101010101010101010101', 'C',
'enum2',\n"
+ + "
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A,
'This is a long varchar field',\n"
+ + " 12.345, '14:30:00', -128, 255, '{ \"key\":
\"value\" }', 1992 , 'add column "
+ + "field')");
+ }
}