This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 8e6b5e62bd [ISSUE #6999] Add interface ReceiptHandleManager (#7000)
8e6b5e62bd is described below
commit 8e6b5e62bd4da78c0a7d265891c52685fcffd08a
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Mon Jul 10 20:14:17 2023 +0800
[ISSUE #6999] Add interface ReceiptHandleManager (#7000)
* Add interface ReceiptHandleManager
* fix unit test
* fix
---
.../proxy/processor/ReceiptHandleProcessor.java | 10 +-
...nager.java => DefaultReceiptHandleManager.java} | 12 +-
.../service/receipt/ReceiptHandleManager.java | 260 +--------------------
...t.java => DefaultReceiptHandleManagerTest.java} | 34 +--
4 files changed, 31 insertions(+), 285 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 9c7e8dea9d..fc49e76229 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -28,12 +28,12 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.RenewEvent;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.service.receipt.ReceiptHandleManager;
+import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager;
import org.apache.rocketmq.proxy.service.ServiceManager;
public class ReceiptHandleProcessor extends AbstractProcessor {
protected final static Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
- protected ReceiptHandleManager receiptHandleManager;
+ protected DefaultReceiptHandleManager receiptHandleManager;
public ReceiptHandleProcessor(MessagingProcessor messagingProcessor,
ServiceManager serviceManager) {
super(messagingProcessor, serviceManager);
@@ -51,7 +51,7 @@ public class ReceiptHandleProcessor extends AbstractProcessor
{
event.getFuture().complete(v);
});
};
- this.receiptHandleManager = new
ReceiptHandleManager(serviceManager.getMetadataService(),
serviceManager.getConsumerManager(), eventListener);
+ this.receiptHandleManager = new
DefaultReceiptHandleManager(serviceManager.getMetadataService(),
serviceManager.getConsumerManager(), eventListener);
}
protected ProxyContext createContext(String actionName) {
@@ -59,11 +59,11 @@ public class ReceiptHandleProcessor extends
AbstractProcessor {
}
public void addReceiptHandle(ProxyContext ctx, Channel channel, String
group, String msgID, MessageReceiptHandle messageReceiptHandle) {
- receiptHandleManager.addReceiptHandle(channel, group, msgID,
messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(ctx, channel, group, msgID,
messageReceiptHandle);
}
public MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel
channel, String group, String msgID, String receiptHandle) {
- return receiptHandleManager.removeReceiptHandle(channel, group, msgID,
receiptHandle);
+ return receiptHandleManager.removeReceiptHandle(ctx, channel, group,
msgID, receiptHandle);
}
public static class ReceiptHandleGroupKey {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
similarity index 95%
copy from
proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
copy to
proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
index f3b8056247..c7633d658a 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -60,7 +60,7 @@ import
org.apache.rocketmq.proxy.service.metadata.MetadataService;
import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
-public class ReceiptHandleManager extends AbstractStartAndShutdown {
+public class DefaultReceiptHandleManager extends AbstractStartAndShutdown
implements ReceiptHandleManager {
protected final static Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected final MetadataService metadataService;
protected final ConsumerManager consumerManager;
@@ -71,7 +71,7 @@ public class ReceiptHandleManager extends
AbstractStartAndShutdown {
Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("RenewalScheduledThread_"));
protected final ThreadPoolExecutor renewalWorkerService;
- public ReceiptHandleManager(MetadataService metadataService,
ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
+ public DefaultReceiptHandleManager(MetadataService metadataService,
ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
this.metadataService = metadataService;
this.consumerManager = consumerManager;
this.eventListener = eventListener;
@@ -124,12 +124,12 @@ public class ReceiptHandleManager extends
AbstractStartAndShutdown {
});
}
- public void addReceiptHandle(Channel channel, String group, String msgID,
MessageReceiptHandle messageReceiptHandle) {
+ public void addReceiptHandle(ProxyContext context, Channel channel, String
group, String msgID, MessageReceiptHandle messageReceiptHandle) {
ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group),
k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle);
}
- public MessageReceiptHandle removeReceiptHandle(Channel channel, String
group, String msgID, String receiptHandle) {
+ public MessageReceiptHandle removeReceiptHandle(ProxyContext context,
Channel channel, String group, String msgID, String receiptHandle) {
ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group));
if (handleGroup == null) {
return null;
@@ -141,7 +141,7 @@ public class ReceiptHandleManager extends
AbstractStartAndShutdown {
return this.consumerManager.findChannel(groupKey.getGroup(),
groupKey.getChannel()) == null;
}
- public void scheduleRenewTask() {
+ protected void scheduleRenewTask() {
Stopwatch stopwatch = Stopwatch.createStarted();
try {
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
@@ -255,7 +255,7 @@ public class ReceiptHandleManager extends
AbstractStartAndShutdown {
});
}
- public void clearAllHandle() {
+ protected void clearAllHandle() {
log.info("start clear all handle in receiptHandleProcessor");
Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet =
receiptHandleGroupMap.keySet();
for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
index f3b8056247..6a8888e97e 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
@@ -17,266 +17,12 @@
package org.apache.rocketmq.proxy.service.receipt;
-import com.google.common.base.Stopwatch;
import io.netty.channel.Channel;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.broker.client.ClientChannelInfo;
-import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
-import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
-import org.apache.rocketmq.broker.client.ConsumerManager;
-import org.apache.rocketmq.client.consumer.AckResult;
-import org.apache.rocketmq.client.consumer.AckStatus;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.consumer.ReceiptHandle;
-import org.apache.rocketmq.common.state.StateEventListener;
-import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
-import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
-import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
-import org.apache.rocketmq.common.utils.StartAndShutdown;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.proxy.common.RenewEvent;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.common.ProxyException;
-import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
-import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
-import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
-import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
-import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
-import org.apache.rocketmq.proxy.config.ConfigurationManager;
-import org.apache.rocketmq.proxy.config.ProxyConfig;
-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
-import org.apache.rocketmq.proxy.service.metadata.MetadataService;
-import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
-import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
-public class ReceiptHandleManager extends AbstractStartAndShutdown {
- protected final static Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
- protected final MetadataService metadataService;
- protected final ConsumerManager consumerManager;
- protected final
ConcurrentMap<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup>
receiptHandleGroupMap;
- protected final StateEventListener<RenewEvent> eventListener;
- protected final static RetryPolicy RENEW_POLICY = new
RenewStrategyPolicy();
- protected final ScheduledExecutorService scheduledExecutorService =
- Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("RenewalScheduledThread_"));
- protected final ThreadPoolExecutor renewalWorkerService;
+public interface ReceiptHandleManager {
+ void addReceiptHandle(ProxyContext context, Channel channel, String group,
String msgID, MessageReceiptHandle messageReceiptHandle);
- public ReceiptHandleManager(MetadataService metadataService,
ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
- this.metadataService = metadataService;
- this.consumerManager = consumerManager;
- this.eventListener = eventListener;
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
- this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor(
- proxyConfig.getRenewThreadPoolNums(),
- proxyConfig.getRenewMaxThreadPoolNums(),
- 1, TimeUnit.MINUTES,
- "RenewalWorkerThread",
- proxyConfig.getRenewThreadPoolQueueCapacity()
- );
- consumerManager.appendConsumerIdsChangeListener(new
ConsumerIdsChangeListener() {
- @Override
- public void handle(ConsumerGroupEvent event, String group,
Object... args) {
- if (ConsumerGroupEvent.CLIENT_UNREGISTER.equals(event)) {
- if (args == null || args.length < 1) {
- return;
- }
- if (args[0] instanceof ClientChannelInfo) {
- ClientChannelInfo clientChannelInfo =
(ClientChannelInfo) args[0];
- if
(ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
- // if the channel sync from other proxy is
expired, not to clear data of connect to current proxy
- return;
- }
- clearGroup(new
ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(),
group));
- log.info("clear handle of this client when client
unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo);
- }
- }
- }
-
- @Override
- public void shutdown() {
-
- }
- });
- this.receiptHandleGroupMap = new ConcurrentHashMap<>();
- this.renewalWorkerService.setRejectedExecutionHandler((r, executor) ->
log.warn("add renew task failed. queueSize:{}", executor.getQueue().size()));
- this.appendStartAndShutdown(new StartAndShutdown() {
- @Override
- public void start() throws Exception {
- scheduledExecutorService.scheduleWithFixedDelay(() ->
scheduleRenewTask(), 0,
-
ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(),
TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void shutdown() throws Exception {
- scheduledExecutorService.shutdown();
- clearAllHandle();
- }
- });
- }
-
- public void addReceiptHandle(Channel channel, String group, String msgID,
MessageReceiptHandle messageReceiptHandle) {
- ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group),
- k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle);
- }
-
- public MessageReceiptHandle removeReceiptHandle(Channel channel, String
group, String msgID, String receiptHandle) {
- ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group));
- if (handleGroup == null) {
- return null;
- }
- return handleGroup.remove(msgID, receiptHandle);
- }
-
- protected boolean
clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) {
- return this.consumerManager.findChannel(groupKey.getGroup(),
groupKey.getChannel()) == null;
- }
-
- public void scheduleRenewTask() {
- Stopwatch stopwatch = Stopwatch.createStarted();
- try {
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
- for (Map.Entry<ReceiptHandleProcessor.ReceiptHandleGroupKey,
ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) {
- ReceiptHandleProcessor.ReceiptHandleGroupKey key =
entry.getKey();
- if (clientIsOffline(key)) {
- clearGroup(key);
- continue;
- }
-
- ReceiptHandleGroup group = entry.getValue();
- group.scan((msgID, handleStr, v) -> {
- long current = System.currentTimeMillis();
- ReceiptHandle handle =
ReceiptHandle.decode(v.getReceiptHandleStr());
- if (handle.getNextVisibleTime() - current >
proxyConfig.getRenewAheadTimeMillis()) {
- return;
- }
- renewalWorkerService.submit(() -> renewMessage(group,
msgID, handleStr));
- });
- }
- } catch (Exception e) {
- log.error("unexpect error when schedule renew task", e);
- }
-
- log.debug("scan for renewal done. cost:{}ms",
stopwatch.elapsed().toMillis());
- }
-
- protected void renewMessage(ReceiptHandleGroup group, String msgID, String
handleStr) {
- try {
- group.computeIfPresent(msgID, handleStr, this::startRenewMessage);
- } catch (Exception e) {
- log.error("error when renew message. msgID:{}, handleStr:{}",
msgID, handleStr, e);
- }
- }
-
- protected CompletableFuture<MessageReceiptHandle>
startRenewMessage(MessageReceiptHandle messageReceiptHandle) {
- CompletableFuture<MessageReceiptHandle> resFuture = new
CompletableFuture<>();
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
- long current = System.currentTimeMillis();
- try {
- if (messageReceiptHandle.getRenewRetryTimes() >=
proxyConfig.getMaxRenewRetryTimes()) {
- log.warn("handle has exceed max renewRetryTimes. handle:{}",
messageReceiptHandle);
- return CompletableFuture.completedFuture(null);
- }
- if (current - messageReceiptHandle.getConsumeTimestamp() <
proxyConfig.getRenewMaxTimeMillis()) {
- CompletableFuture<AckResult> future = new
CompletableFuture<>();
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle,
RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future));
- future.whenComplete((ackResult, throwable) -> {
- if (throwable != null) {
- log.error("error when renew. handle:{}",
messageReceiptHandle, throwable);
- if (renewExceptionNeedRetry(throwable)) {
-
messageReceiptHandle.incrementAndGetRenewRetryTimes();
- resFuture.complete(messageReceiptHandle);
- } else {
- resFuture.complete(null);
- }
- } else if (AckStatus.OK.equals(ackResult.getStatus())) {
-
messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo());
- messageReceiptHandle.resetRenewRetryTimes();
- messageReceiptHandle.incrementRenewTimes();
- resFuture.complete(messageReceiptHandle);
- } else {
- log.error("renew response is not ok. result:{},
handle:{}", ackResult, messageReceiptHandle);
- resFuture.complete(null);
- }
- });
- } else {
- ProxyContext context = createContext("RenewMessage");
- SubscriptionGroupConfig subscriptionGroupConfig =
- metadataService.getSubscriptionGroupConfig(context,
messageReceiptHandle.getGroup());
- if (subscriptionGroupConfig == null) {
- log.error("group's subscriptionGroupConfig is null when
renew. handle: {}", messageReceiptHandle);
- return CompletableFuture.completedFuture(null);
- }
- RetryPolicy retryPolicy =
subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
- CompletableFuture<AckResult> future = new
CompletableFuture<>();
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle,
retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()),
future));
- future.whenComplete((ackResult, throwable) -> {
- if (throwable != null) {
- log.error("error when nack in renew. handle:{}",
messageReceiptHandle, throwable);
- }
- resFuture.complete(null);
- });
- }
- } catch (Throwable t) {
- log.error("unexpect error when renew message, stop to renew it.
handle:{}", messageReceiptHandle, t);
- resFuture.complete(null);
- }
- return resFuture;
- }
-
- protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey
key) {
- if (key == null) {
- return;
- }
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
- ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key);
- if (handleGroup == null) {
- return;
- }
- handleGroup.scan((msgID, handle, v) -> {
- try {
- handleGroup.computeIfPresent(msgID, handle,
messageReceiptHandle -> {
- CompletableFuture<AckResult> future = new
CompletableFuture<>();
- eventListener.fireEvent(new
RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(),
future));
- return CompletableFuture.completedFuture(null);
- });
- } catch (Exception e) {
- log.error("error when clear handle for group. key:{}", key, e);
- }
- });
- }
-
- public void clearAllHandle() {
- log.info("start clear all handle in receiptHandleProcessor");
- Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet =
receiptHandleGroupMap.keySet();
- for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) {
- clearGroup(key);
- }
- log.info("clear all handle in receiptHandleProcessor done");
- }
-
- protected boolean renewExceptionNeedRetry(Throwable t) {
- t = ExceptionUtils.getRealException(t);
- if (t instanceof ProxyException) {
- ProxyException proxyException = (ProxyException) t;
- if
(ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) ||
-
ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) {
- return false;
- }
- }
- return true;
- }
-
- protected ProxyContext createContext(String actionName) {
- return ProxyContext.createForInner(this.getClass().getSimpleName() +
actionName);
- }
+ MessageReceiptHandle removeReceiptHandle(ProxyContext context, Channel
channel, String group, String msgID, String receiptHandle);
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
similarity index 93%
rename from
proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java
rename to
proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
index 877c9fd6f4..7c6943e44a 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
@@ -62,8 +62,8 @@ import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-public class ReceiptHandleManagerTest extends BaseServiceTest {
- private ReceiptHandleManager receiptHandleManager;
+public class DefaultReceiptHandleManagerTest extends BaseServiceTest {
+ private DefaultReceiptHandleManager receiptHandleManager;
@Mock
protected MessagingProcessor messagingProcessor;
@Mock
@@ -87,7 +87,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest
{
@Before
public void setup() {
- receiptHandleManager = new ReceiptHandleManager(metadataService,
consumerManager, new StateEventListener<RenewEvent>() {
+ receiptHandleManager = new
DefaultReceiptHandleManager(metadataService, consumerManager, new
StateEventListener<RenewEvent>() {
@Override
public void fireEvent(RenewEvent event) {
MessageReceiptHandle messageReceiptHandle =
event.getMessageReceiptHandle();
@@ -125,7 +125,7 @@ public class ReceiptHandleManagerTest extends
BaseServiceTest {
@Test
public void testAddReceiptHandle() {
Channel channel = new LocalChannel();
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID,
messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
receiptHandleManager.scheduleRenewTask();
@@ -152,9 +152,9 @@ public class ReceiptHandleManagerTest extends
BaseServiceTest {
.build().encode();
MessageReceiptHandle messageReceiptHandle = new
MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID,
messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel,
GROUP, MSG_ID, messageReceiptHandle);
}
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID,
messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
receiptHandleManager.scheduleRenewTask();
@@ -170,7 +170,7 @@ public class ReceiptHandleManagerTest extends
BaseServiceTest {
public void testRenewReceiptHandle() {
ProxyConfig config = ConfigurationManager.getProxyConfig();
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID,
messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(groupConfig);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
@@ -216,7 +216,7 @@ public class ReceiptHandleManagerTest extends
BaseServiceTest {
public void testRenewExceedMaxRenewTimes() {
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID,
messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
CompletableFuture<AckResult> ackResultFuture = new
CompletableFuture<>();
ackResultFuture.completeExceptionally(new MQClientException(0,
"error"));
@@ -246,7 +246,7 @@ public class ReceiptHandleManagerTest extends
BaseServiceTest {
public void testRenewWithInvalidHandle() {
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID,
messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
CompletableFuture<AckResult> ackResultFuture = new
CompletableFuture<>();
ackResultFuture.completeExceptionally(new
ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error"));
@@ -270,7 +270,7 @@ public class ReceiptHandleManagerTest extends
BaseServiceTest {
ProxyConfig config = ConfigurationManager.getProxyConfig();
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID,
messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
AtomicInteger count = new AtomicInteger(0);
List<CompletableFuture<AckResult>> futureList = new ArrayList<>();
@@ -348,7 +348,7 @@ public class ReceiptHandleManagerTest extends
BaseServiceTest {
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC,
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID,
messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(groupConfig);
@@ -382,7 +382,7 @@ public class ReceiptHandleManagerTest extends
BaseServiceTest {
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC,
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID,
messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(null);
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(),
Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
Mockito.anyLong()))
@@ -418,7 +418,7 @@ public class ReceiptHandleManagerTest extends
BaseServiceTest {
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC,
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID,
messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(groupConfig);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
@@ -431,8 +431,8 @@ public class ReceiptHandleManagerTest extends
BaseServiceTest {
@Test
public void testRemoveReceiptHandle() {
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID,
messageReceiptHandle);
- receiptHandleManager.removeReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
+ receiptHandleManager.removeReceiptHandle(PROXY_CONTEXT, channel,
GROUP, MSG_ID, receiptHandle);
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(groupConfig);
receiptHandleManager.scheduleRenewTask();
@@ -444,7 +444,7 @@ public class ReceiptHandleManagerTest extends
BaseServiceTest {
@Test
public void testClearGroup() {
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID,
messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
receiptHandleManager.clearGroup(new
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP));
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(groupConfig);
@@ -459,7 +459,7 @@ public class ReceiptHandleManagerTest extends
BaseServiceTest {
ArgumentCaptor<ConsumerIdsChangeListener> listenerArgumentCaptor =
ArgumentCaptor.forClass(ConsumerIdsChangeListener.class);
Mockito.verify(consumerManager,
Mockito.times(1)).appendConsumerIdsChangeListener(listenerArgumentCaptor.capture());
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID,
messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER,
GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0));
assertTrue(receiptHandleManager.receiptHandleGroupMap.isEmpty());
}