This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git


The following commit(s) were added to refs/heads/main by this push:
     new 6fc28a0  fix: code style issues to make compile pass with checkstyle 
plugin enabled (#160)
6fc28a0 is described below

commit 6fc28a05f92f912085ed5ffa48eb2d9c80e0c53a
Author: Zhanhui Li <[email protected]>
AuthorDate: Sun Feb 25 18:57:46 2024 +0800

    fix: code style issues to make compile pass with checkstyle plugin enabled 
(#160)
    
    Signed-off-by: Li Zhanhui <[email protected]>
---
 .../adapter/benchmark/AbstractEventCommon.java     |  6 +-
 .../adapter/benchmark/EventTPSCommon.java          | 16 ++---
 .../adapter/persistence/DatasourceConfig.java      |  5 +-
 .../MybatisEventTargetRunnerRepository.java        |  1 -
 .../impl/runtime/RuntimeTargetRunnerAPIImpl.java   | 28 ++++----
 .../eventbridge/adapter/runtime/Runtime.java       | 11 ++-
 .../adapter/runtime/boot/EventBusListener.java     |  6 +-
 .../adapter/runtime/boot/EventRuleTransfer.java    | 16 ++---
 .../adapter/runtime/boot/EventTargetTrigger.java   | 18 +++--
 .../runtime/boot/common/CirculatorContext.java     | 82 +++++++++++++---------
 .../runtime/boot/listener/EventSubscriber.java     |  8 ++-
 .../runtime/boot/transfer/TransformEngine.java     | 17 +++--
 .../runtime/boot/trigger/TriggerTaskContext.java   | 32 ++++-----
 .../adapter/runtime/common/LoggerName.java         |  8 +--
 .../adapter/runtime/common/ServiceThread.java      | 16 ++---
 .../runtime/common/entity/TargetKeyValue.java      | 14 ++--
 .../runtime/common/entity/TargetRunnerConfig.java  | 18 ++---
 .../adapter/runtime/common/plugin/Plugin.java      | 12 +++-
 .../adapter/runtime/common/plugin/PluginUtils.java | 11 ++-
 .../common/store/FileBaseKeyValueStore.java        |  2 +-
 .../adapter/runtime/config/RuntimeConfigProps.java | 17 +++--
 .../runtime/config/RuntimeConfiguration.java       |  1 -
 .../adapter/runtime/converter/JsonConverter.java   |  2 +-
 .../adapter/runtime/converter/ListConverter.java   |  2 +-
 .../runtime/converter/RecordOffsetConverter.java   |  2 +-
 .../adapter/runtime/error/ErrorHandler.java        |  2 +-
 .../AbstractTargetRunnerConfigObserver.java        |  2 +-
 .../service/TargetRunnerConfigOnFileObserver.java  |  1 -
 .../adapter/runtime/utils/ExceptionUtil.java       |  4 +-
 .../adapter/runtime/utils/ShutdownUtils.java       |  6 +-
 .../rocketmq/runtimer/RocketMQEventSubscriber.java | 61 +++++++++-------
 .../rocketmq/runtimer/consumer/ClientConfig.java   |  7 +-
 .../rocketmq/runtimer/consumer/ConsumeRequest.java |  4 --
 .../runtimer/consumer/LitePullConsumer.java        |  4 --
 .../runtimer/consumer/LitePullConsumerImpl.java    | 40 +++++------
 .../runtimer/consumer/LocalMessageCache.java       |  4 --
 .../eventbridge/enums/PushRetryStrategyEnum.java   |  3 +-
 .../infrastructure/trace/TraceStrategy.java        |  5 +-
 .../eventbridge/filter/ValidateFilter.java         | 16 ++---
 39 files changed, 258 insertions(+), 252 deletions(-)

diff --git 
a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
 
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
index ed6a91c..33b59fa 100644
--- 
a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
+++ 
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
@@ -48,19 +48,19 @@ public abstract class AbstractEventCommon {
             return;
         }
 
-        // tps: 每秒文件打印的行数
+        // tps: rows to print for each second
         final long tps = currentRowCount - previousRowCount.get();
         previousRowCount.set(currentRowCount);
         writeCount.add(currentRowCount);
         costTime.add(1000);
-        // delayTime(条/ms)=接收的数量/花费的时间
+        // delayTime(record/ms)= receiving-amount / time
         final double delayTime = writeCount.longValue() / costTime.longValue();
         // String delayTimeStr = twoDecimal(delayTime);
 
         String info = String.format("Current Time: %s  |  TPS: %d  ",
                 UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), 
tps);
 
-        System.out.println(info);
+        System.out.printf("%s%n", info);
     }
 
 
diff --git 
a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java
 
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java
index e9acb87..8f91eb8 100644
--- 
a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java
+++ 
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java
@@ -16,18 +16,20 @@
  */
 package org.apache.rocketmq.eventbridge.adapter.benchmark;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.LineNumberReader;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.*;
 import java.util.TimerTask;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * 整条链路
