This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6545e4f9e [INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot
split exit catch exception (#7141)
6545e4f9e is described below
commit 6545e4f9e474a776b8178604c4048aa661793fff
Author: Schnapps <[email protected]>
AuthorDate: Wed Jan 4 17:24:03 2023 +0800
[INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit
catch exception (#7141)
Co-authored-by: stingpeng <[email protected]>
---
.../assigners/MySqlSnapshotSplitAssigner.java | 24 +++++++++++++---------
1 file changed, 14 insertions(+), 10 deletions(-)
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index 498edec76..d270fa792 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -205,17 +205,21 @@ public class MySqlSnapshotSplitAssigner implements
MySqlSplitAssigner {
executor.submit(
() -> {
- Iterator<TableId> iterator =
remainingTables.iterator();
- while (iterator.hasNext()) {
- TableId nextTable = iterator.next();
- // split the given table into chunks (snapshot
splits)
- Collection<MySqlSnapshotSplit> splits =
- chunkSplitter.generateSplits(nextTable);
- synchronized (lock) {
- remainingSplits.addAll(splits);
- remainingTables.remove(nextTable);
- lock.notify();
+ try {
+ Iterator<TableId> iterator =
remainingTables.iterator();
+ while (iterator.hasNext()) {
+ TableId nextTable = iterator.next();
+ // split the given table into chunks (snapshot
splits)
+ Collection<MySqlSnapshotSplit> splits =
+
chunkSplitter.generateSplits(nextTable);
+ synchronized (lock) {
+ remainingSplits.addAll(splits);
+ remainingTables.remove(nextTable);
+ lock.notify();
+ }
}
+ } catch (Exception e) {
+ LOG.error("asynchronously split exit with
exception", e);
}
});
}