This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 13f4d98ff [ISSUE#4178] When there are a large number of errors in the HTTP target, RocketMQ consumption is slow (#4544)
13f4d98ff is described below

commit 13f4d98ff9374c5a036eaf608dd8ef36017aa7d5
Author: yanrongzhen <[email protected]>
AuthorDate: Tue Nov 14 18:03:39 2023 +0800

    [ISSUE#4178] When there are a large number of errors in the HTTP target, RocketMQ consumption is slow (#4544)
    
    * Provide MQ-storage-based retry strategy in http protocol.
    
    * Remove unused method.
    
    * fix
    
    * fix
    
    * Add retry, rocketmq-retry module.
    
    * Do some optimization.
    
    * Use interface instead of HttpHandleMsgContext.
    
    * optimize
    
    * fix
    
    * Remove unused methods.
    
    * fix log
    
    * Remove retry-rocketmq dependency.
    
    * rollback dependency changes.
    
    * Use plugin to load spi.
    
    * Remove unused methods.
    
    * fix: remove storage dependency
    
    * fix: log
    
    * fix: change 'standalone' to 'default'
    
    * fix: codereview
---
 .../org/apache/eventmesh/common/Constants.java     |   2 +
 .../common/config/CommonConfiguration.java         |   4 +
 .../common/protocol/http/common/ProtocolKey.java   |   1 +
 .../eventmesh/common/utils/ReflectUtils.java       |   3 +
 .../eventmesh-retry-api/build.gradle               |  29 ++--
 .../eventmesh/retry/api}/AbstractRetryer.java      |   9 +-
 .../org/apache/eventmesh/retry/api}/Retryer.java   |   4 +-
 .../retry/api/conf/RetryConfiguration.java         |  26 ++--
 .../retry/api/strategy/RetryStrategy.java          |  24 ++--
 .../retry/api}/timer/HashedWheelTimer.java         |   4 +-
 .../apache/eventmesh/retry/api}/timer/Timeout.java |   2 +-
 .../apache/eventmesh/retry/api}/timer/Timer.java   |   2 +-
 .../eventmesh/retry/api}/timer/TimerTask.java      |   6 +-
 .../eventmesh-retry-rocketmq/build.gradle          |  47 +++++++
 .../eventmesh-retry-rocketmq/gradle.properties     |  19 +++
 .../retry/rocketmq/RocketMQRetryStrategyImpl.java  |  75 ++++++++++
 ...ache.eventmesh.retry.api.strategy.RetryStrategy |  16 +++
 eventmesh-runtime/build.gradle                     |   2 +
 eventmesh-runtime/conf/eventmesh.properties        |   2 +
 .../eventmesh/runtime/boot/AbstractHTTPServer.java |   6 +
 .../runtime/boot/AbstractRemotingServer.java       |  19 ++-
 .../eventmesh/runtime/boot/AbstractTCPServer.java  |   6 +
 .../runtime/boot/EventMeshGrpcServer.java          |  19 ++-
 .../runtime/boot/EventMeshHTTPServer.java          |   2 +-
 .../Retryer.java => boot/RemotingServer.java}      |  21 +--
 .../configuration/EventMeshHTTPConfiguration.java  |   1 +
 .../runtime/core/protocol/RetryContext.java        |  85 +++++++++--
 .../consumer/HandleMessageContext.java}            |  21 +--
 .../protocol/grpc/consumer/EventMeshConsumer.java  |   4 +-
 .../processor/BatchPublishCloudEventProcessor.java |   6 +-
 .../processor/PublishCloudEventsProcessor.java     |   6 +-
 .../grpc/processor/ReplyMessageProcessor.java      |   6 +-
 .../grpc/processor/RequestCloudEventProcessor.java |   6 +-
 .../protocol/grpc/producer/ProducerManager.java    |  90 ------------
 .../protocol/grpc/push/AbstractPushRequest.java    |   3 +-
 .../core/protocol/grpc/retry/GrpcRetryer.java      |   2 +-
 .../protocol/http/consumer/EventMeshConsumer.java  |  13 +-
 .../protocol/http/consumer/HandleMsgContext.java   |   3 +-
 .../http/processor/BatchSendMessageProcessor.java  |   5 +-
 .../processor/BatchSendMessageV2Processor.java     |   4 +-
 .../http/processor/ReplyMessageProcessor.java      |   4 +-
 .../http/processor/SendAsyncEventProcessor.java    |   4 +-
 .../http/processor/SendAsyncMessageProcessor.java  |   4 +-
 .../processor/SendAsyncRemoteEventProcessor.java   |   4 +-
 .../http/processor/SendSyncMessageProcessor.java   |   4 +-
 .../protocol/http/producer/EventMeshProducer.java  | 130 -----------------
 .../protocol/http/producer/SendMessageContext.java | 156 ---------------------
 .../http/push/AbstractHTTPPushRequest.java         |  10 +-
 .../protocol/http/push/AsyncHTTPPushRequest.java   |   8 +-
 .../core/protocol/http/retry/HttpRetryer.java      |   2 +-
 .../{grpc => }/producer/EventMeshProducer.java     |  25 +++-
 .../{http => }/producer/ProducerManager.java       |  20 +--
 .../{grpc => }/producer/SendMessageContext.java    |  13 +-
 .../client/session/push/DownStreamMsgContext.java  |   3 +-
 .../tcp/client/session/retry/TcpRetryer.java       |   4 +-
 .../client/session/send/UpStreamMsgContext.java    |   3 +-
 .../eventmesh/spi/EventMeshExtensionType.java      |   1 +
 .../org/apache/eventmesh/api/TopicNameHelper.java  |  26 ++--
 .../rocketmq/common/TopicNameHelperImpl.java       |  21 +--
 .../org.apache.eventmesh.api.TopicNameHelper       |  16 +++
 settings.gradle                                    |   4 +
 61 files changed, 473 insertions(+), 594 deletions(-)

diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
index eaf595659..3bc3cbd7e 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
@@ -204,4 +204,6 @@ public class Constants {
 
     public static final String OS_WIN_PREFIX = "win";
 
+    public static final String DEFAULT = "default";
+
 }
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
index 17f1c5b78..47f52de4c 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.common.config;
 
 import static org.apache.eventmesh.common.Constants.HTTP;
 
