xwm1992 commented on code in PR #4544:
URL: https://github.com/apache/eventmesh/pull/4544#discussion_r1385860254


##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java:
##########
@@ -52,8 +76,87 @@ protected void rePut(Timeout timeout, long tick, TimeUnit timeUnit) {
         timer.newTimeout(timeout.task(), tick, timeUnit);
     }
 
+    public void setEvent(CloudEvent event) {
+        this.event = event;
+    }
+
     @Override
     public void setExecuteTimeHook(long executeTime) {
         this.executeTime = executeTime;
     }
+
+    @Override
+    public final void run(Timeout timeout) throws Exception {
+        boolean eventMeshServerRetryStorageEnabled = commonConfiguration.isEventMeshServerRetryStorageEnabled();
+        if (eventMeshServerRetryStorageEnabled) {
+            if (!STORAGE_RETRY_TIMES_MAP.containsKey(event.getId())) {
+                STORAGE_RETRY_TIMES_MAP.put(event.getId(), 0);
+            }
+            Integer maxTimesPerEvent = STORAGE_RETRY_TIMES_MAP.get(event.getId());
+            if (maxTimesPerEvent < 1) {
+                sendMessageBack(event);
+                STORAGE_RETRY_TIMES_MAP.put(event.getId(), STORAGE_RETRY_TIMES_MAP.get(event.getId()) + 1);
+            } else {
+                getHandleMessageContext().finish();
+            }
+        } else {
+            doRun(timeout);
+        }
+    }
+
+    protected HandleMessageContext getHandleMessageContext() throws Exception {
+        throw new IllegalAccessException("method not supported.");
+    }
+
+    public abstract void doRun(Timeout timeout) throws Exception;
+
+    @SneakyThrows
+    protected ProducerManager getProducerManager() {
+        throw new IllegalAccessException("method not supported.");
+    }
+
+    @SneakyThrows
+    protected ConsumerGroupConf getConsumerGroupConf() {
+        throw new IllegalAccessException("method not supported.");
+    }
+
+    private void sendMessageBack(final CloudEvent event) throws Exception {
+        String topic = event.getSubject();
+        String bizSeqNo = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey())).toString();
+        String uniqueId = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey())).toString();
+
+        ConsumerGroupConf consumerGroupConf = getConsumerGroupConf();
+        String consumerGroupName = consumerGroupConf.getConsumerGroup();
+        EventMeshProducer sendMessageBack = getProducerManager().getEventMeshProducer(consumerGroupName);
+
+        if (sendMessageBack == null) {
+            log.warn("consumer:{} consume fail, sendMessageBack, topic:{}, bizSeqNo:{}, uniqueId:{}",
+                consumerGroupName, topic, bizSeqNo, uniqueId);
+            return;
+        }
+        TopicNameHelper topicNameGenerator = EventMeshExtensionFactory.getExtension(TopicNameHelper.class,
+            commonConfiguration.getEventMeshStoragePluginType());
+        String retryTopicName = topicNameGenerator.generateRetryTopicName(consumerGroupName);

Review Comment:
   > This sends the message to the MQ's retry queue for retry, or more
   > precisely to RocketMQ's. Other mainstream MQs do not have a retry queue
   > like RocketMQ's: for example, RabbitMQ's retry queue is a regular queue
   > controlled only through the `x-max-retries` attribute in the message
   > header, and Kafka has no retry queue.
   > 
   > This also vaguely reminds me of why PR #4179 was not merged. As I recall,
   > the concern was that this approach can only be implemented for RocketMQ
   > and is difficult to extend to other MQs. Is that the reason? @xwm1992
   
   Yes. The main point of this PR is to support extending EventMesh with
   different retry strategies and to decouple the storage-dependent retry
   strategy from EventMesh.
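
   To make the decoupling concrete, here is a rough sketch of the idea, not
   the code in this PR. Every type and name below (`RetryStrategy`,
   `StorageRetryStrategy`, `LocalRetryStrategy`, `RetryStrategyRegistry`, the
   `retry-` topic prefix, the plugin type strings) is hypothetical and chosen
   only for illustration. It assumes a strategy is selected by storage plugin
   type and that a storage-independent strategy is used when the plugin
   provides none.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these types exist in EventMesh under these names.
final class RetryStrategySketch {

    /** Extension point: each storage plugin may supply its own way to retry an event. */
    interface RetryStrategy {
        void retry(String eventId, String consumerGroup);
    }

    /** Storage-dependent strategy: republishes to a per-group retry topic (recorded in memory here). */
    static final class StorageRetryStrategy implements RetryStrategy {
        final List<String> published = new ArrayList<>();

        @Override
        public void retry(String eventId, String consumerGroup) {
            // "retry-" + group is an illustrative topic name, not a real naming rule.
            published.add("retry-" + consumerGroup + "/" + eventId);
        }
    }

    /** Storage-independent fallback: the runtime counts and reschedules attempts itself. */
    static final class LocalRetryStrategy implements RetryStrategy {
        final Map<String, Integer> attempts = new HashMap<>();

        @Override
        public void retry(String eventId, String consumerGroup) {
            attempts.merge(eventId, 1, Integer::sum);
        }
    }

    /** Registry keyed by storage plugin type; unknown types fall back to local retry. */
    static final class RetryStrategyRegistry {
        private final Map<String, RetryStrategy> byPluginType = new HashMap<>();
        private final RetryStrategy fallback;

        RetryStrategyRegistry(RetryStrategy fallback) {
            this.fallback = fallback;
        }

        void register(String pluginType, RetryStrategy strategy) {
            byPluginType.put(pluginType, strategy);
        }

        RetryStrategy select(String pluginType) {
            return byPluginType.getOrDefault(pluginType, fallback);
        }
    }

    public static void main(String[] args) {
        StorageRetryStrategy storage = new StorageRetryStrategy();
        LocalRetryStrategy local = new LocalRetryStrategy();
        RetryStrategyRegistry registry = new RetryStrategyRegistry(local);
        registry.register("rocketmq", storage);

        // A plugin with a retry queue uses the storage strategy; others fall back.
        registry.select("rocketmq").retry("evt-1", "groupA");
        registry.select("kafka").retry("evt-2", "groupA");

        System.out.println(storage.published);
        System.out.println(local.attempts);
    }
}
```

   With this shape the runtime only depends on the `RetryStrategy` interface,
   so an MQ without a native retry queue would not need to emulate one.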



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

