This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new eb81225a [ISSUE #468] Prevent schedule tasks from exiting due to exceptions (#469)
eb81225a is described below

commit eb81225a61a9b3406a431551b975faed3f9f9f45
Author: rongtong <[email protected]>
AuthorDate: Fri Apr 7 16:09:54 2023 +0800

    [ISSUE #468] Prevent schedule tasks from exiting due to exceptions (#469)
---
 .../rocketmq/replicator/ReplicatorSourceTask.java      | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
index e76eae44..74884545 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
@@ -317,13 +317,21 @@ public class ReplicatorSourceTask extends SourceTask {
         metricsMonitorExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
-                replicateLagMetric();
+                try {
+                    replicateLagMetric();
+                } catch (Throwable e) {
+                    log.error("replicate log metric error", e);
+                }
             }
         }, period, period, TimeUnit.MILLISECONDS);
 
         commitOffsetScheduleService.scheduleAtFixedRate(new Runnable() {
             @Override public void run() {
-                commitOffsetSchedule();
+                try {
+                    commitOffsetSchedule();
+                } catch (Throwable e) {
+                    log.error("commit offset error", e);
+                }
             }
         }, connectorConfig.getCommitOffsetIntervalMs(), connectorConfig.getCommitOffsetIntervalMs(), TimeUnit.MILLISECONDS);
     }
@@ -366,14 +374,14 @@ public class ReplicatorSourceTask extends SourceTask {
             metricsItem2KeyMap.put(ReplicatorTaskStats.REPLICATOR_SOURCE_TASK_DELAY_NUMS, delayNumsKeys);
             metricsItem2KeyMap.put(ReplicatorTaskStats.REPLICATOR_SOURCE_TASK_DELAY_MS, delayMsKeys);
         } catch (RemotingException | MQClientException e) {
-            log.error(" occur remoting or mqclient exception, retry build mqadminclient,", e);
+            log.error("occur remoting or mqclient exception, retry build mqadminclient", e);
             try {
                 buildMqAdminClient();
             } catch (MQClientException mqClientException) {
-                log.error(" rebuild mqadminclient error,", e);
+                log.error("rebuild mqadmin client error", e);
             }
         } catch (Exception e) {
-            log.error(" occur unknow exception,", e);
+            log.error(" occur unknown exception", e);
         }
     }
 

Reply via email to