This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a8d0d4ce77 [Hotfix][CDC] Fix occasional database connection leak when read snapshot split (#7918)
a8d0d4ce77 is described below

commit a8d0d4ce77a1ac9116f93381dfe296d22d8f0e8b
Author: hailin0 <[email protected]>
AuthorDate: Mon Oct 28 17:57:27 2024 +0800

    [Hotfix][CDC] Fix occasional database connection leak when read snapshot split (#7918)
---
 .../external/IncrementalSourceScanFetcher.java     | 16 +++-
 .../external/IncrementalSourceStreamFetcher.java   | 16 +++-
 .../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java | 85 +++++++++++++++++++++-
 3 files changed, 108 insertions(+), 9 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
index 7f927af587..2ed961865e 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -222,12 +222,15 @@ public class IncrementalSourceScanFetcher implements Fetcher<SourceRecords, Sour
     @Override
     public void close() {
         try {
-            if (taskContext != null) {
-                taskContext.close();
-            }
+            // 1. try close the split task
             if (snapshotSplitReadTask != null) {
-                snapshotSplitReadTask.shutdown();
+                try {
+                    snapshotSplitReadTask.shutdown();
+                } catch (Exception e) {
+                    log.error("Close snapshot split read task error", e);
+                }
             }
+            // 2. close the fetcher thread
             if (executorService != null) {
                 executorService.shutdown();
                 if (!executorService.awaitTermination(
@@ -240,6 +243,11 @@ public class IncrementalSourceScanFetcher implements Fetcher<SourceRecords, Sour
             }
         } catch (Exception e) {
             log.error("Close scan fetcher error", e);
+        } finally {
+            // 3. close the task context
+            if (taskContext != null) {
+                taskContext.close();
+            }
         }
     }
 
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 16e4537656..17536d9de0 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -187,12 +187,15 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
     @Override
     public void close() {
         try {
-            if (taskContext != null) {
-                taskContext.close();
-            }
+            // 1. try close the split task
             if (streamFetchTask != null) {
-                streamFetchTask.shutdown();
+                try {
+                    streamFetchTask.shutdown();
+                } catch (Exception e) {
+                    log.error("Close stream split read task error", e);
+                }
             }
+            // 2. close the fetcher thread
             if (executorService != null) {
                 executorService.shutdown();
                 if (!executorService.awaitTermination(
@@ -205,6 +208,11 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
             }
         } catch (Exception e) {
             log.error("Close stream fetcher error", e);
+        } finally {
+            // 3. close the task context
+            if (taskContext != null) {
+                taskContext.close();
+            }
         }
     }
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index bf7e8d8fe7..2891907431 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -328,6 +328,9 @@ public class MysqlCDCIT extends TestSuiteBase implements TestResource {
         clearTable(MYSQL_DATABASE2, SOURCE_TABLE_1);
         clearTable(MYSQL_DATABASE2, SOURCE_TABLE_2);
 
+        // init
+        initSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1);
+
         Long jobId = JobIdGenerator.newJobId();
         CompletableFuture.supplyAsync(
                 () -> {
@@ -341,8 +344,32 @@ public class MysqlCDCIT extends TestSuiteBase implements TestResource {
                     }
                 });
 
+        // wait for data written to sink
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertTrue(
+                                        query(getSourceQuerySQL(MYSQL_DATABASE2, SOURCE_TABLE_1))
+                                                        .size()
+                                                > 1));
+
+        // Restore job with snapshot read phase
+        Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode());
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        container.restoreJob(
+                                "/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf",
+                                String.valueOf(jobId));
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                    return null;
+                });
+
         // insert update delete
-        upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1);
+        changeSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1);
 
         // stream stage
         await().atMost(60000, TimeUnit.MILLISECONDS)
@@ -521,6 +548,62 @@ public class MysqlCDCIT extends TestSuiteBase implements TestResource {
         }
     }
 
+    private void initSourceTable(String database, String tableName) {
+        for (int i = 1; i < 100; i++) {
+            executeSql(
+                    "INSERT INTO "
+                            + database
+                            + "."
+                            + tableName
+                            + " ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+                            + "                                         f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n"
+                            + "                                         f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n"
+                            + "                                         f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n"
+                            + "                                         f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n"
+                            + "                                         f_tinyint, f_tinyint_unsigned, f_json, f_year )\n"
+                            + "VALUES ( "
+                            + i
+                            + ", 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+                            + "         0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+                            + "         0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,\n"
+                            + "         123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+                            + "         'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n"
+                            + "         '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n"
+                            + "         0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n"
+                            + "         12.345, '14:30:00', -128, 255, '{ \"key\": \"value\" }', 1992 )");
+        }
+    }
+
+    private void changeSourceTable(String database, String tableName) {
+        for (int i = 100; i < 110; i++) {
+            executeSql(
+                    "INSERT INTO "
+                            + database
+                            + "."
+                            + tableName
+                            + " ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+                            + "                                         f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n"
+                            + "                                         f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n"
+                            + "                                         f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n"
+                            + "                                         f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n"
+                            + "                                         f_tinyint, f_tinyint_unsigned, f_json, f_year )\n"
+                            + "VALUES ( "
+                            + i
+                            + ", 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+                            + "         0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+                            + "         0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,\n"
+                            + "         123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+                            + "         'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n"
+                            + "         '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n"
+                            + "         0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n"
+                            + "         12.345, '14:30:00', -128, 255, '{ \"key\": \"value\" }', 1992 )");
+        }
+
+        executeSql("DELETE FROM " + database + "." + tableName + " where id > 100");
+
+        executeSql("UPDATE " + database + "." + tableName + " SET f_bigint = 10000 where id < 10");
+    }
+
     private void upsertDeleteSourceTable(String database, String tableName) {
 
         executeSql(

Reply via email to