+ * End-to-End use case
  */
 public class EventTPSCommon extends AbstractEventCommon {
     public static void main(String[] args) {
@@ -35,12 +37,10 @@ public class EventTPSCommon extends AbstractEventCommon {
         if (args.length > 0) {
             filePath = args[0];
         }
-        EventTPSCommon tpsCommon = null;
+        EventTPSCommon tpsCommon;
         try {
             tpsCommon = new EventTPSCommon(filePath);
             tpsCommon.start();
-        } catch (FileNotFoundException e) {
-            e.printStackTrace();
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -56,7 +56,7 @@ public class EventTPSCommon extends AbstractEventCommon {
         previousRowCount = new AtomicReference<>();
         previousRowCount.set(0);
         executorService = new ScheduledThreadPoolExecutor(1,
-                new 
BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-all-%d").build());
+            new 
BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-all-%d").build());
     }
 
     @Override
diff --git 
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java
 
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java
index 6718b8d..8418830 100644
--- 
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java
+++ 
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java
@@ -75,7 +75,7 @@ public class DatasourceConfig {
     private Long validationTimeoutMs;
 
     @Bean("dataSource")
-    public DataSource getMasterDataSource(){
+    public DataSource getMasterDataSource() {
         HikariConfig hikariConfig = new HikariConfig();
         hikariConfig.setJdbcUrl(baseUrl);
         hikariConfig.setDriverClassName(baseDriverClassName);
@@ -102,7 +102,8 @@ public class DatasourceConfig {
     }
 
     @Bean("sqlSessionTemplate")
-    public SqlSessionTemplate 
masterSqlSessionTemplate(@Qualifier("sqlSessionFactory") SqlSessionFactory 
sqlSessionFactory){
+    public SqlSessionTemplate masterSqlSessionTemplate(
+        @Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
         return new SqlSessionTemplate(sqlSessionFactory);
     }
 
diff --git 
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
 
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
index 97bfb88..adab7ec 100644
--- 
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
+++ 
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.StringUtils;
 import 
org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.converter.EventTargetRunnerConverter;
 import 
org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.dataobject.EventTargetRunnerDO;
 import 
org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.mapper.EventTargetRunnerMapper;
diff --git 
a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
 
b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
index 77c6bc9..a0ebc6e 100644
--- 
a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
+++ 
b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
@@ -52,19 +52,21 @@ public class RuntimeTargetRunnerAPIImpl implements 
TargetRunnerAPI {
         targetRunnerConfig.setName(name);
         List<Map<String, String>> components = Lists.newArrayList();
         targetRunnerConfig.setComponents(components);
-        Map<String, String> sourceComponent = new Gson().fromJson(new 
Gson().toJson(source
-            .getConfig()), new TypeToken<Map<String, String>>() {
-        }.getType());
-        Map<String, String> filterComponent = new Gson().fromJson(new 
Gson().toJson(RocketMQConverter.buildEventBridgeFilterTransform(filterPattern)
-            .getConfig()), new TypeToken<Map<String, String>>() {
-        }.getType());
-
-        Map<String, String> transformComponent = new Gson().fromJson(new 
Gson().toJson(RocketMQConverter.buildEventBridgeTransform(targetTransform)
-            .getConfig()), new TypeToken<Map<String, String>>() {
-        }.getType());
-        Map<String, String> targetComponent = new Gson().fromJson(new 
Gson().toJson(target
-            .getConfig()), new TypeToken<Map<String, String>>() {
-        }.getType());
+
+        Map<String, String> sourceComponent = new Gson().fromJson(
+            new Gson().toJson(source.getConfig()),
+            new TypeToken<Map<String, String>>() {}.getType());
+
+        Map<String, String> filterComponent = new Gson().fromJson(
+            new 
Gson().toJson(RocketMQConverter.buildEventBridgeFilterTransform(filterPattern).getConfig()),
+            new TypeToken<Map<String, String>>() {}.getType());
+
+        Map<String, String> transformComponent = new Gson().fromJson(new 
Gson().toJson(RocketMQConverter.buildEventBridgeTransform(targetTransform).getConfig()),
+            new TypeToken<Map<String, String>>() {}.getType());
+
+        Map<String, String> targetComponent = new Gson().fromJson(new 
Gson().toJson(target.getConfig()),
+            new TypeToken<Map<String, String>>() {}.getType());
+
         components.add(sourceComponent);
         components.add(filterComponent);
         components.add(transformComponent);
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
index af8bbbd..ba3ed1f 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
@@ -34,19 +34,16 @@ import org.springframework.stereotype.Component;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.StartAndShutdown;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.AbstractStartAndShutdown;
 
-
 import javax.annotation.PostConstruct;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * event bridge runtime
- *
- * @author artisan
  */
 @Component
 public class Runtime {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(Runtime.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Runtime.class);
 
     private AtomicReference<RuntimeState> runtimerState;
 
@@ -65,7 +62,7 @@ public class Runtime {
 
     @PostConstruct
     public void initAndStart() throws Exception {
-        logger.info("Start init runtime.");
+        LOGGER.info("Start init runtime.");
         
circulatorContext.initCirculatorContext(runnerConfigObserver.getTargetRunnerConfig());
         runnerConfigObserver.registerListener(circulatorContext);
         runnerConfigObserver.registerListener(eventSubscriber);
@@ -80,11 +77,11 @@ public class Runtime {
         RUNTIME_START_AND_SHUTDOWN.start();
 
         java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-            logger.info("try to shutdown server");
+            LOGGER.info("try to shutdown server");
             try {
                 RUNTIME_START_AND_SHUTDOWN.shutdown();
             } catch (Exception e) {
-                logger.error("err when shutdown runtime ", e);
+                LOGGER.error("err when shutdown runtime ", e);
             }
         }));
 
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
index 48af27f..b14b551 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
@@ -30,12 +30,10 @@ import org.slf4j.LoggerFactory;
 
 /**
  * listen the event and offer to queue
- *
- * @author artisan
  */
 public class EventBusListener extends ServiceThread {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(EventBusListener.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EventBusListener.class);
 
     private final CirculatorContext circulatorContext;
     private final EventSubscriber eventSubscriber;
@@ -60,7 +58,7 @@ public class EventBusListener extends ServiceThread {
                 }
                 circulatorContext.offerEventRecords(pullRecordList);
             } catch (Exception exception) {
-                logger.error(getServiceName() + " - event bus pull record 
exception, stackTrace - ", exception);
+                LOGGER.error(getServiceName() + " - event bus pull record 
exception, stackTrace - ", exception);
                 pullRecordList.forEach(pullRecord -> 
errorHandler.handle(pullRecord, exception));
             }
         }
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
index f24ed1b..bc671db 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
  */
 public class EventRuleTransfer extends ServiceThread {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(EventRuleTransfer.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EventRuleTransfer.class);
 
     private volatile Integer batchSize = 100;
 
@@ -68,18 +68,18 @@ public class EventRuleTransfer extends ServiceThread {
 
     @Override
     public void run() {
-        List<ConnectRecord> afterTransformConnect = new 
CopyOnWriteArrayList<>();;
+        List<ConnectRecord> afterTransformConnect = new 
CopyOnWriteArrayList<>();
         while (!stopped) {
             try {
                 Map<String, List<ConnectRecord>> eventRecordMap = 
circulatorContext.takeEventRecords(batchSize);
                 if (MapUtils.isEmpty(eventRecordMap)) {
-                    logger.trace("listen eventRecords is empty, continue by 
curTime - {}", System.currentTimeMillis());
+                    LOGGER.trace("listen eventRecords is empty, continue by 
curTime - {}", System.currentTimeMillis());
                     this.waitForRunning(1000);
                     continue;
                 }
                 Map<String, TransformEngine<ConnectRecord>> latestTransformMap 
= circulatorContext.getTaskTransformMap();
                 if (MapUtils.isEmpty(latestTransformMap)) {
-                    logger.warn("latest transform engine is empty, continue by 
curTime - {}", System.currentTimeMillis());
+                    LOGGER.warn("latest transform engine is empty, continue by 
curTime - {}", System.currentTimeMillis());
                     this.waitForRunning(3000);
                     continue;
                 }
@@ -92,7 +92,7 @@ public class EventRuleTransfer extends ServiceThread {
                     curEventRecords.forEach(pullRecord -> {
                         CompletableFuture<Void> transformFuture = 
CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord))
                             .exceptionally((exception) -> {
-                                logger.error("transfer do transform event 
record failed,stackTrace-", exception);
+                                LOGGER.error("transfer do transform event 
record failed, stackTrace-", exception);
                                 errorHandler.handle(pullRecord, exception);
                                 return null;
                             })
@@ -108,9 +108,9 @@ public class EventRuleTransfer extends ServiceThread {
                 }
                 CompletableFuture.allOf(completableFutures.toArray(new 
CompletableFuture[eventRecordMap.values().size()])).get();
                 circulatorContext.offerTargetTaskQueue(afterTransformConnect);
-                logger.info("offer target task queues succeed, transforms - 
{}", JSON.toJSONString(afterTransformConnect));
+                LOGGER.info("offer target task queues succeed, transforms - 
{}", JSON.toJSONString(afterTransformConnect));
             } catch (Exception exception) {
-                logger.error("transfer event record failed, stackTrace-", 
exception);
+                LOGGER.error("transfer event record failed, stackTrace-", 
exception);
                 afterTransformConnect.forEach(transferRecord -> 
errorHandler.handle(transferRecord, exception));
             }
 
@@ -127,7 +127,7 @@ public class EventRuleTransfer extends ServiceThread {
         try {
             circulatorContext.releaseTaskTransform();
         } catch (Exception e) {
-            logger.error(String.format("current thread: %s, error Track: %s ", 
getServiceName(), ExceptionUtil.getErrorMessage(e)));
+            LOGGER.error(String.format("current thread: %s, error Track: %s ", 
getServiceName(), ExceptionUtil.getErrorMessage(e)));
         }
     }
 
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
index 85dae3b..b9d175b 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
@@ -36,12 +36,10 @@ import org.slf4j.LoggerFactory;
 
 /**
  * event target push to sink task
- *
- * @author artisan
  */
 public class EventTargetTrigger extends ServiceThread {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(EventTargetTrigger.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EventTargetTrigger.class);
 
     private final CirculatorContext circulatorContext;
     private final OffsetManager offsetManager;
@@ -49,7 +47,7 @@ public class EventTargetTrigger extends ServiceThread {
     private volatile Integer batchSize = 100;
 
     public EventTargetTrigger(CirculatorContext circulatorContext, 
OffsetManager offsetManager,
-                              ErrorHandler errorHandler) {
+        ErrorHandler errorHandler) {
         this.circulatorContext = circulatorContext;
         this.offsetManager = offsetManager;
         this.errorHandler = errorHandler;
@@ -60,15 +58,15 @@ public class EventTargetTrigger extends ServiceThread {
         while (!stopped) {
             Map<String, List<ConnectRecord>> targetRecordMap = 
circulatorContext.takeTargetRecords(batchSize);
             if (MapUtils.isEmpty(targetRecordMap)) {
-                logger.trace("current target pusher is empty");
+                LOGGER.trace("current target pusher is empty");
                 this.waitForRunning(1000);
                 continue;
             }
-            if (logger.isDebugEnabled()) {
-                logger.debug("start push content by pusher - {}", 
JSON.toJSONString(targetRecordMap));
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("start push content by pusher - {}", 
JSON.toJSONString(targetRecordMap));
             }
 
-            for(String runnerName: targetRecordMap.keySet()){
+            for (String runnerName : targetRecordMap.keySet()) {
                 ExecutorService executorService = 
circulatorContext.getExecutorService(runnerName);
                 executorService.execute(() -> {
                     SinkTask sinkTask = 
circulatorContext.getPusherTaskMap().get(runnerName);
@@ -77,7 +75,7 @@ public class EventTargetTrigger extends ServiceThread {
                         sinkTask.put(triggerRecords);
                         offsetManager.commit(triggerRecords);
                     } catch (Exception exception) {
-                        logger.error(getServiceName() + " push target 
exception, stackTrace-", exception);
+                        LOGGER.error(getServiceName() + " push target 
exception, stackTrace-", exception);
                         triggerRecords.forEach(triggerRecord -> 
errorHandler.handle(triggerRecord, exception));
                     }
                 });
@@ -101,7 +99,7 @@ public class EventTargetTrigger extends ServiceThread {
             circulatorContext.releaseExecutorService();
             circulatorContext.releaseTriggerTask();
         } catch (Exception e) {
-            logger.error(String.format("current thread: %s, error Track: %s ", 
getServiceName(), ExceptionUtil.getErrorMessage(e)));
+            LOGGER.error(String.format("current thread: %s, error Track: %s ", 
getServiceName(), ExceptionUtil.getErrorMessage(e)));
         }
     }
 }
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
index 2763be3..3ea21fa 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
@@ -21,6 +21,12 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.boot.trigger.TriggerTaskContext;
@@ -41,40 +47,40 @@ import org.springframework.stereotype.Component;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.*;
 
 /**
- * event circulator context for listener, transfer and trigger
+ * Event circulatory context for listener, transfer and trigger
  */
 @Component
 public class CirculatorContext implements TargetRunnerListener {
 
-    private final static Logger logger = 
LoggerFactory.getLogger(LoggerName.EventBus_Listener);
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.EVENT_BUS_LISTENER);
 
     @Autowired
     private Plugin plugin;
 
-    private static Integer QUEUE_CAPACITY = 50000;
+    private static final Integer QUEUE_CAPACITY = 50000;
 
-    private BlockingQueue<ConnectRecord> eventQueue = new 
LinkedBlockingQueue<>(50000);
+    private final BlockingQueue<ConnectRecord> eventQueue = new 
LinkedBlockingQueue<>(50000);
 
-    private BlockingQueue<ConnectRecord> targetQueue = new 
LinkedBlockingQueue<>(50000);
+    private final BlockingQueue<ConnectRecord> targetQueue = new 
LinkedBlockingQueue<>(50000);
 
-    private Map<String/*RunnerName*/, TargetRunnerConfig> runnerConfigMap = 
new ConcurrentHashMap<>(30);
+    private final Map<String/*RunnerName*/, TargetRunnerConfig> 
runnerConfigMap = new ConcurrentHashMap<>(30);
 
-    private Map<String/*RunnerName*/, BlockingQueue<ConnectRecord>> 
eventQueueMap = new ConcurrentHashMap<>(30);
+    private final Map<String/*RunnerName*/, BlockingQueue<ConnectRecord>> 
eventQueueMap = new ConcurrentHashMap<>(30);
 
-    private Map<String/*RunnerName*/, BlockingQueue<ConnectRecord>> 
targetQueueMap = new ConcurrentHashMap<>(30);
+    private final Map<String/*RunnerName*/, BlockingQueue<ConnectRecord>> 
targetQueueMap = new ConcurrentHashMap<>(30);
 
-    private Map<String/*RunnerName*/, TransformEngine<ConnectRecord>> 
taskTransformMap = new ConcurrentHashMap<>(20);
+    private final Map<String/*RunnerName*/, TransformEngine<ConnectRecord>> 
taskTransformMap = new ConcurrentHashMap<>(20);
 
-    private Map<String/*RunnerName*/, SinkTask> pusherTaskMap = new 
ConcurrentHashMap<>(20);
+    private final Map<String/*RunnerName*/, SinkTask> pusherTaskMap = new 
ConcurrentHashMap<>(20);
 
-    private Map<String/*RunnerName*/, ExecutorService> pusherExecutorMap = new 
ConcurrentHashMap<>(10);
+    private final Map<String/*RunnerName*/, ExecutorService> pusherExecutorMap 
= new ConcurrentHashMap<>(10);
 
     /**
      * initial targetRunnerMap, taskTransformMap, pusherTaskMap
-     * @param targetRunnerConfigs
+     *
+     * @param targetRunnerConfigs Configurations for the target runner
      */
     public void initCirculatorContext(Set<TargetRunnerConfig> 
targetRunnerConfigs) {
         if (CollectionUtils.isEmpty(targetRunnerConfigs)) {
@@ -102,10 +108,11 @@ public class CirculatorContext implements 
TargetRunnerListener {
 
     /**
      * get target runner config by runner name
+     *
      * @param runnerName
      * @return
      */
-    public TargetRunnerConfig getRunnerConfig(String runnerName){
+    public TargetRunnerConfig getRunnerConfig(String runnerName) {
         return runnerConfigMap.get(runnerName);
     }
 
@@ -122,21 +129,23 @@ public class CirculatorContext implements 
TargetRunnerListener {
 
     /**
      * update record queue map
+     *
      * @param recordMap
      * @param eventQueueMap
      */
-    private boolean updateRecordQueueMap(Map<String, List<ConnectRecord>> 
recordMap, Map<String, BlockingQueue<ConnectRecord>> eventQueueMap) {
-        try{
-            for(String runnerName : recordMap.keySet()){
+    private boolean updateRecordQueueMap(Map<String, List<ConnectRecord>> 
recordMap,
+        Map<String, BlockingQueue<ConnectRecord>> eventQueueMap) {
+        try {
+            for (String runnerName : recordMap.keySet()) {
                 BlockingQueue<ConnectRecord> recordQueue = 
eventQueueMap.get(runnerName);
-                if(CollectionUtils.isEmpty(recordQueue)){
+                if (CollectionUtils.isEmpty(recordQueue)) {
                     recordQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
                 }
                 recordQueue.addAll(recordMap.get(runnerName));
                 eventQueueMap.put(runnerName, recordQueue);
             }
             return true;
-        }catch (Exception exception){
+        } catch (Exception exception) {
             return false;
         }
     }
@@ -147,7 +156,7 @@ public class CirculatorContext implements 
TargetRunnerListener {
      * @return
      */
     public Map<String, List<ConnectRecord>> takeEventRecords(int batchSize) {
-        if(eventQueue.isEmpty()){
+        if (eventQueue.isEmpty()) {
             return null;
         }
         List<ConnectRecord> eventRecords = Lists.newArrayList();
@@ -171,11 +180,12 @@ public class CirculatorContext implements 
TargetRunnerListener {
 
     /**
      * take batch target records
+     *
      * @param batchSize
      * @return
      */
     public Map<String, List<ConnectRecord>> takeTargetRecords(Integer 
batchSize) {
-        if(targetQueue.isEmpty()){
+        if (targetQueue.isEmpty()) {
             return null;
         }
         List<ConnectRecord> targetRecords = Lists.newArrayList();
@@ -185,6 +195,7 @@ public class CirculatorContext implements 
TargetRunnerListener {
 
     /**
      * user runner-name as key
+     *
      * @param eventRecords
      * @return
      */
@@ -193,7 +204,7 @@ public class CirculatorContext implements 
TargetRunnerListener {
         for (ConnectRecord connectRecord : eventRecords) {
             String runnerName = 
connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME);
             List<ConnectRecord> curEventRecords = 
eventRecordMap.get(runnerName);
-            if(CollectionUtils.isEmpty(curEventRecords)){
+            if (CollectionUtils.isEmpty(curEventRecords)) {
                 curEventRecords = Lists.newArrayList();
             }
             curEventRecords.add(connectRecord);
@@ -204,15 +215,17 @@ public class CirculatorContext implements 
TargetRunnerListener {
 
     /**
      * get specific thread pool by push name
+     *
      * @param runnerName
      * @return
      */
-    public ExecutorService getExecutorService(String runnerName){
+    public ExecutorService getExecutorService(String runnerName) {
         return pusherExecutorMap.get(runnerName);
     }
 
     /**
      * refresh target runner where config changed
+     *
      * @param targetRunnerConfig
      * @param refreshTypeEnum
      */
@@ -225,7 +238,7 @@ public class CirculatorContext implements 
TargetRunnerListener {
                 TransformEngine<ConnectRecord> transformChain = new 
TransformEngine<>(targetRunnerConfig.getComponents(), plugin);
                 taskTransformMap.put(runnerName, transformChain);
 
-                int endIndex = targetRunnerConfig.getComponents().size() -1;
+                int endIndex = targetRunnerConfig.getComponents().size() - 1;
                 TargetKeyValue targetKeyValue = new 
TargetKeyValue(targetRunnerConfig.getComponents().get(endIndex));
                 SinkTask sinkTask = initTargetSinkTask(targetKeyValue);
                 pusherTaskMap.put(runnerName, sinkTask);
@@ -234,16 +247,16 @@ public class CirculatorContext implements 
TargetRunnerListener {
                     pusherExecutorMap.put(runnerName, 
initDefaultThreadPoolExecutor(runnerName));
                 }
 
-                if(logger.isInfoEnabled()){
-                    logger.info("runnerName -{}- refresh context by refresh 
type -{}- succeed", runnerName, refreshTypeEnum.name());
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info("runnerName -{}- refresh context by refresh 
type -{}- succeed", runnerName, refreshTypeEnum.name());
                 }
                 break;
             case DELETE:
                 runnerConfigMap.remove(runnerName);
                 taskTransformMap.remove(runnerName);
                 pusherTaskMap.remove(runnerName);
-                if(logger.isInfoEnabled()){
-                    logger.info("runnerName -{}- remove context succeed", 
runnerName);
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info("runnerName -{}- remove context succeed", 
runnerName);
                 }
                 break;
             default:
@@ -253,16 +266,18 @@ public class CirculatorContext implements 
TargetRunnerListener {
 
     /**
      * init default thread poll param, support auto config
+     *
      * @param threadPollName
      * @return
      */
     private ExecutorService initDefaultThreadPoolExecutor(String 
threadPollName) {
         return new ThreadPoolExecutor(200, 300, 1, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<>(300), 
ThreadUtils.newThreadFactory(threadPollName, false));
+            new LinkedBlockingQueue<>(300), 
ThreadUtils.newThreadFactory(threadPollName, false));
     }
 
     /**
      * init target sink task
+     *
      * @param targetKeyValue
      * @return
      */
@@ -286,13 +301,12 @@ public class CirculatorContext implements 
TargetRunnerListener {
                 Plugin.compareAndSwapLoaders(loader);
             }
             return sinkTask;
-        }catch (Exception exception) {
-            logger.error("task class -" + taskClass + "- init its sinkTask 
failed, ex- ", exception);
+        } catch (Exception exception) {
+            LOGGER.error("task class -" + taskClass + "- init its sinkTask 
failed, ex- ", exception);
         }
         return null;
     }
 
-
     public void releaseTaskTransform() throws Exception {
         for (Map.Entry<String, TransformEngine<ConnectRecord>> taskTransform : 
taskTransformMap.entrySet()) {
             String runnerName = taskTransform.getKey();
@@ -303,7 +317,7 @@ public class CirculatorContext implements 
TargetRunnerListener {
     }
 
     public void releaseTriggerTask() {
-        for (Map.Entry<String, SinkTask> triggerTask: 
pusherTaskMap.entrySet()) {
+        for (Map.Entry<String, SinkTask> triggerTask : 
pusherTaskMap.entrySet()) {
             SinkTask sinkTask = triggerTask.getValue();
             String runnerName = triggerTask.getKey();
             sinkTask.stop();
@@ -312,7 +326,7 @@ public class CirculatorContext implements 
TargetRunnerListener {
     }
 
     public void releaseExecutorService() throws Exception {
-        for (Map.Entry<String, ExecutorService> pusherExecutor: 
pusherExecutorMap.entrySet()) {
+        for (Map.Entry<String, ExecutorService> pusherExecutor : 
pusherExecutorMap.entrySet()) {
             ExecutorService pusher = pusherExecutor.getValue();
             ShutdownUtils.shutdownThreadPool(pusher);
         }
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
index be4db9b..dc18ab7 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
@@ -29,6 +29,7 @@ public abstract class EventSubscriber implements 
TargetRunnerListener {
 
     /**
      * Refresh subscriber inner data when runner keys changed
+     *
      * @param subscribeRunnerKeys
      * @param refreshTypeEnum
      */
@@ -42,7 +43,7 @@ public abstract class EventSubscriber implements 
TargetRunnerListener {
     public abstract List<ConnectRecord> pull();
 
     /**
-     * Commit the connect records.
+     * Commit connect records.
      *
      * @param connectRecordList
      */
@@ -54,12 +55,13 @@ public abstract class EventSubscriber implements 
TargetRunnerListener {
     public abstract void close();
 
     /**
-     * Put the connect record to the eventbus.
+     * Put connect record to the eventbus.
+     *
      * @param eventBusName
      * @param connectRecord
      * @param delaySec
      */
-    public  boolean put(String eventBusName, ConnectRecord connectRecord, int 
delaySec){
+    public boolean put(String eventBusName, ConnectRecord connectRecord, int 
delaySec) {
         // convert the eventBusName to Topic ?
         return true;
     }
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java
index f9e879a..5737892 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java
@@ -37,7 +37,7 @@ import java.util.Objects;
 
 public class TransformEngine<R extends ConnectRecord> implements AutoCloseable 
{
 
-    private static final Logger logger = 
LoggerFactory.getLogger(LoggerName.EventRule_Transfer);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.EVENT_RULE_TRANSFER);
 
     private final List<Transform> transformList;
 
@@ -76,18 +76,19 @@ public class TransformEngine<R extends ConnectRecord> 
implements AutoCloseable {
                 transform.init(transformConfig);
                 this.transformList.add(transform);
             } catch (Exception e) {
-                logger.error("transform new instance error", e);
+                LOGGER.error("transform new instance error", e);
             }
         }
     }
 
     /**
      * format listener and pusher key
+     *
      * @param components
      * @return
      */
     private TargetKeyValue formatTargetKey(List<Map<String, String>> 
components) {
-        if(CollectionUtils.isEmpty(components)){
+        if (CollectionUtils.isEmpty(components)) {
             return null;
         }
         int startIndex = 0;
@@ -101,6 +102,7 @@ public class TransformEngine<R extends ConnectRecord> 
implements AutoCloseable {
 
     /**
      * transform event record for target record
+     *
      * @param connectRecord
      * @return
      */
@@ -120,10 +122,11 @@ public class TransformEngine<R extends ConnectRecord> 
implements AutoCloseable {
 
     /**
      * get task config value by key
+     *
      * @param configKey
      * @return
      */
-    public String getConnectConfig(String configKey){
+    public String getConnectConfig(String configKey) {
         return config.getString(configKey);
     }
 
@@ -149,8 +152,10 @@ public class TransformEngine<R extends ConnectRecord> 
implements AutoCloseable {
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
         TransformEngine<?> that = (TransformEngine<?>) o;
         return transformList.equals(that.transformList) && 
config.equals(that.config);
     }
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java
index 6af6382..6edb06d 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java
@@ -43,7 +43,7 @@ public class TriggerTaskContext implements SinkTaskContext {
      */
     private final TargetKeyValue taskConfig;
 
-    private static final Logger logger = 
LoggerFactory.getLogger(LoggerName.EventTarget_Trigger);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.EVENT_TARGET_TRIGGER);
 
     private final Map<MessageQueue, Long> messageQueuesOffsetMap = new 
ConcurrentHashMap<>(64);
 
@@ -61,20 +61,20 @@ public class TriggerTaskContext implements SinkTaskContext {
     @Override
     public void resetOffset(RecordPartition recordPartition, RecordOffset 
recordOffset) {
         if (null == recordPartition || null == recordPartition.getPartition() 
|| null == recordOffset || null == recordOffset.getOffset()) {
-            logger.warn("recordPartition {} info is null or recordOffset {} 
info is null", recordPartition, recordOffset);
+            LOGGER.warn("recordPartition {} info is null or recordOffset {} 
info is null", recordPartition, recordOffset);
             return;
         }
         String brokerName = (String) 
recordPartition.getPartition().get(BROKER_NAME);
         String topic = (String) recordPartition.getPartition().get(TOPIC);
         Integer queueId = Integer.valueOf((String) 
recordPartition.getPartition().get(QUEUE_ID));
         if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || 
null == queueId) {
-            logger.warn("brokerName is null or queueId is null or queueName is 
null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
+            LOGGER.warn("brokerName is null or queueId is null or queueName is 
null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
             return;
         }
         MessageQueue messageQueue = new MessageQueue(topic, brokerName, 
queueId);
         Long offset = Long.valueOf((String) 
recordOffset.getOffset().get(QUEUE_OFFSET));
         if (null == offset) {
-            logger.warn("resetOffset, offset is null");
+            LOGGER.warn("resetOffset, offset is null");
             return;
         }
         messageQueuesOffsetMap.put(messageQueue, offset);
@@ -83,12 +83,12 @@ public class TriggerTaskContext implements SinkTaskContext {
     @Override
     public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
         if (MapUtils.isEmpty(offsets)) {
-            logger.warn("resetOffset, offsets {} is null", offsets);
+            LOGGER.warn("resetOffset, offsets {} is null", offsets);
             return;
         }
         for (Map.Entry<RecordPartition, RecordOffset> entry : 
offsets.entrySet()) {
             if (null == entry || null == entry.getKey() || null == 
entry.getKey().getPartition() || null == entry.getValue() || null == 
entry.getValue().getOffset()) {
-                logger.warn("recordPartition {} info is null or recordOffset 
{} info is null, entry {}", entry);
+                LOGGER.warn("recordPartition {} info is null or recordOffset 
{} info is null, entry {}", entry);
                 continue;
             }
             RecordPartition recordPartition = entry.getKey();
@@ -96,14 +96,14 @@ public class TriggerTaskContext implements SinkTaskContext {
             String topic = (String) recordPartition.getPartition().get(TOPIC);
             Integer queueId = Integer.valueOf((String) 
recordPartition.getPartition().get(QUEUE_ID));
             if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) 
|| null == queueId) {
-                logger.warn("brokerName is null or queueId is null or 
queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, 
topic);
+                LOGGER.warn("brokerName is null or queueId is null or 
queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, 
topic);
                 continue;
             }
             MessageQueue messageQueue = new MessageQueue(topic, brokerName, 
queueId);
             RecordOffset recordOffset = entry.getValue();
             Long offset = Long.valueOf((String) 
recordOffset.getOffset().get(QUEUE_OFFSET));
             if (null == offset) {
-                logger.warn("resetOffset, offset is null");
+                LOGGER.warn("resetOffset, offset is null");
                 continue;
             }
             messageQueuesOffsetMap.put(messageQueue, offset);
@@ -113,24 +113,24 @@ public class TriggerTaskContext implements 
SinkTaskContext {
     @Override
     public void pause(List<RecordPartition> recordPartitions) {
         if (recordPartitions == null || recordPartitions.size() == 0) {
-            logger.warn("recordPartitions is null or recordPartitions.size() 
is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
+            LOGGER.warn("recordPartitions is null or recordPartitions.size() 
is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
             return;
         }
         for (RecordPartition recordPartition : recordPartitions) {
             if (null == recordPartition || null == 
recordPartition.getPartition()) {
-                logger.warn("recordPartition {} info is null", 
recordPartition);
+                LOGGER.warn("recordPartition {} info is null", 
recordPartition);
                 continue;
             }
             String brokerName = (String) 
recordPartition.getPartition().get(BROKER_NAME);
             String topic = (String) recordPartition.getPartition().get(TOPIC);
             Integer queueId = Integer.valueOf((String) 
recordPartition.getPartition().get(QUEUE_ID));
             if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) 
|| null == queueId) {
-                logger.warn("brokerName is null or queueId is null or 
queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, 
topic);
+                LOGGER.warn("brokerName is null or queueId is null or 
queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, 
topic);
                 continue;
             }
             MessageQueue messageQueue = new MessageQueue(topic, brokerName, 
queueId);
             if (!messageQueuesOffsetMap.containsKey(messageQueue)) {
-                logger.warn("sink task current messageQueuesOffsetMap {} not 
contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
+                LOGGER.warn("sink task current messageQueuesOffsetMap {} not 
contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
                 continue;
             }
             messageQueuesStateMap.put(messageQueue, QueueState.PAUSE);
@@ -140,24 +140,24 @@ public class TriggerTaskContext implements 
SinkTaskContext {
     @Override
     public void resume(List<RecordPartition> recordPartitions) {
         if (recordPartitions == null || recordPartitions.size() == 0) {
-            logger.warn("recordPartitions is null or recordPartitions.size() 
is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
+            LOGGER.warn("recordPartitions is null or recordPartitions.size() 
is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
             return;
         }
         for (RecordPartition recordPartition : recordPartitions) {
             if (null == recordPartition || null == 
recordPartition.getPartition()) {
-                logger.warn("recordPartition {} info is null", 
recordPartition);
+                LOGGER.warn("recordPartition {} info is null", 
recordPartition);
                 continue;
             }
             String brokerName = (String) 
recordPartition.getPartition().get(BROKER_NAME);
             String topic = (String) recordPartition.getPartition().get(TOPIC);
             Integer queueId = Integer.valueOf((String) 
recordPartition.getPartition().get(QUEUE_ID));
             if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) 
|| null == queueId) {
-                logger.warn("brokerName is null or queueId is null or 
queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, 
topic);
+                LOGGER.warn("brokerName is null or queueId is null or 
queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, 
topic);
                 continue;
             }
             MessageQueue messageQueue = new MessageQueue(topic, brokerName, 
queueId);
             if (!messageQueuesOffsetMap.containsKey(messageQueue)) {
-                logger.warn("sink task current messageQueuesOffsetMap {} not 
contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
+                LOGGER.warn("sink task current messageQueuesOffsetMap {} not 
contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
                 continue;
             }
             messageQueuesStateMap.remove(messageQueue);
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java
index 39734c4..2174060 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java
@@ -21,8 +21,8 @@ package 
org.apache.rocketmq.eventbridge.adapter.runtime.common;
  * Define all the logger name of the runtime.
  */
 public class LoggerName {
-    public static final String EventBridge_RUNTIMER = "EventBridgeRuntimer";
-    public static final String EventBus_Listener = "EventBusListener";
-    public static final String EventRule_Transfer = "EventRuleTransfer";
-    public static final String EventTarget_Trigger = "EventTargetTrigger";
+    public static final String EVENT_BRIDGE_RUNTIMER = "EventBridgeRuntimer";
+    public static final String EVENT_BUS_LISTENER = "EventBusListener";
+    public static final String EVENT_RULE_TRANSFER = "EventRuleTransfer";
+    public static final String EVENT_TARGET_TRIGGER = "EventTargetTrigger";
 }
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
index 400cf47..027925e 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 public abstract class ServiceThread extends AbstractStartAndShutdown 
implements Runnable {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
 
     private static final long JOIN_TIME = 90 * 1000;
 
@@ -45,14 +45,14 @@ public abstract class ServiceThread extends 
AbstractStartAndShutdown implements
     public abstract String getServiceName();
 
     public void start() {
-        logger.info("Try to start service thread:{} started:{} lastThread:{}", 
getServiceName(), hasNotified.get(), thread);
+        LOGGER.info("Try to start service thread:{} started:{} lastThread:{}", 
getServiceName(), hasNotified.get(), thread);
         if (!hasNotified.compareAndSet(false, true)) {
             return;
         }
         stopped = false;
         this.thread.setDaemon(isDaemon);
         this.thread.start();
-        logger.info("Start service thread:{} started:{} lastThread:{}", 
getServiceName(), hasNotified.get(), thread);
+        LOGGER.info("Start service thread:{} started:{} lastThread:{}", 
getServiceName(), hasNotified.get(), thread);
     }
 
     public void shutdown() {
@@ -61,7 +61,7 @@ public abstract class ServiceThread extends 
AbstractStartAndShutdown implements
 
     public void shutdown(final boolean interrupt) {
         this.stopped = true;
-        logger.info("shutdown thread " + this.getServiceName() + " interrupt " 
+ interrupt);
+        LOGGER.info("shutdown thread " + this.getServiceName() + " interrupt " 
+ interrupt);
 
         if (hasNotified.compareAndSet(false, true)) {
             waitPoint.countDown(); // notify
@@ -77,10 +77,10 @@ public abstract class ServiceThread extends 
AbstractStartAndShutdown implements
                 this.thread.join(this.getJointime());
             }
             long eclipseTime = System.currentTimeMillis() - beginTime;
-            logger.info("join thread " + this.getServiceName() + " eclipse 
time(ms) " + eclipseTime + " "
+            LOGGER.info("join thread " + this.getServiceName() + " eclipse 
time(ms) " + eclipseTime + " "
                 + this.getJointime());
         } catch (InterruptedException e) {
-            logger.error("Interrupted", e);
+            LOGGER.error("Interrupted", e);
         }
     }
 
@@ -94,7 +94,7 @@ public abstract class ServiceThread extends 
AbstractStartAndShutdown implements
 
     public void stop(final boolean interrupt) {
         this.stopped = true;
-        logger.info("stop thread " + this.getServiceName() + " interrupt " + 
interrupt);
+        LOGGER.info("stop thread " + this.getServiceName() + " interrupt " + 
interrupt);
 
         if (hasNotified.compareAndSet(false, true)) {
             waitPoint.countDown(); // notify
@@ -107,7 +107,7 @@ public abstract class ServiceThread extends 
AbstractStartAndShutdown implements
 
     public void makeStop() {
         this.stopped = true;
-        logger.info("makestop thread " + this.getServiceName());
+        LOGGER.info("makestop thread " + this.getServiceName());
     }
 
     public void wakeup() {
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java
index aa696e8..58d8e2b 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java
@@ -139,15 +139,17 @@ public class TargetKeyValue implements KeyValue, 
Serializable {
         this.properties = properties;
     }
 
-    public KeyValue putAll(Map<String,String> configProps){
+    public KeyValue putAll(Map<String, String> configProps) {
         this.properties.putAll(configProps);
         return this;
     }
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
         TargetKeyValue that = (TargetKeyValue) o;
         return Objects.equals(targetKeyId, that.targetKeyId) && 
Objects.equals(properties, that.properties);
     }
@@ -160,9 +162,9 @@ public class TargetKeyValue implements KeyValue, 
Serializable {
     @Override
     public String toString() {
         return "TargetKeyValue{" +
-                "targetKeyId='" + targetKeyId + '\'' +
-                ", properties=" + properties +
-                '}';
+            "targetKeyId='" + targetKeyId + '\'' +
+            ", properties=" + properties +
+            '}';
     }
 
     public String getTargetKeyId() {
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java
index 6cbc2f7..d3ab201 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java
@@ -71,19 +71,11 @@ public class TargetRunnerConfig implements Serializable {
 
     private boolean isEqualsComponents(List<Map<String, String>> source, 
List<Map<String, String>> target) {
         if (source == null || target == null) {
-            if (source != target) {
-                return false;
-            } else {
-                return true;
-            }
+            return source == target;
         }
 
         if (source.isEmpty() || target.isEmpty()) {
-            if (source.isEmpty() && target.isEmpty()) {
-                return true;
-            } else {
-                return false;
-            }
+            return source.isEmpty() && target.isEmpty();
         }
 
         if (source.size() != target.size()) {
@@ -99,10 +91,8 @@ public class TargetRunnerConfig implements Serializable {
                 String element = targetComponent.get(entry.getKey());
                 if (element == null && entry.getValue() == null) {
                     return true;
-                } else if (element.equals(entry.getValue())) {
-                    return true;
                 } else {
-                    return false;
+                    return element.equals(entry.getValue());
                 }
             }
         }
@@ -117,7 +107,7 @@ public class TargetRunnerConfig implements Serializable {
         return components.get(0).get(ACCOUNT_ID);
     }
 
-    public SubscribeRunnerKeys getSubscribeRunnerKeys(){
+    public SubscribeRunnerKeys getSubscribeRunnerKeys() {
         SubscribeRunnerKeys subscribeRunnerKeys = new SubscribeRunnerKeys();
         subscribeRunnerKeys.setRunnerName(this.getName());
         subscribeRunnerKeys.setAccountId(this.getAccountId());
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/Plugin.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/Plugin.java
index 26475f2..cf14e66 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/Plugin.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/Plugin.java
@@ -19,6 +19,7 @@ package 
org.apache.rocketmq.eventbridge.adapter.runtime.common.plugin;
 import io.openmessaging.connector.api.component.Transform;
 import io.openmessaging.connector.api.component.connector.Connector;
 import io.openmessaging.connector.api.component.task.Task;
+import java.util.ArrayList;
 import org.apache.commons.lang3.StringUtils;
 import org.reflections.Configuration;
 import org.reflections.Reflections;
@@ -39,7 +40,12 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
-import java.util.*;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Collection;
+import java.util.Set;
+import java.util.Arrays;
 
 @Component
 public class Plugin extends URLClassLoader {
@@ -48,7 +54,7 @@ public class Plugin extends URLClassLoader {
     @Value("${runtime.pluginpath:}")
     private String pluginPath;
 
-    private Map<String, PluginWrapper> classLoaderMap = new HashMap<>();
+    private final Map<String, PluginWrapper> classLoaderMap = new HashMap<>();
 
     public Plugin() {
         super(new URL[0], Plugin.class.getClassLoader());
@@ -62,7 +68,7 @@ public class Plugin extends URLClassLoader {
         }
     }
 
-    private List<String> initPluginPath(String plugin){
+    private List<String> initPluginPath(String plugin) {
         List<String> pluginPaths = new ArrayList<>();
         if (StringUtils.isNotEmpty(plugin)) {
             String[] strArr = plugin.split(",");
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java
index 2c11e1a..0cae63d 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java
@@ -16,6 +16,16 @@
  */
 package org.apache.rocketmq.eventbridge.adapter.runtime.common.plugin;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.TreeSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,7 +33,6 @@ import java.io.IOException;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.*;
 import java.util.regex.Pattern;
 
 public class PluginUtils {
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java
index e16d4ec..04aa07b 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java
@@ -37,7 +37,7 @@ import java.util.Map;
  */
 public class FileBaseKeyValueStore<K, V> extends MemoryBasedKeyValueStore<K, 
V> {
 
-    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
 
     private String configFilePath;
     private Converter keyConverter;
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java
index ea5fd11..cf0cc79 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java
@@ -29,25 +29,24 @@ import java.util.Properties;
  */
 public class RuntimeConfigProps {
 
-    private final static Logger logger = 
LoggerFactory.getLogger(RuntimeConfigProps.class);
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(RuntimeConfigProps.class);
 
     private Properties properties;
 
-    private RuntimeConfigProps(){
+    private RuntimeConfigProps() {
         try {
-             properties = 
PropertiesLoaderUtils.loadAllProperties("runtime.properties");
+            properties = 
PropertiesLoaderUtils.loadAllProperties("runtime.properties");
         } catch (IOException exception) {
-            logger.error("runtime load properties failed, stackTrace-", 
exception);
+            LOGGER.error("runtime load properties failed, stackTrace-", 
exception);
         }
     }
 
-    private static class  RuntimerConfigPropsHolder{
-        private static final RuntimeConfigProps instance = new 
RuntimeConfigProps();
+    private static class RuntimerConfigPropsHolder {
+        private static final RuntimeConfigProps INSTANCE = new 
RuntimeConfigProps();
     }
 
-    public static RuntimeConfigProps build(){
-        return RuntimerConfigPropsHolder.instance;
+    public static RuntimeConfigProps build() {
+        return RuntimerConfigPropsHolder.INSTANCE;
     }
 
-
 }
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java
index 56fa123..00b3c4a 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java
@@ -26,7 +26,6 @@ import 
org.apache.rocketmq.eventbridge.domain.repository.EventTargetRunnerReposi
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.DependsOn;
 
 @Configuration
 public class RuntimeConfiguration {
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java
index dfad654..34800ba 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java
@@ -30,7 +30,7 @@ import java.io.UnsupportedEncodingException;
  */
 public class JsonConverter implements Converter {
 
-    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
 
     private Class clazz;
 
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java
index b704c7b..2c31046 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java
@@ -32,7 +32,7 @@ import java.util.List;
  */
 public class ListConverter implements Converter<List> {
 
-    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
 
     private Class clazz;
 
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java
index 4a6d9ef..5b3af12 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java
@@ -31,7 +31,7 @@ import java.io.UnsupportedEncodingException;
  */
 public class RecordOffsetConverter implements Converter<RecordOffset> {
 
-    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
 
     @Override
     public byte[] objectToByte(RecordOffset recordOffset) {
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java
index b7cee55..a988981 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java
@@ -91,7 +91,7 @@ public class ErrorHandler {
                     return -1;
                 }
                 int pow = (int) Math.pow(2, 3 + retryTimes);
-                return (pow > 512 ? 512 : pow);
+                return pow > 512 ? 512 : pow;
             default:
                 return -1;
         }
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java
index 0615e35..589238f 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java
@@ -46,7 +46,7 @@ public abstract class AbstractTargetRunnerConfigObserver 
implements TargetRunner
 
     @Override
     public Set<SubscribeRunnerKeys> getSubscribeRunnerKeys() {
-        if(CollectionUtils.isEmpty(targetRunnerConfigs)){
+        if (CollectionUtils.isEmpty(targetRunnerConfigs)) {
             return null;
         }
         return targetRunnerConfigs.stream().map(item -> {
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java
index cbd01e0..85547f2 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java
@@ -38,7 +38,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig;
 import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
-import org.springframework.stereotype.Component;
 
 @Slf4j
 //@Component
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java
index e6a9d1d..30bb31e 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java
@@ -25,7 +25,7 @@ import java.io.StringWriter;
 
 public class ExceptionUtil {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(ExceptionUtil.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ExceptionUtil.class);
 
     public static String getErrorMessage(Throwable e) {
         if (null == e) {
@@ -40,7 +40,7 @@ public class ExceptionUtil {
             StringBuffer buffer = stringWriter.getBuffer();
             return buffer.toString();
         } catch (Throwable ex) {
-            logger.error("", ex);
+            LOGGER.error("", ex);
         }
         return null;
     }
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
index cf8679d..2405a8f 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
@@ -20,14 +20,12 @@ package 
org.apache.rocketmq.eventbridge.adapter.runtime.utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 public class ShutdownUtils {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(ShutdownUtils.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ShutdownUtils.class);
 
     public static void shutdownThreadPool(ExecutorService executor) {
         if (executor != null) {
@@ -35,7 +33,7 @@ public class ShutdownUtils {
             try {
                 executor.awaitTermination(60, TimeUnit.SECONDS);
             } catch (Exception e) {
-                logger.error("Shutdown threadPool failed", e);
+                LOGGER.error("Shutdown threadPool failed", e);
             }
             if (!executor.isTerminated()) {
                 executor.shutdownNow();
diff --git 
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
 
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
index 4d32dad..58dda61 100644
--- 
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
+++ 
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
@@ -27,6 +27,13 @@ import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
 import io.openmessaging.internal.DefaultKeyValue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -57,7 +64,6 @@ import org.springframework.stereotype.Component;
 import javax.annotation.PostConstruct;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
-import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -72,7 +78,7 @@ import java.util.stream.Collectors;
 @DependsOn("flyway")
 public class RocketMQEventSubscriber extends EventSubscriber {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(RocketMQEventSubscriber.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RocketMQEventSubscriber.class);
 
     @Autowired
     private EventDataRepository eventDataRepository;
@@ -98,7 +104,7 @@ public class RocketMQEventSubscriber extends EventSubscriber 
{
     public static final String MSG_ID = "msgId";
 
     @PostConstruct
-    public void initRocketMQEventSubscriber(){
+    public void initRocketMQEventSubscriber() {
         this.initMqProperties();
         this.initConsumeWorkers();
     }
@@ -123,18 +129,18 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
         ArrayList<MessageExt> messages = new ArrayList<>();
         messageBuffer.drainTo(messages, pullBatchSize);
         if (CollectionUtils.isEmpty(messages)) {
-            logger.trace("consumer poll message empty.");
+            LOGGER.trace("consumer poll message empty.");
             return null;
         }
         List<ConnectRecord> connectRecords = new CopyOnWriteArrayList<>();
         List<CompletableFuture<Void>> completableFutures = 
Lists.newArrayList();
-        messages.forEach(item->{
-            CompletableFuture<Void> recordCompletableFuture = 
CompletableFuture.supplyAsync(()-> convertToSinkRecord(item))
-                    .exceptionally((exception) -> {
-                        logger.error("execute completable job 
failed,stackTrace-", exception);
-                        return null;
-                    })
-                    .thenAccept(connectRecords::add);
+        messages.forEach(item -> {
+            CompletableFuture<Void> recordCompletableFuture = 
CompletableFuture.supplyAsync(() -> convertToSinkRecord(item))
+                .exceptionally((exception) -> {
+                    LOGGER.error("execute completable job failed", exception);
+                    return null;
+                })
+                .thenAccept(connectRecords::add);
             completableFutures.add(recordCompletableFuture);
         });
 
@@ -145,24 +151,25 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
 
     /**
      * group by runner name batch commit
+     *
      * @param connectRecordList
      */
     @Override
     public void commit(List<ConnectRecord> connectRecordList) {
-        if(CollectionUtils.isEmpty(connectRecordList)){
-            logger.warn("commit event record data empty!");
+        if (CollectionUtils.isEmpty(connectRecordList)) {
+            LOGGER.warn("commit event record data empty!");
             return;
         }
         String runnerName = 
connectRecordList.iterator().next().getExtension(RuntimeConfigDefine.RUNNER_NAME);
         List<String> msgIds = connectRecordList.stream().map(item -> 
item.getPosition()
-                
.getPartition().getPartition().get(MSG_ID).toString()).collect(Collectors.toList());
+            
.getPartition().getPartition().get(MSG_ID).toString()).collect(Collectors.toList());
         consumeWorkerMap.get(runnerName).commit(msgIds);
     }
 
     @Override
     public void close() {
         for (Map.Entry<String, ConsumeWorker> item : 
consumeWorkerMap.entrySet()) {
-            ConsumeWorker consumeWorker =  item.getValue();
+            ConsumeWorker consumeWorker = item.getValue();
             consumeWorker.shutdown();
         }
     }
@@ -187,7 +194,7 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
 
             clientConfig.setNameSrvAddr(namesrvAddr);
             
clientConfig.setAccessChannel(AccessChannel.CLOUD.name().equals(accessChannel) ?
-                    AccessChannel.CLOUD : AccessChannel.LOCAL);
+                AccessChannel.CLOUD : AccessChannel.LOCAL);
             clientConfig.setNamespace(namespace);
             this.clientConfig = clientConfig;
 
@@ -196,7 +203,7 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
             }
 
             if (StringUtils.isNotBlank(socks5UserName) && 
StringUtils.isNotBlank(socks5Password)
-                    && StringUtils.isNotBlank(socks5Endpoint)) {
+                && StringUtils.isNotBlank(socks5Endpoint)) {
                 SocksProxyConfig proxyConfig = new SocksProxyConfig();
                 proxyConfig.setUsername(socks5UserName);
                 proxyConfig.setPassword(socks5Password);
@@ -206,8 +213,8 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
                 this.socksProxy = new Gson().toJson(proxyConfigMap);
             }
 
-        }catch (Exception exception){
-            logger.error("init rocket mq property exception, stack trace-", 
exception);
+        } catch (Exception exception) {
+            LOGGER.error("init rocket mq property exception, stack trace-", 
exception);
         }
     }
 
@@ -215,8 +222,8 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
      * init rocket mq pull consumer
      */
     private void initConsumeWorkers() {
-        Set<SubscribeRunnerKeys> subscribeRunnerKeysSet =  
runnerConfigObserver.getSubscribeRunnerKeys();
-        if(subscribeRunnerKeysSet == null || subscribeRunnerKeysSet.isEmpty()){
+        Set<SubscribeRunnerKeys> subscribeRunnerKeysSet = 
runnerConfigObserver.getSubscribeRunnerKeys();
+        if (subscribeRunnerKeysSet == null || 
subscribeRunnerKeysSet.isEmpty()) {
             return;
         }
         for (SubscribeRunnerKeys subscribeRunnerKeys : subscribeRunnerKeysSet) 
{
@@ -229,6 +236,7 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
 
     /**
      * first init default rocketmq pull consumer
+     *
      * @return
      */
     public LitePullConsumer initLitePullConsumer(SubscribeRunnerKeys 
subscribeRunnerKeys) {
@@ -245,7 +253,7 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
             pullConsumer.attachTopic(topic, "*");
             pullConsumer.startup();
         } catch (Exception exception) {
-            logger.error("init default pull consumer exception, topic -" + 
topic + "-stackTrace-", exception);
+            LOGGER.error("init default pull consumer exception, topic -" + 
topic + "-stackTrace-", exception);
             throw new EventBridgeException(" init rocketmq consumer failed");
         }
         return pullConsumer;
@@ -265,6 +273,7 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
 
     /**
      * MessageExt convert to connect record
+     *
      * @param messageExt
      * @return
      */
@@ -311,7 +320,7 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
 
     private void putConsumeWorker(SubscribeRunnerKeys subscribeRunnerKeys) {
         ConsumeWorker consumeWorker = 
consumeWorkerMap.get(subscribeRunnerKeys.getRunnerName());
-        if (!Objects.isNull(consumeWorker)){
+        if (!Objects.isNull(consumeWorker)) {
             consumeWorker.shutdown();
         }
         LitePullConsumer litePullConsumer = 
initLitePullConsumer(subscribeRunnerKeys);
@@ -322,7 +331,7 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
 
     private void removeConsumeWorker(SubscribeRunnerKeys subscribeRunnerKeys) {
         ConsumeWorker consumeWorker = 
consumeWorkerMap.remove(subscribeRunnerKeys.getRunnerName());
-        if (!Objects.isNull(consumeWorker)){
+        if (!Objects.isNull(consumeWorker)) {
             consumeWorker.shutdown();
         }
     }
@@ -352,12 +361,12 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
                         messageBuffer.put(message);
                     }
                 } catch (Exception exception) {
-                    logger.error(getServiceName() + " - 
RocketMQEventSubscriber pull record exception, stackTrace - ", exception);
+                    LOGGER.error(getServiceName() + " - 
RocketMQEventSubscriber pull record exception, stackTrace - ", exception);
                 }
             }
         }
 
-        public void commit(List<String> messageIds){
+        public void commit(List<String> messageIds) {
             this.pullConsumer.commit(messageIds);
         }
 
diff --git 
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
 
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
index b37b851..f42cf9b 100644
--- 
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
+++ 
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
@@ -20,10 +20,6 @@ package 
org.apache.rocketmq.eventbridge.adapter.storage.rocketmq.runtimer.consum
 import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:08 上午
- */
 public class ClientConfig {
     private int rmqPullMessageCacheCapacity = 1000;
     private int rmqPullMessageBatchNums = 20;
@@ -59,7 +55,7 @@ public class ClientConfig {
     }
 
     public void setConsumeFromWhere(
-            final ConsumeFromWhere consumeFromWhere) {
+        final ConsumeFromWhere consumeFromWhere) {
         this.consumeFromWhere = consumeFromWhere;
     }
 
@@ -119,7 +115,6 @@ public class ClientConfig {
         this.accessChannel = accessChannel;
     }
 
-
     public static ClientConfig cloneConfig(ClientConfig clientConfig) {
         ClientConfig newConfig = new ClientConfig();
         
newConfig.setRmqPullMessageBatchNums(clientConfig.getRmqPullMessageBatchNums());
diff --git 
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java
 
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java
index 9923eab..1755490 100644
--- 
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java
+++ 
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java
@@ -21,10 +21,6 @@ import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:07 上午
- */
 public class ConsumeRequest {
     private final MessageExt messageExt;
     private final MessageQueue messageQueue;
diff --git 
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java
 
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java
index 667f17a..834b7c1 100644
--- 
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java
+++ 
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java
@@ -23,10 +23,6 @@ import org.apache.rocketmq.common.message.MessageExt;
 import java.time.Duration;
 import java.util.List;
 
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:09 上午
- */
 public interface LitePullConsumer {
     void startup() throws MQClientException;
 
diff --git 
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
 
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
index a76012e..888d36d 100644
--- 
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
+++ 
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
@@ -48,12 +48,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:10 上午
- */
 public class LitePullConsumerImpl implements LitePullConsumer {
-    private static final Logger log = 
LoggerFactory.getLogger(LitePullConsumerImpl.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LitePullConsumerImpl.class);
     private final DefaultMQPullConsumer rocketmqPullConsumer;
     private final LocalMessageCache localMessageCache;
     private final ClientConfig clientConfig;
@@ -83,7 +79,7 @@ public class LitePullConsumerImpl implements LitePullConsumer 
{
     @Override
     public void startup() throws MQClientException {
         rocketmqPullConsumer.start();
-        log.info("RocketmqPullConsumer start.");
+        LOGGER.info("RocketmqPullConsumer start.");
     }
 
     @Override
@@ -98,7 +94,7 @@ public class LitePullConsumerImpl implements LitePullConsumer 
{
             try {
                 executor.awaitTermination(60, TimeUnit.SECONDS);
             } catch (Exception e) {
-                log.error("Shutdown threadPool failed", e);
+                LOGGER.error("Shutdown threadPool failed", e);
             }
             if (!executor.isTerminated()) {
                 executor.shutdownNow();
@@ -114,7 +110,7 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
             public void messageQueueChanged(String topic, Set<MessageQueue> 
mqAll, Set<MessageQueue> mqDivided) {
                 submitPullTask(topic, tag, mqDivided);
                 localMessageCache.shrinkPullOffsetTable(mqDivided);
-                log.info("Load balance result of topic {} changed, mqAll {}, 
mqDivided {}.", topic, mqAll, mqDivided);
+                LOGGER.info("Load balance result of topic {} changed, mqAll 
{}, mqDivided {}.", topic, mqAll, mqDivided);
             }
         });
     }
@@ -156,7 +152,7 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
             }
         }
         if (CollectionUtils.isEmpty(assignedQueues)) {
-            log.warn("Not found any messageQueue, topic:{}", topic);
+            LOGGER.warn("Not found any messageQueue, topic:{}", topic);
             return;
         }
 
@@ -167,10 +163,10 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
                 try {
                     PullTask pullTask = new PullTask(messageQueue, tag);
                     pullImmediately(pullTask);
-                    log.info("Submit pullTask:{}", messageQueue);
+                    LOGGER.info("Submit pullTask:{}", messageQueue);
                 } catch (Exception e) {
-                    log.error("Failed submit pullTask:{}, {}, wait next 
balancing", topic, messageQueue, e);
-                    // 添加pull失败,等待下次 rebalance
+                    LOGGER.error("Failed submit pullTask:{}, {}, wait next 
balancing", topic, messageQueue, e);
+                    // Failed to add pull task, waiting for the next round of 
re-balance
                     processQueue = 
rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
                             .getProcessQueueTable().remove(messageQueue);
                     if (processQueue != null) {
@@ -215,13 +211,13 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
         public void run() {
             try {
                 if 
(!ServiceState.RUNNING.equals(rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getServiceState()))
 {
-                    log.warn("RocketmqPullConsumer not running, pullTask 
exit.");
+                    LOGGER.warn("RocketmqPullConsumer not running, pullTask 
exit.");
                     return;
                 }
                 ProcessQueue processQueue = 
rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
                         .getProcessQueueTable().get(messageQueue);
                 if (processQueue == null || processQueue.isDropped()) {
-                    log.info("ProcessQueue {} dropped, pullTask exit", 
messageQueue);
+                    LOGGER.info("ProcessQueue {} dropped, pullTask exit", 
messageQueue);
                     return;
                 }
                 long offset = localMessageCache.nextPullOffset(messageQueue);
@@ -231,7 +227,7 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
                     public void onSuccess(PullResult pullResult) {
                         try {
                             if 
(!ServiceState.RUNNING.equals(rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getServiceState()))
 {
-                                log.warn("rocketmqPullConsumer not running, 
pullTask exit.");
+                                LOGGER.warn("rocketmqPullConsumer not running, 
pullTask exit.");
                                 return;
                             }
 
@@ -248,11 +244,11 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
                                         pullImmediately(PullTask.this);
                                     } else {
                                         
localMessageCache.removePullOffset(messageQueue);
-                                        log.info("ProcessQueue {} dropped, 
discard the pulled message.", messageQueue);
+                                        LOGGER.info("ProcessQueue {} dropped, 
discard the pulled message.", messageQueue);
                                     }
                                     break;
                                 case OFFSET_ILLEGAL:
-                                    log.warn("The pull request offset is 
illegal, offset is {}, message queue is {}, " +
+                                    LOGGER.warn("The pull request offset is 
illegal, offset is {}, message queue is {}, " +
                                                     "pull result is {}, delay 
{} ms for next pull",
                                             offset, messageQueue, pullResult, 
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                                     
localMessageCache.updatePullOffset(messageQueue, 
pullResult.getNextBeginOffset());
@@ -260,16 +256,16 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
                                     break;
                                 case NO_NEW_MSG:
                                 case NO_MATCHED_MSG:
-                                    log.info("No NEW_MSG or MATCHED_MSG for 
mq:{}, pull again.", messageQueue);
+                                    LOGGER.info("No NEW_MSG or MATCHED_MSG for 
mq:{}, pull again.", messageQueue);
                                     
localMessageCache.updatePullOffset(messageQueue, 
pullResult.getNextBeginOffset());
                                     pullImmediately(PullTask.this);
                                     break;
                                 default:
-                                    log.warn("Failed to process pullResult, 
mq:{} {}", messageQueue, pullResult);
+                                    LOGGER.warn("Failed to process pullResult, 
mq:{} {}", messageQueue, pullResult);
                                     break;
                             }
                         } catch (Throwable t) {
-                            log.error("Exception occurs when process 
pullResult", t);
+                            LOGGER.error("Exception occurs when process 
pullResult", t);
                             pullLater(PullTask.this, 
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS);
                         }
                     }
@@ -282,13 +278,13 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
                         } else {
                             delayTimeMillis = 
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION;
                         }
-                        log.error("Exception happens when pull message 
process, delay {} ms for message queue {}",
+                        LOGGER.error("Exception happens when pull message 
process, delay {} ms for message queue {}",
                                 delayTimeMillis, messageQueue, e);
                         pullLater(PullTask.this, delayTimeMillis, 
TimeUnit.MILLISECONDS);
                     }
                 });
             } catch (Throwable t) {
-                log.error("Error occurs when pull message process, delay {} ms 
for message queue {}",
+                LOGGER.error("Error occurs when pull message process, delay {} 
ms for message queue {}",
                         PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, messageQueue, t);
                 pullLater(PullTask.this, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, 
TimeUnit.MILLISECONDS);
             }
diff --git 
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
 
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
index f3e3617..131e6fc 100644
--- 
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
+++ 
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
@@ -37,10 +37,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:06 上午
- */
 public class LocalMessageCache {
     private static final Logger log = 
LoggerFactory.getLogger(LocalMessageCache.class);
     private final BlockingQueue<ConsumeRequest> consumeRequestCache;
diff --git 
a/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
 
b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
index 61000fd..aae17c2 100644
--- 
a/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
+++ 
b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
@@ -24,8 +24,9 @@ public enum PushRetryStrategyEnum {
      * 3 times: every 10s~20s
      */
     BACKOFF_RETRY(1, 3),
+
     /**
-     * 176 times: 1,2,4,8,16,32,64,128,256,512,512...512秒 ... 512s(176)
+     * 176 times: 1, 2, 4, 8, 16, 36, 64, 128, 256, 512, 512...512s(176)
      */
     EXPONENTIAL_DECAY_RETRY(2, 176);
 
diff --git 
a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java
 
b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java
index 739ecbe..e9e578d 100644
--- 
a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java
+++ 
b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java
@@ -19,9 +19,8 @@ package org.apache.rocketmq.eventbridge.infrastructure.trace;
 
 
 /**
- * EventMeshTraceService
- * SPI可扩展
- * 基于OpenTelemetry实现封装不同追踪器
+ * Offers extension capability via SPI, allowing different tracing/metrics 
observation implementations: OpenTelemetry,
+ * Jaeger, Zipkin, etc.
  */
 public interface TraceStrategy {
 
diff --git 
a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java
 
b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java
index e6f81c2..0ed31ba 100644
--- 
a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java
+++ 
b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java
@@ -43,27 +43,27 @@ public class ValidateFilter implements WebFilter {
 
     private List<AuthValidation> validations = new CopyOnWriteArrayList<>();
 
-    @Value(value="${auth.validation:default}")
+    @Value(value = "${auth.validation:default}")
     private String validationName;
 
     @PostConstruct
     public void init() {
         List<String> validationNames = 
Arrays.stream(validationName.split(",")).collect(Collectors.toList());
-        boolean match = 
Arrays.stream(validationName.split(",")).allMatch(validationName-> 
validationName.equals("default"));
+        boolean match = 
Arrays.stream(validationName.split(",")).allMatch(validationName -> 
validationName.equals("default"));
         if (!match) {
             validationNames.add(0, "default");
         }
-        
validationNames.forEach(action->validations.add(ValidationServiceFactory.getInstance(action)));
+        validationNames.forEach(action -> 
validations.add(ValidationServiceFactory.getInstance(action)));
     }
 
     @Override
     public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) 
{
         ServerHttpRequest request = exchange.getRequest();
         return chain.filter(exchange)
-                .subscriberContext(ctx -> {
-                    AtomicReference<Context> result = new 
AtomicReference<Context>();
-                    validations.forEach(validation-> 
result.set(validation.validate(request, ctx)));
-                    return result.get();
-                });
+            .subscriberContext(ctx -> {
+                AtomicReference<Context> result = new 
AtomicReference<Context>();
+                validations.forEach(validation -> 
result.set(validation.validate(request, ctx)));
+                return result.get();
+            });
     }
 }

Reply via email to