This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 667ebbcc142a0584c2f30dc0ae3c135017127529 Author: loboxu <[email protected]> AuthorDate: Tue Mar 7 22:14:27 2023 +0800 ReceiptHandleProcessor message renewal strategy optimization #6232 --- .../proxy/common/MessageReceiptHandle.java | 7 +++ .../rocketmq/proxy/common/RenewStrategyPolicy.java | 70 ++++++++++++++++++++++ .../apache/rocketmq/proxy/config/ProxyConfig.java | 23 ++++--- .../grpc/v2/consumer/ReceiveMessageActivity.java | 2 +- .../proxy/processor/ReceiptHandleProcessor.java | 3 +- .../proxy/common/RenewStrategyPolicyTest.java | 66 ++++++++++++++++++++ .../processor/ReceiptHandleProcessorTest.java | 16 ++--- 7 files changed, 168 insertions(+), 19 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java index 0b3c241d1..263d6157d 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java @@ -21,6 +21,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.common.consumer.ReceiptHandle; +import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; public class MessageReceiptHandle { private final String group; @@ -34,6 +35,7 @@ public class MessageReceiptHandle { private final AtomicInteger renewRetryTimes = new AtomicInteger(0); private final long consumeTimestamp; private volatile String receiptHandleStr; + private final RetryPolicy renewStrategyPolicy; public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId, long queueOffset, int reconsumeTimes) { @@ -47,6 +49,7 @@ public class MessageReceiptHandle { this.queueOffset = queueOffset; this.reconsumeTimes = reconsumeTimes; this.consumeTimestamp = receiptHandle.getRetrieveTime(); + 
this.renewStrategyPolicy = new RenewStrategyPolicy(); } @Override @@ -138,4 +141,8 @@ public class MessageReceiptHandle { public int getRenewRetryTimes() { return this.renewRetryTimes.get(); } + + public RetryPolicy getRenewStrategyPolicy(){ + return this.renewStrategyPolicy; + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicy.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicy.java new file mode 100644 index 000000000..ce33619b4 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicy.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.proxy.common; + +import com.google.common.base.MoreObjects; +import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; + +import java.util.concurrent.TimeUnit; + + +public class RenewStrategyPolicy implements RetryPolicy { + // 1m 3m 5m 10m 30m 1h + private long[] next = new long[]{ + TimeUnit.MINUTES.toMillis(1), + TimeUnit.MINUTES.toMillis(3), + TimeUnit.MINUTES.toMillis(5), + TimeUnit.MINUTES.toMillis(10), + TimeUnit.MINUTES.toMillis(30), + TimeUnit.HOURS.toMillis(1) + }; + + public RenewStrategyPolicy() { + } + + public RenewStrategyPolicy(long[] next) { + this.next = next; + } + + public long[] getNext() { + return next; + } + + public void setNext(long[] next) { + this.next = next; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("next", next) + .toString(); + } + + @Override + public long nextDelayDuration(int renewTimes) { + if (renewTimes < 0) { + renewTimes = 0; + } + int index = renewTimes; + if (index >= next.length) { + index = next.length - 1; + } + return next[index]; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java index c65877a41..dcbf1af0e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java @@ -107,10 +107,16 @@ public class ProxyConfig implements ConfigFile { */ private int maxUserPropertySize = 16 * 1024; private int userPropertyMaxNum = 128; + /** * max message group size, 0 or negative number means no limit for proxy */ private int maxMessageGroupSize = 64; + + /** + * Invisible time in milliseconds applied to a popped message when proxy auto-renew is enabled and requested + */ + private long defaultInvisibleTimeMills = Duration.ofSeconds(60).toMillis(); private long minInvisibleTimeMillsForRecv = Duration.ofSeconds(10).toMillis(); private long maxInvisibleTimeMills =
Duration.ofHours(12).toMillis(); private long maxDelayTimeMills = Duration.ofDays(1).toMillis(); @@ -180,7 +186,6 @@ public class ProxyConfig implements ConfigFile { private int renewThreadPoolQueueCapacity = 300; private long lockTimeoutMsInHandleGroup = TimeUnit.SECONDS.toMillis(3); private long renewAheadTimeMillis = TimeUnit.SECONDS.toMillis(10); - private long renewSliceTimeMillis = TimeUnit.SECONDS.toMillis(60); private long renewMaxTimeMillis = TimeUnit.HOURS.toMillis(3); private long renewSchedulePeriodMillis = TimeUnit.SECONDS.toMillis(5); @@ -555,6 +560,14 @@ public class ProxyConfig implements ConfigFile { this.minInvisibleTimeMillsForRecv = minInvisibleTimeMillsForRecv; } + public long getDefaultInvisibleTimeMills() { + return defaultInvisibleTimeMills; + } + + public void setDefaultInvisibleTimeMills(long defaultInvisibleTimeMills) { + this.defaultInvisibleTimeMills = defaultInvisibleTimeMills; + } + public long getMaxInvisibleTimeMills() { return maxInvisibleTimeMills; } @@ -1019,14 +1032,6 @@ public class ProxyConfig implements ConfigFile { this.renewAheadTimeMillis = renewAheadTimeMillis; } - public long getRenewSliceTimeMillis() { - return renewSliceTimeMillis; - } - - public void setRenewSliceTimeMillis(long renewSliceTimeMillis) { - this.renewSliceTimeMillis = renewSliceTimeMillis; - } - public long getRenewMaxTimeMillis() { return renewMaxTimeMillis; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java index ddbe07083..9df4101f7 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java @@ -103,7 +103,7 @@ public class ReceiveMessageActivity extends AbstractMessingActivity { long actualInvisibleTime = Durations.toMillis(request.getInvisibleDuration()); 
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) { - actualInvisibleTime = proxyConfig.getRenewSliceTimeMillis(); + actualInvisibleTime = proxyConfig.getDefaultInvisibleTimeMills(); } else { validateInvisibleTime(actualInvisibleTime, ConfigurationManager.getProxyConfig().getMinInvisibleTimeMillsForRecv()); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java index bbd507070..b0a4e8414 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java @@ -175,9 +175,10 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown { return CompletableFuture.completedFuture(null); } if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) { + RetryPolicy renewPolicy = messageReceiptHandle.getRenewStrategyPolicy(); CompletableFuture<AckResult> future = messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(), - messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), proxyConfig.getRenewSliceTimeMillis()); + messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), renewPolicy.nextDelayDuration(messageReceiptHandle.getRenewRetryTimes())); future.whenComplete((ackResult, throwable) -> { if (throwable != null) { log.error("error when renew. 
handle:{}", messageReceiptHandle, throwable); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicyTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicyTest.java new file mode 100644 index 000000000..54e627274 --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicyTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.proxy.common; + +import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +public class RenewStrategyPolicyTest { + + private RetryPolicy retryPolicy; + private final AtomicInteger times = new AtomicInteger(0); + + @Before + public void before() throws Throwable { + this.retryPolicy = new RenewStrategyPolicy(); + } + + @Test + public void testNextDelayDuration() { + long value = this.retryPolicy.nextDelayDuration(times.getAndIncrement()); + assertEquals(value, TimeUnit.MINUTES.toMillis(1)); + + value = this.retryPolicy.nextDelayDuration(times.getAndIncrement()); + assertEquals(value, TimeUnit.MINUTES.toMillis(3)); + + value = this.retryPolicy.nextDelayDuration(times.getAndIncrement()); + assertEquals(value, TimeUnit.MINUTES.toMillis(5)); + + value = this.retryPolicy.nextDelayDuration(times.getAndIncrement()); + assertEquals(value, TimeUnit.MINUTES.toMillis(10)); + + value = this.retryPolicy.nextDelayDuration(times.getAndIncrement()); + assertEquals(value, TimeUnit.MINUTES.toMillis(30)); + + value = this.retryPolicy.nextDelayDuration(times.getAndIncrement()); + assertEquals(value, TimeUnit.HOURS.toMillis(1)); + } + + + @After + public void after() { + } + +} diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java index 33057da6e..355596ba1 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java @@ -111,7 +111,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { receiptHandleProcessor.scheduleRenewTask(); 
Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis())); + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())); } @Test @@ -139,16 +139,16 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { ackResult.setStatus(AckStatus.OK); ackResult.setExtraInfo(newReceiptHandle); Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()))) + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()))) .thenReturn(CompletableFuture.completedFuture(ackResult)); receiptHandleProcessor.scheduleRenewTask(); Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == INVISIBLE_TIME), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis())); + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())); receiptHandleProcessor.scheduleRenewTask(); Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == newInvisibleTime), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis())); + Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())); } @Test @@ -161,7 +161,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>(); ackResultFuture.completeExceptionally(new MQClientException(0, "error")); Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()))) + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()))) .thenReturn(ackResultFuture); await().atMost(Duration.ofSeconds(1)).until(() -> { @@ -175,7 +175,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { }); Mockito.verify(messagingProcessor, Mockito.times(config.getMaxRenewRetryTimes())) .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis())); + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())); } @Test @@ -187,7 +187,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>(); ackResultFuture.completeExceptionally(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error")); Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()))) + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()))) 
.thenReturn(ackResultFuture); await().atMost(Duration.ofSeconds(1)).until(() -> { @@ -246,7 +246,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { Mockito.doAnswer((Answer<CompletableFuture<AckResult>>) mock -> { return futureList.get(count.getAndIncrement()); }).when(messagingProcessor).changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis())); + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())); await().pollDelay(Duration.ZERO).pollInterval(Duration.ofMillis(10)).atMost(Duration.ofSeconds(10)).until(() -> { receiptHandleProcessor.scheduleRenewTask(); try {
