This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d1bcda57b3 [ISSUE #6974] Feature/refector receipt processor (#6975)
d1bcda57b3 is described below

commit d1bcda57b32f7ee033a3cb0067aef781dc12b7f1
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Mon Jul 3 14:09:21 2023 +0800

    [ISSUE #6974] Feature/refector receipt processor (#6975)
    
    * Refector ReceiptHandleProcessor
---
 .../rocketmq/common/state/StateEventListener.java  |  22 ++
 .../apache/rocketmq/proxy/common/RenewEvent.java   |  45 +++
 .../proxy/grpc/v2/DefaultGrpcMessingActivity.java  |  12 +-
 .../proxy/grpc/v2/consumer/AckMessageActivity.java |   8 +-
 .../consumer/ChangeInvisibleDurationActivity.java  |   6 +-
 .../grpc/v2/consumer/ReceiveMessageActivity.java   |   7 +-
 .../v2/producer/ForwardMessageToDLQActivity.java   |   7 +-
 .../proxy/processor/DefaultMessagingProcessor.java |  16 +-
 .../proxy/processor/MessagingProcessor.java        |   5 +
 .../proxy/processor/ReceiptHandleProcessor.java    | 292 ++--------------
 .../receipt/ReceiptHandleManager.java}             | 229 +++++-------
 .../grpc/v2/consumer/AckMessageActivityTest.java   |   2 +-
 .../ChangeInvisibleDurationActivityTest.java       |   4 +-
 .../v2/consumer/ReceiveMessageActivityTest.java    |   2 +-
 .../producer/ForwardMessageToDLQActivityTest.java  |   4 +-
 .../proxy/processor/ConsumerProcessorTest.java     |   1 -
 .../receipt/ReceiptHandleManagerTest.java}         | 389 +++++----------------
 17 files changed, 295 insertions(+), 756 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/state/StateEventListener.java b/common/src/main/java/org/apache/rocketmq/common/state/StateEventListener.java
