This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 290a4da  Polish code (#194)
290a4da is described below

commit 290a4da4aa609e7778089a3c09aa3cde58def496
Author: Aaron Ai <[email protected]>
AuthorDate: Sun Aug 28 17:18:36 2022 +0800

    Polish code (#194)
---
 .../java/example/ProducerFifoMessageExample.java   |  2 +-
 ...dler.java => CompositedMessageInterceptor.java} | 38 +++++++++++-----------
 ...MessageHandler.java => MessageInterceptor.java} |  8 ++---
 ...Context.java => MessageInterceptorContext.java} |  2 +-
 ...mpl.java => MessageInterceptorContextImpl.java} | 10 +++---
 .../rocketmq/client/java/impl/ClientImpl.java      | 29 +++++++++--------
 .../rocketmq/client/java/impl/ClientManager.java   |  6 ++--
 .../client/java/impl/consumer/ConsumeService.java  | 10 +++---
 .../client/java/impl/consumer/ConsumeTask.java     | 18 +++++-----
 .../client/java/impl/consumer/ConsumerImpl.java    | 23 ++++++-------
 .../java/impl/consumer/FifoConsumeService.java     |  7 ++--
 .../client/java/impl/consumer/ProcessQueue.java    | 16 ++++++---
 .../java/impl/consumer/ProcessQueueImpl.java       | 32 +++++++++---------
 .../java/impl/consumer/PushConsumerImpl.java       | 12 ++++---
 .../java/impl/consumer/StandardConsumeService.java |  7 ++--
 .../client/java/impl/producer/ProducerImpl.java    | 20 ++++++------
 ...erHandler.java => MessageMeterInterceptor.java} | 22 ++++++-------
 .../java/impl/consumer/ConsumeServiceTest.java     |  4 +--
 .../client/java/impl/consumer/ConsumeTaskTest.java | 10 +++---
 .../java/metrics/MessageMeterHandlerTest.java      | 34 +++++++++----------
 20 files changed, 160 insertions(+), 150 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
