This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8e6b5e62bd [ISSUE #6999] Add interface ReceiptHandleManager (#7000)
8e6b5e62bd is described below

commit 8e6b5e62bd4da78c0a7d265891c52685fcffd08a
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Mon Jul 10 20:14:17 2023 +0800

    [ISSUE #6999] Add interface ReceiptHandleManager (#7000)
    
    * Add interface ReceiptHandleManager
    
    * fix unit test
    
    * fix
---
 .../proxy/processor/ReceiptHandleProcessor.java    |  10 +-
 ...nager.java => DefaultReceiptHandleManager.java} |  12 +-
 .../service/receipt/ReceiptHandleManager.java      | 260 +--------------------
 ...t.java => DefaultReceiptHandleManagerTest.java} |  34 +--
 4 files changed, 31 insertions(+), 285 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 9c7e8dea9d..fc49e76229 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -28,12 +28,12 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.common.RenewEvent;
 import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
 import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.service.receipt.ReceiptHandleManager;
+import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager;
 import org.apache.rocketmq.proxy.service.ServiceManager;
 
 public class ReceiptHandleProcessor extends AbstractProcessor {
     protected final static Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
-    protected ReceiptHandleManager receiptHandleManager;
+    protected DefaultReceiptHandleManager receiptHandleManager;
 
     public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, 
ServiceManager serviceManager) {
         super(messagingProcessor, serviceManager);
@@ -51,7 +51,7 @@ public class ReceiptHandleProcessor extends AbstractProcessor 
{
                     event.getFuture().complete(v);
                 });
         };