new file mode 100644
index 0000000000..aed04dc31d
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/state/StateEventListener.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.state;
+
+public interface StateEventListener<T> {
+    void fireEvent(T event);
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
new file mode 100644
index 0000000000..fdf9833ccd
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.client.consumer.AckResult;
+
+public class RenewEvent {
+    protected MessageReceiptHandle messageReceiptHandle;
+    protected long renewTime;
+    protected CompletableFuture<AckResult> future;
+
+    public RenewEvent(MessageReceiptHandle messageReceiptHandle, long 
renewTime, CompletableFuture<AckResult> future) {
+        this.messageReceiptHandle = messageReceiptHandle;
+        this.renewTime = renewTime;
+        this.future = future;
+    }
+
+    public MessageReceiptHandle getMessageReceiptHandle() {
+        return messageReceiptHandle;
+    }
+
+    public long getRenewTime() {
+        return renewTime;
+    }
+
+    public CompletableFuture<AckResult> getFuture() {
+        return future;
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
index 73b764bc4f..091e9086ec 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
@@ -55,14 +55,12 @@ import 
org.apache.rocketmq.proxy.grpc.v2.producer.SendMessageActivity;
 import org.apache.rocketmq.proxy.grpc.v2.route.RouteActivity;
 import org.apache.rocketmq.proxy.grpc.v2.transaction.EndTransactionActivity;
 import org.apache.rocketmq.proxy.processor.MessagingProcessor;
-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
 
 public class DefaultGrpcMessingActivity extends AbstractStartAndShutdown 
implements GrpcMessingActivity {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
 
     protected GrpcClientSettingsManager grpcClientSettingsManager;
     protected GrpcChannelManager grpcChannelManager;
-    protected ReceiptHandleProcessor receiptHandleProcessor;
     protected ReceiveMessageActivity receiveMessageActivity;
     protected AckMessageActivity ackMessageActivity;
     protected ChangeInvisibleDurationActivity changeInvisibleDurationActivity;
@@ -79,18 +77,16 @@ public class DefaultGrpcMessingActivity extends 
AbstractStartAndShutdown impleme
     protected void init(MessagingProcessor messagingProcessor) {
         this.grpcClientSettingsManager = new 
GrpcClientSettingsManager(messagingProcessor);
         this.grpcChannelManager = new 
GrpcChannelManager(messagingProcessor.getProxyRelayService(), 
this.grpcClientSettingsManager);
-        this.receiptHandleProcessor = new 
ReceiptHandleProcessor(messagingProcessor);
 
-        this.receiveMessageActivity = new 
ReceiveMessageActivity(messagingProcessor, receiptHandleProcessor, 
grpcClientSettingsManager, grpcChannelManager);
-        this.ackMessageActivity = new AckMessageActivity(messagingProcessor, 
receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager);
-        this.changeInvisibleDurationActivity = new 
ChangeInvisibleDurationActivity(messagingProcessor, receiptHandleProcessor, 
grpcClientSettingsManager, grpcChannelManager);
+        this.receiveMessageActivity = new 
ReceiveMessageActivity(messagingProcessor, grpcClientSettingsManager, 
grpcChannelManager);
+        this.ackMessageActivity = new AckMessageActivity(messagingProcessor, 
grpcClientSettingsManager, grpcChannelManager);
+        this.changeInvisibleDurationActivity = new 
ChangeInvisibleDurationActivity(messagingProcessor, grpcClientSettingsManager, 
grpcChannelManager);
         this.sendMessageActivity = new SendMessageActivity(messagingProcessor, 
grpcClientSettingsManager, grpcChannelManager);
-        this.forwardMessageToDLQActivity = new 
ForwardMessageToDLQActivity(messagingProcessor, receiptHandleProcessor, 
grpcClientSettingsManager, grpcChannelManager);
+        this.forwardMessageToDLQActivity = new 
ForwardMessageToDLQActivity(messagingProcessor, grpcClientSettingsManager, 
grpcChannelManager);
         this.endTransactionActivity = new 
EndTransactionActivity(messagingProcessor, grpcClientSettingsManager, 
grpcChannelManager);
         this.routeActivity = new RouteActivity(messagingProcessor, 
grpcClientSettingsManager, grpcChannelManager);
         this.clientActivity = new ClientActivity(messagingProcessor, 
grpcClientSettingsManager, grpcChannelManager);
 
-        this.appendStartAndShutdown(this.receiptHandleProcessor);
         this.appendStartAndShutdown(this.grpcClientSettingsManager);
     }
 
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
index 993f069b94..9a3a772017 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
@@ -37,16 +37,12 @@ import 
org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
 import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter;
 import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
 import org.apache.rocketmq.proxy.processor.MessagingProcessor;
-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
 
 public class AckMessageActivity extends AbstractMessingActivity {
-    protected ReceiptHandleProcessor receiptHandleProcessor;
 
-    public AckMessageActivity(MessagingProcessor messagingProcessor, 
ReceiptHandleProcessor receiptHandleProcessor,
-        GrpcClientSettingsManager grpcClientSettingsManager,
+    public AckMessageActivity(MessagingProcessor messagingProcessor, 
GrpcClientSettingsManager grpcClientSettingsManager,
         GrpcChannelManager grpcChannelManager) {
         super(messagingProcessor, grpcClientSettingsManager, 
grpcChannelManager);
-        this.receiptHandleProcessor = receiptHandleProcessor;
     }
 
     public CompletableFuture<AckMessageResponse> ackMessage(ProxyContext ctx, 
AckMessageRequest request) {
@@ -98,7 +94,7 @@ public class AckMessageActivity extends 
AbstractMessingActivity {
             String handleString = ackMessageEntry.getReceiptHandle();
 
             String group = 
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
-            MessageReceiptHandle messageReceiptHandle = 
receiptHandleProcessor.removeReceiptHandle(ctx, 
grpcChannelManager.getChannel(ctx.getClientID()), group, 
ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
+            MessageReceiptHandle messageReceiptHandle = 
messagingProcessor.removeReceiptHandle(ctx, 
grpcChannelManager.getChannel(ctx.getClientID()), group, 
ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
             if (messageReceiptHandle != null) {
                 handleString = messageReceiptHandle.getReceiptHandleStr();
             }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
index 9b7e947e0b..02356c4977 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
@@ -32,16 +32,12 @@ import 
org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
 import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter;
 import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
 import org.apache.rocketmq.proxy.processor.MessagingProcessor;
-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
 
 public class ChangeInvisibleDurationActivity extends AbstractMessingActivity {
-    protected ReceiptHandleProcessor receiptHandleProcessor;
 
     public ChangeInvisibleDurationActivity(MessagingProcessor 
messagingProcessor,
-        ReceiptHandleProcessor receiptHandleProcessor,
         GrpcClientSettingsManager grpcClientSettingsManager, 
GrpcChannelManager grpcChannelManager) {
         super(messagingProcessor, grpcClientSettingsManager, 
grpcChannelManager);
-        this.receiptHandleProcessor = receiptHandleProcessor;
     }
 
     public CompletableFuture<ChangeInvisibleDurationResponse> 
changeInvisibleDuration(ProxyContext ctx,
@@ -55,7 +51,7 @@ public class ChangeInvisibleDurationActivity extends 
AbstractMessingActivity {
             ReceiptHandle receiptHandle = 
ReceiptHandle.decode(request.getReceiptHandle());
             String group = 
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
 
-            MessageReceiptHandle messageReceiptHandle = 
receiptHandleProcessor.removeReceiptHandle(ctx, 
grpcChannelManager.getChannel(ctx.getClientID()), group, 
request.getMessageId(), receiptHandle.getReceiptHandle());
+            MessageReceiptHandle messageReceiptHandle = 
messagingProcessor.removeReceiptHandle(ctx, 
grpcChannelManager.getChannel(ctx.getClientID()), group, 
request.getMessageId(), receiptHandle.getReceiptHandle());
             if (messageReceiptHandle != null) {
                 receiptHandle = 
ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
             }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index 9830e7dacd..a504179a9e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -40,7 +40,6 @@ import 
org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
 import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter;
 import org.apache.rocketmq.proxy.processor.MessagingProcessor;
 import org.apache.rocketmq.proxy.processor.QueueSelector;
-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
 import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
 import org.apache.rocketmq.proxy.service.route.MessageQueueSelector;
 import org.apache.rocketmq.proxy.service.route.MessageQueueView;
@@ -48,13 +47,11 @@ import 
org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 
 public class ReceiveMessageActivity extends AbstractMessingActivity {
-    protected ReceiptHandleProcessor receiptHandleProcessor;
     private static final String ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION 
= "5.0.3";
 
-    public ReceiveMessageActivity(MessagingProcessor messagingProcessor, 
ReceiptHandleProcessor receiptHandleProcessor,
+    public ReceiveMessageActivity(MessagingProcessor messagingProcessor,
         GrpcClientSettingsManager grpcClientSettingsManager, 
GrpcChannelManager grpcChannelManager) {
         super(messagingProcessor, grpcClientSettingsManager, 
grpcChannelManager);
-        this.receiptHandleProcessor = receiptHandleProcessor;
     }
 
     public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request,
@@ -145,7 +142,7 @@ public class ReceiveMessageActivity extends 
AbstractMessingActivity {
                                     MessageReceiptHandle messageReceiptHandle =
                                         new MessageReceiptHandle(group, topic, 
messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
                                             messageExt.getQueueOffset(), 
messageExt.getReconsumeTimes());
-                                    
receiptHandleProcessor.addReceiptHandle(ctx, 
grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(), 
messageReceiptHandle);
+                                    messagingProcessor.addReceiptHandle(ctx, 
grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(), 
messageReceiptHandle);
                                 }
                             }
                         }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
index 6b5c5c7e07..f1fc5a143a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
@@ -28,16 +28,13 @@ import 
org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
 import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter;
 import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
 import org.apache.rocketmq.proxy.processor.MessagingProcessor;
-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class ForwardMessageToDLQActivity extends AbstractMessingActivity {
-    protected ReceiptHandleProcessor receiptHandleProcessor;
 
-    public ForwardMessageToDLQActivity(MessagingProcessor messagingProcessor, 
ReceiptHandleProcessor receiptHandleProcessor,
+    public ForwardMessageToDLQActivity(MessagingProcessor messagingProcessor,
         GrpcClientSettingsManager grpcClientSettingsManager, 
GrpcChannelManager grpcChannelManager) {
         super(messagingProcessor, grpcClientSettingsManager, 
grpcChannelManager);
-        this.receiptHandleProcessor = receiptHandleProcessor;
     }
 
     public CompletableFuture<ForwardMessageToDeadLetterQueueResponse> 
forwardMessageToDeadLetterQueue(ProxyContext ctx,
@@ -48,7 +45,7 @@ public class ForwardMessageToDLQActivity extends 
AbstractMessingActivity {
 
             String group = 
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
             String handleString = request.getReceiptHandle();
-            MessageReceiptHandle messageReceiptHandle = 
receiptHandleProcessor.removeReceiptHandle(ctx, 
grpcChannelManager.getChannel(ctx.getClientID()), group, 
request.getMessageId(), request.getReceiptHandle());
+            MessageReceiptHandle messageReceiptHandle = 
messagingProcessor.removeReceiptHandle(ctx, 
grpcChannelManager.getChannel(ctx.getClientID()), group, 
request.getMessageId(), request.getReceiptHandle());
             if (messageReceiptHandle != null) {
                 handleString = messageReceiptHandle.getReceiptHandleStr();
             }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index e663ae1ba2..1b3f0af4ea 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -22,7 +22,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.rocketmq.acl.common.AclUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
@@ -41,6 +40,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
 import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
 import org.apache.rocketmq.proxy.common.Address;
+import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
@@ -64,6 +64,7 @@ public class DefaultMessagingProcessor extends 
AbstractStartAndShutdown implemen
     protected TransactionProcessor transactionProcessor;
     protected ClientProcessor clientProcessor;
     protected RequestBrokerProcessor requestBrokerProcessor;
+    protected ReceiptHandleProcessor receiptHandleProcessor;
 
     protected ThreadPoolExecutor producerProcessorExecutor;
     protected ThreadPoolExecutor consumerProcessorExecutor;
@@ -95,6 +96,7 @@ public class DefaultMessagingProcessor extends 
AbstractStartAndShutdown implemen
         this.transactionProcessor = new TransactionProcessor(this, 
serviceManager);
         this.clientProcessor = new ClientProcessor(this, serviceManager);
         this.requestBrokerProcessor = new RequestBrokerProcessor(this, 
serviceManager);
+        this.receiptHandleProcessor = new ReceiptHandleProcessor(this, 
serviceManager);
 
         this.init();
     }
@@ -308,4 +310,16 @@ public class DefaultMessagingProcessor extends 
AbstractStartAndShutdown implemen
     public MetadataService getMetadataService() {
         return this.serviceManager.getMetadataService();
     }
+
+    @Override
+    public void addReceiptHandle(ProxyContext ctx, Channel channel, String 
group, String msgID,
+        MessageReceiptHandle messageReceiptHandle) {
+        receiptHandleProcessor.addReceiptHandle(ctx, channel, group, msgID, 
messageReceiptHandle);
+    }
+
+    @Override
+    public MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel 
channel, String group, String msgID,
+        String receiptHandle) {
+        return receiptHandleProcessor.removeReceiptHandle(ctx, channel, group, 
msgID, receiptHandle);
+    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
index 263068965a..d86be0bd88 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.common.consumer.ReceiptHandle;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.proxy.common.Address;
+import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.common.utils.StartAndShutdown;
 import org.apache.rocketmq.proxy.service.metadata.MetadataService;
@@ -299,4 +300,8 @@ public interface MessagingProcessor extends 
StartAndShutdown {
     ProxyRelayService getProxyRelayService();
 
     MetadataService getMetadataService();
+
+    void addReceiptHandle(ProxyContext ctx, Channel channel, String group, 
String msgID, MessageReceiptHandle messageReceiptHandle);
+
+    MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel 
channel, String group, String msgID, String receiptHandle);
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 88c597e994..9c7e8dea9d 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -19,291 +19,51 @@ package org.apache.rocketmq.proxy.processor;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
-import com.google.common.base.Stopwatch;
 import io.netty.channel.Channel;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.broker.client.ClientChannelInfo;
-import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
-import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
-import org.apache.rocketmq.client.consumer.AckResult;
-import org.apache.rocketmq.client.consumer.AckStatus;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
-import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
-import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
-import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
-import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
-import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.common.ProxyException;
-import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
-import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
-import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
-import org.apache.rocketmq.common.utils.StartAndShutdown;
-import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
-import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
-import org.apache.rocketmq.proxy.config.ConfigurationManager;
-import org.apache.rocketmq.proxy.config.ProxyConfig;
-import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
-import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.state.StateEventListener;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.proxy.common.RenewEvent;
+import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.service.receipt.ReceiptHandleManager;
+import org.apache.rocketmq.proxy.service.ServiceManager;
 
-public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
+public class ReceiptHandleProcessor extends AbstractProcessor {
     protected final static Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
-    protected final ConcurrentMap<ReceiptHandleGroupKey, ReceiptHandleGroup> 
receiptHandleGroupMap;
-    protected final ScheduledExecutorService scheduledExecutorService =
-        Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("RenewalScheduledThread_"));
-    protected ThreadPoolExecutor renewalWorkerService;
-    protected final MessagingProcessor messagingProcessor;
-    protected final static RetryPolicy RENEW_POLICY = new 
RenewStrategyPolicy();
-
-    public ReceiptHandleProcessor(MessagingProcessor messagingProcessor) {
-        this.messagingProcessor = messagingProcessor;
-        this.receiptHandleGroupMap = new ConcurrentHashMap<>();
-        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
-        this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor(
-            proxyConfig.getRenewThreadPoolNums(),
-            proxyConfig.getRenewMaxThreadPoolNums(),
-            1, TimeUnit.MINUTES,
-            "RenewalWorkerThread",
-            proxyConfig.getRenewThreadPoolQueueCapacity()
-        );
-        this.init();
-    }
-
-    protected void init() {
-        this.registerConsumerListener();
-        this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> 
log.warn("add renew task failed. queueSize:{}", executor.getQueue().size()));
-        this.appendStartAndShutdown(new StartAndShutdown() {
-            @Override
-            public void start() throws Exception {
-                scheduledExecutorService.scheduleWithFixedDelay(() -> 
scheduleRenewTask(), 0,
-                    
ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), 
TimeUnit.MILLISECONDS);
-            }
-
-            @Override
-            public void shutdown() throws Exception {
-                scheduledExecutorService.shutdown();
-                clearAllHandle();
-            }
-        });
-    }
-
-    protected void registerConsumerListener() {
-        this.messagingProcessor.registerConsumerListener(new 
ConsumerIdsChangeListener() {
-            @Override
-            public void handle(ConsumerGroupEvent event, String group, 
Object... args) {
-                if (ConsumerGroupEvent.CLIENT_UNREGISTER.equals(event)) {
-                    if (args == null || args.length < 1) {
+    protected ReceiptHandleManager receiptHandleManager;
+
+    public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, 
ServiceManager serviceManager) {
+        super(messagingProcessor, serviceManager);
+        StateEventListener<RenewEvent> eventListener = event -> {
+            ProxyContext context = createContext("RenewMessage");
+            MessageReceiptHandle messageReceiptHandle = 
event.getMessageReceiptHandle();
+            ReceiptHandle handle = 
ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
+            messagingProcessor.changeInvisibleTime(context, handle, 
messageReceiptHandle.getMessageId(),
+                    messageReceiptHandle.getGroup(), 
messageReceiptHandle.getTopic(), event.getRenewTime())
+                .whenComplete((v, t) -> {
+                    if (t != null) {
+                        event.getFuture().completeExceptionally(t);
                         return;
                     }
-                    if (args[0] instanceof ClientChannelInfo) {
-                        ClientChannelInfo clientChannelInfo = 
(ClientChannelInfo) args[0];
-                        if 
(ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
-                            // if the channel sync from other proxy is 
expired, not to clear data of connect to current proxy
-                            return;
-                        }
-                        clearGroup(new 
ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
-                        log.info("clear handle of this client when client 
unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo);
-                    }
-                }
-            }
-
-            @Override
-            public void shutdown() {
-
-            }
-        });
+                    event.getFuture().complete(v);
+                });
+        };
+        this.receiptHandleManager = new 
ReceiptHandleManager(serviceManager.getMetadataService(), 
serviceManager.getConsumerManager(), eventListener);
     }
 
     protected ProxyContext createContext(String actionName) {
         return ProxyContext.createForInner(this.getClass().getSimpleName() + 
actionName);
     }
 
-    protected void scheduleRenewTask() {
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        try {
-            ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
-            for (Map.Entry<ReceiptHandleGroupKey, ReceiptHandleGroup> entry : 
receiptHandleGroupMap.entrySet()) {
-                ReceiptHandleGroupKey key = entry.getKey();
-                if (clientIsOffline(key)) {
-                    clearGroup(key);
-                    continue;
-                }
-
-                ReceiptHandleGroup group = entry.getValue();
-                group.scan((msgID, handleStr, v) -> {
-                    long current = System.currentTimeMillis();
-                    ReceiptHandle handle = 
ReceiptHandle.decode(v.getReceiptHandleStr());
-                    if (handle.getNextVisibleTime() - current > 
proxyConfig.getRenewAheadTimeMillis()) {
-                        return;
-                    }
-                    renewalWorkerService.submit(() -> renewMessage(group, 
msgID, handleStr));
-                });
-            }
-        } catch (Exception e) {
-            log.error("unexpect error when schedule renew task", e);
-        }
-
-        log.debug("scan for renewal done. cost:{}ms", 
stopwatch.elapsed().toMillis());
-    }
-
-    protected void renewMessage(ReceiptHandleGroup group, String msgID, String 
handleStr) {
-        try {
-            group.computeIfPresent(msgID, handleStr, this::startRenewMessage);
-        } catch (Exception e) {
-            log.error("error when renew message. msgID:{}, handleStr:{}", 
msgID, handleStr, e);
-        }
-    }
-
-    protected CompletableFuture<MessageReceiptHandle> 
startRenewMessage(MessageReceiptHandle messageReceiptHandle) {
-        CompletableFuture<MessageReceiptHandle> resFuture = new 
CompletableFuture<>();
-        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
-        ProxyContext context = createContext("RenewMessage");
-        ReceiptHandle handle = 
ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
-        long current = System.currentTimeMillis();
-        try {
-            if (messageReceiptHandle.getRenewRetryTimes() >= 
proxyConfig.getMaxRenewRetryTimes()) {
-                log.warn("handle has exceed max renewRetryTimes. handle:{}", 
messageReceiptHandle);
-                return CompletableFuture.completedFuture(null);
-            }
-            if (current - messageReceiptHandle.getConsumeTimestamp() < 
proxyConfig.getRenewMaxTimeMillis()) {
-                CompletableFuture<AckResult> future =
-                    messagingProcessor.changeInvisibleTime(context, handle, 
messageReceiptHandle.getMessageId(),
-                        messageReceiptHandle.getGroup(), 
messageReceiptHandle.getTopic(), 
RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()));
-                future.whenComplete((ackResult, throwable) -> {
-                    if (throwable != null) {
-                        log.error("error when renew. handle:{}", 
messageReceiptHandle, throwable);
-                        if (renewExceptionNeedRetry(throwable)) {
-                            
messageReceiptHandle.incrementAndGetRenewRetryTimes();
-                            resFuture.complete(messageReceiptHandle);
-                        } else {
-                            resFuture.complete(null);
-                        }
-                    } else if (AckStatus.OK.equals(ackResult.getStatus())) {
-                        
messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo());
-                        messageReceiptHandle.resetRenewRetryTimes();
-                        messageReceiptHandle.incrementRenewTimes();
-                        resFuture.complete(messageReceiptHandle);
-                    } else {
-                        log.error("renew response is not ok. result:{}, 
handle:{}", ackResult, messageReceiptHandle);
-                        resFuture.complete(null);
-                    }
-                });
-            } else {
-                SubscriptionGroupConfig subscriptionGroupConfig =
-                    
messagingProcessor.getMetadataService().getSubscriptionGroupConfig(context, 
messageReceiptHandle.getGroup());
-                if (subscriptionGroupConfig == null) {
-                    log.error("group's subscriptionGroupConfig is null when 
renew. handle: {}", messageReceiptHandle);
-                    return CompletableFuture.completedFuture(null);
-                }
-                RetryPolicy retryPolicy = 
subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
-                CompletableFuture<AckResult> future = 
messagingProcessor.changeInvisibleTime(context,
-                    handle, messageReceiptHandle.getMessageId(), 
messageReceiptHandle.getGroup(),
-                    messageReceiptHandle.getTopic(), 
retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()));
-                future.whenComplete((ackResult, throwable) -> {
-                    if (throwable != null) {
-                        log.error("error when nack in renew. handle:{}", 
messageReceiptHandle, throwable);
-                    }
-                    resFuture.complete(null);
-                });
-            }
-        } catch (Throwable t) {
-            log.error("unexpect error when renew message, stop to renew it. 
handle:{}", messageReceiptHandle, t);
-            resFuture.complete(null);
-        }
-        return resFuture;
-    }
-
-    protected boolean renewExceptionNeedRetry(Throwable t) {
-        t = ExceptionUtils.getRealException(t);
-        if (t instanceof ProxyException) {
-            ProxyException proxyException = (ProxyException) t;
-            if 
(ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) ||
-                
ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    protected boolean clientIsOffline(ReceiptHandleGroupKey groupKey) {
-        return 
this.messagingProcessor.findConsumerChannel(createContext("JudgeClientOnline"), 
groupKey.group, groupKey.channel) == null;
-    }
-
     public void addReceiptHandle(ProxyContext ctx, Channel channel, String 
group, String msgID, MessageReceiptHandle messageReceiptHandle) {
-        this.addReceiptHandle(ctx, new ReceiptHandleGroupKey(channel, group), 
msgID, messageReceiptHandle);
-    }
-
-    protected void addReceiptHandle(ProxyContext ctx, ReceiptHandleGroupKey 
key, String msgID, MessageReceiptHandle messageReceiptHandle) {
-        if (key == null) {
-            return;
-        }
-        ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, key,
-            k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(channel, group, msgID, 
messageReceiptHandle);
     }
 
     public MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel 
channel, String group, String msgID, String receiptHandle) {
-        return this.removeReceiptHandle(ctx, new 
ReceiptHandleGroupKey(channel, group), msgID, receiptHandle);
-    }
-
-    protected MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, 
ReceiptHandleGroupKey key, String msgID, String receiptHandle) {
-        if (key == null) {
-            return null;
-        }
-        ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(key);
-        if (handleGroup == null) {
-            return null;
-        }
-        return handleGroup.remove(msgID, receiptHandle);
-    }
-
-    protected void clearGroup(ReceiptHandleGroupKey key) {
-        if (key == null) {
-            return;
-        }
-        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
-        ProxyContext context = createContext("ClearGroup");
-        ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key);
-        if (handleGroup == null) {
-            return;
-        }
-        handleGroup.scan((msgID, handle, v) -> {
-            try {
-                handleGroup.computeIfPresent(msgID, handle, 
messageReceiptHandle -> {
-                    ReceiptHandle receiptHandle = 
ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
-                    messagingProcessor.changeInvisibleTime(
-                        context,
-                        receiptHandle,
-                        messageReceiptHandle.getMessageId(),
-                        messageReceiptHandle.getGroup(),
-                        messageReceiptHandle.getTopic(),
-                        proxyConfig.getInvisibleTimeMillisWhenClear()
-                    );
-                    return CompletableFuture.completedFuture(null);
-                });
-            } catch (Exception e) {
-                log.error("error when clear handle for group. key:{}", key, e);
-            }
-        });
-    }
-
-    protected void clearAllHandle() {
-        log.info("start clear all handle in receiptHandleProcessor");
-        Set<ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet();
-        for (ReceiptHandleGroupKey key : keySet) {
-            clearGroup(key);
-        }
-        log.info("clear all handle in receiptHandleProcessor done");
+        return receiptHandleManager.removeReceiptHandle(channel, group, msgID, 
receiptHandle);
     }
 
     public static class ReceiptHandleGroupKey {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
similarity index 68%
copy from 
proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
copy to 
proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
index 88c597e994..f3b8056247 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
@@ -15,10 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.proxy.processor;
+package org.apache.rocketmq.proxy.service.receipt;
 
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
 import com.google.common.base.Stopwatch;
 import io.netty.channel.Channel;
 import java.util.Map;
@@ -33,42 +31,50 @@ import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
 import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
+import org.apache.rocketmq.broker.client.ConsumerManager;
 import org.apache.rocketmq.client.consumer.AckResult;
 import org.apache.rocketmq.client.consumer.AckStatus;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
+import org.apache.rocketmq.common.state.StateEventListener;
 import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
-import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
 import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
+import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.proxy.common.RenewEvent;
 import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.common.ProxyException;
 import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
 import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
 import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
-import org.apache.rocketmq.common.utils.StartAndShutdown;
 import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
 import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
+import org.apache.rocketmq.proxy.service.metadata.MetadataService;
 import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 
-public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
+public class ReceiptHandleManager extends AbstractStartAndShutdown {
     protected final static Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
-    protected final ConcurrentMap<ReceiptHandleGroupKey, ReceiptHandleGroup> 
receiptHandleGroupMap;
+    protected final MetadataService metadataService;
+    protected final ConsumerManager consumerManager;
+    protected final 
ConcurrentMap<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> 
receiptHandleGroupMap;
+    protected final StateEventListener<RenewEvent> eventListener;
+    protected final static RetryPolicy RENEW_POLICY = new 
RenewStrategyPolicy();
     protected final ScheduledExecutorService scheduledExecutorService =
         Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("RenewalScheduledThread_"));
-    protected ThreadPoolExecutor renewalWorkerService;
-    protected final MessagingProcessor messagingProcessor;
-    protected final static RetryPolicy RENEW_POLICY = new 
RenewStrategyPolicy();
+    protected final ThreadPoolExecutor renewalWorkerService;
 
-    public ReceiptHandleProcessor(MessagingProcessor messagingProcessor) {
-        this.messagingProcessor = messagingProcessor;
-        this.receiptHandleGroupMap = new ConcurrentHashMap<>();
+    public ReceiptHandleManager(MetadataService metadataService, 
ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
+        this.metadataService = metadataService;
+        this.consumerManager = consumerManager;
+        this.eventListener = eventListener;
         ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
         this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor(
             proxyConfig.getRenewThreadPoolNums(),
@@ -77,29 +83,7 @@ public class ReceiptHandleProcessor extends 
AbstractStartAndShutdown {
             "RenewalWorkerThread",
             proxyConfig.getRenewThreadPoolQueueCapacity()
         );
-        this.init();
-    }
-
-    protected void init() {
-        this.registerConsumerListener();
-        this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> 
log.warn("add renew task failed. queueSize:{}", executor.getQueue().size()));
-        this.appendStartAndShutdown(new StartAndShutdown() {
-            @Override
-            public void start() throws Exception {
-                scheduledExecutorService.scheduleWithFixedDelay(() -> 
scheduleRenewTask(), 0,
-                    
ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), 
TimeUnit.MILLISECONDS);
-            }
-
-            @Override
-            public void shutdown() throws Exception {
-                scheduledExecutorService.shutdown();
-                clearAllHandle();
-            }
-        });
-    }
-
-    protected void registerConsumerListener() {
-        this.messagingProcessor.registerConsumerListener(new 
ConsumerIdsChangeListener() {
+        consumerManager.appendConsumerIdsChangeListener(new 
ConsumerIdsChangeListener() {
             @Override
             public void handle(ConsumerGroupEvent event, String group, 
Object... args) {
                 if (ConsumerGroupEvent.CLIENT_UNREGISTER.equals(event)) {
@@ -112,7 +96,7 @@ public class ReceiptHandleProcessor extends 
AbstractStartAndShutdown {
                             // if the channel sync from other proxy is 
expired, not to clear data of connect to current proxy
                             return;
                         }
-                        clearGroup(new 
ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
+                        clearGroup(new 
ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), 
group));
                         log.info("clear handle of this client when client 
unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo);
                     }
                 }
@@ -123,18 +107,46 @@ public class ReceiptHandleProcessor extends 
AbstractStartAndShutdown {
 
             }
         });
+        this.receiptHandleGroupMap = new ConcurrentHashMap<>();
+        this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> 
log.warn("add renew task failed. queueSize:{}", executor.getQueue().size()));
+        this.appendStartAndShutdown(new StartAndShutdown() {
+            @Override
+            public void start() throws Exception {
+                scheduledExecutorService.scheduleWithFixedDelay(() -> 
scheduleRenewTask(), 0,
+                    
ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), 
TimeUnit.MILLISECONDS);
+            }
+
+            @Override
+            public void shutdown() throws Exception {
+                scheduledExecutorService.shutdown();
+                clearAllHandle();
+            }
+        });
     }
 
-    protected ProxyContext createContext(String actionName) {
-        return ProxyContext.createForInner(this.getClass().getSimpleName() + 
actionName);
+    public void addReceiptHandle(Channel channel, String group, String msgID, 
MessageReceiptHandle messageReceiptHandle) {
+        ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new 
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group),
+            k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle);
+    }
+
+    public MessageReceiptHandle removeReceiptHandle(Channel channel, String 
group, String msgID, String receiptHandle) {
+        ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new 
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group));
+        if (handleGroup == null) {
+            return null;
+        }
+        return handleGroup.remove(msgID, receiptHandle);
     }
 
-    protected void scheduleRenewTask() {
+    protected boolean 
clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) {
+        return this.consumerManager.findChannel(groupKey.getGroup(), 
groupKey.getChannel()) == null;
+    }
+
+    public void scheduleRenewTask() {
         Stopwatch stopwatch = Stopwatch.createStarted();
         try {
             ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
-            for (Map.Entry<ReceiptHandleGroupKey, ReceiptHandleGroup> entry : 
receiptHandleGroupMap.entrySet()) {
-                ReceiptHandleGroupKey key = entry.getKey();
+            for (Map.Entry<ReceiptHandleProcessor.ReceiptHandleGroupKey, 
ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) {
+                ReceiptHandleProcessor.ReceiptHandleGroupKey key = 
entry.getKey();
                 if (clientIsOffline(key)) {
                     clearGroup(key);
                     continue;
@@ -168,8 +180,6 @@ public class ReceiptHandleProcessor extends 
AbstractStartAndShutdown {
     protected CompletableFuture<MessageReceiptHandle> 
startRenewMessage(MessageReceiptHandle messageReceiptHandle) {
         CompletableFuture<MessageReceiptHandle> resFuture = new 
CompletableFuture<>();
         ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
-        ProxyContext context = createContext("RenewMessage");
-        ReceiptHandle handle = 
ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
         long current = System.currentTimeMillis();
         try {
             if (messageReceiptHandle.getRenewRetryTimes() >= 
proxyConfig.getMaxRenewRetryTimes()) {
@@ -177,9 +187,8 @@ public class ReceiptHandleProcessor extends 
AbstractStartAndShutdown {
                 return CompletableFuture.completedFuture(null);
             }
             if (current - messageReceiptHandle.getConsumeTimestamp() < 
proxyConfig.getRenewMaxTimeMillis()) {
-                CompletableFuture<AckResult> future =
-                    messagingProcessor.changeInvisibleTime(context, handle, 
messageReceiptHandle.getMessageId(),
-                        messageReceiptHandle.getGroup(), 
messageReceiptHandle.getTopic(), 
RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()));
+                CompletableFuture<AckResult> future = new 
CompletableFuture<>();
+                eventListener.fireEvent(new RenewEvent(messageReceiptHandle, 
RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future));
                 future.whenComplete((ackResult, throwable) -> {
                     if (throwable != null) {
                         log.error("error when renew. handle:{}", 
messageReceiptHandle, throwable);
@@ -200,16 +209,16 @@ public class ReceiptHandleProcessor extends 
AbstractStartAndShutdown {
                     }
                 });
             } else {
+                ProxyContext context = createContext("RenewMessage");
                 SubscriptionGroupConfig subscriptionGroupConfig =
-                    
messagingProcessor.getMetadataService().getSubscriptionGroupConfig(context, 
messageReceiptHandle.getGroup());
+                    metadataService.getSubscriptionGroupConfig(context, 
messageReceiptHandle.getGroup());
                 if (subscriptionGroupConfig == null) {
                     log.error("group's subscriptionGroupConfig is null when 
renew. handle: {}", messageReceiptHandle);
                     return CompletableFuture.completedFuture(null);
                 }
                 RetryPolicy retryPolicy = 
subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
-                CompletableFuture<AckResult> future = 
messagingProcessor.changeInvisibleTime(context,
-                    handle, messageReceiptHandle.getMessageId(), 
messageReceiptHandle.getGroup(),
-                    messageReceiptHandle.getTopic(), 
retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()));
+                CompletableFuture<AckResult> future = new 
CompletableFuture<>();
+                eventListener.fireEvent(new RenewEvent(messageReceiptHandle, 
retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), 
future));
                 future.whenComplete((ackResult, throwable) -> {
                     if (throwable != null) {
                         log.error("error when nack in renew. handle:{}", 
messageReceiptHandle, throwable);
@@ -224,55 +233,11 @@ public class ReceiptHandleProcessor extends 
AbstractStartAndShutdown {
         return resFuture;
     }
 
-    protected boolean renewExceptionNeedRetry(Throwable t) {
-        t = ExceptionUtils.getRealException(t);
-        if (t instanceof ProxyException) {
-            ProxyException proxyException = (ProxyException) t;
-            if 
(ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) ||
-                
ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    protected boolean clientIsOffline(ReceiptHandleGroupKey groupKey) {
-        return 
this.messagingProcessor.findConsumerChannel(createContext("JudgeClientOnline"), 
groupKey.group, groupKey.channel) == null;
-    }
-
-    public void addReceiptHandle(ProxyContext ctx, Channel channel, String 
group, String msgID, MessageReceiptHandle messageReceiptHandle) {
-        this.addReceiptHandle(ctx, new ReceiptHandleGroupKey(channel, group), 
msgID, messageReceiptHandle);
-    }
-
-    protected void addReceiptHandle(ProxyContext ctx, ReceiptHandleGroupKey 
key, String msgID, MessageReceiptHandle messageReceiptHandle) {
-        if (key == null) {
-            return;
-        }
-        ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, key,
-            k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle);
-    }
-
-    public MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel 
channel, String group, String msgID, String receiptHandle) {
-        return this.removeReceiptHandle(ctx, new 
ReceiptHandleGroupKey(channel, group), msgID, receiptHandle);
-    }
-
-    protected MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, 
ReceiptHandleGroupKey key, String msgID, String receiptHandle) {
-        if (key == null) {
-            return null;
-        }
-        ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(key);
-        if (handleGroup == null) {
-            return null;
-        }
-        return handleGroup.remove(msgID, receiptHandle);
-    }
-
-    protected void clearGroup(ReceiptHandleGroupKey key) {
+    protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey 
key) {
         if (key == null) {
             return;
         }
         ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
-        ProxyContext context = createContext("ClearGroup");
         ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key);
         if (handleGroup == null) {
             return;
@@ -280,15 +245,8 @@ public class ReceiptHandleProcessor extends 
AbstractStartAndShutdown {
         handleGroup.scan((msgID, handle, v) -> {
             try {
                 handleGroup.computeIfPresent(msgID, handle, 
messageReceiptHandle -> {
-                    ReceiptHandle receiptHandle = 
ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
-                    messagingProcessor.changeInvisibleTime(
-                        context,
-                        receiptHandle,
-                        messageReceiptHandle.getMessageId(),
-                        messageReceiptHandle.getGroup(),
-                        messageReceiptHandle.getTopic(),
-                        proxyConfig.getInvisibleTimeMillisWhenClear()
-                    );
+                    CompletableFuture<AckResult> future = new 
CompletableFuture<>();
+                    eventListener.fireEvent(new 
RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), 
future));
                     return CompletableFuture.completedFuture(null);
                 });
             } catch (Exception e) {
@@ -297,59 +255,28 @@ public class ReceiptHandleProcessor extends 
AbstractStartAndShutdown {
         });
     }
 
-    protected void clearAllHandle() {
+    public void clearAllHandle() {
         log.info("start clear all handle in receiptHandleProcessor");
-        Set<ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet();
-        for (ReceiptHandleGroupKey key : keySet) {
+        Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet = 
receiptHandleGroupMap.keySet();
+        for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) {
             clearGroup(key);
         }
         log.info("clear all handle in receiptHandleProcessor done");
     }
 
-    public static class ReceiptHandleGroupKey {
-        protected final Channel channel;
-        protected final String group;
-
-        public ReceiptHandleGroupKey(Channel channel, String group) {
-            this.channel = channel;
-            this.group = group;
-        }
-
-        protected String getChannelId() {
-            return channel.id().asLongText();
-        }
-
-        public String getGroup() {
-            return group;
-        }
-
-        public Channel getChannel() {
-            return channel;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
+    protected boolean renewExceptionNeedRetry(Throwable t) {
+        t = ExceptionUtils.getRealException(t);
+        if (t instanceof ProxyException) {
+            ProxyException proxyException = (ProxyException) t;
+            if 
(ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) ||
+                
ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) {
                 return false;
             }
-            ReceiptHandleGroupKey key = (ReceiptHandleGroupKey) o;
-            return Objects.equal(getChannelId(), key.getChannelId()) && 
Objects.equal(group, key.group);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hashCode(getChannelId(), group);
         }
+        return true;
+    }
 
-        @Override
-        public String toString() {
-            return MoreObjects.toStringHelper(this)
-                .add("channelId", getChannelId())
-                .add("group", group)
-                .toString();
-        }
+    protected ProxyContext createContext(String actionName) {
+        return ProxyContext.createForInner(this.getClass().getSimpleName() + 
actionName);
     }
 }
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java
index 4df834bb65..49fdfc6a8b 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java
@@ -47,7 +47,7 @@ public class AckMessageActivityTest extends BaseActivityTest {
     @Before
     public void before() throws Throwable {
         super.before();
-        this.ackMessageActivity = new AckMessageActivity(messagingProcessor, 
receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager);
+        this.ackMessageActivity = new AckMessageActivity(messagingProcessor, 
grpcClientSettingsManager, grpcChannelManager);
     }
 
     @Test
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
index fdd052da76..2de9a066be 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
@@ -49,7 +49,7 @@ public class ChangeInvisibleDurationActivityTest extends 
BaseActivityTest {
     @Before
     public void before() throws Throwable {
         super.before();
-        this.changeInvisibleDurationActivity = new 
ChangeInvisibleDurationActivity(messagingProcessor, receiptHandleProcessor,
+        this.changeInvisibleDurationActivity = new 
ChangeInvisibleDurationActivity(messagingProcessor,
             grpcClientSettingsManager, grpcChannelManager);
     }
 
@@ -92,7 +92,7 @@ public class ChangeInvisibleDurationActivityTest extends 
BaseActivityTest {
         when(this.messagingProcessor.changeInvisibleTime(
             any(), receiptHandleCaptor.capture(), anyString(), anyString(), 
anyString(), invisibleTimeArgumentCaptor.capture()
         )).thenReturn(CompletableFuture.completedFuture(ackResult));
-        when(receiptHandleProcessor.removeReceiptHandle(any(), any(), 
anyString(), anyString(), anyString()))
+        when(messagingProcessor.removeReceiptHandle(any(), any(), anyString(), 
anyString(), anyString()))
             .thenReturn(new MessageReceiptHandle("group", "topic", 0, 
savedHandleStr, "msgId", 0, 0));
 
         ChangeInvisibleDurationResponse response = 
this.changeInvisibleDurationActivity.changeInvisibleDuration(
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
index 535af838c9..2e562504a4 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
@@ -74,7 +74,7 @@ public class ReceiveMessageActivityTest extends 
BaseActivityTest {
     public void before() throws Throwable {
         super.before();
         
ConfigurationManager.getProxyConfig().setGrpcClientConsumerMinLongPollingTimeoutMillis(0);
-        this.receiveMessageActivity = new 
ReceiveMessageActivity(messagingProcessor, receiptHandleProcessor,
+        this.receiveMessageActivity = new 
ReceiveMessageActivity(messagingProcessor,
             grpcClientSettingsManager, grpcChannelManager);
     }
 
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
index ec620340c5..87824e5b4b 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
@@ -44,7 +44,7 @@ public class ForwardMessageToDLQActivityTest extends 
BaseActivityTest {
     @Before
     public void before() throws Throwable {
         super.before();
-        this.forwardMessageToDLQActivity = new 
ForwardMessageToDLQActivity(messagingProcessor,receiptHandleProcessor, 
grpcClientSettingsManager, grpcChannelManager);
+        this.forwardMessageToDLQActivity = new 
ForwardMessageToDLQActivity(messagingProcessor, grpcClientSettingsManager, 
grpcChannelManager);
     }
 
     @Test
@@ -75,7 +75,7 @@ public class ForwardMessageToDLQActivityTest extends 
BaseActivityTest {
             
.thenReturn(CompletableFuture.completedFuture(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS,
 "")));
 
         String savedHandleStr = buildReceiptHandle("topic", 
System.currentTimeMillis(),3000);
-        when(receiptHandleProcessor.removeReceiptHandle(any(), any(), 
anyString(), anyString(), anyString()))
+        when(messagingProcessor.removeReceiptHandle(any(), any(), anyString(), 
anyString(), anyString()))
             .thenReturn(new MessageReceiptHandle("group", "topic", 0, 
savedHandleStr, "msgId", 0, 0));
 
         ForwardMessageToDeadLetterQueueResponse response = 
this.forwardMessageToDLQActivity.forwardMessageToDeadLetterQueue(
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
index bfa2cc3e64..717e86fc05 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
@@ -73,7 +73,6 @@ public class ConsumerProcessorTest extends BaseProcessorTest {
     @Before
     public void before() throws Throwable {
         super.before();
-        ReceiptHandleProcessor receiptHandleProcessor = new 
ReceiptHandleProcessor(messagingProcessor);
         this.consumerProcessor = new ConsumerProcessor(messagingProcessor, 
serviceManager, Executors.newCachedThreadPool());
     }
 
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java
similarity index 63%
rename from 
proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
rename to 
proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java
index c76f40f920..877c9fd6f4 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java
@@ -15,21 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.proxy.processor;
+package org.apache.rocketmq.proxy.service.receipt;
 
-import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelConfig;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelId;
-import io.netty.channel.ChannelMetadata;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelProgressivePromise;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.EventLoop;
-import io.netty.util.Attribute;
-import io.netty.util.AttributeKey;
-import java.net.SocketAddress;
+import io.netty.channel.local.LocalChannel;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -38,26 +27,34 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
 import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
+import org.apache.rocketmq.broker.client.ConsumerManager;
 import org.apache.rocketmq.client.consumer.AckResult;
 import org.apache.rocketmq.client.consumer.AckStatus;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.state.StateEventListener;
+import org.apache.rocketmq.proxy.common.RenewEvent;
 import org.apache.rocketmq.proxy.common.ContextVariable;
 import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.common.ProxyException;
-import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
 import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
 import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
+import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
+import org.apache.rocketmq.proxy.service.BaseServiceTest;
+import org.apache.rocketmq.proxy.service.metadata.MetadataService;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
 
@@ -65,8 +62,14 @@ import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-public class ReceiptHandleProcessorTest extends BaseProcessorTest {
-    private ReceiptHandleProcessor receiptHandleProcessor;
+public class ReceiptHandleManagerTest extends BaseServiceTest {
+    private ReceiptHandleManager receiptHandleManager;
+    @Mock
+    protected MessagingProcessor messagingProcessor;
+    @Mock
+    protected MetadataService metadataService;
+    @Mock
+    protected ConsumerManager consumerManager;
 
     private static final ProxyContext PROXY_CONTEXT = ProxyContext.create();
     private static final String GROUP = "group";
@@ -84,6 +87,22 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
 
     @Before
     public void setup() {
+        receiptHandleManager = new ReceiptHandleManager(metadataService, 
consumerManager, new StateEventListener<RenewEvent>() {
+            @Override
+            public void fireEvent(RenewEvent event) {
+                MessageReceiptHandle messageReceiptHandle = 
event.getMessageReceiptHandle();
+                ReceiptHandle handle = 
ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
+                messagingProcessor.changeInvisibleTime(PROXY_CONTEXT, handle, 
messageReceiptHandle.getMessageId(),
+                        messageReceiptHandle.getGroup(), 
messageReceiptHandle.getTopic(), event.getRenewTime())
+                    .whenComplete((v, t) -> {
+                        if (t != null) {
+                            event.getFuture().completeExceptionally(t);
+                            return;
+                        }
+                        event.getFuture().complete(v);
+                    });
+            }
+        });
         ProxyConfig config = ConfigurationManager.getProxyConfig();
         receiptHandle = ReceiptHandle.builder()
             .startOffset(0L)
@@ -97,20 +116,19 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
             .commitLogOffset(0L)
             .build().encode();
         PROXY_CONTEXT.withVal(ContextVariable.CLIENT_ID, "channel-id");
-        PROXY_CONTEXT.withVal(ContextVariable.CHANNEL, new MockChannel());
-        receiptHandleProcessor = new 
ReceiptHandleProcessor(messagingProcessor);
-        
Mockito.doNothing().when(messagingProcessor).registerConsumerListener(Mockito.any(ConsumerIdsChangeListener.class));
+        PROXY_CONTEXT.withVal(ContextVariable.CHANNEL, new LocalChannel());
+        
Mockito.doNothing().when(consumerManager).appendConsumerIdsChangeListener(Mockito.any(ConsumerIdsChangeListener.class));
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, 
QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET,
             RECONSUME_TIMES);
     }
 
     @Test
     public void testAddReceiptHandle() {
-        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        Channel channel = new LocalChannel();
+        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
-        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        receiptHandleProcessor.scheduleRenewTask();
+        Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        receiptHandleManager.scheduleRenewTask();
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), 
Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
                 Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
@@ -134,12 +152,12 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
                 .build().encode();
             MessageReceiptHandle messageReceiptHandle = new 
MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET,
                 RECONSUME_TIMES);
-            receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, 
GROUP, MSG_ID, messageReceiptHandle);
+            receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
         }
-        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
-        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        receiptHandleProcessor.scheduleRenewTask();
+        Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        receiptHandleManager.scheduleRenewTask();
         ArgumentCaptor<ReceiptHandle> handleArgumentCaptor = 
ArgumentCaptor.forClass(ReceiptHandle.class);
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), 
handleArgumentCaptor.capture(), Mockito.eq(MESSAGE_ID),
@@ -152,10 +170,10 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
     public void testRenewReceiptHandle() {
         ProxyConfig config = ConfigurationManager.getProxyConfig();
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
-        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         long newInvisibleTime = 18000L;
 
         ReceiptHandle newReceiptHandleClass = ReceiptHandle.builder()
@@ -179,27 +197,26 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
         ackResult.setExtraInfo(newReceiptHandle);
 
         
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class),
 Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(retryPolicy.nextDelayDuration(times.get()))))
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(retryPolicy.nextDelayDuration(times.get()))))
             .thenReturn(CompletableFuture.completedFuture(ackResult));
-        receiptHandleProcessor.scheduleRenewTask();
+        receiptHandleManager.scheduleRenewTask();
 
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), 
Mockito.argThat(r -> r.getInvisibleTime() == INVISIBLE_TIME), 
Mockito.eq(MESSAGE_ID),
                 Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(retryPolicy.nextDelayDuration(times.get())));
-        receiptHandleProcessor.scheduleRenewTask();
+        receiptHandleManager.scheduleRenewTask();
 
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), 
Mockito.argThat(r -> r.getInvisibleTime() == newInvisibleTime), 
Mockito.eq(MESSAGE_ID),
                 Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(retryPolicy.nextDelayDuration(times.incrementAndGet())));
-        receiptHandleProcessor.scheduleRenewTask();
+        receiptHandleManager.scheduleRenewTask();
     }
 
     @Test
     public void testRenewExceedMaxRenewTimes() {
-        ProxyConfig config = ConfigurationManager.getProxyConfig();
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
 
         CompletableFuture<AckResult> ackResultFuture = new 
CompletableFuture<>();
         ackResultFuture.completeExceptionally(new MQClientException(0, 
"error"));
@@ -207,13 +224,13 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
         RetryPolicy retryPolicy = new RenewStrategyPolicy();
 
         
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class),
 Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(retryPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes()))))
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(retryPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes()))))
             .thenReturn(ackResultFuture);
 
         await().atMost(Duration.ofSeconds(1)).until(() -> {
-            receiptHandleProcessor.scheduleRenewTask();
+            receiptHandleManager.scheduleRenewTask();
             try {
-                ReceiptHandleGroup receiptHandleGroup = 
receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get();
+                ReceiptHandleGroup receiptHandleGroup = 
receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get();
                 return receiptHandleGroup.isEmpty();
             } catch (Exception e) {
                 return false;
@@ -228,19 +245,19 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
     @Test
     public void testRenewWithInvalidHandle() {
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
 
         CompletableFuture<AckResult> ackResultFuture = new 
CompletableFuture<>();
         ackResultFuture.completeExceptionally(new 
ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error"));
         
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class),
 Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())))
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())))
             .thenReturn(ackResultFuture);
 
         await().atMost(Duration.ofSeconds(1)).until(() -> {
-            receiptHandleProcessor.scheduleRenewTask();
+            receiptHandleManager.scheduleRenewTask();
             try {
-                ReceiptHandleGroup receiptHandleGroup = 
receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get();
+                ReceiptHandleGroup receiptHandleGroup = 
receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get();
                 return receiptHandleGroup.isEmpty();
             } catch (Exception e) {
                 return false;
@@ -252,8 +269,8 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
     public void testRenewWithErrorThenOK() {
         ProxyConfig config = ConfigurationManager.getProxyConfig();
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
 
         AtomicInteger count = new AtomicInteger(0);
         List<CompletableFuture<AckResult>> futureList = new ArrayList<>();
@@ -297,13 +314,13 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
             Mockito.doAnswer((Answer<CompletableFuture<AckResult>>) mock -> {
                 return futureList.get(count.getAndIncrement());
             
}).when(messagingProcessor).changeInvisibleTime(Mockito.any(ProxyContext.class),
 Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-                    Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement())));
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement())));
         }
 
         
await().pollDelay(Duration.ZERO).pollInterval(Duration.ofMillis(10)).atMost(Duration.ofSeconds(10)).until(()
 -> {
-            receiptHandleProcessor.scheduleRenewTask();
+            receiptHandleManager.scheduleRenewTask();
             try {
-                ReceiptHandleGroup receiptHandleGroup = 
receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get();
+                ReceiptHandleGroup receiptHandleGroup = 
receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get();
                 return receiptHandleGroup.isEmpty();
             } catch (Exception e) {
                 return false;
@@ -331,19 +348,19 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, 
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
             RECONSUME_TIMES);
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
-        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
+        Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), 
Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), 
Mockito.anyLong()))
             .thenReturn(CompletableFuture.completedFuture(new AckResult()));
-        receiptHandleProcessor.scheduleRenewTask();
+        receiptHandleManager.scheduleRenewTask();
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), 
Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
                 Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(groupConfig.getGroupRetryPolicy().getRetryPolicy().nextDelayDuration(RECONSUME_TIMES)));
 
         await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> {
-            ReceiptHandleGroup receiptHandleGroup = 
receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get();
+            ReceiptHandleGroup receiptHandleGroup = 
receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get();
             assertTrue(receiptHandleGroup.isEmpty());
         });
     }
@@ -365,15 +382,15 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, 
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
             RECONSUME_TIMES);
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
-        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
+        Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(null);
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), 
Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), 
Mockito.anyLong()))
             .thenReturn(CompletableFuture.completedFuture(new AckResult()));
-        receiptHandleProcessor.scheduleRenewTask();
+        receiptHandleManager.scheduleRenewTask();
         await().atMost(Duration.ofSeconds(1)).until(() -> {
             try {
-                ReceiptHandleGroup receiptHandleGroup = 
receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get();
+                ReceiptHandleGroup receiptHandleGroup = 
receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get();
                 return receiptHandleGroup.isEmpty();
             } catch (Exception e) {
                 return false;
@@ -401,11 +418,11 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, 
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
             RECONSUME_TIMES);
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
-        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        receiptHandleProcessor.scheduleRenewTask();
+        Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        receiptHandleManager.scheduleRenewTask();
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), 
Mockito.any(ReceiptHandle.class), Mockito.anyString(),
                 Mockito.anyString(), Mockito.anyString(), Mockito.anyLong());
@@ -414,11 +431,11 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
     @Test
     public void testRemoveReceiptHandle() {
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
-        receiptHandleProcessor.removeReceiptHandle(PROXY_CONTEXT, channel, 
GROUP, MSG_ID, receiptHandle);
+        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
+        receiptHandleManager.removeReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle);
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
-        receiptHandleProcessor.scheduleRenewTask();
+        receiptHandleManager.scheduleRenewTask();
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), 
Mockito.any(ReceiptHandle.class), Mockito.anyString(),
                 Mockito.anyString(), Mockito.anyString(), Mockito.anyLong());
