yanrongzhen commented on code in PR #4544:
URL: https://github.com/apache/eventmesh/pull/4544#discussion_r1386138028


##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java:
##########
@@ -52,8 +76,87 @@ protected void rePut(Timeout timeout, long tick, TimeUnit 
timeUnit) {
         timer.newTimeout(timeout.task(), tick, timeUnit);
     }
 
+    public void setEvent(CloudEvent event) {
+        this.event = event;
+    }
+
     @Override
     public void setExecuteTimeHook(long executeTime) {
         this.executeTime = executeTime;
     }
+
+    @Override
+    public final void run(Timeout timeout) throws Exception {
+        boolean eventMeshServerRetryStorageEnabled = 
commonConfiguration.isEventMeshServerRetryStorageEnabled();
+        if (eventMeshServerRetryStorageEnabled) {
+            if (!STORAGE_RETRY_TIMES_MAP.containsKey(event.getId())) {
+                STORAGE_RETRY_TIMES_MAP.put(event.getId(), 0);
+            }
+            Integer maxTimesPerEvent = 
STORAGE_RETRY_TIMES_MAP.get(event.getId());
+            if (maxTimesPerEvent < 1) {
+                sendMessageBack(event);
+                STORAGE_RETRY_TIMES_MAP.put(event.getId(), 
STORAGE_RETRY_TIMES_MAP.get(event.getId()) + 1);
+            } else {
+                getHandleMessageContext().finish();
+            }
+        } else {
+            doRun(timeout);
+        }
+    }
+
+    protected HandleMessageContext getHandleMessageContext() throws Exception {
+        throw new IllegalAccessException("method not supported.");
+    }
+
+    public abstract void doRun(Timeout timeout) throws Exception;
+
+    @SneakyThrows
+    protected ProducerManager getProducerManager() {
+        throw new IllegalAccessException("method not supported.");
+    }
+
+    @SneakyThrows
+    protected ConsumerGroupConf getConsumerGroupConf() {
+        throw new IllegalAccessException("method not supported.");
+    }
+
+    private void sendMessageBack(final CloudEvent event) throws Exception {
+        String topic = event.getSubject();
+        String bizSeqNo = 
Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey())).toString();
+        String uniqueId = 
Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey())).toString();
+
+        ConsumerGroupConf consumerGroupConf = getConsumerGroupConf();
+        String consumerGroupName = consumerGroupConf.getConsumerGroup();
+        EventMeshProducer sendMessageBack = 
getProducerManager().getEventMeshProducer(consumerGroupName);
+
+        if (sendMessageBack == null) {
+            log.warn("consumer:{} consume fail, sendMessageBack, topic:{}, 
bizSeqNo:{}, uniqueId:{}",
+                consumerGroupName, topic, bizSeqNo, uniqueId);
+            return;
+        }
+        TopicNameHelper topicNameGenerator = 
EventMeshExtensionFactory.getExtension(TopicNameHelper.class,
+            commonConfiguration.getEventMeshStoragePluginType());
+        String retryTopicName = 
topicNameGenerator.generateRetryTopicName(consumerGroupName);

Review Comment:
   I optimized this in a new round of commits.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to