pandaapo commented on code in PR #4544:
URL: https://github.com/apache/eventmesh/pull/4544#discussion_r1386435047


##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java:
##########
@@ -52,8 +72,62 @@ protected void rePut(Timeout timeout, long tick, TimeUnit timeUnit) {
         timer.newTimeout(timeout.task(), tick, timeUnit);
     }
 
+    public void setEvent(CloudEvent event) {
+        this.event = event;
+    }
+
     @Override
     public void setExecuteTimeHook(long executeTime) {
         this.executeTime = executeTime;
     }
+
+    @Override
+    public final void run(Timeout timeout) throws Exception {
+        boolean eventMeshServerRetryStorageEnabled =
+            Optional.ofNullable(commonConfiguration).map(CommonConfiguration::isEventMeshServerRetryStorageEnabled)
+                .orElse(false);
+        if (eventMeshServerRetryStorageEnabled) {
+            if (!STORAGE_RETRY_TIMES_MAP.containsKey(event.getId())) {
+                STORAGE_RETRY_TIMES_MAP.put(event.getId(), 0);
+            }
+            Integer maxTimesPerEvent = STORAGE_RETRY_TIMES_MAP.get(event.getId());
+            if (maxTimesPerEvent < 1) {
+                StorageRetryStrategy storageRetryStrategy = EventMeshExtensionFactory.getExtension(RetryStrategy.class,
+                    commonConfiguration.getEventMeshStoragePluginType()).getStorageRetryStrategy();
+                String consumerGroupName = getConsumerGroupConf().getConsumerGroup();
+                EventMeshProducer producer = getProducerManager().getEventMeshProducer(consumerGroupName);
+                RetryConfiguration retryConfiguration = RetryConfiguration.builder()
+                    .event(event)
+                    .retryStorageEnabled(eventMeshServerRetryStorageEnabled)
+                    .consumerGroupName(getConsumerGroupConf().getConsumerGroup())
+                    .producer(producer.getMqProducerWrapper().getMeshMQProducer())
+                    .topic(getConsumerGroupConf().getTopic())
+                    .build();
+                storageRetryStrategy.retry(retryConfiguration);
+                STORAGE_RETRY_TIMES_MAP.put(event.getId(), STORAGE_RETRY_TIMES_MAP.get(event.getId()) + 1);
+            } else {
+                STORAGE_RETRY_TIMES_MAP.remove(event.getId());
+                getHandleMessageContext().finish();
+            }

Review Comment:
   There is no need for a Map<String, Integer> here; it only adds logical
complexity. The Integer in this Map only ever holds 0 or 1, so a Set is
sufficient. If the event ID is not in the Set, add it and run the retry logic.
If it is already in the Set, treat the message as successfully consumed.
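
The Set-based suggestion could look roughly like the sketch below. It is a standalone illustration, not code from the PR: the class name `StorageRetryOnce`, the field `STORAGE_RETRIED_EVENT_IDS`, and the method `shouldRetryViaStorage` are hypothetical names, and the EventMesh calls (`storageRetryStrategy.retry(...)`, `getHandleMessageContext().finish()`) are only mentioned in comments. It assumes the Set needs to be thread-safe, which the diff does not confirm.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class StorageRetryOnce {

    // Hypothetical replacement for STORAGE_RETRY_TIMES_MAP:
    // holds the IDs of events that have already been handed to the storage retry.
    private static final Set<String> STORAGE_RETRIED_EVENT_IDS = ConcurrentHashMap.newKeySet();

    /**
     * Returns true when the caller should run the storage retry for this event
     * (first time the ID is seen). Returns false when the ID was already present;
     * in that case the ID is removed and the caller should finish the message.
     */
    public static boolean shouldRetryViaStorage(String eventId) {
        // Set.add returns true only if the element was not already in the Set,
        // which replaces the containsKey/put(…, 0)/get/put(…, +1) sequence.
        if (STORAGE_RETRIED_EVENT_IDS.add(eventId)) {
            return true; // in RetryContext: storageRetryStrategy.retry(retryConfiguration)
        }
        STORAGE_RETRIED_EVENT_IDS.remove(eventId);
        return false; // in RetryContext: getHandleMessageContext().finish()
    }

    public static void main(String[] args) {
        System.out.println(shouldRetryViaStorage("event-1")); // true: first attempt, retry
        System.out.println(shouldRetryViaStorage("event-1")); // false: already retried, finish
    }
}
```

This keeps the same observable behavior as the Map version (retry once, then finish and forget the ID) with a single `add` call deciding the branch.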


