This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e19d73282e [Fix][Connector-V2] Fix some throwable error not be caught 
(#7657)
e19d73282e is described below

commit e19d73282e618deb6a3747c86bdf6bd7d620d47c
Author: Jia Fan <[email protected]>
AuthorDate: Sat Sep 21 22:15:58 2024 +0800

    [Fix][Connector-V2] Fix some throwable error not be caught (#7657)
---
 .../reader/external/IncrementalSourceScanFetcher.java     |  2 +-
 .../reader/external/IncrementalSourceStreamFetcher.java   |  2 +-
 .../seatunnel/rocketmq/source/RocketMqSourceReader.java   |  4 ++--
 .../connectors/seatunnel/sls/source/SlsSourceReader.java  | 15 +++------------
 .../checkpoint/storage/api/AbstractCheckpointStorage.java |  2 +-
 5 files changed, 8 insertions(+), 17 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
index da048b47e5..7f927af587 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -93,7 +93,7 @@ public class IncrementalSourceScanFetcher implements 
Fetcher<SourceRecords, Sour
                                 currentSnapshotSplit,
                                 taskContext.isExactlyOnce());
                         snapshotSplitReadTask.execute(taskContext);
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         log.error(
                                 String.format(
                                         "Execute snapshot read task for 
snapshot split %s fail",
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 338cb657b3..16e4537656 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -104,7 +104,7 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
                                 currentIncrementalSplit,
                                 taskContext.isExactlyOnce());
                         streamFetchTask.execute(taskContext);
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         log.error(
                                 String.format(
                                         "Execute stream read task for 
incremental split %s fail",
diff --git 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
index 90bc8f3231..0dfa0b179d 100644
--- 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
@@ -98,7 +98,7 @@ public class RocketMqSourceReader implements 
SourceReader<SeaTunnelRow, RocketMq
             Thread.sleep(THREAD_WAIT_TIME);
             return;
         }
-        while (pendingPartitionsQueue.size() != 0) {
+        while (!pendingPartitionsQueue.isEmpty()) {
             sourceSplits.add(pendingPartitionsQueue.poll());
         }
         sourceSplits.forEach(
@@ -166,7 +166,7 @@ public class RocketMqSourceReader implements 
SourceReader<SeaTunnelRow, RocketMq
                                                     // just for bounded mode
                                                     
sourceSplit.setEndOffset(lastOffset);
                                                 }
-                                            } catch (Exception e) {
+                                            } catch (Throwable e) {
                                                 
completableFuture.completeExceptionally(e);
                                             }
                                             completableFuture.complete(null);
diff --git 
a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java
 
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java
index 43cb75328a..819b3b07d6 100644
--- 
a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java
@@ -127,12 +127,8 @@ public class SlsSourceReader implements 
SourceReader<SeaTunnelRow, SlsSourceSpli
                                                 sourceSplit.setStartCursor(
                                                         
response.getNextCursor());
                                                 
completableFuture.complete(true);
-                                            } catch (LogException e) {
-                                                e.printStackTrace();
-                                                
completableFuture.completeExceptionally(e);
-                                                throw new RuntimeException(e);
-                                            } catch (IOException e) {
-                                                e.printStackTrace();
+                                            } catch (Throwable e) {
+                                                log.error("pull logs failed", 
e);
                                                 
completableFuture.completeExceptionally(e);
                                                 throw new RuntimeException(e);
                                             }
@@ -141,11 +137,7 @@ public class SlsSourceReader implements 
SourceReader<SeaTunnelRow, SlsSourceSpli
                         if (completableFuture.get()) {
                             finishedSplits.add(sourceSplit);
                         }
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                        throw new RuntimeException(e);
-                    } catch (ExecutionException e) {
-                        e.printStackTrace();
+                    } catch (InterruptedException | ExecutionException e) {
                         throw new RuntimeException(e);
                     }
                 });
@@ -215,7 +207,6 @@ public class SlsSourceReader implements 
SourceReader<SeaTunnelRow, SlsSourceSpli
                                                                     
slsSourceSplit
                                                                             
.getStartCursor());
                                                         } catch (LogException 
e) {
-                                                            
e.printStackTrace();
                                                             log.error(
                                                                     
"LogException: commit cursor to sls failed",
                                                                     e);
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
index a466408d9c..ca912ee2b7 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
@@ -195,7 +195,7 @@ public abstract class AbstractCheckpointStorage implements 
CheckpointStorage {
                 () -> {
                     try {
                         storeCheckPoint(state);
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         log.error(
                                 String.format(
                                         "store checkpoint failed, job id : %s, 
pipeline id : %d",

Reply via email to