yanrongzhen commented on code in PR #4544:
URL: https://github.com/apache/eventmesh/pull/4544#discussion_r1387425591
##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java:
##########
@@ -52,8 +72,62 @@ protected void rePut(Timeout timeout, long tick, TimeUnit timeUnit) {
         timer.newTimeout(timeout.task(), tick, timeUnit);
     }
+    public void setEvent(CloudEvent event) {
+        this.event = event;
+    }
+
     @Override
     public void setExecuteTimeHook(long executeTime) {
         this.executeTime = executeTime;
     }
+
+    @Override
+    public final void run(Timeout timeout) throws Exception {
+        boolean eventMeshServerRetryStorageEnabled =
+            Optional.ofNullable(commonConfiguration).map(CommonConfiguration::isEventMeshServerRetryStorageEnabled)
+                .orElse(false);
+        if (eventMeshServerRetryStorageEnabled) {
+            if (!STORAGE_RETRY_TIMES_MAP.containsKey(event.getId())) {
+                STORAGE_RETRY_TIMES_MAP.put(event.getId(), 0);
+            }
+            Integer maxTimesPerEvent = STORAGE_RETRY_TIMES_MAP.get(event.getId());
+            if (maxTimesPerEvent < 1) {
+                StorageRetryStrategy storageRetryStrategy = EventMeshExtensionFactory.getExtension(RetryStrategy.class,
+                    commonConfiguration.getEventMeshStoragePluginType()).getStorageRetryStrategy();
+                String consumerGroupName = getConsumerGroupConf().getConsumerGroup();
+                EventMeshProducer producer = getProducerManager().getEventMeshProducer(consumerGroupName);
+                RetryConfiguration retryConfiguration = RetryConfiguration.builder()
+                    .event(event)
+                    .retryStorageEnabled(eventMeshServerRetryStorageEnabled)
+                    .consumerGroupName(getConsumerGroupConf().getConsumerGroup())
+                    .producer(producer.getMqProducerWrapper().getMeshMQProducer())
+                    .topic(getConsumerGroupConf().getTopic())
+                    .build();
+                storageRetryStrategy.retry(retryConfiguration);
+                STORAGE_RETRY_TIMES_MAP.put(event.getId(), STORAGE_RETRY_TIMES_MAP.get(event.getId()) + 1);
+            } else {
+                STORAGE_RETRY_TIMES_MAP.remove(event.getId());
+                getHandleMessageContext().finish();
+            }
+        } else {
+            doRun(timeout);
Review Comment:
The purpose of this API is to specify the retry behavior of the Retryable
class. I think the retry class itself does not need to know whether the
upper layer retries in memory or through storage.
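
To illustrate the separation suggested here, a minimal sketch follows. It is not the EventMesh implementation: `RetryDispatcher`, `InMemoryRetryDispatcher`, `StorageRetryDispatcher`, `SketchRetryContext`, and `RetryCoordinator` are hypothetical names invented for this example, and the storage variant only records an event id rather than calling any real storage plugin. The idea is that the retry class describes only its own retry behavior, while an upper layer picks the mechanism.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical abstraction: decides how a failed delivery gets retried. */
interface RetryDispatcher {
    void dispatch(String eventId, Runnable retryTask);
}

/** In-memory variant: runs the retry task directly (a real one might re-queue it on a timer). */
final class InMemoryRetryDispatcher implements RetryDispatcher {
    @Override
    public void dispatch(String eventId, Runnable retryTask) {
        retryTask.run();
    }
}

/** Storage variant: records the event id for later redelivery and does not run the task. */
final class StorageRetryDispatcher implements RetryDispatcher {
    final List<String> stored = new ArrayList<>();

    @Override
    public void dispatch(String eventId, Runnable retryTask) {
        stored.add(eventId);
    }
}

/** The retry class: it only knows how to retry itself, not which mechanism drives it. */
final class SketchRetryContext {
    private final String eventId;
    int attempts = 0;

    SketchRetryContext(String eventId) {
        this.eventId = eventId;
    }

    String eventId() {
        return eventId;
    }

    void retry() {
        attempts++;
    }
}

/** Upper layer: the only place that knows whether retries are in memory or storage based. */
final class RetryCoordinator {
    private final RetryDispatcher dispatcher;

    RetryCoordinator(RetryDispatcher dispatcher) {
        this.dispatcher = dispatcher;
    }

    void onDeliveryFailed(SketchRetryContext context) {
        dispatcher.dispatch(context.eventId(), context::retry);
    }
}

final class RetrySketchDemo {
    public static void main(String[] args) {
        SketchRetryContext context = new SketchRetryContext("evt-1");
        new RetryCoordinator(new InMemoryRetryDispatcher()).onDeliveryFailed(context);
        System.out.println("attempts after in-memory retry: " + context.attempts);
    }
}
```

With a layout along these lines, switching between in-memory and storage-based retry would be a configuration decision made where the coordinator is constructed, and `run(Timeout)` would not need to branch on `isEventMeshServerRetryStorageEnabled`.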
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]