@@ -427,11 +444,11 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
     @Test
     public void testClearGroup() {
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
-        receiptHandleProcessor.clearGroup(new 
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP));
+        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
+        receiptHandleManager.clearGroup(new 
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP));
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
-        receiptHandleProcessor.scheduleRenewTask();
+        receiptHandleManager.scheduleRenewTask();
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), 
Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
                 Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(ConfigurationManager.getProxyConfig().getInvisibleTimeMillisWhenClear()));
@@ -440,242 +457,10 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
     @Test
     public void testClientOffline() {
         ArgumentCaptor<ConsumerIdsChangeListener> listenerArgumentCaptor = 
ArgumentCaptor.forClass(ConsumerIdsChangeListener.class);
-        Mockito.verify(messagingProcessor, 
Mockito.times(1)).registerConsumerListener(listenerArgumentCaptor.capture());
+        Mockito.verify(consumerManager, 
Mockito.times(1)).appendConsumerIdsChangeListener(listenerArgumentCaptor.capture());
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, 
messageReceiptHandle);
         
listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, 
GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0));
-        assertTrue(receiptHandleProcessor.receiptHandleGroupMap.isEmpty());
-    }
-
-    class MockChannel implements Channel {
-        @Override
-        public ChannelId id() {
-            return new ChannelId() {
-                @Override
-                public String asShortText() {
-                    return "short";
-                }
-
-                @Override
-                public String asLongText() {
-                    return "long";
-                }
-
-                @Override
-                public int compareTo(ChannelId o) {
-                    return 1;
-                }
-            };
-        }
-
-        @Override
-        public EventLoop eventLoop() {
-            return null;
-        }
-
-        @Override
-        public Channel parent() {
-            return null;
-        }
-
-        @Override
-        public ChannelConfig config() {
-            return null;
-        }
-
-        @Override
-        public boolean isOpen() {
-            return false;
-        }
-
-        @Override
-        public boolean isRegistered() {
-            return false;
-        }
-
-        @Override
-        public boolean isActive() {
-            return false;
-        }
-
-        @Override
-        public ChannelMetadata metadata() {
-            return null;
-        }
-
-        @Override
-        public SocketAddress localAddress() {
-            return null;
-        }
-
-        @Override
-        public SocketAddress remoteAddress() {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture closeFuture() {
-            return null;
-        }
-
-        @Override
-        public boolean isWritable() {
-            return false;
-        }
-
-        @Override
-        public long bytesBeforeUnwritable() {
-            return 0;
-        }
-
-        @Override
-        public long bytesBeforeWritable() {
-            return 0;
-        }
-
-        @Override
-        public Unsafe unsafe() {
-            return null;
-        }
-
-        @Override
-        public ChannelPipeline pipeline() {
-            return null;
-        }
-
-        @Override
-        public ByteBufAllocator alloc() {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture bind(SocketAddress localAddress) {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture connect(SocketAddress remoteAddress) {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture connect(SocketAddress remoteAddress, 
SocketAddress localAddress) {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture disconnect() {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture close() {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture deregister() {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture bind(SocketAddress localAddress, ChannelPromise 
promise) {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture connect(SocketAddress remoteAddress, 
ChannelPromise promise) {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture connect(SocketAddress remoteAddress, 
SocketAddress localAddress, ChannelPromise promise) {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture disconnect(ChannelPromise promise) {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture close(ChannelPromise promise) {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture deregister(ChannelPromise promise) {
-            return null;
-        }
-
-        @Override
-        public Channel read() {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture write(Object msg) {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture write(Object msg, ChannelPromise promise) {
-            return null;
-        }
-
-        @Override
-        public Channel flush() {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) 
{
-            return null;
-        }
-
-        @Override
-        public ChannelFuture writeAndFlush(Object msg) {
-            return null;
-        }
-
-        @Override
-        public ChannelPromise newPromise() {
-            return null;
-        }
-
-        @Override
-        public ChannelProgressivePromise newProgressivePromise() {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture newSucceededFuture() {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture newFailedFuture(Throwable cause) {
-            return null;
-        }
-
-        @Override
-        public ChannelPromise voidPromise() {
-            return null;
-        }
-
-        @Override
-        public <T> Attribute<T> attr(AttributeKey<T> key) {
-            return null;
-        }
-
-        @Override
-        public <T> boolean hasAttr(AttributeKey<T> key) {
-            return false;
-        }
-
-        @Override
-        public int compareTo(Channel o) {
-            return 1;
-        }
+        assertTrue(receiptHandleManager.receiptHandleGroupMap.isEmpty());
     }
-}
+}
\ No newline at end of file

Reply via email to