This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new ab61183030 [ISSUE #7104] Add ReceiptHandleGroupKey for RenewEvent (#7105)
ab61183030 is described below

commit ab61183030f4f230619ea539cbd2cb977234208b
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Tue Aug 1 19:00:15 2023 +0800

    [ISSUE #7104] Add ReceiptHandleGroupKey for RenewEvent (#7105)
---
 .../proxy/common/ReceiptHandleGroupKey.java        | 69 ++++++++++++++++++++++
 .../apache/rocketmq/proxy/common/RenewEvent.java   |  9 ++-
 .../proxy/processor/ReceiptHandleProcessor.java    | 55 ++---------------
 .../receipt/DefaultReceiptHandleManager.java       | 36 +++++------
 .../receipt/DefaultReceiptHandleManagerTest.java   |  4 +-
 5 files changed, 101 insertions(+), 72 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java
new file mode 100644
index 0000000000..bd28393e5e
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupKey.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import io.netty.channel.Channel;
+
+public class ReceiptHandleGroupKey {
+    protected final Channel channel;
+    protected final String group;
+
+    public ReceiptHandleGroupKey(Channel channel, String group) {
+        this.channel = channel;
+        this.group = group;
+    }
+
+    protected String getChannelId() {
+        return channel.id().asLongText();
+    }
+
+    public String getGroup() {
+        return group;
+    }
+
+    public Channel getChannel() {
+        return channel;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ReceiptHandleGroupKey key = (ReceiptHandleGroupKey) o;
+        return Objects.equal(getChannelId(), key.getChannelId()) && 
Objects.equal(group, key.group);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(getChannelId(), group);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("channelId", getChannelId())
+            .add("group", group)
+            .toString();
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
index 0ff65c1ccf..8d591560a7 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.client.consumer.AckResult;
 
 public class RenewEvent {
+    protected ReceiptHandleGroupKey key;
     protected MessageReceiptHandle messageReceiptHandle;
     protected long renewTime;
     protected EventType eventType;
@@ -32,13 +33,19 @@ public class RenewEvent {
         CLEAR_GROUP
     }
 
-    public RenewEvent(MessageReceiptHandle messageReceiptHandle, long 
renewTime, EventType eventType, CompletableFuture<AckResult> future) {
+    public RenewEvent(ReceiptHandleGroupKey key, MessageReceiptHandle 
messageReceiptHandle, long renewTime,
+        EventType eventType, CompletableFuture<AckResult> future) {
+        this.key = key;
         this.messageReceiptHandle = messageReceiptHandle;
         this.renewTime = renewTime;
         this.eventType = eventType;
         this.future = future;
     }
 
+    public ReceiptHandleGroupKey getKey() {
+        return key;
+    }
+
     public MessageReceiptHandle getMessageReceiptHandle() {
         return messageReceiptHandle;
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 460842a86e..5e1be93218 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -17,19 +17,17 @@
 
 package org.apache.rocketmq.proxy.processor;
 
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
 import io.netty.channel.Channel;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
 import org.apache.rocketmq.common.state.StateEventListener;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.proxy.common.RenewEvent;
 import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
 import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager;
+import org.apache.rocketmq.proxy.common.RenewEvent;
 import org.apache.rocketmq.proxy.service.ServiceManager;
+import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager;
 
 public class ReceiptHandleProcessor extends AbstractProcessor {
     protected final static Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -38,7 +36,8 @@ public class ReceiptHandleProcessor extends AbstractProcessor 
{
     public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, 
ServiceManager serviceManager) {
         super(messagingProcessor, serviceManager);
         StateEventListener<RenewEvent> eventListener = event -> {
-            ProxyContext context = createContext(event.getEventType().name());
+            ProxyContext context = createContext(event.getEventType().name())
+                .setChannel(event.getKey().getChannel());
             MessageReceiptHandle messageReceiptHandle = 
event.getMessageReceiptHandle();
             ReceiptHandle handle = 
ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
             messagingProcessor.changeInvisibleTime(context, handle, 
messageReceiptHandle.getMessageId(),
@@ -66,50 +65,4 @@ public class ReceiptHandleProcessor extends 
AbstractProcessor {
         return receiptHandleManager.removeReceiptHandle(ctx, channel, group, 
msgID, receiptHandle);
     }
 
-    public static class ReceiptHandleGroupKey {
-        protected final Channel channel;
-        protected final String group;
-
-        public ReceiptHandleGroupKey(Channel channel, String group) {
-            this.channel = channel;
-            this.group = group;
-        }
-
-        protected String getChannelId() {
-            return channel.id().asLongText();
-        }
-
-        public String getGroup() {
-            return group;
-        }
-
-        public Channel getChannel() {
-            return channel;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            ReceiptHandleGroupKey key = (ReceiptHandleGroupKey) o;
-            return Objects.equal(getChannelId(), key.getChannelId()) && 
Objects.equal(group, key.group);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hashCode(getChannelId(), group);
-        }
-
-        @Override
-        public String toString() {
-            return MoreObjects.toStringHelper(this)
-                .add("channelId", getChannelId())
-                .add("group", group)
-                .toString();
-        }
-    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
index 9f35435f0d..69f44344a0 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -55,7 +55,7 @@ import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
 import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
+import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey;
 import org.apache.rocketmq.proxy.service.metadata.MetadataService;
 import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
@@ -64,7 +64,7 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
     protected final static Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
     protected final MetadataService metadataService;
     protected final ConsumerManager consumerManager;
-    protected final 
ConcurrentMap<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> 
receiptHandleGroupMap;
+    protected final ConcurrentMap<ReceiptHandleGroupKey, ReceiptHandleGroup> 
receiptHandleGroupMap;
     protected final StateEventListener<RenewEvent> eventListener;
     protected final static RetryPolicy RENEW_POLICY = new 
RenewStrategyPolicy();
     protected final ScheduledExecutorService scheduledExecutorService =
@@ -96,7 +96,7 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
                             // if the channel sync from other proxy is 
expired, not to clear data of connect to current proxy
                             return;
                         }
-                        clearGroup(new 
ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), 
group));
+                        clearGroup(new 
ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
                         log.info("clear handle of this client when client 
unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo);
                     }
                 }
@@ -125,19 +125,19 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
     }
 
     public void addReceiptHandle(ProxyContext context, Channel channel, String 
group, String msgID, MessageReceiptHandle messageReceiptHandle) {
-        ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new 
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group),
+        ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new 
ReceiptHandleGroupKey(channel, group),
             k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle);
     }
 
     public MessageReceiptHandle removeReceiptHandle(ProxyContext context, 
Channel channel, String group, String msgID, String receiptHandle) {
-        ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new 
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group));
+        ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new 
ReceiptHandleGroupKey(channel, group));
         if (handleGroup == null) {
             return null;
         }
         return handleGroup.remove(msgID, receiptHandle);
     }
 
-    protected boolean 
clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) {
+    protected boolean clientIsOffline(ReceiptHandleGroupKey groupKey) {
         return this.consumerManager.findChannel(groupKey.getGroup(), 
groupKey.getChannel()) == null;
     }
 
@@ -145,8 +145,8 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
         Stopwatch stopwatch = Stopwatch.createStarted();
         try {
             ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
-            for (Map.Entry<ReceiptHandleProcessor.ReceiptHandleGroupKey, 
ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) {
-                ReceiptHandleProcessor.ReceiptHandleGroupKey key = 
entry.getKey();
+            for (Map.Entry<ReceiptHandleGroupKey, ReceiptHandleGroup> entry : 
receiptHandleGroupMap.entrySet()) {
+                ReceiptHandleGroupKey key = entry.getKey();
                 if (clientIsOffline(key)) {
                     clearGroup(key);
                     continue;
@@ -159,7 +159,7 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
                     if (handle.getNextVisibleTime() - current > 
proxyConfig.getRenewAheadTimeMillis()) {
                         return;
                     }
-                    renewalWorkerService.submit(() -> renewMessage(group, 
msgID, handleStr));
+                    renewalWorkerService.submit(() -> renewMessage(key, group, 
msgID, handleStr));
                 });
             }
         } catch (Exception e) {
@@ -169,15 +169,15 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
         log.debug("scan for renewal done. cost:{}ms", 
stopwatch.elapsed().toMillis());
     }
 
-    protected void renewMessage(ReceiptHandleGroup group, String msgID, String 
handleStr) {
+    protected void renewMessage(ReceiptHandleGroupKey key, ReceiptHandleGroup 
group, String msgID, String handleStr) {
         try {
-            group.computeIfPresent(msgID, handleStr, this::startRenewMessage);
+            group.computeIfPresent(msgID, handleStr, messageReceiptHandle -> 
startRenewMessage(key, messageReceiptHandle));
         } catch (Exception e) {
             log.error("error when renew message. msgID:{}, handleStr:{}", 
msgID, handleStr, e);
         }
     }
 
-    protected CompletableFuture<MessageReceiptHandle> 
startRenewMessage(MessageReceiptHandle messageReceiptHandle) {
+    protected CompletableFuture<MessageReceiptHandle> 
startRenewMessage(ReceiptHandleGroupKey key, MessageReceiptHandle 
messageReceiptHandle) {
         CompletableFuture<MessageReceiptHandle> resFuture = new 
CompletableFuture<>();
         ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
         long current = System.currentTimeMillis();
@@ -188,7 +188,7 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
             }
             if (current - messageReceiptHandle.getConsumeTimestamp() < 
proxyConfig.getRenewMaxTimeMillis()) {
                 CompletableFuture<AckResult> future = new 
CompletableFuture<>();
-                eventListener.fireEvent(new RenewEvent(messageReceiptHandle, 
RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), 
RenewEvent.EventType.RENEW, future));
+                eventListener.fireEvent(new RenewEvent(key, 
messageReceiptHandle, 
RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), 
RenewEvent.EventType.RENEW, future));
                 future.whenComplete((ackResult, throwable) -> {
                     if (throwable != null) {
                         log.error("error when renew. handle:{}", 
messageReceiptHandle, throwable);
@@ -218,7 +218,7 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
                 }
                 RetryPolicy retryPolicy = 
subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
                 CompletableFuture<AckResult> future = new 
CompletableFuture<>();
-                eventListener.fireEvent(new RenewEvent(messageReceiptHandle, 
retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), 
RenewEvent.EventType.STOP_RENEW, future));
+                eventListener.fireEvent(new RenewEvent(key, 
messageReceiptHandle, 
retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), 
RenewEvent.EventType.STOP_RENEW, future));
                 future.whenComplete((ackResult, throwable) -> {
                     if (throwable != null) {
                         log.error("error when nack in renew. handle:{}", 
messageReceiptHandle, throwable);
@@ -233,7 +233,7 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
         return resFuture;
     }
 
-    protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey 
key) {
+    protected void clearGroup(ReceiptHandleGroupKey key) {
         if (key == null) {
             return;
         }
@@ -246,7 +246,7 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
             try {
                 handleGroup.computeIfPresent(msgID, handle, 
messageReceiptHandle -> {
                     CompletableFuture<AckResult> future = new 
CompletableFuture<>();
-                    eventListener.fireEvent(new 
RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), 
RenewEvent.EventType.CLEAR_GROUP, future));
+                    eventListener.fireEvent(new RenewEvent(key, 
messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), 
RenewEvent.EventType.CLEAR_GROUP, future));
                     return CompletableFuture.completedFuture(null);
                 });
             } catch (Exception e) {
@@ -257,8 +257,8 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
 
     protected void clearAllHandle() {
         log.info("start clear all handle in receiptHandleProcessor");
-        Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet = 
receiptHandleGroupMap.keySet();
-        for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) {
+        Set<ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet();
+        for (ReceiptHandleGroupKey key : keySet) {
             clearGroup(key);
         }
         log.info("clear all handle in receiptHandleProcessor done");
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
index 7c6943e44a..25ae1509a9 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
@@ -45,7 +45,7 @@ import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.proxy.processor.MessagingProcessor;
-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
+import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey;
 import org.apache.rocketmq.proxy.service.BaseServiceTest;
 import org.apache.rocketmq.proxy.service.metadata.MetadataService;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
@@ -445,7 +445,7 @@ public class DefaultReceiptHandleManagerTest extends 
BaseServiceTest {
     public void testClearGroup() {
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
         receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
-        receiptHandleManager.clearGroup(new 
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP));
+        receiptHandleManager.clearGroup(new ReceiptHandleGroupKey(channel, 
GROUP));
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
         receiptHandleManager.scheduleRenewTask();

Reply via email to