This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0c593a39e3 [BugFix][CDC Base] Fix added columns cannot be parsed after 
job restore (#6118)
0c593a39e3 is described below

commit 0c593a39e378e53ef7b78bd48af9d0d77c6223d6
Author: ic4y <[email protected]>
AuthorDate: Fri Jan 19 19:20:57 2024 +0800

    [BugFix][CDC Base] Fix added columns cannot be parsed after job restore 
(#6118)
---
 .../row/SeaTunnelRowDebeziumDeserializeSchema.java |   6 ++
 .../seatunnel/cdc/postgres/PostgresCDCIT.java      | 112 +++++++++++++++++++++
 .../src/test/resources/ddl/inventory.sql           |  24 +++++
 .../postgrescdc_to_postgres_test_add_Filed.conf    |  61 +++++++++++
 4 files changed, 203 insertions(+)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
index 3e86a6603d..d249b0d927 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
@@ -234,6 +234,12 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
 
     @Override
     public void restoreCheckpointProducedType(SeaTunnelDataType<SeaTunnelRow> 
checkpointDataType) {
+        // If schemaChangeResolver is null, it indicates that DDL changes are 
not supported.
+        // Therefore, we need to use the latest table structure to ensure that 
data from newly added
+        // columns can be parsed correctly.
+        if (schemaChangeResolver == null) {
+            return;
+        }
         if (SqlType.ROW.equals(checkpointDataType.getSqlType())
                 && SqlType.MULTIPLE_ROW.equals(resultTypeInfo.getSqlType())) {
             // TODO: Older versions may have this issue
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
index e4ee9df24f..88aca48e1e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
@@ -82,8 +82,10 @@ public class PostgresCDCIT extends TestSuiteBase implements 
TestResource {
 
     private static final String SOURCE_TABLE_1 = "postgres_cdc_table_1";
     private static final String SOURCE_TABLE_2 = "postgres_cdc_table_2";
+    private static final String SOURCE_TABLE_3 = "postgres_cdc_table_3";
     private static final String SINK_TABLE_1 = "sink_postgres_cdc_table_1";
     private static final String SINK_TABLE_2 = "sink_postgres_cdc_table_2";
+    private static final String SINK_TABLE_3 = "sink_postgres_cdc_table_3";
 
     private static final String SOURCE_TABLE_NO_PRIMARY_KEY = 
"full_types_no_primary_key";
 
@@ -373,6 +375,102 @@ public class PostgresCDCIT extends TestSuiteBase 
implements TestResource {
         }
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently SPARK and FLINK do not support multi 
table")
+    public void testAddFiledWithRestore(TestContainer container)
+            throws IOException, InterruptedException {
+        try {
+            CompletableFuture.supplyAsync(
+                    () -> {
+                        try {
+                            return container.executeJob(
+                                    
"/postgrescdc_to_postgres_test_add_Filed.conf");
+                        } catch (Exception e) {
+                            log.error("Commit task exception :" + 
e.getMessage());
+                            throw new RuntimeException(e);
+                        }
+                    });
+
+            // stream stage
+            await().atMost(60000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertAll(
+                                            () ->
+                                                    
Assertions.assertIterableEquals(
+                                                            query(
+                                                                    
getQuerySQL(
+                                                                            
POSTGRESQL_SCHEMA,
+                                                                            
SOURCE_TABLE_3)),
+                                                            query(
+                                                                    
getQuerySQL(
+                                                                            
POSTGRESQL_SCHEMA,
+                                                                            
SINK_TABLE_3)))));
+
+            Pattern jobIdPattern =
+                    Pattern.compile(
+                            ".*Init JobMaster for Job SeaTunnel_Job 
\\(([0-9]*)\\).*",
+                            Pattern.DOTALL);
+            Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
+            String jobId;
+            if (matcher.matches()) {
+                jobId = matcher.group(1);
+            } else {
+                throw new RuntimeException("Can not find jobId");
+            }
+
+            Assertions.assertEquals(0, 
container.savepointJob(jobId).getExitCode());
+
+            // add field and insert source table data
+            addFieldsForTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_3);
+            addFieldsForTable(POSTGRESQL_SCHEMA, SINK_TABLE_3);
+            insertSourceTableForAddFields(POSTGRESQL_SCHEMA, SOURCE_TABLE_3);
+
+            // Restore job
+            CompletableFuture.supplyAsync(
+                    () -> {
+                        try {
+                            container.restoreJob(
+                                    
"/postgrescdc_to_postgres_test_add_Filed.conf", jobId);
+                        } catch (Exception e) {
+                            log.error("Commit task exception :" + 
e.getMessage());
+                            throw new RuntimeException(e);
+                        }
+                        return null;
+                    });
+
+            // stream stage
+            await().atMost(60000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertAll(
+                                            () ->
+                                                    
Assertions.assertIterableEquals(
+                                                            query(
+                                                                    
getQuerySQL(
+                                                                            
POSTGRESQL_SCHEMA,
+                                                                            
SOURCE_TABLE_3)),
+                                                            query(
+                                                                    
getQuerySQL(
+                                                                            
POSTGRESQL_SCHEMA,
+                                                                            
SINK_TABLE_3)))));
+
+            log.info("****************** container logs start 
******************");
+            String containerLogs = container.getServerLogs();
+            log.info(containerLogs);
+            // pg cdc logs contain ERROR
+            // Assertions.assertFalse(containerLogs.contains("ERROR"));
+            log.info("****************** container logs end 
******************");
+        } finally {
+            // Clear related content to ensure that multiple operations are 
not affected
+            clearTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_3);
+            clearTable(POSTGRESQL_SCHEMA, SINK_TABLE_3);
+        }
+    }
+
     @TestTemplate
     public void testPostgresCdcCheckDataWithNoPrimaryKey(TestContainer 
container) throws Exception {
 
@@ -541,6 +639,20 @@ public class PostgresCDCIT extends TestSuiteBase 
implements TestResource {
         }
     }
 
+    private void addFieldsForTable(String database, String tableName) {
+
+        executeSql("ALTER TABLE " + database + "." + tableName + " ADD COLUMN 
f_big BIGINT");
+    }
+
+    private void insertSourceTableForAddFields(String database, String 
tableName) {
+        executeSql(
+                "INSERT INTO "
+                        + database
+                        + "."
+                        + tableName
+                        + " VALUES (2, '2', 32767, 65535, 2147483647);");
+    }
+
     private void upsertDeleteSourceTable(String database, String tableName) {
 
         executeSql(
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
index 98e6d46028..cff1a3980f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
@@ -145,12 +145,33 @@ CREATE TABLE full_types_no_primary_key
     f_default_numeric   NUMERIC
 );
 
+CREATE TABLE postgres_cdc_table_3
+(
+    id                  INTEGER NOT NULL,
+    f_bytea             BYTEA,
+    f_small             SMALLINT,
+    f_int               INTEGER,
+    PRIMARY KEY (id)
+);
+
+CREATE TABLE sink_postgres_cdc_table_3
+(
+    id                  INTEGER NOT NULL,
+    f_bytea             BYTEA,
+    f_small             SMALLINT,
+    f_int               INTEGER,
+    PRIMARY KEY (id)
+);
+
 ALTER TABLE postgres_cdc_table_1
     REPLICA IDENTITY FULL;
 
 ALTER TABLE postgres_cdc_table_2
     REPLICA IDENTITY FULL;
 
+ALTER TABLE postgres_cdc_table_3
+    REPLICA IDENTITY FULL;
+
 ALTER TABLE sink_postgres_cdc_table_1
     REPLICA IDENTITY FULL;
 
@@ -170,6 +191,9 @@ VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 
123.12345, 404.4443, true,
         'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', 
'2020-07-17 18:00:22.123456',
         '2020-07-17', '18:00:22', 500);
 
+INSERT INTO postgres_cdc_table_3
+VALUES (1, '2', 32767, 65535);
+
 INSERT INTO full_types_no_primary_key
 VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
         'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', 
'2020-07-17 18:00:22.123456',
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_test_add_Filed.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_test_add_Filed.conf
new file mode 100644
index 0000000000..f6e8f89d2b
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_test_add_Filed.conf
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  read_limit.bytes_per_second=7000000
+  read_limit.rows_per_second=400
+}
+
+source {
+  Postgres-CDC {
+    result_table_name = "customers_postgres_cdc"
+    username = "postgres"
+    password = "postgres"
+    database-names = ["postgres_cdc"]
+    schema-names = ["inventory"]
+    table-names = ["postgres_cdc.inventory.postgres_cdc_table_3"]
+    base-url = 
"jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+    decoding.plugin.name = "decoderbufs"
+  }
+}
+
+transform {
+
+}
+
+sink {
+  jdbc {
+    source_table_name = "customers_postgres_cdc"
+    url = 
"jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+    driver = "org.postgresql.Driver"
+    user = "postgres"
+    password = "postgres"
+
+    generate_sink_sql = true
+    # You need to configure both database and table
+    database = postgres_cdc
+    table = inventory.sink_postgres_cdc_table_3
+    primary_keys = ["id"]
+  }
+}

Reply via email to