This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch feat/runtimer-commit
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git

commit 50cfa49118ebb009a49795d7bbefea67443e7f31
Author: 2011shenlin <[email protected]>
AuthorDate: Sun Apr 23 16:33:35 2023 +0800

    feat:support record commit.
---
 .../eventbridge/adapter/runtimer/Runtimer.java     | 36 +++++-----
 .../adapter/runtimer/boot/EventRuleTransfer.java   | 76 +++++++++++++---------
 .../adapter/runtimer/boot/EventTargetPusher.java   | 20 +++---
 .../OffsetManager.java}                            | 25 +++++--
 .../runtimer/boot/listener/EventSubscriber.java    | 10 +++
 .../boot/listener/RocketMQEventSubscriber.java     |  4 ++
 .../adapter/runtimer/common/ServiceThread.java     |  1 +
 .../runtimer/config/RuntimerConfiguration.java     | 41 ++++++++++++
 .../{retry => error}/DeadLetterQueueService.java   |  2 +-
 .../runtimer/{retry => error}/ErrorHandler.java    |  9 ++-
 .../adapter/runtimer/retry/EventBusStorage.java    | 25 -------
 .../service/TargetRunnerConfigOnFileObserver.java  |  2 +
 start/src/main/resources/application.properties    | 12 ++--
 13 files changed, 161 insertions(+), 102 deletions(-)

diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
index 7eb922d..495aeee 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
@@ -22,17 +22,14 @@ import javax.annotation.PostConstruct;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventBusListener;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventRuleTransfer;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventTargetPusher;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.OffsetManager;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber;
-import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.RocketMQEventSubscriber;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.RuntimerState;
-import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.enums.ConfigModeEnum;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
-import 
org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigOnDBObserver;
-import 
org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigOnFileObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Value;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
@@ -49,30 +46,33 @@ public class Runtimer {
 
     private CirculatorContext circulatorContext;
 
+    @Autowired
     private TargetRunnerConfigObserver runnerConfigObserver;
+    @Autowired
+    private OffsetManager offsetManager;
+    @Autowired
+    private EventSubscriber eventSubscriber;
 
-    public Runtimer(CirculatorContext circulatorContext, 
@Value("${rumtimer.config.mode}") String configMode) {
+    public Runtimer(
+        CirculatorContext circulatorContext,
+        TargetRunnerConfigObserver runnerConfigObserver,
+        OffsetManager offsetManager,
+        EventSubscriber eventSubscriber) {
         this.circulatorContext = circulatorContext;
-        switch (ConfigModeEnum.parse(configMode)) {
-            case DB:
-                this.runnerConfigObserver = new 
TargetRunnerConfigOnDBObserver();
-                break;
-            default:
-                this.runnerConfigObserver = new 
TargetRunnerConfigOnFileObserver();
-                break;
-        }
+        this.runnerConfigObserver = runnerConfigObserver;
+        this.offsetManager = offsetManager;
+        this.eventSubscriber = eventSubscriber;
     }
 
     @PostConstruct
     public void initAndStart() {
-        logger.info("init runtimer task config");
+        logger.info("Start init runtimer.");
         
circulatorContext.initListenerMetadata(runnerConfigObserver.getTargetRunnerConfig());
-        EventSubscriber eventSubscriber = new 
RocketMQEventSubscriber(runnerConfigObserver);
         runnerConfigObserver.registerListener(circulatorContext);
         runnerConfigObserver.registerListener(eventSubscriber);
         new EventBusListener(circulatorContext, eventSubscriber).start();
-        new EventRuleTransfer(circulatorContext).start();
-        new EventTargetPusher(circulatorContext).start();
+        new EventRuleTransfer(circulatorContext, offsetManager).start();
+        new EventTargetPusher(circulatorContext, offsetManager).start();
         startRuntimer();
     }
 
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
index 635c4bd..06f1ce3 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
@@ -20,6 +20,13 @@ package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
 import com.alibaba.fastjson.JSON;
 import com.google.common.collect.Lists;
 import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import javax.annotation.PostConstruct;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
@@ -28,13 +35,8 @@ import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.*;
-import java.util.stream.Collectors;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
 /**
  * receive event and transfer the rule to pusher
@@ -43,10 +45,14 @@ public class EventRuleTransfer extends ServiceThread {
 
     private static final Logger logger = 
LoggerFactory.getLogger(EventRuleTransfer.class);
 
+    @Autowired
     private final CirculatorContext circulatorContext;
+    @Autowired
+    private final OffsetManager offsetManager;
 
-    public EventRuleTransfer(CirculatorContext circulatorContext) {
+    public EventRuleTransfer(CirculatorContext circulatorContext, 
OffsetManager offsetManager) {
         this.circulatorContext = circulatorContext;
+        this.offsetManager = offsetManager;
     }
 
     @Override
@@ -54,6 +60,10 @@ public class EventRuleTransfer extends ServiceThread {
         return this.getClass().getSimpleName();
     }
 
+    @PostConstruct
+    public void init(){
+        super.start();
+    }
     @Override
     public void run() {
         while (!stopped) {
@@ -64,7 +74,7 @@ public class EventRuleTransfer extends ServiceThread {
                 continue;
             }
             Map<String, TransformEngine<ConnectRecord>> latestTransformMap = 
circulatorContext.getTaskTransformMap();
-            if(MapUtils.isEmpty(latestTransformMap)){
+            if (MapUtils.isEmpty(latestTransformMap)) {
                 logger.warn("latest transform engine is empty, continue by 
curTime - {}", System.currentTimeMillis());
                 this.waitForRunning(3000);
                 continue;
@@ -73,38 +83,40 @@ public class EventRuleTransfer extends ServiceThread {
             String eventChannelName = RuntimerConfigDefine.CHANNEL_NAME;
             String eventChannel = eventRecord.getExtension(eventChannelName);
             Set<TransformEngine<ConnectRecord>> adaptTransformSet = 
latestTransformMap.values().stream()
-                    .filter(engine -> 
eventChannel.equals(engine.getConnectConfig(eventChannelName)))
-                    .collect(Collectors.toSet());
-            if(CollectionUtils.isEmpty(adaptTransformSet)){
-                    logger.warn("adapt specific topic ref transform engine is 
empty, eventChannelName- {}", eventChannel);
-                    this.waitForRunning(3000);
-                    continue;
+                .filter(engine -> 
eventChannel.equals(engine.getConnectConfig(eventChannelName)))
+                .collect(Collectors.toSet());
+            if (CollectionUtils.isEmpty(adaptTransformSet)) {
+                logger.warn("adapt specific topic ref transform engine is 
empty, eventChannelName- {}", eventChannel);
+                this.waitForRunning(3000);
+                continue;
             }
             List<ConnectRecord> afterTransformConnect = Lists.newArrayList();
             List<CompletableFuture<Void>> completableFutures = 
Lists.newArrayList();
-            adaptTransformSet.forEach(transfer->{
-                CompletableFuture<Void> transformFuture = 
CompletableFuture.supplyAsync(()-> transfer.doTransforms(eventRecord))
-                        .exceptionally((exception) -> {
-                            logger.error("transfer do transform event record 
failed,stackTrace-", exception);
-                            return null;
-                        })
-                        .thenAccept(record-> {
-                            if(Objects.nonNull(record)){
-                                String runnerNameKey = 
RuntimerConfigDefine.RUNNER_NAME;
-                                String taskClassKey = 
RuntimerConfigDefine.TASK_CLASS;
-                                record.getExtensions().put(runnerNameKey, 
transfer.getConnectConfig(runnerNameKey));
-                                record.getExtensions().put(taskClassKey, 
transfer.getConnectConfig(taskClassKey));
-                                afterTransformConnect.add(record);
-                            }
-                        });
+            adaptTransformSet.forEach(transfer -> {
+                CompletableFuture<Void> transformFuture = 
CompletableFuture.supplyAsync(() -> transfer.doTransforms(eventRecord))
+                    .exceptionally((exception) -> {
+                        logger.error("transfer do transform event record 
failed,stackTrace-", exception);
+                        return null;
+                    })
+                    .thenAccept(record -> {
+                        if (Objects.nonNull(record)) {
+                            String runnerNameKey = 
RuntimerConfigDefine.RUNNER_NAME;
+                            String taskClassKey = 
RuntimerConfigDefine.TASK_CLASS;
+                            record.getExtensions().put(runnerNameKey, 
transfer.getConnectConfig(runnerNameKey));
+                            record.getExtensions().put(taskClassKey, 
transfer.getConnectConfig(taskClassKey));
+                            afterTransformConnect.add(record);
+                        }else{
+                            offsetManager.commit(eventRecord);
+                        }
+                    });
                 completableFutures.add(transformFuture);
             });
 
-            try{
+            try {
                 CompletableFuture.allOf(completableFutures.toArray(new 
CompletableFuture[adaptTransformSet.size()])).get();
                 circulatorContext.offerTargetTaskQueue(afterTransformConnect);
                 logger.info("offer target task queues succeed, transforms - 
{}", JSON.toJSONString(afterTransformConnect));
-            }catch (Exception exception){
+            } catch (Exception exception) {
                 logger.error("transfer event record failed, stackTrace-", 
exception);
             }
 
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
index 43e6b29..e420ee2 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
@@ -21,29 +21,29 @@ import com.alibaba.fastjson.JSON;
 import com.google.common.collect.Lists;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-
 /**
  * event target push to sink task
  *
  * @author artisan
  */
-public class EventTargetPusher extends ServiceThread{
+public class EventTargetPusher extends ServiceThread {
 
     private static final Logger logger = 
LoggerFactory.getLogger(EventTargetPusher.class);
 
     private final CirculatorContext circulatorContext;
+    private final OffsetManager offsetManager;
 
-    public EventTargetPusher(CirculatorContext circulatorContext) {
+    public EventTargetPusher(CirculatorContext circulatorContext, 
OffsetManager offsetManager) {
         this.circulatorContext = circulatorContext;
+        this.offsetManager = offsetManager;
     }
 
     @Override
@@ -55,7 +55,7 @@ public class EventTargetPusher extends ServiceThread{
                 this.waitForRunning(1000);
                 continue;
             }
-            if(logger.isDebugEnabled()){
+            if (logger.isDebugEnabled()) {
                 logger.debug("start push content by pusher - {}", 
JSON.toJSONString(targetRecord));
             }
 
@@ -63,9 +63,10 @@ public class EventTargetPusher extends ServiceThread{
             executorService.execute(() -> {
                 try {
                     String runnerName = 
targetRecord.getExtensions().getString(RuntimerConfigDefine.RUNNER_NAME);
-                    SinkTask sinkTask = 
circulatorContext.getPusherTaskMap().get(runnerName);;
+                    SinkTask sinkTask = 
circulatorContext.getPusherTaskMap().get(runnerName);
                     sinkTask.put(Lists.newArrayList(targetRecord));
-                }catch (Exception exception){
+                    offsetManager.commit(targetRecord);
+                } catch (Exception exception) {
                     logger.error(getServiceName() + " push target exception, 
record - " + targetRecord + " , stackTrace-", exception);
                 }
             });
@@ -77,5 +78,4 @@ public class EventTargetPusher extends ServiceThread{
         return EventTargetPusher.class.getSimpleName();
     }
 
-
 }
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorageOnRocketMQ.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/OffsetManager.java
similarity index 54%
rename from 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorageOnRocketMQ.java
rename to 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/OffsetManager.java
index 94d8d4a..0613efb 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorageOnRocketMQ.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/OffsetManager.java
@@ -15,20 +15,31 @@
  *  limitations under the License.
  */
 
-package org.apache.rocketmq.eventbridge.adapter.runtimer.retry;
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
 
+import com.google.common.collect.Lists;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import java.util.List;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
-public class EventBusStorageOnRocketMQ implements EventBusStorage {
+@Component
+public class OffsetManager {
 
-    @Override
-    public void put(String eventBusName, ConnectRecord connectRecord, int 
delaySec) {
+    @Autowired
+    EventSubscriber eventSubscriber;
 
+    public OffsetManager(EventSubscriber eventSubscriber) {
+        this.eventSubscriber = eventSubscriber;
     }
 
-    public List<String> parseEventBusName(String eventBusName) {
-        //TODO
-        return null;
+    public void commit(final List<ConnectRecord> connectRecordList) {
+        this.eventSubscriber.commit(connectRecordList);
     }
+
+    public void commit(final ConnectRecord connectRecord) {
+        this.eventSubscriber.commit(Lists.newArrayList(connectRecord));
+    }
+
 }
\ No newline at end of file
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java
index f24ce08..39d08d7 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java
@@ -46,6 +46,16 @@ public abstract class EventSubscriber implements 
TargetRunnerListener {
      */
     public abstract void commit(List<ConnectRecord> connectRecordList);
 
+    /**
+     * Put the connect record to the eventbus.
+     * @param eventBusName
+     * @param connectRecord
+     * @param delaySec
+     */
+    public  void put(String eventBusName, ConnectRecord connectRecord, int 
delaySec){
+        // convert the eventBusName to Topic ?
+    }
+
     /**
      * Call when add new target runner to runtimer.
      *
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
index aa263c7..b4c7527 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
@@ -49,6 +49,7 @@ import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.proxy.SocksProxyConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.io.support.PropertiesLoaderUtils;
 
 import java.nio.charset.StandardCharsets;
@@ -60,16 +61,19 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import org.springframework.stereotype.Component;
 
 /**
  * RocketMQ implement event subscriber
  */
+@Component
 public class RocketMQEventSubscriber extends EventSubscriber {
 
     private static final Logger logger = 
LoggerFactory.getLogger(RocketMQEventSubscriber.class);
 
     private LitePullConsumer pullConsumer;
 
+    @Autowired
     private final TargetRunnerConfigObserver runnerConfigObserver;
 
     private Integer pullTimeOut;
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/ServiceThread.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/ServiceThread.java
index 33f4d9d..566d8e2 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/ServiceThread.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/ServiceThread.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtimer.common;
 
+import javax.annotation.PostConstruct;
 import org.apache.rocketmq.common.CountDownLatch2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfiguration.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfiguration.java
new file mode 100644
index 0000000..7160cca
--- /dev/null
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfiguration.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.config;
+
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.enums.ConfigModeEnum;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigOnDBObserver;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigOnFileObserver;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class RuntimerConfiguration {
+
+    @Bean(name = "runnerConfigObserver")
+    public TargetRunnerConfigObserver 
targetRunnerConfigObserver(@Value("${rumtimer.config.mode}") String configMode) 
{
+        switch (ConfigModeEnum.parse(configMode)) {
+            case DB:
+                return new TargetRunnerConfigOnDBObserver();
+            default:
+                return new TargetRunnerConfigOnFileObserver();
+        }
+    }
+
+}
\ No newline at end of file
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/DeadLetterQueueService.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/DeadLetterQueueService.java
similarity index 92%
rename from 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/DeadLetterQueueService.java
rename to 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/DeadLetterQueueService.java
index ce6a642..fd108c9 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/DeadLetterQueueService.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/DeadLetterQueueService.java
@@ -15,7 +15,7 @@
  *  limitations under the License.
  */
 
-package org.apache.rocketmq.eventbridge.adapter.runtimer.retry;
+package org.apache.rocketmq.eventbridge.adapter.runtimer.error;
 
 public class DeadLetterQueueService {
 
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/ErrorHandler.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java
similarity index 91%
rename from 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/ErrorHandler.java
rename to 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java
index 412b538..16bdbb9 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/ErrorHandler.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java
@@ -15,24 +15,27 @@
  *  limitations under the License.
  */
 
-package org.apache.rocketmq.eventbridge.adapter.runtimer.retry;
+package org.apache.rocketmq.eventbridge.adapter.runtimer.error;
 
 import com.google.common.base.Strings;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerContext;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
 import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
 import static 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.CONNECT_RECORDS_KEY;
 import static 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.RUNNER_NAME;
 
 @Slf4j
+@Component
 public class ErrorHandler {
 
     @Autowired
-    EventBusStorage eventBusStorage;
+    EventSubscriber eventSubscriber;
 
     public void handle(ConnectRecord connectRecord, Throwable t) {
         String eventRunnerName = connectRecord.getExtension(RUNNER_NAME);
@@ -42,7 +45,7 @@ public class ErrorHandler {
         int retryTimes = parseRetryTimes(connectRecord);
         int delaySec = calcDelaySec(retryTimes, pushRetryStrategyEnum);
         if (delaySec > 0) {
-            eventBusStorage.put(eventBusName, connectRecord, delaySec);
+            eventSubscriber.put(eventBusName, connectRecord, delaySec);
         }
     }
 
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorage.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorage.java
deleted file mode 100644
index cbec332..0000000
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorage.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.eventbridge.adapter.runtimer.retry;
-
-import io.openmessaging.connector.api.data.ConnectRecord;
-
-public interface EventBusStorage {
-
-    void put(String eventBusName, ConnectRecord connectRecord, int delaySec);
-}
\ No newline at end of file
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
index 63f1e9d..791efc1 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
@@ -41,9 +41,11 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
 import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
+import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Service;
 
 @Slf4j
+@Component
 public class TargetRunnerConfigOnFileObserver extends 
AbstractTargetRunnerConfigObserver {
 
     private String pathName;
diff --git a/start/src/main/resources/application.properties 
b/start/src/main/resources/application.properties
index 3f86877..3c26e4b 100644
--- a/start/src/main/resources/application.properties
+++ b/start/src/main/resources/application.properties
@@ -17,10 +17,10 @@ server.port=7001
 management.server.port=7002
 management.endpoints.web.base-path=/
 ## database
-spring.datasource.url=jdbc:mysql://localhost:3306/preview_eventbridge?useUnicode=true&characterEncoding=utf8&useSSL=false
+spring.datasource.url=jdbc:mysql://rm-bp18b0b8s152l8s76oo.mysql.rds.aliyuncs.com:3306/rocketmq_eventbridge19?useUnicode=true&characterEncoding=utf8&useSSL=false
 spring.datasource.driver-class-name=com.mysql.jdbc.Driver
-spring.datasource.username=root
-spring.datasource.password=Artisan012
+spring.datasource.username=gongshi
+spring.datasource.password=1qaz513!
 mybatis.mapper-locations=classpath:mybatis/*.xml
 mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
 ## flyway
@@ -30,12 +30,12 @@ rocketmq.namesrvAddr=localhost:9876
 rocketmq.connect.endpoint=http://127.0.0.1:8082
 rocketmq.cluster.name=DefaultCluster
 ## runtimer
-rumtimer.config.mode=FILE
 rumtimer.name=eventbridge-runtimer
-runtimer.pluginpath=/Users/Local/eventbridge/plugin
+rumtimer.config.mode=FILE
+runtimer.pluginpath=/Users/jingluo.sl/eventbridge/plugin
 runtimer.storePathRootDir=/Users/Local/eventbridge/store
 
 ## log
 app.name=rocketmqeventbridge
 log.level=INFO
-log.path=/Users/artisan/logs
\ No newline at end of file
+log.path=~/logs
\ No newline at end of file

Reply via email to