This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0c593a39e3 [BugFix][CDC Base] Fix added columns cannot be parsed after
job restore (#6118)
0c593a39e3 is described below
commit 0c593a39e378e53ef7b78bd48af9d0d77c6223d6
Author: ic4y <[email protected]>
AuthorDate: Fri Jan 19 19:20:57 2024 +0800
[BugFix][CDC Base] Fix added columns cannot be parsed after job restore
(#6118)
---
.../row/SeaTunnelRowDebeziumDeserializeSchema.java | 6 ++
.../seatunnel/cdc/postgres/PostgresCDCIT.java | 112 +++++++++++++++++++++
.../src/test/resources/ddl/inventory.sql | 24 +++++
.../postgrescdc_to_postgres_test_add_Filed.conf | 61 +++++++++++
4 files changed, 203 insertions(+)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
index 3e86a6603d..d249b0d927 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
@@ -234,6 +234,12 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
@Override
public void restoreCheckpointProducedType(SeaTunnelDataType<SeaTunnelRow>
checkpointDataType) {
+ // If schemaChangeResolver is null, it indicates that DDL changes are
not supported.
+ // Therefore, we need to use the latest table structure to ensure that
data from newly added
+ // columns can be parsed correctly.
+ if (schemaChangeResolver == null) {
+ return;
+ }
if (SqlType.ROW.equals(checkpointDataType.getSqlType())
&& SqlType.MULTIPLE_ROW.equals(resultTypeInfo.getSqlType())) {
// TODO: Older versions may have this issue
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
index e4ee9df24f..88aca48e1e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
@@ -82,8 +82,10 @@ public class PostgresCDCIT extends TestSuiteBase implements
TestResource {
private static final String SOURCE_TABLE_1 = "postgres_cdc_table_1";
private static final String SOURCE_TABLE_2 = "postgres_cdc_table_2";
+ private static final String SOURCE_TABLE_3 = "postgres_cdc_table_3";
private static final String SINK_TABLE_1 = "sink_postgres_cdc_table_1";
private static final String SINK_TABLE_2 = "sink_postgres_cdc_table_2";
+ private static final String SINK_TABLE_3 = "sink_postgres_cdc_table_3";
private static final String SOURCE_TABLE_NO_PRIMARY_KEY =
"full_types_no_primary_key";
@@ -373,6 +375,102 @@ public class PostgresCDCIT extends TestSuiteBase
implements TestResource {
}
}
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK do not support multi
table")
+ public void testAddFiledWithRestore(TestContainer container)
+ throws IOException, InterruptedException {
+ try {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return container.executeJob(
+
"/postgrescdc_to_postgres_test_add_Filed.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" +
e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertAll(
+ () ->
+
Assertions.assertIterableEquals(
+ query(
+
getQuerySQL(
+
POSTGRESQL_SCHEMA,
+
SOURCE_TABLE_3)),
+ query(
+
getQuerySQL(
+
POSTGRESQL_SCHEMA,
+
SINK_TABLE_3)))));
+
+ Pattern jobIdPattern =
+ Pattern.compile(
+ ".*Init JobMaster for Job SeaTunnel_Job
\\(([0-9]*)\\).*",
+ Pattern.DOTALL);
+ Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
+ String jobId;
+ if (matcher.matches()) {
+ jobId = matcher.group(1);
+ } else {
+ throw new RuntimeException("Can not find jobId");
+ }
+
+ Assertions.assertEquals(0,
container.savepointJob(jobId).getExitCode());
+
+ // add field and insert source table data
+ addFieldsForTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_3);
+ addFieldsForTable(POSTGRESQL_SCHEMA, SINK_TABLE_3);
+ insertSourceTableForAddFields(POSTGRESQL_SCHEMA, SOURCE_TABLE_3);
+
+ // Restore job
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.restoreJob(
+
"/postgrescdc_to_postgres_test_add_Filed.conf", jobId);
+ } catch (Exception e) {
+ log.error("Commit task exception :" +
e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertAll(
+ () ->
+
Assertions.assertIterableEquals(
+ query(
+
getQuerySQL(
+
POSTGRESQL_SCHEMA,
+
SOURCE_TABLE_3)),
+ query(
+
getQuerySQL(
+
POSTGRESQL_SCHEMA,
+
SINK_TABLE_3)))));
+
+ log.info("****************** container logs start
******************");
+ String containerLogs = container.getServerLogs();
+ log.info(containerLogs);
+ // pg cdc logs contain ERROR
+ // Assertions.assertFalse(containerLogs.contains("ERROR"));
+ log.info("****************** container logs end
******************");
+ } finally {
+ // Clear related content to ensure that multiple operations are
not affected
+ clearTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_3);
+ clearTable(POSTGRESQL_SCHEMA, SINK_TABLE_3);
+ }
+ }
+
@TestTemplate
public void testPostgresCdcCheckDataWithNoPrimaryKey(TestContainer
container) throws Exception {
@@ -541,6 +639,20 @@ public class PostgresCDCIT extends TestSuiteBase
implements TestResource {
}
}
+ private void addFieldsForTable(String database, String tableName) {
+
+ executeSql("ALTER TABLE " + database + "." + tableName + " ADD COLUMN
f_big BIGINT");
+ }
+
+ private void insertSourceTableForAddFields(String database, String
tableName) {
+ executeSql(
+ "INSERT INTO "
+ + database
+ + "."
+ + tableName
+ + " VALUES (2, '2', 32767, 65535, 2147483647);");
+ }
+
private void upsertDeleteSourceTable(String database, String tableName) {
executeSql(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
index 98e6d46028..cff1a3980f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
@@ -145,12 +145,33 @@ CREATE TABLE full_types_no_primary_key
f_default_numeric NUMERIC
);
+CREATE TABLE postgres_cdc_table_3
+(
+ id INTEGER NOT NULL,
+ f_bytea BYTEA,
+ f_small SMALLINT,
+ f_int INTEGER,
+ PRIMARY KEY (id)
+);
+
+CREATE TABLE sink_postgres_cdc_table_3
+(
+ id INTEGER NOT NULL,
+ f_bytea BYTEA,
+ f_small SMALLINT,
+ f_int INTEGER,
+ PRIMARY KEY (id)
+);
+
ALTER TABLE postgres_cdc_table_1
REPLICA IDENTITY FULL;
ALTER TABLE postgres_cdc_table_2
REPLICA IDENTITY FULL;
+ALTER TABLE postgres_cdc_table_3
+ REPLICA IDENTITY FULL;
+
ALTER TABLE sink_postgres_cdc_table_1
REPLICA IDENTITY FULL;
@@ -170,6 +191,9 @@ VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6,
123.12345, 404.4443, true,
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
'2020-07-17', '18:00:22', 500);
+INSERT INTO postgres_cdc_table_3
+VALUES (1, '2', 32767, 65535);
+
INSERT INTO full_types_no_primary_key
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_test_add_Filed.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_test_add_Filed.conf
new file mode 100644
index 0000000000..f6e8f89d2b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_test_add_Filed.conf
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ Postgres-CDC {
+ result_table_name = "customers_postgres_cdc"
+ username = "postgres"
+ password = "postgres"
+ database-names = ["postgres_cdc"]
+ schema-names = ["inventory"]
+ table-names = ["postgres_cdc.inventory.postgres_cdc_table_3"]
+ base-url =
"jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+ decoding.plugin.name = "decoderbufs"
+ }
+}
+
+transform {
+
+}
+
+sink {
+ jdbc {
+ source_table_name = "customers_postgres_cdc"
+ url =
"jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+ driver = "org.postgresql.Driver"
+ user = "postgres"
+ password = "postgres"
+
+ generate_sink_sql = true
+ # You need to configure both database and table
+ database = postgres_cdc
+ table = inventory.sink_postgres_cdc_table_3
+ primary_keys = ["id"]
+ }
+}