This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new f046a2ef4 [ISSUE #4402] Refactor the retry module with
HashedWheelTimer. (#4505)
f046a2ef4 is described below
commit f046a2ef4dfa299504218a1385948cad03980a87
Author: yanrongzhen <[email protected]>
AuthorDate: Wed Oct 25 14:51:33 2023 +0800
[ISSUE #4402] Refactor the retry module with HashedWheelTimer. (#4505)
* Add HashedWheelTimer.
* Integrate HashedWheelTimer to AbstractRetryer.
* Refactor Retryer, RetryContxt.
* Remove DelayQueue dependency.
* Add retry debug log.
* Fix checkstyle.
* Optimize Retry interface.
* Remove init method.
---
.../org/apache/eventmesh/common/Constants.java | 5 +
.../connector/PrometheusSourceConnector.java | 1 +
.../metrics/api/model/HttpSummaryMetrics.java | 11 +-
.../metrics/api/model/RetrySummaryMetrics.java | 14 +-
.../metrics/api/model/TcpSummaryMetrics.java | 6 +-
.../runtime/admin/response/GetMetricsResponse.java | 8 +-
.../runtime/boot/EventMeshGrpcServer.java | 1 -
.../runtime/boot/EventMeshHTTPServer.java | 1 -
.../eventmesh/runtime/boot/EventMeshTCPServer.java | 9 +-
.../runtime/core/protocol/AbstractRetryer.java | 95 +--
.../runtime/core/protocol/RetryContext.java | 32 +-
.../protocol/grpc/producer/SendMessageContext.java | 7 +-
.../protocol/grpc/push/AbstractPushRequest.java | 15 +-
.../core/protocol/grpc/retry/GrpcRetryer.java | 27 -
.../http/processor/BatchSendMessageProcessor.java | 4 +-
.../processor/BatchSendMessageV2Processor.java | 4 +-
.../http/processor/SendAsyncEventProcessor.java | 4 +-
.../http/processor/SendAsyncMessageProcessor.java | 4 +-
.../processor/SendAsyncRemoteEventProcessor.java | 4 +-
.../http/processor/SendSyncMessageProcessor.java | 4 +-
.../protocol/http/producer/SendMessageContext.java | 7 +-
.../http/push/AbstractHTTPPushRequest.java | 17 +-
.../protocol/http/push/AsyncHTTPPushRequest.java | 5 +-
.../core/protocol/http/retry/HttpRetryer.java | 34 -
.../tcp/client/group/ClientGroupWrapper.java | 14 +-
.../client/processor/MessageTransferProcessor.java | 5 +-
.../client/session/push/DownStreamMsgContext.java | 6 +-
.../tcp/client/session/push/SessionPusher.java | 5 +-
.../{EventMeshTcpRetryer.java => TcpRetryer.java} | 38 +-
.../client/session/send/UpStreamMsgContext.java | 13 +-
.../DelayRetryable.java => retry/Retryer.java} | 21 +-
.../runtime/core/timer/HashedWheelTimer.java | 802 +++++++++++++++++++++
.../eventmesh/runtime/core/timer/Timeout.java | 56 ++
.../apache/eventmesh/runtime/core/timer/Timer.java | 62 ++
.../DelayRetryable.java => timer/TimerTask.java} | 23 +-
.../runtime/metrics/grpc/EventMeshGrpcMonitor.java | 2 +-
.../runtime/metrics/http/HTTPMetricsServer.java | 5 +-
.../runtime/metrics/tcp/EventMeshTcpMonitor.java | 4 +-
38 files changed, 1136 insertions(+), 239 deletions(-)
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
index 440e5b9a7..eaf595659 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
@@ -199,4 +199,9 @@ public class Constants {
public static final String TCP = "TCP";
public static final String GRPC = "GRPC";
+
+ public static final String OS_NAME_KEY = "os.name";
+
+ public static final String OS_WIN_PREFIX = "win";
+
}
diff --git
a/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
index b5bd85fc1..0fe5c8757 100644
---
a/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-prometheus/src/main/java/org/apache/eventmesh/connector/prometheus/source/connector/PrometheusSourceConnector.java
@@ -70,6 +70,7 @@ public class PrometheusSourceConnector implements Source {
.withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_TIME))
.withRetryListener(
new RetryListener() {
+
@Override
public <V> void onRetry(Attempt<V> attempt) {
long times = attempt.getAttemptNumber();
diff --git
a/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java
b/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java
index cfbfd82f6..e09ff6388 100644
---
a/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java
+++
b/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java
@@ -19,7 +19,6 @@ package org.apache.eventmesh.metrics.api.model;
import java.util.Collections;
import java.util.LinkedList;
-import java.util.concurrent.DelayQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@@ -106,18 +105,18 @@ public class HttpSummaryMetrics implements Metric {
private final ThreadPoolExecutor pushMsgExecutor;
- private final DelayQueue<?> httpFailedQueue;
+ private final RetrySummaryMetrics retrySummaryMetrics;
private Lock lock = new ReentrantLock();
public HttpSummaryMetrics(final ThreadPoolExecutor batchMsgExecutor,
final ThreadPoolExecutor sendMsgExecutor,
final ThreadPoolExecutor pushMsgExecutor,
- final DelayQueue<?> httpFailedQueue) {
+ final RetrySummaryMetrics retrySummaryMetrics) {
this.batchMsgExecutor = batchMsgExecutor;
this.sendMsgExecutor = sendMsgExecutor;
this.pushMsgExecutor = pushMsgExecutor;
- this.httpFailedQueue = httpFailedQueue;
+ this.retrySummaryMetrics = retrySummaryMetrics;
}
public float avgHTTPCost() {
@@ -421,8 +420,8 @@ public class HttpSummaryMetrics implements Metric {
return pushMsgExecutor.getQueue().size();
}
- public int getHttpRetryQueueSize() {
- return httpFailedQueue.size();
+ public long getHttpRetryQueueSize() {
+ return retrySummaryMetrics.getPendingRetryTimeouts();
}
private float avg(LinkedList<Integer> linkedList) {
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/DelayRetryable.java
b/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/RetrySummaryMetrics.java
similarity index 80%
copy from
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/DelayRetryable.java
copy to
eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/RetrySummaryMetrics.java
index b60876a64..1962527ca 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/DelayRetryable.java
+++
b/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/RetrySummaryMetrics.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.protocol;
+package org.apache.eventmesh.metrics.api.model;
-import java.util.concurrent.Delayed;
+import lombok.AllArgsConstructor;
+import lombok.Data;
-/**
- * Retry
- */
-public interface DelayRetryable extends Delayed {
+@Data
+@AllArgsConstructor
+public class RetrySummaryMetrics {
- void retry() throws Exception;
+ private long pendingRetryTimeouts;
}
diff --git
a/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/TcpSummaryMetrics.java
b/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/TcpSummaryMetrics.java
index 02d8ec5eb..1e54c8e75 100644
---
a/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/TcpSummaryMetrics.java
+++
b/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/TcpSummaryMetrics.java
@@ -37,7 +37,7 @@ public class TcpSummaryMetrics implements Metric {
private int allConnections;
- private int retrySize;
+ private long retrySize;
public TcpSummaryMetrics() {
this.client2eventMeshMsgNum = new AtomicInteger(0);
@@ -130,11 +130,11 @@ public class TcpSummaryMetrics implements Metric {
this.allConnections = allConnections;
}
- public void setRetrySize(int retrySize) {
+ public void setRetrySize(long retrySize) {
this.retrySize = retrySize;
}
- public int getRetrySize() {
+ public long getRetrySize() {
return retrySize;
}
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/response/GetMetricsResponse.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/response/GetMetricsResponse.java
index f436b59a6..268aedc0f 100755
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/response/GetMetricsResponse.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/response/GetMetricsResponse.java
@@ -55,13 +55,13 @@ public class GetMetricsResponse {
private int batchMsgQueueSize;
private int sendMsgQueueSize;
private int pushMsgQueueSize;
- private int retryHTTPQueueSize;
+ private long retryHTTPQueueSize;
private float avgBatchSendMsgCost;
private float avgSendMsgCost;
private float avgReplyMsgCost;
// TCP Metrics
- private int retryTCPQueueSize;
+ private long retryTCPQueueSize;
private double client2eventMeshTCPTPS;
private double eventMesh2mqTCPTPS;
private double mq2eventMeshTCPTPS;
@@ -102,12 +102,12 @@ public class GetMetricsResponse {
@JsonProperty("batchMsgQueueSize") int batchMsgQueueSize,
@JsonProperty("sendMsgQueueSize") int sendMsgQueueSize,
@JsonProperty("pushMsgQueueSize") int pushMsgQueueSize,
- @JsonProperty("retryHTTPQueueSize") int retryHTTPQueueSize,
+ @JsonProperty("retryHTTPQueueSize") long retryHTTPQueueSize,
@JsonProperty("avgBatchSendMsgCost") float avgBatchSendMsgCost,
@JsonProperty("avgSendMsgCost") float avgSendMsgCost,
@JsonProperty("avgReplyMsgCost") float avgReplyMsgCost,
// TCP Metrics
- @JsonProperty("retryTCPQueueSize") int retryTCPQueueSize,
+ @JsonProperty("retryTCPQueueSize") long retryTCPQueueSize,
@JsonProperty("client2eventMeshTCPTPS") double client2eventMeshTCPTPS,
@JsonProperty("eventMesh2mqTCPTPS") double eventMesh2mqTCPTPS,
@JsonProperty("mq2eventMeshTCPTPS") double mq2eventMeshTCPTPS,
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
index 41260322f..1ee3d69da 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java
@@ -119,7 +119,6 @@ public class EventMeshGrpcServer {
consumerManager.init();
grpcRetryer = new GrpcRetryer(this);
- grpcRetryer.init();
int serverPort = eventMeshGrpcConfiguration.getGrpcServerPort();
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index 6cd52d6b2..508520eab 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -124,7 +124,6 @@ public class EventMeshHTTPServer extends AbstractHTTPServer
{
pluginType ->
metricsRegistries.add(MetricsPluginFactory.getMetricsRegistry(pluginType))));
httpRetryer = new HttpRetryer(this);
- httpRetryer.init();
super.setMetrics(new HTTPMetricsServer(this, metricsRegistries));
subscriptionManager = new
SubscriptionManager(eventMeshHttpConfiguration.isEventMeshServerMetaStorageEnable(),
metaStorage);
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
index 5aaf1c5c7..ae289799f 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
@@ -42,7 +42,7 @@ import
org.apache.eventmesh.runtime.core.protocol.tcp.client.processor.Subscribe
import
org.apache.eventmesh.runtime.core.protocol.tcp.client.processor.UnSubscribeProcessor;
import
org.apache.eventmesh.runtime.core.protocol.tcp.client.rebalance.EventMeshRebalanceImpl;
import
org.apache.eventmesh.runtime.core.protocol.tcp.client.rebalance.EventMeshRebalanceService;
-import
org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.EventMeshTcpRetryer;
+import
org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.TcpRetryer;
import org.apache.eventmesh.runtime.meta.MetaStorage;
import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
import org.apache.eventmesh.webhook.admin.AdminWebHookConfigOperationManager;
@@ -73,7 +73,7 @@ public class EventMeshTCPServer extends AbstractTCPServer {
private ClientSessionGroupMapping clientSessionGroupMapping;
- private EventMeshTcpRetryer tcpRetryer;
+ private TcpRetryer tcpRetryer;
private AdminWebHookConfigOperationManager
adminWebHookConfigOperationManage;
@@ -102,8 +102,7 @@ public class EventMeshTCPServer extends AbstractTCPServer {
metricsPlugins -> metricsPlugins.forEach(
pluginType ->
metricsRegistries.add(MetricsPluginFactory.getMetricsRegistry(pluginType))));
- tcpRetryer = new EventMeshTcpRetryer(this);
- tcpRetryer.init();
+ tcpRetryer = new TcpRetryer(this);
clientSessionGroupMapping = new ClientSessionGroupMapping(this);
clientSessionGroupMapping.init();
@@ -288,7 +287,7 @@ public class EventMeshTCPServer extends AbstractTCPServer {
this.rateLimiter = rateLimiter;
}
- public EventMeshTcpRetryer getTcpRetryer() {
+ public TcpRetryer getTcpRetryer() {
return tcpRetryer;
}
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/AbstractRetryer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/AbstractRetryer.java
index 5b0a64e71..8f1491346 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/AbstractRetryer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/AbstractRetryer.java
@@ -17,71 +17,72 @@
package org.apache.eventmesh.runtime.core.protocol;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
+import org.apache.eventmesh.runtime.core.retry.Retryer;
+import org.apache.eventmesh.runtime.core.timer.HashedWheelTimer;
+import org.apache.eventmesh.runtime.core.timer.Timer;
+import org.apache.eventmesh.runtime.core.timer.TimerTask;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public abstract class AbstractRetryer {
-
- protected final DelayQueue<DelayRetryable> retrys = new DelayQueue<>();
-
- protected ThreadPoolExecutor pool;
-
- protected Thread dispatcher;
+public abstract class AbstractRetryer implements Retryer {
- protected abstract void pushRetry(DelayRetryable delayRetryable);
+ private volatile Timer timer;
- protected abstract void init();
+ private static final int MAX_PENDING_TIMEOUTS = 10000;
- public int getRetrySize() {
- return retrys.size();
+ @Override
+ public void newTimeout(TimerTask timerTask, long delay, TimeUnit timeUnit)
{
+ log.debug("[HASHED-WHEEL-TIMER] executed! taskClass={}, nowTime={}",
+ timeUnit.getClass().getName(), new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss").format(new Date()));
+ timer.newTimeout(timerTask, delay, timeUnit);
}
- protected void initDispatcher() {
- dispatcher = new Thread(() -> {
- try {
- DelayRetryable retryObj;
- while (!Thread.currentThread().isInterrupted() && (retryObj =
retrys.take()) != null) {
- final DelayRetryable delayRetryable = retryObj;
- pool.execute(() -> {
- try {
- delayRetryable.retry();
- if (log.isDebugEnabled()) {
- log.debug("retryObj : {}", delayRetryable);
- }
- } catch (Exception e) {
- log.error("retry-dispatcher error!", e);
- }
- });
+ @Override
+ public void start() {
+ if (timer == null) {
+ synchronized (this) {
+ if (timer == null) {
+ timer = new HashedWheelTimer(
+ new EventMeshThreadFactory("failback-cluster-timer",
true),
+ 1,
+ TimeUnit.SECONDS, 512, MAX_PENDING_TIMEOUTS);
}
- } catch (Exception e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- log.error("retry-dispatcher error!", e);
}
- }, "retry-dispatcher");
- dispatcher.setDaemon(true);
- log.info("EventMesh retryer inited......");
+ }
+ log.info("EventMesh retryer started......");
}
+ @Override
public void shutdown() {
- dispatcher.interrupt();
- pool.shutdown();
+ timer.stop();
log.info("EventMesh retryer shutdown......");
}
- public void start() throws Exception {
- dispatcher.start();
- log.info("EventMesh retryer started......");
+ @Override
+ public long getPendingTimeouts() {
+ if (timer == null) {
+ return 0;
+ }
+ return timer.pendingTimeouts();
}
- /**
- * Get fail-retry queue, this method is just used for metrics.
- */
- public DelayQueue<DelayRetryable> getRetryQueue() {
- return retrys;
+ @Override
+ public void printState() {
+ if (timer == null) {
+ log.warn("No HashedWheelTimer is provided!");
+ return;
+ }
+ HashedWheelTimer hashedWheelTimer = (HashedWheelTimer) timer;
+
+ log.info("[Retry-HashedWheelTimer] state==================");
+ log.info("Running :{}", !hashedWheelTimer.isStop());
+ log.info("Pending Timeouts: {} | Cancelled Timeouts: {}",
hashedWheelTimer.pendingTimeouts(), hashedWheelTimer.cancelledTimeouts());
+ log.info("========================================");
}
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java
index 939ef40dd..754ac7cfc 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java
@@ -17,14 +17,15 @@
package org.apache.eventmesh.runtime.core.protocol;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
+import org.apache.eventmesh.runtime.core.timer.Timeout;
+import org.apache.eventmesh.runtime.core.timer.Timer;
+import org.apache.eventmesh.runtime.core.timer.TimerTask;
-import javax.annotation.Nonnull;
+import java.util.concurrent.TimeUnit;
import io.cloudevents.CloudEvent;
-public abstract class RetryContext implements DelayRetryable {
+public abstract class RetryContext implements TimerTask {
public CloudEvent event;
@@ -34,20 +35,25 @@ public abstract class RetryContext implements
DelayRetryable {
public long executeTime = System.currentTimeMillis();
- public RetryContext delay(long delay) {
- this.executeTime = System.currentTimeMillis() + (retryTimes + 1) *
delay;
- return this;
+ public long getExecuteTime() {
+ return executeTime;
}
- @Override
- public int compareTo(@Nonnull Delayed delayed) {
- RetryContext obj = (RetryContext) delayed;
- return Long.compare(this.executeTime, obj.executeTime);
+ protected void rePut(Timeout timeout, long tick, TimeUnit timeUnit) {
+ if (timeout == null) {
+ return;
+ }
+
+ Timer timer = timeout.timer();
+ if (timer.isStop() || timeout.isCancelled()) {
+ return;
+ }
+ timer.newTimeout(timeout.task(), tick, timeUnit);
}
@Override
- public long getDelay(TimeUnit unit) {
- return unit.convert(this.executeTime - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
+ public void setExecuteTimeHook(long executeTime) {
+ this.executeTime = executeTime;
}
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/SendMessageContext.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/SendMessageContext.java
index b8e97bc8a..92bf4a27a 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/SendMessageContext.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/SendMessageContext.java
@@ -23,6 +23,7 @@ import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.core.protocol.RetryContext;
+import org.apache.eventmesh.runtime.core.timer.Timeout;
import org.apache.commons.lang3.time.DateFormatUtils;
@@ -100,7 +101,6 @@ public class SendMessageContext extends RetryContext {
return sb.toString();
}
- @Override
public void retry() throws Exception {
if (eventMeshProducer == null) {
logger.error("Exception happends during retry. EventMeshProducer
is null.");
@@ -125,4 +125,9 @@ public class SendMessageContext extends RetryContext {
}
});
}
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ retry();
+ }
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java
index ee9e6fbb4..f9f4a23ee 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/AbstractPushRequest.java
@@ -32,11 +32,13 @@ import
org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.RetryContext;
import
org.apache.eventmesh.runtime.core.protocol.grpc.consumer.EventMeshConsumer;
import org.apache.eventmesh.runtime.core.protocol.grpc.retry.GrpcRetryer;
+import org.apache.eventmesh.runtime.core.timer.Timeout;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.collect.Sets;
@@ -99,16 +101,10 @@ public abstract class AbstractPushRequest extends
RetryContext {
}
}
- @Override
- public void retry() {
- tryPushRequest();
- }
-
protected void delayRetry() {
if (retryTimes < EventMeshConstants.DEFAULT_PUSH_RETRY_TIMES) {
retryTimes++;
- delay((long) retryTimes *
EventMeshConstants.DEFAULT_PUSH_RETRY_TIME_DISTANCE_IN_MILLSECONDS);
- grpcRetryer.pushRetry(this);
+ grpcRetryer.newTimeout(this,
EventMeshConstants.DEFAULT_PUSH_RETRY_TIME_DISTANCE_IN_MILLSECONDS,
TimeUnit.MILLISECONDS);
} else {
complete();
}
@@ -160,4 +156,9 @@ public abstract class AbstractPushRequest extends
RetryContext {
waitingRequests.get(handleMsgContext.getConsumerGroup()).remove(request);
}
}
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ tryPushRequest();
+ }
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
index cb10a9054..6493bfdeb 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java
@@ -17,15 +17,9 @@
package org.apache.eventmesh.runtime.core.protocol.grpc.retry;
-import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
import org.apache.eventmesh.runtime.core.protocol.AbstractRetryer;
-import org.apache.eventmesh.runtime.core.protocol.DelayRetryable;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@@ -38,25 +32,4 @@ public class GrpcRetryer extends AbstractRetryer {
this.grpcConfiguration =
eventMeshGrpcServer.getEventMeshGrpcConfiguration();
}
- @Override
- public void pushRetry(DelayRetryable delayRetryable) {
- if (retrys.size() >=
grpcConfiguration.getEventMeshServerRetryBlockQueueSize()) {
- log.error("[RETRY-QUEUE] is full!");
- return;
- }
- retrys.offer(delayRetryable);
- }
-
- @Override
- public void init() {
- pool = new ThreadPoolExecutor(
- grpcConfiguration.getEventMeshServerRetryThreadNum(),
- grpcConfiguration.getEventMeshServerRetryThreadNum(), 60000,
TimeUnit.MILLISECONDS,
- new
ArrayBlockingQueue<>(grpcConfiguration.getEventMeshServerRetryBlockQueueSize()),
- new EventMeshThreadFactory("grpc-retry", true,
Thread.NORM_PRIORITY),
- new ThreadPoolExecutor.AbortPolicy());
-
- initDispatcher();
- }
-
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
index 8611b9ef8..2f69c99ad 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
@@ -257,7 +257,7 @@ public class BatchSendMessageProcessor implements
HttpRequestProcessor {
@Override
public void onException(OnExceptionContext context) {
batchMessageLogger.warn("", context.getException());
-
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
+
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10,
TimeUnit.SECONDS);
}
});
@@ -275,7 +275,7 @@ public class BatchSendMessageProcessor implements
HttpRequestProcessor {
@Override
public void onException(OnExceptionContext context) {
batchMessageLogger.warn("", context.getException());
-
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
+
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10,
TimeUnit.SECONDS);
}
});
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
index e035f6c20..a16578bbc 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
@@ -231,7 +231,7 @@ public class BatchSendMessageV2Processor implements
HttpRequestProcessor {
@Override
public void onException(OnExceptionContext context) {
long batchEndTime = System.currentTimeMillis();
-
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
+
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10,
TimeUnit.SECONDS);
summaryMetrics.recordBatchSendMsgCost(batchEndTime -
batchStartTime);
batchMessageLogger.error(
"batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}",
@@ -246,7 +246,7 @@ public class BatchSendMessageV2Processor implements
HttpRequestProcessor {
EventMeshUtil.stackTrace(e, 2),
SendMessageBatchV2ResponseBody.class);
long batchEndTime = System.currentTimeMillis();
-
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
+
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10,
TimeUnit.SECONDS);
summaryMetrics.recordBatchSendMsgCost(batchEndTime -
batchStartTime);
batchMessageLogger.error(
"batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}",
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
index d330de773..69d7192de 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
@@ -274,7 +274,7 @@ public class SendAsyncEventProcessor implements
AsyncHttpProcessor {
responseBodyMap.put(EventMeshConstants.RET_CODE,
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode());
responseBodyMap.put(EventMeshConstants.RET_MSG,
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
+ EventMeshUtil.stackTrace(context.getException(), 2));
-
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10_000));
+
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10,
TimeUnit.SECONDS);
handlerSpecific.getTraceOperation().exceptionLatestTrace(context.getException(),
EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(),
sendMessageContext.getEvent()));
@@ -286,7 +286,7 @@ public class SendAsyncEventProcessor implements
AsyncHttpProcessor {
}
});
} catch (Exception ex) {
-
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10_000));
+
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10,
TimeUnit.SECONDS);
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR,
responseHeaderMap, responseBodyMap, null);
final long endTime = System.currentTimeMillis();
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
index 72ad671f6..6881034ac 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
@@ -280,7 +280,7 @@ public class SendAsyncMessageProcessor implements
HttpRequestProcessor {
+
EventMeshUtil.stackTrace(context.getException(), 2)));
asyncContext.onComplete(err, handler);
-
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
+
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10,
TimeUnit.SECONDS);
long endTime = System.currentTimeMillis();
summaryMetrics.recordSendMsgFailed();
summaryMetrics.recordSendMsgCost(endTime - startTime);
@@ -301,7 +301,7 @@ public class SendAsyncMessageProcessor implements
HttpRequestProcessor {
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR, null,
SendMessageResponseBody.class);
spanWithException(event, protocolVersion,
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR);
-
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
+
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10,
TimeUnit.SECONDS);
long endTime = System.currentTimeMillis();
MESSAGE_LOGGER.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
endTime - startTime, topic, bizNo, uniqueId, ex);
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
index 7a86d9ca7..d5046f5e9 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
@@ -305,7 +305,7 @@ public class SendAsyncRemoteEventProcessor implements
AsyncHttpProcessor {
responseBodyMap.put(EventMeshConstants.RET_CODE,
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode());
responseBodyMap.put(EventMeshConstants.RET_MSG,
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
+ EventMeshUtil.stackTrace(context.getException(), 2));
-
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10_000));
+
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10,
TimeUnit.SECONDS);
handlerSpecific.getTraceOperation().exceptionLatestTrace(context.getException(),
EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(),
sendMessageContext.getEvent()));
@@ -318,7 +318,7 @@ public class SendAsyncRemoteEventProcessor implements
AsyncHttpProcessor {
}
});
} catch (Exception ex) {
-
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10_000));
+
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10,
TimeUnit.SECONDS);
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR,
responseHeaderMap,
responseBodyMap, null);
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index 0efa5bf62..e6902ebf9 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -279,7 +279,7 @@ public class SendSyncMessageProcessor implements
HttpRequestProcessor {
+ EventMeshUtil.stackTrace(e, 2)));
asyncContext.onComplete(err, handler);
-
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10_000));
+
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10,
TimeUnit.SECONDS);
if (log.isErrorEnabled()) {
log.error(
"message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
@@ -294,7 +294,7 @@ public class SendSyncMessageProcessor implements
HttpRequestProcessor {
EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR,
EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getErrMsg() +
EventMeshUtil.stackTrace(ex, 2),
SendMessageResponseBody.class);
-
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10_000));
+
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10,
TimeUnit.SECONDS);
final long endTime = System.currentTimeMillis();
summaryMetrics.recordSendMsgFailed();
summaryMetrics.recordSendMsgCost(endTime - startTime);
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/SendMessageContext.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/SendMessageContext.java
index eb2ecf998..0e262aa68 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/SendMessageContext.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/SendMessageContext.java
@@ -23,6 +23,7 @@ import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.core.protocol.RetryContext;
+import org.apache.eventmesh.runtime.core.timer.Timeout;
import org.apache.commons.lang3.time.DateFormatUtils;
@@ -121,7 +122,6 @@ public class SendMessageContext extends RetryContext {
return sb.toString();
}
- @Override
public void retry() throws Exception {
if (eventMeshProducer == null) {
log.error("Exception happends during retry. EventMeshProduceer is
null.");
@@ -148,4 +148,9 @@ public class SendMessageContext extends RetryContext {
});
}
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ retry();
+ }
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
index 885ad2c63..d3cc71a17 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
@@ -23,6 +23,7 @@ import
org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.RetryContext;
import
org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext;
import org.apache.eventmesh.runtime.core.protocol.http.retry.HttpRetryer;
+import org.apache.eventmesh.runtime.core.timer.Timeout;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
@@ -31,6 +32,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.collect.Lists;
@@ -76,11 +78,10 @@ public abstract class AbstractHTTPPushRequest extends
RetryContext {
public void tryHTTPRequest() {
}
- public void delayRetry(long delayTime) {
+ public void delayRetry(long delayTime, TimeUnit timeUnit) {
if (retryTimes < EventMeshConstants.DEFAULT_PUSH_RETRY_TIMES &&
delayTime > 0) {
retryTimes++;
- delay(delayTime);
- retryer.pushRetry(this);
+ retryer.newTimeout(this, delayTime, timeUnit);
} else {
complete.compareAndSet(Boolean.FALSE, Boolean.TRUE);
}
@@ -89,8 +90,7 @@ public abstract class AbstractHTTPPushRequest extends
RetryContext {
public void delayRetry() {
if (retryTimes < EventMeshConstants.DEFAULT_PUSH_RETRY_TIMES) {
retryTimes++;
- delay((long) retryTimes *
EventMeshConstants.DEFAULT_PUSH_RETRY_TIME_DISTANCE_IN_MILLSECONDS);
- retryer.pushRetry(this);
+ retryer.newTimeout(this,
EventMeshConstants.DEFAULT_PUSH_RETRY_TIME_DISTANCE_IN_MILLSECONDS,
TimeUnit.MILLISECONDS);
} else {
complete.compareAndSet(Boolean.FALSE, Boolean.TRUE);
}
@@ -128,4 +128,11 @@ public abstract class AbstractHTTPPushRequest extends
RetryContext {
delayRetry();
}
}
+
+ protected abstract void doRetry();
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ doRetry();
+ }
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index 45f59779f..213bfeef2 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -58,6 +58,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -305,7 +306,7 @@ public class AsyncHTTPPushRequest extends
AbstractHTTPPushRequest {
// retry after the time specified by the header
Optional<Header> optHeader =
Arrays.stream(httpResponse.getHeaders("Retry-After")).findAny();
if (optHeader.isPresent() &&
StringUtils.isNumeric(optHeader.get().getValue())) {
- delayRetry(Long.parseLong(optHeader.get().getValue()));
+ delayRetry(Long.parseLong(optHeader.get().getValue()),
TimeUnit.MILLISECONDS);
}
return false;
} else if (httpStatus == HttpStatus.SC_GONE || httpStatus ==
HttpStatus.SC_UNSUPPORTED_MEDIA_TYPE) {
@@ -365,7 +366,7 @@ public class AsyncHTTPPushRequest extends
AbstractHTTPPushRequest {
}
@Override
- public void retry() {
+ public void doRetry() {
tryHTTPRequest();
}
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
index 7bf2a9cc2..ce1cb6ca5 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java
@@ -17,51 +17,17 @@
package org.apache.eventmesh.runtime.core.protocol.http.retry;
-import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.core.protocol.AbstractRetryer;
-import org.apache.eventmesh.runtime.core.protocol.DelayRetryable;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HttpRetryer extends AbstractRetryer {
- private final Logger retryLogger = LoggerFactory.getLogger("retry");
-
private final EventMeshHTTPServer eventMeshHTTPServer;
public HttpRetryer(EventMeshHTTPServer eventMeshHTTPServer) {
this.eventMeshHTTPServer = eventMeshHTTPServer;
}
-
- @Override
- public void pushRetry(DelayRetryable delayRetryable) {
- if (retrys.size() >=
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryBlockQSize())
{
- retryLogger.error("[RETRY-QUEUE] is full!");
- return;
- }
- retrys.offer(delayRetryable);
- }
-
- @Override
- public void init() {
- pool = new
ThreadPoolExecutor(eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryThreadNum(),
-
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryThreadNum(),
- 60000,
- TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(
-
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryBlockQSize()),
- new EventMeshThreadFactory("http-retry", true,
Thread.NORM_PRIORITY),
- new ThreadPoolExecutor.AbortPolicy());
-
- initDispatcher();
- }
-
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 317e7b91b..61b264806 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -36,7 +36,7 @@ import
org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper;
import
org.apache.eventmesh.runtime.core.protocol.tcp.client.group.dispatch.DownstreamDispatchStrategy;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import
org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
-import
org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.EventMeshTcpRetryer;
+import
org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.TcpRetryer;
import
org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.UpStreamMsgContext;
import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
@@ -82,7 +82,7 @@ public class ClientGroupWrapper {
private final EventMeshTCPServer eventMeshTCPServer;
- private EventMeshTcpRetryer eventMeshTcpRetryer;
+ private TcpRetryer tcpRetryer;
private EventMeshTcpMonitor eventMeshTcpMonitor;
@@ -129,7 +129,7 @@ public class ClientGroupWrapper {
this.group = group;
this.eventMeshTCPServer = eventMeshTCPServer;
this.eventMeshTCPConfiguration =
eventMeshTCPServer.getEventMeshTCPConfiguration();
- this.eventMeshTcpRetryer = eventMeshTCPServer.getTcpRetryer();
+ this.tcpRetryer = eventMeshTCPServer.getTcpRetryer();
this.eventMeshTcpMonitor =
Preconditions.checkNotNull(eventMeshTCPServer.getEventMeshTcpMonitor());
this.downstreamDispatchStrategy = downstreamDispatchStrategy;
@@ -710,12 +710,12 @@ public class ClientGroupWrapper {
this.eventMeshTCPConfiguration = eventMeshTCPConfiguration;
}
- public EventMeshTcpRetryer getEventMeshTcpRetryer() {
- return eventMeshTcpRetryer;
+ public TcpRetryer getTcpRetryer() {
+ return tcpRetryer;
}
- public void setEventMeshTcpRetryer(EventMeshTcpRetryer
eventMeshTcpRetryer) {
- this.eventMeshTcpRetryer = eventMeshTcpRetryer;
+ public void setTcpRetryer(TcpRetryer tcpRetryer) {
+ this.tcpRetryer = tcpRetryer;
}
public EventMeshTcpMonitor getEventMeshTcpMonitor() {
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java
index bd997ab49..ccbb98255 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/processor/MessageTransferProcessor.java
@@ -259,10 +259,9 @@ public class MessageTransferProcessor implements
TcpProcessor {
// retry
UpStreamMsgContext upStreamMsgContext = new UpStreamMsgContext(
session, event, pkg.getHeader(), startTime,
taskExecuteTime);
- upStreamMsgContext.delay(10000);
Objects.requireNonNull(
-
session.getClientGroupWrapper().get()).getEventMeshTcpRetryer()
- .pushRetry(upStreamMsgContext);
+ session.getClientGroupWrapper().get()).getTcpRetryer()
+ .newTimeout(upStreamMsgContext, 10, TimeUnit.SECONDS);
session.getSender().getFailMsgCount().incrementAndGet();
MESSAGE_LOGGER
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
index 60ecda3d3..8b00bf579 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
@@ -24,6 +24,7 @@ import
org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.protocol.RetryContext;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.core.timer.Timeout;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.ServerGlobal;
@@ -121,7 +122,6 @@ public class DownStreamMsgContext extends RetryContext {
+ '}';
}
- @Override
public void retry() {
try {
log.info("retry downStream msg
start,seq:{},retryTimes:{},bizSeq:{}", this.seq, this.retryTimes,
@@ -194,4 +194,8 @@ public class DownStreamMsgContext extends RetryContext {
downStreamMsgContext.consumer.updateOffset(msgExts,
downStreamMsgContext.consumeConcurrentlyContext);
}
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ retry();
+ }
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
index cdef6d32f..95e3d4d73 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
@@ -38,6 +38,7 @@ import org.apache.commons.collections4.CollectionUtils;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
@@ -138,8 +139,8 @@ public class SessionPusher {
long delayTime = SubscriptionType.SYNC ==
downStreamMsgContext.getSubscriptionItem().getType()
?
session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetrySyncDelayInMills()
:
session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryAsyncDelayInMills();
- downStreamMsgContext.delay(delayTime);
-
Objects.requireNonNull(session.getClientGroupWrapper().get()).getEventMeshTcpRetryer().pushRetry(downStreamMsgContext);
+
Objects.requireNonNull(session.getClientGroupWrapper().get()).getTcpRetryer()
+ .newTimeout(downStreamMsgContext, delayTime,
TimeUnit.MILLISECONDS);
} else {
deliveredMsgsCount.incrementAndGet();
log.info("downstreamMsg success,seq:{},
retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq,
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/TcpRetryer.java
similarity index 65%
rename from
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
rename to
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/TcpRetryer.java
index cc2511c73..2150eb562 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/TcpRetryer.java
@@ -17,35 +17,24 @@
package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry;
-import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.core.protocol.AbstractRetryer;
-import org.apache.eventmesh.runtime.core.protocol.DelayRetryable;
import org.apache.eventmesh.runtime.core.protocol.RetryContext;
import
org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
+import org.apache.eventmesh.runtime.core.timer.TimerTask;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
-import org.apache.eventmesh.runtime.util.ThreadPoolHelper;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public class EventMeshTcpRetryer extends AbstractRetryer {
+public class TcpRetryer extends AbstractRetryer {
private EventMeshTCPServer eventMeshTCPServer;
- private final ThreadPoolExecutor pool = new ThreadPoolExecutor(3,
- 3,
- 60000,
- TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1000),
- new EventMeshThreadFactory("eventMesh-tcp-retry", true),
- new ThreadPoolExecutor.AbortPolicy());
-
- public EventMeshTcpRetryer(EventMeshTCPServer eventMeshTCPServer) {
+ public TcpRetryer(EventMeshTCPServer eventMeshTCPServer) {
this.eventMeshTCPServer = eventMeshTCPServer;
}
@@ -58,14 +47,8 @@ public class EventMeshTcpRetryer extends AbstractRetryer {
}
@Override
- public void pushRetry(DelayRetryable delayRetryable) {
- RetryContext retryContext = (RetryContext) delayRetryable;
- if (retrys.size() >=
eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryQueueSize())
{
- log.error("pushRetry fail, retrys is too much,allow max
retryQueueSize:{}, retryTimes:{}, seq:{}, bizSeq:{}",
-
eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryQueueSize(),
retryContext.retryTimes,
- retryContext.seq,
EventMeshUtil.getMessageBizSeq(retryContext.event));
- return;
- }
+ public void newTimeout(TimerTask timerTask, long delay, TimeUnit timeUnit)
{
+ RetryContext retryContext = (RetryContext) timerTask;
int maxRetryTimes =
eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgAsyncRetryTimes();
if (retryContext instanceof DownStreamMsgContext) {
@@ -81,17 +64,10 @@ public class EventMeshTcpRetryer extends AbstractRetryer {
return;
}
- retrys.offer(retryContext);
+ super.newTimeout(timerTask, delay, timeUnit);
+
log.info("pushRetry success,seq:{}, retryTimes:{}, bizSeq:{}",
retryContext.seq, retryContext.retryTimes,
EventMeshUtil.getMessageBizSeq(retryContext.event));
- }
-
- @Override
- public void init() {
- initDispatcher();
- }
- public void printRetryThreadPoolState() {
- ThreadPoolHelper.printState(pool);
}
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
index 9d77082b8..ef716801f 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java
@@ -27,6 +27,7 @@ import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.RetryContext;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.core.timer.Timeout;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.Utils;
@@ -34,6 +35,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import io.cloudevents.CloudEvent;
@@ -83,7 +85,6 @@ public class UpStreamMsgContext extends RetryContext {
+ ",executeTime=" + DateFormatUtils.format(executeTime,
EventMeshConstants.DATE_FORMAT);
}
- @Override
public void retry() {
log.info("retry upStream msg start,seq:{},retryTimes:{},bizSeq:{}",
this.seq, this.retryTimes,
EventMeshUtil.getMessageBizSeq(this.event));
@@ -134,9 +135,8 @@ public class UpStreamMsgContext extends RetryContext {
session.getSender().getUpstreamBuff().release();
// retry
- // reset delay time
- retryContext.delay(10000);
-
Objects.requireNonNull(session.getClientGroupWrapper().get()).getEventMeshTcpRetryer().pushRetry(retryContext);
+
Objects.requireNonNull(session.getClientGroupWrapper().get()).getTcpRetryer()
+ .newTimeout(retryContext, 10, TimeUnit.SECONDS);
session.getSender().getFailMsgCount().incrementAndGet();
log.error("upstreamMsg mq message error|user={}|callback
cost={}, errMsg={}", session.getClient(),
@@ -161,4 +161,9 @@ public class UpStreamMsgContext extends RetryContext {
return cmd;
}
}
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ retry();
+ }
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/DelayRetryable.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
similarity index 69%
copy from
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/DelayRetryable.java
copy to
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
index b60876a64..b004c6aab 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/DelayRetryable.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/retry/Retryer.java
@@ -15,14 +15,25 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.protocol;
+package org.apache.eventmesh.runtime.core.retry;
-import java.util.concurrent.Delayed;
+import org.apache.eventmesh.runtime.core.timer.TimerTask;
+
+import java.util.concurrent.TimeUnit;
/**
- * Retry
+ * Retryer interface.
*/
-public interface DelayRetryable extends Delayed {
+public interface Retryer {
+
+ void start();
+
+ void shutdown();
+
+ long getPendingTimeouts();
+
+ void printState();
+
+ void newTimeout(TimerTask timerTask, long delay, TimeUnit timeUnit);
- void retry() throws Exception;
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/HashedWheelTimer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/HashedWheelTimer.java
new file mode 100644
index 000000000..443f0a85f
--- /dev/null
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/HashedWheelTimer.java
@@ -0,0 +1,802 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.timer;
+
+import static org.apache.eventmesh.common.Constants.OS_NAME_KEY;
+import static org.apache.eventmesh.common.Constants.OS_WIN_PREFIX;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link Timer} optimized for approximated I/O timeout scheduling.
+ *
+ * <h3>Tick Duration</h3>
+ * <p>
+ * As described with 'approximated', this timer does not execute the scheduled
+ * {@link TimerTask} on time. {@link HashedWheelTimer}, on every tick, will
+ * check if there are any {@link TimerTask}s behind the schedule and execute
+ * them.
+ * <p>
+ * You can increase or decrease the accuracy of the execution timing by
+ * specifying smaller or larger tick duration in the constructor. In most
+ * network applications, I/O timeout does not need to be accurate. Therefore,
+ * the default tick duration is 100 milliseconds, and you will not need to try
+ * different configurations in most cases.
+ *
+ * <h3>Ticks per Wheel (Wheel Size)</h3>
+ * <p>
+ * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
+ * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
+ * function is 'deadline of the task'. The default number of ticks per wheel
+ * (i.e. the size of the wheel) is 512. You could specify a larger value
+ * if you are going to schedule a lot of timeouts.
+ *
+ * <h3>Do not create many instances.</h3>
+ * <p>
+ * {@link HashedWheelTimer} creates a new thread whenever it is instantiated
and
+ * started. Therefore, you should make sure to create only one instance and
+ * share it across your application. One of the common mistakes, that makes
+ * your application unresponsive, is to create a new instance for every
connection.
+ *
+ * <h3>Implementation Details</h3>
+ * <p>
+ * {@link HashedWheelTimer} is based on
+ * <a href="http://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
+ * Tony Lauck's paper,
+ * <a href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
+ * and Hierarchical Timing Wheels: data structures to efficiently implement a
+ * timer facility'</a>. More comprehensive slides are located
+ * <a
href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
+ */
+public class HashedWheelTimer implements Timer {
+
+ private static final Logger logger =
LoggerFactory.getLogger(HashedWheelTimer.class);
+
+ private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
+ private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new
AtomicBoolean();
+ private static final int INSTANCE_COUNT_LIMIT = 64;
+ private static final AtomicIntegerFieldUpdater<HashedWheelTimer>
WORKER_STATE_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class,
"workerState");
+
+ private final Worker worker = new Worker();
+ private final Thread workerThread;
+
+ private static final int WORKER_STATE_INIT = 0;
+ private static final int WORKER_STATE_STARTED = 1;
+ private static final int WORKER_STATE_SHUTDOWN = 2;
+
+ /**
+ * 0 - init, 1 - started, 2 - shut down
+ */
+ @SuppressWarnings({"unused", "FieldMayBeFinal"})
+ private volatile int workerState;
+
+ private final long tickDuration;
+ private final HashedWheelBucket[] wheel;
+ private final int mask;
+ private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
+ private final Queue<HashedWheelTimeout> timeouts = new
LinkedBlockingQueue<>();
+ private final Queue<HashedWheelTimeout> cancelledTimeouts = new
LinkedBlockingQueue<>();
+ private final AtomicLong pendingTimeouts = new AtomicLong(0);
+ private final long maxPendingTimeouts;
+
+ private volatile long startTime;
+
+ /**
+ * Creates a new timer with the default thread factory
+ * ({@link Executors#defaultThreadFactory()}), default tick duration, and
+ * default number of ticks per wheel.
+ */
+ public HashedWheelTimer() {
+ this(Executors.defaultThreadFactory());
+ }
+
+ /**
+ * Creates a new timer with the default thread factory
+ * ({@link Executors#defaultThreadFactory()}) and default number of ticks
+ * per wheel.
+ *
+ * @param tickDuration the duration between tick
+ * @param unit the time unit of the {@code tickDuration}
+ * @throws NullPointerException if {@code unit} is {@code null}
+ * @throws IllegalArgumentException if {@code tickDuration} is <= 0
+ */
+ public HashedWheelTimer(long tickDuration, TimeUnit unit) {
+ this(Executors.defaultThreadFactory(), tickDuration, unit);
+ }
+
+ /**
+ * Creates a new timer with the default thread factory
+ * ({@link Executors#defaultThreadFactory()}).
+ *
+ * @param tickDuration the duration between tick
+ * @param unit the time unit of the {@code tickDuration}
+ * @param ticksPerWheel the size of the wheel
+ * @throws NullPointerException if {@code unit} is {@code null}
+ * @throws IllegalArgumentException if either of {@code tickDuration} and
{@code ticksPerWheel} is <= 0
+ */
+ public HashedWheelTimer(long tickDuration, TimeUnit unit, int
ticksPerWheel) {
+ this(Executors.defaultThreadFactory(), tickDuration, unit,
ticksPerWheel);
+ }
+
+ /**
+ * Creates a new timer with the default tick duration and default number of
+ * ticks per wheel.
+ *
+ * @param threadFactory a {@link ThreadFactory} that creates a
+ * background {@link Thread} which is dedicated to
+ * {@link TimerTask} execution.
+ * @throws NullPointerException if {@code threadFactory} is {@code null}
+ */
+ public HashedWheelTimer(ThreadFactory threadFactory) {
+ this(threadFactory, 100, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Creates a new timer with the default number of ticks per wheel.
+ *
+ * @param threadFactory a {@link ThreadFactory} that creates a
+ * background {@link Thread} which is dedicated to
+ * {@link TimerTask} execution.
+ * @param tickDuration the duration between tick
+ * @param unit the time unit of the {@code tickDuration}
+ * @throws NullPointerException if either of {@code threadFactory} and
{@code unit} is {@code null}
+ * @throws IllegalArgumentException if {@code tickDuration} is <= 0
+ */
+ public HashedWheelTimer(
+ ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
+ this(threadFactory, tickDuration, unit, 512);
+ }
+
+ /**
+ * Creates a new timer.
+ *
+ * @param threadFactory a {@link ThreadFactory} that creates a
+ * background {@link Thread} which is dedicated to
+ * {@link TimerTask} execution.
+ * @param tickDuration the duration between tick
+ * @param unit the time unit of the {@code tickDuration}
+ * @param ticksPerWheel the size of the wheel
+ * @throws NullPointerException if either of {@code threadFactory} and
{@code unit} is {@code null}
+ * @throws IllegalArgumentException if either of {@code tickDuration} and
{@code ticksPerWheel} is <= 0
+ */
+ public HashedWheelTimer(
+ ThreadFactory threadFactory,
+ long tickDuration, TimeUnit unit, int ticksPerWheel) {
+ this(threadFactory, tickDuration, unit, ticksPerWheel, -1);
+ }
+
+ /**
+ * Creates a new timer.
+ *
+ * @param threadFactory a {@link ThreadFactory} that creates a
+ * background {@link Thread} which is dedicated
to
+ * {@link TimerTask} execution.
+ * @param tickDuration the duration between tick
+ * @param unit the time unit of the {@code tickDuration}
+ * @param ticksPerWheel the size of the wheel
+ * @param maxPendingTimeouts The maximum number of pending timeouts after
which call to
+ * {@code newTimeout} will result in
+ * {@link RejectedExecutionException}
+ * being thrown. No maximum pending timeouts
limit is assumed if
+ * this value is 0 or negative.
+ * @throws NullPointerException if either of {@code threadFactory} and
{@code unit} is {@code null}
+ * @throws IllegalArgumentException if either of {@code tickDuration} and
{@code ticksPerWheel} is <= 0
+ */
+ public HashedWheelTimer(
+ ThreadFactory threadFactory,
+ long tickDuration, TimeUnit unit, int ticksPerWheel,
+ long maxPendingTimeouts) {
+
+ if (threadFactory == null) {
+ throw new NullPointerException("threadFactory");
+ }
+ if (unit == null) {
+ throw new NullPointerException("unit");
+ }
+ if (tickDuration <= 0) {
+ throw new IllegalArgumentException("tickDuration must be greater
than 0: " + tickDuration);
+ }
+ if (ticksPerWheel <= 0) {
+ throw new IllegalArgumentException("ticksPerWheel must be greater
than 0: " + ticksPerWheel);
+ }
+
+ // Normalize ticksPerWheel to power of two and initialize the wheel.
+ wheel = createWheel(ticksPerWheel);
+ mask = wheel.length - 1;
+
+ // Convert tickDuration to nanos.
+ this.tickDuration = unit.toNanos(tickDuration);
+
+ // Prevent overflow.
+ if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
+ throw new IllegalArgumentException(String.format(
+ "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
+ tickDuration, Long.MAX_VALUE / wheel.length));
+ }
+ workerThread = threadFactory.newThread(worker);
+
+ this.maxPendingTimeouts = maxPendingTimeouts;
+
+ if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
+ reportTooManyInstances();
+ }
+ }
+
+ private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
+ if (ticksPerWheel <= 0) {
+ throw new IllegalArgumentException(
+ "ticksPerWheel must be greater than 0: " + ticksPerWheel);
+ }
+ if (ticksPerWheel > 1073741824) {
+ throw new IllegalArgumentException(
+ "ticksPerWheel may not be greater than 2^30: " +
ticksPerWheel);
+ }
+
+ ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
+ HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
+ for (int i = 0; i < wheel.length; i++) {
+ wheel[i] = new HashedWheelBucket();
+ }
+ return wheel;
+ }
+
+ private static int normalizeTicksPerWheel(int ticksPerWheel) {
+ int normalizedTicksPerWheel = ticksPerWheel - 1;
+ normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 1;
+ normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 2;
+ normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 4;
+ normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 8;
+ normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 16;
+ return normalizedTicksPerWheel + 1;
+ }
+
+ /**
+ * Starts the background thread explicitly. The background thread will
+ * start automatically on demand even if you did not call this method.
+ *
+ * @throws IllegalStateException if this timer has been
+ * {@linkplain #stop() stopped} already
+ */
+ public void start() {
+ switch (WORKER_STATE_UPDATER.get(this)) {
+ case WORKER_STATE_INIT:
+ if (WORKER_STATE_UPDATER.compareAndSet(this,
WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
+ workerThread.start();
+ }
+ break;
+ case WORKER_STATE_STARTED:
+ break;
+ case WORKER_STATE_SHUTDOWN:
+ throw new IllegalStateException("cannot be started once
stopped");
+ default:
+ throw new Error("Invalid WorkerState");
+ }
+
+ // Wait until the startTime is initialized by the worker.
+ while (startTime == 0) {
+ try {
+ startTimeInitialized.await();
+ } catch (InterruptedException ignore) {
+ // Ignore - it will be ready very soon.
+ }
+ }
+ }
+
+ @Override
+ public Set<Timeout> stop() {
+ if (Thread.currentThread() == workerThread) {
+ throw new IllegalStateException(
+ HashedWheelTimer.class.getSimpleName() + ".stop() cannot be
called from " + TimerTask.class.getSimpleName());
+ }
+
+ if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED,
WORKER_STATE_SHUTDOWN)) {
+ // workerState can be 0 or 2 at this moment - let it always be 2.
+ if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) !=
WORKER_STATE_SHUTDOWN) {
+ INSTANCE_COUNTER.decrementAndGet();
+ }
+
+ return Collections.emptySet();
+ }
+
+ try {
+ boolean interrupted = false;
+ while (workerThread.isAlive()) {
+ workerThread.interrupt();
+ try {
+ workerThread.join(100);
+ } catch (InterruptedException ignored) {
+ interrupted = true;
+ }
+ }
+
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ } finally {
+ INSTANCE_COUNTER.decrementAndGet();
+ }
+ return worker.unprocessedTimeouts();
+ }
+
+ @Override
+ public boolean isStop() {
+ return WORKER_STATE_SHUTDOWN == WORKER_STATE_UPDATER.get(this);
+ }
+
+ @Override
+ public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
+ if (task == null) {
+ throw new NullPointerException("task");
+ }
+ if (unit == null) {
+ throw new NullPointerException("unit");
+ }
+
+ long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
+
+ if (maxPendingTimeouts > 0 && pendingTimeoutsCount >
maxPendingTimeouts) {
+ pendingTimeouts.decrementAndGet();
+ throw new RejectedExecutionException("Number of pending timeouts ("
+ + pendingTimeoutsCount + ") is greater than or equal to
maximum allowed pending "
+ + "timeouts (" + maxPendingTimeouts + ")");
+ }
+
+ start();
+
+ // Add the timeout to the timeout queue which will be processed on the
next tick.
+ // During processing all the queued HashedWheelTimeouts will be added
to the correct HashedWheelBucket.
+ long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
+
+ // Guard against overflow.
+ if (delay > 0 && deadline < 0) {
+ deadline = Long.MAX_VALUE;
+ }
+ HashedWheelTimeout timeout = new HashedWheelTimeout(this, task,
deadline);
+ timeouts.add(timeout);
+ return timeout;
+ }
+
+ /**
+ * Returns the number of pending timeouts of this {@link Timer}.
+ */
+ public long pendingTimeouts() {
+ return pendingTimeouts.get();
+ }
+
+ /**
+ * Returns the number of cancelled timeouts of this {@link Timer}.
+ */
+ public long cancelledTimeouts() {
+ return cancelledTimeouts.size();
+ }
+
+ private static void reportTooManyInstances() {
+ logger.error("You are creating too many HashedWheelTimer instances. is
a shared resource that must be reused across the JVM, "
+ + "so that only a few instances are created.");
+ }
+
+ private final class Worker implements Runnable {
+
+ private final Set<Timeout> unprocessedTimeouts = new
HashSet<Timeout>();
+
+ private long tick;
+
+ @Override
+ public void run() {
+ // Initialize the startTime.
+ startTime = System.nanoTime();
+ if (startTime == 0) {
+ // We use 0 as an indicator for the uninitialized value here,
so make sure it's not 0 when initialized.
+ startTime = 1;
+ }
+
+ // Notify the other threads waiting for the initialization at
start().
+ startTimeInitialized.countDown();
+
+ do {
+ final long deadline = waitForNextTick();
+ if (deadline > 0) {
+ int idx = (int) (tick & mask);
+ processCancelledTasks();
+ HashedWheelBucket bucket =
+ wheel[idx];
+ transferTimeoutsToBuckets();
+ bucket.expireTimeouts(deadline);
+ tick++;
+ }
+ } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) ==
WORKER_STATE_STARTED);
+
+ // Fill the unprocessedTimeouts so we can return them from stop()
method.
+ for (HashedWheelBucket bucket : wheel) {
+ bucket.clearTimeouts(unprocessedTimeouts);
+ }
+ for (;;) {
+ HashedWheelTimeout timeout = timeouts.poll();
+ if (timeout == null) {
+ break;
+ }
+ if (!timeout.isCancelled()) {
+ unprocessedTimeouts.add(timeout);
+ }
+ }
+ processCancelledTasks();
+ }
+
+ private void transferTimeoutsToBuckets() {
+ // transfer only max. 100000 timeouts per tick to prevent a thread
to stale the workerThread when it just
+ // adds new timeouts in a loop.
+ for (int i = 0; i < 100000; i++) {
+ HashedWheelTimeout timeout = timeouts.poll();
+ if (timeout == null) {
+ // all processed
+ break;
+ }
+ if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
+ // Was cancelled in the meantime.
+ continue;
+ }
+
+ long calculated = timeout.deadline / tickDuration;
+ timeout.remainingRounds = (calculated - tick) / wheel.length;
+
+ // Ensure we don't schedule for past.
+ final long ticks = Math.max(calculated, tick);
+ int stopIndex = (int) (ticks & mask);
+
+ HashedWheelBucket bucket = wheel[stopIndex];
+ bucket.addTimeout(timeout);
+ }
+ }
+
+ private void processCancelledTasks() {
+ for (;;) {
+ HashedWheelTimeout timeout = cancelledTimeouts.poll();
+ if (timeout == null) {
+ // all processed
+ break;
+ }
+ try {
+ timeout.remove();
+ } catch (Throwable t) {
+ if (logger.isWarnEnabled()) {
+ logger.warn("An exception was thrown while process a
cancellation task", t);
+ }
+ }
+ }
+ }
+
+ /**
+ * calculate goal nanoTime from startTime and current tick number,
+ * then wait until that goal has been reached.
+ *
+ * @return Long.MIN_VALUE if received a shutdown request,
+ * current time otherwise (with Long.MIN_VALUE changed by +1)
+ */
+ private long waitForNextTick() {
+ long deadline = tickDuration * (tick + 1);
+
+ for (;;) {
+ final long currentTime = System.nanoTime() - startTime;
+ long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
+
+ if (sleepTimeMs <= 0) {
+ if (currentTime == Long.MIN_VALUE) {
+ return -Long.MAX_VALUE;
+ } else {
+ return currentTime;
+ }
+ }
+ if (isWindows()) {
+ sleepTimeMs = sleepTimeMs / 10 * 10;
+ }
+
+ try {
+ Thread.sleep(sleepTimeMs);
+ } catch (InterruptedException ignored) {
+ if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) ==
WORKER_STATE_SHUTDOWN) {
+ return Long.MIN_VALUE;
+ }
+ }
+ }
+ }
+
+ Set<Timeout> unprocessedTimeouts() {
+ return Collections.unmodifiableSet(unprocessedTimeouts);
+ }
+ }
+
+ private static final class HashedWheelTimeout implements Timeout {
+
+ private static final int ST_INIT = 0;
+ private static final int ST_CANCELLED = 1;
+ private static final int ST_EXPIRED = 2;
+ private static final AtomicIntegerFieldUpdater<HashedWheelTimeout>
STATE_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class,
"state");
+
+ private final HashedWheelTimer timer;
+ private final TimerTask task;
+ private final long deadline;
+
+ private volatile int state = ST_INIT;
+
+ /**
+ * RemainingRounds will be calculated and set by
Worker.transferTimeoutsToBuckets() before the
+ * HashedWheelTimeout will be added to the correct HashedWheelBucket.
+ */
+ long remainingRounds;
+
+ /**
+ * This will be used to chain timeouts in HashedWheelTimerBucket via a
double-linked-list.
+ * As only the workerThread will act on it there is no need for
synchronization / volatile.
+ */
+ HashedWheelTimeout next;
+ HashedWheelTimeout prev;
+
+ /**
+ * The bucket to which the timeout was added
+ */
+ HashedWheelBucket bucket;
+
+ HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long
deadline) {
+ this.timer = timer;
+ this.task = task;
+ this.deadline = deadline;
+ }
+
+ @Override
+ public Timer timer() {
+ return timer;
+ }
+
+ @Override
+ public TimerTask task() {
+ return task;
+ }
+
+ @Override
+ public boolean cancel() {
+ // only update the state it will be removed from HashedWheelBucket
on next tick.
+ if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
+ return false;
+ }
+ // If a task should be canceled we put this to another queue which
will be processed on each tick.
+ // So this means that we will have a GC latency of max. 1 tick
duration which is good enough. This way we
+ // can make again use of our LinkedBlockingQueue and so minimize
the locking / overhead as much as possible.
+ timer.cancelledTimeouts.add(this);
+ return true;
+ }
+
+ void remove() {
+ HashedWheelBucket bucket = this.bucket;
+ if (bucket != null) {
+ bucket.remove(this);
+ } else {
+ timer.pendingTimeouts.decrementAndGet();
+ }
+ }
+
+ public boolean compareAndSetState(int expected, int state) {
+ return STATE_UPDATER.compareAndSet(this, expected, state);
+ }
+
+ public int state() {
+ return state;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return state() == ST_CANCELLED;
+ }
+
+ @Override
+ public boolean isExpired() {
+ return state() == ST_EXPIRED;
+ }
+
+ public void expire() {
+ if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
+ return;
+ }
+
+ try {
+ task.run(this);
+ task.setExecuteTimeHook(System.currentTimeMillis());
+ } catch (Throwable t) {
+ if (logger.isWarnEnabled()) {
+ logger.warn("An exception was thrown by " +
TimerTask.class.getSimpleName() + '.', t);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ final long currentTime = System.nanoTime();
+ long remaining = deadline - currentTime + timer.startTime;
+ String simpleClassName = getClass().getSimpleName();
+
+ StringBuilder buf = new StringBuilder(192)
+ .append(simpleClassName)
+ .append('(')
+ .append("deadline: ");
+ if (remaining > 0) {
+ buf.append(remaining)
+ .append(" ns later");
+ } else if (remaining < 0) {
+ buf.append(-remaining)
+ .append(" ns ago");
+ } else {
+ buf.append("now");
+ }
+
+ if (isCancelled()) {
+ buf.append(", cancelled");
+ }
+
+ return buf.append(", task: ")
+ .append(task())
+ .append(')')
+ .toString();
+ }
+ }
+
+ /**
+ * Bucket that stores HashedWheelTimeouts. These are stored in a
linked-list like datastructure to allow easy
+ * removal of HashedWheelTimeouts in the middle. Also the
HashedWheelTimeout act as nodes themself and so no
+ * extra object creation is needed.
+ */
+ private static final class HashedWheelBucket {
+
+ /**
+ * Used for the linked-list datastructure
+ */
+ private HashedWheelTimeout head;
+ private HashedWheelTimeout tail;
+
+ /**
+ * Add {@link HashedWheelTimeout} to this bucket.
+ */
+ void addTimeout(HashedWheelTimeout timeout) {
+ assert timeout.bucket == null;
+ timeout.bucket = this;
+ if (head == null) {
+ head = tail = timeout;
+ } else {
+ tail.next = timeout;
+ timeout.prev = tail;
+ tail = timeout;
+ }
+ }
+
+ /**
+ * Expire all {@link HashedWheelTimeout}s for the given {@code
deadline}.
+ */
+ void expireTimeouts(long deadline) {
+ HashedWheelTimeout timeout = head;
+
+ // process all timeouts
+ while (timeout != null) {
+ HashedWheelTimeout next = timeout.next;
+ if (timeout.remainingRounds <= 0) {
+ next = remove(timeout);
+ if (timeout.deadline <= deadline) {
+ timeout.expire();
+ } else {
+ // The timeout was placed into a wrong slot. This
should never happen.
+ throw new IllegalStateException(String.format(
+ "timeout.deadline (%d) > deadline (%d)",
timeout.deadline, deadline));
+ }
+ } else if (timeout.isCancelled()) {
+ next = remove(timeout);
+ } else {
+ timeout.remainingRounds--;
+ }
+ timeout = next;
+ }
+ }
+
+ public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
+ HashedWheelTimeout next = timeout.next;
+ // remove timeout that was either processed or cancelled by
updating the linked-list
+ if (timeout.prev != null) {
+ timeout.prev.next = next;
+ }
+ if (timeout.next != null) {
+ timeout.next.prev = timeout.prev;
+ }
+
+ if (timeout == head) {
+ // if timeout is also the tail we need to adjust the entry too
+ if (timeout == tail) {
+ tail = null;
+ head = null;
+ } else {
+ head = next;
+ }
+ } else if (timeout == tail) {
+ // if the timeout is the tail modify the tail to be the prev
node.
+ tail = timeout.prev;
+ }
+ // null out prev, next and bucket to allow for GC.
+ timeout.prev = null;
+ timeout.next = null;
+ timeout.bucket = null;
+ timeout.timer.pendingTimeouts.decrementAndGet();
+ return next;
+ }
+
+ /**
+ * Clear this bucket and return all not expired / cancelled {@link
Timeout}s.
+ */
+ void clearTimeouts(Set<Timeout> set) {
+ for (;;) {
+ HashedWheelTimeout timeout = pollTimeout();
+ if (timeout == null) {
+ return;
+ }
+ if (timeout.isExpired() || timeout.isCancelled()) {
+ continue;
+ }
+ set.add(timeout);
+ }
+ }
+
+ private HashedWheelTimeout pollTimeout() {
+ HashedWheelTimeout head = this.head;
+ if (head == null) {
+ return null;
+ }
+ HashedWheelTimeout next = head.next;
+ if (next == null) {
+ tail = this.head = null;
+ } else {
+ this.head = next;
+ next.prev = null;
+ }
+
+ // null out prev and next to allow for GC.
+ head.next = null;
+ head.prev = null;
+ head.bucket = null;
+ return head;
+ }
+ }
+
+ private static final boolean IS_OS_WINDOWS =
System.getProperty(OS_NAME_KEY,
"").toLowerCase(Locale.US).contains(OS_WIN_PREFIX);
+
+ private boolean isWindows() {
+ return IS_OS_WINDOWS;
+ }
+}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timeout.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timeout.java
new file mode 100644
index 000000000..12cea1aea
--- /dev/null
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timeout.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.timer;
+
+/**
+ * A handle associated with a {@link TimerTask} that is returned by a
+ * {@link Timer}.
+ */
+public interface Timeout {
+
+ /**
+ * Returns the {@link Timer} that created this handle.
+ */
+ Timer timer();
+
+ /**
+ * Returns the {@link TimerTask} which is associated with this handle.
+ */
+ TimerTask task();
+
+ /**
+ * Returns {@code true} if and only if the {@link TimerTask} associated
+ * with this handle has been expired.
+ */
+ boolean isExpired();
+
+ /**
+ * Returns {@code true} if and only if the {@link TimerTask} associated
+ * with this handle has been cancelled.
+ */
+ boolean isCancelled();
+
+ /**
+ * Attempts to cancel the {@link TimerTask} associated with this handle.
+ * If the task has been executed or cancelled already, it will return with
+ * no side effect.
+ *
+ * @return True if the cancellation completed successfully, otherwise false
+ */
+ boolean cancel();
+}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timer.java
new file mode 100644
index 000000000..076f8d04b
--- /dev/null
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/Timer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.timer;
+
+import java.util.Set;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Schedules {@link TimerTask}s for one-time future execution in a background
+ * thread.
+ */
+public interface Timer {
+
+ /**
+ * Schedules the specified {@link TimerTask} for one-time execution after
+ * the specified delay.
+ *
+ * @return a handle which is associated with the specified task
+ * @throws IllegalStateException if this timer has been {@linkplain
#stop() stopped} already
+ * @throws RejectedExecutionException if the pending timeouts are too many
and creating new timeout
+ * can cause instability in the system.
+ */
+ Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
+
+ /**
+ * Releases all resources acquired by this {@link Timer} and cancels all
+ * tasks which were scheduled but not executed yet.
+ *
+ * @return the handles associated with the tasks which were canceled by
+ * this method
+ */
+ Set<Timeout> stop();
+
+ /**
+ * the timer is stop
+ *
+ * @return true for stop
+ */
+ boolean isStop();
+
+ /**
+ * the pending timeouts
+ * @return count of pending timeout
+ */
+ long pendingTimeouts();
+}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/DelayRetryable.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/TimerTask.java
similarity index 55%
rename from
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/DelayRetryable.java
rename to
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/TimerTask.java
index b60876a64..ae8447f04 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/DelayRetryable.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/timer/TimerTask.java
@@ -15,14 +15,27 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.protocol;
+package org.apache.eventmesh.runtime.core.timer;
-import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
/**
- * Retry
+ * A task which is executed after the delay specified with
+ * {@link Timer#newTimeout(TimerTask, long, TimeUnit)} (TimerTask, long,
TimeUnit)}.
*/
-public interface DelayRetryable extends Delayed {
+public interface TimerTask {
- void retry() throws Exception;
+ /**
+ * Executed after the delay specified with
+ * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
+ *
+ * @param timeout a handle which is associated with this task
+ */
+ void run(Timeout timeout) throws Exception;
+
+ /**
+ * Hook method to set the execute time.
+ * @param executeTime execute time
+ */
+ void setExecuteTimeHook(long executeTime);
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/grpc/EventMeshGrpcMonitor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/grpc/EventMeshGrpcMonitor.java
index 8b0175fbe..fbd3be616 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/grpc/EventMeshGrpcMonitor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/grpc/EventMeshGrpcMonitor.java
@@ -63,7 +63,7 @@ public class EventMeshGrpcMonitor {
scheduleTask = scheduler.scheduleAtFixedRate(() -> {
grpcSummaryMetrics.refreshTpsMetrics(SCHEDULE_PERIOD_MILLS);
grpcSummaryMetrics.clearAllMessageCounter();
-
grpcSummaryMetrics.setRetrySize(eventMeshGrpcServer.getGrpcRetryer().getRetrySize());
+
grpcSummaryMetrics.setRetrySize(eventMeshGrpcServer.getGrpcRetryer().getPendingTimeouts());
grpcSummaryMetrics.setSubscribeTopicNum(eventMeshGrpcServer.getConsumerManager().getAllConsumerTopic().size());
}, DELAY_MILLS, SCHEDULE_PERIOD_MILLS, TimeUnit.MILLISECONDS);
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
index aa557c941..8bd740898 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/http/HTTPMetricsServer.java
@@ -20,6 +20,7 @@ package org.apache.eventmesh.runtime.metrics.http;
import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.metrics.api.MetricsRegistry;
import org.apache.eventmesh.metrics.api.model.HttpSummaryMetrics;
+import org.apache.eventmesh.metrics.api.model.RetrySummaryMetrics;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import java.util.List;
@@ -50,7 +51,7 @@ public class HTTPMetricsServer {
eventMeshHTTPServer.getHttpThreadPoolGroup().getBatchMsgExecutor(),
eventMeshHTTPServer.getHttpThreadPoolGroup().getSendMsgExecutor(),
eventMeshHTTPServer.getHttpThreadPoolGroup().getPushMsgExecutor(),
- eventMeshHTTPServer.getHttpRetryer().getRetryQueue());
+ new
RetrySummaryMetrics(eventMeshHTTPServer.getHttpRetryer().getPendingTimeouts()));
init();
}
@@ -167,7 +168,7 @@ public class HTTPMetricsServer {
eventMeshHTTPServer.getHttpThreadPoolGroup().getBatchMsgExecutor().getQueue().size(),
eventMeshHTTPServer.getHttpThreadPoolGroup().getSendMsgExecutor().getQueue().size(),
eventMeshHTTPServer.getHttpThreadPoolGroup().getPushMsgExecutor().getQueue().size(),
- eventMeshHTTPServer.getHttpRetryer().getRetrySize());
+ eventMeshHTTPServer.getHttpRetryer().getPendingTimeouts());
}
if (log.isInfoEnabled()) {
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
index 97d2962d9..1808a5c3c 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
@@ -128,10 +128,10 @@ public class EventMeshTcpMonitor {
monitorThreadPoolTask =
eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler().scheduleAtFixedRate(()
-> {
eventMeshTCPServer.getEventMeshRebalanceService().printRebalanceThreadPoolState();
- eventMeshTCPServer.getTcpRetryer().printRetryThreadPoolState();
+ eventMeshTCPServer.getTcpRetryer().printState();
// monitor retry queue size
-
tcpSummaryMetrics.setRetrySize(eventMeshTCPServer.getTcpRetryer().getRetrySize());
+
tcpSummaryMetrics.setRetrySize(eventMeshTCPServer.getTcpRetryer().getPendingTimeouts());
appLogger.info(
MonitorMetricConstants.EVENTMESH_MONITOR_FORMAT_COMMON,
EventMeshConstants.PROTOCOL_TCP,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]