This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6f115385b [ISSUE #6149] remove handle when exceed renewMaxTimeMillis 
(#6150)
6f115385b is described below

commit 6f115385bbc37a837f11b6999e1b05d1c487ba93
Author: lk <[email protected]>
AuthorDate: Tue Feb 21 19:00:33 2023 +0800

    [ISSUE #6149] remove handle when exceed renewMaxTimeMillis (#6150)
---
 .../proxy/common/MessageReceiptHandle.java         | 28 +++++++---------------
 .../proxy/processor/ReceiptHandleProcessor.java    |  2 +-
 .../processor/ReceiptHandleProcessorTest.java      |  5 ++--
 3 files changed, 13 insertions(+), 22 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
index 379e644f7..0b3c241d1 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
@@ -32,8 +32,7 @@ public class MessageReceiptHandle {
     private final int reconsumeTimes;
 
     private final AtomicInteger renewRetryTimes = new AtomicInteger(0);
-    private volatile long timestamp;
-    private volatile long expectInvisibleTime;
+    private final long consumeTimestamp;
     private volatile String receiptHandleStr;
 
     public MessageReceiptHandle(String group, String topic, int queueId, 
String receiptHandleStr, String messageId,
@@ -47,8 +46,7 @@ public class MessageReceiptHandle {
         this.messageId = messageId;
         this.queueOffset = queueOffset;
         this.reconsumeTimes = reconsumeTimes;
-        this.expectInvisibleTime = receiptHandle.getInvisibleTime();
-        this.timestamp = receiptHandle.getRetrieveTime();
+        this.consumeTimestamp = receiptHandle.getRetrieveTime();
     }
 
     @Override
@@ -60,8 +58,8 @@ public class MessageReceiptHandle {
             return false;
         }
         MessageReceiptHandle handle = (MessageReceiptHandle) o;
-        return queueId == handle.queueId && queueOffset == handle.queueOffset 
&& timestamp == handle.timestamp
-            && reconsumeTimes == handle.reconsumeTimes && expectInvisibleTime 
== handle.expectInvisibleTime
+        return queueId == handle.queueId && queueOffset == handle.queueOffset 
&& consumeTimestamp == handle.consumeTimestamp
+            && reconsumeTimes == handle.reconsumeTimes
             && Objects.equal(group, handle.group) && Objects.equal(topic, 
handle.topic)
             && Objects.equal(messageId, handle.messageId) && 
Objects.equal(originalReceiptHandleStr, handle.originalReceiptHandleStr)
             && Objects.equal(receiptHandleStr, handle.receiptHandleStr);
@@ -69,8 +67,8 @@ public class MessageReceiptHandle {
 
     @Override
     public int hashCode() {
-        return Objects.hashCode(group, topic, queueId, messageId, queueOffset, 
originalReceiptHandleStr, timestamp,
-            reconsumeTimes, expectInvisibleTime, receiptHandleStr);
+        return Objects.hashCode(group, topic, queueId, messageId, queueOffset, 
originalReceiptHandleStr, consumeTimestamp,
+            reconsumeTimes, receiptHandleStr);
     }
 
     @Override
@@ -84,8 +82,7 @@ public class MessageReceiptHandle {
             .add("originalReceiptHandleStr", originalReceiptHandleStr)
             .add("reconsumeTimes", reconsumeTimes)
             .add("renewRetryTimes", renewRetryTimes)
-            .add("timestamp", timestamp)
-            .add("expectInvisibleTime", expectInvisibleTime)
+            .add("firstConsumeTimestamp", consumeTimestamp)
             .add("receiptHandleStr", receiptHandleStr)
             .toString();
     }
@@ -122,19 +119,12 @@ public class MessageReceiptHandle {
         return reconsumeTimes;
     }
 
-    public long getTimestamp() {
-        return timestamp;
-    }
-
-    public long getExpectInvisibleTime() {
-        return expectInvisibleTime;
+    public long getConsumeTimestamp() {
+        return consumeTimestamp;
     }
 
     public void updateReceiptHandle(String receiptHandleStr) {
-        ReceiptHandle receiptHandle = ReceiptHandle.decode(receiptHandleStr);
         this.receiptHandleStr = receiptHandleStr;
-        this.expectInvisibleTime = receiptHandle.getInvisibleTime();
-        this.timestamp = receiptHandle.getRetrieveTime();
     }
 
     public int incrementAndGetRenewRetryTimes() {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 5e096bc6b..bbd507070 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -174,7 +174,7 @@ public class ReceiptHandleProcessor extends 
AbstractStartAndShutdown {
                 log.warn("handle has exceed max renewRetryTimes. handle:{}", 
messageReceiptHandle);
                 return CompletableFuture.completedFuture(null);
             }
-            if (current - messageReceiptHandle.getTimestamp() < 
messageReceiptHandle.getExpectInvisibleTime()) {
+            if (current - messageReceiptHandle.getConsumeTimestamp() < 
proxyConfig.getRenewMaxTimeMillis()) {
                 CompletableFuture<AckResult> future =
                     messagingProcessor.changeInvisibleTime(context, handle, 
messageReceiptHandle.getMessageId(),
                         messageReceiptHandle.getGroup(), 
messageReceiptHandle.getTopic(), proxyConfig.getRenewSliceTimeMillis());
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index 99c662830..33057da6e 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -261,10 +261,11 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
 
     @Test
     public void testRenewReceiptHandleWhenTimeout() {
-        long newInvisibleTime = 0L;
+        long newInvisibleTime = 200L;
+        long maxRenewMs = 
ConfigurationManager.getProxyConfig().getRenewMaxTimeMillis();
         String newReceiptHandle = ReceiptHandle.builder()
             .startOffset(0L)
-            .retrieveTime(0)
+            .retrieveTime(System.currentTimeMillis() - maxRenewMs)
             .invisibleTime(newInvisibleTime)
             .reviveQueueId(1)
             .topicType(ReceiptHandle.NORMAL_TOPIC)

Reply via email to