yanrongzhen commented on code in PR #4544:
URL: https://github.com/apache/eventmesh/pull/4544#discussion_r1387426482
##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java:
##########
@@ -52,8 +72,62 @@ protected void rePut(Timeout timeout, long tick, TimeUnit
timeUnit) {
timer.newTimeout(timeout.task(), tick, timeUnit);
}
+ public void setEvent(CloudEvent event) {
+ this.event = event;
+ }
+
@Override
public void setExecuteTimeHook(long executeTime) {
this.executeTime = executeTime;
}
+
+ @Override
+ public final void run(Timeout timeout) throws Exception {
+ boolean eventMeshServerRetryStorageEnabled =
+
Optional.ofNullable(commonConfiguration).map(CommonConfiguration::isEventMeshServerRetryStorageEnabled)
+ .orElse(false);
+ if (eventMeshServerRetryStorageEnabled) {
+ if (!STORAGE_RETRY_TIMES_MAP.containsKey(event.getId())) {
+ STORAGE_RETRY_TIMES_MAP.put(event.getId(), 0);
+ }
+ Integer maxTimesPerEvent =
STORAGE_RETRY_TIMES_MAP.get(event.getId());
+ if (maxTimesPerEvent < 1) {
+ StorageRetryStrategy storageRetryStrategy =
EventMeshExtensionFactory.getExtension(RetryStrategy.class,
+
commonConfiguration.getEventMeshStoragePluginType()).getStorageRetryStrategy();
+ String consumerGroupName =
getConsumerGroupConf().getConsumerGroup();
+ EventMeshProducer producer =
getProducerManager().getEventMeshProducer(consumerGroupName);
+ RetryConfiguration retryConfiguration =
RetryConfiguration.builder()
+ .event(event)
+ .retryStorageEnabled(eventMeshServerRetryStorageEnabled)
+
.consumerGroupName(getConsumerGroupConf().getConsumerGroup())
+
.producer(producer.getMqProducerWrapper().getMeshMQProducer())
+ .topic(getConsumerGroupConf().getTopic())
+ .build();
+ storageRetryStrategy.retry(retryConfiguration);
+ STORAGE_RETRY_TIMES_MAP.put(event.getId(),
STORAGE_RETRY_TIMES_MAP.get(event.getId()) + 1);
+ } else {
+ STORAGE_RETRY_TIMES_MAP.remove(event.getId());
+ getHandleMessageContext().finish();
+ }
+ } else {
+ doRun(timeout);
Review Comment:
In addition, I set the run method to final, which is invisible to its
subclasses.
---
另外, run方法我设置为了final, 这样对于其子类是不可见的.
##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java:
##########
@@ -52,8 +72,62 @@ protected void rePut(Timeout timeout, long tick, TimeUnit
timeUnit) {
timer.newTimeout(timeout.task(), tick, timeUnit);
}
+ public void setEvent(CloudEvent event) {
+ this.event = event;
+ }
+
@Override
public void setExecuteTimeHook(long executeTime) {
this.executeTime = executeTime;
}
+
+ @Override
+ public final void run(Timeout timeout) throws Exception {
+ boolean eventMeshServerRetryStorageEnabled =
+
Optional.ofNullable(commonConfiguration).map(CommonConfiguration::isEventMeshServerRetryStorageEnabled)
+ .orElse(false);
+ if (eventMeshServerRetryStorageEnabled) {
+ if (!STORAGE_RETRY_TIMES_MAP.containsKey(event.getId())) {
+ STORAGE_RETRY_TIMES_MAP.put(event.getId(), 0);
+ }
+ Integer maxTimesPerEvent =
STORAGE_RETRY_TIMES_MAP.get(event.getId());
+ if (maxTimesPerEvent < 1) {
+ StorageRetryStrategy storageRetryStrategy =
EventMeshExtensionFactory.getExtension(RetryStrategy.class,
+
commonConfiguration.getEventMeshStoragePluginType()).getStorageRetryStrategy();
+ String consumerGroupName =
getConsumerGroupConf().getConsumerGroup();
+ EventMeshProducer producer =
getProducerManager().getEventMeshProducer(consumerGroupName);
+ RetryConfiguration retryConfiguration =
RetryConfiguration.builder()
+ .event(event)
+ .retryStorageEnabled(eventMeshServerRetryStorageEnabled)
+
.consumerGroupName(getConsumerGroupConf().getConsumerGroup())
+
.producer(producer.getMqProducerWrapper().getMeshMQProducer())
+ .topic(getConsumerGroupConf().getTopic())
+ .build();
+ storageRetryStrategy.retry(retryConfiguration);
+ STORAGE_RETRY_TIMES_MAP.put(event.getId(),
STORAGE_RETRY_TIMES_MAP.get(event.getId()) + 1);
+ } else {
+ STORAGE_RETRY_TIMES_MAP.remove(event.getId());
+ getHandleMessageContext().finish();
+ }
+ } else {
+ doRun(timeout);
Review Comment:
In addition, I set the run method to final, which is invisible to its
subclasses.
---
另外, run方法我设置为了final, 这样对于其子类是不可见的.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]