pandaapo commented on code in PR #4544:
URL: https://github.com/apache/eventmesh/pull/4544#discussion_r1386308831


##########
eventmesh-runtime/build.gradle:
##########
@@ -72,6 +72,9 @@ dependencies {
     implementation project(":eventmesh-webhook:eventmesh-webhook-api")
     implementation project(":eventmesh-webhook:eventmesh-webhook-receive")
 
+    implementation project(":eventmesh-retry:eventmesh-retry-api")
+    implementation project(":eventmesh-retry:eventmesh-retry-rocketmq")

Review Comment:
   This dependency is redundant. If the runtime module really needs it, the 
retry plugin is not decoupled from the runtime.



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java:
##########
@@ -52,8 +72,62 @@ protected void rePut(Timeout timeout, long tick, TimeUnit timeUnit) {
         timer.newTimeout(timeout.task(), tick, timeUnit);
     }
 
+    public void setEvent(CloudEvent event) {
+        this.event = event;
+    }
+
     @Override
     public void setExecuteTimeHook(long executeTime) {
         this.executeTime = executeTime;
     }
+
+    @Override
+    public final void run(Timeout timeout) throws Exception {
+        boolean eventMeshServerRetryStorageEnabled =
+            Optional.ofNullable(commonConfiguration).map(CommonConfiguration::isEventMeshServerRetryStorageEnabled)
+                .orElse(false);
+        if (eventMeshServerRetryStorageEnabled) {
+            if (!STORAGE_RETRY_TIMES_MAP.containsKey(event.getId())) {
+                STORAGE_RETRY_TIMES_MAP.put(event.getId(), 0);
+            }
+            Integer maxTimesPerEvent = STORAGE_RETRY_TIMES_MAP.get(event.getId());
+            if (maxTimesPerEvent < 1) {
+                StorageRetryStrategy storageRetryStrategy = EventMeshExtensionFactory.getExtension(RetryStrategy.class,
+                    commonConfiguration.getEventMeshStoragePluginType()).getStorageRetryStrategy();
+                String consumerGroupName = getConsumerGroupConf().getConsumerGroup();
+                EventMeshProducer producer = getProducerManager().getEventMeshProducer(consumerGroupName);
+                RetryConfiguration retryConfiguration = RetryConfiguration.builder()
+                    .event(event)
+                    .retryStorageEnabled(eventMeshServerRetryStorageEnabled)
+                    .consumerGroupName(getConsumerGroupConf().getConsumerGroup())
+                    .producer(producer.getMqProducerWrapper().getMeshMQProducer())
+                    .topic(getConsumerGroupConf().getTopic())
+                    .build();
+                storageRetryStrategy.retry(retryConfiguration);
+                STORAGE_RETRY_TIMES_MAP.put(event.getId(), STORAGE_RETRY_TIMES_MAP.get(event.getId()) + 1);
+            } else {
+                STORAGE_RETRY_TIMES_MAP.remove(event.getId());
+                getHandleMessageContext().finish();
+            }
+        } else {
+            doRun(timeout);

Review Comment:
   Consider renaming this method. The logic in `doRun()` and the rest of the 
`run()` method are parallel branches: one relies on the retry function provided 
by external storage, and the other is the in-memory retry provided by 
EventMesh.
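
A minimal sketch of what the rename could look like, assuming the two branches become sibling methods. The names `RetryTaskSketch`, `retryViaStorage()` and `retryInMemory()` are hypothetical and are not taken from the PR; the real `run(Timeout)` signature is also simplified here.

```java
// Hypothetical sketch: class and method names are illustrative, not from the PR.
abstract class RetryTaskSketch {

    private final boolean storageRetryEnabled;

    RetryTaskSketch(boolean storageRetryEnabled) {
        this.storageRetryEnabled = storageRetryEnabled;
    }

    // Entry point invoked by the timer; it only selects one of the two branches.
    public final void run() {
        if (storageRetryEnabled) {
            retryViaStorage();
        } else {
            retryInMemory();
        }
    }

    // Retry delegated to the external storage (for example, a retry topic).
    protected abstract void retryViaStorage();

    // Retry handled by EventMesh itself, in memory.
    protected abstract void retryInMemory();
}
```

With names like these, a reader can see from `run()` alone that the two paths are alternatives rather than a wrapper and its delegate.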



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/RetryContext.java:
##########
@@ -52,8 +76,87 @@ protected void rePut(Timeout timeout, long tick, TimeUnit timeUnit) {
         timer.newTimeout(timeout.task(), tick, timeUnit);
     }
 
+    public void setEvent(CloudEvent event) {
+        this.event = event;
+    }
+
     @Override
     public void setExecuteTimeHook(long executeTime) {
         this.executeTime = executeTime;
     }
+
+    @Override
+    public final void run(Timeout timeout) throws Exception {
+        boolean eventMeshServerRetryStorageEnabled = commonConfiguration.isEventMeshServerRetryStorageEnabled();
+        if (eventMeshServerRetryStorageEnabled) {
+            if (!STORAGE_RETRY_TIMES_MAP.containsKey(event.getId())) {
+                STORAGE_RETRY_TIMES_MAP.put(event.getId(), 0);
+            }
+            Integer maxTimesPerEvent = STORAGE_RETRY_TIMES_MAP.get(event.getId());
+            if (maxTimesPerEvent < 1) {
+                sendMessageBack(event);
+                STORAGE_RETRY_TIMES_MAP.put(event.getId(), STORAGE_RETRY_TIMES_MAP.get(event.getId()) + 1);
+            } else {
+                getHandleMessageContext().finish();

Review Comment:
   Based on your code, a `Map<String, Integer>` only adds logical complexity. 
The Integer values in this Map are only ever 0 or 1, so a `Set` is sufficient. 
If the event ID is not in the Set, add it and follow the retry logic. If it is 
already in the Set, treat the event directly as successfully consumed.
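
The Set-based approach could be sketched roughly as follows. `StorageRetryTracker` and its methods are hypothetical names used only for illustration. Removing the ID on the second visit is an assumption made here to keep the set from growing without bound; it is not something the comment above asks for.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of the PR: records which event IDs
// have already been sent back to storage once.
final class StorageRetryTracker {

    // Concurrent set, since timer tasks may run on different threads.
    private final Set<String> retriedEventIds = ConcurrentHashMap.newKeySet();

    /**
     * Returns true when the event has not been retried yet (the caller should
     * follow the retry logic), false when it was already retried (the caller
     * should treat it as successfully consumed).
     */
    boolean shouldRetry(String eventId) {
        // Set.add returns true only if the element was not already present.
        if (retriedEventIds.add(eventId)) {
            return true;
        }
        // Assumption: forget the ID once the second visit has been handled.
        retriedEventIds.remove(eventId);
        return false;
    }

    int trackedCount() {
        return retriedEventIds.size();
    }
}
```

In `run()`, the `containsKey`/`put`/`get` sequence would then collapse into a single `if (tracker.shouldRetry(event.getId()))` check, with the `else` branch calling `finish()`.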



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupConf.java:
##########
@@ -27,17 +27,24 @@ public class ConsumerGroupConf implements Serializable {
     // eg . 5013-1A0
     private String consumerGroup;
 
+    private String topic;
+
     private final ConcurrentHashMap<String/* topic */, ConsumerGroupTopicConf> consumerGroupTopicConf =

Review Comment:
   The original `ConsumerGroupConf` can store multiple topics, and it is common 
for one consumer group to correspond to multiple topics. Is it appropriate to 
change it so that it can store only one topic?



##########
eventmesh-storage-plugin/eventmesh-storage-api/src/main/java/org/apache/eventmesh/api/TopicNameHelper.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.api;
+
+import org.apache.eventmesh.spi.EventMeshExtensionType;
+import org.apache.eventmesh.spi.EventMeshSPI;
+
+import lombok.SneakyThrows;
+
+/**
+ * Topic name generator.
+ */
+@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.STORAGE)
+public interface TopicNameHelper {
+
+    @SneakyThrows
+    default String generateRetryTopicName(String topicName) {
+        throw new IllegalAccessException("Method not supported.");
+    }

Review Comment:
   This method is no longer used in your new submission.



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/SendMessageContext.java:
##########
@@ -44,14 +46,16 @@ public class SendMessageContext extends RetryContext {
 
     private long createTime = System.currentTimeMillis();
 
-    public EventMeshGrpcServer eventMeshGrpcServer;
+    public AbstractRemotingServer eventMeshServer;
+
+    private List<CloudEvent> eventList;

Review Comment:
   Redundant attribute, along with its corresponding setter and getter.



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/EventMeshProducer.java:
##########
@@ -102,4 +105,16 @@ public String toString() {
             .append(producerGroupConfig).append("}");
         return sb.toString();
     }
+
+    public MQProducerWrapper getMqProducerWrapper() {
+        return mqProducerWrapper;
+    }
+
+    public boolean isInited() {

Review Comment:
   Redundant method.


