This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch java
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit ceceb124cadee9080356bf06ac02c52b4ba69596
Author: Aaron Ai <[email protected]>
AuthorDate: Wed Jul 6 20:08:02 2022 +0800

    Java: Add invocation context
---
 .../rocketmq/client/java/impl/ClientImpl.java      |  19 ++--
 .../rocketmq/client/java/impl/ClientManager.java   |  48 +++++-----
 .../client/java/impl/ClientManagerImpl.java        |  71 ++++++---------
 .../client/java/impl/consumer/ConsumerImpl.java    |  41 +++++----
 .../java/impl/consumer/ProcessQueueImpl.java       |  33 ++++---
 .../java/impl/consumer/PushConsumerImpl.java       |  27 +++---
 .../java/impl/consumer/ReceiveMessageResult.java   |   8 +-
 .../java/impl/consumer/SimpleConsumerImpl.java     |  19 ++--
 .../client/java/impl/producer/ProducerImpl.java    |  21 +++--
 .../client/java/rpc/InvocationContext.java         |  46 ++++++++++
 .../apache/rocketmq/client/java/rpc/RpcClient.java |  51 ++++++-----
 .../rocketmq/client/java/rpc/RpcClientImpl.java    | 100 ++++++++++++++-------
 .../rocketmq/client/java/rpc/RpcContext.java       |  39 ++++++++
 .../apache/rocketmq/client/java/rpc/Signature.java |   2 +-
 .../apache/rocketmq/client/java/rpc/TLSHelper.java |   7 +-
 .../java/impl/consumer/ProcessQueueImplTest.java   |  15 ++--
 .../java/impl/consumer/PushConsumerImplTest.java   |   6 +-
 .../java/impl/consumer/SimpleConsumerImplTest.java |  13 +--
 .../java/impl/producer/ProducerImplTest.java       |  21 +++--
 .../apache/rocketmq/client/java/tool/TestBase.java |  97 +++++++++-----------
 20 files changed, 409 insertions(+), 275 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index a4b144d..590bbee 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -81,6 +81,7 @@ import org.apache.rocketmq.client.java.misc.Utilities;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.TopicRouteData;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.apache.rocketmq.client.java.rpc.Signature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -555,7 +556,7 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
     /**
      * Real-time signature generation
      */
