This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a8d0d4ce77 [Hotfix][CDC] Fix occasional database connection leak when
read snapshot split (#7918)
a8d0d4ce77 is described below
commit a8d0d4ce77a1ac9116f93381dfe296d22d8f0e8b
Author: hailin0 <[email protected]>
AuthorDate: Mon Oct 28 17:57:27 2024 +0800
[Hotfix][CDC] Fix occasional database connection leak when read snapshot
split (#7918)
---
.../external/IncrementalSourceScanFetcher.java | 16 +++-
.../external/IncrementalSourceStreamFetcher.java | 16 +++-
.../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java | 85 +++++++++++++++++++++-
3 files changed, 108 insertions(+), 9 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
index 7f927af587..2ed961865e 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -222,12 +222,15 @@ public class IncrementalSourceScanFetcher implements
Fetcher<SourceRecords, Sour
@Override
public void close() {
try {
- if (taskContext != null) {
- taskContext.close();
- }
+ // 1. try close the split task
if (snapshotSplitReadTask != null) {
- snapshotSplitReadTask.shutdown();
+ try {
+ snapshotSplitReadTask.shutdown();
+ } catch (Exception e) {
+ log.error("Close snapshot split read task error", e);
+ }
}
+ // 2. close the fetcher thread
if (executorService != null) {
executorService.shutdown();
if (!executorService.awaitTermination(
@@ -240,6 +243,11 @@ public class IncrementalSourceScanFetcher implements
Fetcher<SourceRecords, Sour
}
} catch (Exception e) {
log.error("Close scan fetcher error", e);
+ } finally {
+ // 3. close the task context
+ if (taskContext != null) {
+ taskContext.close();
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 16e4537656..17536d9de0 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -187,12 +187,15 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
@Override
public void close() {
try {
- if (taskContext != null) {
- taskContext.close();
- }
+ // 1. try close the split task
if (streamFetchTask != null) {
- streamFetchTask.shutdown();
+ try {
+ streamFetchTask.shutdown();
+ } catch (Exception e) {
+ log.error("Close stream split read task error", e);
+ }
}
+ // 2. close the fetcher thread
if (executorService != null) {
executorService.shutdown();
if (!executorService.awaitTermination(
@@ -205,6 +208,11 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
}
} catch (Exception e) {
log.error("Close stream fetcher error", e);
+ } finally {
+ // 3. close the task context
+ if (taskContext != null) {
+ taskContext.close();
+ }
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index bf7e8d8fe7..2891907431 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -328,6 +328,9 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
clearTable(MYSQL_DATABASE2, SOURCE_TABLE_1);
clearTable(MYSQL_DATABASE2, SOURCE_TABLE_2);
+ // init
+ initSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1);
+
Long jobId = JobIdGenerator.newJobId();
CompletableFuture.supplyAsync(
() -> {
@@ -341,8 +344,32 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
}
});
+ // wait for data written to sink
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertTrue(
+
query(getSourceQuerySQL(MYSQL_DATABASE2, SOURCE_TABLE_1))
+ .size()
+ > 1));
+
+ // Restore job with snapshot read phase
+ Assertions.assertEquals(0,
container.savepointJob(String.valueOf(jobId)).getExitCode());
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.restoreJob(
+
"/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf",
+ String.valueOf(jobId));
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
// insert update delete
- upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1);
+ changeSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1);
// stream stage
await().atMost(60000, TimeUnit.MILLISECONDS)
@@ -521,6 +548,62 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
}
}
+ private void initSourceTable(String database, String tableName) { // Seeds database.tableName by issuing one INSERT per id through executeSql.
+ for (int i = 1; i < 100; i++) { // ids 1..99 inclusive; only the id value differs between the inserted rows
+ executeSql(
+ "INSERT INTO "
+ + database
+ + "."
+ + tableName
+ + " ( id, f_binary, f_blob, f_long_varbinary,
f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+ + "
f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned,
f_integer,\n"
+ + "
f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float,
f_double,\n"
+ + "
f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar,
f_date, f_datetime,\n"
+ + "
f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar,
f_real, f_time,\n"
+ + "
f_tinyint, f_tinyint_unsigned, f_json, f_year )\n"
+ + "VALUES ( "
+ + i
+ + ",
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+ + " 0x68656C6C6F,
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+ + " 0x74696E79626C6F62,
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321,
1234567, 7654321,\n"
+ + " 123456789, 987654321, 123, 789, 12.34,
56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+ + " 'This is a text field', 'This is a
tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27
14:30:00',\n"
+ + " '2023-04-27 11:08:40', 1,
b'0101010101010101010101010101010101010101010101010101010101010101', 'C',
'enum2',\n"
+ + "
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A,
'This is a long varchar field',\n"
+ + " 12.345, '14:30:00', -128, 255, '{
\"key\": \"value\" }', 1992 )");
+ }
+ }
+
+ private void changeSourceTable(String database, String tableName) { // Inserts ids 100..109, then deletes rows with id > 100 and sets f_bigint = 10000 where id < 10.
+ for (int i = 100; i < 110; i++) { // ten new rows; only the id value differs between them
+ executeSql(
+ "INSERT INTO "
+ + database
+ + "."
+ + tableName
+ + " ( id, f_binary, f_blob, f_long_varbinary,
f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+ + "
f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned,
f_integer,\n"
+ + "
f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float,
f_double,\n"
+ + "
f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar,
f_date, f_datetime,\n"
+ + "
f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar,
f_real, f_time,\n"
+ + "
f_tinyint, f_tinyint_unsigned, f_json, f_year )\n"
+ + "VALUES ( "
+ + i
+ + ",
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+ + " 0x68656C6C6F,
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+ + " 0x74696E79626C6F62,
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321,
1234567, 7654321,\n"
+ + " 123456789, 987654321, 123, 789, 12.34,
56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+ + " 'This is a text field', 'This is a
tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27
14:30:00',\n"
+ + " '2023-04-27 11:08:40', 1,
b'0101010101010101010101010101010101010101010101010101010101010101', 'C',
'enum2',\n"
+ + "
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A,
'This is a long varchar field',\n"
+ + " 12.345, '14:30:00', -128, 255, '{
\"key\": \"value\" }', 1992 )");
+ }
+
+ executeSql("DELETE FROM " + database + "." + tableName + " where id >
100");
+
+ executeSql("UPDATE " + database + "." + tableName + " SET f_bigint =
10000 where id < 10");
+ }
+
private void upsertDeleteSourceTable(String database, String tableName) {
executeSql(