This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e19d73282e [Fix][Connector-V2] Fix some throwable error not be caught
(#7657)
e19d73282e is described below
commit e19d73282e618deb6a3747c86bdf6bd7d620d47c
Author: Jia Fan <[email protected]>
AuthorDate: Sat Sep 21 22:15:58 2024 +0800
[Fix][Connector-V2] Fix some throwable error not be caught (#7657)
---
.../reader/external/IncrementalSourceScanFetcher.java | 2 +-
.../reader/external/IncrementalSourceStreamFetcher.java | 2 +-
.../seatunnel/rocketmq/source/RocketMqSourceReader.java | 4 ++--
.../connectors/seatunnel/sls/source/SlsSourceReader.java | 15 +++------------
.../checkpoint/storage/api/AbstractCheckpointStorage.java | 2 +-
5 files changed, 8 insertions(+), 17 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
index da048b47e5..7f927af587 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -93,7 +93,7 @@ public class IncrementalSourceScanFetcher implements
Fetcher<SourceRecords, Sour
currentSnapshotSplit,
taskContext.isExactlyOnce());
snapshotSplitReadTask.execute(taskContext);
- } catch (Exception e) {
+ } catch (Throwable e) {
log.error(
String.format(
"Execute snapshot read task for
snapshot split %s fail",
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 338cb657b3..16e4537656 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -104,7 +104,7 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
currentIncrementalSplit,
taskContext.isExactlyOnce());
streamFetchTask.execute(taskContext);
- } catch (Exception e) {
+ } catch (Throwable e) {
log.error(
String.format(
"Execute stream read task for
incremental split %s fail",
diff --git
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
index 90bc8f3231..0dfa0b179d 100644
---
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
+++
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
@@ -98,7 +98,7 @@ public class RocketMqSourceReader implements
SourceReader<SeaTunnelRow, RocketMq
Thread.sleep(THREAD_WAIT_TIME);
return;
}
- while (pendingPartitionsQueue.size() != 0) {
+ while (!pendingPartitionsQueue.isEmpty()) {
sourceSplits.add(pendingPartitionsQueue.poll());
}
sourceSplits.forEach(
@@ -166,7 +166,7 @@ public class RocketMqSourceReader implements
SourceReader<SeaTunnelRow, RocketMq
// just for bounded mode
sourceSplit.setEndOffset(lastOffset);
}
- } catch (Exception e) {
+ } catch (Throwable e) {
completableFuture.completeExceptionally(e);
}
completableFuture.complete(null);
diff --git
a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java
index 43cb75328a..819b3b07d6 100644
---
a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java
+++
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java
@@ -127,12 +127,8 @@ public class SlsSourceReader implements
SourceReader<SeaTunnelRow, SlsSourceSpli
sourceSplit.setStartCursor(
response.getNextCursor());
completableFuture.complete(true);
- } catch (LogException e) {
- e.printStackTrace();
-
completableFuture.completeExceptionally(e);
- throw new RuntimeException(e);
- } catch (IOException e) {
- e.printStackTrace();
+ } catch (Throwable e) {
+ log.error("pull logs failed",
e);
completableFuture.completeExceptionally(e);
throw new RuntimeException(e);
}
@@ -141,11 +137,7 @@ public class SlsSourceReader implements
SourceReader<SeaTunnelRow, SlsSourceSpli
if (completableFuture.get()) {
finishedSplits.add(sourceSplit);
}
- } catch (InterruptedException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- e.printStackTrace();
+ } catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
@@ -215,7 +207,6 @@ public class SlsSourceReader implements
SourceReader<SeaTunnelRow, SlsSourceSpli
slsSourceSplit
.getStartCursor());
} catch (LogException
e) {
-
e.printStackTrace();
log.error(
"LogException: commit cursor to sls failed",
e);
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
index a466408d9c..ca912ee2b7 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
@@ -195,7 +195,7 @@ public abstract class AbstractCheckpointStorage implements
CheckpointStorage {
() -> {
try {
storeCheckPoint(state);
- } catch (Exception e) {
+ } catch (Throwable e) {
log.error(
String.format(
"store checkpoint failed, job id : %s,
pipeline id : %d",