index 823aab4..6af757f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
@@ -69,7 +69,7 @@ public class ProducerFifoMessageExample {
             // Key(s) of the message, another way to mark message besides message id.
             .setKeys("yourMessageKey-1ff69ada8e0e")
             // Message group decides the message delivery order.
-            .setMessageGroup("youMessageGroup0")
+            .setMessageGroup("yourMessageGroup0")
             .setBody(body)
             .build();
         try {
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageHandler.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java
similarity index 62%
rename from 
java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageHandler.java
rename to 
java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java
index 7a021ab..c0ddaef 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageHandler.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java
@@ -25,48 +25,48 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("rawtypes")
-public class CompositedMessageHandler implements MessageHandler {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(MessageHandler.class);
-    private static final AttributeKey<Map<Integer, Map<AttributeKey, 
Attribute>>> HANDLER_ATTRIBUTES_KEY =
-        AttributeKey.create("composited_handler_attributes");
-    private final List<MessageHandler> handlers;
+public class CompositedMessageInterceptor implements MessageInterceptor {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MessageInterceptor.class);
+    private static final AttributeKey<Map<Integer, Map<AttributeKey, 
Attribute>>> INTERCEPTOR_ATTRIBUTES_KEY =
+        AttributeKey.create("composited_interceptor_attributes");
+    private final List<MessageInterceptor> interceptors;
 
-    public CompositedMessageHandler(List<MessageHandler> handlers) {
-        this.handlers = handlers;
+    public CompositedMessageInterceptor(List<MessageInterceptor> interceptors) 
{
+        this.interceptors = interceptors;
     }
 
     @Override
-    public void doBefore(MessageHandlerContext context0, List<GeneralMessage> 
messages) {
+    public void doBefore(MessageInterceptorContext context0, 
List<GeneralMessage> messages) {
         final HashMap<Integer, Map<AttributeKey, Attribute>> attributeMap = 
new HashMap<>();
-        for (int index = 0; index < handlers.size(); index++) {
-            MessageHandler handler = handlers.get(index);
+        for (int index = 0; index < interceptors.size(); index++) {
+            MessageInterceptor interceptor = interceptors.get(index);
             final MessageHookPoints messageHookPoints = 
context0.getMessageHookPoints();
             final MessageHookPointsStatus status = context0.getStatus();
-            final MessageHandlerContextImpl context = new 
MessageHandlerContextImpl(messageHookPoints, status);
+            final MessageInterceptorContextImpl context = new 
MessageInterceptorContextImpl(messageHookPoints, status);
             try {
-                handler.doBefore(context, messages);
+                interceptor.doBefore(context, messages);
             } catch (Throwable t) {
                 LOGGER.error("Exception raised while handing messages", t);
             }
             final Map<AttributeKey, Attribute> attributes = 
context.getAttributes();
             attributeMap.put(index, attributes);
         }
-        context0.putAttribute(HANDLER_ATTRIBUTES_KEY, 
Attribute.create(attributeMap));
+        context0.putAttribute(INTERCEPTOR_ATTRIBUTES_KEY, 
Attribute.create(attributeMap));
     }
 
     @Override
-    public void doAfter(MessageHandlerContext context0, List<GeneralMessage> 
messages) {
-        for (int index = handlers.size() - 1; index >= 0; index--) {
+    public void doAfter(MessageInterceptorContext context0, 
List<GeneralMessage> messages) {
+        for (int index = interceptors.size() - 1; index >= 0; index--) {
             final Map<Integer, Map<AttributeKey, Attribute>> attributeMap =
-                context0.getAttribute(HANDLER_ATTRIBUTES_KEY).get();
+                context0.getAttribute(INTERCEPTOR_ATTRIBUTES_KEY).get();
             final Map<AttributeKey, Attribute> attributes = 
attributeMap.get(index);
             final MessageHookPoints messageHookPoints = 
context0.getMessageHookPoints();
             final MessageHookPointsStatus status = context0.getStatus();
-            final MessageHandlerContextImpl context = new 
MessageHandlerContextImpl(messageHookPoints, status,
+            final MessageInterceptorContextImpl context = new 
MessageInterceptorContextImpl(messageHookPoints, status,
                 attributes);
-            MessageHandler handler = handlers.get(index);
+            MessageInterceptor interceptor = interceptors.get(index);
             try {
-                handler.doAfter(context, messages);
+                interceptor.doAfter(context, messages);
             } catch (Throwable t) {
                 LOGGER.error("Exception raised while handing messages", t);
             }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHandler.java b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptor.java
similarity index 76%
rename from java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHandler.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptor.java
index b2c5087..593c408 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHandler.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptor.java
@@ -22,11 +22,11 @@ import java.util.List;
 import org.apache.rocketmq.client.java.message.GeneralMessage;
 
 /**
- * Message interceptor based on {@link MessageHookPoints}.
+ * Interface for intercepting ingoing/outgoing message before/after they are dispatched by client.
  */
 @Beta
-public interface MessageHandler {
-    void doBefore(MessageHandlerContext context, List<GeneralMessage> messages);
+public interface MessageInterceptor {
+    void doBefore(MessageInterceptorContext context, List<GeneralMessage> messages);
 
-    void doAfter(MessageHandlerContext context, List<GeneralMessage> messages);
+    void doAfter(MessageInterceptorContext context, List<GeneralMessage> messages);
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHandlerContext.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptorContext.java
similarity index 95%
rename from 
java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHandlerContext.java
rename to 
java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptorContext.java
index e38aaec..6769ae9 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHandlerContext.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptorContext.java
@@ -17,7 +17,7 @@
 
 package org.apache.rocketmq.client.java.hook;
 
-public interface MessageHandlerContext {
+public interface MessageInterceptorContext {
     MessageHookPoints getMessageHookPoints();
 
     MessageHookPointsStatus getStatus();
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHandlerContextImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptorContextImpl.java
similarity index 82%
rename from 
java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHandlerContextImpl.java
rename to 
java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptorContextImpl.java
index 486cd30..2565eb7 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHandlerContextImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptorContextImpl.java
@@ -21,31 +21,31 @@ import java.util.HashMap;
 import java.util.Map;
 
 @SuppressWarnings("rawtypes")
-public class MessageHandlerContextImpl implements MessageHandlerContext {
+public class MessageInterceptorContextImpl implements 
MessageInterceptorContext {
     private final MessageHookPoints messageHookPoints;
     private MessageHookPointsStatus status;
     private final Map<AttributeKey, Attribute> attributes;
 
-    public MessageHandlerContextImpl(MessageHookPoints messageHookPoints) {
+    public MessageInterceptorContextImpl(MessageHookPoints messageHookPoints) {
         this.messageHookPoints = messageHookPoints;
         this.status = MessageHookPointsStatus.UNSET;
         this.attributes = new HashMap<>();
     }
 
-    public MessageHandlerContextImpl(MessageHookPoints messageHookPoints, 
MessageHookPointsStatus status) {
+    public MessageInterceptorContextImpl(MessageHookPoints messageHookPoints, 
MessageHookPointsStatus status) {
         this.messageHookPoints = messageHookPoints;
         this.status = status;
         this.attributes = new HashMap<>();
     }
 
-    public MessageHandlerContextImpl(MessageHookPoints messageHookPoints, 
MessageHookPointsStatus status,
+    public MessageInterceptorContextImpl(MessageHookPoints messageHookPoints, 
MessageHookPointsStatus status,
         Map<AttributeKey, Attribute> attributes) {
         this.messageHookPoints = messageHookPoints;
         this.status = status;
         this.attributes = attributes;
     }
 
-    public MessageHandlerContextImpl(MessageHandlerContextImpl context, 
MessageHookPointsStatus status) {
+    public MessageInterceptorContextImpl(MessageInterceptorContextImpl 
context, MessageHookPointsStatus status) {
         this.messageHookPoints = context.messageHookPoints;
         this.status = status;
         this.attributes = context.attributes;
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index d48ee74..4745ec5 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -72,13 +72,13 @@ import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.exception.InternalErrorException;
 import org.apache.rocketmq.client.java.exception.StatusChecker;
-import org.apache.rocketmq.client.java.hook.CompositedMessageHandler;
-import org.apache.rocketmq.client.java.hook.MessageHandler;
-import org.apache.rocketmq.client.java.hook.MessageHandlerContext;
+import org.apache.rocketmq.client.java.hook.CompositedMessageInterceptor;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
+import org.apache.rocketmq.client.java.hook.MessageInterceptorContext;
 import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
 import org.apache.rocketmq.client.java.message.GeneralMessage;
 import org.apache.rocketmq.client.java.metrics.ClientMeterManager;
-import org.apache.rocketmq.client.java.metrics.MessageMeterHandler;
+import org.apache.rocketmq.client.java.metrics.MessageMeterInterceptor;
 import org.apache.rocketmq.client.java.metrics.Metric;
 import org.apache.rocketmq.client.java.misc.ClientId;
 import org.apache.rocketmq.client.java.misc.ExecutorServices;
@@ -93,7 +93,7 @@ import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
 public abstract class ClientImpl extends AbstractIdleService implements 
Client, ClientSessionHandler,
-    MessageHandler {
+    MessageInterceptor {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ClientImpl.class);
     /**
      * The telemetry timeout should not be too long, otherwise
@@ -126,7 +126,7 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
     private final Map<Endpoints, ClientSessionImpl> sessionsTable;
     private final ReadWriteLock sessionsLock;
 
-    private final CompositedMessageHandler compositedMessageHandler;
+    private final CompositedMessageInterceptor compositedMessageInterceptor;
 
     public ClientImpl(ClientConfiguration clientConfiguration, Set<String> 
topics) {
         this.clientConfiguration = checkNotNull(clientConfiguration, 
"clientConfiguration should not be null");
@@ -158,8 +158,9 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
 
         this.clientMeterManager = new ClientMeterManager(clientId, 
clientConfiguration);
 
-        this.compositedMessageHandler =
-            new CompositedMessageHandler(Collections.singletonList(new 
MessageMeterHandler(this, clientMeterManager)));
+        this.compositedMessageInterceptor =
+            new CompositedMessageInterceptor(Collections.singletonList(new 
MessageMeterInterceptor(this,
+                clientMeterManager)));
 
         this.telemetryCommandExecutor = new ThreadPoolExecutor(
             1,
@@ -232,22 +233,22 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
     }
 
     @Override
-    public void doBefore(MessageHandlerContext context, List<GeneralMessage> 
generalMessages) {
+    public void doBefore(MessageInterceptorContext context, 
List<GeneralMessage> generalMessages) {
         try {
-            compositedMessageHandler.doBefore(context, generalMessages);
+            compositedMessageInterceptor.doBefore(context, generalMessages);
         } catch (Throwable t) {
             // Should never reach here.
-            LOGGER.error("[BUG] Exception raised while handling messages, 
clientId={}", clientId, t);
+            LOGGER.error("[Bug] Exception raised while handling messages, 
clientId={}", clientId, t);
         }
     }
 
     @Override
-    public void doAfter(MessageHandlerContext context, List<GeneralMessage> 
generalMessages) {
+    public void doAfter(MessageInterceptorContext context, 
List<GeneralMessage> generalMessages) {
         try {
-            compositedMessageHandler.doAfter(context, generalMessages);
+            compositedMessageInterceptor.doAfter(context, generalMessages);
         } catch (Throwable t) {
             // Should never reach here.
-            LOGGER.error("[BUG] Exception raised while handling messages, 
clientId={}", clientId, t);
+            LOGGER.error("[Bug] Exception raised while handling messages, 
clientId={}", clientId, t);
         }
     }
 
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
index 468712b..ebc6f78 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
@@ -137,8 +137,7 @@ public abstract class ClientManager extends 
AbstractIdleService {
      * @return invocation of response future.
      */
     public abstract RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse>
-    changeInvisibleDuration(Endpoints endpoints, 
ChangeInvisibleDurationRequest request,
-        Duration duration);
+    changeInvisibleDuration(Endpoints endpoints, 
ChangeInvisibleDurationRequest request, Duration duration);
 
     /**
      * Send a message to the dead letter queue asynchronously, the method 
ensures no throwable.
@@ -173,8 +172,7 @@ public abstract class ClientManager extends 
AbstractIdleService {
      */
     @SuppressWarnings("UnusedReturnValue")
     public abstract RpcFuture<NotifyClientTerminationRequest, 
NotifyClientTerminationResponse>
-    notifyClientTermination(Endpoints endpoints, 
NotifyClientTerminationRequest request,
-        Duration duration);
+    notifyClientTermination(Endpoints endpoints, 
NotifyClientTerminationRequest request, Duration duration);
 
     /**
      * Establish telemetry session stream to server.
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
index ac653fc..1e9f585 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
@@ -30,7 +30,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.java.hook.MessageHandler;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.misc.ClientId;
 import org.slf4j.Logger;
@@ -43,15 +43,15 @@ public abstract class ConsumeService {
     protected final ClientId clientId;
     private final MessageListener messageListener;
     private final ThreadPoolExecutor consumptionExecutor;
-    private final MessageHandler messageHandler;
+    private final MessageInterceptor messageInterceptor;
     private final ScheduledExecutorService scheduler;
 
     public ConsumeService(ClientId clientId, MessageListener messageListener, 
ThreadPoolExecutor consumptionExecutor,
-        MessageHandler messageHandler, ScheduledExecutorService scheduler) {
+        MessageInterceptor messageInterceptor, ScheduledExecutorService 
scheduler) {
         this.clientId = clientId;
         this.messageListener = messageListener;
         this.consumptionExecutor = consumptionExecutor;
-        this.messageHandler = messageHandler;
+        this.messageInterceptor = messageInterceptor;
         this.scheduler = scheduler;
     }
 
@@ -63,7 +63,7 @@ public abstract class ConsumeService {
 
     public ListenableFuture<ConsumeResult> consume(MessageViewImpl 
messageView, Duration delay) {
         final ListeningExecutorService executorService = 
MoreExecutors.listeningDecorator(consumptionExecutor);
-        final ConsumeTask task = new ConsumeTask(clientId, messageListener, 
messageView, messageHandler);
+        final ConsumeTask task = new ConsumeTask(clientId, messageListener, 
messageView, messageInterceptor);
         // Consume message with no delay.
         if (Duration.ZERO.compareTo(delay) >= 0) {
             return executorService.submit(task);
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
index 001ad5a..45fd9c9 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
@@ -22,10 +22,10 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.java.hook.MessageHandler;
-import org.apache.rocketmq.client.java.hook.MessageHandlerContextImpl;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
+import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
 import org.apache.rocketmq.client.java.message.GeneralMessage;
 import org.apache.rocketmq.client.java.message.GeneralMessageImpl;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
@@ -39,14 +39,14 @@ public class ConsumeTask implements Callable<ConsumeResult> 
{
     private final ClientId clientId;
     private final MessageListener messageListener;
     private final MessageViewImpl messageView;
-    private final MessageHandler messageHandler;
+    private final MessageInterceptor messageInterceptor;
 
     public ConsumeTask(ClientId clientId, MessageListener messageListener, 
MessageViewImpl messageView,
-        MessageHandler messageHandler) {
+        MessageInterceptor messageInterceptor) {
         this.clientId = clientId;
         this.messageListener = messageListener;
         this.messageView = messageView;
-        this.messageHandler = messageHandler;
+        this.messageInterceptor = messageInterceptor;
     }
 
     /**
@@ -58,8 +58,8 @@ public class ConsumeTask implements Callable<ConsumeResult> {
     public ConsumeResult call() {
         ConsumeResult consumeResult;
         final List<GeneralMessage> generalMessages = 
Collections.singletonList(new GeneralMessageImpl(messageView));
-        MessageHandlerContextImpl context = new 
MessageHandlerContextImpl(MessageHookPoints.CONSUME);
-        messageHandler.doBefore(context, generalMessages);
+        MessageInterceptorContextImpl context = new 
MessageInterceptorContextImpl(MessageHookPoints.CONSUME);
+        messageInterceptor.doBefore(context, generalMessages);
         try {
             consumeResult = messageListener.consume(messageView);
         } catch (Throwable t) {
@@ -69,8 +69,8 @@ public class ConsumeTask implements Callable<ConsumeResult> {
         }
         MessageHookPointsStatus status = 
ConsumeResult.SUCCESS.equals(consumeResult) ? MessageHookPointsStatus.OK :
             MessageHookPointsStatus.ERROR;
-        context = new MessageHandlerContextImpl(context, status);
-        messageHandler.doAfter(context, generalMessages);
+        context = new MessageInterceptorContextImpl(context, status);
+        messageInterceptor.doAfter(context, generalMessages);
         // Make sure that the return value is the subset of messageViews.
         return consumeResult;
     }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index bb06b97..4add788 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -48,9 +48,9 @@ import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.java.exception.StatusChecker;
-import org.apache.rocketmq.client.java.hook.MessageHandlerContextImpl;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
+import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
 import org.apache.rocketmq.client.java.impl.ClientImpl;
 import org.apache.rocketmq.client.java.impl.ClientManager;
 import org.apache.rocketmq.client.java.message.GeneralMessage;
@@ -80,7 +80,7 @@ abstract class ConsumerImpl extends ClientImpl {
         try {
             final Endpoints endpoints = mq.getBroker().getEndpoints();
             final Duration tolerance = clientConfiguration.getRequestTimeout();
-            final Duration timeout = Duration.ofNanos(awaitDuration.toNanos() 
+ tolerance.toNanos());
+            final Duration timeout = awaitDuration.plus(tolerance);
             final ClientManager clientManager = this.getClientManager();
             final RpcFuture<ReceiveMessageRequest, 
List<ReceiveMessageResponse>> future =
                 clientManager.receiveMessage(endpoints, request, timeout);
@@ -146,7 +146,7 @@ abstract class ConsumerImpl extends ClientImpl {
         final Endpoints endpoints = messageView.getEndpoints();
         RpcFuture<AckMessageRequest, AckMessageResponse> future;
         final List<GeneralMessage> generalMessages = 
Collections.singletonList(new GeneralMessageImpl(messageView));
-        final MessageHandlerContextImpl context = new 
MessageHandlerContextImpl(MessageHookPoints.ACK);
+        final MessageInterceptorContextImpl context = new 
MessageInterceptorContextImpl(MessageHookPoints.ACK);
         doBefore(context, generalMessages);
         try {
             final AckMessageRequest request = 
wrapAckMessageRequest(messageView);
@@ -160,15 +160,15 @@ abstract class ConsumerImpl extends ClientImpl {
             public void onSuccess(AckMessageResponse response) {
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
-                MessageHookPointsStatus messageHookPointsStatus = 
Code.OK.equals(code) ?
+                MessageHookPointsStatus hookPointsStatus = 
Code.OK.equals(code) ?
                     MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
-                MessageHandlerContextImpl context0 = new 
MessageHandlerContextImpl(context, messageHookPointsStatus);
+                MessageInterceptorContextImpl context0 = new 
MessageInterceptorContextImpl(context, hookPointsStatus);
                 doAfter(context0, generalMessages);
             }
 
             @Override
             public void onFailure(Throwable t) {
-                MessageHandlerContextImpl context0 = new 
MessageHandlerContextImpl(context,
+                MessageInterceptorContextImpl context0 = new 
MessageInterceptorContextImpl(context,
                     MessageHookPointsStatus.ERROR);
                 doAfter(context0, generalMessages);
             }
@@ -181,8 +181,8 @@ abstract class ConsumerImpl extends ClientImpl {
         final Endpoints endpoints = messageView.getEndpoints();
         RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> future;
         final List<GeneralMessage> generalMessages = 
Collections.singletonList(new GeneralMessageImpl(messageView));
-        final MessageHandlerContextImpl context =
-            new 
MessageHandlerContextImpl(MessageHookPoints.CHANGE_INVISIBLE_DURATION);
+        final MessageInterceptorContextImpl context =
+            new 
MessageInterceptorContextImpl(MessageHookPoints.CHANGE_INVISIBLE_DURATION);
         doBefore(context, generalMessages);
         final ChangeInvisibleDurationRequest request = 
wrapChangeInvisibleDuration(messageView, invisibleDuration);
         final Duration requestTimeout = 
clientConfiguration.getRequestTimeout();
@@ -193,19 +193,20 @@ abstract class ConsumerImpl extends ClientImpl {
             public void onSuccess(ChangeInvisibleDurationResponse response) {
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
-                MessageHookPointsStatus messageHookPointsStatus = 
Code.OK.equals(code) ?
+                MessageHookPointsStatus hookPointsStatus = 
Code.OK.equals(code) ?
                     MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
                 if (!Code.OK.equals(code)) {
                     LOGGER.error("Failed to change message invisible duration, 
messageId={}, endpoints={}, code={}, " +
                         "status message=[{}], clientId={}", messageId, 
endpoints, code, status.getMessage(), clientId);
                 }
-                MessageHandlerContextImpl context0 = new 
MessageHandlerContextImpl(context, messageHookPointsStatus);
+                MessageInterceptorContextImpl context0 = new 
MessageInterceptorContextImpl(context,
+                    hookPointsStatus);
                 doAfter(context0, generalMessages);
             }
 
             @Override
             public void onFailure(Throwable t) {
-                MessageHandlerContextImpl context0 = new 
MessageHandlerContextImpl(context,
+                MessageInterceptorContextImpl context0 = new 
MessageInterceptorContextImpl(context,
                     MessageHookPointsStatus.ERROR);
                 doAfter(context0, generalMessages);
                 LOGGER.error("Exception raised while changing message 
invisible duration, messageId={}, endpoints={}, "
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
index a8fa8d0..8a6c7ca 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.java.hook.MessageHandler;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.misc.ClientId;
 import org.slf4j.Logger;
@@ -37,8 +37,9 @@ class FifoConsumeService extends ConsumeService {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(FifoConsumeService.class);
 
     public FifoConsumeService(ClientId clientId, MessageListener 
messageListener,
-        ThreadPoolExecutor consumptionExecutor, MessageHandler messageHandler, 
ScheduledExecutorService scheduler) {
-        super(clientId, messageListener, consumptionExecutor, messageHandler, 
scheduler);
+        ThreadPoolExecutor consumptionExecutor, MessageInterceptor 
messageInterceptor,
+        ScheduledExecutorService scheduler) {
+        super(clientId, messageListener, consumptionExecutor, 
messageInterceptor, scheduler);
     }
 
     @Override
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
index 0376d66..48b5612 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
@@ -36,9 +36,9 @@ import org.apache.rocketmq.client.java.route.MessageQueueImpl;
  * <p>
  * phase 1: Fetch 32 messages successfully from remote.
  * <pre>
- * 32 in   ┌─────────────────────────┐
- * ────────►           32            │
- *         └─────────────────────────┘
+ *  32 in ┌─────────────────────────┐
+ * ───────►           32            │
+ *        └─────────────────────────┘
  *             cached messages = 32
  * </pre>
  * phase 2: consuming 1 message.
@@ -56,7 +56,15 @@ import 
org.apache.rocketmq.client.java.route.MessageQueueImpl;
  *            cached messages = 31
  * </pre>
  *
- * <p>Especially, there are some different processing procedures for FIFO 
consumption. // TODO
+ * <p>Especially, there are some different processing procedures for FIFO 
consumption. The server ensures that the
+ * next batch of messages will not be obtained by the client until the 
previous batch of messages is confirmed to be
+ * consumed successfully or not. In detail, the server confirms the success of 
consumption by message being
+ * successfully acknowledged, and confirms the consumption failure by message being 
successfully forwarded to the dead
+ * letter queue, thus the client should try to ensure it succeeded in 
acknowledgement or forwarding to the dead
+ * letter queue as far as possible.
+ *
+ * <p>Considering the different workflow of FIFO consumption, {@link 
#eraseFifoMessage(MessageViewImpl, ConsumeResult)}
+ * and {@link #discardFifoMessage(MessageViewImpl)} are provided.
  */
 public interface ProcessQueue {
     /**
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index c817bf3..4e31ff9 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -49,9 +49,9 @@ import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.apis.message.MessageView;
 import org.apache.rocketmq.client.java.exception.BadRequestException;
 import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
-import org.apache.rocketmq.client.java.hook.MessageHandlerContextImpl;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
+import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
 import org.apache.rocketmq.client.java.message.GeneralMessage;
 import org.apache.rocketmq.client.java.message.GeneralMessageImpl;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
@@ -197,14 +197,13 @@ class ProcessQueueImpl implements ProcessQueue {
     }
 
     public void receiveMessage() {
+        final ClientId clientId = consumer.getClientId();
         if (dropped) {
-            LOGGER.info("Process queue has been dropped, no longer receive 
message, mq={}, clientId={}", mq,
-                consumer.getClientId());
+            LOGGER.info("Process queue has been dropped, no longer receive 
message, mq={}, clientId={}", mq, clientId);
             return;
         }
         if (this.isCacheFull()) {
-            LOGGER.warn("Process queue cache is full, would receive message 
later, mq={}, clientId={}", mq,
-                consumer.getClientId());
+            LOGGER.warn("Process queue cache is full, would receive message 
later, mq={}, clientId={}", mq, clientId);
             receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL);
             return;
         }
@@ -214,8 +213,7 @@ class ProcessQueueImpl implements ProcessQueue {
     private void receiveMessageImmediately() {
         final ClientId clientId = consumer.getClientId();
         if (!consumer.isRunning()) {
-            LOGGER.info("Stop to receive message because consumer is not 
running, mq={}, clientId={}", mq,
-                clientId);
+            LOGGER.info("Stop to receive message because consumer is not 
running, mq={}, clientId={}", mq, clientId);
             return;
         }
         try {
@@ -225,7 +223,7 @@ class ProcessQueueImpl implements ProcessQueue {
             activityNanoTime = System.nanoTime();
 
             // Intercept before message reception.
-            final MessageHandlerContextImpl context = new 
MessageHandlerContextImpl(MessageHookPoints.RECEIVE);
+            final MessageInterceptorContextImpl context = new 
MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);
             consumer.doBefore(context, Collections.emptyList());
 
             final ListenableFuture<ReceiveMessageResult> future = 
consumer.receiveMessage(request, mq,
@@ -237,8 +235,8 @@ class ProcessQueueImpl implements ProcessQueue {
                     final List<GeneralMessage> generalMessages = 
result.getMessageViewImpls().stream()
                         .map((Function<MessageView, GeneralMessage>) 
GeneralMessageImpl::new)
                         .collect(Collectors.toList());
-                    final MessageHandlerContextImpl context0 =
-                        new MessageHandlerContextImpl(context, 
MessageHookPointsStatus.OK);
+                    final MessageInterceptorContextImpl context0 =
+                        new MessageInterceptorContextImpl(context, 
MessageHookPointsStatus.OK);
                     consumer.doAfter(context0, generalMessages);
 
                     try {
@@ -254,8 +252,8 @@ class ProcessQueueImpl implements ProcessQueue {
                 @Override
                 public void onFailure(Throwable t) {
                     // Intercept after message reception.
-                    final MessageHandlerContextImpl context0 =
-                        new MessageHandlerContextImpl(context, 
MessageHookPointsStatus.ERROR);
+                    final MessageInterceptorContextImpl context0 =
+                        new MessageInterceptorContextImpl(context, 
MessageHookPointsStatus.ERROR);
                     consumer.doAfter(context0, Collections.emptyList());
 
                     LOGGER.error("Exception raised during message reception, 
mq={}, endpoints={}, clientId={}", mq,
@@ -274,10 +272,10 @@ class ProcessQueueImpl implements ProcessQueue {
     public boolean isCacheFull() {
         final int cacheMessageCountThresholdPerQueue = 
consumer.cacheMessageCountThresholdPerQueue();
         final long actualMessagesQuantity = this.cachedMessagesCount();
+        final ClientId clientId = consumer.getClientId();
         if (cacheMessageCountThresholdPerQueue <= actualMessagesQuantity) {
             LOGGER.warn("Process queue total cached messages quantity exceeds 
the threshold, threshold={}, actual={}," +
-                    " mq={}, clientId={}", cacheMessageCountThresholdPerQueue, 
actualMessagesQuantity, mq,
-                consumer.getClientId());
+                " mq={}, clientId={}", cacheMessageCountThresholdPerQueue, 
actualMessagesQuantity, mq, clientId);
             cacheFullNanoTime = System.nanoTime();
             return true;
         }
@@ -286,7 +284,7 @@ class ProcessQueueImpl implements ProcessQueue {
         if (cacheMessageBytesThresholdPerQueue <= actualCachedMessagesBytes) {
             LOGGER.warn("Process queue total cached messages memory exceeds 
the threshold, threshold={} bytes," +
                     " actual={} bytes, mq={}, clientId={}", 
cacheMessageBytesThresholdPerQueue,
-                actualCachedMessagesBytes, mq, consumer.getClientId());
+                actualCachedMessagesBytes, mq, clientId);
             cacheFullNanoTime = System.nanoTime();
             return true;
         }
@@ -641,7 +639,7 @@ class ProcessQueueImpl implements ProcessQueue {
         final long receptionTimes = this.receptionTimes.getAndSet(0);
         final long receivedMessagesQuantity = 
this.receivedMessagesQuantity.getAndSet(0);
         LOGGER.info("Process queue stats: clientId={}, mq={}, 
receptionTimes={}, receivedMessageQuantity={}, "
-                + "cachedMessageCount={}, cachedMessageBytes={}", 
consumer.getClientId(), mq,
-            receptionTimes, receivedMessagesQuantity, 
this.getCachedMessageCount(), this.getCachedMessageBytes());
+            + "cachedMessageCount={}, cachedMessageBytes={}", 
consumer.getClientId(), mq, receptionTimes,
+            receivedMessagesQuantity, this.getCachedMessageCount(), 
this.getCachedMessageBytes());
     }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 47119d0..8194227 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -57,10 +57,10 @@ import 
org.apache.rocketmq.client.apis.consumer.MessageListener;
 import org.apache.rocketmq.client.apis.consumer.PushConsumer;
 import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.java.exception.StatusChecker;
-import org.apache.rocketmq.client.java.hook.MessageHandlerContext;
-import org.apache.rocketmq.client.java.hook.MessageHandlerContextImpl;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
+import org.apache.rocketmq.client.java.hook.MessageInterceptorContext;
+import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
 import org.apache.rocketmq.client.java.impl.Settings;
 import org.apache.rocketmq.client.java.message.GeneralMessage;
 import org.apache.rocketmq.client.java.message.GeneralMessageImpl;
@@ -191,8 +191,10 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
     private ConsumeService createConsumeService() {
         final ScheduledExecutorService scheduler = 
this.getClientManager().getScheduler();
         if (pushSubscriptionSettings.isFifo()) {
+            LOGGER.info("Create FIFO consume service, consumerGroup={}, 
clientId={}", consumerGroup, clientId);
             return new FifoConsumeService(clientId, messageListener, 
consumptionExecutor, this, scheduler);
         }
+        LOGGER.info("Create standard consume service, consumerGroup={}, 
clientId={}", consumerGroup, clientId);
         return new StandardConsumeService(clientId, messageListener, 
consumptionExecutor, this, scheduler);
     }
 
@@ -507,7 +509,7 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
     forwardMessageToDeadLetterQueue(final MessageViewImpl messageView) {
         // Intercept before forwarding message to DLQ.
         final List<GeneralMessage> generalMessages = 
Collections.singletonList(new GeneralMessageImpl(messageView));
-        final MessageHandlerContextImpl context = new 
MessageHandlerContextImpl(MessageHookPoints.FORWARD_TO_DLQ);
+        MessageInterceptorContextImpl context = new 
MessageInterceptorContextImpl(MessageHookPoints.FORWARD_TO_DLQ);
         doBefore(context, generalMessages);
 
         final Endpoints endpoints = messageView.getEndpoints();
@@ -522,14 +524,14 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
                 // Intercept after forwarding message to DLQ.
                 MessageHookPointsStatus status = 
Code.OK.equals(response.getStatus().getCode()) ?
                     MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
-                final MessageHandlerContext context0 = new 
MessageHandlerContextImpl(context, status);
+                final MessageInterceptorContext context0 = new 
MessageInterceptorContextImpl(context, status);
                 doAfter(context0, generalMessages);
             }
 
             @Override
             public void onFailure(Throwable t) {
                 // Intercept after forwarding message to DLQ.
-                final MessageHandlerContext context0 = new 
MessageHandlerContextImpl(context,
+                final MessageInterceptorContext context0 = new 
MessageInterceptorContextImpl(context,
                     MessageHookPointsStatus.ERROR);
                 doAfter(context0, generalMessages);
             }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
index ae80cbe..eef6c7b 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.java.hook.MessageHandler;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.misc.ClientId;
 import org.slf4j.Logger;
@@ -37,8 +37,9 @@ public class StandardConsumeService extends ConsumeService {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(StandardConsumeService.class);
 
     public StandardConsumeService(ClientId clientId, MessageListener 
messageListener,
-        ThreadPoolExecutor consumptionExecutor, MessageHandler messageHandler, 
ScheduledExecutorService scheduler) {
-        super(clientId, messageListener, consumptionExecutor, messageHandler, 
scheduler);
+        ThreadPoolExecutor consumptionExecutor, MessageInterceptor 
messageInterceptor,
+        ScheduledExecutorService scheduler) {
+        super(clientId, messageListener, consumptionExecutor, 
messageInterceptor, scheduler);
     }
 
     @Override
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index fd87fef..40a0e13 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -60,9 +60,9 @@ import 
org.apache.rocketmq.client.apis.producer.TransactionChecker;
 import org.apache.rocketmq.client.apis.producer.TransactionResolution;
 import org.apache.rocketmq.client.java.exception.InternalErrorException;
 import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
-import org.apache.rocketmq.client.java.hook.MessageHandlerContextImpl;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
+import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
 import org.apache.rocketmq.client.java.impl.ClientImpl;
 import org.apache.rocketmq.client.java.impl.Settings;
 import org.apache.rocketmq.client.java.message.GeneralMessage;
@@ -272,7 +272,7 @@ class ProducerImpl extends ClientImpl implements Producer {
         final List<GeneralMessage> generalMessages = 
Collections.singletonList(generalMessage);
         MessageHookPoints messageHookPoints = 
TransactionResolution.COMMIT.equals(resolution) ?
             MessageHookPoints.COMMIT_TRANSACTION : 
MessageHookPoints.ROLLBACK_TRANSACTION;
-        final MessageHandlerContextImpl context = new 
MessageHandlerContextImpl(messageHookPoints);
+        final MessageInterceptorContextImpl context = new 
MessageInterceptorContextImpl(messageHookPoints);
         doBefore(context, generalMessages);
 
         final RpcFuture<EndTransactionRequest, EndTransactionResponse> future =
@@ -282,16 +282,16 @@ class ProducerImpl extends ClientImpl implements Producer 
{
             public void onSuccess(EndTransactionResponse response) {
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
-                MessageHookPointsStatus messageHookPointsStatus = 
Code.OK.equals(code) ? MessageHookPointsStatus.OK :
+                MessageHookPointsStatus hookPointsStatus = 
Code.OK.equals(code) ? MessageHookPointsStatus.OK :
                     MessageHookPointsStatus.ERROR;
-                final MessageHandlerContextImpl context0 = new 
MessageHandlerContextImpl(context,
-                    messageHookPointsStatus);
+                final MessageInterceptorContextImpl context0 = new 
MessageInterceptorContextImpl(context,
+                    hookPointsStatus);
                 doAfter(context0, generalMessages);
             }
 
             @Override
             public void onFailure(Throwable t) {
-                final MessageHandlerContextImpl context0 = new 
MessageHandlerContextImpl(context,
+                final MessageInterceptorContextImpl context0 = new 
MessageInterceptorContextImpl(context,
                     MessageHookPointsStatus.ERROR);
                 doAfter(context0, generalMessages);
             }
@@ -447,7 +447,7 @@ class ProducerImpl extends ClientImpl implements Producer {
         // Intercept before message publishing.
         final List<GeneralMessage> generalMessages = 
messages.stream().map((Function<PublishingMessageImpl,
             GeneralMessage>) 
GeneralMessageImpl::new).collect(Collectors.toList());
-        final MessageHandlerContextImpl context = new 
MessageHandlerContextImpl(MessageHookPoints.SEND);
+        final MessageInterceptorContextImpl context = new 
MessageInterceptorContextImpl(MessageHookPoints.SEND);
         doBefore(context, generalMessages);
 
         Futures.addCallback(future, new 
FutureCallback<List<SendReceiptImpl>>() {
@@ -461,14 +461,14 @@ class ProducerImpl extends ClientImpl implements Producer 
{
                     future0.setException(e);
 
                     // Intercept after message publishing.
-                    final MessageHandlerContextImpl context0 = new 
MessageHandlerContextImpl(context,
+                    final MessageInterceptorContextImpl context0 = new 
MessageInterceptorContextImpl(context,
                         MessageHookPointsStatus.ERROR);
                     doAfter(context0, generalMessages);
 
                     return;
                 }
                 // Intercept after message publishing.
-                final MessageHandlerContextImpl context0 = new 
MessageHandlerContextImpl(context,
+                final MessageInterceptorContextImpl context0 = new 
MessageInterceptorContextImpl(context,
                     MessageHookPointsStatus.OK);
                 doAfter(context0, generalMessages);
 
@@ -491,7 +491,7 @@ class ProducerImpl extends ClientImpl implements Producer {
             @Override
             public void onFailure(Throwable t) {
                 // Intercept after message publishing.
-                final MessageHandlerContextImpl context0 = new 
MessageHandlerContextImpl(context,
+                final MessageInterceptorContextImpl context0 = new 
MessageInterceptorContextImpl(context,
                     MessageHookPointsStatus.ERROR);
                 doAfter(context0, generalMessages);
 
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandler.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
similarity index 90%
rename from 
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandler.java
rename to 
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
index 0c70214..b68dd55 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandler.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
@@ -26,35 +26,35 @@ import 
org.apache.rocketmq.client.apis.consumer.PushConsumer;
 import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
 import org.apache.rocketmq.client.java.hook.Attribute;
 import org.apache.rocketmq.client.java.hook.AttributeKey;
-import org.apache.rocketmq.client.java.hook.MessageHandler;
-import org.apache.rocketmq.client.java.hook.MessageHandlerContext;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
+import org.apache.rocketmq.client.java.hook.MessageInterceptorContext;
 import org.apache.rocketmq.client.java.impl.Client;
 import org.apache.rocketmq.client.java.message.GeneralMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class MessageMeterHandler implements MessageHandler {
+public class MessageMeterInterceptor implements MessageInterceptor {
     static final AttributeKey<Stopwatch> SEND_STOPWATCH_KEY = 
AttributeKey.create("send_stopwatch");
     static final AttributeKey<Stopwatch> CONSUME_STOPWATCH_KEY = 
AttributeKey.create("consume_stopwatch");
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(MessageMeterHandler.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MessageMeterInterceptor.class);
 
     private final Client client;
     private final ClientMeterManager meterManager;
 
-    public MessageMeterHandler(Client client, ClientMeterManager meterManager) 
{
+    public MessageMeterInterceptor(Client client, ClientMeterManager 
meterManager) {
         this.client = client;
         this.meterManager = meterManager;
     }
 
-    private void doBeforeSendMessage(MessageHandlerContext context) {
+    private void doBeforeSendMessage(MessageInterceptorContext context) {
         // Record the time before sending message.
         context.putAttribute(SEND_STOPWATCH_KEY, 
Attribute.create(Stopwatch.createStarted()));
     }
 
-    private void doAfterSendMessage(MessageHandlerContext context, 
List<GeneralMessage> messages) {
+    private void doAfterSendMessage(MessageInterceptorContext context, 
List<GeneralMessage> messages) {
         final Attribute<Stopwatch> stopwatchAttr = 
context.getAttribute(SEND_STOPWATCH_KEY);
         if (null == stopwatchAttr) {
             // Should never reach here.
@@ -105,7 +105,7 @@ public class MessageMeterHandler implements MessageHandler {
         meterManager.record(HistogramEnum.DELIVERY_LATENCY, attributes, 
latency);
     }
 
-    private void doBeforeConsumeMessage(MessageHandlerContext context, 
List<GeneralMessage> messages) {
+    private void doBeforeConsumeMessage(MessageInterceptorContext context, 
List<GeneralMessage> messages) {
         if (messages.isEmpty()) {
             // Should never reach here.
             return;
@@ -133,7 +133,7 @@ public class MessageMeterHandler implements MessageHandler {
         context.putAttribute(CONSUME_STOPWATCH_KEY, 
Attribute.create(Stopwatch.createStarted()));
     }
 
-    private void doAfterConsumeMessage(MessageHandlerContext context, 
List<GeneralMessage> messages) {
+    private void doAfterConsumeMessage(MessageInterceptorContext context, 
List<GeneralMessage> messages) {
         if (!(client instanceof PushConsumer)) {
             // Should never reach here.
             LOGGER.error("[Bug] current client is not push consumer, 
clientId={}", client.getClientId());
@@ -160,7 +160,7 @@ public class MessageMeterHandler implements MessageHandler {
     }
 
     @Override
-    public void doBefore(MessageHandlerContext context, List<GeneralMessage> 
messages) {
+    public void doBefore(MessageInterceptorContext context, 
List<GeneralMessage> messages) {
         if (!meterManager.isEnabled()) {
             return;
         }
@@ -180,7 +180,7 @@ public class MessageMeterHandler implements MessageHandler {
     }
 
     @Override
-    public void doAfter(MessageHandlerContext context, List<GeneralMessage> 
messages) {
+    public void doAfter(MessageInterceptorContext context, 
List<GeneralMessage> messages) {
         if (!meterManager.isEnabled()) {
             return;
         }
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeServiceTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeServiceTest.java
index 034c0d7..dbf8c89 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeServiceTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeServiceTest.java
@@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.java.hook.MessageHandler;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.misc.ClientId;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
@@ -41,7 +41,7 @@ import org.mockito.Mockito;
 
 public class ConsumeServiceTest extends TestBase {
     private final ClientId clientId = new ClientId();
-    private final MessageHandler interceptor = 
Mockito.mock(MessageHandler.class);
+    private final MessageInterceptor interceptor = 
Mockito.mock(MessageInterceptor.class);
     private final ThreadPoolExecutor consumptionExecutor = new 
ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
         new LinkedBlockingQueue<>(), new 
ThreadFactoryImpl("TestMessageConsumption"));
     private final ScheduledExecutorService scheduler = new 
ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTaskTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTaskTest.java
index 70dcef2..2331214 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTaskTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTaskTest.java
@@ -21,7 +21,7 @@ import static org.junit.Assert.assertEquals;
 
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.java.hook.MessageHandler;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.misc.ClientId;
 import org.apache.rocketmq.client.java.tool.TestBase;
@@ -36,8 +36,8 @@ public class ConsumeTaskTest extends TestBase {
         final MessageViewImpl messageView = fakeMessageViewImpl();
         final MessageListener messageListener = 
Mockito.mock(MessageListener.class);
         
Mockito.when(messageListener.consume(messageView)).thenReturn(ConsumeResult.SUCCESS);
-        final MessageHandler messageHandler = 
Mockito.mock(MessageHandler.class);
-        final ConsumeTask consumeTask = new ConsumeTask(clientId, 
messageListener, messageView, messageHandler);
+        final MessageInterceptor messageInterceptor = 
Mockito.mock(MessageInterceptor.class);
+        final ConsumeTask consumeTask = new ConsumeTask(clientId, 
messageListener, messageView, messageInterceptor);
         final ConsumeResult consumeResult = consumeTask.call();
         assertEquals(ConsumeResult.SUCCESS, consumeResult);
     }
@@ -48,8 +48,8 @@ public class ConsumeTaskTest extends TestBase {
         final MessageViewImpl messageView = fakeMessageViewImpl();
         final MessageListener messageListener = 
Mockito.mock(MessageListener.class);
         Mockito.when(messageListener.consume(messageView)).thenThrow(new 
RuntimeException());
-        final MessageHandler messageHandler = 
Mockito.mock(MessageHandler.class);
-        final ConsumeTask consumeTask = new ConsumeTask(clientId, 
messageListener, messageView, messageHandler);
+        final MessageInterceptor messageInterceptor = 
Mockito.mock(MessageInterceptor.class);
+        final ConsumeTask consumeTask = new ConsumeTask(clientId, 
messageListener, messageView, messageInterceptor);
         final ConsumeResult consumeResult = consumeTask.call();
         assertEquals(ConsumeResult.FAILURE, consumeResult);
     }
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandlerTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandlerTest.java
index 569c2bd..7f01b06 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandlerTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandlerTest.java
@@ -27,9 +27,9 @@ import java.util.List;
 import java.util.Optional;
 import org.apache.rocketmq.client.apis.consumer.PushConsumer;
 import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
-import org.apache.rocketmq.client.java.hook.MessageHandlerContextImpl;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
+import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
 import org.apache.rocketmq.client.java.impl.Client;
 import org.apache.rocketmq.client.java.impl.ClientImpl;
 import org.apache.rocketmq.client.java.message.GeneralMessage;
@@ -48,15 +48,15 @@ public class MessageMeterHandlerTest extends TestBase {
         Mockito.doReturn(true).when(meterManager).isEnabled();
         ClientId clientId = new ClientId();
         Mockito.doReturn(clientId).when(producer).getClientId();
-        final MessageMeterHandler meterHandler = new 
MessageMeterHandler(producer, meterManager);
+        final MessageMeterInterceptor meterHandler = new 
MessageMeterInterceptor(producer, meterManager);
         final GeneralMessage message = Mockito.mock(GeneralMessage.class);
         String topic = FAKE_TOPIC_0;
         Mockito.doReturn(topic).when(message).getTopic();
         List<GeneralMessage> messageList = new ArrayList<>();
         messageList.add(message);
-        final MessageHandlerContextImpl context = new 
MessageHandlerContextImpl(MessageHookPoints.SEND);
+        final MessageInterceptorContextImpl context = new 
MessageInterceptorContextImpl(MessageHookPoints.SEND);
         meterHandler.doBefore(context, messageList);
-        
assertNotNull(context.getAttributes().get(MessageMeterHandler.SEND_STOPWATCH_KEY));
+        
assertNotNull(context.getAttributes().get(MessageMeterInterceptor.SEND_STOPWATCH_KEY));
         context.setStatus(MessageHookPointsStatus.OK);
         meterHandler.doAfter(context, messageList);
         ArgumentCaptor<Attributes> attributesArgumentCaptor = 
ArgumentCaptor.forClass(Attributes.class);
@@ -75,13 +75,13 @@ public class MessageMeterHandlerTest extends TestBase {
         Mockito.doReturn(true).when(meterManager).isEnabled();
         ClientId clientId = new ClientId();
         Mockito.doReturn(clientId).when(producer).getClientId();
-        final MessageMeterHandler meterHandler = new MessageMeterHandler(producer, meterManager);
+        final MessageMeterInterceptor meterHandler = new MessageMeterInterceptor(producer, meterManager);
         final GeneralMessage message = Mockito.mock(GeneralMessage.class);
         String topic = FAKE_TOPIC_0;
         Mockito.doReturn(topic).when(message).getTopic();
         List<GeneralMessage> messageList = new ArrayList<>();
         messageList.add(message);
-        final MessageHandlerContextImpl context = new MessageHandlerContextImpl(MessageHookPoints.SEND);
+        final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.SEND);
         meterHandler.doBefore(context, messageList);
         context.setStatus(MessageHookPointsStatus.UNSET);
         meterHandler.doAfter(context, messageList);
@@ -101,13 +101,13 @@ public class MessageMeterHandlerTest extends TestBase {
         Mockito.doReturn(true).when(meterManager).isEnabled();
         ClientId clientId = new ClientId();
         Mockito.doReturn(clientId).when(producer).getClientId();
-        final MessageMeterHandler meterHandler = new MessageMeterHandler(producer, meterManager);
+        final MessageMeterInterceptor meterHandler = new MessageMeterInterceptor(producer, meterManager);
         final GeneralMessage message = Mockito.mock(GeneralMessage.class);
         String topic = FAKE_TOPIC_0;
         Mockito.doReturn(topic).when(message).getTopic();
         List<GeneralMessage> messageList = new ArrayList<>();
         messageList.add(message);
-        final MessageHandlerContextImpl context = new MessageHandlerContextImpl(MessageHookPoints.SEND);
+        final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.SEND);
         meterHandler.doBefore(context, messageList);
         context.setStatus(MessageHookPointsStatus.ERROR);
         meterHandler.doAfter(context, messageList);
@@ -135,7 +135,7 @@ public class MessageMeterHandlerTest extends TestBase {
         Mockito.doReturn(true).when(meterManager).isEnabled();
         ClientId clientId = new ClientId();
         Mockito.doReturn(clientId).when(pushConsumer).getClientId();
-        final MessageMeterHandler meterHandler = new MessageMeterHandler(pushConsumer, meterManager);
+        final MessageMeterInterceptor meterHandler = new MessageMeterInterceptor(pushConsumer, meterManager);
         final GeneralMessage message = Mockito.mock(GeneralMessage.class);
         String topic = FAKE_TOPIC_0;
         Mockito.doReturn(topic).when(message).getTopic();
@@ -143,7 +143,7 @@ public class MessageMeterHandlerTest extends TestBase {
         messageList.add(message);
         final Optional<Long> optionalTransportDeliveryTimestamp = Optional.of(System.currentTimeMillis());
         Mockito.doReturn(optionalTransportDeliveryTimestamp).when(message).getTransportDeliveryTimestamp();
-        final MessageHandlerContextImpl context = new MessageHandlerContextImpl(MessageHookPoints.RECEIVE);
+        final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);
         meterHandler.doAfter(context, messageList);
         ArgumentCaptor<Attributes> attributesArgumentCaptor = ArgumentCaptor.forClass(Attributes.class);
         Mockito.verify(meterManager, Mockito.times(1))
@@ -163,13 +163,13 @@ public class MessageMeterHandlerTest extends TestBase {
         Mockito.doReturn(true).when(meterManager).isEnabled();
         ClientId clientId = new ClientId();
         Mockito.doReturn(clientId).when(pushConsumer).getClientId();
-        final MessageMeterHandler meterHandler = new MessageMeterHandler(pushConsumer, meterManager);
+        final MessageMeterInterceptor meterHandler = new MessageMeterInterceptor(pushConsumer, meterManager);
         final GeneralMessage message = Mockito.mock(GeneralMessage.class);
         Mockito.doReturn(FAKE_TOPIC_0).when(message).getTopic();
         List<GeneralMessage> messageList = new ArrayList<>();
         final Optional<Long> optionalTransportDeliveryTimestamp = Optional.of(System.currentTimeMillis());
         Mockito.doReturn(optionalTransportDeliveryTimestamp).when(message).getTransportDeliveryTimestamp();
-        final MessageHandlerContextImpl context = new MessageHandlerContextImpl(MessageHookPoints.RECEIVE);
+        final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);
         meterHandler.doAfter(context, messageList);
         Mockito.verify(meterManager, Mockito.never())
             .record(any(HistogramEnum.class), any(Attributes.class), Mockito.anyDouble());
@@ -184,7 +184,7 @@ public class MessageMeterHandlerTest extends TestBase {
         Mockito.doReturn(true).when(meterManager).isEnabled();
         ClientId clientId = new ClientId();
         Mockito.doReturn(clientId).when(simpleConsumer).getClientId();
-        final MessageMeterHandler meterHandler = new MessageMeterHandler(simpleConsumer, meterManager);
+        final MessageMeterInterceptor meterHandler = new MessageMeterInterceptor(simpleConsumer, meterManager);
         final GeneralMessage message = Mockito.mock(GeneralMessage.class);
         String topic = FAKE_TOPIC_0;
         Mockito.doReturn(topic).when(message).getTopic();
@@ -192,7 +192,7 @@ public class MessageMeterHandlerTest extends TestBase {
         messageList.add(message);
         final Optional<Long> optionalTransportDeliveryTimestamp = Optional.of(System.currentTimeMillis());
         Mockito.doReturn(optionalTransportDeliveryTimestamp).when(message).getTransportDeliveryTimestamp();
-        final MessageHandlerContextImpl context = new MessageHandlerContextImpl(MessageHookPoints.RECEIVE);
+        final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);
         meterHandler.doAfter(context, messageList);
         ArgumentCaptor<Attributes> attributesArgumentCaptor = ArgumentCaptor.forClass(Attributes.class);
         Mockito.verify(meterManager, Mockito.times(1))
@@ -213,7 +213,7 @@ public class MessageMeterHandlerTest extends TestBase {
         Mockito.doReturn(true).when(meterManager).isEnabled();
         ClientId clientId = new ClientId();
         Mockito.doReturn(clientId).when(pushConsumer).getClientId();
-        final MessageMeterHandler meterHandler = new MessageMeterHandler(pushConsumer, meterManager);
+        final MessageMeterInterceptor meterHandler = new MessageMeterInterceptor(pushConsumer, meterManager);
         List<GeneralMessage> generalMessages = new ArrayList<>();
         final GeneralMessage message = Mockito.mock(GeneralMessage.class);
         generalMessages.add(message);
@@ -222,7 +222,7 @@ public class MessageMeterHandlerTest extends TestBase {
         long awaitTimeMills = 3000;
         long decodeTimestamp = System.currentTimeMillis() - awaitTimeMills;
         Mockito.doReturn(Optional.of(decodeTimestamp)).when(message).getDecodeTimestamp();
-        final MessageHandlerContextImpl context = new MessageHandlerContextImpl(MessageHookPoints.CONSUME);
+        final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.CONSUME);
         meterHandler.doBefore(context, generalMessages);
         ArgumentCaptor<Attributes> attributes0ArgumentCaptor = ArgumentCaptor.forClass(Attributes.class);
         final ArgumentCaptor<Double> awaitTimeArgumentCaptor = ArgumentCaptor.forClass(Double.class);
@@ -234,7 +234,7 @@ public class MessageMeterHandlerTest extends TestBase {
         assertEquals(topic, attributes0.get(MetricLabels.TOPIC));
         assertEquals(consumerGroup, attributes0.get(MetricLabels.CONSUMER_GROUP));
         assertEquals(clientId.toString(), attributes0.get(MetricLabels.CLIENT_ID));
-        assertNotNull(context.getAttributes().get(MessageMeterHandler.CONSUME_STOPWATCH_KEY));
+        assertNotNull(context.getAttributes().get(MessageMeterInterceptor.CONSUME_STOPWATCH_KEY));
 
         context.setStatus(MessageHookPointsStatus.OK);
         meterHandler.doAfter(context, generalMessages);

Reply via email to