This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit feb1f3077aef151f2eee9d34980ca47eebf2fc90 Author: loboxu <[email protected]> AuthorDate: Thu Mar 9 23:05:22 2023 +0800 ReceiptHandleProcessor message renewal strategy optimization --- .../proxy/common/MessageReceiptHandle.java | 15 +++++---- .../proxy/processor/ReceiptHandleProcessor.java | 8 +++-- .../processor/ReceiptHandleProcessorTest.java | 37 ++++++++++------------ 3 files changed, 31 insertions(+), 29 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java index 8fa6583b2..e885cf4c2 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java @@ -21,7 +21,6 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.common.consumer.ReceiptHandle; -import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; public class MessageReceiptHandle { private final String group; @@ -33,9 +32,9 @@ public class MessageReceiptHandle { private final int reconsumeTimes; private final AtomicInteger renewRetryTimes = new AtomicInteger(0); + private final AtomicInteger renewTimes = new AtomicInteger(0); private final long consumeTimestamp; private volatile String receiptHandleStr; - private final RetryPolicy renewStrategyPolicy; public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId, long queueOffset, int reconsumeTimes) { @@ -49,7 +48,6 @@ public class MessageReceiptHandle { this.queueOffset = queueOffset; this.reconsumeTimes = reconsumeTimes; this.consumeTimestamp = receiptHandle.getRetrieveTime(); - this.renewStrategyPolicy = new RenewStrategyPolicy(); } @Override @@ -134,6 +132,14 @@ public class MessageReceiptHandle { return this.renewRetryTimes.incrementAndGet(); } + public int incrementRenewTimes() { + return this.renewTimes.incrementAndGet(); + } + + public int getRenewTimes() { + return this.renewTimes.get(); + } + public void resetRenewRetryTimes() { this.renewRetryTimes.set(0); } @@ -142,7 +148,4 @@ public class MessageReceiptHandle { return this.renewRetryTimes.get(); } - public RetryPolicy getRenewStrategyPolicy() { - return this.renewStrategyPolicy; - } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java index 7b7982bab..357e94249 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java @@ -46,6 +46,7 @@ import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.ProxyException; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; import org.apache.rocketmq.proxy.common.ReceiptHandleGroup; +import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; import org.apache.rocketmq.proxy.common.StartAndShutdown; import org.apache.rocketmq.proxy.common.channel.ChannelHelper; import org.apache.rocketmq.proxy.common.utils.ExceptionUtils; @@ -175,10 +176,10 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown { return CompletableFuture.completedFuture(null); } if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) { - RetryPolicy renewPolicy = messageReceiptHandle.getRenewStrategyPolicy(); + RetryPolicy renewPolicy = new RenewStrategyPolicy(); CompletableFuture<AckResult> future = messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(), - messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), renewPolicy.nextDelayDuration(messageReceiptHandle.getRenewRetryTimes())); + messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), renewPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes())); future.whenComplete((ackResult, throwable) -> { if (throwable != null) { log.error("error when renew. handle:{}", messageReceiptHandle, throwable); @@ -191,9 +192,10 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown { } else if (AckStatus.OK.equals(ackResult.getStatus())) { messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo()); messageReceiptHandle.resetRenewRetryTimes(); + messageReceiptHandle.incrementRenewTimes(); resFuture.complete(messageReceiptHandle); } else { - log.error("renew response is not ok. result:{}", ackResult, messageReceiptHandle); + log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle); resFuture.complete(null); } }); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java index 6d77c1471..c0bff981f 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java @@ -124,7 +124,8 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig); Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - long newInvisibleTime = 2000L; + long newInvisibleTime = 18000L; + ReceiptHandle newReceiptHandleClass = ReceiptHandle.builder() .startOffset(0L) .retrieveTime(System.currentTimeMillis() - newInvisibleTime + config.getRenewAheadTimeMillis() - 5) @@ -137,20 +138,28 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { .commitLogOffset(0L) .build(); String newReceiptHandle = newReceiptHandleClass.encode(); + + RetryPolicy retryPolicy = new RenewStrategyPolicy(); + AtomicInteger times = new AtomicInteger(0); + AckResult ackResult = new AckResult(); ackResult.setStatus(AckStatus.OK); ackResult.setExtraInfo(newReceiptHandle); + Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()))) + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.get())))) .thenReturn(CompletableFuture.completedFuture(ackResult)); receiptHandleProcessor.scheduleRenewTask(); + Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == INVISIBLE_TIME), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())); + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.get()))); receiptHandleProcessor.scheduleRenewTask(); + Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == newInvisibleTime), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())); + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.incrementAndGet()))); + receiptHandleProcessor.scheduleRenewTask(); } @Test @@ -164,20 +173,11 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { ackResultFuture.completeExceptionally(new MQClientException(0, "error")); RetryPolicy retryPolicy = new RenewStrategyPolicy(); - AtomicInteger times = new AtomicInteger(0); Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement())))) + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes())))) .thenReturn(ackResultFuture); - Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement())))) - .thenReturn(ackResultFuture); - - Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement())))) - .thenReturn(ackResultFuture); - await().atMost(Duration.ofSeconds(1)).until(() -> { receiptHandleProcessor.scheduleRenewTask(); try { @@ -188,12 +188,9 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { } }); - times = new AtomicInteger(0); - for (int i = 0; i < config.getMaxRenewRetryTimes(); i++) { - Mockito.verify(messagingProcessor, Mockito.times(1)) - .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))); - } + Mockito.verify(messagingProcessor, Mockito.times(3)) + .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes()))); } @Test
