yanrongzhen commented on code in PR #4544:
URL: https://github.com/apache/eventmesh/pull/4544#discussion_r1386139816
##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java:
##########
@@ -52,8 +76,87 @@ protected void rePut(Timeout timeout, long tick, TimeUnit
timeUnit) {
timer.newTimeout(timeout.task(), tick, timeUnit);
}
+ public void setEvent(CloudEvent event) {
+ this.event = event;
+ }
+
@Override
public void setExecuteTimeHook(long executeTime) {
this.executeTime = executeTime;
}
+
+ @Override
+ public final void run(Timeout timeout) throws Exception {
+ boolean eventMeshServerRetryStorageEnabled =
commonConfiguration.isEventMeshServerRetryStorageEnabled();
+ if (eventMeshServerRetryStorageEnabled) {
+ if (!STORAGE_RETRY_TIMES_MAP.containsKey(event.getId())) {
+ STORAGE_RETRY_TIMES_MAP.put(event.getId(), 0);
+ }
+ Integer maxTimesPerEvent =
STORAGE_RETRY_TIMES_MAP.get(event.getId());
+ if (maxTimesPerEvent < 1) {
+ sendMessageBack(event);
+ STORAGE_RETRY_TIMES_MAP.put(event.getId(),
STORAGE_RETRY_TIMES_MAP.get(event.getId()) + 1);
+ } else {
+ getHandleMessageContext().finish();
Review Comment:
Used to control the number of consumption, since the retry-topic method is
used for retrying, failure to control may result in unlimited repeated
consumption.
---
用于控制消费次数, 由于使用了retry-topic的方式进行重试, 不进行控制可能会导致无限制重复消费.
##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java:
##########
@@ -52,8 +76,87 @@ protected void rePut(Timeout timeout, long tick, TimeUnit
timeUnit) {
timer.newTimeout(timeout.task(), tick, timeUnit);
}
+ public void setEvent(CloudEvent event) {
+ this.event = event;
+ }
+
@Override
public void setExecuteTimeHook(long executeTime) {
this.executeTime = executeTime;
}
+
+ @Override
+ public final void run(Timeout timeout) throws Exception {
+ boolean eventMeshServerRetryStorageEnabled =
commonConfiguration.isEventMeshServerRetryStorageEnabled();
+ if (eventMeshServerRetryStorageEnabled) {
+ if (!STORAGE_RETRY_TIMES_MAP.containsKey(event.getId())) {
+ STORAGE_RETRY_TIMES_MAP.put(event.getId(), 0);
+ }
+ Integer maxTimesPerEvent =
STORAGE_RETRY_TIMES_MAP.get(event.getId());
+ if (maxTimesPerEvent < 1) {
+ sendMessageBack(event);
+ STORAGE_RETRY_TIMES_MAP.put(event.getId(),
STORAGE_RETRY_TIMES_MAP.get(event.getId()) + 1);
+ } else {
+ getHandleMessageContext().finish();
Review Comment:
Used to control the number of consumption, since the retry-topic method is
used for retrying, failure to control may result in unlimited repeated
consumption.
---
用于控制消费次数, 由于使用了retry-topic的方式进行重试, 不进行控制可能会导致无限制重复消费.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]