-    protected Metadata sign() throws UnsupportedEncodingException, 
NoSuchAlgorithmException, InvalidKeyException {
+    protected Metadata sign() throws NoSuchAlgorithmException, 
InvalidKeyException {
         return Signature.sign(clientConfiguration, clientId);
     }
 
@@ -568,11 +569,12 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
     private void doHeartbeat(HeartbeatRequest request, final Endpoints 
endpoints) {
         try {
             Metadata metadata = sign();
-            final ListenableFuture<HeartbeatResponse> future = clientManager
+            final ListenableFuture<InvocationContext<HeartbeatResponse>> 
future = clientManager
                 .heartbeat(endpoints, metadata, request, 
clientConfiguration.getRequestTimeout());
-            Futures.addCallback(future, new 
FutureCallback<HeartbeatResponse>() {
+            Futures.addCallback(future, new 
FutureCallback<InvocationContext<HeartbeatResponse>>() {
                 @Override
-                public void onSuccess(HeartbeatResponse response) {
+                public void onSuccess(InvocationContext<HeartbeatResponse> 
context) {
+                    final HeartbeatResponse response = context.getResp();
                     final Status status = response.getStatus();
                     final Code code = status.getCode();
                     if (Code.OK != code) {
@@ -612,15 +614,15 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
     }
 
     private ListenableFuture<TopicRouteDataResult> fetchTopicRoute(final 
String topic) {
-        final SettableFuture<TopicRouteDataResult> future = 
SettableFuture.create();
         try {
             Resource topicResource = 
Resource.newBuilder().setName(topic).build();
             final QueryRouteRequest request = 
QueryRouteRequest.newBuilder().setTopic(topicResource)
                 .setEndpoints(accessEndpoints.toProtobuf()).build();
             final Metadata metadata = sign();
-            final ListenableFuture<QueryRouteResponse> responseFuture =
+            final ListenableFuture<InvocationContext<QueryRouteResponse>> 
contextFuture =
                 clientManager.queryRoute(accessEndpoints, metadata, request, 
clientConfiguration.getRequestTimeout());
-            return Futures.transform(responseFuture, response -> {
+            return Futures.transform(contextFuture, context -> {
+                final QueryRouteResponse response = context.getResp();
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 if (Code.OK != code) {
@@ -631,8 +633,7 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
                 return new TopicRouteDataResult(new 
TopicRouteData(response.getMessageQueuesList()), status);
             }, MoreExecutors.directExecutor());
         } catch (Throwable t) {
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
index 9f40a6d..9901ac9 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
@@ -46,6 +46,7 @@ import java.util.Iterator;
 import java.util.concurrent.ScheduledExecutorService;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 
 /**
  * Client manager supplies a series of unified APIs to execute remote 
procedure calls for each {@link Client}.
@@ -90,9 +91,10 @@ public interface ClientManager {
      * @param metadata  gRPC request header metadata.
      * @param request   query route request.
      * @param duration  request max duration.
-     * @return response future of the topic route.
+     * @return invocation of response future.
      */
-    ListenableFuture<QueryRouteResponse> queryRoute(Endpoints endpoints, 
Metadata metadata, QueryRouteRequest request,
+    ListenableFuture<InvocationContext<QueryRouteResponse>> 
queryRoute(Endpoints endpoints, Metadata metadata,
+        QueryRouteRequest request,
         Duration duration);
 
     /**
@@ -102,9 +104,10 @@ public interface ClientManager {
      * @param metadata  gRPC request header metadata.
      * @param request   heartbeat request.
      * @param duration  request max duration.
-     * @return response future of heartbeat.
+     * @return invocation of response future.
      */
-    ListenableFuture<HeartbeatResponse> heartbeat(Endpoints endpoints, 
Metadata metadata, HeartbeatRequest request,
+    ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Endpoints 
endpoints, Metadata metadata,
+        HeartbeatRequest request,
         Duration duration);
 
     /**
@@ -114,9 +117,9 @@ public interface ClientManager {
      * @param metadata  gRPC request header metadata.
      * @param request   send message request.
      * @param duration  request max duration.
-     * @return response future of the sending message.
+     * @return invocation of response future.
      */
-    ListenableFuture<SendMessageResponse> sendMessage(Endpoints endpoints, 
Metadata metadata,
+    ListenableFuture<InvocationContext<SendMessageResponse>> 
sendMessage(Endpoints endpoints, Metadata metadata,
         SendMessageRequest request, Duration duration);
 
     /**
@@ -126,9 +129,9 @@ public interface ClientManager {
      * @param metadata  gRPC request header metadata.
      * @param request   query assignment request.
      * @param duration  request max duration.
-     * @return response future of query assignment.
+     * @return invocation of response future.
      */
-    ListenableFuture<QueryAssignmentResponse> queryAssignment(Endpoints 
endpoints, Metadata metadata,
+    ListenableFuture<InvocationContext<QueryAssignmentResponse>> 
queryAssignment(Endpoints endpoints, Metadata metadata,
         QueryAssignmentRequest request, Duration duration);
 
     /**
@@ -136,9 +139,10 @@ public interface ClientManager {
      *
      * @param endpoints requested endpoints.
      * @param metadata  gRPC request header metadata.
+     * @return invocation of response future.
      */
-    ListenableFuture<Iterator<ReceiveMessageResponse>> 
receiveMessage(Endpoints endpoints, Metadata metadata,
-        ReceiveMessageRequest request, Duration duration);
+    ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> 
receiveMessage(Endpoints endpoints,
+        Metadata metadata, ReceiveMessageRequest request, Duration duration);
 
     /**
      * Ack message asynchronously after the success of consumption, the method 
ensures no throwable.
@@ -147,10 +151,10 @@ public interface ClientManager {
      * @param metadata  gRPC request header metadata.
      * @param request   ack message request.
      * @param duration  request max duration.
-     * @return response future of ack message.
+     * @return invocation of response future.
      */
-    ListenableFuture<AckMessageResponse> ackMessage(Endpoints endpoints, 
Metadata metadata, AckMessageRequest request,
-        Duration duration);
+    ListenableFuture<InvocationContext<AckMessageResponse>> 
ackMessage(Endpoints endpoints, Metadata metadata,
+        AckMessageRequest request, Duration duration);
 
     /**
      * Nack message asynchronously after the failure of consumption, the 
method ensures no throwable.
@@ -159,10 +163,10 @@ public interface ClientManager {
      * @param metadata  gRPC request header metadata.
      * @param request   nack message request.
      * @param duration  request max duration.
-     * @return response future of nack message.
+     * @return invocation of response future.
      */
-    ListenableFuture<ChangeInvisibleDurationResponse> 
changeInvisibleDuration(Endpoints endpoints, Metadata metadata,
-        ChangeInvisibleDurationRequest request, Duration duration);
+    ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> 
changeInvisibleDuration(Endpoints endpoints,
+        Metadata metadata, ChangeInvisibleDurationRequest request, Duration 
duration);
 
     /**
      * Send a message to the dead letter queue asynchronously, the method 
ensures no throwable.
@@ -171,9 +175,9 @@ public interface ClientManager {
      * @param metadata  gRPC request header metadata.
      * @param request   request of sending a message to DLQ.
      * @param duration  request max duration.
-     * @return response future of sending a message to DLQ.
+     * @return invocation of response future.
      */
-    ListenableFuture<ForwardMessageToDeadLetterQueueResponse> 
forwardMessageToDeadLetterQueue(
+    
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> 
forwardMessageToDeadLetterQueue(
         Endpoints endpoints, Metadata metadata, 
ForwardMessageToDeadLetterQueueRequest request, Duration duration);
 
     /**
@@ -183,9 +187,9 @@ public interface ClientManager {
      * @param metadata  gRPC request header metadata.
      * @param request   end transaction request.
      * @param duration  request max duration.
-     * @return response future of submitting transaction resolution.
+     * @return invocation of response future.
      */
-    ListenableFuture<EndTransactionResponse> endTransaction(Endpoints 
endpoints, Metadata metadata,
+    ListenableFuture<InvocationContext<EndTransactionResponse>> 
endTransaction(Endpoints endpoints, Metadata metadata,
         EndTransactionRequest request, Duration duration);
 
     /**
@@ -198,8 +202,8 @@ public interface ClientManager {
      * @return response future of notification of client termination.
      */
     @SuppressWarnings("UnusedReturnValue")
-    ListenableFuture<NotifyClientTerminationResponse> 
notifyClientTermination(Endpoints endpoints, Metadata metadata,
-        NotifyClientTerminationRequest request, Duration duration);
+    ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> 
notifyClientTermination(Endpoints endpoints,
+        Metadata metadata, NotifyClientTerminationRequest request, Duration 
duration);
 
     StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, Metadata 
metadata,
         Duration duration, StreamObserver<TelemetryCommand> responseObserver) 
throws ClientException;
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index 4d33ab0..337edcc 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -39,8 +39,8 @@ import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.TelemetryCommand;
 import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import io.grpc.Metadata;
 import io.grpc.stub.StreamObserver;
@@ -65,6 +65,7 @@ import org.apache.rocketmq.client.java.misc.ExecutorServices;
 import org.apache.rocketmq.client.java.misc.MetadataUtils;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.apache.rocketmq.client.java.rpc.RpcClient;
 import org.apache.rocketmq.client.java.rpc.RpcClientImpl;
 import org.slf4j.Logger;
@@ -235,132 +236,112 @@ public class ClientManagerImpl extends 
AbstractIdleService implements ClientMana
     }
 
     @Override
-    public ListenableFuture<QueryRouteResponse> queryRoute(Endpoints 
endpoints, Metadata metadata,
+    public ListenableFuture<InvocationContext<QueryRouteResponse>> 
queryRoute(Endpoints endpoints, Metadata metadata,
         QueryRouteRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.queryRoute(metadata, request, asyncWorker, 
duration);
         } catch (Throwable t) {
-            final SettableFuture<QueryRouteResponse> future = 
SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
     @Override
-    public ListenableFuture<HeartbeatResponse> heartbeat(Endpoints endpoints, 
Metadata metadata,
+    public ListenableFuture<InvocationContext<HeartbeatResponse>> 
heartbeat(Endpoints endpoints, Metadata metadata,
         HeartbeatRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.heartbeat(metadata, request, asyncWorker, 
duration);
         } catch (Throwable t) {
-            final SettableFuture<HeartbeatResponse> future = 
SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
     @Override
-    public ListenableFuture<SendMessageResponse> sendMessage(Endpoints 
endpoints, Metadata metadata,
+    public ListenableFuture<InvocationContext<SendMessageResponse>> 
sendMessage(Endpoints endpoints, Metadata metadata,
         SendMessageRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.sendMessage(metadata, request, asyncWorker, 
duration);
         } catch (Throwable t) {
-            final SettableFuture<SendMessageResponse> future = 
SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
     @Override
-    public ListenableFuture<QueryAssignmentResponse> queryAssignment(Endpoints 
endpoints, Metadata metadata,
-        QueryAssignmentRequest request, Duration duration) {
+    public ListenableFuture<InvocationContext<QueryAssignmentResponse>> 
queryAssignment(Endpoints endpoints,
+        Metadata metadata, QueryAssignmentRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.queryAssignment(metadata, request, asyncWorker, 
duration);
         } catch (Throwable t) {
-            final SettableFuture<QueryAssignmentResponse> future = 
SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
     @Override
-    public ListenableFuture<Iterator<ReceiveMessageResponse>> 
receiveMessage(Endpoints endpoints, Metadata metadata,
-        ReceiveMessageRequest request, Duration duration) {
+    public 
ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> 
receiveMessage(Endpoints endpoints,
+        Metadata metadata, ReceiveMessageRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.receiveMessage(metadata, request, asyncWorker, 
duration);
         } catch (Throwable t) {
-            SettableFuture<Iterator<ReceiveMessageResponse>> future = 
SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
     @Override
-    public ListenableFuture<AckMessageResponse> ackMessage(Endpoints 
endpoints, Metadata metadata,
+    public ListenableFuture<InvocationContext<AckMessageResponse>> 
ackMessage(Endpoints endpoints, Metadata metadata,
         AckMessageRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.ackMessage(metadata, request, asyncWorker, 
duration);
         } catch (Throwable t) {
-            final SettableFuture<AckMessageResponse> future = 
SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
     @Override
-    public ListenableFuture<ChangeInvisibleDurationResponse> 
changeInvisibleDuration(Endpoints endpoints,
-        Metadata metadata, ChangeInvisibleDurationRequest request, Duration 
duration) {
+    public 
ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> 
changeInvisibleDuration(
+        Endpoints endpoints, Metadata metadata, ChangeInvisibleDurationRequest 
request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.changeInvisibleDuration(metadata, request, 
asyncWorker, duration);
         } catch (Throwable t) {
-            final SettableFuture<ChangeInvisibleDurationResponse> future = 
SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
     @Override
-    public ListenableFuture<ForwardMessageToDeadLetterQueueResponse> 
forwardMessageToDeadLetterQueue(
+    public 
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> 
forwardMessageToDeadLetterQueue(
         Endpoints endpoints, Metadata metadata, 
ForwardMessageToDeadLetterQueueRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.forwardMessageToDeadLetterQueue(metadata, 
request, asyncWorker, duration);
         } catch (Throwable t) {
-            final SettableFuture<ForwardMessageToDeadLetterQueueResponse> 
future = SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
     @Override
-    public ListenableFuture<EndTransactionResponse> endTransaction(Endpoints 
endpoints, Metadata metadata,
-        EndTransactionRequest request, Duration duration) {
+    public ListenableFuture<InvocationContext<EndTransactionResponse>> 
endTransaction(Endpoints endpoints,
+        Metadata metadata, EndTransactionRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.endTransaction(metadata, request, asyncWorker, 
duration);
         } catch (Throwable t) {
-            SettableFuture<EndTransactionResponse> future = 
SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
     @Override
-    public ListenableFuture<NotifyClientTerminationResponse> 
notifyClientTermination(
+    public 
ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> 
notifyClientTermination(
         Endpoints endpoints, Metadata metadata, NotifyClientTerminationRequest 
request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.notifyClientTermination(metadata, request, 
asyncWorker, duration);
         } catch (Throwable t) {
-            final SettableFuture<NotifyClientTerminationResponse> future = 
SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 539b876..a8b04ec 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -56,6 +56,7 @@ import org.apache.rocketmq.client.java.message.MessageCommon;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,16 +72,18 @@ abstract class ConsumerImpl extends ClientImpl {
     }
 
     @SuppressWarnings("SameParameterValue")
-    protected ListenableFuture<ReceiveMessageResult> 
receiveMessage(ReceiveMessageRequest request, MessageQueueImpl mq,
-        Duration timeout) {
+    protected ListenableFuture<ReceiveMessageResult> 
receiveMessage(ReceiveMessageRequest request,
+        MessageQueueImpl mq, Duration timeout) {
         List<MessageViewImpl> messages = new ArrayList<>();
         final SettableFuture<ReceiveMessageResult> future0 = 
SettableFuture.create();
         try {
             Metadata metadata = sign();
             final Endpoints endpoints = mq.getBroker().getEndpoints();
-            final ListenableFuture<Iterator<ReceiveMessageResponse>> future = 
clientManager.receiveMessage(endpoints,
-                metadata, request, timeout);
-            return Futures.transform(future, it -> {
+            final 
ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> future =
+                clientManager.receiveMessage(endpoints,
+                    metadata, request, timeout);
+            return Futures.transform(future, context -> {
+                final Iterator<ReceiveMessageResponse> it = context.getResp();
                 // Null here means status not set yet.
                 Status status = null;
                 Timestamp deliveryTimestampFromRemote = null;
@@ -106,7 +109,7 @@ abstract class ConsumerImpl extends ClientImpl {
                     final MessageViewImpl view = 
MessageViewImpl.fromProtobuf(message, mq, deliveryTimestampFromRemote);
                     messages.add(view);
                 }
-                return new ReceiveMessageResult(endpoints, status, messages);
+                return new ReceiveMessageResult(endpoints, 
context.getRpcContext().getRequestId(), status, messages);
             }, MoreExecutors.directExecutor());
         } catch (Throwable t) {
             future0.setException(t);
@@ -134,9 +137,9 @@ abstract class ConsumerImpl extends ClientImpl {
 
     }
 
-    public ListenableFuture<AckMessageResponse> ackMessage(MessageViewImpl 
messageView) {
+    public ListenableFuture<InvocationContext<AckMessageResponse>> 
ackMessage(MessageViewImpl messageView) {
         final Endpoints endpoints = messageView.getEndpoints();
-        ListenableFuture<AckMessageResponse> future;
+        ListenableFuture<InvocationContext<AckMessageResponse>> future;
 
         final Stopwatch stopwatch = Stopwatch.createStarted();
         final List<MessageCommon> messageCommons = 
Collections.singletonList(messageView.getMessageCommon());
@@ -146,11 +149,14 @@ abstract class ConsumerImpl extends ClientImpl {
             final Metadata metadata = sign();
             future = clientManager.ackMessage(endpoints, metadata, request, 
clientConfiguration.getRequestTimeout());
         } catch (Throwable t) {
-            return Futures.immediateFailedFuture(t);
+            final SettableFuture<InvocationContext<AckMessageResponse>> 
future0 = SettableFuture.create();
+            future0.setException(t);
+            future = future0;
         }
-        Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
+        Futures.addCallback(future, new 
FutureCallback<InvocationContext<AckMessageResponse>>() {
             @Override
-            public void onSuccess(AckMessageResponse response) {
+            public void onSuccess(InvocationContext<AckMessageResponse> 
context) {
+                final AckMessageResponse response = context.getResp();
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 final Duration duration = stopwatch.elapsed();
@@ -168,10 +174,10 @@ abstract class ConsumerImpl extends ClientImpl {
         return future;
     }
 
-    public ListenableFuture<ChangeInvisibleDurationResponse> 
changeInvisibleDuration(MessageViewImpl messageView,
-        Duration invisibleDuration) {
+    public 
ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> 
changeInvisibleDuration(
+        MessageViewImpl messageView, Duration invisibleDuration) {
         final Endpoints endpoints = messageView.getEndpoints();
-        ListenableFuture<ChangeInvisibleDurationResponse> future;
+        ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> 
future;
 
         final Stopwatch stopwatch = Stopwatch.createStarted();
         final List<MessageCommon> messageCommons = 
Collections.singletonList(messageView.getMessageCommon());
@@ -182,14 +188,15 @@ abstract class ConsumerImpl extends ClientImpl {
             future = clientManager.changeInvisibleDuration(endpoints, 
metadata, request,
                 clientConfiguration.getRequestTimeout());
         } catch (Throwable t) {
-            final SettableFuture<ChangeInvisibleDurationResponse> future0 = 
SettableFuture.create();
+            final 
SettableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future0 = 
SettableFuture.create();
             future0.setException(t);
             future = future0;
         }
         final MessageId messageId = messageView.getMessageId();
-        Futures.addCallback(future, new 
FutureCallback<ChangeInvisibleDurationResponse>() {
+        Futures.addCallback(future, new 
FutureCallback<InvocationContext<ChangeInvisibleDurationResponse>>() {
             @Override
-            public void onSuccess(ChangeInvisibleDurationResponse response) {
+            public void 
onSuccess(InvocationContext<ChangeInvisibleDurationResponse> context) {
+                final ChangeInvisibleDurationResponse response = 
context.getResp();
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 final Duration duration = stopwatch.elapsed();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index f36012f..c914dce 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -51,6 +51,7 @@ import 
org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -387,11 +388,12 @@ class ProcessQueueImpl implements ProcessQueue {
         final String consumerGroup = consumer.getConsumerGroup();
         final MessageId messageId = messageView.getMessageId();
         final Endpoints endpoints = messageView.getEndpoints();
-        final ListenableFuture<AckMessageResponse> future = 
consumer.ackMessage(messageView);
-        Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
+        final ListenableFuture<InvocationContext<AckMessageResponse>> future = 
consumer.ackMessage(messageView);
+        Futures.addCallback(future, new 
FutureCallback<InvocationContext<AckMessageResponse>>() {
             @Override
-            public void onSuccess(AckMessageResponse response) {
-                final Status status = response.getStatus();
+            public void onSuccess(InvocationContext<AckMessageResponse> 
context) {
+                final AckMessageResponse resp = context.getResp();
+                final Status status = resp.getStatus();
                 final Code code = status.getCode();
                 if (Code.OK.equals(code)) {
                     LOGGER.debug("Ack message successfully, clientId={}, 
consumerGroup={}, messageId={}, mq={}, "
@@ -478,16 +480,17 @@ class ProcessQueueImpl implements ProcessQueue {
 
     private void forwardToDeadLetterQueue(final MessageViewImpl messageView, 
final int attempt,
         final SettableFuture<Void> future0) {
-        final ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future 
=
+        final 
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> 
future =
             consumer.forwardMessageToDeadLetterQueue(messageView);
         final String clientId = consumer.getClientId();
         final String consumerGroup = consumer.getConsumerGroup();
         final MessageId messageId = messageView.getMessageId();
         final Endpoints endpoints = messageView.getEndpoints();
-        Futures.addCallback(future, new 
FutureCallback<ForwardMessageToDeadLetterQueueResponse>() {
+        Futures.addCallback(future, new 
FutureCallback<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>() {
             @Override
-            public void onSuccess(ForwardMessageToDeadLetterQueueResponse 
response) {
-                final Status status = response.getStatus();
+            public void 
onSuccess(InvocationContext<ForwardMessageToDeadLetterQueueResponse> context) {
+                final ForwardMessageToDeadLetterQueueResponse resp = 
context.getResp();
+                final Status status = resp.getStatus();
                 final Code code = status.getCode();
                 // Log failure and retry later.
                 if (!Code.OK.equals(code)) {
@@ -559,17 +562,19 @@ class ProcessQueueImpl implements ProcessQueue {
         final String consumerGroup = consumer.getConsumerGroup();
         final MessageId messageId = messageView.getMessageId();
         final Endpoints endpoints = messageView.getEndpoints();
-        final ListenableFuture<AckMessageResponse> future = 
consumer.ackMessage(messageView);
-        Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
+        final ListenableFuture<InvocationContext<AckMessageResponse>> future = 
consumer.ackMessage(messageView);
+        Futures.addCallback(future, new 
FutureCallback<InvocationContext<AckMessageResponse>>() {
             @Override
-            public void onSuccess(AckMessageResponse response) {
-                final Status status = response.getStatus();
+            public void onSuccess(InvocationContext<AckMessageResponse> 
context) {
+                final AckMessageResponse resp = context.getResp();
+                final String requestId = 
context.getRpcContext().getRequestId();
+                final Status status = resp.getStatus();
                 final Code code = status.getCode();
                 // Log failure and retry later.
                 if (!Code.OK.equals(code)) {
                     LOGGER.error("Failed to ack fifo message, would attempt to 
re-ack later, clientId={}, "
-                            + "consumerGroup={}, attempt={}, messageId={}, 
mq={}, code={}, endpoints={}, status "
-                            + "message=[{}]", clientId, consumerGroup, 
attempt, messageId, mq, code,
+                            + "consumerGroup={}, attempt={}, messageId={}, 
mq={}, code={}, requestId={}, endpoints={}, "
+                            + "status message=[{}]", clientId, consumerGroup, 
attempt, messageId, mq, code, requestId,
                         endpoints, status.getMessage());
                     ackFifoMessageLater(messageView, 1 + attempt, future0);
                     return;
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 4a80377..41648bd 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -71,6 +71,7 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -264,15 +265,16 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
 
     private ListenableFuture<Assignments> queryAssignment(final String topic) {
         final ListenableFuture<Endpoints> future = 
pickEndpointsToQueryAssignments(topic);
-        final ListenableFuture<QueryAssignmentResponse> responseFuture =
+        final ListenableFuture<InvocationContext<QueryAssignmentResponse>> 
responseFuture =
             Futures.transformAsync(future, endpoints -> {
                 final Metadata metadata = sign();
                 final QueryAssignmentRequest request = 
wrapQueryAssignmentRequest(topic);
                 return clientManager.queryAssignment(endpoints, metadata, 
request,
                     clientConfiguration.getRequestTimeout());
             }, MoreExecutors.directExecutor());
-        return Futures.transformAsync(responseFuture, response -> {
-            final Status status = response.getStatus();
+        return Futures.transformAsync(responseFuture, context -> {
+            final QueryAssignmentResponse resp = context.getResp();
+            final Status status = resp.getStatus();
             final Code code = status.getCode();
             if (!Code.OK.equals(code)) {
                 final String message = String.format("Failed to query 
assignment, code=%d, status message=[{%s}]",
@@ -280,7 +282,7 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
                 throw new RuntimeException(message);
             }
             SettableFuture<Assignments> future0 = SettableFuture.create();
-            final List<Assignment> assignmentList = 
response.getAssignmentsList().stream().map(assignment ->
+            final List<Assignment> assignmentList = 
resp.getAssignmentsList().stream().map(assignment ->
                 new Assignment(new 
MessageQueueImpl(assignment.getMessageQueue()))).collect(Collectors.toList());
             final Assignments assignments = new Assignments(assignmentList);
             future0.set(assignments);
@@ -398,7 +400,7 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
                             cacheAssignments.put(topic, latest);
                             return;
                         }
-                        LOGGER.info("Assignments of topic={} remains the same, 
assignments={}, clientId={}", topic,
+                        LOGGER.debug("Assignments of topic={} remains the 
same, assignments={}, clientId={}", topic,
                             existed, clientId);
                         // Process queue may be dropped, need to be 
synchronized anyway.
                         syncProcessQueue(topic, latest, filterExpression);
@@ -509,7 +511,7 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
             .setMaxDeliveryAttempts(getRetryPolicy().getMaxAttempts()).build();
     }
 
-    public ListenableFuture<ForwardMessageToDeadLetterQueueResponse> 
forwardMessageToDeadLetterQueue(
+    public 
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> 
forwardMessageToDeadLetterQueue(
         final MessageViewImpl messageView) {
         // Intercept before forwarding message to DLQ.
         final Stopwatch stopwatch = Stopwatch.createStarted();
@@ -517,7 +519,7 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
         doBefore(MessageHookPoints.FORWARD_TO_DLQ, messageCommons);
 
         final Endpoints endpoints = messageView.getEndpoints();
-        ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future;
+        
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> 
future;
         try {
             final ForwardMessageToDeadLetterQueueRequest request =
                 wrapForwardMessageToDeadLetterQueueRequest(messageView);
@@ -525,15 +527,14 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
             future = clientManager.forwardMessageToDeadLetterQueue(endpoints, 
metadata, request,
                 clientConfiguration.getRequestTimeout());
         } catch (Throwable t) {
-            final SettableFuture<ForwardMessageToDeadLetterQueueResponse> 
future0 = SettableFuture.create();
-            future0.setException(t);
-            future = future0;
+            future = Futures.immediateFailedFuture(t);
         }
-        Futures.addCallback(future, new 
FutureCallback<ForwardMessageToDeadLetterQueueResponse>() {
+        Futures.addCallback(future, new 
FutureCallback<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>() {
             @Override
-            public void onSuccess(ForwardMessageToDeadLetterQueueResponse 
response) {
+            public void 
onSuccess(InvocationContext<ForwardMessageToDeadLetterQueueResponse> context) {
+                final ForwardMessageToDeadLetterQueueResponse resp = 
context.getResp();
                 final Duration duration = stopwatch.elapsed();
-                MessageHookPointsStatus messageHookPointsStatus = 
Code.OK.equals(response.getStatus().getCode()) ?
+                MessageHookPointsStatus messageHookPointsStatus = 
Code.OK.equals(resp.getStatus().getCode()) ?
                     MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
                 // Intercept after forwarding message to DLQ.
                 doAfter(MessageHookPoints.FORWARD_TO_DLQ, messageCommons, 
duration, messageHookPointsStatus);
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
index 09b8d12..79d15a8 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
@@ -36,12 +36,14 @@ import org.apache.rocketmq.client.java.route.Endpoints;
 
 public class ReceiveMessageResult {
     private final Endpoints endpoints;
+    private final String requestId;
     private final ClientException exception;
 
     private final List<MessageViewImpl> messages;
 
-    public ReceiveMessageResult(Endpoints endpoints, Status status, 
List<MessageViewImpl> messages) {
+    public ReceiveMessageResult(Endpoints endpoints, String requestId, Status 
status, List<MessageViewImpl> messages) {
         this.endpoints = endpoints;
+        this.requestId = requestId;
         final Code code = status.getCode();
         switch (code) {
             case OK:
@@ -108,4 +110,8 @@ public class ReceiveMessageResult {
     public List<MessageViewImpl> getMessages() {
         return messages;
     }
+
+    public String getRequestId() {
+        return requestId;
+    }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index b0ccfd9..77750e1 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -56,6 +56,7 @@ import 
org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -240,9 +241,10 @@ class SimpleConsumerImpl extends ConsumerImpl implements 
SimpleConsumer {
             return future0;
         }
         MessageViewImpl impl = (MessageViewImpl) messageView;
-        final ListenableFuture<AckMessageResponse> future = ackMessage(impl);
-        return Futures.transformAsync(future, response -> {
-            final Status status = response.getStatus();
+        final ListenableFuture<InvocationContext<AckMessageResponse>> future = 
ackMessage(impl);
+        return Futures.transformAsync(future, context -> {
+            final AckMessageResponse resp = context.getResp();
+            final Status status = resp.getStatus();
             final Code code = status.getCode();
             switch (code) {
                 case OK:
@@ -300,12 +302,13 @@ class SimpleConsumerImpl extends ConsumerImpl implements 
SimpleConsumer {
             return future0;
         }
         MessageViewImpl impl = (MessageViewImpl) messageView;
-        final ListenableFuture<ChangeInvisibleDurationResponse> future = 
changeInvisibleDuration(impl,
-            invisibleDuration);
-        return Futures.transformAsync(future, response -> {
+        final 
ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future =
+            changeInvisibleDuration(impl, invisibleDuration);
+        return Futures.transformAsync(future, context -> {
+            final ChangeInvisibleDurationResponse resp = context.getResp();
             // Refresh receipt handle manually.
-            impl.setReceiptHandle(response.getReceiptHandle());
-            final Status status = response.getStatus();
+            impl.setReceiptHandle(resp.getReceiptHandle());
+            final Status status = resp.getStatus();
             final Code code = status.getCode();
             switch (code) {
                 case OK:
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 6434cca..427d318 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -72,6 +72,7 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -280,13 +281,14 @@ class ProducerImpl extends ClientImpl implements Producer 
{
             MessageHookPoints.COMMIT_TRANSACTION : 
MessageHookPoints.ROLLBACK_TRANSACTION;
         doBefore(messageHookPoints, messageCommons);
 
-        final ListenableFuture<EndTransactionResponse> future =
+        final ListenableFuture<InvocationContext<EndTransactionResponse>> 
future =
             clientManager.endTransaction(endpoints, metadata, request, 
requestTimeout);
-        Futures.addCallback(future, new 
FutureCallback<EndTransactionResponse>() {
+        Futures.addCallback(future, new 
FutureCallback<InvocationContext<EndTransactionResponse>>() {
             @Override
-            public void onSuccess(EndTransactionResponse result) {
+            public void onSuccess(InvocationContext<EndTransactionResponse> 
context) {
                 final Duration duration = stopwatch.elapsed();
-                final Status status = result.getStatus();
+                final EndTransactionResponse resp = context.getResp();
+                final Status status = resp.getStatus();
                 final Code code = status.getCode();
                 MessageHookPointsStatus messageHookPointsStatus = 
Code.OK.equals(code) ? MessageHookPointsStatus.OK :
                     MessageHookPointsStatus.ERROR;
@@ -299,8 +301,9 @@ class ProducerImpl extends ClientImpl implements Producer {
                 doAfter(messageHookPoints, messageCommons, duration, 
MessageHookPointsStatus.ERROR);
             }
         }, MoreExecutors.directExecutor());
-        final EndTransactionResponse response = handleClientFuture(future);
-        final Status status = response.getStatus();
+        final InvocationContext<EndTransactionResponse> context = 
handleClientFuture(future);
+        final EndTransactionResponse resp = context.getResp();
+        final Status status = resp.getStatus();
         final Code code = status.getCode();
         if (!Code.OK.equals(code)) {
             throw new ClientException(code.getNumber(), status.getMessage());
@@ -442,13 +445,13 @@ class ProducerImpl extends ClientImpl implements Producer 
{
         final Endpoints endpoints = messageQueue.getBroker().getEndpoints();
         final SendMessageRequest request = wrapSendMessageRequest(messages);
 
-        final ListenableFuture<SendMessageResponse> responseFuture = 
clientManager.sendMessage(endpoints, metadata,
-            request, clientConfiguration.getRequestTimeout());
+        final ListenableFuture<InvocationContext<SendMessageResponse>> 
responseFuture =
+            clientManager.sendMessage(endpoints, metadata, request, 
clientConfiguration.getRequestTimeout());
 
         final ListenableFuture<List<SendReceiptImpl>> attemptFuture = 
Futures.transformAsync(responseFuture,
             response -> {
                 final SettableFuture<List<SendReceiptImpl>> future0 = 
SettableFuture.create();
-                future0.set(SendReceiptImpl.processSendResponse(messageQueue, 
response));
+                future0.set(SendReceiptImpl.processSendResponse(messageQueue, 
response.getResp()));
                 return future0;
             }, MoreExecutors.directExecutor());
 
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/InvocationContext.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/InvocationContext.java
new file mode 100644
index 0000000..eb5f487
--- /dev/null
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/InvocationContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.rpc;
+
+import com.google.common.base.MoreObjects;
+
+public class InvocationContext<T> {
+    private final T t;
+    private final RpcContext rpcContext;
+
+    public InvocationContext(T t, RpcContext rpcContext) {
+        this.t = t;
+        this.rpcContext = rpcContext;
+    }
+
+    public T getResp() {
+        return t;
+    }
+
+    public RpcContext getRpcContext() {
+        return rpcContext;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("resp", t)
+            .add("rpcContext", rpcContext)
+            .toString();
+    }
+}
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
index 415d228..70744f3 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
@@ -72,9 +72,10 @@ public interface RpcClient {
      * @param request  query route request.
      * @param executor gRPC asynchronous executor.
      * @param duration request max duration.
-     * @return response future of topic route.
+     * @return invocation of response future.
      */
-    ListenableFuture<QueryRouteResponse> queryRoute(Metadata metadata, 
QueryRouteRequest request, Executor executor,
+    ListenableFuture<InvocationContext<QueryRouteResponse>> 
queryRoute(Metadata metadata, QueryRouteRequest request,
+        Executor executor,
         Duration duration);
 
     /**
@@ -84,9 +85,10 @@ public interface RpcClient {
      * @param request  heart beat request.
      * @param executor gRPC asynchronous executor.
      * @param duration request max duration.
-     * @return response future of heart beat.
+     * @return invocation of response future.
      */
-    ListenableFuture<HeartbeatResponse> heartbeat(Metadata metadata, 
HeartbeatRequest request, Executor executor,
+    ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Metadata 
metadata, HeartbeatRequest request,
+        Executor executor,
         Duration duration);
 
     /**
@@ -96,10 +98,10 @@ public interface RpcClient {
      * @param request  send message request.
      * @param executor gRPC asynchronous executor.
      * @param duration request max duration.
-     * @return response future of sending message.
+     * @return invocation of response future.
      */
-    ListenableFuture<SendMessageResponse> sendMessage(Metadata metadata, 
SendMessageRequest request, Executor executor,
-        Duration duration);
+    ListenableFuture<InvocationContext<SendMessageResponse>> 
sendMessage(Metadata metadata,
+        SendMessageRequest request, Executor executor, Duration duration);
 
     /**
      * Query assignment asynchronously.
@@ -108,10 +110,10 @@ public interface RpcClient {
      * @param request  query assignment request.
      * @param executor gRPC asynchronous executor.
      * @param duration request max duration.
-     * @return response future of query assignment.
+     * @return invocation of response future.
      */
-    ListenableFuture<QueryAssignmentResponse> queryAssignment(Metadata 
metadata, QueryAssignmentRequest request,
-        Executor executor, Duration duration);
+    ListenableFuture<InvocationContext<QueryAssignmentResponse>> 
queryAssignment(Metadata metadata,
+        QueryAssignmentRequest request, Executor executor, Duration duration);
 
     /**
      * Receiving message asynchronously from server.
@@ -119,9 +121,10 @@ public interface RpcClient {
      * @param metadata gRPC request header metadata.
      * @param request  receiving message request.
      * @param executor gRPC asynchronous executor.
+     * @return invocation of response future.
      */
-    ListenableFuture<Iterator<ReceiveMessageResponse>> receiveMessage(Metadata 
metadata, ReceiveMessageRequest request,
-        ExecutorService executor, Duration duration);
+    ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> 
receiveMessage(Metadata metadata,
+        ReceiveMessageRequest request, ExecutorService executor, Duration 
duration);
 
     /**
      * Ack message asynchronously after success of consumption.
@@ -130,10 +133,10 @@ public interface RpcClient {
      * @param request  ack message request.
      * @param executor gRPC asynchronous executor.
      * @param duration request max duration.
-     * @return response future of ack message.
+     * @return invocation of response future.
      */
-    ListenableFuture<AckMessageResponse> ackMessage(Metadata metadata, 
AckMessageRequest request, Executor executor,
-        Duration duration);
+    ListenableFuture<InvocationContext<AckMessageResponse>> 
ackMessage(Metadata metadata, AckMessageRequest request,
+        Executor executor, Duration duration);
 
     /**
      * Change message invisible duration.
@@ -142,9 +145,9 @@ public interface RpcClient {
      * @param request  change invisible duration request.
      * @param executor gRPC asynchronous executor.
      * @param duration request max duration.
-     * @return response future of change message invisible duration.
+     * @return invocation of response future.
      */
-    ListenableFuture<ChangeInvisibleDurationResponse> 
changeInvisibleDuration(Metadata metadata,
+    ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> 
changeInvisibleDuration(Metadata metadata,
         ChangeInvisibleDurationRequest request, Executor executor, Duration 
duration);
 
     /**
@@ -154,9 +157,9 @@ public interface RpcClient {
      * @param request  request of sending message to DLQ.
      * @param executor gRPC asynchronous executor.
      * @param duration request max duration.
-     * @return response future of sending message to DLQ.
+     * @return invocation of response future.
      */
-    ListenableFuture<ForwardMessageToDeadLetterQueueResponse> 
forwardMessageToDeadLetterQueue(
+    
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> 
forwardMessageToDeadLetterQueue(
         Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, 
Executor executor, Duration duration);
 
     /**
@@ -166,10 +169,10 @@ public interface RpcClient {
      * @param request  end transaction request.
      * @param executor gRPC asynchronous executor.
      * @param duration request max duration.
-     * @return response future of submitting transaction resolution.
+     * @return invocation of response future.
      */
-    ListenableFuture<EndTransactionResponse> endTransaction(Metadata metadata, 
EndTransactionRequest request,
-        Executor executor, Duration duration);
+    ListenableFuture<InvocationContext<EndTransactionResponse>> 
endTransaction(Metadata metadata,
+        EndTransactionRequest request, Executor executor, Duration duration);
 
     /**
      * Asynchronously notify server that client is terminated.
@@ -178,9 +181,9 @@ public interface RpcClient {
      * @param request  notify client termination request.
      * @param executor gRPC asynchronous executor.
      * @param duration request max duration.
-     * @return response future of notification of client termination.
+     * @return invocation of response future.
      */
-    ListenableFuture<NotifyClientTerminationResponse> 
notifyClientTermination(Metadata metadata,
+    ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> 
notifyClientTermination(Metadata metadata,
         NotifyClientTerminationRequest request, Executor executor, Duration 
duration);
 
     StreamObserver<TelemetryCommand> telemetry(Metadata metadata, Executor 
executor, Duration duration,
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
index 15b94aa..ffba65e 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
@@ -39,6 +39,7 @@ import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.TelemetryCommand;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.grpc.ClientInterceptor;
@@ -62,10 +63,12 @@ import java.util.concurrent.TimeUnit;
 import javax.net.ssl.SSLException;
 import org.apache.rocketmq.client.java.route.Endpoints;
 
+@SuppressWarnings("UnstableApiUsage")
 public class RpcClientImpl implements RpcClient {
     private static final Duration KEEP_ALIVE_DURATION = Duration.ofSeconds(30);
     private static final int GRPC_MAX_MESSAGE_SIZE = Integer.MAX_VALUE;
 
+    private final Endpoints endpoints;
     private final ManagedChannel channel;
     private final MessagingServiceGrpc.MessagingServiceFutureStub futureStub;
     private final MessagingServiceGrpc.MessagingServiceBlockingStub 
blockingStub;
@@ -75,6 +78,7 @@ public class RpcClientImpl implements RpcClient {
 
     @SuppressWarnings("deprecation")
     public RpcClientImpl(Endpoints endpoints) throws SSLException {
+        this.endpoints = endpoints;
         final SslContextBuilder builder = GrpcSslContexts.forClient();
         builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
         SslContext sslContext = builder.build();
@@ -102,6 +106,14 @@ public class RpcClientImpl implements RpcClient {
         this.activityNanoTime = System.nanoTime();
     }
 
+    private <T> ListenableFuture<InvocationContext<T>> 
wrapInvocationContext(ListenableFuture<T> future,
+        Metadata header) {
+        return Futures.transformAsync(future, response -> {
+            final RpcContext rpcContext = new RpcContext(endpoints, header);
+            return Futures.immediateFuture(new InvocationContext<>(response, 
rpcContext));
+        }, MoreExecutors.directExecutor());
+    }
+
     @Override
     public Duration idleDuration() {
         return Duration.ofNanos(System.nanoTime() - activityNanoTime);
@@ -113,85 +125,105 @@ public class RpcClientImpl implements RpcClient {
     }
 
     @Override
-    public ListenableFuture<QueryRouteResponse> queryRoute(Metadata metadata, 
QueryRouteRequest request,
-        Executor executor, Duration duration) {
+    public ListenableFuture<InvocationContext<QueryRouteResponse>> 
queryRoute(Metadata metadata,
+        QueryRouteRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+        final ListenableFuture<QueryRouteResponse> future = futureStub
+            
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
             .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).queryRoute(request);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<HeartbeatResponse> heartbeat(Metadata metadata, 
HeartbeatRequest request, Executor executor,
-        Duration duration) {
+    public ListenableFuture<InvocationContext<HeartbeatResponse>> 
heartbeat(Metadata metadata, HeartbeatRequest request,
+        Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-            .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).heartbeat(request);
+        final ListenableFuture<HeartbeatResponse> future =
+            
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+                .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).heartbeat(request);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<SendMessageResponse> sendMessage(Metadata 
metadata, SendMessageRequest request,
-        Executor executor, Duration duration) {
+    public ListenableFuture<InvocationContext<SendMessageResponse>> 
sendMessage(Metadata metadata,
+        SendMessageRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-            .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).sendMessage(request);
+        final ListenableFuture<SendMessageResponse> future =
+            
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+                .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).sendMessage(request);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<QueryAssignmentResponse> queryAssignment(Metadata 
metadata, QueryAssignmentRequest request,
-        Executor executor, Duration duration) {
+    public ListenableFuture<InvocationContext<QueryAssignmentResponse>> 
queryAssignment(Metadata metadata,
+        QueryAssignmentRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-            .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).queryAssignment(request);
+        final ListenableFuture<QueryAssignmentResponse> future =
+            
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+                .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).queryAssignment(request);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<Iterator<ReceiveMessageResponse>> 
receiveMessage(Metadata metadata,
+    public 
ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> 
receiveMessage(Metadata metadata,
         ReceiveMessageRequest request, ExecutorService executor, Duration 
duration) {
         this.activityNanoTime = System.nanoTime();
         final Callable<Iterator<ReceiveMessageResponse>> callable = () -> 
blockingStub
             
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
             .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).receiveMessage(request);
-        return MoreExecutors.listeningDecorator(executor).submit(callable);
+        final ListenableFuture<Iterator<ReceiveMessageResponse>> future =
+            MoreExecutors.listeningDecorator(executor).submit(callable);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<AckMessageResponse> ackMessage(Metadata metadata, 
AckMessageRequest request,
-        Executor executor, Duration duration) {
+    public ListenableFuture<InvocationContext<AckMessageResponse>> 
ackMessage(Metadata metadata,
+        AckMessageRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-            .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).ackMessage(request);
+        final ListenableFuture<AckMessageResponse> future =
+            
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+                .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).ackMessage(request);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<ChangeInvisibleDurationResponse> 
changeInvisibleDuration(Metadata metadata,
-        ChangeInvisibleDurationRequest request, Executor executor, Duration 
duration) {
+    public 
ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> 
changeInvisibleDuration(
+        Metadata metadata, ChangeInvisibleDurationRequest request, Executor 
executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-            .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).changeInvisibleDuration(request);
+        final ListenableFuture<ChangeInvisibleDurationResponse> future =
+            
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+                .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).changeInvisibleDuration(request);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<ForwardMessageToDeadLetterQueueResponse> 
forwardMessageToDeadLetterQueue(
+    public 
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> 
forwardMessageToDeadLetterQueue(
         Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, 
Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+        final ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future 
= futureStub
+            
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
             .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).forwardMessageToDeadLetterQueue(request);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<EndTransactionResponse> endTransaction(Metadata 
metadata, EndTransactionRequest request,
-        Executor executor, Duration duration) {
+    public ListenableFuture<InvocationContext<EndTransactionResponse>> 
endTransaction(Metadata metadata,
+        EndTransactionRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-            .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).endTransaction(request);
+        final ListenableFuture<EndTransactionResponse> future =
+            
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+                .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).endTransaction(request);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<NotifyClientTerminationResponse> 
notifyClientTermination(
+    public 
ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> 
notifyClientTermination(
         Metadata metadata, NotifyClientTerminationRequest request, Executor 
executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-            .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).notifyClientTermination(request);
+        final ListenableFuture<NotifyClientTerminationResponse> future =
+            
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+                .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).notifyClientTermination(request);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcContext.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcContext.java
new file mode 100644
index 0000000..5def58b
--- /dev/null
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcContext.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.rpc;
+
+import io.grpc.Metadata;
+import org.apache.rocketmq.client.java.route.Endpoints;
+
+public class RpcContext {
+    private final Endpoints endpoints;
+    private final Metadata header;
+
+    public RpcContext(Endpoints endpoints, Metadata header) {
+        this.endpoints = endpoints;
+        this.header = header;
+    }
+
+    public String getRequestId() {
+        return header.get(Metadata.Key.of(Signature.REQUEST_ID_KEY, 
Metadata.ASCII_STRING_MARSHALLER));
+    }
+
+    public Endpoints getEndpoints() {
+        return endpoints;
+    }
+}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
index 33b9f76..5c0a2cb 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
@@ -53,7 +53,7 @@ public class Signature {
     private Signature() {
     }
 
-    public static Metadata sign(ClientConfiguration config, String clientId) 
throws UnsupportedEncodingException,
+    public static Metadata sign(ClientConfiguration config, String clientId) 
throws
         NoSuchAlgorithmException, InvalidKeyException {
         Metadata metadata = new Metadata();
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
index ca2f720..9b60aa6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
@@ -31,11 +31,10 @@ public class TLSHelper {
     private TLSHelper() {
     }
 
-    public static String sign(String accessSecret, String dateTime) throws 
UnsupportedEncodingException,
-            NoSuchAlgorithmException,
-            InvalidKeyException {
+    public static String sign(String accessSecret, String dateTime) throws 
NoSuchAlgorithmException,
+        InvalidKeyException {
         SecretKeySpec signingKey = new 
SecretKeySpec(accessSecret.getBytes(StandardCharsets.UTF_8),
-                HMAC_SHA1_ALGORITHM);
+            HMAC_SHA1_ALGORITHM);
         Mac mac;
         mac = Mac.getInstance(HMAC_SHA1_ALGORITHM);
         mac.init(signingKey);
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index 401ce38..5ef44bc 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -44,8 +44,10 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.apache.rocketmq.client.java.misc.RequestIdGenerator;
 import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Before;
 import org.junit.Test;
@@ -117,7 +119,7 @@ public class ProcessQueueImplTest extends TestBase {
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
         when(pushConsumerSettings.isFifo()).thenReturn(false);
         when(pushConsumer.changeInvisibleDuration(any(MessageViewImpl.class), 
any(Duration.class)))
-            .thenReturn(okChangeInvisibleDurationFuture());
+            .thenReturn(okChangeInvisibleDurationCtxFuture());
         processQueue.cacheMessages(messageViewList);
         verify(pushConsumer, times(1))
             .changeInvisibleDuration(any(MessageViewImpl.class), 
any(Duration.class));
@@ -148,7 +150,8 @@ public class ProcessQueueImplTest extends TestBase {
         List<MessageViewImpl> messageViewList = new ArrayList<>();
         final MessageViewImpl messageView = fakeMessageViewImpl();
         messageViewList.add(messageView);
-        ReceiveMessageResult receiveMessageResult = new 
ReceiveMessageResult(fakeEndpoints(), status, messageViewList);
+        ReceiveMessageResult receiveMessageResult = new 
ReceiveMessageResult(fakeEndpoints(),
+            RequestIdGenerator.getInstance().next(), status, messageViewList);
         SettableFuture<ReceiveMessageResult> future0 = SettableFuture.create();
         future0.set(receiveMessageResult);
         when(pushConsumer.receiveMessage(any(ReceiveMessageRequest.class), 
any(MessageQueueImpl.class),
@@ -178,7 +181,7 @@ public class ProcessQueueImplTest extends TestBase {
         assertEquals(cachedMessageCount, processQueue.cachedMessagesCount());
         assertEquals(1, processQueue.inflightMessagesCount());
 
-        final ListenableFuture<AckMessageResponse> future = 
okAckMessageResponseFuture();
+        final ListenableFuture<InvocationContext<AckMessageResponse>> future = 
okAckMessageResponseFuture();
         
when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future);
         processQueue.eraseMessage(optionalMessageView.get(), 
ConsumeResult.SUCCESS);
         future.addListener(() -> verify(pushConsumer, times(1))
@@ -221,7 +224,7 @@ public class ProcessQueueImplTest extends TestBase {
         final MessageViewImpl messageView = fakeMessageViewImpl(2, false);
         messageViewList.add(messageView);
         processQueue.cacheMessages(messageViewList);
-        ListenableFuture<AckMessageResponse> future0 = 
okAckMessageResponseFuture();
+        ListenableFuture<InvocationContext<AckMessageResponse>> future0 = 
okAckMessageResponseFuture();
         
when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future0);
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
         when(retryPolicy.getMaxAttempts()).thenReturn(1);
@@ -237,7 +240,7 @@ public class ProcessQueueImplTest extends TestBase {
         final MessageViewImpl messageView = fakeMessageViewImpl(2, false);
         messageViewList.add(messageView);
         processQueue.cacheMessages(messageViewList);
-        ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future0 =
+        
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> 
future0 =
             okForwardMessageToDeadLetterQueueResponseFuture();
         
when(pushConsumer.forwardMessageToDeadLetterQueue(any(MessageViewImpl.class))).thenReturn(future0);
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
@@ -254,7 +257,7 @@ public class ProcessQueueImplTest extends TestBase {
         final MessageViewImpl messageView = fakeMessageViewImpl(2, false);
         messageViewList.add(messageView);
         processQueue.cacheMessages(messageViewList);
-        ListenableFuture<AckMessageResponse> future0 = 
okAckMessageResponseFuture();
+        ListenableFuture<InvocationContext<AckMessageResponse>> future0 = 
okAckMessageResponseFuture();
         
when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future0);
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
         when(retryPolicy.getMaxAttempts()).thenReturn(2);
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
index 54e507c..a7b5357 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
@@ -124,7 +124,8 @@ public class PushConsumerImplTest extends TestBase {
             any(QueryRouteRequest.class), any(Duration.class));
         verify(clientManager, atLeast(1)).queryAssignment(any(Endpoints.class),
             any(Metadata.class), any(QueryAssignmentRequest.class), 
any(Duration.class));
-        
Assert.assertEquals(okQueryAssignmentResponseFuture().get().getAssignmentsCount(),
 pushConsumer.getQueueSize());
+        
Assert.assertEquals(okQueryAssignmentResponseFuture().get().getResp().getAssignmentsCount(),
+            pushConsumer.getQueueSize());
         when(clientManager.queryAssignment(any(Endpoints.class), 
any(Metadata.class), any(QueryAssignmentRequest.class),
             
any(Duration.class))).thenReturn(okEmptyQueryAssignmentResponseFuture());
         pushConsumer.scanAssignments();
@@ -148,7 +149,8 @@ public class PushConsumerImplTest extends TestBase {
 
     @Test
     public void testSubscribeWithSubscriptionOverwriting() throws 
ClientException {
-        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, 
subscriptionExpressions, messageListener,
+        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, 
subscriptionExpressions,
+            messageListener,
             maxCacheMessageCount, maxCacheMessageSizeInBytes, 
consumptionThreadCount);
         start(pushConsumer);
         final FilterExpression filterExpression = new 
FilterExpression(FAKE_TAG_0);
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
index 9568a81..88445d1 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
@@ -63,6 +63,7 @@ import org.apache.rocketmq.client.java.impl.TelemetrySession;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -87,7 +88,7 @@ public class SimpleConsumerImplTest extends TestBase {
     private SimpleConsumerImpl simpleConsumer;
 
     private void start(SimpleConsumerImpl simpleConsumer) throws 
ClientException {
-        SettableFuture<QueryRouteResponse> future0 = SettableFuture.create();
+        SettableFuture<InvocationContext<QueryRouteResponse>> future0 = 
SettableFuture.create();
         Status status = Status.newBuilder().setCode(Code.OK).build();
         List<MessageQueue> messageQueueList = new ArrayList<>();
         MessageQueue mq = 
MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0))
@@ -97,7 +98,9 @@ public class SimpleConsumerImplTest extends TestBase {
         messageQueueList.add(mq);
         QueryRouteResponse response = 
QueryRouteResponse.newBuilder().setStatus(status)
             .addAllMessageQueues(messageQueueList).build();
-        future0.set(response);
+        final InvocationContext<QueryRouteResponse> invocationContext = new 
InvocationContext<>(response,
+            fakeRpcContext());
+        future0.set(invocationContext);
         when(clientManager.queryRoute(any(Endpoints.class), 
any(Metadata.class), any(QueryRouteRequest.class),
             any(Duration.class)))
             .thenReturn(future0);
@@ -180,7 +183,7 @@ public class SimpleConsumerImplTest extends TestBase {
         simpleConsumer = new SimpleConsumerImpl(clientConfiguration, 
FAKE_GROUP_0, awaitDuration, subExpressions);
         start(simpleConsumer);
         int receivedMessageCount = 16;
-        final ListenableFuture<Iterator<ReceiveMessageResponse>> future =
+        final 
ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> future =
             okReceiveMessageResponsesFuture(FAKE_TOPIC_0, 
receivedMessageCount);
         when(clientManager.receiveMessage(any(Endpoints.class), 
any(Metadata.class), any(ReceiveMessageRequest.class),
             any(Duration.class))).thenReturn(future);
@@ -197,7 +200,7 @@ public class SimpleConsumerImplTest extends TestBase {
         start(simpleConsumer);
         try {
             final MessageViewImpl messageView = fakeMessageViewImpl();
-            final ListenableFuture<AckMessageResponse> future = 
okAckMessageResponseFuture();
+            final ListenableFuture<InvocationContext<AckMessageResponse>> 
future = okAckMessageResponseFuture();
             when(clientManager.ackMessage(any(Endpoints.class), 
any(Metadata.class), any(AckMessageRequest.class),
                 any(Duration.class))).thenReturn(future);
             simpleConsumer.ack(messageView);
@@ -212,7 +215,7 @@ public class SimpleConsumerImplTest extends TestBase {
         start(simpleConsumer);
         try {
             final MessageViewImpl messageView = fakeMessageViewImpl();
-            final ListenableFuture<ChangeInvisibleDurationResponse> future =
+            final 
ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future =
                 okChangeInvisibleDurationResponseFuture(FAKE_RECEIPT_HANDLE_1);
             when(clientManager.changeInvisibleDuration(any(Endpoints.class), 
any(Metadata.class),
                 any(ChangeInvisibleDurationRequest.class), 
any(Duration.class))).thenReturn(future);
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
index 907a6fc..619538e 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
@@ -61,6 +61,7 @@ import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
 import org.apache.rocketmq.client.java.impl.TelemetrySession;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -94,7 +95,7 @@ public class ProducerImplTest extends TestBase {
         null);
 
     private void start(ProducerImpl producer) throws ClientException {
-        SettableFuture<QueryRouteResponse> future0 = SettableFuture.create();
+        SettableFuture<InvocationContext<QueryRouteResponse>> future0 = 
SettableFuture.create();
         Status status = Status.newBuilder().setCode(Code.OK).build();
         List<MessageQueue> messageQueueList = new ArrayList<>();
         MessageQueue mq = 
MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0))
@@ -102,9 +103,11 @@ public class ProducerImplTest extends TestBase {
             
.setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0()))
             .setId(0).build();
         messageQueueList.add(mq);
-        QueryRouteResponse response = 
QueryRouteResponse.newBuilder().setStatus(status)
+        QueryRouteResponse resp = 
QueryRouteResponse.newBuilder().setStatus(status)
             .addAllMessageQueues(messageQueueList).build();
-        future0.set(response);
+        final InvocationContext<QueryRouteResponse> invocationContext =
+            new InvocationContext<>(resp, fakeRpcContext());
+        future0.set(invocationContext);
         when(clientManager.queryRoute(any(Endpoints.class), 
any(Metadata.class), any(QueryRouteRequest.class),
             any(Duration.class)))
             .thenReturn(future0);
@@ -144,10 +147,11 @@ public class ProducerImplTest extends TestBase {
         verify(clientManager, times(1)).telemetry(any(Endpoints.class), 
any(Metadata.class),
             any(Duration.class), any(TelemetrySession.class));
         final Message message = fakeMessage(FAKE_TOPIC_0);
-        final ListenableFuture<SendMessageResponse> future = 
okSendMessageResponseFutureWithSingleEntry();
+        final ListenableFuture<InvocationContext<SendMessageResponse>> future =
+            okSendMessageResponseFutureWithSingleEntry();
         when(clientManager.sendMessage(any(Endpoints.class), 
any(Metadata.class), any(SendMessageRequest.class),
             any(Duration.class))).thenReturn(future);
-        final SendMessageResponse response = future.get();
+        final SendMessageResponse response = future.get().getResp();
         assertEquals(1, response.getEntriesCount());
         final apache.rocketmq.v2.SendResultEntry receipt = 
response.getEntriesList().iterator().next();
         final SendReceipt sendReceipt = producer.send(message);
@@ -163,10 +167,11 @@ public class ProducerImplTest extends TestBase {
         verify(clientManager, never()).telemetry(any(Endpoints.class), 
any(Metadata.class), any(Duration.class),
             any(TelemetrySession.class));
         final Message message = fakeMessage(FAKE_TOPIC_0);
-        final ListenableFuture<SendMessageResponse> future = 
okSendMessageResponseFutureWithSingleEntry();
+        final ListenableFuture<InvocationContext<SendMessageResponse>> future =
+            okSendMessageResponseFutureWithSingleEntry();
         when(clientManager.sendMessage(any(Endpoints.class), 
any(Metadata.class), any(SendMessageRequest.class),
             any(Duration.class))).thenReturn(future);
-        final SendMessageResponse response = future.get();
+        final SendMessageResponse response = future.get().getResp();
         assertEquals(1, response.getEntriesCount());
         final SendReceipt sendReceipt = 
producerWithoutTopicBinding.send(message);
         verify(clientManager, times(1)).queryRoute(any(Endpoints.class), 
any(Metadata.class),
@@ -185,7 +190,7 @@ public class ProducerImplTest extends TestBase {
             any(QueryRouteRequest.class), any(Duration.class));
         verify(clientManager, times(1)).telemetry(any(Endpoints.class), 
any(Metadata.class), any(Duration.class),
             any(TelemetrySession.class));
-        final ListenableFuture<SendMessageResponse> future = 
failureSendMessageResponseFuture();
+        final ListenableFuture<InvocationContext<SendMessageResponse>> future 
= failureSendMessageResponseFuture();
         when(clientManager.sendMessage(any(Endpoints.class), 
any(Metadata.class), any(SendMessageRequest.class),
             any(Duration.class))).thenReturn(future);
         Message message0 = fakeMessage(FAKE_TOPIC_0);
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index 212cb06..f3bcbb2 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -39,9 +39,11 @@ import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.SendResultEntry;
 import apache.rocketmq.v2.Status;
 import apache.rocketmq.v2.SystemProperties;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.protobuf.ByteString;
+import io.grpc.Metadata;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -72,6 +74,8 @@ import org.apache.rocketmq.client.java.retry.CustomizedBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
+import org.apache.rocketmq.client.java.rpc.RpcContext;
 
 public class TestBase {
     protected static final String FAKE_CLIENT_ID = "mbp@29848@cno0nhxy";
@@ -137,6 +141,10 @@ public class TestBase {
         return new Endpoints(fakePbEndpoints0());
     }
 
+    protected RpcContext fakeRpcContext() {
+        return new RpcContext(fakeEndpoints(), new Metadata());
+    }
+
     protected Message fakeMessage(String topic) {
         return new 
MessageBuilderImpl().setTopic(topic).setBody(RandomUtils.nextBytes(1)).build();
     }
@@ -207,40 +215,35 @@ public class TestBase {
             .setPermission(Permission.READ_WRITE).build();
     }
 
-    protected ListenableFuture<QueryRouteResponse> 
okQueryRouteResponseFuture() {
-        SettableFuture<QueryRouteResponse> future = SettableFuture.create();
+    protected ListenableFuture<InvocationContext<QueryRouteResponse>> 
okQueryRouteResponseFuture() {
         Status status = Status.newBuilder().setCode(Code.OK).build();
-        final QueryRouteResponse response =
+        final QueryRouteResponse resp =
             
QueryRouteResponse.newBuilder().setStatus(status).addMessageQueues(fakePbMessageQueue0()).build();
-        future.set(response);
-        return future;
+        return Futures.immediateFuture(new InvocationContext<>(resp, 
fakeRpcContext()));
     }
 
-    protected ListenableFuture<ChangeInvisibleDurationResponse> 
okChangeInvisibleDurationFuture() {
-        SettableFuture<ChangeInvisibleDurationResponse> future = 
SettableFuture.create();
+    protected 
ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>>
+        okChangeInvisibleDurationCtxFuture() {
         Status status = Status.newBuilder().setCode(Code.OK).build();
-        final ChangeInvisibleDurationResponse response =
+        final ChangeInvisibleDurationResponse resp =
             
ChangeInvisibleDurationResponse.newBuilder().setStatus(status).build();
-        future.set(response);
-        return future;
+        return Futures.immediateFuture(new InvocationContext<>(resp, 
fakeRpcContext()));
     }
 
-    protected ListenableFuture<QueryAssignmentResponse> 
okQueryAssignmentResponseFuture() {
+    protected ListenableFuture<InvocationContext<QueryAssignmentResponse>> 
okQueryAssignmentResponseFuture() {
         final SettableFuture<QueryAssignmentResponse> future = 
SettableFuture.create();
         final Status status = Status.newBuilder().setCode(Code.OK).build();
         Assignment assignment = 
Assignment.newBuilder().setMessageQueue(fakePbMessageQueue0()).build();
-        QueryAssignmentResponse response = 
QueryAssignmentResponse.newBuilder().setStatus(status)
+        QueryAssignmentResponse resp = 
QueryAssignmentResponse.newBuilder().setStatus(status)
             .addAssignments(assignment).build();
-        future.set(response);
-        return future;
+        return Futures.immediateFuture(new InvocationContext<>(resp, 
fakeRpcContext()));
     }
 
-    protected ListenableFuture<QueryAssignmentResponse> 
okEmptyQueryAssignmentResponseFuture() {
+    protected ListenableFuture<InvocationContext<QueryAssignmentResponse>> 
okEmptyQueryAssignmentResponseFuture() {
         final SettableFuture<QueryAssignmentResponse> future = 
SettableFuture.create();
         final Status status = Status.newBuilder().setCode(Code.OK).build();
-        final QueryAssignmentResponse response = 
QueryAssignmentResponse.newBuilder().setStatus(status).build();
-        future.set(response);
-        return future;
+        final QueryAssignmentResponse resp = 
QueryAssignmentResponse.newBuilder().setStatus(status).build();
+        return Futures.immediateFuture(new InvocationContext<>(resp, 
fakeRpcContext()));
     }
 
     protected Map<String, FilterExpression> 
createSubscriptionExpressions(String topic) {
@@ -250,53 +253,44 @@ public class TestBase {
         return map;
     }
 
-    protected ListenableFuture<AckMessageResponse> 
okAckMessageResponseFuture() {
+    protected ListenableFuture<InvocationContext<AckMessageResponse>> 
okAckMessageResponseFuture() {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
-        SettableFuture<AckMessageResponse> future0 = SettableFuture.create();
-        final AckMessageResponse response = 
AckMessageResponse.newBuilder().setStatus(status).build();
-        future0.set(response);
-        return future0;
+        final AckMessageResponse resp = 
AckMessageResponse.newBuilder().setStatus(status).build();
+        return Futures.immediateFuture(new InvocationContext<>(resp, 
fakeRpcContext()));
     }
 
-    protected ListenableFuture<ChangeInvisibleDurationResponse> 
okChangeInvisibleDurationResponseFuture(
-        String receiptHandle) {
+    protected 
ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>>
+        okChangeInvisibleDurationResponseFuture(String receiptHandle) {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
         SettableFuture<ChangeInvisibleDurationResponse> future = 
SettableFuture.create();
-        ChangeInvisibleDurationResponse response = 
ChangeInvisibleDurationResponse.newBuilder().setStatus(status)
+        ChangeInvisibleDurationResponse resp = 
ChangeInvisibleDurationResponse.newBuilder().setStatus(status)
             .setReceiptHandle(receiptHandle).build();
-        future.set(response);
-        return future;
+        return Futures.immediateFuture(new InvocationContext<>(resp, 
fakeRpcContext()));
     }
 
-    protected ListenableFuture<ForwardMessageToDeadLetterQueueResponse>
-        okForwardMessageToDeadLetterQueueResponseFuture() {
+    protected 
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>
+    okForwardMessageToDeadLetterQueueResponseFuture() {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
-        SettableFuture<ForwardMessageToDeadLetterQueueResponse> future0 = 
SettableFuture.create();
-        final ForwardMessageToDeadLetterQueueResponse response =
+        final ForwardMessageToDeadLetterQueueResponse resp =
             
ForwardMessageToDeadLetterQueueResponse.newBuilder().setStatus(status).build();
-        future0.set(response);
-        return future0;
+        return Futures.immediateFuture(new InvocationContext<>(resp, 
fakeRpcContext()));
     }
 
-    protected ListenableFuture<SendMessageResponse> 
okSendMessageResponseFutureWithSingleEntry() {
+    protected ListenableFuture<InvocationContext<SendMessageResponse>> 
okSendMessageResponseFutureWithSingleEntry() {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
-        SettableFuture<SendMessageResponse> future0 = SettableFuture.create();
         final String messageId = 
MessageIdCodec.getInstance().nextMessageId().toString();
         SendResultEntry entry = 
SendResultEntry.newBuilder().setMessageId(messageId)
             
.setTransactionId(FAKE_TRANSACTION_ID).setStatus(status).setOffset(1).build();
-        SendMessageResponse response = 
SendMessageResponse.newBuilder().setStatus(status).addEntries(entry).build();
-        future0.set(response);
-        return future0;
+        SendMessageResponse resp = 
SendMessageResponse.newBuilder().setStatus(status).addEntries(entry).build();
+        return Futures.immediateFuture(new InvocationContext<>(resp, 
fakeRpcContext()));
     }
 
-    protected ListenableFuture<SendMessageResponse> 
failureSendMessageResponseFuture() {
+    protected ListenableFuture<InvocationContext<SendMessageResponse>> 
failureSendMessageResponseFuture() {
         final Status status = 
Status.newBuilder().setCode(Code.FORBIDDEN).build();
-        SettableFuture<SendMessageResponse> future0 = SettableFuture.create();
         SendResultEntry sendResultEntry = 
SendResultEntry.newBuilder().setStatus(status).setStatus(status).build();
-        SendMessageResponse response = 
SendMessageResponse.newBuilder().setStatus(status)
+        SendMessageResponse resp = 
SendMessageResponse.newBuilder().setStatus(status)
             .addEntries(sendResultEntry).build();
-        future0.set(response);
-        return future0;
+        return Futures.immediateFuture(new InvocationContext<>(resp, 
fakeRpcContext()));
     }
 
     protected ListenableFuture<SendMessageResponse> 
okBatchSendMessageResponseFuture() {
@@ -326,11 +320,9 @@ public class TestBase {
             .setSystemProperties(systemProperties).build();
     }
 
-    protected ListenableFuture<Iterator<ReceiveMessageResponse>> 
okReceiveMessageResponsesFuture(String topic,
-        int messageCount) {
+    protected 
ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> 
okReceiveMessageResponsesFuture(
+        String topic, int messageCount) {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
-        SettableFuture<Iterator<ReceiveMessageResponse>> future = 
SettableFuture.create();
-
         final apache.rocketmq.v2.Message message = fakePbMessage(topic);
         List<ReceiveMessageResponse> responses = new ArrayList<>();
         ReceiveMessageResponse statusResponse = 
ReceiveMessageResponse.newBuilder().setStatus(status).build();
@@ -339,9 +331,7 @@ public class TestBase {
             ReceiveMessageResponse messageResponse = 
ReceiveMessageResponse.newBuilder().setMessage(message).build();
             responses.add(messageResponse);
         }
-
-        future.set(responses.iterator());
-        return future;
+        return Futures.immediateFuture(new 
InvocationContext<>(responses.iterator(), fakeRpcContext()));
     }
 
     protected ListenableFuture<EndTransactionResponse> 
okEndTransactionResponseFuture() {
@@ -369,8 +359,9 @@ public class TestBase {
 
     protected SendReceiptImpl fakeSendReceiptImpl(
         MessageQueueImpl mq) throws ExecutionException, InterruptedException, 
ClientException {
-        final ListenableFuture<SendMessageResponse> future = 
okSendMessageResponseFutureWithSingleEntry();
-        final SendMessageResponse response = future.get();
+        final ListenableFuture<InvocationContext<SendMessageResponse>> future =
+            okSendMessageResponseFutureWithSingleEntry();
+        final SendMessageResponse response = future.get().getResp();
         final List<SendReceiptImpl> receipts = 
SendReceiptImpl.processSendResponse(mq, response);
         return receipts.iterator().next();
     }

Reply via email to