This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new f046a2ef4 [ISSUE #4402] Refactor the retry module with 
HashedWheelTimer. (#4505)
f046a2ef4 is described below

commit f046a2ef4dfa299504218a1385948cad03980a87
Author: yanrongzhen <[email protected]>
AuthorDate: Wed Oct 25 14:51:33 2023 +0800

    [ISSUE #4402] Refactor the retry module with HashedWheelTimer. (#4505)
    
    * Add HashedWheelTimer.
    
    * Integrate HashedWheelTimer to AbstractRetryer.
    
    * Refactor Retryer, RetryContxt.
    
    * Remove DelayQueue dependency.
    
    * Add retry debug log.
    
    * Fix checkstyle.
    
    * Optimize Retry interface.
    
    * Remove init method.
---
 .../org/apache/eventmesh/common/Constants.java     |   5 +
 .../connector/PrometheusSourceConnector.java       |   1 +
 .../metrics/api/model/HttpSummaryMetrics.java      |  11 +-
 .../metrics/api/model/RetrySummaryMetrics.java     |  14 +-
 .../metrics/api/model/TcpSummaryMetrics.java       |   6 +-
 .../runtime/admin/response/GetMetricsResponse.java |   8 +-
 .../runtime/boot/EventMeshGrpcServer.java          |   1 -
 .../runtime/boot/EventMeshHTTPServer.java          |   1 -
 .../eventmesh/runtime/boot/EventMeshTCPServer.java |   9 +-
 .../runtime/core/protocol/AbstractRetryer.java     |  95 +--
 .../runtime/core/protocol/RetryContext.java        |  32 +-
 .../protocol/grpc/producer/SendMessageContext.java |   7 +-
 .../protocol/grpc/push/AbstractPushRequest.java    |  15 +-
 .../core/protocol/grpc/retry/GrpcRetryer.java      |  27 -
 .../http/processor/BatchSendMessageProcessor.java  |   4 +-
 .../processor/BatchSendMessageV2Processor.java     |   4 +-
 .../http/processor/SendAsyncEventProcessor.java    |   4 +-
 .../http/processor/SendAsyncMessageProcessor.java  |   4 +-
 .../processor/SendAsyncRemoteEventProcessor.java   |   4 +-
 .../http/processor/SendSyncMessageProcessor.java   |   4 +-
 .../protocol/http/producer/SendMessageContext.java |   7 +-
 .../http/push/AbstractHTTPPushRequest.java         |  17 +-
 .../protocol/http/push/AsyncHTTPPushRequest.java   |   5 +-
 .../core/protocol/http/retry/HttpRetryer.java      |  34 -
 .../tcp/client/group/ClientGroupWrapper.java       |  14 +-
 .../client/processor/MessageTransferProcessor.java |   5 +-
 .../client/session/push/DownStreamMsgContext.java  |   6 +-
 .../tcp/client/session/push/SessionPusher.java     |   5 +-
 .../{EventMeshTcpRetryer.java => TcpRetryer.java}  |  38 +-
 .../client/session/send/UpStreamMsgContext.java    |  13 +-
 .../DelayRetryable.java => retry/Retryer.java}     |  21 +-
 .../runtime/core/timer/HashedWheelTimer.java       | 802 +++++++++++++++++++++
 .../eventmesh/runtime/core/timer/Timeout.java      |  56 ++
 .../apache/eventmesh/runtime/core/timer/Timer.java |  62 ++
 .../DelayRetryable.java => timer/TimerTask.java}   |  23 +-
 .../runtime/metrics/grpc/EventMeshGrpcMonitor.java |   2 +-
 .../runtime/metrics/http/HTTPMetricsServer.java    |   5 +-
 .../runtime/metrics/tcp/EventMeshTcpMonitor.java   |   4 +-
 38 files changed, 1136 insertions(+), 239 deletions(-)

diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
index 440e5b9a7..eaf595659 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
@@ -199,4 +199,9 @@ public class Constants {
     public static final String TCP = "TCP";
 
     public static final String GRPC = "GRPC";
+
+    public static final String OS_NAME_KEY = "os.name";
+
+    public static final String OS_WIN_PREFIX = "win";
+
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
index b5bd85fc1..0fe5c8757 100644
--- 
a/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
@@ -70,6 +70,7 @@ public class PrometheusSourceConnector implements Source {
             .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_TIME))
             .withRetryListener(
                 new RetryListener() {
+
                     @Override
                     public <V> void onRetry(Attempt<V> attempt) {
                         long times = attempt.getAttemptNumber();
diff --git 
a/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java
 
b/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java
index cfbfd82f6..e09ff6388 100644
--- 
a/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java
+++ 
b/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java
@@ -19,7 +19,6 @@ package org.apache.eventmesh.metrics.api.model;
 
 import java.util.Collections;
 import java.util.LinkedList;
-import java.util.concurrent.DelayQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -106,18 +105,18 @@ public class HttpSummaryMetrics implements Metric {
 
     private final ThreadPoolExecutor pushMsgExecutor;
 
-    private final DelayQueue<?> httpFailedQueue;
+    private final RetrySummaryMetrics retrySummaryMetrics;
 
     private Lock lock = new ReentrantLock();
 
     public HttpSummaryMetrics(final ThreadPoolExecutor batchMsgExecutor,
         final ThreadPoolExecutor sendMsgExecutor,
         final ThreadPoolExecutor pushMsgExecutor,
-        final DelayQueue<?> httpFailedQueue) {
+        final RetrySummaryMetrics retrySummaryMetrics) {
         this.batchMsgExecutor = batchMsgExecutor;
         this.sendMsgExecutor = sendMsgExecutor;
         this.pushMsgExecutor = pushMsgExecutor;
-        this.httpFailedQueue = httpFailedQueue;
+        this.retrySummaryMetrics = retrySummaryMetrics;
     }
 
     public float avgHTTPCost() {
@@ -421,8 +420,8 @@ public class HttpSummaryMetrics implements Metric {
         return pushMsgExecutor.getQueue().size();
     }
 
-    public int getHttpRetryQueueSize() {
-        return httpFailedQueue.size();
+    public long getHttpRetryQueueSize() {
+        return retrySummaryMetrics.getPendingRetryTimeouts();
     }
 
     private float avg(LinkedList<Integer> linkedList) {
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/DelayRetryable.java
 
b/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/RetrySummaryMetrics.java
similarity index 80%
copy from 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/DelayRetryable.java
copy to 
eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/RetrySummaryMetrics.java
index b60876a64..1962527ca 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/DelayRetryable.java
+++ 
b/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/RetrySummaryMetrics.java
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.protocol;
+package org.apache.eventmesh.metrics.api.model;
 
-import java.util.concurrent.Delayed;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 
-/**
- * Retry
- */
-public interface DelayRetryable extends Delayed {
+@Data
+@AllArgsConstructor
+public class RetrySummaryMetrics {
 
-    void retry() throws Exception;
+    private long pendingRetryTimeouts;
 }
diff --git 
a/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/TcpSummaryMetrics.java
 
b/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/TcpSummaryMetrics.java
index 02d8ec5eb..1e54c8e75 100644
--- 
a/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/TcpSummaryMetrics.java
+++ 
b/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/TcpSummaryMetrics.java
@@ -37,7 +37,7 @@ public class TcpSummaryMetrics implements Metric {
 
     private int allConnections;
 
-    private int retrySize;
+    private long retrySize;
 
     public TcpSummaryMetrics() {
         this.client2eventMeshMsgNum = new AtomicInteger(0);
@@ -130,11 +130,11 @@ public class TcpSummaryMetrics implements Metric {
         this.allConnections = allConnections;
     }
 
-    public void setRetrySize(int retrySize) {
+    public void setRetrySize(long retrySize) {
         this.retrySize = retrySize;
     }
 
-    public int getRetrySize() {
+    public long getRetrySize() {
         return retrySize;
     }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/response/GetMetricsResponse.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/response/GetMetricsResponse.java
index f436b59a6..268aedc0f 100755
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/response/GetMetricsResponse.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/response/GetMetricsResponse.java
@@ -55,13 +55,13 @@ public class GetMetricsResponse {
     private int batchMsgQueueSize;
     private int sendMsgQueueSize;
     private int pushMsgQueueSize;
-    private int retryHTTPQueueSize;
+    private long retryHTTPQueueSize;
     private float avgBatchSendMsgCost;
     private float avgSendMsgCost;
     private float avgReplyMsgCost;
 
     // TCP Metrics
-    private int retryTCPQueueSize;
+    private long retryTCPQueueSize;
     private double client2eventMeshTCPTPS;
     private double eventMesh2mqTCPTPS;
     private double mq2eventMeshTCPTPS;
@@ -102,12 +102,12 @@ public class GetMetricsResponse {
         @JsonProperty("batchMsgQueueSize") int batchMsgQueueSize,
         @JsonProperty("sendMsgQueueSize") int sendMsgQueueSize,
         @JsonProperty("pushMsgQueueSize") int pushMsgQueueSize,
-        @JsonProperty("retryHTTPQueueSize") int retryHTTPQueueSize,
+        @JsonProperty("retryHTTPQueueSize") long retryHTTPQueueSize,
         @JsonProperty("avgBatchSendMsgCost") float avgBatchSendMsgCost,
         @JsonProperty("avgSendMsgCost") float avgSendMsgCost,
         @JsonProperty("avgReplyMsgCost") float avgReplyMsgCost,
         // TCP Metrics
-        @JsonProperty("retryTCPQueueSize") int retryTCPQueueSize,
+        @JsonProperty("retryTCPQueueSize") long retryTCPQueueSize,
         @JsonProperty("client2eventMeshTCPTPS") double client2eventMeshTCPTPS,
         @JsonProperty("eventMesh2mqTCPTPS") double eventMesh2mqTCPTPS,
         @JsonProperty("mq2eventMeshTCPTPS") double mq2eventMeshTCPTPS,
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
index 41260322f..1ee3d69da 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
@@ -119,7 +119,6 @@ public class EventMeshGrpcServer {
         consumerManager.init();
 
         grpcRetryer = new GrpcRetryer(this);
-        grpcRetryer.init();
 
         int serverPort = eventMeshGrpcConfiguration.getGrpcServerPort();
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index 6cd52d6b2..508520eab 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -124,7 +124,6 @@ public class EventMeshHTTPServer extends AbstractHTTPServer 
{
                 pluginType -> 
metricsRegistries.add(MetricsPluginFactory.getMetricsRegistry(pluginType))));
 
         httpRetryer = new HttpRetryer(this);
-        httpRetryer.init();
 
         super.setMetrics(new HTTPMetricsServer(this, metricsRegistries));
         subscriptionManager = new 
SubscriptionManager(eventMeshHttpConfiguration.isEventMeshServerMetaStorageEnable(),
 metaStorage);
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
index 5aaf1c5c7..ae289799f 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
@@ -42,7 +42,7 @@ import 
org.apache.eventmesh.runtime.core.protocol.tcp.client.processor.Subscribe
 import 
org.apache.eventmesh.runtime.core.protocol.tcp.client.processor.UnSubscribeProcessor;
 import 
org.apache.eventmesh.runtime.core.protocol.tcp.client.rebalance.EventMeshRebalanceImpl;
 import 
org.apache.eventmesh.runtime.core.protocol.tcp.client.rebalance.EventMeshRebalanceService;
-import 
org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.EventMeshTcpRetryer;
+import 
org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.TcpRetryer;
 import org.apache.eventmesh.runtime.meta.MetaStorage;
 import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
 import org.apache.eventmesh.webhook.admin.AdminWebHookConfigOperationManager;
@@ -73,7 +73,7 @@ public class EventMeshTCPServer extends AbstractTCPServer {
 
     private ClientSessionGroupMapping clientSessionGroupMapping;
 
-    private EventMeshTcpRetryer tcpRetryer;
+    private TcpRetryer tcpRetryer;
 
     private AdminWebHookConfigOperationManager 
adminWebHookConfigOperationManage;
 
@@ -102,8 +102,7 @@ public class EventMeshTCPServer extends AbstractTCPServer {
             metricsPlugins -> metricsPlugins.forEach(
                 pluginType -> 
metricsRegistries.add(MetricsPluginFactory.getMetricsRegistry(pluginType))));
 
-        tcpRetryer = new EventMeshTcpRetryer(this);
-        tcpRetryer.init();
+        tcpRetryer = new TcpRetryer(this);
 
         clientSessionGroupMapping = new ClientSessionGroupMapping(this);
         clientSessionGroupMapping.init();
@@ -288,7 +287,7 @@ public class EventMeshTCPServer extends AbstractTCPServer {
         this.rateLimiter = rateLimiter;
     }
 
-    public EventMeshTcpRetryer getTcpRetryer() {
+    public TcpRetryer getTcpRetryer() {
         return tcpRetryer;
     }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/AbstractRetryer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/AbstractRetryer.java
index 5b0a64e71..8f1491346 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/AbstractRetryer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/AbstractRetryer.java
@@ -17,71 +17,72 @@
 
 package org.apache.eventmesh.runtime.core.protocol;
 
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
+import org.apache.eventmesh.runtime.core.retry.Retryer;
+import org.apache.eventmesh.runtime.core.timer.HashedWheelTimer;
+import org.apache.eventmesh.runtime.core.timer.Timer;
+import org.apache.eventmesh.runtime.core.timer.TimerTask;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
 
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public abstract class AbstractRetryer {
-
-    protected final DelayQueue<DelayRetryable> retrys = new DelayQueue<>();
-
-    protected ThreadPoolExecutor pool;
-
-    protected Thread dispatcher;
+public abstract class AbstractRetryer implements Retryer {
 
-    protected abstract void pushRetry(DelayRetryable delayRetryable);
+    private volatile Timer timer;
 
-    protected abstract void init();
+    private static final int MAX_PENDING_TIMEOUTS = 10000;
 
-    public int getRetrySize() {
-        return retrys.size();
+    @Override
+    public void newTimeout(TimerTask timerTask, long delay, TimeUnit timeUnit) 
{
+        log.debug("[HASHED-WHEEL-TIMER] executed! taskClass={}, nowTime={}",
+            timeUnit.getClass().getName(), new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss").format(new Date()));
+        timer.newTimeout(timerTask, delay, timeUnit);
     }
 
-    protected void initDispatcher() {
-        dispatcher = new Thread(() -> {
-            try {
-                DelayRetryable retryObj;
-                while (!Thread.currentThread().isInterrupted() && (retryObj = 
retrys.take()) != null) {
-                    final DelayRetryable delayRetryable = retryObj;
-                    pool.execute(() -> {
-                        try {
-                            delayRetryable.retry();
-                            if (log.isDebugEnabled()) {
-                                log.debug("retryObj : {}", delayRetryable);
-                            }
-                        } catch (Exception e) {
-                            log.error("retry-dispatcher error!", e);
-                        }
-                    });
+    @Override
+    public void start() {
+        if (timer == null) {
+            synchronized (this) {
+                if (timer == null) {
+                    timer = new HashedWheelTimer(
+                        new EventMeshThreadFactory("failback-cluster-timer", 
true),
+                        1,
+                        TimeUnit.SECONDS, 512, MAX_PENDING_TIMEOUTS);
                 }
-            } catch (Exception e) {
-                if (e instanceof InterruptedException) {
-                    Thread.currentThread().interrupt();
-                }
-                log.error("retry-dispatcher error!", e);
             }
-        }, "retry-dispatcher");
-        dispatcher.setDaemon(true);
-        log.info("EventMesh retryer inited......");
+        }
+        log.info("EventMesh retryer started......");
     }
 
+    @Override
     public void shutdown() {
-        dispatcher.interrupt();
-        pool.shutdown();
+        timer.stop();
         log.info("EventMesh retryer shutdown......");
     }
 
-    public void start() throws Exception {
-        dispatcher.start();
-        log.info("EventMesh retryer started......");
+    @Override
+    public long getPendingTimeouts() {
+        if (timer == null) {
+            return 0;
+        }
+        return timer.pendingTimeouts();
     }
 
-    /**
-     * Get fail-retry queue, this method is just used for metrics.
-     */
-    public DelayQueue<DelayRetryable> getRetryQueue() {
-        return retrys;
+    @Override
+    public void printState() {
+        if (timer == null) {
+            log.warn("No HashedWheelTimer is provided!");
+            return;
+        }
+        HashedWheelTimer hashedWheelTimer = (HashedWheelTimer) timer;
+
+        log.info("[Retry-HashedWheelTimer] state==================");
+        log.info("Running :{}", !hashedWheelTimer.isStop());
+        log.info("Pending Timeouts: {} | Cancelled Timeouts: {}", 
hashedWheelTimer.pendingTimeouts(), hashedWheelTimer.cancelledTimeouts());
+        log.info("========================================");
     }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java
index 939ef40dd..754ac7cfc 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java
@@ -17,14 +17,15 @@
 
 package org.apache.eventmesh.runtime.core.protocol;
 
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
+import org.apache.eventmesh.runtime.core.timer.Timeout;
+import org.apache.eventmesh.runtime.core.timer.Timer;
+import org.apache.eventmesh.runtime.core.timer.TimerTask;
 
-import javax.annotation.Nonnull;
+import java.util.concurrent.TimeUnit;
 
 import io.cloudevents.CloudEvent;
 
-public abstract class RetryContext implements DelayRetryable {
+public abstract class RetryContext implements TimerTask {
 
     public CloudEvent event;
 
@@ -34,20 +35,25 @@ public abstract class RetryContext implements 
DelayRetryable {
 
     public long executeTime = System.currentTimeMillis();
 
-    public RetryContext delay(long delay) {
-        this.executeTime = System.currentTimeMillis() + (retryTimes + 1) * 
delay;
-        return this;
+    public long getExecuteTime() {
+        return executeTime;
     }
 
-    @Override
-    public int compareTo(@Nonnull Delayed delayed) {
-        RetryContext obj = (RetryContext) delayed;
-        return Long.compare(this.executeTime, obj.executeTime);
+    protected void rePut(Timeout timeout, long tick, TimeUnit timeUnit) {
+        if (timeout == null) {
+            return;
+        }
+
+        Timer timer = timeout.timer();
+        if (timer.isStop() || timeout.isCancelled()) {
+            return;
+        }
 
+        timer.newTimeout(timeout.task(), tick, timeUnit);
     }
 
     @Override
-    public long getDelay(TimeUnit unit) {
-        return unit.convert(this.executeTime - System.currentTimeMillis(), 
TimeUnit.MILLISECONDS);
+    public void setExecuteTimeHook(long executeTime) {
+        this.executeTime = executeTime;
     }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/SendMessageContext.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/SendMessageContext.java
index b8e97bc8a..92bf4a27a 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/SendMessageContext.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/SendMessageContext.java
@@ -23,6 +23,7 @@ import org.apache.eventmesh.api.exception.OnExceptionContext;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
 import org.apache.eventmesh.runtime.core.protocol.RetryContext;
+import org.apache.eventmesh.runtime.core.timer.Timeout;
 
 import org.apache.commons.lang3.time.DateFormatUtils;
 
@@ -100,7 +101,6 @@ public class SendMessageContext extends RetryContext {
         return sb.toString();
     }
 
-    @Override
     public void retry() throws Exception {
         if (eventMeshProducer == null) {
             logger.error("Exception happends during retry. EventMeshProducer 
is null.");
@@ -125,4 +125,9 @@ public class SendMessageContext extends RetryContext {
             }
         });
     }
+
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        retry();
+    }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java
index ee9e6fbb4..f9f4a23ee 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java
@@ -32,11 +32,13 @@ import 
org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.RetryContext;
 import 
org.apache.eventmesh.runtime.core.protocol.grpc.consumer.EventMeshConsumer;
 import org.apache.eventmesh.runtime.core.protocol.grpc.retry.GrpcRetryer;
+import org.apache.eventmesh.runtime.core.timer.Timeout;
 
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.collect.Sets;
@@ -99,16 +101,10 @@ public abstract class AbstractPushRequest extends 
RetryContext {
         }
     }
 
-    @Override
-    public void retry() {
-        tryPushRequest();
-    }
-
     protected void delayRetry() {
         if (retryTimes < EventMeshConstants.DEFAULT_PUSH_RETRY_TIMES) {
             retryTimes++;
-            delay((long) retryTimes * 
EventMeshConstants.DEFAULT_PUSH_RETRY_TIME_DISTANCE_IN_MILLSECONDS);
-            grpcRetryer.pushRetry(this);
+            grpcRetryer.newTimeout(this, 
EventMeshConstants.DEFAULT_PUSH_RETRY_TIME_DISTANCE_IN_MILLSECONDS, 
TimeUnit.MILLISECONDS);
         } else {
             complete();
         }
@@ -160,4 +156,9 @@ public abstract class AbstractPushRequest extends 
RetryContext {
             
waitingRequests.get(handleMsgContext.getConsumerGroup()).remove(request);
         }
     }
+
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        tryPushRequest();
+    }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
index cb10a9054..6493bfdeb 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
@@ -17,15 +17,9 @@
 
 package org.apache.eventmesh.runtime.core.protocol.grpc.retry;
 
-import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
 import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
 import org.apache.eventmesh.runtime.core.protocol.AbstractRetryer;
-import org.apache.eventmesh.runtime.core.protocol.DelayRetryable;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -38,25 +32,4 @@ public class GrpcRetryer extends AbstractRetryer {
         this.grpcConfiguration = 
eventMeshGrpcServer.getEventMeshGrpcConfiguration();
     }
 
-    @Override
-    public void pushRetry(DelayRetryable delayRetryable) {
-        if (retrys.size() >= 
grpcConfiguration.getEventMeshServerRetryBlockQueueSize()) {
-            log.error("[RETRY-QUEUE] is full!");
-            return;
-        }
-        retrys.offer(delayRetryable);
-    }
-
-    @Override
-    public void init() {
-        pool = new ThreadPoolExecutor(
-            grpcConfiguration.getEventMeshServerRetryThreadNum(),
-            grpcConfiguration.getEventMeshServerRetryThreadNum(), 60000, 
TimeUnit.MILLISECONDS,
-            new 
ArrayBlockingQueue<>(grpcConfiguration.getEventMeshServerRetryBlockQueueSize()),
-            new EventMeshThreadFactory("grpc-retry", true, 
Thread.NORM_PRIORITY),
-            new ThreadPoolExecutor.AbortPolicy());
-
-        initDispatcher();
-    }
-
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
index 8611b9ef8..2f69c99ad 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
@@ -257,7 +257,7 @@ public class BatchSendMessageProcessor implements 
HttpRequestProcessor {
                     @Override
                     public void onException(OnExceptionContext context) {
                         batchMessageLogger.warn("", context.getException());
-                        
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
+                        
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, 
TimeUnit.SECONDS);
                     }
 
                 });
@@ -275,7 +275,7 @@ public class BatchSendMessageProcessor implements 
HttpRequestProcessor {
                     @Override
                     public void onException(OnExceptionContext context) {
                         batchMessageLogger.warn("", context.getException());
-                        
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
+                        
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, 
TimeUnit.SECONDS);
                     }
 
                 });
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
index e035f6c20..a16578bbc 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
@@ -231,7 +231,7 @@ public class BatchSendMessageV2Processor implements 
HttpRequestProcessor {
                 @Override
                 public void onException(OnExceptionContext context) {
                     long batchEndTime = System.currentTimeMillis();
-                    
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
+                    
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, 
TimeUnit.SECONDS);
                     summaryMetrics.recordBatchSendMsgCost(batchEndTime - 
batchStartTime);
                     batchMessageLogger.error(
                         
"batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}",
@@ -246,7 +246,7 @@ public class BatchSendMessageV2Processor implements 
HttpRequestProcessor {
                     EventMeshUtil.stackTrace(e, 2),
                 SendMessageBatchV2ResponseBody.class);
             long batchEndTime = System.currentTimeMillis();
-            
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
+            
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, 
TimeUnit.SECONDS);
             summaryMetrics.recordBatchSendMsgCost(batchEndTime - 
batchStartTime);
             batchMessageLogger.error(
                 
"batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}",
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
index d330de773..69d7192de 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
@@ -274,7 +274,7 @@ public class SendAsyncEventProcessor implements 
AsyncHttpProcessor {
                     responseBodyMap.put(EventMeshConstants.RET_CODE, 
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode());
                     responseBodyMap.put(EventMeshConstants.RET_MSG, 
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
                         + EventMeshUtil.stackTrace(context.getException(), 2));
-                    
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10_000));
+                    
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, 
TimeUnit.SECONDS);
                     
handlerSpecific.getTraceOperation().exceptionLatestTrace(context.getException(),
                         
EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), 
sendMessageContext.getEvent()));
 
@@ -286,7 +286,7 @@ public class SendAsyncEventProcessor implements 
AsyncHttpProcessor {
                 }
             });
         } catch (Exception ex) {
-            
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10_000));
+            
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, 
TimeUnit.SECONDS);
             
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR,
 responseHeaderMap, responseBodyMap, null);
 
             final long endTime = System.currentTimeMillis();
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
index 72ad671f6..6881034ac 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
@@ -280,7 +280,7 @@ public class SendAsyncMessageProcessor implements 
HttpRequestProcessor {
                                     + 
EventMeshUtil.stackTrace(context.getException(), 2)));
                         asyncContext.onComplete(err, handler);
 
-                        
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
+                        
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, 
TimeUnit.SECONDS);
                         long endTime = System.currentTimeMillis();
                         summaryMetrics.recordSendMsgFailed();
                         summaryMetrics.recordSendMsgCost(endTime - startTime);
@@ -301,7 +301,7 @@ public class SendAsyncMessageProcessor implements 
HttpRequestProcessor {
                 EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR, null, 
SendMessageResponseBody.class);
             spanWithException(event, protocolVersion, 
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR);
 
-            
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
+            
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, 
TimeUnit.SECONDS);
             long endTime = System.currentTimeMillis();
             
MESSAGE_LOGGER.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                 endTime - startTime, topic, bizNo, uniqueId, ex);
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
index 7a86d9ca7..d5046f5e9 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
@@ -305,7 +305,7 @@ public class SendAsyncRemoteEventProcessor implements 
AsyncHttpProcessor {
                     responseBodyMap.put(EventMeshConstants.RET_CODE, 
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode());
                     responseBodyMap.put(EventMeshConstants.RET_MSG, 
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
                         + EventMeshUtil.stackTrace(context.getException(), 2));
-                    
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10_000));
+                    
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, 
TimeUnit.SECONDS);
                     
handlerSpecific.getTraceOperation().exceptionLatestTrace(context.getException(),
                         
EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), 
sendMessageContext.getEvent()));
 
@@ -318,7 +318,7 @@ public class SendAsyncRemoteEventProcessor implements 
AsyncHttpProcessor {
                 }
             });
         } catch (Exception ex) {
-            
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10_000));
+            
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, 
TimeUnit.SECONDS);
             
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR,
 responseHeaderMap,
                 responseBodyMap, null);
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index 0efa5bf62..e6902ebf9 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -279,7 +279,7 @@ public class SendSyncMessageProcessor implements 
HttpRequestProcessor {
                                     + EventMeshUtil.stackTrace(e, 2)));
                     asyncContext.onComplete(err, handler);
 
-                    
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10_000));
+                    
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, 
TimeUnit.SECONDS);
                     if (log.isErrorEnabled()) {
                         log.error(
                             
"message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
@@ -294,7 +294,7 @@ public class SendSyncMessageProcessor implements 
HttpRequestProcessor {
                 EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR,
                 EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getErrMsg() + 
EventMeshUtil.stackTrace(ex, 2),
                 SendMessageResponseBody.class);
-            
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10_000));
+            
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, 
TimeUnit.SECONDS);
             final long endTime = System.currentTimeMillis();
             summaryMetrics.recordSendMsgFailed();
             summaryMetrics.recordSendMsgCost(endTime - startTime);
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/SendMessageContext.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/SendMessageContext.java
index eb2ecf998..0e262aa68 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/SendMessageContext.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/SendMessageContext.java
@@ -23,6 +23,7 @@ import org.apache.eventmesh.api.exception.OnExceptionContext;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.core.protocol.RetryContext;
+import org.apache.eventmesh.runtime.core.timer.Timeout;
 
 import org.apache.commons.lang3.time.DateFormatUtils;
 
