This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 6f115385b [ISSUE #6149] remove handle when exceed renewMaxTimeMillis (#6150)
6f115385b is described below
commit 6f115385bbc37a837f11b6999e1b05d1c487ba93
Author: lk <[email protected]>
AuthorDate: Tue Feb 21 19:00:33 2023 +0800
[ISSUE #6149] remove handle when exceed renewMaxTimeMillis (#6150)
---
.../proxy/common/MessageReceiptHandle.java | 28 +++++++---------------
.../proxy/processor/ReceiptHandleProcessor.java | 2 +-
.../processor/ReceiptHandleProcessorTest.java | 5 ++--
3 files changed, 13 insertions(+), 22 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
index 379e644f7..0b3c241d1 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
@@ -32,8 +32,7 @@ public class MessageReceiptHandle {
private final int reconsumeTimes;
private final AtomicInteger renewRetryTimes = new AtomicInteger(0);
- private volatile long timestamp;
- private volatile long expectInvisibleTime;
+ private final long consumeTimestamp;
private volatile String receiptHandleStr;
public MessageReceiptHandle(String group, String topic, int queueId,
String receiptHandleStr, String messageId,
@@ -47,8 +46,7 @@ public class MessageReceiptHandle {
this.messageId = messageId;
this.queueOffset = queueOffset;
this.reconsumeTimes = reconsumeTimes;
- this.expectInvisibleTime = receiptHandle.getInvisibleTime();
- this.timestamp = receiptHandle.getRetrieveTime();
+ this.consumeTimestamp = receiptHandle.getRetrieveTime();
}
@Override
@@ -60,8 +58,8 @@ public class MessageReceiptHandle {
return false;
}
MessageReceiptHandle handle = (MessageReceiptHandle) o;
- return queueId == handle.queueId && queueOffset == handle.queueOffset && timestamp == handle.timestamp
- && reconsumeTimes == handle.reconsumeTimes && expectInvisibleTime == handle.expectInvisibleTime
+ return queueId == handle.queueId && queueOffset == handle.queueOffset && consumeTimestamp == handle.consumeTimestamp
+ && reconsumeTimes == handle.reconsumeTimes
&& Objects.equal(group, handle.group) && Objects.equal(topic, handle.topic)
&& Objects.equal(messageId, handle.messageId) && Objects.equal(originalReceiptHandleStr, handle.originalReceiptHandleStr)
&& Objects.equal(receiptHandleStr, handle.receiptHandleStr);
@@ -69,8 +67,8 @@ public class MessageReceiptHandle {
@Override
public int hashCode() {
- return Objects.hashCode(group, topic, queueId, messageId, queueOffset, originalReceiptHandleStr, timestamp,
- reconsumeTimes, expectInvisibleTime, receiptHandleStr);
+ return Objects.hashCode(group, topic, queueId, messageId, queueOffset, originalReceiptHandleStr, consumeTimestamp,
+ reconsumeTimes, receiptHandleStr);
}
@Override
@@ -84,8 +82,7 @@ public class MessageReceiptHandle {
.add("originalReceiptHandleStr", originalReceiptHandleStr)
.add("reconsumeTimes", reconsumeTimes)
.add("renewRetryTimes", renewRetryTimes)
- .add("timestamp", timestamp)
- .add("expectInvisibleTime", expectInvisibleTime)
+ .add("firstConsumeTimestamp", consumeTimestamp)
.add("receiptHandleStr", receiptHandleStr)
.toString();
}
@@ -122,19 +119,12 @@ public class MessageReceiptHandle {
return reconsumeTimes;
}
- public long getTimestamp() {
- return timestamp;
- }
-
- public long getExpectInvisibleTime() {
- return expectInvisibleTime;
+ public long getConsumeTimestamp() {
+ return consumeTimestamp;
}
public void updateReceiptHandle(String receiptHandleStr) {
- ReceiptHandle receiptHandle = ReceiptHandle.decode(receiptHandleStr);
this.receiptHandleStr = receiptHandleStr;
- this.expectInvisibleTime = receiptHandle.getInvisibleTime();
- this.timestamp = receiptHandle.getRetrieveTime();
}
public int incrementAndGetRenewRetryTimes() {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 5e096bc6b..bbd507070 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -174,7 +174,7 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle);
return CompletableFuture.completedFuture(null);
}
- if (current - messageReceiptHandle.getTimestamp() < messageReceiptHandle.getExpectInvisibleTime()) {
+ if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
CompletableFuture<AckResult> future =
messagingProcessor.changeInvisibleTime(context, handle,
messageReceiptHandle.getMessageId(),
messageReceiptHandle.getGroup(),
messageReceiptHandle.getTopic(), proxyConfig.getRenewSliceTimeMillis());
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index 99c662830..33057da6e 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -261,10 +261,11 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
@Test
public void testRenewReceiptHandleWhenTimeout() {
- long newInvisibleTime = 0L;
+ long newInvisibleTime = 200L;
+ long maxRenewMs = ConfigurationManager.getProxyConfig().getRenewMaxTimeMillis();
String newReceiptHandle = ReceiptHandle.builder()
.startOffset(0L)
- .retrieveTime(0)
+ .retrieveTime(System.currentTimeMillis() - maxRenewMs)
.invisibleTime(newInvisibleTime)
.reviveQueueId(1)
.topicType(ReceiptHandle.NORMAL_TOPIC)