This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git


The following commit(s) were added to refs/heads/runtimer by this push:
     new b8516c0  feat:targets own isolate consumerGroup
b8516c0 is described below

commit b8516c02b6f49af5da3c7857ed7522ae3a5f034d
Author: changfeng <[email protected]>
AuthorDate: Mon Apr 24 16:22:59 2023 +0800

    feat:targets own isolate consumerGroup
---
 .../boot/listener/RocketMQEventSubscriber.java     | 157 ++++++++++++---------
 .../{consumer => rocketmq}/ClientConfig.java       |   2 +-
 .../{consumer => rocketmq}/ConsumeRequest.java     |   2 +-
 .../ExponentialRetryPolicy.java                    |   2 +-
 .../{consumer => rocketmq}/LitePullConsumer.java   |   2 +-
 .../LitePullConsumerImpl.java                      |  13 +-
 .../{consumer => rocketmq}/LocalMessageCache.java  |   2 +-
 .../{consumer => rocketmq}/RetryPolicy.java        |   2 +-
 8 files changed, 104 insertions(+), 78 deletions(-)

diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
index b4c7527..338e3ff 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
@@ -37,9 +37,10 @@ import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.utils.NetworkUtil;
-import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer.ClientConfig;
-import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer.LitePullConsumer;
-import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer.LitePullConsumerImpl;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq.ClientConfig;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq.LitePullConsumer;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq.LitePullConsumerImpl;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.enums.RefreshTypeEnum;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
@@ -54,14 +55,18 @@ import 
org.springframework.core.io.support.PropertiesLoaderUtils;
 
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import org.springframework.stereotype.Component;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * RocketMQ implement event subscriber
@@ -71,7 +76,7 @@ public class RocketMQEventSubscriber extends EventSubscriber {
 
     private static final Logger logger = 
LoggerFactory.getLogger(RocketMQEventSubscriber.class);
 
-    private LitePullConsumer pullConsumer;
+    private final BlockingQueue<MessageExt> messageBuffer = new 
LinkedBlockingQueue<>(50000);
 
     @Autowired
     private final TargetRunnerConfigObserver runnerConfigObserver;
@@ -82,46 +87,42 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
     private ClientConfig clientConfig;
     private SessionCredentials sessionCredentials;
     private String socksProxy;
+    private Map<String, ConsumeWorker> consumeWorkerMap = new 
ConcurrentHashMap<>();
 
     private static final String SEMICOLON = ";";
 
     private static final String SYS_DEFAULT_GROUP = 
"event-bridge-default-group";
 
     public static final String QUEUE_OFFSET = "queueOffset";
+    private static final String RUNNER_NAME = "runnerName";
 
     public RocketMQEventSubscriber(TargetRunnerConfigObserver 
runnerConfigObserver) {
         this.runnerConfigObserver = runnerConfigObserver;
         this.initMqProperties();
-        this.initPullConsumer();
+        this.initConsumeWorkers(runnerConfigObserver);
     }
 
     @Override
     public void refresh(TargetRunnerConfig targetRunnerConfig, RefreshTypeEnum 
refreshTypeEnum) {
-        if(Objects.isNull(pullConsumer)){
-            pullConsumer = initLitePullConsumer();
-            return;
-        }
-        Set<String> currentTopics = 
parseTopicsByRunnerConfigs(Sets.newHashSet(targetRunnerConfig));
-        for (String topic : currentTopics){
-            switch (refreshTypeEnum){
-                case ADD:
-                case UPDATE:
-                        subscribe(topic);
-                        break;
-                case DELETE:
-                        unSubscribe(topic);
-                        break;
-                default:
-                    break;
-            }
+        switch (refreshTypeEnum) {
+            case ADD:
+            case UPDATE:
+                putConsumeWorker(targetRunnerConfig);
+                break;
+            case DELETE:
+                removeConsumeWorker(targetRunnerConfig);
+                break;
+            default:
+                break;
         }
     }
 
     @Override
     public List<ConnectRecord> pull() {
-        List<MessageExt> messages = pullConsumer.poll(pullBatchSize, 
Duration.ofSeconds(pullTimeOut));
+        ArrayList<MessageExt> messages = new ArrayList<>();
+        messageBuffer.drainTo(messages, pullBatchSize);
         if (CollectionUtils.isEmpty(messages)) {
-            logger.info("consumer poll message empty , consumer - {}", 
JSON.toJSONString(pullConsumer));
+            logger.info("consumer poll message empty.");
             return null;
         }
         List<ConnectRecord> connectRecords = Lists.newArrayList();
@@ -148,23 +149,17 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
 
     /**
      * parse topics by specific target runner configs
-     * @param targetRunnerConfigs
+     * @param targetRunnerConfig
      * @return
      */
-    public Set<String> parseTopicsByRunnerConfigs(Set<TargetRunnerConfig> 
targetRunnerConfigs){
-        if(CollectionUtils.isEmpty(targetRunnerConfigs)){
-            logger.warn("target runner config is empty, parse to topic 
failed!");
-            return null;
-        }
+    public Set<String> parseTopicsByRunnerConfig(TargetRunnerConfig 
targetRunnerConfig){
         Set<String> listenTopics = Sets.newHashSet();
-        for(TargetRunnerConfig runnerConfig : targetRunnerConfigs){
-            List<Map<String,String>> runnerConfigMap = 
runnerConfig.getComponents();
-            if(CollectionUtils.isEmpty(runnerConfigMap)){
-                logger.warn("target runner config components is empty, config 
info - {}", runnerConfig);
-                continue;
-            }
-            
listenTopics.add(runnerConfigMap.iterator().next().get(RuntimerConfigDefine.CHANNEL_NAME));
+        List<Map<String,String>> runnerConfigMap = 
targetRunnerConfig.getComponents();
+        if (CollectionUtils.isEmpty(runnerConfigMap)){
+            logger.warn("target runner config components is empty, config info 
- {}", targetRunnerConfig);
+            return listenTopics;
         }
+        
listenTopics.add(runnerConfigMap.iterator().next().get(RuntimerConfigDefine.CHANNEL_NAME));
         return listenTopics;
     }
 
@@ -218,28 +213,25 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
     /**
      * init rocket mq pull consumer
      */
-    private void initPullConsumer() {
-        pullConsumer = initLitePullConsumer();
+    private void initConsumeWorkers(TargetRunnerConfigObserver 
runnerConfigObserver) {
+        for (TargetRunnerConfig targetRunnerConfig : 
runnerConfigObserver.getTargetRunnerConfig()) {
+            LitePullConsumer litePullConsumer = 
initLitePullConsumer(targetRunnerConfig);
+            ConsumeWorker consumeWorker = new ConsumeWorker(litePullConsumer, 
targetRunnerConfig.getName());
+            consumeWorkerMap.put(targetRunnerConfig.getName(), consumeWorker);
+            consumeWorker.start();
+        }
+
     }
 
     /**
      * first init default rocketmq pull consumer
      * @return
      */
-    public LitePullConsumer initLitePullConsumer() {
-        if (pullConsumer != null) {
-            try {
-                pullConsumer.shutdown();
-            } catch (Exception e) {
-                logger.error("Consumer shutdown failed.", e);
-            }
-        }
-
-        Set<TargetRunnerConfig> targetRunnerConfigs = 
runnerConfigObserver.getTargetRunnerConfig();
-        Set<String> topics = parseTopicsByRunnerConfigs(targetRunnerConfigs);
+    public LitePullConsumer initLitePullConsumer(TargetRunnerConfig 
targetRunnerConfig) {
+        Set<String> topics = parseTopicsByRunnerConfig(targetRunnerConfig);
 
         RPCHook rpcHook = this.sessionCredentials != null ? new 
AclClientRPCHook(this.sessionCredentials) : null;
-        this.pullConsumer = new LitePullConsumerImpl(this.clientConfig, 
rpcHook);
+        LitePullConsumerImpl pullConsumer = new 
LitePullConsumerImpl(this.clientConfig, rpcHook);
         if (StringUtils.isNotBlank(this.socksProxy)) {
             pullConsumer.setSockProxyJson(this.socksProxy);
         }
@@ -310,20 +302,59 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
         return recordOffset;
     }
 
-    /**
-     * new topic for subscribe
-     * @param topic
-     */
-    private void subscribe(String topic) {
-        pullConsumer.subscribe(topic);
+    private void putConsumeWorker(TargetRunnerConfig targetRunnerConfig) {
+        ConsumeWorker consumeWorker = 
consumeWorkerMap.get(targetRunnerConfig.getName());
+        if (!Objects.isNull(consumeWorker)){
+            consumeWorker.shutdown();
+        }
+        LitePullConsumer litePullConsumer = 
initLitePullConsumer(targetRunnerConfig);
+        ConsumeWorker newWorker = new ConsumeWorker(litePullConsumer, 
targetRunnerConfig.getName());
+        consumeWorkerMap.put(targetRunnerConfig.getName(), newWorker);
+        newWorker.start();
     }
 
-    /**
-     * unsubscribe old topic
-     * @param topic
-     */
-    private void unSubscribe(String topic) {
-        pullConsumer.unsubscribe(topic);
+    private void removeConsumeWorker(TargetRunnerConfig targetRunnerConfig) {
+        ConsumeWorker consumeWorker = 
consumeWorkerMap.remove(targetRunnerConfig.getName());
+        if (!Objects.isNull(consumeWorker)){
+            consumeWorker.shutdown();
+        }
+    }
+
+    class ConsumeWorker extends ServiceThread {
+
+        private final LitePullConsumer pullConsumer;
+        private final String runnerName;
+
+        public ConsumeWorker(LitePullConsumer pullConsumer, String runnerName) 
{
+            this.pullConsumer = pullConsumer;
+            this.runnerName = runnerName;
+        }
+
+        @Override
+        public String getServiceName() {
+            return ConsumeWorker.class.getSimpleName();
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                try {
+                    List<MessageExt> messages = 
pullConsumer.poll(pullBatchSize, Duration.ofSeconds(pullTimeOut));
+                    for (MessageExt message : messages) {
+                        message.putUserProperty(RUNNER_NAME, runnerName);
+                        messageBuffer.put(message);
+                    }
+                } catch (Exception exception) {
+                    logger.error(getServiceName() + " - event bus pull record 
exception, stackTrace - ", exception);
+                }
+            }
+        }
+
+        @Override
+        public void shutdown() {
+            pullConsumer.shutdown();
+            super.shutdown();
+        }
     }
 
 }
\ No newline at end of file
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ClientConfig.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/ClientConfig.java
similarity index 99%
rename from 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ClientConfig.java
rename to 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/ClientConfig.java
index 2df7b4e..0be8b2c 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ClientConfig.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/ClientConfig.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq;
 
 import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ConsumeRequest.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/ConsumeRequest.java
similarity index 99%
rename from 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ConsumeRequest.java
rename to 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/ConsumeRequest.java
index c59e2e1..404769a 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ConsumeRequest.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/ConsumeRequest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq;
 
 import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
 import org.apache.rocketmq.common.message.MessageExt;
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ExponentialRetryPolicy.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/ExponentialRetryPolicy.java
similarity index 99%
rename from 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ExponentialRetryPolicy.java
rename to 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/ExponentialRetryPolicy.java
index c217784..06c0b5b 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ExponentialRetryPolicy.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/ExponentialRetryPolicy.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq;
 
 import com.google.common.base.MoreObjects;
 
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumer.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/LitePullConsumer.java
similarity index 98%
rename from 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumer.java
rename to 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/LitePullConsumer.java
index 46f03a3..e9ed516 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumer.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/LitePullConsumer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq;
 
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumerImpl.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/LitePullConsumerImpl.java
similarity index 97%
rename from 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumerImpl.java
rename to 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/LitePullConsumerImpl.java
index 3d6f74a..21e94c2 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumerImpl.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/LitePullConsumerImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -64,12 +64,14 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
     private final ExecutorService executorService = 
Executors.newSingleThreadExecutor(
             ThreadUtils.newThreadFactory("PullConsumerExecutorService", 
false));
 
+    private static final String DEFAULT_INSTANCE_NAME = 
"EventBridge_Consumer_INSTANCE";
+
     public LitePullConsumerImpl(final ClientConfig clientConfig, final RPCHook 
rpcHook) {
         this.clientConfig = clientConfig;
         rocketmqPullConsumer = new 
DefaultMQPullConsumer(clientConfig.getConsumerGroup(), rpcHook);
         rocketmqPullConsumer.setNamesrvAddr(clientConfig.getNameSrvAddr());
         rocketmqPullConsumer.setAllocateMessageQueueStrategy(new 
AllocateMessageQueueAveragelyByCircle());
-        
rocketmqPullConsumer.setInstanceName(buildInstanceName(clientConfig.getNameSrvAddr(),
 clientConfig.getConsumerGroup()));
+        rocketmqPullConsumer.setInstanceName(DEFAULT_INSTANCE_NAME);
         if (clientConfig.getAccessChannel() != null) {
             
rocketmqPullConsumer.setAccessChannel(clientConfig.getAccessChannel());
         }
@@ -147,13 +149,6 @@ public class LitePullConsumerImpl implements 
LitePullConsumer {
         rocketmqPullConsumer.setSocksProxyConfig(proxyJson);
     }
 
-    private String buildInstanceName(String nameSrvAddr, String consumerGroup) 
{
-        return UtilAll.getPid()
-                + "#" + nameSrvAddr.hashCode()
-                + "#" + consumerGroup
-                + "#" + System.nanoTime();
-    }
-
     private RetryPolicy getRetryPolicy(MessageQueue messageQueue) {
         return retryPolicies.computeIfAbsent(messageQueue, queue ->
                 new ExponentialRetryPolicy(Duration.ofMillis(100).toMillis(), 
Duration.ofMinutes(10).toMillis(), 2));
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LocalMessageCache.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/LocalMessageCache.java
similarity index 99%
rename from 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LocalMessageCache.java
rename to 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/LocalMessageCache.java
index 73d0e87..eaa3538 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LocalMessageCache.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/LocalMessageCache.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq;
 
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.exception.MQClientException;
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/RetryPolicy.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/RetryPolicy.java
similarity index 98%
rename from 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/RetryPolicy.java
rename to 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/RetryPolicy.java
index 0b4637f..5753119 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/RetryPolicy.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/rocketmq/RetryPolicy.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.rocketmq;
 
 /**
  * @Author changfeng

Reply via email to