+import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.utils.IPUtils;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -111,6 +112,9 @@ public class CommonConfiguration {
     @ConfigFiled(reload = true)
     private String meshGroup;
 
+    @ConfigFiled(field = "server.retry.plugin.type")
+    private String eventMeshRetryPluginType = Constants.DEFAULT;
+
     public void reload() {
         this.eventMeshWebhookOrigin = "eventmesh." + eventMeshIDC;
 
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java
index 636c5035f..71ad6eef0 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java
@@ -32,6 +32,7 @@ public class ProtocolKey {
     public static final String PROTOCOL_VERSION = "protocolversion";
 
     public static final String PROTOCOL_DESC = "protocoldesc";
+    public static final String TOPIC = "topic";
 
     public static final String CONTENT_TYPE = "contenttype";
 
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ReflectUtils.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ReflectUtils.java
index 44fac016a..7dd120568 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ReflectUtils.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ReflectUtils.java
@@ -20,6 +20,9 @@ package org.apache.eventmesh.common.utils;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
 public class ReflectUtils {
 
     /**
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
 b/eventmesh-retry/eventmesh-retry-api/build.gradle
similarity index 64%
copy from 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
copy to eventmesh-retry/eventmesh-retry-api/build.gradle
index b004c6aab..228d23f59 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
+++ b/eventmesh-retry/eventmesh-retry-api/build.gradle
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.retry;
+dependencies {
+    implementation project(":eventmesh-storage-plugin:eventmesh-storage-api")
+    implementation project(":eventmesh-common")
+    implementation project(":eventmesh-spi")
 
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
+    implementation 'io.cloudevents:cloudevents-core'
+    implementation 'io.cloudevents:cloudevents-json-jackson'
 
-import java.util.concurrent.TimeUnit;
+    compileOnly 'org.projectlombok:lombok'
+    annotationProcessor 'org.projectlombok:lombok'
 
-/**
- * Retryer interface.
- */
-public interface Retryer {
-
-    void start();
-
-    void shutdown();
-
-    long getPendingTimeouts();
-
-    void printState();
-
-    void newTimeout(TimerTask timerTask, long delay, TimeUnit timeUnit);
-
-}
+    testImplementation 'org.junit.jupiter:junit-jupiter'
+}
\ No newline at end of file
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/AbstractRetryer.java
 
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/AbstractRetryer.java
similarity index 90%
rename from 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/AbstractRetryer.java
rename to 
eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/AbstractRetryer.java
index 8f1491346..0b6ddf844 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/AbstractRetryer.java
+++ 
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/AbstractRetryer.java
@@ -15,13 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.protocol;
+package org.apache.eventmesh.retry.api;
 
 import org.apache.eventmesh.common.EventMeshThreadFactory;
-import org.apache.eventmesh.runtime.core.retry.Retryer;
-import org.apache.eventmesh.runtime.core.timer.HashedWheelTimer;
-import org.apache.eventmesh.runtime.core.timer.Timer;
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
+import org.apache.eventmesh.retry.api.timer.HashedWheelTimer;
+import org.apache.eventmesh.retry.api.timer.Timer;
+import org.apache.eventmesh.retry.api.timer.TimerTask;
 
 import java.text.SimpleDateFormat;
 import java.util.Date;
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
 
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/Retryer.java
similarity index 90%
copy from 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
copy to 
eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/Retryer.java
index b004c6aab..82634e173 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
+++ 
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/Retryer.java
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.retry;
+package org.apache.eventmesh.retry.api;
 
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
+import org.apache.eventmesh.retry.api.timer.TimerTask;
 
 import java.util.concurrent.TimeUnit;
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
 
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/conf/RetryConfiguration.java
similarity index 64%
copy from 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
copy to 
eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/conf/RetryConfiguration.java
index b004c6aab..351a06fd3 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
+++ 
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/conf/RetryConfiguration.java
@@ -15,25 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.retry;
+package org.apache.eventmesh.retry.api.conf;
 
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
+import org.apache.eventmesh.api.producer.Producer;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
 
-import java.util.concurrent.TimeUnit;
+import io.cloudevents.CloudEvent;
 
-/**
- * Retryer interface.
- */
-public interface Retryer {
+import lombok.Builder;
+import lombok.Data;
 
-    void start();
+@Data
+@Builder
+public class RetryConfiguration {
 
-    void shutdown();
+    private CloudEvent event;
 
-    long getPendingTimeouts();
+    private String consumerGroupName;
 
-    void printState();
+    private Producer producer;
 
-    void newTimeout(TimerTask timerTask, long delay, TimeUnit timeUnit);
+    private String topic;
 
+    private SubscriptionMode subscriptionMode;
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
 
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/strategy/RetryStrategy.java
similarity index 66%
copy from 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
copy to 
eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/strategy/RetryStrategy.java
index b004c6aab..739be171b 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
+++ 
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/strategy/RetryStrategy.java
@@ -15,25 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.retry;
+package org.apache.eventmesh.retry.api.strategy;
 
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
-
-import java.util.concurrent.TimeUnit;
+import org.apache.eventmesh.retry.api.conf.RetryConfiguration;
+import org.apache.eventmesh.spi.EventMeshExtensionType;
+import org.apache.eventmesh.spi.EventMeshSPI;
 
 /**
- * Retryer interface.
+ * Retry strategy.
  */
-public interface Retryer {
-
-    void start();
-
-    void shutdown();
-
-    long getPendingTimeouts();
-
-    void printState();
-
-    void newTimeout(TimerTask timerTask, long delay, TimeUnit timeUnit);
+@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.RETRY)
+public interface RetryStrategy {
 
+    void retry(RetryConfiguration configuration);
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/HashedWheelTimer.java
 
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/HashedWheelTimer.java
similarity index 99%
rename from 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/HashedWheelTimer.java
rename to 
eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/HashedWheelTimer.java
index 443f0a85f..e71abe61a 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/HashedWheelTimer.java
+++ 
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/HashedWheelTimer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.timer;
+package org.apache.eventmesh.retry.api.timer;
 
 import static org.apache.eventmesh.common.Constants.OS_NAME_KEY;
 import static org.apache.eventmesh.common.Constants.OS_WIN_PREFIX;
@@ -632,7 +632,7 @@ public class HashedWheelTimer implements Timer {
             }
 
             try {
-                task.run(this);
+                task.run();
                 task.setExecuteTimeHook(System.currentTimeMillis());
             } catch (Throwable t) {
                 if (logger.isWarnEnabled()) {
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timeout.java
 
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/Timeout.java
similarity index 97%
rename from 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timeout.java
rename to 
eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/Timeout.java
index 12cea1aea..e6485ff51 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timeout.java
+++ 
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/Timeout.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.timer;
+package org.apache.eventmesh.retry.api.timer;
 
 /**
  * A handle associated with a {@link TimerTask} that is returned by a
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timer.java
 
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/Timer.java
similarity index 97%
rename from 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timer.java
rename to 
eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/Timer.java
index 076f8d04b..50d1ce06a 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timer.java
+++ 
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/Timer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.timer;
+package org.apache.eventmesh.retry.api.timer;
 
 import java.util.Set;
 import java.util.concurrent.RejectedExecutionException;
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/TimerTask.java
 
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/TimerTask.java
similarity index 88%
rename from 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/TimerTask.java
rename to 
eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/TimerTask.java
index ae8447f04..879481454 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/TimerTask.java
+++ 
b/eventmesh-retry/eventmesh-retry-api/src/main/java/org/apache/eventmesh/retry/api/timer/TimerTask.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.timer;
+package org.apache.eventmesh.retry.api.timer;
 
 import java.util.concurrent.TimeUnit;
 
@@ -28,10 +28,8 @@ public interface TimerTask {
     /**
      * Executed after the delay specified with
      * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
-     *
-     * @param timeout a handle which is associated with this task
      */
-    void run(Timeout timeout) throws Exception;
+    void run() throws Exception;
 
     /**
      * Hook method to set the execute time.
diff --git a/eventmesh-retry/eventmesh-retry-rocketmq/build.gradle 
b/eventmesh-retry/eventmesh-retry-rocketmq/build.gradle
new file mode 100644
index 000000000..3d33929b4
--- /dev/null
+++ b/eventmesh-retry/eventmesh-retry-rocketmq/build.gradle
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+List rocketmq = [
+        "org.apache.rocketmq:rocketmq-client:$rocketmq_version",
+        "org.apache.rocketmq:rocketmq-broker:$rocketmq_version",
+        "org.apache.rocketmq:rocketmq-common:$rocketmq_version",
+        "org.apache.rocketmq:rocketmq-store:$rocketmq_version",
+        "org.apache.rocketmq:rocketmq-namesrv:$rocketmq_version",
+        "org.apache.rocketmq:rocketmq-tools:$rocketmq_version",
+        "org.apache.rocketmq:rocketmq-remoting:$rocketmq_version",
+        "org.apache.rocketmq:rocketmq-logging:$rocketmq_version",
+        "org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version",
+        "org.apache.rocketmq:rocketmq-filter:$rocketmq_version",
+        "org.apache.rocketmq:rocketmq-acl:$rocketmq_version",
+        "org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version",
+]
+
+dependencies {
+    implementation project(":eventmesh-storage-plugin:eventmesh-storage-api")
+    implementation project(":eventmesh-storage-plugin:eventmesh-storage-rocketmq")
+    implementation rocketmq
+    implementation project(":eventmesh-retry:eventmesh-retry-api")
+    implementation project(":eventmesh-common")
+
+    implementation 'io.cloudevents:cloudevents-core'
+    implementation 'io.cloudevents:cloudevents-json-jackson'
+
+    compileOnly 'org.projectlombok:lombok'
+    annotationProcessor 'org.projectlombok:lombok'
+
+    testImplementation 'org.junit.jupiter:junit-jupiter'
+}
\ No newline at end of file
diff --git a/eventmesh-retry/eventmesh-retry-rocketmq/gradle.properties 
b/eventmesh-retry/eventmesh-retry-rocketmq/gradle.properties
new file mode 100644
index 000000000..5114e0523
--- /dev/null
+++ b/eventmesh-retry/eventmesh-retry-rocketmq/gradle.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+rocketmq_version=4.9.5
+pluginType=retry
+pluginName=rocketmq
\ No newline at end of file
diff --git 
a/eventmesh-retry/eventmesh-retry-rocketmq/src/main/java/org/apache/eventmesh/retry/rocketmq/RocketMQRetryStrategyImpl.java
 
b/eventmesh-retry/eventmesh-retry-rocketmq/src/main/java/org/apache/eventmesh/retry/rocketmq/RocketMQRetryStrategyImpl.java
new file mode 100644
index 000000000..9d817eec5
--- /dev/null
+++ 
b/eventmesh-retry/eventmesh-retry-rocketmq/src/main/java/org/apache/eventmesh/retry/rocketmq/RocketMQRetryStrategyImpl.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.retry.rocketmq;
+
+import org.apache.eventmesh.api.SendCallback;
+import org.apache.eventmesh.api.SendResult;
+import org.apache.eventmesh.api.exception.OnExceptionContext;
+import org.apache.eventmesh.api.producer.Producer;
+import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
+import org.apache.eventmesh.retry.api.conf.RetryConfiguration;
+import org.apache.eventmesh.retry.api.strategy.RetryStrategy;
+
+import org.apache.rocketmq.common.MixAll;
+
+import java.util.Objects;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class RocketMQRetryStrategyImpl implements RetryStrategy {
+
+    @Override
+    public void retry(RetryConfiguration configuration) {
+        sendMessageBack(configuration);
+    }
+
+    @SneakyThrows
+    private void sendMessageBack(final RetryConfiguration configuration) {
+        CloudEvent event = configuration.getEvent();
+        String topic = configuration.getTopic();
+        String consumerGroupName = configuration.getConsumerGroupName();
+        String retryTopicName = MixAll.getRetryTopic(consumerGroupName);
+
+        String bizSeqNo = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey())).toString();
+        String uniqueId = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey())).toString();
+        CloudEvent retryEvent = CloudEventBuilder.from(event)
+            .withExtension(ProtocolKey.TOPIC, topic)
+            .withSubject(retryTopicName)
+            .build();
+        Producer producer = configuration.getProducer();
+        producer.publish(retryEvent, new SendCallback() {
+
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                log.info("consumer:{} consume success,, bizSeqno:{}, uniqueId:{}",
+                    consumerGroupName, bizSeqNo, uniqueId);
+            }
+
+            @Override
+            public void onException(OnExceptionContext context) {
+                log.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}",
+                    consumerGroupName, bizSeqNo, uniqueId, context.getException());
+            }
+        });
+    }
+}
diff --git 
a/eventmesh-retry/eventmesh-retry-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.retry.api.strategy.RetryStrategy
 
b/eventmesh-retry/eventmesh-retry-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.retry.api.strategy.RetryStrategy
new file mode 100644
index 000000000..71c2006e9
--- /dev/null
+++ 
b/eventmesh-retry/eventmesh-retry-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.retry.api.strategy.RetryStrategy
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+rocketmq=org.apache.eventmesh.retry.rocketmq.RocketMQRetryStrategyImpl
\ No newline at end of file
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index 54f953369..a7fc97018 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -72,6 +72,8 @@ dependencies {
     implementation project(":eventmesh-webhook:eventmesh-webhook-api")
     implementation project(":eventmesh-webhook:eventmesh-webhook-receive")
 
+    implementation project(":eventmesh-retry:eventmesh-retry-api")
+
     testImplementation "org.mockito:mockito-inline"
     testImplementation "org.mockito:mockito-junit-jupiter"
     testImplementation "commons-io:commons-io"
diff --git a/eventmesh-runtime/conf/eventmesh.properties 
b/eventmesh-runtime/conf/eventmesh.properties
index de4150fba..cabe3f9bc 100644
--- a/eventmesh-runtime/conf/eventmesh.properties
+++ b/eventmesh-runtime/conf/eventmesh.properties
@@ -55,6 +55,8 @@ eventMesh.server.retry.sync.pushRetryTimes=3
 eventMesh.server.retry.async.pushRetryDelayInMills=500
 eventMesh.server.retry.sync.pushRetryDelayInMills=500
 eventMesh.server.retry.pushRetryQueueSize=10000
+eventMesh.server.retry.plugin.type=default
+
 # runtime admin
 eventMesh.server.admin.http.port=10106
 # metaStorage
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
index a8d9e41e6..da33f9852 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
@@ -18,6 +18,7 @@
 package org.apache.eventmesh.runtime.boot;
 
 import org.apache.eventmesh.common.ThreadPoolFactory;
+import org.apache.eventmesh.common.config.CommonConfiguration;
 import org.apache.eventmesh.common.protocol.http.HttpCommand;
 import org.apache.eventmesh.common.protocol.http.body.Body;
 import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
@@ -149,6 +150,11 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
         httpThreadPoolGroup.initThreadPool();
     }
 
+    @Override
+    public CommonConfiguration getConfiguration() {
+        return eventMeshHttpConfiguration;
+    }
+
     @Override
     public void start() throws Exception {
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
index 0a876c413..7f93e1260 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
@@ -21,6 +21,7 @@ import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.common.utils.LogUtils;
 import org.apache.eventmesh.common.utils.SystemUtils;
 import org.apache.eventmesh.common.utils.ThreadUtils;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
 
 import java.util.concurrent.TimeUnit;
 
@@ -36,7 +37,7 @@ import lombok.extern.slf4j.Slf4j;
  * The most basic server
  */
 @Slf4j
-public abstract class AbstractRemotingServer {
+public abstract class AbstractRemotingServer implements RemotingServer {
 
     private static final int MAX_THREADS = Runtime.getRuntime().availableProcessors();
     private static final int DEFAULT_SLEEP_SECONDS = 30;
@@ -44,6 +45,7 @@ public abstract class AbstractRemotingServer {
     private EventLoopGroup bossGroup;
     private EventLoopGroup ioGroup;
     private EventExecutorGroup workerGroup;
+    protected ProducerManager producerManager;
 
     private int port;
 
@@ -68,19 +70,32 @@ public abstract class AbstractRemotingServer {
         workerGroup = new NioEventLoopGroup(MAX_THREADS, new 
EventMeshThreadFactory(threadPrefix + "-worker"));
     }
 
+    protected void initProducerManager() throws Exception {
+        producerManager = new ProducerManager(this);
+        producerManager.init();
+    }
+
+    public ProducerManager getProducerManager() {
+        return producerManager;
+    }
+
     public void init(final String threadPrefix) throws Exception {
         buildBossGroup(threadPrefix);
         buildIOGroup(threadPrefix);
         buildWorkerGroup(threadPrefix);
+        initProducerManager();
     }
 
-    public abstract void start() throws Exception;
+    public void start() throws Exception {
+        producerManager.start();
+    }
 
     public void shutdown() throws Exception {
         if (bossGroup != null) {
             bossGroup.shutdownGracefully();
             LogUtils.info(log, "shutdown bossGroup");
         }
+        producerManager.shutdown();
 
         
ThreadUtils.randomPause(TimeUnit.SECONDS.toMillis(DEFAULT_SLEEP_SECONDS));
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
index f6ae561c7..708f0000e 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
@@ -17,6 +17,7 @@
 
 package org.apache.eventmesh.runtime.boot;
 
+import org.apache.eventmesh.common.config.CommonConfiguration;
 import org.apache.eventmesh.common.protocol.tcp.Command;
 import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
 import org.apache.eventmesh.common.protocol.tcp.Header;
@@ -111,6 +112,11 @@ public class AbstractTCPServer extends 
AbstractRemotingServer {
         tcpThreadPoolGroup.initThreadPool();
     }
 
+    @Override
+    public CommonConfiguration getConfiguration() {
+        return eventMeshTCPConfiguration;
+    }
+
     @Override
     public void start() throws Exception {
         initSharableHandlers();
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
index 1ee3d69da..58832d1d3 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
@@ -22,6 +22,7 @@ import static org.apache.eventmesh.common.Constants.GRPC;
 import org.apache.eventmesh.api.meta.dto.EventMeshRegisterInfo;
 import org.apache.eventmesh.api.meta.dto.EventMeshUnRegisterInfo;
 import org.apache.eventmesh.common.ThreadPoolFactory;
+import org.apache.eventmesh.common.config.CommonConfiguration;
 import org.apache.eventmesh.common.exception.EventMeshException;
 import org.apache.eventmesh.common.utils.IPUtils;
 import org.apache.eventmesh.metrics.api.MetricsPluginFactory;
@@ -30,7 +31,6 @@ import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import 
org.apache.eventmesh.runtime.core.protocol.grpc.consumer.ConsumerManager;
-import 
org.apache.eventmesh.runtime.core.protocol.grpc.producer.ProducerManager;
 import org.apache.eventmesh.runtime.core.protocol.grpc.retry.GrpcRetryer;
 import org.apache.eventmesh.runtime.core.protocol.grpc.service.ConsumerService;
 import 
org.apache.eventmesh.runtime.core.protocol.grpc.service.HeartbeatService;
@@ -60,7 +60,7 @@ import com.google.common.util.concurrent.RateLimiter;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public class EventMeshGrpcServer {
+public class EventMeshGrpcServer extends AbstractRemotingServer {
 
     private final EventMeshGrpcConfiguration eventMeshGrpcConfiguration;
 
@@ -70,8 +70,6 @@ public class EventMeshGrpcServer {
 
     private Server server;
 
-    private ProducerManager producerManager;
-
     private ConsumerManager consumerManager;
 
     private GrpcRetryer grpcRetryer;
@@ -112,9 +110,7 @@ public class EventMeshGrpcServer {
 
         msgRateLimiter = 
RateLimiter.create(eventMeshGrpcConfiguration.getEventMeshMsgReqNumPerSecond());
 
-        producerManager = new ProducerManager(this);
-        producerManager.init();
-
+        initProducerManager();
         consumerManager = new ConsumerManager(this);
         consumerManager.init();
 
@@ -134,6 +130,11 @@ public class EventMeshGrpcServer {
         log.info("-----------------EventMeshGRPCServer initialized");
     }
 
+    @Override
+    public CommonConfiguration getConfiguration() {
+        return eventMeshGrpcConfiguration;
+    }
+
     public void start() throws Exception {
         log.info("---------------EventMeshGRPCServer 
starting-------------------");
 
@@ -207,10 +208,6 @@ public class EventMeshGrpcServer {
         return this.eventMeshGrpcConfiguration;
     }
 
-    public ProducerManager getProducerManager() {
-        return producerManager;
-    }
-
     public ConsumerManager getConsumerManager() {
         return consumerManager;
     }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index 2cfe2d28d..cff5a6e69 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -52,9 +52,9 @@ import 
org.apache.eventmesh.runtime.core.protocol.http.processor.SendSyncMessage
 import 
org.apache.eventmesh.runtime.core.protocol.http.processor.SubscribeProcessor;
 import 
org.apache.eventmesh.runtime.core.protocol.http.processor.UnSubscribeProcessor;
 import 
org.apache.eventmesh.runtime.core.protocol.http.processor.WebHookProcessor;
-import 
org.apache.eventmesh.runtime.core.protocol.http.producer.ProducerManager;
 import org.apache.eventmesh.runtime.core.protocol.http.push.HTTPClientPool;
 import org.apache.eventmesh.runtime.core.protocol.http.retry.HttpRetryer;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
 import org.apache.eventmesh.runtime.meta.MetaStorage;
 import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
 import org.apache.eventmesh.webhook.receive.WebHookController;
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/RemotingServer.java
similarity index 69%
copy from 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
copy to 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/RemotingServer.java
index b004c6aab..764075cc6 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/RemotingServer.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.retry;
+package org.apache.eventmesh.runtime.boot;
 
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
-
-import java.util.concurrent.TimeUnit;
+import org.apache.eventmesh.common.config.CommonConfiguration;
 
 /**
- * Retryer interface.
+ * Remoting server interface.
  */
-public interface Retryer {
-
-    void start();
-
-    void shutdown();
-
-    long getPendingTimeouts();
-
-    void printState();
+public interface RemotingServer {
 
-    void newTimeout(TimerTask timerTask, long delay, TimeUnit timeUnit);
+    void init() throws Exception;
 
+    CommonConfiguration getConfiguration();
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
index c266a26bd..a7ed9b26f 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
@@ -131,4 +131,5 @@ public class EventMeshHTTPConfiguration extends 
CommonConfiguration {
 
     @ConfigFiled(field = "blacklist.ipv6")
     private List<IPAddress> eventMeshIpv6BlackList = Collections.emptyList();
+
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java
index 754ac7cfc..ca796dc33 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java
@@ -17,43 +17,98 @@
 
 package org.apache.eventmesh.runtime.core.protocol;
 
-import org.apache.eventmesh.runtime.core.timer.Timeout;
-import org.apache.eventmesh.runtime.core.timer.Timer;
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.config.CommonConfiguration;
+import org.apache.eventmesh.retry.api.conf.RetryConfiguration;
+import org.apache.eventmesh.retry.api.strategy.RetryStrategy;
+import org.apache.eventmesh.retry.api.timer.TimerTask;
+import 
org.apache.eventmesh.runtime.core.protocol.consumer.HandleMessageContext;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
+import org.apache.eventmesh.spi.EventMeshExtensionFactory;
 
-import java.util.concurrent.TimeUnit;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
 
 import io.cloudevents.CloudEvent;
 
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public abstract class RetryContext implements TimerTask {
 
+    private static final Set<String> RETRY_STRATEGY_PROCESSED_EVENT_LIST = 
Collections.synchronizedSet(new HashSet<>());
+
     public CloudEvent event;
 
     public String seq;
 
     public int retryTimes;
 
+    public CommonConfiguration commonConfiguration;
+
     public long executeTime = System.currentTimeMillis();
 
-    public long getExecuteTime() {
-        return executeTime;
+    public void setEvent(CloudEvent event) {
+        this.event = event;
+    }
+
+    @Override
+    public void setExecuteTimeHook(long executeTime) {
+        this.executeTime = executeTime;
     }
 
-    protected void rePut(Timeout timeout, long tick, TimeUnit timeUnit) {
-        if (timeout == null) {
+    @Override
+    public final void run() throws Exception {
+        String eventMeshRetryPluginType = 
Optional.ofNullable(commonConfiguration.getEventMeshRetryPluginType())
+            .orElse(Constants.DEFAULT);
+        if (Constants.DEFAULT.equals(eventMeshRetryPluginType)) {
+            log.warn("Because eventmesh retry plugin is default, retry in 
memory.");
+            doRun();
             return;
         }
-
-        Timer timer = timeout.timer();
-        if (timer.isStop() || timeout.isCancelled()) {
+        if 
(!eventMeshRetryPluginType.equals(commonConfiguration.getEventMeshStoragePluginType()))
 {
+            log.warn("Because eventmesh retry plugin type mismatched with 
storage plugin type, retry in memory.");
+            doRun();
             return;
         }
+        Optional<RetryStrategy> retryStrategy = Optional.ofNullable(
+            EventMeshExtensionFactory.getExtension(RetryStrategy.class,
+                commonConfiguration.getEventMeshRetryPluginType()));
+        if (!retryStrategy.isPresent()) {
+            log.warn("Storage retry SPI not found, retry in memory.");
+            doRun();
+            return;
+        }
+        if (!RETRY_STRATEGY_PROCESSED_EVENT_LIST.contains(event.getId())) {
+            String consumerGroupName = 
getHandleMessageContext().getConsumerGroup();
+            EventMeshProducer producer = 
getProducerManager().getEventMeshProducer(consumerGroupName);
+            RetryConfiguration retryConfiguration = 
RetryConfiguration.builder()
+                .event(event)
+                .consumerGroupName(consumerGroupName)
+                .producer(producer.getMqProducerWrapper().getMeshMQProducer())
+                .topic(getHandleMessageContext().getTopic())
+                .build();
+            retryStrategy.get().retry(retryConfiguration);
+            RETRY_STRATEGY_PROCESSED_EVENT_LIST.add(event.getId());
+        } else {
+            RETRY_STRATEGY_PROCESSED_EVENT_LIST.remove(event.getId());
+            getHandleMessageContext().finish();
+        }
+    }
 
-        timer.newTimeout(timeout.task(), tick, timeUnit);
+    protected HandleMessageContext getHandleMessageContext() throws Exception {
+        throw new IllegalAccessException("method not supported.");
     }
 
-    @Override
-    public void setExecuteTimeHook(long executeTime) {
-        this.executeTime = executeTime;
+    public abstract void doRun() throws Exception;
+
+    @SneakyThrows
+    protected ProducerManager getProducerManager() {
+        throw new IllegalAccessException("method not supported.");
     }
+
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/consumer/HandleMessageContext.java
similarity index 69%
copy from 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
copy to 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/consumer/HandleMessageContext.java
index b004c6aab..6ba20f376 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/consumer/HandleMessageContext.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.retry;
-
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
-
-import java.util.concurrent.TimeUnit;
+package org.apache.eventmesh.runtime.core.protocol.consumer;
 
 /**
- * Retryer interface.
+ * Handle message context
  */
-public interface Retryer {
-
-    void start();
-
-    void shutdown();
-
-    long getPendingTimeouts();
+public interface HandleMessageContext {
 
-    void printState();
+    void finish();
 
-    void newTimeout(TimerTask timerTask, long delay, TimeUnit timeUnit);
+    String getTopic();
 
+    String getConsumerGroup();
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
index c8567a9e6..3a4fde027 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
@@ -48,10 +48,10 @@ import 
org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.Co
 import 
org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType;
 import 
org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.StreamTopicConfig;
 import 
org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.WebhookTopicConfig;
-import 
org.apache.eventmesh.runtime.core.protocol.grpc.producer.EventMeshProducer;
-import 
org.apache.eventmesh.runtime.core.protocol.grpc.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.core.protocol.grpc.push.HandleMsgContext;
 import org.apache.eventmesh.runtime.core.protocol.grpc.push.MessageHandler;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 
 import org.apache.commons.collections4.MapUtils;
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/BatchPublishCloudEventProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/BatchPublishCloudEventProcessor.java
index 80a14e435..d912d82ac 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/BatchPublishCloudEventProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/BatchPublishCloudEventProcessor.java
@@ -30,11 +30,11 @@ import 
org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
 import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
 import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
 import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
-import 
org.apache.eventmesh.runtime.core.protocol.grpc.producer.EventMeshProducer;
-import 
org.apache.eventmesh.runtime.core.protocol.grpc.producer.ProducerManager;
-import 
org.apache.eventmesh.runtime.core.protocol.grpc.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;
 import org.apache.eventmesh.runtime.core.protocol.grpc.service.ServiceUtils;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
 
 import java.util.List;
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/PublishCloudEventsProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/PublishCloudEventsProcessor.java
index eb9189bff..996118f29 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/PublishCloudEventsProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/PublishCloudEventsProcessor.java
@@ -28,11 +28,11 @@ import 
org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
 import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
 import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
 import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
-import 
org.apache.eventmesh.runtime.core.protocol.grpc.producer.EventMeshProducer;
-import 
org.apache.eventmesh.runtime.core.protocol.grpc.producer.ProducerManager;
-import 
org.apache.eventmesh.runtime.core.protocol.grpc.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;
 import org.apache.eventmesh.runtime.core.protocol.grpc.service.ServiceUtils;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 
 import lombok.extern.slf4j.Slf4j;
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/ReplyMessageProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/ReplyMessageProcessor.java
index 73c594b54..e7eecbed3 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/ReplyMessageProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/ReplyMessageProcessor.java
@@ -34,11 +34,11 @@ import 
org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
 import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
-import 
org.apache.eventmesh.runtime.core.protocol.grpc.producer.EventMeshProducer;
-import 
org.apache.eventmesh.runtime.core.protocol.grpc.producer.ProducerManager;
-import 
org.apache.eventmesh.runtime.core.protocol.grpc.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;
 import org.apache.eventmesh.runtime.core.protocol.grpc.service.ServiceUtils;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 
 import java.util.concurrent.TimeUnit;
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/RequestCloudEventProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/RequestCloudEventProcessor.java
index ecb50f499..1e934ff85 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/RequestCloudEventProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/RequestCloudEventProcessor.java
@@ -26,11 +26,11 @@ import 
org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
 import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
 import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
 import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
-import 
org.apache.eventmesh.runtime.core.protocol.grpc.producer.EventMeshProducer;
-import 
org.apache.eventmesh.runtime.core.protocol.grpc.producer.ProducerManager;
-import 
org.apache.eventmesh.runtime.core.protocol.grpc.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;
 import org.apache.eventmesh.runtime.core.protocol.grpc.service.ServiceUtils;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 
 import lombok.extern.slf4j.Slf4j;
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/ProducerManager.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/ProducerManager.java
deleted file mode 100644
index 623c50d88..000000000
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/ProducerManager.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.runtime.core.protocol.grpc.producer;
-
-import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
-import org.apache.eventmesh.runtime.common.ServiceState;
-import org.apache.eventmesh.runtime.core.consumergroup.ProducerGroupConf;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class ProducerManager {
-
-    private EventMeshGrpcServer eventMeshGrpcServer;
-
-    private final ConcurrentHashMap<String, EventMeshProducer> producerTable = 
new ConcurrentHashMap<>();
-
-    public ProducerManager(EventMeshGrpcServer eventMeshGrpcServer) {
-        this.eventMeshGrpcServer = eventMeshGrpcServer;
-    }
-
-    public void init() throws Exception {
-        log.info("Grpc ProducerManager inited......");
-    }
-
-    public void start() throws Exception {
-        log.info("Grpc ProducerManager started......");
-    }
-
-    public EventMeshProducer getEventMeshProducer(String producerGroup) throws 
Exception {
-        EventMeshProducer eventMeshProducer = null;
-        if (!producerTable.containsKey(producerGroup)) {
-            synchronized (producerTable) {
-                if (!producerTable.containsKey(producerGroup)) {
-                    ProducerGroupConf producerGroupConfig = new 
ProducerGroupConf(producerGroup);
-                    eventMeshProducer = 
createEventMeshProducer(producerGroupConfig);
-                    eventMeshProducer.start();
-                }
-            }
-        }
-
-        eventMeshProducer = producerTable.get(producerGroup);
-
-        if (ServiceState.RUNNING != eventMeshProducer.getStatus()) {
-            eventMeshProducer.start();
-        }
-
-        return eventMeshProducer;
-    }
-
-    private synchronized EventMeshProducer createEventMeshProducer(
-        ProducerGroupConf producerGroupConfig) throws Exception {
-        if (producerTable.containsKey(producerGroupConfig.getGroupName())) {
-            return producerTable.get(producerGroupConfig.getGroupName());
-        }
-        EventMeshProducer eventMeshProducer = new EventMeshProducer();
-        
eventMeshProducer.init(eventMeshGrpcServer.getEventMeshGrpcConfiguration(),
-            producerGroupConfig);
-        producerTable.put(producerGroupConfig.getGroupName(), 
eventMeshProducer);
-        return eventMeshProducer;
-    }
-
-    public void shutdown() {
-        for (EventMeshProducer eventMeshProducer : producerTable.values()) {
-            try {
-                eventMeshProducer.shutdown();
-            } catch (Exception ex) {
-                log.error("shutdown eventMeshProducer[{}] err", 
eventMeshProducer, ex);
-            }
-        }
-        log.info("producerManager shutdown......");
-    }
-}
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java
index f9f4a23ee..54056c3a1 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java
@@ -32,7 +32,6 @@ import 
org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.RetryContext;
 import 
org.apache.eventmesh.runtime.core.protocol.grpc.consumer.EventMeshConsumer;
 import org.apache.eventmesh.runtime.core.protocol.grpc.retry.GrpcRetryer;
-import org.apache.eventmesh.runtime.core.timer.Timeout;
 
 import java.util.Collections;
 import java.util.Map;
@@ -158,7 +157,7 @@ public abstract class AbstractPushRequest extends 
RetryContext {
     }
 
     @Override
-    public void run(Timeout timeout) throws Exception {
+    public void doRun() throws Exception {
         tryPushRequest();
     }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
index 6493bfdeb..dccd782c1 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
@@ -17,9 +17,9 @@
 
 package org.apache.eventmesh.runtime.core.protocol.grpc.retry;
 
+import org.apache.eventmesh.retry.api.AbstractRetryer;
 import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
 import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
-import org.apache.eventmesh.runtime.core.protocol.AbstractRetryer;
 
 import lombok.extern.slf4j.Slf4j;
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index 5177e8c58..bccd6e9f5 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -28,6 +28,7 @@ import org.apache.eventmesh.api.EventMeshAction;
 import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
 import org.apache.eventmesh.api.SendCallback;
 import org.apache.eventmesh.api.SendResult;
+import org.apache.eventmesh.api.TopicNameHelper;
 import org.apache.eventmesh.api.exception.OnExceptionContext;
 import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.common.protocol.SubscriptionMode;
@@ -37,18 +38,20 @@ import 
org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
 import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
 import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
-import 
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
-import 
org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.core.protocol.http.push.HTTPMessageHandler;
 import org.apache.eventmesh.runtime.core.protocol.http.push.MessageHandler;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.TraceUtils;
+import org.apache.eventmesh.spi.EventMeshExtensionFactory;
 import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants;
 
 import org.apache.commons.collections4.MapUtils;
 
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -109,6 +112,9 @@ public class EventMeshConsumer {
                 EventMeshUtil.getCloudEventExtensionMap(protocolVersion, 
event),
                 
EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
             try {
+                Optional<TopicNameHelper> topicNameHelper =
+                    
Optional.ofNullable(EventMeshExtensionFactory.getExtension(TopicNameHelper.class,
+                        
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshStoragePluginType()));
                 String topic = event.getSubject();
                 String bizSeqNo = 
Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey())).toString();
                 String uniqueId = 
Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey())).toString();
@@ -124,6 +130,9 @@ public class EventMeshConsumer {
                     messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, uniqueId);
                 }
 
+                if (topicNameHelper.isPresent() && topicNameHelper.get().isRetryTopic(topic)) {
+                    topic = String.valueOf(event.getExtension(ProtocolKey.TOPIC));
+                }
                 ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(),
                     topic, null);
                 EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
index e6af880a9..c4046827a 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
@@ -24,6 +24,7 @@ import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
 import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
+import org.apache.eventmesh.runtime.core.protocol.consumer.HandleMessageContext;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.DateFormatUtils;
@@ -41,7 +42,7 @@ import io.cloudevents.CloudEvent;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public class HandleMsgContext {
+public class HandleMsgContext implements HandleMessageContext {
 
     public static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger(EventMeshConstants.MESSAGE);
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
index 9fc1c0b4a..1606a8c7a 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
@@ -42,8 +42,8 @@ import 
org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
 import 
org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
-import 
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
-import 
org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -246,7 +246,6 @@ public class BatchSendMessageProcessor implements HttpRequestProcessor {
                 CloudEvent event = null;
                 // TODO: Detect the maximum length of messages for different producers.
                 final SendMessageContext sendMessageContext = new SendMessageContext(batchId, event, batchEventMeshProducer, eventMeshHTTPServer);
-                sendMessageContext.setEventList(eventlist);
                 batchEventMeshProducer.send(sendMessageContext, new SendCallback() {
 
                     @Override
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
index a390cfccb..41e9ab262 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
@@ -41,8 +41,8 @@ import 
org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
 import 
org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
-import 
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
-import 
org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
index 1c8ac67b3..510bb2054 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
@@ -41,8 +41,8 @@ import 
org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
 import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler;
 import 
org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
-import 
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
-import 
org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
index 22e83c1cf..cb96e5433 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
@@ -38,8 +38,8 @@ import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.common.EventMeshTrace;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
-import 
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
-import 
org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
 import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants;
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
index eb327e7c7..153c607fb 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
@@ -42,8 +42,8 @@ import 
org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
 import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler;
 import 
org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
-import 
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
-import 
org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
 import org.apache.eventmesh.runtime.util.TraceUtils;
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
index 909872038..3f59234bd 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
@@ -37,8 +37,8 @@ import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.common.EventMeshTrace;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
-import 
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
-import 
org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
 import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants;
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index e8e712aef..44dcdff50 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -41,8 +41,8 @@ import 
org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
 import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler;
 import 
org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
-import 
org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
-import 
org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
deleted file mode 100644
index 5e7ab7206..000000000
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.runtime.core.protocol.http.producer;
-
-import org.apache.eventmesh.api.RequestReplyCallback;
-import org.apache.eventmesh.api.SendCallback;
-import org.apache.eventmesh.common.Constants;
-import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
-import org.apache.eventmesh.runtime.core.consumergroup.ProducerGroupConf;
-import org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper;
-import org.apache.eventmesh.runtime.util.EventMeshUtil;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class EventMeshProducer {
-
-    protected AtomicBoolean started = new AtomicBoolean(Boolean.FALSE);
-
-    protected AtomicBoolean inited = new AtomicBoolean(Boolean.FALSE);
-
-    public AtomicBoolean getInited() {
-        return inited;
-    }
-
-    public AtomicBoolean getStarted() {
-        return started;
-    }
-
-    public boolean isStarted() {
-        return started.get();
-    }
-
-    protected ProducerGroupConf producerGroupConfig;
-
-    protected EventMeshHTTPConfiguration eventMeshHttpConfiguration;
-
-    protected MQProducerWrapper mqProducerWrapper;
-
-    public void send(SendMessageContext sendMsgContext, SendCallback sendCallback) throws Exception {
-        mqProducerWrapper.send(sendMsgContext.getEvent(), sendCallback);
-    }
-
-    public void request(SendMessageContext sendMsgContext, RequestReplyCallback rrCallback, long timeout)
-        throws Exception {
-        mqProducerWrapper.request(sendMsgContext.getEvent(), rrCallback, timeout);
-    }
-
-    public boolean reply(final SendMessageContext sendMsgContext, final SendCallback sendCallback) throws Exception {
-        mqProducerWrapper.reply(sendMsgContext.getEvent(), sendCallback);
-        return true;
-    }
-
-    public MQProducerWrapper getMqProducerWrapper() {
-        return mqProducerWrapper;
-    }
-
-    public void init(EventMeshHTTPConfiguration eventMeshHttpConfiguration,
-        ProducerGroupConf producerGroupConfig) throws Exception {
-        if (!inited.compareAndSet(false, true)) {
-            return;
-        }
-        this.producerGroupConfig = producerGroupConfig;
-        this.eventMeshHttpConfiguration = eventMeshHttpConfiguration;
-
-        Properties keyValue = new Properties();
-        keyValue.put("producerGroup", producerGroupConfig.getGroupName());
-        keyValue.put("instanceName", 
EventMeshUtil.buildMeshClientID(producerGroupConfig.getGroupName(),
-            eventMeshHttpConfiguration.getEventMeshCluster()));
-        if (StringUtils.isNotBlank(producerGroupConfig.getToken())) {
-            keyValue.put(Constants.PRODUCER_TOKEN, 
producerGroupConfig.getToken());
-        }
-
-        // TODO for defibus
-        keyValue.put("eventMeshIDC", 
eventMeshHttpConfiguration.getEventMeshIDC());
-        mqProducerWrapper = new 
MQProducerWrapper(eventMeshHttpConfiguration.getEventMeshStoragePluginType());
-        mqProducerWrapper.init(keyValue);
-        log.info("EventMeshProducer [{}] inited.............", 
producerGroupConfig.getGroupName());
-    }
-
-    public void start() throws Exception {
-
-        if (!started.compareAndSet(false, true)) {
-            return;
-        }
-        mqProducerWrapper.start();
-        log.info("EventMeshProducer [{}] started.............", 
producerGroupConfig.getGroupName());
-    }
-
-    public void shutdown() throws Exception {
-        if (!inited.compareAndSet(true, false)) {
-            return;
-        }
-        if (!started.compareAndSet(true, false)) {
-            return;
-        }
-        mqProducerWrapper.shutdown();
-        log.info("EventMeshProducer [{}] shutdown.............", 
producerGroupConfig.getGroupName());
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("eventMeshProducer={")
-            .append("inited=").append(inited.get()).append(",")
-            .append("started=").append(started.get()).append(",")
-            
.append("producerGroupConfig=").append(producerGroupConfig).append("}");
-        return sb.toString();
-    }
-}
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/SendMessageContext.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/SendMessageContext.java
deleted file mode 100644
index 0e262aa68..000000000
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/SendMessageContext.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.runtime.core.protocol.http.producer;
-
-import org.apache.eventmesh.api.SendCallback;
-import org.apache.eventmesh.api.SendResult;
-import org.apache.eventmesh.api.exception.OnExceptionContext;
-import org.apache.eventmesh.common.Constants;
-import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
-import org.apache.eventmesh.runtime.core.protocol.RetryContext;
-import org.apache.eventmesh.runtime.core.timer.Timeout;
-
-import org.apache.commons.lang3.time.DateFormatUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import io.cloudevents.CloudEvent;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class SendMessageContext extends RetryContext {
-
-    private CloudEvent event;
-
-    private String bizSeqNo;
-
-    private EventMeshProducer eventMeshProducer;
-
-    private long createTime = System.currentTimeMillis();
-
-    private Map<String, String> props;
-
-    public EventMeshHTTPServer eventMeshHTTPServer;
-
-    private List<CloudEvent> eventList;
-
-    public SendMessageContext(String bizSeqNo, CloudEvent event, 
EventMeshProducer eventMeshProducer, EventMeshHTTPServer eventMeshHTTPServer) {
-        this.bizSeqNo = bizSeqNo;
-        this.event = event;
-        this.eventMeshProducer = eventMeshProducer;
-        this.eventMeshHTTPServer = eventMeshHTTPServer;
-    }
-
-    public void addProp(String key, String val) {
-        if (props == null) {
-            props = new HashMap<>();
-        }
-        props.put(key, val);
-    }
-
-    public String getProp(String key) {
-        return props.get(key);
-    }
-
-    public String getBizSeqNo() {
-        return bizSeqNo;
-    }
-
-    public void setBizSeqNo(String bizSeqNo) {
-        this.bizSeqNo = bizSeqNo;
-    }
-
-    public CloudEvent getEvent() {
-        return event;
-    }
-
-    public void setEvent(CloudEvent event) {
-        this.event = event;
-    }
-
-    public EventMeshProducer getEventMeshProducer() {
-        return eventMeshProducer;
-    }
-
-    public void setEventMeshProducer(EventMeshProducer eventMeshProducer) {
-        this.eventMeshProducer = eventMeshProducer;
-    }
-
-    public long getCreateTime() {
-        return createTime;
-    }
-
-    public void setCreateTime(long createTime) {
-        this.createTime = createTime;
-    }
-
-    public List<CloudEvent> getEventList() {
-        return eventList;
-    }
-
-    public void setEventList(List<CloudEvent> eventList) {
-        this.eventList = eventList;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("sendMessageContext={")
-            .append("bizSeqNo=").append(bizSeqNo)
-            .append(",retryTimes=").append(retryTimes)
-            .append(",producer=").append(eventMeshProducer != null ? 
eventMeshProducer.producerGroupConfig.getGroupName() : null)
-            
.append(",executeTime=").append(DateFormatUtils.format(executeTime, 
Constants.DATE_FORMAT_INCLUDE_MILLISECONDS))
-            .append(",createTime=").append(DateFormatUtils.format(createTime, 
Constants.DATE_FORMAT_INCLUDE_MILLISECONDS)).append("}");
-        return sb.toString();
-    }
-
-    public void retry() throws Exception {
-        if (eventMeshProducer == null) {
-            log.error("Exception happends during retry. EventMeshProduceer is null.");
-            return;
-        }
-
-        if (retryTimes > 0) { // retry once
-            log.error("Exception happends during retry. The retryTimes > 0.");
-            return;
-        }
-
-        retryTimes++;
-        eventMeshProducer.send(this, new SendCallback() {
-
-            @Override
-            public void onSuccess(SendResult sendResult) {
-            }
-
-            @Override
-            public void onException(OnExceptionContext context) {
-                log.warn("", context.getException());
-                
eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordSendBatchMsgFailed(1);
-            }
-
-        });
-    }
-
-    @Override
-    public void run(Timeout timeout) throws Exception {
-        retry();
-    }
-}
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
index d3cc71a17..562c6ba01 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
@@ -23,7 +23,7 @@ import 
org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.RetryContext;
 import 
org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext;
 import org.apache.eventmesh.runtime.core.protocol.http.retry.HttpRetryer;
-import org.apache.eventmesh.runtime.core.timer.Timeout;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
@@ -73,6 +73,7 @@ public abstract class AbstractHTTPPushRequest extends RetryContext {
         this.retryer = handleMsgContext.getEventMeshHTTPServer().getHttpRetryer();
         this.ttl = handleMsgContext.getTtl();
         this.startIdx = ThreadLocalRandom.current().nextInt(0, totalUrls.size());
+        super.commonConfiguration = eventMeshHttpConfiguration;
     }
 
     public void tryHTTPRequest() {
@@ -129,10 +130,9 @@ public abstract class AbstractHTTPPushRequest extends RetryContext {
         }
     }
 
-    protected abstract void doRetry();
-
     @Override
-    public void run(Timeout timeout) throws Exception {
-        doRetry();
+    protected ProducerManager getProducerManager() {
+        return eventMeshHTTPServer.getProducerManager();
     }
+
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index c60a01f5c..9cf01cd48 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -127,6 +127,7 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
             .withExtension(EventMeshConstants.RSP_GROUP, handleMsgContext.getConsumerGroup())
             .build();
         handleMsgContext.setEvent(event);
+        super.setEvent(event);
 
         String content = "";
         try {
@@ -316,6 +317,11 @@ public class AsyncHTTPPushRequest extends 
AbstractHTTPPushRequest {
         return false;
     }
 
+    @Override
+    protected HandleMsgContext getHandleMessageContext() {
+        return handleMsgContext;
+    }
+
     ClientRetCode processResponseContent(String content) {
         if (StringUtils.isBlank(content)) {
             return ClientRetCode.FAIL;
@@ -359,7 +365,7 @@ public class AsyncHTTPPushRequest extends 
AbstractHTTPPushRequest {
     }
 
     @Override
-    public void doRetry() {
+    public void doRun() {
         tryHTTPRequest();
     }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
index ce1cb6ca5..f081cc159 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
@@ -17,8 +17,8 @@
 
 package org.apache.eventmesh.runtime.core.protocol.http.retry;
 
+import org.apache.eventmesh.retry.api.AbstractRetryer;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
-import org.apache.eventmesh.runtime.core.protocol.AbstractRetryer;
 
 import lombok.extern.slf4j.Slf4j;
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/EventMeshProducer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/EventMeshProducer.java
similarity index 82%
rename from 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/EventMeshProducer.java
rename to 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/EventMeshProducer.java
index ae77312b8..bd27d6933 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/EventMeshProducer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/EventMeshProducer.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.protocol.grpc.producer;
+package org.apache.eventmesh.runtime.core.protocol.producer;
 
 import org.apache.eventmesh.api.RequestReplyCallback;
 import org.apache.eventmesh.api.SendCallback;
+import org.apache.eventmesh.common.config.CommonConfiguration;
 import org.apache.eventmesh.runtime.common.ServiceState;
-import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.consumergroup.ProducerGroupConf;
 import org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper;
@@ -53,19 +53,22 @@ public class EventMeshProducer {
         mqProducerWrapper.reply(sendMessageContext.getEvent(), sendCallback);
     }
 
-    public synchronized void init(EventMeshGrpcConfiguration eventMeshGrpcConfiguration,
+    public synchronized void init(CommonConfiguration configuration,
         ProducerGroupConf producerGroupConfig) throws Exception {
+        if (ServiceState.INITED == serviceState) {
+            return;
+        }
         this.producerGroupConfig = producerGroupConfig;
 
         Properties keyValue = new Properties();
         keyValue.put(EventMeshConstants.PRODUCER_GROUP, producerGroupConfig.getGroupName());
         keyValue.put(EventMeshConstants.INSTANCE_NAME, EventMeshUtil.buildMeshClientID(
-            producerGroupConfig.getGroupName(), eventMeshGrpcConfiguration.getEventMeshCluster()));
+            producerGroupConfig.getGroupName(), configuration.getEventMeshCluster()));
 
         // TODO for defibus
-        keyValue.put(EventMeshConstants.EVENT_MESH_IDC, eventMeshGrpcConfiguration.getEventMeshIDC());
+        keyValue.put(EventMeshConstants.EVENT_MESH_IDC, configuration.getEventMeshIDC());
         mqProducerWrapper = new MQProducerWrapper(
-            eventMeshGrpcConfiguration.getEventMeshStoragePluginType());
+            configuration.getEventMeshStoragePluginType());
         mqProducerWrapper.init(keyValue);
         serviceState = ServiceState.INITED;
         log.info("EventMeshProducer [{}] inited...........", producerGroupConfig.getGroupName());
@@ -82,7 +85,7 @@ public class EventMeshProducer {
     }
 
     public synchronized void shutdown() throws Exception {
-        if (serviceState == null || ServiceState.INITED == serviceState) {
+        if (serviceState == null || ServiceState.STOPPED == serviceState) {
             return;
         }
 
@@ -102,4 +105,12 @@ public class EventMeshProducer {
             .append(producerGroupConfig).append("}");
         return sb.toString();
     }
+
+    public MQProducerWrapper getMqProducerWrapper() {
+        return mqProducerWrapper;
+    }
+
+    public boolean isStarted() {
+        return serviceState == ServiceState.RUNNING;
+    }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/ProducerManager.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/ProducerManager.java
similarity index 85%
rename from 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/ProducerManager.java
rename to 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/ProducerManager.java
index 48b3413f6..251790f89 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/ProducerManager.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/ProducerManager.java
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.protocol.http.producer;
+package org.apache.eventmesh.runtime.core.protocol.producer;
 
-import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
+import org.apache.eventmesh.runtime.boot.AbstractRemotingServer;
 import org.apache.eventmesh.runtime.core.consumergroup.ProducerGroupConf;
 
 import java.util.concurrent.ConcurrentHashMap;
@@ -27,15 +27,15 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class ProducerManager {
 
-    private final EventMeshHTTPServer eventMeshHTTPServer;
+    private final AbstractRemotingServer eventMeshServer;
 
     /**
      * key: group name
      */
-    private final ConcurrentHashMap<String, EventMeshProducer> producerTable = new ConcurrentHashMap<String, EventMeshProducer>();
+    private final ConcurrentHashMap<String, EventMeshProducer> producerTable = new ConcurrentHashMap<>();
 
-    public ProducerManager(EventMeshHTTPServer eventMeshHTTPServer) {
-        this.eventMeshHTTPServer = eventMeshHTTPServer;
+    public ProducerManager(AbstractRemotingServer eventMeshServer) {
+        this.eventMeshServer = eventMeshServer;
     }
 
     public void init() throws Exception {
@@ -81,7 +81,7 @@ public class ProducerManager {
 
         eventMeshProducer = producerTable.get(producerGroup);
 
-        if (!eventMeshProducer.getStarted().get()) {
+        if (!eventMeshProducer.isStarted()) {
             eventMeshProducer.start();
         }
 
@@ -93,7 +93,7 @@ public class ProducerManager {
             return producerTable.get(producerGroupConfig.getGroupName());
         }
         EventMeshProducer eventMeshProducer = new EventMeshProducer();
-        eventMeshProducer.init(eventMeshHTTPServer.getEventMeshHttpConfiguration(), producerGroupConfig);
+        eventMeshProducer.init(eventMeshServer.getConfiguration(), producerGroupConfig);
         producerTable.put(producerGroupConfig.getGroupName(), eventMeshProducer);
         return eventMeshProducer;
     }
@@ -109,8 +109,8 @@ public class ProducerManager {
         log.info("producerManager shutdown......");
     }
 
-    public EventMeshHTTPServer getEventMeshHTTPServer() {
-        return eventMeshHTTPServer;
+    public AbstractRemotingServer getEventMeshServer() {
+        return eventMeshServer;
     }
 
     public ConcurrentHashMap<String, EventMeshProducer> getProducerTable() {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/SendMessageContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/SendMessageContext.java
similarity index 90%
rename from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/SendMessageContext.java
rename to eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/SendMessageContext.java
index 92bf4a27a..0de970f70 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/SendMessageContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/SendMessageContext.java
@@ -15,15 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.protocol.grpc.producer;
+package org.apache.eventmesh.runtime.core.protocol.producer;
 
 import org.apache.eventmesh.api.SendCallback;
 import org.apache.eventmesh.api.SendResult;
 import org.apache.eventmesh.api.exception.OnExceptionContext;
 import org.apache.eventmesh.common.Constants;
-import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
+import org.apache.eventmesh.runtime.boot.AbstractRemotingServer;
 import org.apache.eventmesh.runtime.core.protocol.RetryContext;
-import org.apache.eventmesh.runtime.core.timer.Timeout;
 
 import org.apache.commons.lang3.time.DateFormatUtils;
 
@@ -44,14 +43,14 @@ public class SendMessageContext extends RetryContext {
 
     private long createTime = System.currentTimeMillis();
 
-    public EventMeshGrpcServer eventMeshGrpcServer;
+    public AbstractRemotingServer eventMeshServer;
 
     public SendMessageContext(String bizSeqNo, CloudEvent event, EventMeshProducer eventMeshProducer,
-        EventMeshGrpcServer eventMeshGrpcServer) {
+        AbstractRemotingServer eventMeshServer) {
         this.bizSeqNo = bizSeqNo;
         this.event = event;
         this.eventMeshProducer = eventMeshProducer;
-        this.eventMeshGrpcServer = eventMeshGrpcServer;
+        this.eventMeshServer = eventMeshServer;
     }
 
     public String getBizSeqNo() {
@@ -127,7 +126,7 @@ public class SendMessageContext extends RetryContext {
     }
 
     @Override
-    public void run(Timeout timeout) throws Exception {
+    public void doRun() throws Exception {
         retry();
     }
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
index 8b00bf579..1a4d53f3d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
@@ -24,7 +24,6 @@ import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
 import org.apache.eventmesh.runtime.core.protocol.RetryContext;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
-import org.apache.eventmesh.runtime.core.timer.Timeout;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.ServerGlobal;
 
@@ -195,7 +194,7 @@ public class DownStreamMsgContext extends RetryContext {
     }
 
     @Override
-    public void run(Timeout timeout) throws Exception {
+    public void doRun() {
         retry();
     }
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/TcpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/TcpRetryer.java
index 2150eb562..411299440 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/TcpRetryer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/TcpRetryer.java
@@ -18,11 +18,11 @@
 package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry;
 
 import org.apache.eventmesh.common.protocol.SubscriptionType;
+import org.apache.eventmesh.retry.api.AbstractRetryer;
+import org.apache.eventmesh.retry.api.timer.TimerTask;
 import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
-import org.apache.eventmesh.runtime.core.protocol.AbstractRetryer;
 import org.apache.eventmesh.runtime.core.protocol.RetryContext;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 
 import java.util.concurrent.TimeUnit;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
index ef716801f..d6730a984 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
@@ -27,7 +27,6 @@ import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.RetryContext;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
-import org.apache.eventmesh.runtime.core.timer.Timeout;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.Utils;
 
@@ -163,7 +162,7 @@ public class UpStreamMsgContext extends RetryContext {
     }
 
     @Override
-    public void run(Timeout timeout) throws Exception {
+    public void doRun() throws Exception {
         retry();
     }
 }
diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionType.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionType.java
index c540358e8..f76379f9e 100644
--- a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionType.java
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionType.java
@@ -34,6 +34,7 @@ public enum EventMeshExtensionType {
     JDBC_SNAPSHOT_ENGINE("jdbc_snapshot_engine"),
     JDBC_DATABASE_DIALECT("jdbc_database_dialect"),
     OFFSETMGMT("offsetMgmt"),
+    RETRY("retry"),
     ;
 
     private final String extensionTypeName;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java b/eventmesh-storage-plugin/eventmesh-storage-api/src/main/java/org/apache/eventmesh/api/TopicNameHelper.java
similarity index 63%
rename from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
rename to eventmesh-storage-plugin/eventmesh-storage-api/src/main/java/org/apache/eventmesh/api/TopicNameHelper.java
index b004c6aab..8a7d34f32 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
+++ b/eventmesh-storage-plugin/eventmesh-storage-api/src/main/java/org/apache/eventmesh/api/TopicNameHelper.java
@@ -15,25 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.retry;
+package org.apache.eventmesh.api;
 
-import org.apache.eventmesh.runtime.core.timer.TimerTask;
+import org.apache.eventmesh.spi.EventMeshExtensionType;
+import org.apache.eventmesh.spi.EventMeshSPI;
 
-import java.util.concurrent.TimeUnit;
+import lombok.SneakyThrows;
 
 /**
- * Retryer interface.
+ * Topic name generator.
  */
-public interface Retryer {
-
-    void start();
-
-    void shutdown();
-
-    long getPendingTimeouts();
-
-    void printState();
-
-    void newTimeout(TimerTask timerTask, long delay, TimeUnit timeUnit);
+@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.STORAGE)
+public interface TopicNameHelper {
 
+    @SneakyThrows
+    default boolean isRetryTopic(String retryTopic) {
+        throw new IllegalAccessException("Method not supported.");
+    }
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java b/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/java/org/apache/eventmesh/storage/rocketmq/common/TopicNameHelperImpl.java
similarity index 63%
copy from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
copy to eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/java/org/apache/eventmesh/storage/rocketmq/common/TopicNameHelperImpl.java
index ce1cb6ca5..e0ee8843e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
+++ b/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/java/org/apache/eventmesh/storage/rocketmq/common/TopicNameHelperImpl.java
@@ -15,19 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.protocol.http.retry;
+package org.apache.eventmesh.storage.rocketmq.common;
 
-import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
-import org.apache.eventmesh.runtime.core.protocol.AbstractRetryer;
+import org.apache.eventmesh.api.TopicNameHelper;
 
-import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.MixAll;
 
-@Slf4j
-public class HttpRetryer extends AbstractRetryer {
+public class TopicNameHelperImpl implements TopicNameHelper {
 
-    private final EventMeshHTTPServer eventMeshHTTPServer;
-
-    public HttpRetryer(EventMeshHTTPServer eventMeshHTTPServer) {
-        this.eventMeshHTTPServer = eventMeshHTTPServer;
+    @Override
+    public boolean isRetryTopic(String retryTopic) {
+        if (StringUtils.isBlank(retryTopic)) {
+            return false;
+        }
+        return retryTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
     }
 }
diff --git a/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.TopicNameHelper b/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.TopicNameHelper
new file mode 100644
index 000000000..4670fbc91
--- /dev/null
+++ b/eventmesh-storage-plugin/eventmesh-storage-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.TopicNameHelper
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+rocketmq=org.apache.eventmesh.storage.rocketmq.common.TopicNameHelperImpl
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 676cc6e79..fd91ebd59 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -90,3 +90,7 @@ include 'eventmesh-webhook:eventmesh-webhook-api'
 include 'eventmesh-webhook:eventmesh-webhook-admin'
 include 'eventmesh-webhook:eventmesh-webhook-receive'
 
+include 'eventmesh-retry'
+include 'eventmesh-retry:eventmesh-retry-api'
+include 'eventmesh-retry:eventmesh-retry-rocketmq'
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to