-        this.receiptHandleManager = new 
ReceiptHandleManager(serviceManager.getMetadataService(), 
serviceManager.getConsumerManager(), eventListener);
+        this.receiptHandleManager = new 
DefaultReceiptHandleManager(serviceManager.getMetadataService(), 
serviceManager.getConsumerManager(), eventListener);
     }
 
     protected ProxyContext createContext(String actionName) {
@@ -59,11 +59,11 @@ public class ReceiptHandleProcessor extends 
AbstractProcessor {
     }
 
     public void addReceiptHandle(ProxyContext ctx, Channel channel, String 
group, String msgID, MessageReceiptHandle messageReceiptHandle) {
-        receiptHandleManager.addReceiptHandle(channel, group, msgID, 
messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(ctx, channel, group, msgID, 
messageReceiptHandle);
     }
 
     public MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel 
channel, String group, String msgID, String receiptHandle) {
-        return receiptHandleManager.removeReceiptHandle(channel, group, msgID, 
receiptHandle);
+        return receiptHandleManager.removeReceiptHandle(ctx, channel, group, 
msgID, receiptHandle);
     }
 
     public static class ReceiptHandleGroupKey {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
similarity index 95%
copy from proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
copy to proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
index f3b8056247..c7633d658a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -60,7 +60,7 @@ import 
org.apache.rocketmq.proxy.service.metadata.MetadataService;
 import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 
-public class ReceiptHandleManager extends AbstractStartAndShutdown {
+public class DefaultReceiptHandleManager extends AbstractStartAndShutdown 
implements ReceiptHandleManager {
     protected final static Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
     protected final MetadataService metadataService;
     protected final ConsumerManager consumerManager;
@@ -71,7 +71,7 @@ public class ReceiptHandleManager extends 
AbstractStartAndShutdown {
         Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("RenewalScheduledThread_"));
     protected final ThreadPoolExecutor renewalWorkerService;
 
-    public ReceiptHandleManager(MetadataService metadataService, 
ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
+    public DefaultReceiptHandleManager(MetadataService metadataService, 
ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
         this.metadataService = metadataService;
         this.consumerManager = consumerManager;
         this.eventListener = eventListener;
@@ -124,12 +124,12 @@ public class ReceiptHandleManager extends 
AbstractStartAndShutdown {
         });
     }
 
-    public void addReceiptHandle(Channel channel, String group, String msgID, 
MessageReceiptHandle messageReceiptHandle) {
+    public void addReceiptHandle(ProxyContext context, Channel channel, String 
group, String msgID, MessageReceiptHandle messageReceiptHandle) {
         ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new 
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group),
             k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle);
     }
 
-    public MessageReceiptHandle removeReceiptHandle(Channel channel, String 
group, String msgID, String receiptHandle) {
+    public MessageReceiptHandle removeReceiptHandle(ProxyContext context, 
Channel channel, String group, String msgID, String receiptHandle) {
         ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new 
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group));
         if (handleGroup == null) {
             return null;
@@ -141,7 +141,7 @@ public class ReceiptHandleManager extends 
AbstractStartAndShutdown {
         return this.consumerManager.findChannel(groupKey.getGroup(), 
groupKey.getChannel()) == null;
     }
 
-    public void scheduleRenewTask() {
+    protected void scheduleRenewTask() {
         Stopwatch stopwatch = Stopwatch.createStarted();
         try {
             ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
@@ -255,7 +255,7 @@ public class ReceiptHandleManager extends 
AbstractStartAndShutdown {
         });
     }
 
-    public void clearAllHandle() {
+    protected void clearAllHandle() {
         log.info("start clear all handle in receiptHandleProcessor");
         Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet = 
receiptHandleGroupMap.keySet();
         for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
index f3b8056247..6a8888e97e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
@@ -17,266 +17,12 @@
 
 package org.apache.rocketmq.proxy.service.receipt;
 
-import com.google.common.base.Stopwatch;
 import io.netty.channel.Channel;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.broker.client.ClientChannelInfo;
-import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
-import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
-import org.apache.rocketmq.broker.client.ConsumerManager;
-import org.apache.rocketmq.client.consumer.AckResult;
-import org.apache.rocketmq.client.consumer.AckStatus;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.consumer.ReceiptHandle;
-import org.apache.rocketmq.common.state.StateEventListener;
-import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
-import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
-import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
-import org.apache.rocketmq.common.utils.StartAndShutdown;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.proxy.common.RenewEvent;
 import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
 import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.common.ProxyException;
-import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
-import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
-import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
-import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
-import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
-import org.apache.rocketmq.proxy.config.ConfigurationManager;
-import org.apache.rocketmq.proxy.config.ProxyConfig;
-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
-import org.apache.rocketmq.proxy.service.metadata.MetadataService;
-import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
-import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 
-public class ReceiptHandleManager extends AbstractStartAndShutdown {
-    protected final static Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
-    protected final MetadataService metadataService;
-    protected final ConsumerManager consumerManager;
-    protected final 
ConcurrentMap<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> 
receiptHandleGroupMap;
-    protected final StateEventListener<RenewEvent> eventListener;
-    protected final static RetryPolicy RENEW_POLICY = new 
RenewStrategyPolicy();
-    protected final ScheduledExecutorService scheduledExecutorService =
-        Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("RenewalScheduledThread_"));
-    protected final ThreadPoolExecutor renewalWorkerService;
+public interface ReceiptHandleManager {
+    void addReceiptHandle(ProxyContext context, Channel channel, String group, 
String msgID, MessageReceiptHandle messageReceiptHandle);
 
-    public ReceiptHandleManager(MetadataService metadataService, 
ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
-        this.metadataService = metadataService;
-        this.consumerManager = consumerManager;
-        this.eventListener = eventListener;
-        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
-        this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor(
-            proxyConfig.getRenewThreadPoolNums(),
-            proxyConfig.getRenewMaxThreadPoolNums(),
-            1, TimeUnit.MINUTES,
-            "RenewalWorkerThread",
-            proxyConfig.getRenewThreadPoolQueueCapacity()
-        );
-        consumerManager.appendConsumerIdsChangeListener(new 
ConsumerIdsChangeListener() {
-            @Override
-            public void handle(ConsumerGroupEvent event, String group, 
Object... args) {
-                if (ConsumerGroupEvent.CLIENT_UNREGISTER.equals(event)) {
-                    if (args == null || args.length < 1) {
-                        return;
-                    }
-                    if (args[0] instanceof ClientChannelInfo) {
-                        ClientChannelInfo clientChannelInfo = 
(ClientChannelInfo) args[0];
-                        if 
(ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
-                            // if the channel sync from other proxy is 
expired, not to clear data of connect to current proxy
-                            return;
-                        }
-                        clearGroup(new 
ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), 
group));
-                        log.info("clear handle of this client when client 
unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo);
-                    }
-                }
-            }
-
-            @Override
-            public void shutdown() {
-
-            }
-        });
-        this.receiptHandleGroupMap = new ConcurrentHashMap<>();
-        this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> 
log.warn("add renew task failed. queueSize:{}", executor.getQueue().size()));
-        this.appendStartAndShutdown(new StartAndShutdown() {
-            @Override
-            public void start() throws Exception {
-                scheduledExecutorService.scheduleWithFixedDelay(() -> 
scheduleRenewTask(), 0,
-                    
ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), 
TimeUnit.MILLISECONDS);
-            }
-
-            @Override
-            public void shutdown() throws Exception {
-                scheduledExecutorService.shutdown();
-                clearAllHandle();
-            }
-        });
-    }
-
-    public void addReceiptHandle(Channel channel, String group, String msgID, 
MessageReceiptHandle messageReceiptHandle) {
-        ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new 
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group),
-            k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle);
-    }
-
-    public MessageReceiptHandle removeReceiptHandle(Channel channel, String 
group, String msgID, String receiptHandle) {
-        ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new 
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group));
-        if (handleGroup == null) {
-            return null;
-        }
-        return handleGroup.remove(msgID, receiptHandle);
-    }
-
-    protected boolean 
clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) {
-        return this.consumerManager.findChannel(groupKey.getGroup(), 
groupKey.getChannel()) == null;
-    }
-
-    public void scheduleRenewTask() {
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        try {
-            ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
-            for (Map.Entry<ReceiptHandleProcessor.ReceiptHandleGroupKey, 
ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) {
-                ReceiptHandleProcessor.ReceiptHandleGroupKey key = 
entry.getKey();
-                if (clientIsOffline(key)) {
-                    clearGroup(key);
-                    continue;
-                }
-
-                ReceiptHandleGroup group = entry.getValue();
-                group.scan((msgID, handleStr, v) -> {
-                    long current = System.currentTimeMillis();
-                    ReceiptHandle handle = 
ReceiptHandle.decode(v.getReceiptHandleStr());
-                    if (handle.getNextVisibleTime() - current > 
proxyConfig.getRenewAheadTimeMillis()) {
-                        return;
-                    }
-                    renewalWorkerService.submit(() -> renewMessage(group, 
msgID, handleStr));
-                });
-            }
-        } catch (Exception e) {
-            log.error("unexpect error when schedule renew task", e);
-        }
-
-        log.debug("scan for renewal done. cost:{}ms", 
stopwatch.elapsed().toMillis());
-    }
-
-    protected void renewMessage(ReceiptHandleGroup group, String msgID, String 
handleStr) {
-        try {
-            group.computeIfPresent(msgID, handleStr, this::startRenewMessage);
-        } catch (Exception e) {
-            log.error("error when renew message. msgID:{}, handleStr:{}", 
msgID, handleStr, e);
-        }
-    }
-
-    protected CompletableFuture<MessageReceiptHandle> 
startRenewMessage(MessageReceiptHandle messageReceiptHandle) {
-        CompletableFuture<MessageReceiptHandle> resFuture = new 
CompletableFuture<>();
-        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
-        long current = System.currentTimeMillis();
-        try {
-            if (messageReceiptHandle.getRenewRetryTimes() >= 
proxyConfig.getMaxRenewRetryTimes()) {
-                log.warn("handle has exceed max renewRetryTimes. handle:{}", 
messageReceiptHandle);
-                return CompletableFuture.completedFuture(null);
-            }
-            if (current - messageReceiptHandle.getConsumeTimestamp() < 
proxyConfig.getRenewMaxTimeMillis()) {
-                CompletableFuture<AckResult> future = new 
CompletableFuture<>();
-                eventListener.fireEvent(new RenewEvent(messageReceiptHandle, 
RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future));
-                future.whenComplete((ackResult, throwable) -> {
-                    if (throwable != null) {
-                        log.error("error when renew. handle:{}", 
messageReceiptHandle, throwable);
-                        if (renewExceptionNeedRetry(throwable)) {
-                            
messageReceiptHandle.incrementAndGetRenewRetryTimes();
-                            resFuture.complete(messageReceiptHandle);
-                        } else {
-                            resFuture.complete(null);
-                        }
-                    } else if (AckStatus.OK.equals(ackResult.getStatus())) {
-                        
messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo());
-                        messageReceiptHandle.resetRenewRetryTimes();
-                        messageReceiptHandle.incrementRenewTimes();
-                        resFuture.complete(messageReceiptHandle);
-                    } else {
-                        log.error("renew response is not ok. result:{}, 
handle:{}", ackResult, messageReceiptHandle);
-                        resFuture.complete(null);
-                    }
-                });
-            } else {
-                ProxyContext context = createContext("RenewMessage");
-                SubscriptionGroupConfig subscriptionGroupConfig =
-                    metadataService.getSubscriptionGroupConfig(context, 
messageReceiptHandle.getGroup());
-                if (subscriptionGroupConfig == null) {
-                    log.error("group's subscriptionGroupConfig is null when 
renew. handle: {}", messageReceiptHandle);
-                    return CompletableFuture.completedFuture(null);
-                }
-                RetryPolicy retryPolicy = 
subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
-                CompletableFuture<AckResult> future = new 
CompletableFuture<>();
-                eventListener.fireEvent(new RenewEvent(messageReceiptHandle, 
retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), 
future));
-                future.whenComplete((ackResult, throwable) -> {
-                    if (throwable != null) {
-                        log.error("error when nack in renew. handle:{}", 
messageReceiptHandle, throwable);
-                    }
-                    resFuture.complete(null);
-                });
-            }
-        } catch (Throwable t) {
-            log.error("unexpect error when renew message, stop to renew it. 
handle:{}", messageReceiptHandle, t);
-            resFuture.complete(null);
-        }
-        return resFuture;
-    }
-
-    protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey 
key) {
-        if (key == null) {
-            return;
-        }
-        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
-        ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key);
-        if (handleGroup == null) {
-            return;
-        }
-        handleGroup.scan((msgID, handle, v) -> {
-            try {
-                handleGroup.computeIfPresent(msgID, handle, 
messageReceiptHandle -> {
-                    CompletableFuture<AckResult> future = new 
CompletableFuture<>();
-                    eventListener.fireEvent(new 
RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), 
future));
-                    return CompletableFuture.completedFuture(null);
-                });
-            } catch (Exception e) {
-                log.error("error when clear handle for group. key:{}", key, e);
-            }
-        });
-    }
-
-    public void clearAllHandle() {
-        log.info("start clear all handle in receiptHandleProcessor");
-        Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet = 
receiptHandleGroupMap.keySet();
-        for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) {
-            clearGroup(key);
-        }
-        log.info("clear all handle in receiptHandleProcessor done");
-    }
-
-    protected boolean renewExceptionNeedRetry(Throwable t) {
-        t = ExceptionUtils.getRealException(t);
-        if (t instanceof ProxyException) {
-            ProxyException proxyException = (ProxyException) t;
-            if 
(ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) ||
-                
ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    protected ProxyContext createContext(String actionName) {
-        return ProxyContext.createForInner(this.getClass().getSimpleName() + 
actionName);
-    }
+    MessageReceiptHandle removeReceiptHandle(ProxyContext context, Channel 
channel, String group, String msgID, String receiptHandle);
 }
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
similarity index 93%
rename from proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java
rename to proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
index 877c9fd6f4..7c6943e44a 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
@@ -62,8 +62,8 @@ import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-public class ReceiptHandleManagerTest extends BaseServiceTest {
-    private ReceiptHandleManager receiptHandleManager;
+public class DefaultReceiptHandleManagerTest extends BaseServiceTest {
+    private DefaultReceiptHandleManager receiptHandleManager;
     @Mock
     protected MessagingProcessor messagingProcessor;
     @Mock
@@ -87,7 +87,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest 
{
 
     @Before
     public void setup() {
-        receiptHandleManager = new ReceiptHandleManager(metadataService, 
consumerManager, new StateEventListener<RenewEvent>() {
+        receiptHandleManager = new 
DefaultReceiptHandleManager(metadataService, consumerManager, new 
StateEventListener<RenewEvent>() {
             @Override
             public void fireEvent(RenewEvent event) {
                 MessageReceiptHandle messageReceiptHandle = 
event.getMessageReceiptHandle();
@@ -125,7 +125,7 @@ public class ReceiptHandleManagerTest extends 
BaseServiceTest {
     @Test
     public void testAddReceiptHandle() {
         Channel channel = new LocalChannel();
-        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
         Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         receiptHandleManager.scheduleRenewTask();
@@ -152,9 +152,9 @@ public class ReceiptHandleManagerTest extends 
BaseServiceTest {
                 .build().encode();
             MessageReceiptHandle messageReceiptHandle = new 
MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET,
                 RECONSUME_TIMES);
-            receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
+            receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, 
GROUP, MSG_ID, messageReceiptHandle);
         }
-        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
         Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         receiptHandleManager.scheduleRenewTask();
@@ -170,7 +170,7 @@ public class ReceiptHandleManagerTest extends 
BaseServiceTest {
     public void testRenewReceiptHandle() {
         ProxyConfig config = ConfigurationManager.getProxyConfig();
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
         Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
@@ -216,7 +216,7 @@ public class ReceiptHandleManagerTest extends 
BaseServiceTest {
     public void testRenewExceedMaxRenewTimes() {
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
         Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
 
         CompletableFuture<AckResult> ackResultFuture = new 
CompletableFuture<>();
         ackResultFuture.completeExceptionally(new MQClientException(0, 
"error"));
@@ -246,7 +246,7 @@ public class ReceiptHandleManagerTest extends 
BaseServiceTest {
     public void testRenewWithInvalidHandle() {
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
         Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
 
         CompletableFuture<AckResult> ackResultFuture = new 
CompletableFuture<>();
         ackResultFuture.completeExceptionally(new 
ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error"));
@@ -270,7 +270,7 @@ public class ReceiptHandleManagerTest extends 
BaseServiceTest {
         ProxyConfig config = ConfigurationManager.getProxyConfig();
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
         Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
 
         AtomicInteger count = new AtomicInteger(0);
         List<CompletableFuture<AckResult>> futureList = new ArrayList<>();
@@ -348,7 +348,7 @@ public class ReceiptHandleManagerTest extends 
BaseServiceTest {
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, 
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
             RECONSUME_TIMES);
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
         Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
@@ -382,7 +382,7 @@ public class ReceiptHandleManagerTest extends 
BaseServiceTest {
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, 
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
             RECONSUME_TIMES);
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
         Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(null);
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), 
Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), 
Mockito.anyLong()))
@@ -418,7 +418,7 @@ public class ReceiptHandleManagerTest extends 
BaseServiceTest {
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, 
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
             RECONSUME_TIMES);
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
         Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
@@ -431,8 +431,8 @@ public class ReceiptHandleManagerTest extends 
BaseServiceTest {
     @Test
     public void testRemoveReceiptHandle() {
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
-        receiptHandleManager.removeReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle);
+        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        receiptHandleManager.removeReceiptHandle(PROXY_CONTEXT, channel, 
GROUP, MSG_ID, receiptHandle);
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
         receiptHandleManager.scheduleRenewTask();
@@ -444,7 +444,7 @@ public class ReceiptHandleManagerTest extends 
BaseServiceTest {
     @Test
     public void testClearGroup() {
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
         receiptHandleManager.clearGroup(new 
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP));
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
@@ -459,7 +459,7 @@ public class ReceiptHandleManagerTest extends 
BaseServiceTest {
         ArgumentCaptor<ConsumerIdsChangeListener> listenerArgumentCaptor = 
ArgumentCaptor.forClass(ConsumerIdsChangeListener.class);
         Mockito.verify(consumerManager, 
Mockito.times(1)).appendConsumerIdsChangeListener(listenerArgumentCaptor.capture());
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
         
listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, 
GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0));
         assertTrue(receiptHandleManager.receiptHandleGroupMap.isEmpty());
     }

Reply via email to