@@ -121,7 +122,6 @@ public class SendMessageContext extends RetryContext {
         return sb.toString();
     }
 
-    @Override
     public void retry() throws Exception {
         if (eventMeshProducer == null) {
             log.error("Exception happends during retry. EventMeshProduceer is 
null.");
@@ -148,4 +148,9 @@ public class SendMessageContext extends RetryContext {
 
         });
     }
+
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        retry();
+    }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
index 885ad2c63..d3cc71a17 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
@@ -23,6 +23,7 @@ import 
org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.RetryContext;
 import 
org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext;
 import org.apache.eventmesh.runtime.core.protocol.http.retry.HttpRetryer;
+import org.apache.eventmesh.runtime.core.timer.Timeout;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
@@ -31,6 +32,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.collect.Lists;
@@ -76,11 +78,10 @@ public abstract class AbstractHTTPPushRequest extends 
RetryContext {
     public void tryHTTPRequest() {
     }
 
-    public void delayRetry(long delayTime) {
+    public void delayRetry(long delayTime, TimeUnit timeUnit) {
         if (retryTimes < EventMeshConstants.DEFAULT_PUSH_RETRY_TIMES && 
delayTime > 0) {
             retryTimes++;
-            delay(delayTime);
-            retryer.pushRetry(this);
+            retryer.newTimeout(this, delayTime, timeUnit);
         } else {
             complete.compareAndSet(Boolean.FALSE, Boolean.TRUE);
         }
@@ -89,8 +90,7 @@ public abstract class AbstractHTTPPushRequest extends 
RetryContext {
     public void delayRetry() {
         if (retryTimes < EventMeshConstants.DEFAULT_PUSH_RETRY_TIMES) {
             retryTimes++;
-            delay((long) retryTimes * 
EventMeshConstants.DEFAULT_PUSH_RETRY_TIME_DISTANCE_IN_MILLSECONDS);
-            retryer.pushRetry(this);
+            retryer.newTimeout(this, 
EventMeshConstants.DEFAULT_PUSH_RETRY_TIME_DISTANCE_IN_MILLSECONDS, 
TimeUnit.MILLISECONDS);
         } else {
             complete.compareAndSet(Boolean.FALSE, Boolean.TRUE);
         }
@@ -128,4 +128,11 @@ public abstract class AbstractHTTPPushRequest extends 
RetryContext {
             delayRetry();
         }
     }
+
+    protected abstract void doRetry();
+
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        doRetry();
+    }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index 45f59779f..213bfeef2 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -58,6 +58,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -305,7 +306,7 @@ public class AsyncHTTPPushRequest extends 
AbstractHTTPPushRequest {
             // retry after the time specified by the header
             Optional<Header> optHeader = 
Arrays.stream(httpResponse.getHeaders("Retry-After")).findAny();
             if (optHeader.isPresent() && 
StringUtils.isNumeric(optHeader.get().getValue())) {
-                delayRetry(Long.parseLong(optHeader.get().getValue()));
+                delayRetry(Long.parseLong(optHeader.get().getValue()), 
TimeUnit.MILLISECONDS);
             }
             return false;
         } else if (httpStatus == HttpStatus.SC_GONE || httpStatus == 
HttpStatus.SC_UNSUPPORTED_MEDIA_TYPE) {
@@ -365,7 +366,7 @@ public class AsyncHTTPPushRequest extends 
AbstractHTTPPushRequest {
     }
 
     @Override
-    public void retry() {
+    public void doRetry() {
         tryHTTPRequest();
     }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
index 7bf2a9cc2..ce1cb6ca5 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
@@ -17,51 +17,17 @@
 
 package org.apache.eventmesh.runtime.core.protocol.http.retry;
 
-import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.core.protocol.AbstractRetryer;
-import org.apache.eventmesh.runtime.core.protocol.DelayRetryable;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class HttpRetryer extends AbstractRetryer {
 
-    private final Logger retryLogger = LoggerFactory.getLogger("retry");
-
     private final EventMeshHTTPServer eventMeshHTTPServer;
 
     public HttpRetryer(EventMeshHTTPServer eventMeshHTTPServer) {
         this.eventMeshHTTPServer = eventMeshHTTPServer;
     }
-
-    @Override
-    public void pushRetry(DelayRetryable delayRetryable) {
-        if (retrys.size() >= 
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryBlockQSize())
 {
-            retryLogger.error("[RETRY-QUEUE] is full!");
-            return;
-        }
-        retrys.offer(delayRetryable);
-    }
-
-    @Override
-    public void init() {
-        pool = new 
ThreadPoolExecutor(eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryThreadNum(),
-            
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryThreadNum(),
-            60000,
-            TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(
-                
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryBlockQSize()),
-            new EventMeshThreadFactory("http-retry", true, 
Thread.NORM_PRIORITY),
-            new ThreadPoolExecutor.AbortPolicy());
-
-        initDispatcher();
-    }
-
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 317e7b91b..61b264806 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -36,7 +36,7 @@ import 
org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper;
 import 
org.apache.eventmesh.runtime.core.protocol.tcp.client.group.dispatch.DownstreamDispatchStrategy;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
 import 
org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
-import 
org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.EventMeshTcpRetryer;
+import 
org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.TcpRetryer;
 import 
org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.UpStreamMsgContext;
 import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
@@ -82,7 +82,7 @@ public class ClientGroupWrapper {
 
     private final EventMeshTCPServer eventMeshTCPServer;
 
-    private EventMeshTcpRetryer eventMeshTcpRetryer;
+    private TcpRetryer tcpRetryer;
 
     private EventMeshTcpMonitor eventMeshTcpMonitor;
 
@@ -129,7 +129,7 @@ public class ClientGroupWrapper {
         this.group = group;
         this.eventMeshTCPServer = eventMeshTCPServer;
         this.eventMeshTCPConfiguration = 
eventMeshTCPServer.getEventMeshTCPConfiguration();
-        this.eventMeshTcpRetryer = eventMeshTCPServer.getTcpRetryer();
+        this.tcpRetryer = eventMeshTCPServer.getTcpRetryer();
         this.eventMeshTcpMonitor =
             
Preconditions.checkNotNull(eventMeshTCPServer.getEventMeshTcpMonitor());
         this.downstreamDispatchStrategy = downstreamDispatchStrategy;
@@ -710,12 +710,12 @@ public class ClientGroupWrapper {
         this.eventMeshTCPConfiguration = eventMeshTCPConfiguration;
     }
 
-    public EventMeshTcpRetryer getEventMeshTcpRetryer() {
-        return eventMeshTcpRetryer;
+    public TcpRetryer getTcpRetryer() {
+        return tcpRetryer;
     }
 
-    public void setEventMeshTcpRetryer(EventMeshTcpRetryer 
eventMeshTcpRetryer) {
-        this.eventMeshTcpRetryer = eventMeshTcpRetryer;
+    public void setTcpRetryer(TcpRetryer tcpRetryer) {
+        this.tcpRetryer = tcpRetryer;
     }
 
     public EventMeshTcpMonitor getEventMeshTcpMonitor() {
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java
index bd997ab49..ccbb98255 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java
@@ -259,10 +259,9 @@ public class MessageTransferProcessor implements 
TcpProcessor {
                 // retry
                 UpStreamMsgContext upStreamMsgContext = new UpStreamMsgContext(
                     session, event, pkg.getHeader(), startTime, 
taskExecuteTime);
-                upStreamMsgContext.delay(10000);
                 Objects.requireNonNull(
-                    
session.getClientGroupWrapper().get()).getEventMeshTcpRetryer()
-                    .pushRetry(upStreamMsgContext);
+                    session.getClientGroupWrapper().get()).getTcpRetryer()
+                    .newTimeout(upStreamMsgContext, 10, TimeUnit.SECONDS);
 
                 session.getSender().getFailMsgCount().incrementAndGet();
                 MESSAGE_LOGGER
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
index 60ecda3d3..8b00bf579 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
@@ -24,6 +24,7 @@ import 
org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
 import org.apache.eventmesh.runtime.core.protocol.RetryContext;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.core.timer.Timeout;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.ServerGlobal;
 
@@ -121,7 +122,6 @@ public class DownStreamMsgContext extends RetryContext {
             + '}';
     }
 
-    @Override
     public void retry() {
         try {
             log.info("retry downStream msg 
start,seq:{},retryTimes:{},bizSeq:{}", this.seq, this.retryTimes,
@@ -194,4 +194,8 @@ public class DownStreamMsgContext extends RetryContext {
         downStreamMsgContext.consumer.updateOffset(msgExts, 
downStreamMsgContext.consumeConcurrentlyContext);
     }
 
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        retry();
+    }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
index cdef6d32f..95e3d4d73 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
@@ -38,6 +38,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import java.util.Arrays;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
@@ -138,8 +139,8 @@ public class SessionPusher {
                             long delayTime = SubscriptionType.SYNC == 
downStreamMsgContext.getSubscriptionItem().getType()
                                 ? 
session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetrySyncDelayInMills()
                                 : 
session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryAsyncDelayInMills();
-                            downStreamMsgContext.delay(delayTime);
-                            
Objects.requireNonNull(session.getClientGroupWrapper().get()).getEventMeshTcpRetryer().pushRetry(downStreamMsgContext);
+                            
Objects.requireNonNull(session.getClientGroupWrapper().get()).getTcpRetryer()
+                                .newTimeout(downStreamMsgContext, delayTime, 
TimeUnit.MILLISECONDS);
                         } else {
                             deliveredMsgsCount.incrementAndGet();
                             log.info("downstreamMsg success,seq:{}, 
retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq,
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/TcpRetryer.java
similarity index 65%
rename from 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
rename to 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/TcpRetryer.java
index cc2511c73..2150eb562 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/TcpRetryer.java
@@ -17,35 +17,24 @@
 
 package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry;
 
-import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.common.protocol.SubscriptionType;
 import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
 import org.apache.eventmesh.runtime.core.protocol.AbstractRetryer;
-import org.apache.eventmesh.runtime.core.protocol.DelayRetryable;
 import org.apache.eventmesh.runtime.core.protocol.RetryContext;
 import 
org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
+import org.apache.eventmesh.runtime.core.timer.TimerTask;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
-import org.apache.eventmesh.runtime.util.ThreadPoolHelper;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public class EventMeshTcpRetryer extends AbstractRetryer {
+public class TcpRetryer extends AbstractRetryer {
 
     private EventMeshTCPServer eventMeshTCPServer;
 
-    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(3,
-        3,
-        60000,
-        TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1000),
-        new EventMeshThreadFactory("eventMesh-tcp-retry", true),
-        new ThreadPoolExecutor.AbortPolicy());
-
-    public EventMeshTcpRetryer(EventMeshTCPServer eventMeshTCPServer) {
+    public TcpRetryer(EventMeshTCPServer eventMeshTCPServer) {
         this.eventMeshTCPServer = eventMeshTCPServer;
     }
 
@@ -58,14 +47,8 @@ public class EventMeshTcpRetryer extends AbstractRetryer {
     }
 
     @Override
-    public void pushRetry(DelayRetryable delayRetryable) {
-        RetryContext retryContext = (RetryContext) delayRetryable;
-        if (retrys.size() >= 
eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryQueueSize())
 {
-            log.error("pushRetry fail, retrys is too much,allow max 
retryQueueSize:{}, retryTimes:{}, seq:{}, bizSeq:{}",
-                
eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryQueueSize(),
 retryContext.retryTimes,
-                retryContext.seq, 
EventMeshUtil.getMessageBizSeq(retryContext.event));
-            return;
-        }
+    public void newTimeout(TimerTask timerTask, long delay, TimeUnit timeUnit) 
{
+        RetryContext retryContext = (RetryContext) timerTask;
 
         int maxRetryTimes = 
eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgAsyncRetryTimes();
         if (retryContext instanceof DownStreamMsgContext) {
@@ -81,17 +64,10 @@ public class EventMeshTcpRetryer extends AbstractRetryer {
             return;
         }
 
-        retrys.offer(retryContext);
+        super.newTimeout(timerTask, delay, timeUnit);
+
         log.info("pushRetry success,seq:{}, retryTimes:{}, bizSeq:{}", 
retryContext.seq, retryContext.retryTimes,
             EventMeshUtil.getMessageBizSeq(retryContext.event));
-    }
-
-    @Override
-    public void init() {
-        initDispatcher();
-    }
 
-    public void printRetryThreadPoolState() {
-        ThreadPoolHelper.printState(pool);
     }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
index 9d77082b8..ef716801f 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
@@ -27,6 +27,7 @@ import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.RetryContext;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.core.timer.Timeout;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.Utils;
 
@@ -34,6 +35,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.DateFormatUtils;
 
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 import io.cloudevents.CloudEvent;
 
@@ -83,7 +85,6 @@ public class UpStreamMsgContext extends RetryContext {
             + ",executeTime=" + DateFormatUtils.format(executeTime, 
EventMeshConstants.DATE_FORMAT);
     }
 
-    @Override
     public void retry() {
         log.info("retry upStream msg start,seq:{},retryTimes:{},bizSeq:{}", 
this.seq, this.retryTimes,
             EventMeshUtil.getMessageBizSeq(this.event));
@@ -134,9 +135,8 @@ public class UpStreamMsgContext extends RetryContext {
                 session.getSender().getUpstreamBuff().release();
 
                 // retry
-                // reset delay time
-                retryContext.delay(10000);
-                
Objects.requireNonNull(session.getClientGroupWrapper().get()).getEventMeshTcpRetryer().pushRetry(retryContext);
+                
Objects.requireNonNull(session.getClientGroupWrapper().get()).getTcpRetryer()
+                    .newTimeout(retryContext, 10, TimeUnit.SECONDS);
 
                 session.getSender().getFailMsgCount().incrementAndGet();
                 log.error("upstreamMsg mq message error|user={}|callback 
cost={}, errMsg={}", session.getClient(),
@@ -161,4 +161,9 @@ public class UpStreamMsgContext extends RetryContext {
                 return cmd;
         }
     }
+
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        retry();
+    }
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/DelayRetryable.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
similarity index 69%
copy from 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/DelayRetryable.java
copy to 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
index b60876a64..b004c6aab 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/DelayRetryable.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
@@ -15,14 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.protocol;
+package org.apache.eventmesh.runtime.core.retry;
 
-import java.util.concurrent.Delayed;
+import org.apache.eventmesh.runtime.core.timer.TimerTask;
+
+import java.util.concurrent.TimeUnit;
 
 /**
- * Retry
+ * Retryer interface.
  */
-public interface DelayRetryable extends Delayed {
+public interface Retryer {
+
+    void start();
+
+    void shutdown();
+
+    long getPendingTimeouts();
+
+    void printState();
+
+    void newTimeout(TimerTask timerTask, long delay, TimeUnit timeUnit);
 
-    void retry() throws Exception;
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/HashedWheelTimer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/HashedWheelTimer.java
new file mode 100644
index 000000000..443f0a85f
--- /dev/null
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/HashedWheelTimer.java
@@ -0,0 +1,802 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.timer;
+
+import static org.apache.eventmesh.common.Constants.OS_NAME_KEY;
+import static org.apache.eventmesh.common.Constants.OS_WIN_PREFIX;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link Timer} optimized for approximated I/O timeout scheduling.
+ *
+ * <h3>Tick Duration</h3>
+ * <p>
+ * As described with 'approximated', this timer does not execute the scheduled
+ * {@link TimerTask} on time.  {@link HashedWheelTimer}, on every tick, will
+ * check if there are any {@link TimerTask}s behind the schedule and execute
+ * them.
+ * <p>
+ * You can increase or decrease the accuracy of the execution timing by
+ * specifying smaller or larger tick duration in the constructor.  In most
+ * network applications, I/O timeout does not need to be accurate.  Therefore,
+ * the default tick duration is 100 milliseconds, and you will not need to try
+ * different configurations in most cases.
+ *
+ * <h3>Ticks per Wheel (Wheel Size)</h3>
+ * <p>
+ * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
+ * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
+ * function is 'deadline of the task'.  The default number of ticks per wheel
+ * (i.e. the size of the wheel) is 512.  You could specify a larger value
+ * if you are going to schedule a lot of timeouts.
+ *
+ * <h3>Do not create many instances.</h3>
+ * <p>
+ * {@link HashedWheelTimer} creates a new thread whenever it is instantiated 
and
+ * started.  Therefore, you should make sure to create only one instance and
+ * share it across your application.  One of the common mistakes, that makes
+ * your application unresponsive, is to create a new instance for every 
connection.
+ *
+ * <h3>Implementation Details</h3>
+ * <p>
+ * {@link HashedWheelTimer} is based on
+ * <a href="http://cseweb.ucsd.edu/users/varghese/";>George Varghese</a> and
+ * Tony Lauck's paper,
+ * <a href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z";>'Hashed
+ * and Hierarchical Timing Wheels: data structures to efficiently implement a
+ * timer facility'</a>.  More comprehensive slides are located
+ * <a 
href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt";>here</a>.
+ */
+public class HashedWheelTimer implements Timer {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(HashedWheelTimer.class);
+
+    private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
+    private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new 
AtomicBoolean();
+    private static final int INSTANCE_COUNT_LIMIT = 64;
+    private static final AtomicIntegerFieldUpdater<HashedWheelTimer> 
WORKER_STATE_UPDATER =
+        AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, 
"workerState");
+
+    private final Worker worker = new Worker();
+    private final Thread workerThread;
+
+    private static final int WORKER_STATE_INIT = 0;
+    private static final int WORKER_STATE_STARTED = 1;
+    private static final int WORKER_STATE_SHUTDOWN = 2;
+
+    /**
+     * 0 - init, 1 - started, 2 - shut down
+     */
+    @SuppressWarnings({"unused", "FieldMayBeFinal"})
+    private volatile int workerState;
+
+    private final long tickDuration;
+    private final HashedWheelBucket[] wheel;
+    private final int mask;
+    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
+    private final Queue<HashedWheelTimeout> timeouts = new 
LinkedBlockingQueue<>();
+    private final Queue<HashedWheelTimeout> cancelledTimeouts = new 
LinkedBlockingQueue<>();
+    private final AtomicLong pendingTimeouts = new AtomicLong(0);
+    private final long maxPendingTimeouts;
+
+    private volatile long startTime;
+
+    /**
+     * Creates a new timer with the default thread factory
+     * ({@link Executors#defaultThreadFactory()}), default tick duration, and
+     * default number of ticks per wheel.
+     */
+    public HashedWheelTimer() {
+        this(Executors.defaultThreadFactory());
+    }
+
+    /**
+     * Creates a new timer with the default thread factory
+     * ({@link Executors#defaultThreadFactory()}) and default number of ticks
+     * per wheel.
+     *
+     * @param tickDuration the duration between tick
+     * @param unit         the time unit of the {@code tickDuration}
+     * @throws NullPointerException     if {@code unit} is {@code null}
+     * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
+     */
+    public HashedWheelTimer(long tickDuration, TimeUnit unit) {
+        this(Executors.defaultThreadFactory(), tickDuration, unit);
+    }
+
+    /**
+     * Creates a new timer with the default thread factory
+     * ({@link Executors#defaultThreadFactory()}).
+     *
+     * @param tickDuration  the duration between tick
+     * @param unit          the time unit of the {@code tickDuration}
+     * @param ticksPerWheel the size of the wheel
+     * @throws NullPointerException     if {@code unit} is {@code null}
+     * @throws IllegalArgumentException if either of {@code tickDuration} and 
{@code ticksPerWheel} is &lt;= 0
+     */
+    public HashedWheelTimer(long tickDuration, TimeUnit unit, int 
ticksPerWheel) {
+        this(Executors.defaultThreadFactory(), tickDuration, unit, 
ticksPerWheel);
+    }
+
+    /**
+     * Creates a new timer with the default tick duration and default number of
+     * ticks per wheel.
+     *
+     * @param threadFactory a {@link ThreadFactory} that creates a
+     *                      background {@link Thread} which is dedicated to
+     *                      {@link TimerTask} execution.
+     * @throws NullPointerException if {@code threadFactory} is {@code null}
+     */
+    public HashedWheelTimer(ThreadFactory threadFactory) {
+        this(threadFactory, 100, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Creates a new timer with the default number of ticks per wheel.
+     *
+     * @param threadFactory a {@link ThreadFactory} that creates a
+     *                      background {@link Thread} which is dedicated to
+     *                      {@link TimerTask} execution.
+     * @param tickDuration  the duration between tick
+     * @param unit          the time unit of the {@code tickDuration}
+     * @throws NullPointerException     if either of {@code threadFactory} and 
{@code unit} is {@code null}
+     * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
+     */
+    public HashedWheelTimer(
+        ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
+        this(threadFactory, tickDuration, unit, 512);
+    }
+
+    /**
+     * Creates a new timer.
+     *
+     * @param threadFactory a {@link ThreadFactory} that creates a
+     *                      background {@link Thread} which is dedicated to
+     *                      {@link TimerTask} execution.
+     * @param tickDuration  the duration between tick
+     * @param unit          the time unit of the {@code tickDuration}
+     * @param ticksPerWheel the size of the wheel
+     * @throws NullPointerException     if either of {@code threadFactory} and 
{@code unit} is {@code null}
+     * @throws IllegalArgumentException if either of {@code tickDuration} and 
{@code ticksPerWheel} is &lt;= 0
+     */
+    public HashedWheelTimer(
+        ThreadFactory threadFactory,
+        long tickDuration, TimeUnit unit, int ticksPerWheel) {
+        this(threadFactory, tickDuration, unit, ticksPerWheel, -1);
+    }
+
+    /**
+     * Creates a new timer.
+     *
+     * @param threadFactory      a {@link ThreadFactory} that creates a
+     *                           background {@link Thread} which is dedicated 
to
+     *                           {@link TimerTask} execution.
+     * @param tickDuration       the duration between tick
+     * @param unit               the time unit of the {@code tickDuration}
+     * @param ticksPerWheel      the size of the wheel
+     * @param maxPendingTimeouts The maximum number of pending timeouts after 
which call to
+     *                           {@code newTimeout} will result in
+     *                           {@link RejectedExecutionException}
+     *                           being thrown. No maximum pending timeouts 
limit is assumed if
+     *                           this value is 0 or negative.
+     * @throws NullPointerException     if either of {@code threadFactory} and 
{@code unit} is {@code null}
+     * @throws IllegalArgumentException if either of {@code tickDuration} and 
{@code ticksPerWheel} is &lt;= 0
+     */
+    public HashedWheelTimer(
+        ThreadFactory threadFactory,
+        long tickDuration, TimeUnit unit, int ticksPerWheel,
+        long maxPendingTimeouts) {
+
+        if (threadFactory == null) {
+            throw new NullPointerException("threadFactory");
+        }
+        if (unit == null) {
+            throw new NullPointerException("unit");
+        }
+        if (tickDuration <= 0) {
+            throw new IllegalArgumentException("tickDuration must be greater 
than 0: " + tickDuration);
+        }
+        if (ticksPerWheel <= 0) {
+            throw new IllegalArgumentException("ticksPerWheel must be greater 
than 0: " + ticksPerWheel);
+        }
+
+        // Normalize ticksPerWheel to power of two and initialize the wheel.
+        wheel = createWheel(ticksPerWheel);
+        mask = wheel.length - 1;
+
+        // Convert tickDuration to nanos.
+        this.tickDuration = unit.toNanos(tickDuration);
+
+        // Prevent overflow.
+        if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
+            throw new IllegalArgumentException(String.format(
+                "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
+                tickDuration, Long.MAX_VALUE / wheel.length));
+        }
+        workerThread = threadFactory.newThread(worker);
+
+        this.maxPendingTimeouts = maxPendingTimeouts;
+
+        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && 
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
+            reportTooManyInstances();
+        }
+    }
+
+    private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
+        if (ticksPerWheel <= 0) {
+            throw new IllegalArgumentException(
+                "ticksPerWheel must be greater than 0: " + ticksPerWheel);
+        }
+        if (ticksPerWheel > 1073741824) {
+            throw new IllegalArgumentException(
+                "ticksPerWheel may not be greater than 2^30: " + 
ticksPerWheel);
+        }
+
+        ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
+        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
+        for (int i = 0; i < wheel.length; i++) {
+            wheel[i] = new HashedWheelBucket();
+        }
+        return wheel;
+    }
+
+    private static int normalizeTicksPerWheel(int ticksPerWheel) {
+        int normalizedTicksPerWheel = ticksPerWheel - 1;
+        normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 1;
+        normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 2;
+        normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 4;
+        normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 8;
+        normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 16;
+        return normalizedTicksPerWheel + 1;
+    }
+
+    /**
+     * Starts the background thread explicitly.  The background thread will
+     * start automatically on demand even if you did not call this method.
+     *
+     * @throws IllegalStateException if this timer has been
+     *                               {@linkplain #stop() stopped} already
+     */
+    public void start() {
+        switch (WORKER_STATE_UPDATER.get(this)) {
+            case WORKER_STATE_INIT:
+                if (WORKER_STATE_UPDATER.compareAndSet(this, 
WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
+                    workerThread.start();
+                }
+                break;
+            case WORKER_STATE_STARTED:
+                break;
+            case WORKER_STATE_SHUTDOWN:
+                throw new IllegalStateException("cannot be started once 
stopped");
+            default:
+                throw new Error("Invalid WorkerState");
+        }
+
+        // Wait until the startTime is initialized by the worker.
+        while (startTime == 0) {
+            try {
+                startTimeInitialized.await();
+            } catch (InterruptedException ignore) {
+                // Ignore - it will be ready very soon.
+            }
+        }
+    }
+
+    @Override
+    public Set<Timeout> stop() {
+        if (Thread.currentThread() == workerThread) {
+            throw new IllegalStateException(
+                HashedWheelTimer.class.getSimpleName() + ".stop() cannot be 
called from " + TimerTask.class.getSimpleName());
+        }
+
+        if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, 
WORKER_STATE_SHUTDOWN)) {
+            // workerState can be 0 or 2 at this moment - let it always be 2.
+            if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != 
WORKER_STATE_SHUTDOWN) {
+                INSTANCE_COUNTER.decrementAndGet();
+            }
+
+            return Collections.emptySet();
+        }
+
+        try {
+            boolean interrupted = false;
+            while (workerThread.isAlive()) {
+                workerThread.interrupt();
+                try {
+                    workerThread.join(100);
+                } catch (InterruptedException ignored) {
+                    interrupted = true;
+                }
+            }
+
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        } finally {
+            INSTANCE_COUNTER.decrementAndGet();
+        }
+        return worker.unprocessedTimeouts();
+    }
+
+    @Override
+    public boolean isStop() {
+        return WORKER_STATE_SHUTDOWN == WORKER_STATE_UPDATER.get(this);
+    }
+
+    @Override
+    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
+        if (task == null) {
+            throw new NullPointerException("task");
+        }
+        if (unit == null) {
+            throw new NullPointerException("unit");
+        }
+
+        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
+
+        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > 
maxPendingTimeouts) {
+            pendingTimeouts.decrementAndGet();
+            throw new RejectedExecutionException("Number of pending timeouts ("
+                + pendingTimeoutsCount + ") is greater than or equal to 
maximum allowed pending "
+                + "timeouts (" + maxPendingTimeouts + ")");
+        }
+
+        start();
+
+        // Add the timeout to the timeout queue which will be processed on the 
next tick.
+        // During processing all the queued HashedWheelTimeouts will be added 
to the correct HashedWheelBucket.
+        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
+
+        // Guard against overflow.
+        if (delay > 0 && deadline < 0) {
+            deadline = Long.MAX_VALUE;
+        }
+        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, 
deadline);
+        timeouts.add(timeout);
+        return timeout;
+    }
+
+    /**
+     * Returns the number of pending timeouts of this {@link Timer}.
+     */
+    public long pendingTimeouts() {
+        return pendingTimeouts.get();
+    }
+
+    /**
+     * Returns the number of cancelled timeouts of this {@link Timer}.
+     */
+    public long cancelledTimeouts() {
+        return cancelledTimeouts.size();
+    }
+
+    private static void reportTooManyInstances() {
+        logger.error("You are creating too many HashedWheelTimer instances. is 
a shared resource that must be reused across the JVM, "
+            + "so that only a few instances are created.");
+    }
+
+    private final class Worker implements Runnable {
+
+        private final Set<Timeout> unprocessedTimeouts = new 
HashSet<Timeout>();
+
+        private long tick;
+
+        @Override
+        public void run() {
+            // Initialize the startTime.
+            startTime = System.nanoTime();
+            if (startTime == 0) {
+                // We use 0 as an indicator for the uninitialized value here, 
so make sure it's not 0 when initialized.
+                startTime = 1;
+            }
+
+            // Notify the other threads waiting for the initialization at 
start().
+            startTimeInitialized.countDown();
+
+            do {
+                final long deadline = waitForNextTick();
+                if (deadline > 0) {
+                    int idx = (int) (tick & mask);
+                    processCancelledTasks();
+                    HashedWheelBucket bucket =
+                        wheel[idx];
+                    transferTimeoutsToBuckets();
+                    bucket.expireTimeouts(deadline);
+                    tick++;
+                }
+            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == 
WORKER_STATE_STARTED);
+
+            // Fill the unprocessedTimeouts so we can return them from stop() 
method.
+            for (HashedWheelBucket bucket : wheel) {
+                bucket.clearTimeouts(unprocessedTimeouts);
+            }
+            for (;;) {
+                HashedWheelTimeout timeout = timeouts.poll();
+                if (timeout == null) {
+                    break;
+                }
+                if (!timeout.isCancelled()) {
+                    unprocessedTimeouts.add(timeout);
+                }
+            }
+            processCancelledTasks();
+        }
+
+        private void transferTimeoutsToBuckets() {
+            // transfer only max. 100000 timeouts per tick to prevent a thread 
to stale the workerThread when it just
+            // adds new timeouts in a loop.
+            for (int i = 0; i < 100000; i++) {
+                HashedWheelTimeout timeout = timeouts.poll();
+                if (timeout == null) {
+                    // all processed
+                    break;
+                }
+                if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
+                    // Was cancelled in the meantime.
+                    continue;
+                }
+
+                long calculated = timeout.deadline / tickDuration;
+                timeout.remainingRounds = (calculated - tick) / wheel.length;
+
+                // Ensure we don't schedule for past.
+                final long ticks = Math.max(calculated, tick);
+                int stopIndex = (int) (ticks & mask);
+
+                HashedWheelBucket bucket = wheel[stopIndex];
+                bucket.addTimeout(timeout);
+            }
+        }
+
+        private void processCancelledTasks() {
+            for (;;) {
+                HashedWheelTimeout timeout = cancelledTimeouts.poll();
+                if (timeout == null) {
+                    // all processed
+                    break;
+                }
+                try {
+                    timeout.remove();
+                } catch (Throwable t) {
+                    if (logger.isWarnEnabled()) {
+                        logger.warn("An exception was thrown while process a 
cancellation task", t);
+                    }
+                }
+            }
+        }
+
+        /**
+         * calculate goal nanoTime from startTime and current tick number,
+         * then wait until that goal has been reached.
+         *
+         * @return Long.MIN_VALUE if received a shutdown request,
+         * current time otherwise (with Long.MIN_VALUE changed by +1)
+         */
+        private long waitForNextTick() {
+            long deadline = tickDuration * (tick + 1);
+
+            for (;;) {
+                final long currentTime = System.nanoTime() - startTime;
+                long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
+
+                if (sleepTimeMs <= 0) {
+                    if (currentTime == Long.MIN_VALUE) {
+                        return -Long.MAX_VALUE;
+                    } else {
+                        return currentTime;
+                    }
+                }
+                if (isWindows()) {
+                    sleepTimeMs = sleepTimeMs / 10 * 10;
+                }
+
+                try {
+                    Thread.sleep(sleepTimeMs);
+                } catch (InterruptedException ignored) {
+                    if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == 
WORKER_STATE_SHUTDOWN) {
+                        return Long.MIN_VALUE;
+                    }
+                }
+            }
+        }
+
+        Set<Timeout> unprocessedTimeouts() {
+            return Collections.unmodifiableSet(unprocessedTimeouts);
+        }
+    }
+
+    private static final class HashedWheelTimeout implements Timeout {
+
+        private static final int ST_INIT = 0;
+        private static final int ST_CANCELLED = 1;
+        private static final int ST_EXPIRED = 2;
+        private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> 
STATE_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, 
"state");
+
+        private final HashedWheelTimer timer;
+        private final TimerTask task;
+        private final long deadline;
+
+        private volatile int state = ST_INIT;
+
+        /**
+         * RemainingRounds will be calculated and set by 
Worker.transferTimeoutsToBuckets() before the
+         * HashedWheelTimeout will be added to the correct HashedWheelBucket.
+         */
+        long remainingRounds;
+
+        /**
+         * This will be used to chain timeouts in HashedWheelTimerBucket via a 
double-linked-list.
+         * As only the workerThread will act on it there is no need for 
synchronization / volatile.
+         */
+        HashedWheelTimeout next;
+        HashedWheelTimeout prev;
+
+        /**
+         * The bucket to which the timeout was added
+         */
+        HashedWheelBucket bucket;
+
+        HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long 
deadline) {
+            this.timer = timer;
+            this.task = task;
+            this.deadline = deadline;
+        }
+
+        @Override
+        public Timer timer() {
+            return timer;
+        }
+
+        @Override
+        public TimerTask task() {
+            return task;
+        }
+
+        @Override
+        public boolean cancel() {
+            // only update the state it will be removed from HashedWheelBucket 
on next tick.
+            if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
+                return false;
+            }
+            // If a task should be canceled we put this to another queue which 
will be processed on each tick.
+            // So this means that we will have a GC latency of max. 1 tick 
duration which is good enough. This way we
+            // can make again use of our LinkedBlockingQueue and so minimize 
the locking / overhead as much as possible.
+            timer.cancelledTimeouts.add(this);
+            return true;
+        }
+
+        void remove() {
+            HashedWheelBucket bucket = this.bucket;
+            if (bucket != null) {
+                bucket.remove(this);
+            } else {
+                timer.pendingTimeouts.decrementAndGet();
+            }
+        }
+
+        public boolean compareAndSetState(int expected, int state) {
+            return STATE_UPDATER.compareAndSet(this, expected, state);
+        }
+
+        public int state() {
+            return state;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return state() == ST_CANCELLED;
+        }
+
+        @Override
+        public boolean isExpired() {
+            return state() == ST_EXPIRED;
+        }
+
+        public void expire() {
+            if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
+                return;
+            }
+
+            try {
+                task.run(this);
+                task.setExecuteTimeHook(System.currentTimeMillis());
+            } catch (Throwable t) {
+                if (logger.isWarnEnabled()) {
+                    logger.warn("An exception was thrown by " + 
TimerTask.class.getSimpleName() + '.', t);
+                }
+            }
+        }
+
+        @Override
+        public String toString() {
+            final long currentTime = System.nanoTime();
+            long remaining = deadline - currentTime + timer.startTime;
+            String simpleClassName = getClass().getSimpleName();
+
+            StringBuilder buf = new StringBuilder(192)
+                .append(simpleClassName)
+                .append('(')
+                .append("deadline: ");
+            if (remaining > 0) {
+                buf.append(remaining)
+                    .append(" ns later");
+            } else if (remaining < 0) {
+                buf.append(-remaining)
+                    .append(" ns ago");
+            } else {
+                buf.append("now");
+            }
+
+            if (isCancelled()) {
+                buf.append(", cancelled");
+            }
+
+            return buf.append(", task: ")
+                .append(task())
+                .append(')')
+                .toString();
+        }
+    }
+
+    /**
+     * Bucket that stores HashedWheelTimeouts. These are stored in a 
linked-list like datastructure to allow easy
+     * removal of HashedWheelTimeouts in the middle. Also the 
HashedWheelTimeout act as nodes themself and so no
+     * extra object creation is needed.
+     */
+    private static final class HashedWheelBucket {
+
+        /**
+         * Used for the linked-list datastructure
+         */
+        private HashedWheelTimeout head;
+        private HashedWheelTimeout tail;
+
+        /**
+         * Add {@link HashedWheelTimeout} to this bucket.
+         */
+        void addTimeout(HashedWheelTimeout timeout) {
+            assert timeout.bucket == null;
+            timeout.bucket = this;
+            if (head == null) {
+                head = tail = timeout;
+            } else {
+                tail.next = timeout;
+                timeout.prev = tail;
+                tail = timeout;
+            }
+        }
+
+        /**
+         * Expire all {@link HashedWheelTimeout}s for the given {@code 
deadline}.
+         */
+        void expireTimeouts(long deadline) {
+            HashedWheelTimeout timeout = head;
+
+            // process all timeouts
+            while (timeout != null) {
+                HashedWheelTimeout next = timeout.next;
+                if (timeout.remainingRounds <= 0) {
+                    next = remove(timeout);
+                    if (timeout.deadline <= deadline) {
+                        timeout.expire();
+                    } else {
+                        // The timeout was placed into a wrong slot. This 
should never happen.
+                        throw new IllegalStateException(String.format(
+                            "timeout.deadline (%d) > deadline (%d)", 
timeout.deadline, deadline));
+                    }
+                } else if (timeout.isCancelled()) {
+                    next = remove(timeout);
+                } else {
+                    timeout.remainingRounds--;
+                }
+                timeout = next;
+            }
+        }
+
+        public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
+            HashedWheelTimeout next = timeout.next;
+            // remove timeout that was either processed or cancelled by 
updating the linked-list
+            if (timeout.prev != null) {
+                timeout.prev.next = next;
+            }
+            if (timeout.next != null) {
+                timeout.next.prev = timeout.prev;
+            }
+
+            if (timeout == head) {
+                // if timeout is also the tail we need to adjust the entry too
+                if (timeout == tail) {
+                    tail = null;
+                    head = null;
+                } else {
+                    head = next;
+                }
+            } else if (timeout == tail) {
+                // if the timeout is the tail modify the tail to be the prev 
node.
+                tail = timeout.prev;
+            }
+            // null out prev, next and bucket to allow for GC.
+            timeout.prev = null;
+            timeout.next = null;
+            timeout.bucket = null;
+            timeout.timer.pendingTimeouts.decrementAndGet();
+            return next;
+        }
+
+        /**
+         * Clear this bucket and return all not expired / cancelled {@link 
Timeout}s.
+         */
+        void clearTimeouts(Set<Timeout> set) {
+            for (;;) {
+                HashedWheelTimeout timeout = pollTimeout();
+                if (timeout == null) {
+                    return;
+                }
+                if (timeout.isExpired() || timeout.isCancelled()) {
+                    continue;
+                }
+                set.add(timeout);
+            }
+        }
+
+        private HashedWheelTimeout pollTimeout() {
+            HashedWheelTimeout head = this.head;
+            if (head == null) {
+                return null;
+            }
+            HashedWheelTimeout next = head.next;
+            if (next == null) {
+                tail = this.head = null;
+            } else {
+                this.head = next;
+                next.prev = null;
+            }
+
+            // null out prev and next to allow for GC.
+            head.next = null;
+            head.prev = null;
+            head.bucket = null;
+            return head;
+        }
+    }
+
+    private static final boolean IS_OS_WINDOWS = 
System.getProperty(OS_NAME_KEY, 
"").toLowerCase(Locale.US).contains(OS_WIN_PREFIX);
+
+    private boolean isWindows() {
+        return IS_OS_WINDOWS;
+    }
+}
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timeout.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timeout.java
new file mode 100644
index 000000000..12cea1aea
--- /dev/null
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timeout.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.timer;
+
+/**
+ * A handle associated with a {@link TimerTask} that is returned by a
+ * {@link Timer}.
+ */
+public interface Timeout {
+
+    /**
+     * Returns the {@link Timer} that created this handle.
+     */
+    Timer timer();
+
+    /**
+     * Returns the {@link TimerTask} which is associated with this handle.
+     */
+    TimerTask task();
+
+    /**
+     * Returns {@code true} if and only if the {@link TimerTask} associated
+     * with this handle has been expired.
+     */
+    boolean isExpired();
+
+    /**
+     * Returns {@code true} if and only if the {@link TimerTask} associated
+     * with this handle has been cancelled.
+     */
+    boolean isCancelled();
+
+    /**
+     * Attempts to cancel the {@link TimerTask} associated with this handle.
+     * If the task has been executed or cancelled already, it will return with
+     * no side effect.
+     *
+     * @return True if the cancellation completed successfully, otherwise false
+     */
+    boolean cancel();
+}
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timer.java
new file mode 100644
index 000000000..076f8d04b
--- /dev/null
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.timer;
+
+import java.util.Set;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Schedules {@link TimerTask}s for one-time future execution in a background
+ * thread.
+ */
+public interface Timer {
+
+    /**
+     * Schedules the specified {@link TimerTask} for one-time execution after
+     * the specified delay.
+     *
+     * @return a handle which is associated with the specified task
+     * @throws IllegalStateException      if this timer has been {@linkplain 
#stop() stopped} already
+     * @throws RejectedExecutionException if the pending timeouts are too many 
and creating new timeout
+     *                                    can cause instability in the system.
+     */
+    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
+
+    /**
+     * Releases all resources acquired by this {@link Timer} and cancels all
+     * tasks which were scheduled but not executed yet.
+     *
+     * @return the handles associated with the tasks which were canceled by
+     * this method
+     */
+    Set<Timeout> stop();
+
+    /**
+     * the timer is stop
+     *
+     * @return true for stop
+     */
+    boolean isStop();
+
+    /**
+     * the pending timeouts
+     * @return count of pending timeout
+     */
+    long pendingTimeouts();
+}
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/DelayRetryable.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/TimerTask.java
similarity index 55%
rename from 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/DelayRetryable.java
rename to 
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/TimerTask.java
index b60876a64..ae8447f04 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/DelayRetryable.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/TimerTask.java
@@ -15,14 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.runtime.core.protocol;
+package org.apache.eventmesh.runtime.core.timer;
 
-import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
 
 /**
- * Retry
+ * A task which is executed after the delay specified with
+ * {@link Timer#newTimeout(TimerTask, long, TimeUnit)} (TimerTask, long, 
TimeUnit)}.
  */
-public interface DelayRetryable extends Delayed {
+public interface TimerTask {
 
-    void retry() throws Exception;
+    /**
+     * Executed after the delay specified with
+     * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
+     *
+     * @param timeout a handle which is associated with this task
+     */
+    void run(Timeout timeout) throws Exception;
+
+    /**
+     * Hook method to set the execute time.
+     * @param executeTime execute time
+     */
+    void setExecuteTimeHook(long executeTime);
 }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/grpc/EventMeshGrpcMonitor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/grpc/EventMeshGrpcMonitor.java
index 8b0175fbe..fbd3be616 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/grpc/EventMeshGrpcMonitor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/grpc/EventMeshGrpcMonitor.java
@@ -63,7 +63,7 @@ public class EventMeshGrpcMonitor {
         scheduleTask = scheduler.scheduleAtFixedRate(() -> {
             grpcSummaryMetrics.refreshTpsMetrics(SCHEDULE_PERIOD_MILLS);
             grpcSummaryMetrics.clearAllMessageCounter();
-            
grpcSummaryMetrics.setRetrySize(eventMeshGrpcServer.getGrpcRetryer().getRetrySize());
+            
grpcSummaryMetrics.setRetrySize(eventMeshGrpcServer.getGrpcRetryer().getPendingTimeouts());
             
grpcSummaryMetrics.setSubscribeTopicNum(eventMeshGrpcServer.getConsumerManager().getAllConsumerTopic().size());
         }, DELAY_MILLS, SCHEDULE_PERIOD_MILLS, TimeUnit.MILLISECONDS);
     }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
index aa557c941..8bd740898 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
@@ -20,6 +20,7 @@ package org.apache.eventmesh.runtime.metrics.http;
 import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.metrics.api.MetricsRegistry;
 import org.apache.eventmesh.metrics.api.model.HttpSummaryMetrics;
+import org.apache.eventmesh.metrics.api.model.RetrySummaryMetrics;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 
 import java.util.List;
@@ -50,7 +51,7 @@ public class HTTPMetricsServer {
             eventMeshHTTPServer.getHttpThreadPoolGroup().getBatchMsgExecutor(),
             eventMeshHTTPServer.getHttpThreadPoolGroup().getSendMsgExecutor(),
             eventMeshHTTPServer.getHttpThreadPoolGroup().getPushMsgExecutor(),
-            eventMeshHTTPServer.getHttpRetryer().getRetryQueue());
+            new 
RetrySummaryMetrics(eventMeshHTTPServer.getHttpRetryer().getPendingTimeouts()));
 
         init();
     }
@@ -167,7 +168,7 @@ public class HTTPMetricsServer {
                 
eventMeshHTTPServer.getHttpThreadPoolGroup().getBatchMsgExecutor().getQueue().size(),
                 
eventMeshHTTPServer.getHttpThreadPoolGroup().getSendMsgExecutor().getQueue().size(),
                 
eventMeshHTTPServer.getHttpThreadPoolGroup().getPushMsgExecutor().getQueue().size(),
-                eventMeshHTTPServer.getHttpRetryer().getRetrySize());
+                eventMeshHTTPServer.getHttpRetryer().getPendingTimeouts());
         }
 
         if (log.isInfoEnabled()) {
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
index 97d2962d9..1808a5c3c 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
@@ -128,10 +128,10 @@ public class EventMeshTcpMonitor {
 
         monitorThreadPoolTask = 
eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler().scheduleAtFixedRate(()
 -> {
             
eventMeshTCPServer.getEventMeshRebalanceService().printRebalanceThreadPoolState();
-            eventMeshTCPServer.getTcpRetryer().printRetryThreadPoolState();
+            eventMeshTCPServer.getTcpRetryer().printState();
 
             // monitor retry queue size
-            
tcpSummaryMetrics.setRetrySize(eventMeshTCPServer.getTcpRetryer().getRetrySize());
+            
tcpSummaryMetrics.setRetrySize(eventMeshTCPServer.getTcpRetryer().getPendingTimeouts());
             appLogger.info(
                 MonitorMetricConstants.EVENTMESH_MONITOR_FORMAT_COMMON,
                 EventMeshConstants.PROTOCOL_TCP,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to