This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e0f5295fed passing the renew event type to create the correct context 
(#7045)
e0f5295fed is described below

commit e0f5295fed8791d93bfa5b8420074c00b651ddfe
Author: lk <[email protected]>
AuthorDate: Wed Jul 19 15:55:11 2023 +0800

    passing the renew event type to create the correct context (#7045)
---
 .../java/org/apache/rocketmq/proxy/common/RenewEvent.java  | 14 +++++++++++++-
 .../rocketmq/proxy/processor/ReceiptHandleProcessor.java   |  2 +-
 .../proxy/service/receipt/DefaultReceiptHandleManager.java |  6 +++---
 3 files changed, 17 insertions(+), 5 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java 
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
index fdf9833ccd..0ff65c1ccf 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
@@ -23,11 +23,19 @@ import org.apache.rocketmq.client.consumer.AckResult;
 public class RenewEvent {
     protected MessageReceiptHandle messageReceiptHandle;
     protected long renewTime;
+    protected EventType eventType;
     protected CompletableFuture<AckResult> future;
 
-    public RenewEvent(MessageReceiptHandle messageReceiptHandle, long 
renewTime, CompletableFuture<AckResult> future) {
+    public enum EventType {
+        RENEW,
+        STOP_RENEW,
+        CLEAR_GROUP
+    }
+
+    public RenewEvent(MessageReceiptHandle messageReceiptHandle, long 
renewTime, EventType eventType, CompletableFuture<AckResult> future) {
         this.messageReceiptHandle = messageReceiptHandle;
         this.renewTime = renewTime;
+        this.eventType = eventType;
         this.future = future;
     }
 
@@ -39,6 +47,10 @@ public class RenewEvent {
         return renewTime;
     }
 
+    public EventType getEventType() {
+        return eventType;
+    }
+
     public CompletableFuture<AckResult> getFuture() {
         return future;
     }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index fc49e76229..460842a86e 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -38,7 +38,7 @@ public class ReceiptHandleProcessor extends AbstractProcessor 
{
     public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, 
ServiceManager serviceManager) {
         super(messagingProcessor, serviceManager);
         StateEventListener<RenewEvent> eventListener = event -> {
-            ProxyContext context = createContext("RenewMessage");
+            ProxyContext context = createContext(event.getEventType().name());
             MessageReceiptHandle messageReceiptHandle = 
event.getMessageReceiptHandle();
             ReceiptHandle handle = 
ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
             messagingProcessor.changeInvisibleTime(context, handle, 
messageReceiptHandle.getMessageId(),
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
index c7633d658a..9f35435f0d 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -188,7 +188,7 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
             }
             if (current - messageReceiptHandle.getConsumeTimestamp() < 
proxyConfig.getRenewMaxTimeMillis()) {
                 CompletableFuture<AckResult> future = new 
CompletableFuture<>();
-                eventListener.fireEvent(new RenewEvent(messageReceiptHandle, 
RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future));
+                eventListener.fireEvent(new RenewEvent(messageReceiptHandle, 
RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), 
RenewEvent.EventType.RENEW, future));
                 future.whenComplete((ackResult, throwable) -> {
                     if (throwable != null) {
                         log.error("error when renew. handle:{}", 
messageReceiptHandle, throwable);
@@ -218,7 +218,7 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
                 }
                 RetryPolicy retryPolicy = 
subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
                 CompletableFuture<AckResult> future = new 
CompletableFuture<>();
-                eventListener.fireEvent(new RenewEvent(messageReceiptHandle, 
retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), 
future));
+                eventListener.fireEvent(new RenewEvent(messageReceiptHandle, 
retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), 
RenewEvent.EventType.STOP_RENEW, future));
                 future.whenComplete((ackResult, throwable) -> {
                     if (throwable != null) {
                         log.error("error when nack in renew. handle:{}", 
messageReceiptHandle, throwable);
@@ -246,7 +246,7 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
             try {
                 handleGroup.computeIfPresent(msgID, handle, 
messageReceiptHandle -> {
                     CompletableFuture<AckResult> future = new 
CompletableFuture<>();
-                    eventListener.fireEvent(new 
RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), 
future));
+                    eventListener.fireEvent(new 
RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), 
RenewEvent.EventType.CLEAR_GROUP, future));
                     return CompletableFuture.completedFuture(null);
                 });
             } catch (Exception e) {

Reply via email to