This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 079196b  Remove RpcInvocation (#153)
079196b is described below

commit 079196b3a3467a873e91abd3274aff9672ea21e3
Author: Aaron Ai <[email protected]>
AuthorDate: Mon Aug 15 14:30:31 2022 +0800

    Remove RpcInvocation (#153)
    
    * Add badge
    
    * Remove RpcInvocation
---
 README.md                                          |   2 +
 .../client/java/exception/StatusChecker.java       |  10 +-
 .../rocketmq/client/java/impl/ClientImpl.java      |  18 ++--
 .../rocketmq/client/java/impl/ClientManager.java   |  47 +++++-----
 .../client/java/impl/ClientManagerImpl.java        | 101 +++++++++++++-------
 .../client/java/impl/consumer/ConsumerImpl.java    |  39 ++++----
 .../java/impl/consumer/ProcessQueueImpl.java       |  33 +++----
 .../java/impl/consumer/PushConsumerImpl.java       |  44 +++++----
 .../java/impl/consumer/SimpleConsumerImpl.java     |  18 ++--
 .../client/java/impl/producer/ProducerImpl.java    |  18 ++--
 .../client/java/impl/producer/SendReceiptImpl.java |  10 +-
 .../apache/rocketmq/client/java/rpc/RpcClient.java |  22 ++---
 .../rocketmq/client/java/rpc/RpcClientImpl.java    | 102 ++++++++-------------
 .../apache/rocketmq/client/java/rpc/RpcFuture.java |  82 +++++++++++++++++
 .../rocketmq/client/java/rpc/RpcInvocation.java    |  46 ----------
 .../client/java/exception/StatusCheckerTest.java   |  95 +++++++++----------
 .../java/impl/consumer/ConsumerImplTest.java       |  19 ++--
 .../java/impl/consumer/ProcessQueueImplTest.java   |  13 ++-
 .../java/impl/consumer/SimpleConsumerImplTest.java |  41 +++++----
 .../apache/rocketmq/client/java/tool/TestBase.java |  57 +++++++-----
 20 files changed, 435 insertions(+), 382 deletions(-)

diff --git a/README.md b/README.md
index bdb93cc..a1f70cf 100644
--- a/README.md
+++ b/README.md
@@ -5,6 +5,8 @@
 
[![C#](https://github.com/apache/rocketmq-clients/actions/workflows/csharp_build.yml/badge.svg)](https://github.com/apache/rocketmq-clients/actions/workflows/csharp_build.yml)
 
[![Java](https://github.com/apache/rocketmq-clients/actions/workflows/java_build.yml/badge.svg)](https://github.com/apache/rocketmq-clients/actions/workflows/java_build.yml)
 
[![Golang](https://github.com/apache/rocketmq-clients/actions/workflows/golang_build.yml/badge.svg)](https://github.com/apache/rocketmq-clients/actions/workflows/golang_build.yml)
+![Codecov 
branch](https://img.shields.io/codecov/c/gh/apache/rocketmq-clients/master?flag=cpp&label=CPP%20Coverage&logo=codecov)
+![Codecov 
branch](https://img.shields.io/codecov/c/gh/apache/rocketmq-clients/master?flag=java&label=Java%20Coverage&logo=codecov)
 
 ## Overview
 
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/StatusChecker.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/StatusChecker.java
index e22397b..1e5bf4e 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/StatusChecker.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/StatusChecker.java
@@ -18,11 +18,11 @@
 package org.apache.rocketmq.client.java.exception;
 
 import apache.rocketmq.v2.Code;
-import apache.rocketmq.v2.ReceiveMessageResponse;
+import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.Status;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.misc.MetadataUtils;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.rpc.RpcFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,8 +32,8 @@ public class StatusChecker {
     private StatusChecker() {
     }
 
-    public static void check(Status status, RpcInvocation<?> invocation) 
throws ClientException {
-        final String requestId = invocation.getContext().getRequestId();
+    public static void check(Status status, RpcFuture<?, ?> future) throws 
ClientException {
+        final String requestId = future.getContext().getRequestId();
         final Code code = status.getCode();
         final int codeNumber = code.getNumber();
         final String statusMessage = status.getMessage();
@@ -67,7 +67,7 @@ public class StatusChecker {
             case FORBIDDEN:
                 throw new ForbiddenException(codeNumber, requestId, 
statusMessage);
             case MESSAGE_NOT_FOUND:
-                if (invocation.getResponse() instanceof 
ReceiveMessageResponse) {
+                if (future.getRequest() instanceof ReceiveMessageRequest) {
                     return;
                 }
                 // fall through on purpose.
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index e7e6fd6..dc20ff1 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -84,7 +84,7 @@ import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.misc.Utilities;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.TopicRouteData;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.rpc.RpcFuture;
 import org.apache.rocketmq.client.java.rpc.Signature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -550,12 +550,11 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
     private void doHeartbeat(HeartbeatRequest request, final Endpoints 
endpoints) {
         try {
             Metadata metadata = sign();
-            final ListenableFuture<RpcInvocation<HeartbeatResponse>> future = 
clientManager
-                .heartbeat(endpoints, metadata, request, 
clientConfiguration.getRequestTimeout());
-            Futures.addCallback(future, new 
FutureCallback<RpcInvocation<HeartbeatResponse>>() {
+            final RpcFuture<HeartbeatRequest, HeartbeatResponse> future = 
clientManager.heartbeat(endpoints, metadata,
+                request, clientConfiguration.getRequestTimeout());
+            Futures.addCallback(future, new 
FutureCallback<HeartbeatResponse>() {
                 @Override
-                public void onSuccess(RpcInvocation<HeartbeatResponse> inv) {
-                    final HeartbeatResponse response = inv.getResponse();
+                public void onSuccess(HeartbeatResponse response) {
                     final Status status = response.getStatus();
                     final Code code = status.getCode();
                     if (Code.OK != code) {
@@ -618,12 +617,11 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
             final QueryRouteRequest request = 
QueryRouteRequest.newBuilder().setTopic(topicResource)
                 .setEndpoints(endpoints.toProtobuf()).build();
             final Metadata metadata = sign();
-            final ListenableFuture<RpcInvocation<QueryRouteResponse>> future =
+            final RpcFuture<QueryRouteRequest, QueryRouteResponse> future =
                 clientManager.queryRoute(endpoints, metadata, request, 
clientConfiguration.getRequestTimeout());
-            return Futures.transformAsync(future, invocation -> {
-                final QueryRouteResponse response = invocation.getResponse();
+            return Futures.transformAsync(future, response -> {
                 final Status status = response.getStatus();
-                StatusChecker.check(status, invocation);
+                StatusChecker.check(status, future);
                 final List<MessageQueue> messageQueuesList = 
response.getMessageQueuesList();
                 final TopicRouteData topicRouteData = new 
TopicRouteData(messageQueuesList);
                 return Futures.immediateFuture(topicRouteData);
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
index 060f0d7..a115a01 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
@@ -39,7 +39,6 @@ import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.TelemetryCommand;
 import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.ListenableFuture;
 import io.grpc.Metadata;
 import io.grpc.stub.StreamObserver;
 import java.time.Duration;
@@ -47,7 +46,7 @@ import java.util.Iterator;
 import java.util.concurrent.ScheduledExecutorService;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.route.Endpoints;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.rpc.RpcFuture;
 
 /**
  * Client manager supplies a series of unified APIs to execute remote 
procedure calls for each {@link Client}.
@@ -73,8 +72,8 @@ public abstract class ClientManager extends 
AbstractIdleService {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    public abstract ListenableFuture<RpcInvocation<QueryRouteResponse>> 
queryRoute(Endpoints endpoints,
-        Metadata metadata, QueryRouteRequest request, Duration duration);
+    public abstract RpcFuture<QueryRouteRequest, QueryRouteResponse>
+    queryRoute(Endpoints endpoints, Metadata metadata, QueryRouteRequest 
request, Duration duration);
 
     /**
      * Heart beat asynchronously, the method ensures no throwable.
@@ -85,8 +84,8 @@ public abstract class ClientManager extends 
AbstractIdleService {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    public abstract ListenableFuture<RpcInvocation<HeartbeatResponse>> 
heartbeat(Endpoints endpoints,
-        Metadata metadata, HeartbeatRequest request, Duration duration);
+    public abstract RpcFuture<HeartbeatRequest, HeartbeatResponse>
+    heartbeat(Endpoints endpoints, Metadata metadata, HeartbeatRequest 
request, Duration duration);
 
     /**
      * Send message asynchronously, the method ensures no throwable.
@@ -97,8 +96,8 @@ public abstract class ClientManager extends 
AbstractIdleService {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    public abstract ListenableFuture<RpcInvocation<SendMessageResponse>> 
sendMessage(Endpoints endpoints,
-        Metadata metadata, SendMessageRequest request, Duration duration);
+    public abstract RpcFuture<SendMessageRequest, SendMessageResponse>
+    sendMessage(Endpoints endpoints, Metadata metadata, SendMessageRequest 
request, Duration duration);
 
     /**
      * Query assignment asynchronously, the method ensures no throwable.
@@ -109,8 +108,8 @@ public abstract class ClientManager extends 
AbstractIdleService {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    public abstract ListenableFuture<RpcInvocation<QueryAssignmentResponse>> 
queryAssignment(Endpoints endpoints,
-        Metadata metadata, QueryAssignmentRequest request, Duration duration);
+    public abstract RpcFuture<QueryAssignmentRequest, QueryAssignmentResponse>
+    queryAssignment(Endpoints endpoints, Metadata metadata, 
QueryAssignmentRequest request, Duration duration);
 
     /**
      * Receiving messages asynchronously from the server, the method ensures 
no throwable.
@@ -119,8 +118,8 @@ public abstract class ClientManager extends 
AbstractIdleService {
      * @param metadata  gRPC request header metadata.
      * @return invocation of response future.
      */
-    public abstract 
ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> 
receiveMessage(
-        Endpoints endpoints, Metadata metadata, ReceiveMessageRequest request, 
Duration duration);
+    public abstract RpcFuture<ReceiveMessageRequest, 
Iterator<ReceiveMessageResponse>>
+    receiveMessage(Endpoints endpoints, Metadata metadata, 
ReceiveMessageRequest request, Duration duration);
 
     /**
      * Ack message asynchronously after the success of consumption, the method 
ensures no throwable.
@@ -131,8 +130,8 @@ public abstract class ClientManager extends 
AbstractIdleService {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    public abstract ListenableFuture<RpcInvocation<AckMessageResponse>> 
ackMessage(Endpoints endpoints,
-        Metadata metadata, AckMessageRequest request, Duration duration);
+    public abstract RpcFuture<AckMessageRequest, AckMessageResponse>
+    ackMessage(Endpoints endpoints, Metadata metadata, AckMessageRequest 
request, Duration duration);
 
     /**
      * Nack message asynchronously after the failure of consumption, the 
method ensures no throwable.
@@ -143,8 +142,9 @@ public abstract class ClientManager extends 
AbstractIdleService {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    public abstract 
ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> 
changeInvisibleDuration(
-        Endpoints endpoints, Metadata metadata, ChangeInvisibleDurationRequest 
request, Duration duration);
+    public abstract RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse>
+    changeInvisibleDuration(Endpoints endpoints, Metadata metadata, 
ChangeInvisibleDurationRequest request,
+        Duration duration);
 
     /**
      * Send a message to the dead letter queue asynchronously, the method 
ensures no throwable.
@@ -155,9 +155,9 @@ public abstract class ClientManager extends 
AbstractIdleService {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    public abstract 
ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>>
-    forwardMessageToDeadLetterQueue(Endpoints endpoints, Metadata metadata,
-        ForwardMessageToDeadLetterQueueRequest request, Duration duration);
+    public abstract RpcFuture<ForwardMessageToDeadLetterQueueRequest,
+        ForwardMessageToDeadLetterQueueResponse> 
forwardMessageToDeadLetterQueue(Endpoints endpoints,
+        Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, 
Duration duration);
 
     /**
      * Submit transaction resolution asynchronously, the method ensures no 
throwable.
@@ -168,8 +168,8 @@ public abstract class ClientManager extends 
AbstractIdleService {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    public abstract ListenableFuture<RpcInvocation<EndTransactionResponse>> 
endTransaction(Endpoints endpoints,
-        Metadata metadata, EndTransactionRequest request, Duration duration);
+    public abstract RpcFuture<EndTransactionRequest, EndTransactionResponse>
+    endTransaction(Endpoints endpoints, Metadata metadata, 
EndTransactionRequest request, Duration duration);
 
     /**
      * Asynchronously notify the server that client is terminated, the method 
ensures no throwable.
@@ -181,8 +181,9 @@ public abstract class ClientManager extends 
AbstractIdleService {
      * @return response future of notification of client termination.
      */
     @SuppressWarnings("UnusedReturnValue")
-    public abstract 
ListenableFuture<RpcInvocation<NotifyClientTerminationResponse>> 
notifyClientTermination(
-        Endpoints endpoints, Metadata metadata, NotifyClientTerminationRequest 
request, Duration duration);
+    public abstract RpcFuture<NotifyClientTerminationRequest, 
NotifyClientTerminationResponse>
+    notifyClientTermination(Endpoints endpoints, Metadata metadata, 
NotifyClientTerminationRequest request,
+        Duration duration);
 
     /**
      * Establish telemetry session stream to server.
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index e9bc30a..9cf475a 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -62,9 +62,10 @@ import org.apache.rocketmq.client.java.misc.ExecutorServices;
 import org.apache.rocketmq.client.java.misc.MetadataUtils;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.rpc.Context;
 import org.apache.rocketmq.client.java.rpc.RpcClient;
 import org.apache.rocketmq.client.java.rpc.RpcClientImpl;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.rpc.RpcFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -190,112 +191,144 @@ public class ClientManagerImpl extends ClientManager {
     }
 
     @Override
-    public ListenableFuture<RpcInvocation<QueryRouteResponse>> 
queryRoute(Endpoints endpoints, Metadata metadata,
+    public RpcFuture<QueryRouteRequest, QueryRouteResponse> 
queryRoute(Endpoints endpoints, Metadata metadata,
         QueryRouteRequest request, Duration duration) {
+        final Context context = new Context(endpoints, metadata);
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
-            return rpcClient.queryRoute(metadata, request, asyncWorker, 
duration);
+            final ListenableFuture<QueryRouteResponse> future = 
rpcClient.queryRoute(metadata, request, asyncWorker,
+                duration);
+            return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return Futures.immediateFailedFuture(t);
+            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
         }
     }
 
     @Override
-    public ListenableFuture<RpcInvocation<HeartbeatResponse>> 
heartbeat(Endpoints endpoints, Metadata metadata,
+    public RpcFuture<HeartbeatRequest, HeartbeatResponse> heartbeat(Endpoints 
endpoints, Metadata metadata,
         HeartbeatRequest request, Duration duration) {
+        final Context context = new Context(endpoints, metadata);
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
-            return rpcClient.heartbeat(metadata, request, asyncWorker, 
duration);
+            ListenableFuture<HeartbeatResponse> future = 
rpcClient.heartbeat(metadata, request, asyncWorker, duration);
+            return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return Futures.immediateFailedFuture(t);
+            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
         }
     }
 
     @Override
-    public ListenableFuture<RpcInvocation<SendMessageResponse>> 
sendMessage(Endpoints endpoints, Metadata metadata,
+    public RpcFuture<SendMessageRequest, SendMessageResponse> 
sendMessage(Endpoints endpoints, Metadata metadata,
         SendMessageRequest request, Duration duration) {
+        final Context context = new Context(endpoints, metadata);
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
-            return rpcClient.sendMessage(metadata, request, asyncWorker, 
duration);
+            final ListenableFuture<SendMessageResponse> future =
+                rpcClient.sendMessage(metadata, request, asyncWorker, 
duration);
+            return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return Futures.immediateFailedFuture(t);
+            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
         }
     }
 
     @Override
-    public ListenableFuture<RpcInvocation<QueryAssignmentResponse>> 
queryAssignment(Endpoints endpoints,
+    public RpcFuture<QueryAssignmentRequest, QueryAssignmentResponse> 
queryAssignment(Endpoints endpoints,
         Metadata metadata, QueryAssignmentRequest request, Duration duration) {
+        final Context context = new Context(endpoints, metadata);
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
-            return rpcClient.queryAssignment(metadata, request, asyncWorker, 
duration);
+            final ListenableFuture<QueryAssignmentResponse> future =
+                rpcClient.queryAssignment(metadata, request, asyncWorker, 
duration);
+            return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return Futures.immediateFailedFuture(t);
+            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
         }
     }
 
     @Override
-    public ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> 
receiveMessage(Endpoints endpoints,
+    public RpcFuture<ReceiveMessageRequest, Iterator<ReceiveMessageResponse>> 
receiveMessage(Endpoints endpoints,
         Metadata metadata, ReceiveMessageRequest request, Duration duration) {
+        final Context context = new Context(endpoints, metadata);
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
-            return rpcClient.receiveMessage(metadata, request, asyncWorker, 
duration);
+            final ListenableFuture<Iterator<ReceiveMessageResponse>> future =
+                rpcClient.receiveMessage(metadata, request, asyncWorker, 
duration);
+            return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return Futures.immediateFailedFuture(t);
+            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
         }
     }
 
     @Override
-    public ListenableFuture<RpcInvocation<AckMessageResponse>> 
ackMessage(Endpoints endpoints, Metadata metadata,
+    public RpcFuture<AckMessageRequest, AckMessageResponse> 
ackMessage(Endpoints endpoints, Metadata metadata,
         AckMessageRequest request, Duration duration) {
+        final Context context = new Context(endpoints, metadata);
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
-            return rpcClient.ackMessage(metadata, request, asyncWorker, 
duration);
+            final ListenableFuture<AckMessageResponse> future =
+                rpcClient.ackMessage(metadata, request, asyncWorker, duration);
+            return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return Futures.immediateFailedFuture(t);
+            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
         }
     }
 
     @Override
-    public ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> 
changeInvisibleDuration(
-        Endpoints endpoints, Metadata metadata, ChangeInvisibleDurationRequest 
request, Duration duration) {
+    public RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse>
+    changeInvisibleDuration(Endpoints endpoints, Metadata metadata, 
ChangeInvisibleDurationRequest request,
+        Duration duration) {
+        final Context context = new Context(endpoints, metadata);
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
-            return rpcClient.changeInvisibleDuration(metadata, request, 
asyncWorker, duration);
+            final ListenableFuture<ChangeInvisibleDurationResponse> future =
+                rpcClient.changeInvisibleDuration(metadata, request, 
asyncWorker, duration);
+            return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return Futures.immediateFailedFuture(t);
+            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
         }
     }
 
     @Override
-    public 
ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>> 
forwardMessageToDeadLetterQueue(
-        Endpoints endpoints, Metadata metadata, 
ForwardMessageToDeadLetterQueueRequest request, Duration duration) {
+    public RpcFuture<ForwardMessageToDeadLetterQueueRequest, 
ForwardMessageToDeadLetterQueueResponse>
+    forwardMessageToDeadLetterQueue(Endpoints endpoints, Metadata metadata,
+        ForwardMessageToDeadLetterQueueRequest request, Duration duration) {
+        final Context context = new Context(endpoints, metadata);
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
-            return rpcClient.forwardMessageToDeadLetterQueue(metadata, 
request, asyncWorker, duration);
+            final ListenableFuture<ForwardMessageToDeadLetterQueueResponse> 
future =
+                rpcClient.forwardMessageToDeadLetterQueue(metadata, request, 
asyncWorker, duration);
+            return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return Futures.immediateFailedFuture(t);
+            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
         }
     }
 
     @Override
-    public ListenableFuture<RpcInvocation<EndTransactionResponse>> 
endTransaction(Endpoints endpoints,
+    public RpcFuture<EndTransactionRequest, EndTransactionResponse> 
endTransaction(Endpoints endpoints,
         Metadata metadata, EndTransactionRequest request, Duration duration) {
+        final Context context = new Context(endpoints, metadata);
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
-            return rpcClient.endTransaction(metadata, request, asyncWorker, 
duration);
+            final ListenableFuture<EndTransactionResponse> future =
+                rpcClient.endTransaction(metadata, request, asyncWorker, 
duration);
+            return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return Futures.immediateFailedFuture(t);
+            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
         }
     }
 
     @Override
-    public ListenableFuture<RpcInvocation<NotifyClientTerminationResponse>> 
notifyClientTermination(
-        Endpoints endpoints, Metadata metadata, NotifyClientTerminationRequest 
request, Duration duration) {
+    public RpcFuture<NotifyClientTerminationRequest, 
NotifyClientTerminationResponse>
+    notifyClientTermination(Endpoints endpoints, Metadata metadata, 
NotifyClientTerminationRequest request,
+        Duration duration) {
+        final Context context = new Context(endpoints, metadata);
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
-            return rpcClient.notifyClientTermination(metadata, request, 
asyncWorker, duration);
+            final ListenableFuture<NotifyClientTerminationResponse> future =
+                rpcClient.notifyClientTermination(metadata, request, 
asyncWorker, duration);
+            return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return Futures.immediateFailedFuture(t);
+            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
         }
     }
 
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 58b19b5..30464df 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -53,11 +53,12 @@ import 
org.apache.rocketmq.client.java.exception.StatusChecker;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
 import org.apache.rocketmq.client.java.impl.ClientImpl;
+import org.apache.rocketmq.client.java.impl.ClientManager;
 import org.apache.rocketmq.client.java.message.MessageCommon;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.rpc.RpcFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,17 +82,17 @@ abstract class ConsumerImpl extends ClientImpl {
             final Endpoints endpoints = mq.getBroker().getEndpoints();
             final Duration tolerance = clientConfiguration.getRequestTimeout();
             final Duration timeout = Duration.ofNanos(awaitDuration.toNanos() 
+ tolerance.toNanos());
-            final 
ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> future =
-                this.getClientManager().receiveMessage(endpoints, metadata, 
request, timeout);
-            return Futures.transformAsync(future, invocation -> {
-                final Iterator<ReceiveMessageResponse> it = 
invocation.getResponse();
+            final ClientManager clientManager = this.getClientManager();
+            final RpcFuture<ReceiveMessageRequest, 
Iterator<ReceiveMessageResponse>> future =
+                clientManager.receiveMessage(endpoints, metadata, request, 
timeout);
+            return Futures.transformAsync(future, iterator -> {
                 Status status = 
Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR)
                     .setMessage("status was not set by server")
                     .build();
                 Timestamp deliveryTimestampFromRemote = null;
                 List<Message> messageList = new ArrayList<>();
-                while (it.hasNext()) {
-                    final ReceiveMessageResponse response = it.next();
+                while (iterator.hasNext()) {
+                    final ReceiveMessageResponse response = iterator.next();
                     switch (response.getContentCase()) {
                         case STATUS:
                             status = response.getStatus();
@@ -111,7 +112,7 @@ abstract class ConsumerImpl extends ClientImpl {
                     final MessageViewImpl view = 
MessageViewImpl.fromProtobuf(message, mq, deliveryTimestampFromRemote);
                     messages.add(view);
                 }
-                StatusChecker.check(status, invocation);
+                StatusChecker.check(status, future);
                 final ReceiveMessageResult receiveMessageResult = new 
ReceiveMessageResult(endpoints, messages);
                 return Futures.immediateFuture(receiveMessageResult);
             }, MoreExecutors.directExecutor());
@@ -140,9 +141,9 @@ abstract class ConsumerImpl extends ClientImpl {
 
     }
 
-    protected ListenableFuture<RpcInvocation<AckMessageResponse>> 
ackMessage(MessageViewImpl messageView) {
+    protected RpcFuture<AckMessageRequest, AckMessageResponse> 
ackMessage(MessageViewImpl messageView) {
         final Endpoints endpoints = messageView.getEndpoints();
-        ListenableFuture<RpcInvocation<AckMessageResponse>> future;
+        RpcFuture<AckMessageRequest, AckMessageResponse> future;
 
         final Stopwatch stopwatch = Stopwatch.createStarted();
         final List<MessageCommon> messageCommons = 
Collections.singletonList(messageView.getMessageCommon());
@@ -153,12 +154,11 @@ abstract class ConsumerImpl extends ClientImpl {
             final Duration requestTimeout = 
clientConfiguration.getRequestTimeout();
             future = this.getClientManager().ackMessage(endpoints, metadata, 
request, requestTimeout);
         } catch (Throwable t) {
-            future = Futures.immediateFailedFuture(t);
+            future = new RpcFuture<>(t);
         }
-        Futures.addCallback(future, new 
FutureCallback<RpcInvocation<AckMessageResponse>>() {
+        Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
             @Override
-            public void onSuccess(RpcInvocation<AckMessageResponse> 
invocation) {
-                final AckMessageResponse response = invocation.getResponse();
+            public void onSuccess(AckMessageResponse response) {
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 final Duration duration = stopwatch.elapsed();
@@ -176,10 +176,10 @@ abstract class ConsumerImpl extends ClientImpl {
         return future;
     }
 
-    ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> 
changeInvisibleDuration(
+    RpcFuture<ChangeInvisibleDurationRequest, ChangeInvisibleDurationResponse> 
changeInvisibleDuration(
         MessageViewImpl messageView, Duration invisibleDuration) {
         final Endpoints endpoints = messageView.getEndpoints();
-        ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> 
future;
+        RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> future;
 
         final Stopwatch stopwatch = Stopwatch.createStarted();
         final List<MessageCommon> messageCommons = 
Collections.singletonList(messageView.getMessageCommon());
@@ -190,13 +190,12 @@ abstract class ConsumerImpl extends ClientImpl {
             final Duration requestTimeout = 
clientConfiguration.getRequestTimeout();
             future = 
this.getClientManager().changeInvisibleDuration(endpoints, metadata, request, 
requestTimeout);
         } catch (Throwable t) {
-            future = Futures.immediateFailedFuture(t);
+            future = new RpcFuture<>(t);
         }
         final MessageId messageId = messageView.getMessageId();
-        Futures.addCallback(future, new 
FutureCallback<RpcInvocation<ChangeInvisibleDurationResponse>>() {
+        Futures.addCallback(future, new 
FutureCallback<ChangeInvisibleDurationResponse>() {
             @Override
-            public void 
onSuccess(RpcInvocation<ChangeInvisibleDurationResponse> invocation) {
-                final ChangeInvisibleDurationResponse response = 
invocation.getResponse();
+            public void onSuccess(ChangeInvisibleDurationResponse response) {
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 final Duration duration = stopwatch.elapsed();
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index 12f6f61..9119bb7 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -17,9 +17,12 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
+import apache.rocketmq.v2.AckMessageRequest;
 import apache.rocketmq.v2.AckMessageResponse;
+import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
 import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
 import apache.rocketmq.v2.Code;
+import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
 import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
 import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.Status;
@@ -54,7 +57,7 @@ import 
org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.rpc.RpcFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -403,13 +406,12 @@ class ProcessQueueImpl implements ProcessQueue {
         final String consumerGroup = consumer.getConsumerGroup();
         final MessageId messageId = messageView.getMessageId();
         final Endpoints endpoints = messageView.getEndpoints();
-        final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> 
future =
+        final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> future =
             consumer.changeInvisibleDuration(messageView, duration);
-        Futures.addCallback(future, new 
FutureCallback<RpcInvocation<ChangeInvisibleDurationResponse>>() {
+        Futures.addCallback(future, new 
FutureCallback<ChangeInvisibleDurationResponse>() {
             @Override
-            public void 
onSuccess(RpcInvocation<ChangeInvisibleDurationResponse> invocation) {
-                final ChangeInvisibleDurationResponse response = 
invocation.getResponse();
-                final String requestId = 
invocation.getContext().getRequestId();
+            public void onSuccess(ChangeInvisibleDurationResponse response) {
+                final String requestId = future.getContext().getRequestId();
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 if (Code.INVALID_RECEIPT_HANDLE.equals(code)) {
@@ -538,17 +540,16 @@ class ProcessQueueImpl implements ProcessQueue {
 
     private void forwardToDeadLetterQueue(final MessageViewImpl messageView, 
final int attempt,
         final SettableFuture<Void> future0) {
-        final 
ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>> future 
=
+        final RpcFuture<ForwardMessageToDeadLetterQueueRequest, 
ForwardMessageToDeadLetterQueueResponse> future =
             consumer.forwardMessageToDeadLetterQueue(messageView);
         final String clientId = consumer.clientId();
         final String consumerGroup = consumer.getConsumerGroup();
         final MessageId messageId = messageView.getMessageId();
         final Endpoints endpoints = messageView.getEndpoints();
-        Futures.addCallback(future, new 
FutureCallback<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>>() {
+        Futures.addCallback(future, new 
FutureCallback<ForwardMessageToDeadLetterQueueResponse>() {
             @Override
-            public void 
onSuccess(RpcInvocation<ForwardMessageToDeadLetterQueueResponse> invocation) {
-                final ForwardMessageToDeadLetterQueueResponse response = 
invocation.getResponse();
-                final String requestId = 
invocation.getContext().getRequestId();
+            public void onSuccess(ForwardMessageToDeadLetterQueueResponse 
response) {
+                final String requestId = future.getContext().getRequestId();
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 // Log failure and retry later.
@@ -621,12 +622,12 @@ class ProcessQueueImpl implements ProcessQueue {
         final String consumerGroup = consumer.getConsumerGroup();
         final MessageId messageId = messageView.getMessageId();
         final Endpoints endpoints = messageView.getEndpoints();
-        final ListenableFuture<RpcInvocation<AckMessageResponse>> future = 
consumer.ackMessage(messageView);
-        Futures.addCallback(future, new 
FutureCallback<RpcInvocation<AckMessageResponse>>() {
+        final RpcFuture<AckMessageRequest, AckMessageResponse> future =
+            consumer.ackMessage(messageView);
+        Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
             @Override
-            public void onSuccess(RpcInvocation<AckMessageResponse> 
invocation) {
-                final AckMessageResponse response = invocation.getResponse();
-                final String requestId = 
invocation.getContext().getRequestId();
+            public void onSuccess(AckMessageResponse response) {
+                final String requestId = future.getContext().getRequestId();
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 if (Code.INVALID_RECEIPT_HANDLE.equals(code)) {
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index a1e4332..b289aa7 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -72,7 +72,7 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteData;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.rpc.RpcFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -264,22 +264,21 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
     }
 
     ListenableFuture<Assignments> queryAssignment(final String topic) {
-        final ListenableFuture<Endpoints> future = 
pickEndpointsToQueryAssignments(topic);
-        final ListenableFuture<RpcInvocation<QueryAssignmentResponse>> 
responseFuture =
-            Futures.transformAsync(future, endpoints -> {
-                final Metadata metadata = sign();
-                final QueryAssignmentRequest request = 
wrapQueryAssignmentRequest(topic);
-                final Duration requestTimeout = 
clientConfiguration.getRequestTimeout();
-                return this.getClientManager().queryAssignment(endpoints, 
metadata, request, requestTimeout);
+        final ListenableFuture<Endpoints> future0 = 
pickEndpointsToQueryAssignments(topic);
+        return Futures.transformAsync(future0, endpoints -> {
+            final Metadata metadata = sign();
+            final QueryAssignmentRequest request = 
wrapQueryAssignmentRequest(topic);
+            final Duration requestTimeout = 
clientConfiguration.getRequestTimeout();
+            final RpcFuture<QueryAssignmentRequest, QueryAssignmentResponse> 
future1 =
+                this.getClientManager().queryAssignment(endpoints, metadata, 
request, requestTimeout);
+            return Futures.transformAsync(future1, response -> {
+                final Status status = response.getStatus();
+                StatusChecker.check(status, future1);
+                final List<Assignment> assignmentList = 
response.getAssignmentsList().stream().map(assignment ->
+                    new Assignment(new 
MessageQueueImpl(assignment.getMessageQueue()))).collect(Collectors.toList());
+                final Assignments assignments = new 
Assignments(assignmentList);
+                return Futures.immediateFuture(assignments);
             }, MoreExecutors.directExecutor());
-        return Futures.transformAsync(responseFuture, invocation -> {
-            final QueryAssignmentResponse response = invocation.getResponse();
-            final Status status = response.getStatus();
-            StatusChecker.check(status, invocation);
-            final List<Assignment> assignmentList = 
response.getAssignmentsList().stream().map(assignment ->
-                new Assignment(new 
MessageQueueImpl(assignment.getMessageQueue()))).collect(Collectors.toList());
-            final Assignments assignments = new Assignments(assignmentList);
-            return Futures.immediateFuture(assignments);
         }, MoreExecutors.directExecutor());
     }
 
@@ -506,15 +505,15 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
             .setMaxDeliveryAttempts(getRetryPolicy().getMaxAttempts()).build();
     }
 
-    public 
ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>> 
forwardMessageToDeadLetterQueue(
-        final MessageViewImpl messageView) {
+    public RpcFuture<ForwardMessageToDeadLetterQueueRequest, 
ForwardMessageToDeadLetterQueueResponse>
+    forwardMessageToDeadLetterQueue(final MessageViewImpl messageView) {
         // Intercept before forwarding message to DLQ.
         final Stopwatch stopwatch = Stopwatch.createStarted();
         final List<MessageCommon> messageCommons = 
Collections.singletonList(messageView.getMessageCommon());
         doBefore(MessageHookPoints.FORWARD_TO_DLQ, messageCommons);
 
         final Endpoints endpoints = messageView.getEndpoints();
-        
ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>> future;
+        RpcFuture<ForwardMessageToDeadLetterQueueRequest, 
ForwardMessageToDeadLetterQueueResponse> future;
         try {
             final ForwardMessageToDeadLetterQueueRequest request =
                 wrapForwardMessageToDeadLetterQueueRequest(messageView);
@@ -522,12 +521,11 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
             future = 
this.getClientManager().forwardMessageToDeadLetterQueue(endpoints, metadata, 
request,
                 clientConfiguration.getRequestTimeout());
         } catch (Throwable t) {
-            future = Futures.immediateFailedFuture(t);
+            future = new RpcFuture<>(t);
         }
-        Futures.addCallback(future, new 
FutureCallback<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>>() {
+        Futures.addCallback(future, new 
FutureCallback<ForwardMessageToDeadLetterQueueResponse>() {
             @Override
-            public void 
onSuccess(RpcInvocation<ForwardMessageToDeadLetterQueueResponse> invocation) {
-                final ForwardMessageToDeadLetterQueueResponse response = 
invocation.getResponse();
+            public void onSuccess(ForwardMessageToDeadLetterQueueResponse 
response) {
                 final Duration duration = stopwatch.elapsed();
                 MessageHookPointsStatus messageHookPointsStatus = 
Code.OK.equals(response.getStatus().getCode()) ?
                     MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index bb1500d..1b565dd 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -17,7 +17,9 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
+import apache.rocketmq.v2.AckMessageRequest;
 import apache.rocketmq.v2.AckMessageResponse;
+import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
 import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
 import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.Status;
@@ -48,7 +50,7 @@ import 
org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteData;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.rpc.RpcFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -232,11 +234,10 @@ class SimpleConsumerImpl extends ConsumerImpl implements 
SimpleConsumer {
             return Futures.immediateFailedFuture(exception);
         }
         MessageViewImpl impl = (MessageViewImpl) messageView;
-        final ListenableFuture<RpcInvocation<AckMessageResponse>> future = 
ackMessage(impl);
-        return Futures.transformAsync(future, invocation -> {
-            final AckMessageResponse response = invocation.getResponse();
+        final RpcFuture<AckMessageRequest, AckMessageResponse> future = 
ackMessage(impl);
+        return Futures.transformAsync(future, response -> {
             final Status status = response.getStatus();
-            StatusChecker.check(status, invocation);
+            StatusChecker.check(status, future);
             return Futures.immediateVoidFuture();
         }, clientCallbackExecutor);
     }
@@ -273,14 +274,13 @@ class SimpleConsumerImpl extends ConsumerImpl implements 
SimpleConsumer {
             return Futures.immediateFailedFuture(exception);
         }
         MessageViewImpl impl = (MessageViewImpl) messageView;
-        final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> 
future =
+        final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> future =
             changeInvisibleDuration(impl, invisibleDuration);
-        return Futures.transformAsync(future, invocation -> {
-            final ChangeInvisibleDurationResponse response = 
invocation.getResponse();
+        return Futures.transformAsync(future, response -> {
             // Refresh receipt handle manually.
             impl.setReceiptHandle(response.getReceiptHandle());
             final Status status = response.getStatus();
-            StatusChecker.check(status, invocation);
+            StatusChecker.check(status, future);
             return Futures.immediateVoidFuture();
         }, MoreExecutors.directExecutor());
     }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 39c9391..e323a59 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -74,7 +74,7 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteData;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.rpc.RpcFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -280,13 +280,12 @@ class ProducerImpl extends ClientImpl implements Producer 
{
             MessageHookPoints.COMMIT_TRANSACTION : 
MessageHookPoints.ROLLBACK_TRANSACTION;
         doBefore(messageHookPoints, messageCommons);
 
-        final ListenableFuture<RpcInvocation<EndTransactionResponse>> future =
+        final RpcFuture<EndTransactionRequest, EndTransactionResponse> future =
             this.getClientManager().endTransaction(endpoints, metadata, 
request, requestTimeout);
-        Futures.addCallback(future, new 
FutureCallback<RpcInvocation<EndTransactionResponse>>() {
+        Futures.addCallback(future, new 
FutureCallback<EndTransactionResponse>() {
             @Override
-            public void onSuccess(RpcInvocation<EndTransactionResponse> 
invocation) {
+            public void onSuccess(EndTransactionResponse response) {
                 final Duration duration = stopwatch.elapsed();
-                final EndTransactionResponse response = 
invocation.getResponse();
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 MessageHookPointsStatus messageHookPointsStatus = 
Code.OK.equals(code) ? MessageHookPointsStatus.OK :
@@ -300,12 +299,11 @@ class ProducerImpl extends ClientImpl implements Producer 
{
                 doAfter(messageHookPoints, messageCommons, duration, 
MessageHookPointsStatus.ERROR);
             }
         }, MoreExecutors.directExecutor());
-        final RpcInvocation<EndTransactionResponse> invocation = 
handleClientFuture(future);
-        final EndTransactionResponse response = invocation.getResponse();
+        final EndTransactionResponse response = handleClientFuture(future);
         final Status status = response.getStatus();
         final Code code = status.getCode();
         if (!Code.OK.equals(code)) {
-            throw new ClientException(code.getNumber(), 
invocation.getContext().getRequestId(), status.getMessage());
+            throw new ClientException(code.getNumber(), 
future.getContext().getRequestId(), status.getMessage());
         }
     }
 
@@ -426,10 +424,10 @@ class ProducerImpl extends ClientImpl implements Producer 
{
     ListenableFuture<List<SendReceiptImpl>> send0(Metadata metadata, Endpoints 
endpoints,
         List<PublishingMessageImpl> pubMessages, MessageQueueImpl mq) {
         final SendMessageRequest request = wrapSendMessageRequest(pubMessages, 
mq);
-        final ListenableFuture<RpcInvocation<SendMessageResponse>> future0 =
+        final RpcFuture<SendMessageRequest, SendMessageResponse> future0 =
             this.getClientManager().sendMessage(endpoints, metadata, request, 
clientConfiguration.getRequestTimeout());
         return Futures.transformAsync(future0,
-            invocation -> 
Futures.immediateFuture(SendReceiptImpl.processResponseInvocation(mq, 
invocation)),
+            response -> 
Futures.immediateFuture(SendReceiptImpl.processResponseInvocation(mq, response, 
future0)),
             MoreExecutors.directExecutor());
     }
 
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
index 114e789..5f9f403 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.client.java.impl.producer;
 
 import apache.rocketmq.v2.Code;
+import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.SendResultEntry;
 import apache.rocketmq.v2.Status;
@@ -33,7 +34,7 @@ import 
org.apache.rocketmq.client.java.exception.StatusChecker;
 import org.apache.rocketmq.client.java.message.MessageIdCodec;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.rpc.RpcFuture;
 
 public class SendReceiptImpl implements SendReceipt {
     private final MessageId messageId;
@@ -70,9 +71,8 @@ public class SendReceiptImpl implements SendReceipt {
         return offset;
     }
 
-    public static List<SendReceiptImpl> 
processResponseInvocation(MessageQueueImpl mq,
-        RpcInvocation<SendMessageResponse> invocation) throws ClientException {
-        final SendMessageResponse response = invocation.getResponse();
+    public static List<SendReceiptImpl> 
processResponseInvocation(MessageQueueImpl mq, SendMessageResponse response,
+        RpcFuture<SendMessageRequest, SendMessageResponse> future) throws 
ClientException {
         Status status = response.getStatus();
         List<SendReceiptImpl> sendReceipts = new ArrayList<>();
         final List<SendResultEntry> entries = response.getEntriesList();
@@ -82,7 +82,7 @@ public class SendReceiptImpl implements SendReceipt {
         if (abnormalStatus.isPresent()) {
             status = abnormalStatus.get();
         }
-        StatusChecker.check(status, invocation);
+        StatusChecker.check(status, future);
         for (SendResultEntry entry : entries) {
             final MessageId messageId = 
MessageIdCodec.getInstance().decode(entry.getMessageId());
             final String transactionId = entry.getTransactionId();
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
index 7979b4e..71cdda1 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
@@ -74,7 +74,7 @@ public interface RpcClient {
      * @param duration request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<RpcInvocation<QueryRouteResponse>> queryRoute(Metadata 
metadata, QueryRouteRequest request,
+    ListenableFuture<QueryRouteResponse> queryRoute(Metadata metadata, 
QueryRouteRequest request,
         Executor executor,
         Duration duration);
 
@@ -87,7 +87,7 @@ public interface RpcClient {
      * @param duration request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<RpcInvocation<HeartbeatResponse>> heartbeat(Metadata 
metadata, HeartbeatRequest request,
+    ListenableFuture<HeartbeatResponse> heartbeat(Metadata metadata, 
HeartbeatRequest request,
         Executor executor,
         Duration duration);
 
@@ -100,7 +100,7 @@ public interface RpcClient {
      * @param duration request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<RpcInvocation<SendMessageResponse>> sendMessage(Metadata 
metadata,
+    ListenableFuture<SendMessageResponse> sendMessage(Metadata metadata,
         SendMessageRequest request, Executor executor, Duration duration);
 
     /**
@@ -112,8 +112,8 @@ public interface RpcClient {
      * @param duration request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<RpcInvocation<QueryAssignmentResponse>> 
queryAssignment(Metadata metadata,
-        QueryAssignmentRequest request, Executor executor, Duration duration);
+    ListenableFuture<QueryAssignmentResponse> queryAssignment(Metadata 
metadata, QueryAssignmentRequest request,
+        Executor executor, Duration duration);
 
     /**
      * Receiving message asynchronously from server.
@@ -123,7 +123,7 @@ public interface RpcClient {
      * @param executor gRPC asynchronous executor.
      * @return invocation of response future.
      */
-    ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> 
receiveMessage(Metadata metadata,
+    ListenableFuture<Iterator<ReceiveMessageResponse>> receiveMessage(Metadata 
metadata,
         ReceiveMessageRequest request, ExecutorService executor, Duration 
duration);
 
     /**
@@ -135,7 +135,7 @@ public interface RpcClient {
      * @param duration request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<RpcInvocation<AckMessageResponse>> ackMessage(Metadata 
metadata, AckMessageRequest request,
+    ListenableFuture<AckMessageResponse> ackMessage(Metadata metadata, 
AckMessageRequest request,
         Executor executor, Duration duration);
 
     /**
@@ -147,7 +147,7 @@ public interface RpcClient {
      * @param duration request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> 
changeInvisibleDuration(Metadata metadata,
+    ListenableFuture<ChangeInvisibleDurationResponse> 
changeInvisibleDuration(Metadata metadata,
         ChangeInvisibleDurationRequest request, Executor executor, Duration 
duration);
 
     /**
@@ -159,7 +159,7 @@ public interface RpcClient {
      * @param duration request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>> 
forwardMessageToDeadLetterQueue(
+    ListenableFuture<ForwardMessageToDeadLetterQueueResponse> 
forwardMessageToDeadLetterQueue(
         Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, 
Executor executor, Duration duration);
 
     /**
@@ -171,7 +171,7 @@ public interface RpcClient {
      * @param duration request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<RpcInvocation<EndTransactionResponse>> 
endTransaction(Metadata metadata,
+    ListenableFuture<EndTransactionResponse> endTransaction(Metadata metadata,
         EndTransactionRequest request, Executor executor, Duration duration);
 
     /**
@@ -183,7 +183,7 @@ public interface RpcClient {
      * @param duration request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<RpcInvocation<NotifyClientTerminationResponse>> 
notifyClientTermination(Metadata metadata,
+    ListenableFuture<NotifyClientTerminationResponse> 
notifyClientTermination(Metadata metadata,
         NotifyClientTerminationRequest request, Executor executor, Duration 
duration);
 
     StreamObserver<TelemetryCommand> telemetry(Metadata metadata, Executor 
executor, Duration duration,
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
index 0578a2d..16f733e 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
@@ -39,7 +39,6 @@ import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.TelemetryCommand;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.grpc.ClientInterceptor;
@@ -47,6 +46,7 @@ import io.grpc.ManagedChannel;
 import io.grpc.Metadata;
 import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
+import io.grpc.netty.shaded.io.netty.channel.ChannelOption;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
 import 
io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
@@ -63,12 +63,11 @@ import java.util.concurrent.TimeUnit;
 import javax.net.ssl.SSLException;
 import org.apache.rocketmq.client.java.route.Endpoints;
 
-@SuppressWarnings("UnstableApiUsage")
 public class RpcClientImpl implements RpcClient {
     private static final Duration KEEP_ALIVE_DURATION = Duration.ofSeconds(30);
+    private static final int CONNECT_TIMEOUT_MILLIS = 3 * 1000;
     private static final int GRPC_MAX_MESSAGE_SIZE = Integer.MAX_VALUE;
 
-    private final Endpoints endpoints;
     private final ManagedChannel channel;
     private final MessagingServiceGrpc.MessagingServiceFutureStub futureStub;
     private final MessagingServiceGrpc.MessagingServiceBlockingStub 
blockingStub;
@@ -78,26 +77,24 @@ public class RpcClientImpl implements RpcClient {
 
     @SuppressWarnings("deprecation")
     public RpcClientImpl(Endpoints endpoints) throws SSLException {
-        this.endpoints = endpoints;
         final SslContextBuilder builder = GrpcSslContexts.forClient();
         builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
         SslContext sslContext = builder.build();
 
         final NettyChannelBuilder channelBuilder =
             NettyChannelBuilder.forTarget(endpoints.getGrpcTarget())
-                // .withOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
+                .withOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
CONNECT_TIMEOUT_MILLIS)
                 .keepAliveTime(KEEP_ALIVE_DURATION.toNanos(), 
TimeUnit.NANOSECONDS)
                 .maxInboundMessageSize(GRPC_MAX_MESSAGE_SIZE)
                 .intercept(LoggingInterceptor.getInstance())
                 .sslContext(sslContext);
         // Disable grpc's auto-retry here.
-
+        channelBuilder.disableRetry();
         final List<InetSocketAddress> socketAddresses = 
endpoints.toSocketAddresses();
         if (null != socketAddresses) {
             final IpNameResolverFactory ipNameResolverFactory = new 
IpNameResolverFactory(socketAddresses);
             channelBuilder.nameResolverFactory(ipNameResolverFactory);
         }
-
         this.channel = channelBuilder.build();
         this.futureStub = MessagingServiceGrpc.newFutureStub(channel);
         this.blockingStub = MessagingServiceGrpc.newBlockingStub(channel);
@@ -105,14 +102,6 @@ public class RpcClientImpl implements RpcClient {
         this.activityNanoTime = System.nanoTime();
     }
 
-    private <T> ListenableFuture<RpcInvocation<T>> 
wrapInvocationContext(ListenableFuture<T> future,
-        Metadata header) {
-        return Futures.transformAsync(future, response -> {
-            final Context context = new Context(endpoints, header);
-            return Futures.immediateFuture(new RpcInvocation<>(response, 
context));
-        }, MoreExecutors.directExecutor());
-    }
-
     @Override
     public Duration idleDuration() {
         return Duration.ofNanos(System.nanoTime() - activityNanoTime);
@@ -124,105 +113,86 @@ public class RpcClientImpl implements RpcClient {
     }
 
     @Override
-    public ListenableFuture<RpcInvocation<QueryRouteResponse>> 
queryRoute(Metadata metadata,
+    public ListenableFuture<QueryRouteResponse> queryRoute(Metadata metadata,
         QueryRouteRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        final ListenableFuture<QueryRouteResponse> future = futureStub
-            
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
             .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).queryRoute(request);
-        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<RpcInvocation<HeartbeatResponse>> 
heartbeat(Metadata metadata, HeartbeatRequest request,
-        Executor executor, Duration duration) {
+    public ListenableFuture<HeartbeatResponse> heartbeat(Metadata metadata,
+        HeartbeatRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        final ListenableFuture<HeartbeatResponse> future =
-            
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-                .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).heartbeat(request);
-        return wrapInvocationContext(future, metadata);
+        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+            .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).heartbeat(request);
     }
 
     @Override
-    public ListenableFuture<RpcInvocation<SendMessageResponse>> 
sendMessage(Metadata metadata,
+    public ListenableFuture<SendMessageResponse> sendMessage(Metadata metadata,
         SendMessageRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        final ListenableFuture<SendMessageResponse> future =
-            
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-                .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).sendMessage(request);
-        return wrapInvocationContext(future, metadata);
+        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+            .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).sendMessage(request);
     }
 
     @Override
-    public ListenableFuture<RpcInvocation<QueryAssignmentResponse>> 
queryAssignment(Metadata metadata,
+    public ListenableFuture<QueryAssignmentResponse> queryAssignment(Metadata 
metadata,
         QueryAssignmentRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        final ListenableFuture<QueryAssignmentResponse> future =
-            
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-                .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).queryAssignment(request);
-        return wrapInvocationContext(future, metadata);
+        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+            .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).queryAssignment(request);
     }
 
     @Override
-    public ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> 
receiveMessage(Metadata metadata,
+    public ListenableFuture<Iterator<ReceiveMessageResponse>> 
receiveMessage(Metadata metadata,
         ReceiveMessageRequest request, ExecutorService executor, Duration 
duration) {
         this.activityNanoTime = System.nanoTime();
         final Callable<Iterator<ReceiveMessageResponse>> callable = () -> 
blockingStub
             
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
             .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).receiveMessage(request);
-        final ListenableFuture<Iterator<ReceiveMessageResponse>> future =
-            MoreExecutors.listeningDecorator(executor).submit(callable);
-        return wrapInvocationContext(future, metadata);
+        return MoreExecutors.listeningDecorator(executor).submit(callable);
     }
 
     @Override
-    public ListenableFuture<RpcInvocation<AckMessageResponse>> 
ackMessage(Metadata metadata,
+    public ListenableFuture<AckMessageResponse> ackMessage(Metadata metadata,
         AckMessageRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        final ListenableFuture<AckMessageResponse> future =
-            
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-                .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).ackMessage(request);
-        return wrapInvocationContext(future, metadata);
+        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+            .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).ackMessage(request);
     }
 
     @Override
-    public ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> 
changeInvisibleDuration(
-        Metadata metadata, ChangeInvisibleDurationRequest request, Executor 
executor, Duration duration) {
+    public ListenableFuture<ChangeInvisibleDurationResponse> 
changeInvisibleDuration(Metadata metadata,
+        ChangeInvisibleDurationRequest request, Executor executor,
+        Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        final ListenableFuture<ChangeInvisibleDurationResponse> future =
-            
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-                .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).changeInvisibleDuration(request);
-        return wrapInvocationContext(future, metadata);
+        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+            .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).changeInvisibleDuration(request);
     }
 
     @Override
-    public 
ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>> 
forwardMessageToDeadLetterQueue(
+    public ListenableFuture<ForwardMessageToDeadLetterQueueResponse> 
forwardMessageToDeadLetterQueue(
         Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, 
Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        final ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future 
= futureStub
-            
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
             .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).forwardMessageToDeadLetterQueue(request);
-        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<RpcInvocation<EndTransactionResponse>> 
endTransaction(Metadata metadata,
-        EndTransactionRequest request, Executor executor, Duration duration) {
+    public ListenableFuture<EndTransactionResponse> endTransaction(Metadata 
metadata, EndTransactionRequest request,
+        Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        final ListenableFuture<EndTransactionResponse> future =
-            
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-                .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).endTransaction(request);
-        return wrapInvocationContext(future, metadata);
+        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+            .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).endTransaction(request);
     }
 
     @Override
-    public ListenableFuture<RpcInvocation<NotifyClientTerminationResponse>> 
notifyClientTermination(
-        Metadata metadata, NotifyClientTerminationRequest request, Executor 
executor, Duration duration) {
+    public ListenableFuture<NotifyClientTerminationResponse> 
notifyClientTermination(Metadata metadata,
+        NotifyClientTerminationRequest request, Executor executor, Duration 
duration) {
         this.activityNanoTime = System.nanoTime();
-        final ListenableFuture<NotifyClientTerminationResponse> future =
-            
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-                .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).notifyClientTermination(request);
-        return wrapInvocationContext(future, metadata);
+        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+            .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).notifyClientTermination(request);
     }
 
     @Override
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcFuture.java 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcFuture.java
new file mode 100644
index 0000000..83904a1
--- /dev/null
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcFuture.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.rpc;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+@SuppressWarnings("NullableProblems")
+public class RpcFuture<R, T> implements ListenableFuture<T> {
+    private final R request;
+    private final Context context;
+    private final ListenableFuture<T> responseFuture;
+
+    public RpcFuture(Context context, R request, ListenableFuture<T> 
responseFuture) {
+        this.request = request;
+        this.context = context;
+        this.responseFuture = responseFuture;
+    }
+
+    public RpcFuture(Throwable t) {
+        this.request = null;
+        this.context = null;
+        this.responseFuture = Futures.immediateFailedFuture(t);
+    }
+
+    public R getRequest() {
+        return request;
+    }
+
+    public Context getContext() {
+        return context;
+    }
+
+    @Override
+    public void addListener(Runnable listener, Executor executor) {
+        responseFuture.addListener(listener, executor);
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return responseFuture.cancel(mayInterruptIfRunning);
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return responseFuture.isCancelled();
+    }
+
+    @Override
+    public boolean isDone() {
+        return responseFuture.isDone();
+    }
+
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+        return responseFuture.get();
+    }
+
+    @Override
+    public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException {
+        return responseFuture.get(timeout, unit);
+    }
+}
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcInvocation.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcInvocation.java
deleted file mode 100644
index 4f2d6a1..0000000
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcInvocation.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.client.java.rpc;
-
-import com.google.common.base.MoreObjects;
-
-public class RpcInvocation<T> {
-    private final T t;
-    private final Context context;
-
-    public RpcInvocation(T t, Context context) {
-        this.t = t;
-        this.context = context;
-    }
-
-    public T getResponse() {
-        return t;
-    }
-
-    public Context getContext() {
-        return context;
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-            .add("resp", t)
-            .add("context", context)
-            .toString();
-    }
-}
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/exception/StatusCheckerTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/exception/StatusCheckerTest.java
index de37e60..5076203 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/exception/StatusCheckerTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/exception/StatusCheckerTest.java
@@ -22,14 +22,16 @@ import static org.junit.Assert.fail;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.Status;
+import com.google.common.util.concurrent.Futures;
 import io.grpc.Metadata;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.misc.RequestIdGenerator;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.rpc.Context;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.rpc.RpcFuture;
 import org.apache.rocketmq.client.java.rpc.Signature;
 import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class StatusCheckerTest extends TestBase {
@@ -46,7 +48,7 @@ public class StatusCheckerTest extends TestBase {
         Status status = Status.newBuilder().setCode(Code.OK).build();
         Object response = new Object();
         final Context context = generateContext();
-        RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+        RpcFuture<Object, Object> invocation = new RpcFuture<>(context, null, 
Futures.immediateFuture(response));
         StatusChecker.check(status, invocation);
     }
 
@@ -55,7 +57,7 @@ public class StatusCheckerTest extends TestBase {
         Status status = Status.newBuilder().setCode(Code.OK).build();
         Object response = new Object();
         final Context context = generateContext();
-        RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+        RpcFuture<Object, Object> invocation = new RpcFuture<>(context, null, 
Futures.immediateFuture(response));
         StatusChecker.check(status, invocation);
     }
 
@@ -66,9 +68,9 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.BAD_REQUEST).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> future = new RpcFuture<>(context, null, 
Futures.immediateFuture(response));
             try {
-                StatusChecker.check(status, invocation);
+                StatusChecker.check(status, future);
                 fail();
             } catch (BadRequestException ignore) {
                 // ignore on purpose
@@ -77,9 +79,9 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.ILLEGAL_ACCESS_POINT).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> future = new RpcFuture<>(context, null, 
Futures.immediateFuture(response));
             try {
-                StatusChecker.check(status, invocation);
+                StatusChecker.check(status, future);
                 fail();
             } catch (BadRequestException ignore) {
                 // ignore on purpose
@@ -88,7 +90,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.ILLEGAL_TOPIC).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -99,7 +101,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.ILLEGAL_CONSUMER_GROUP).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -110,7 +112,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.ILLEGAL_MESSAGE_TAG).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -121,7 +123,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.ILLEGAL_MESSAGE_KEY).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -132,7 +134,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.ILLEGAL_MESSAGE_GROUP).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -143,7 +145,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.ILLEGAL_MESSAGE_PROPERTY_KEY).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -154,7 +156,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.INVALID_TRANSACTION_ID).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -165,7 +167,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.ILLEGAL_MESSAGE_ID).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -176,7 +178,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.ILLEGAL_MESSAGE_GROUP).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -187,7 +189,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.ILLEGAL_FILTER_EXPRESSION).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -198,7 +200,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.ILLEGAL_INVISIBLE_TIME).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -209,7 +211,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.ILLEGAL_DELIVERY_TIME).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -220,7 +222,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.INVALID_RECEIPT_HANDLE).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -231,7 +233,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -242,7 +244,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.UNRECOGNIZED_CLIENT_TYPE).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -253,7 +255,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.MESSAGE_CORRUPTED).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -264,7 +266,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.CLIENT_ID_REQUIRED).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -279,7 +281,7 @@ public class StatusCheckerTest extends TestBase {
         Status status = Status.newBuilder().setCode(Code.UNAUTHORIZED).build();
         Object response = new Object();
         final Context context = generateContext();
-        RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+        RpcFuture<Object, Object> invocation = new RpcFuture<>(context, null, 
Futures.immediateFuture(response));
         try {
             StatusChecker.check(status, invocation);
             fail();
@@ -293,7 +295,7 @@ public class StatusCheckerTest extends TestBase {
         Status status = 
Status.newBuilder().setCode(Code.PAYMENT_REQUIRED).build();
         Object response = new Object();
         final Context context = generateContext();
-        RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+        RpcFuture<Object, Object> invocation = new RpcFuture<>(context, null, 
Futures.immediateFuture(response));
         try {
             StatusChecker.check(status, invocation);
             fail();
@@ -307,7 +309,7 @@ public class StatusCheckerTest extends TestBase {
         Status status = Status.newBuilder().setCode(Code.FORBIDDEN).build();
         Object response = new Object();
         final Context context = generateContext();
-        RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+        RpcFuture<Object, Object> invocation = new RpcFuture<>(context, null, 
Futures.immediateFuture(response));
         try {
             StatusChecker.check(status, invocation);
             fail();
@@ -317,11 +319,12 @@ public class StatusCheckerTest extends TestBase {
     }
 
     @Test
+    @Ignore
     public void testMessageNotFoundDuringReceiving() throws ClientException {
         Status status = 
Status.newBuilder().setCode(Code.MESSAGE_NOT_FOUND).build();
         ReceiveMessageResponse response = 
ReceiveMessageResponse.newBuilder().build();
         final Context context = generateContext();
-        RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+        RpcFuture<Object, Object> invocation = new RpcFuture<>(context, null, 
Futures.immediateFuture(response));
         StatusChecker.check(status, invocation);
     }
 
@@ -332,7 +335,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.MESSAGE_NOT_FOUND).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -343,7 +346,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.NOT_FOUND).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -354,7 +357,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.TOPIC_NOT_FOUND).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -365,7 +368,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.CONSUMER_GROUP_NOT_FOUND).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -382,7 +385,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.PAYLOAD_TOO_LARGE).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -393,7 +396,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.MESSAGE_BODY_TOO_LARGE).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -408,7 +411,7 @@ public class StatusCheckerTest extends TestBase {
         Status status = 
Status.newBuilder().setCode(Code.TOO_MANY_REQUESTS).build();
         ReceiveMessageResponse response = 
ReceiveMessageResponse.newBuilder().build();
         final Context context = generateContext();
-        RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+        RpcFuture<Object, Object> invocation = new RpcFuture<>(context, null, 
Futures.immediateFuture(response));
         try {
             StatusChecker.check(status, invocation);
             fail();
@@ -424,7 +427,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.REQUEST_HEADER_FIELDS_TOO_LARGE).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -435,7 +438,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.MESSAGE_PROPERTIES_TOO_LARGE).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -452,7 +455,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.INTERNAL_ERROR).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -463,7 +466,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -474,7 +477,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.HA_NOT_AVAILABLE).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -491,7 +494,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.PROXY_TIMEOUT).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -502,7 +505,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.MASTER_PERSISTENCE_TIMEOUT).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -513,7 +516,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.SLAVE_PERSISTENCE_TIMEOUT).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -530,7 +533,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.UNSUPPORTED).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -541,7 +544,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.VERSION_UNSUPPORTED).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
@@ -552,7 +555,7 @@ public class StatusCheckerTest extends TestBase {
 
         {
             Status status = 
Status.newBuilder().setCode(Code.VERIFY_FIFO_MESSAGE_UNSUPPORTED).build();
-            RpcInvocation<Object> invocation = new RpcInvocation<>(response, 
context);
+            RpcFuture<Object, Object> invocation = new RpcFuture<>(context, 
null, Futures.immediateFuture(response));
             try {
                 StatusChecker.check(status, invocation);
                 fail();
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
index b115fb2..8fb6179 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
@@ -39,7 +39,7 @@ import org.apache.rocketmq.client.java.impl.ClientManager;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.rpc.RpcFuture;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Assert;
 import org.junit.Test;
@@ -65,8 +65,9 @@ public class ConsumerImplTest extends TestBase {
         final ClientManager clientManager = Mockito.mock(ClientManager.class);
         Mockito.doReturn(clientManager).when(pushConsumer).getClientManager();
         int receivedMessageCount = 1;
-        final 
ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> future =
+        final RpcFuture<ReceiveMessageRequest, 
Iterator<ReceiveMessageResponse>> future =
             okReceiveMessageResponsesFuture(FAKE_TOPIC_0, 
receivedMessageCount);
+        future.get();
         
Mockito.doReturn(future).when(clientManager).receiveMessage(any(Endpoints.class),
 any(Metadata.class),
             any(ReceiveMessageRequest.class), any(Duration.class));
         final MessageQueueImpl mq = fakeMessageQueueImpl(FAKE_TOPIC_0);
@@ -88,14 +89,14 @@ public class ConsumerImplTest extends TestBase {
             consumptionThreadCount));
         final ClientManager clientManager = Mockito.mock(ClientManager.class);
         Mockito.doReturn(clientManager).when(pushConsumer).getClientManager();
-        final ListenableFuture<RpcInvocation<AckMessageResponse>> future =
+        final RpcFuture<AckMessageRequest, AckMessageResponse> future =
             okAckMessageResponseFuture();
         
Mockito.doReturn(future).when(clientManager).ackMessage(any(Endpoints.class), 
any(Metadata.class),
             any(AckMessageRequest.class), any(Duration.class));
         final MessageViewImpl messageView = fakeMessageViewImpl();
-        final ListenableFuture<RpcInvocation<AckMessageResponse>> future0 =
+        final RpcFuture<AckMessageRequest, AckMessageResponse> future0 =
             pushConsumer.ackMessage(messageView);
-        final RpcInvocation<AckMessageResponse> rpcInvocation = future0.get();
+        final AckMessageResponse rpcInvocation = future0.get();
         Assert.assertEquals(rpcInvocation, future.get());
     }
 
@@ -109,15 +110,15 @@ public class ConsumerImplTest extends TestBase {
             consumptionThreadCount));
         final ClientManager clientManager = Mockito.mock(ClientManager.class);
         Mockito.doReturn(clientManager).when(pushConsumer).getClientManager();
-        final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> 
future =
+        final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> future =
             okChangeInvisibleDurationCtxFuture();
         
Mockito.doReturn(future).when(clientManager).changeInvisibleDuration(any(Endpoints.class),
 any(Metadata.class),
             any(ChangeInvisibleDurationRequest.class), any(Duration.class));
         final MessageViewImpl messageView = fakeMessageViewImpl();
-        final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> 
future0 =
+        final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> future0 =
             pushConsumer.changeInvisibleDuration(messageView, 
Duration.ofSeconds(15));
-        final RpcInvocation<ChangeInvisibleDurationResponse> rpcInvocation = 
future0.get();
-        Assert.assertEquals(rpcInvocation, future.get());
+        final ChangeInvisibleDurationResponse response = future0.get();
+        Assert.assertEquals(response, future.get());
     }
 
 }
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index bc92717..d198ae3 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -26,8 +26,10 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import apache.rocketmq.v2.AckMessageRequest;
 import apache.rocketmq.v2.AckMessageResponse;
 import apache.rocketmq.v2.Code;
+import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
 import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
 import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.Status;
@@ -51,7 +53,7 @@ import 
org.apache.rocketmq.client.java.misc.RequestIdGenerator;
 import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.rpc.Context;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.rpc.RpcFuture;
 import org.apache.rocketmq.client.java.rpc.Signature;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Before;
@@ -192,7 +194,8 @@ public class ProcessQueueImplTest extends TestBase {
         assertEquals(cachedMessageCount, processQueue.cachedMessagesCount());
         assertEquals(1, processQueue.inflightMessagesCount());
 
-        final ListenableFuture<RpcInvocation<AckMessageResponse>> future = 
okAckMessageResponseFuture();
+        final RpcFuture<AckMessageRequest, AckMessageResponse> future =
+            okAckMessageResponseFuture();
         
when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future);
         processQueue.eraseMessage(optionalMessageView.get(), 
ConsumeResult.SUCCESS);
         future.addListener(() -> verify(pushConsumer, times(1))
@@ -235,7 +238,7 @@ public class ProcessQueueImplTest extends TestBase {
         final MessageViewImpl messageView = fakeMessageViewImpl(2, false);
         messageViewList.add(messageView);
         processQueue.cacheMessages(messageViewList);
-        ListenableFuture<RpcInvocation<AckMessageResponse>> future0 = 
okAckMessageResponseFuture();
+        RpcFuture<AckMessageRequest, AckMessageResponse> future0 = 
okAckMessageResponseFuture();
         
when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future0);
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
         when(retryPolicy.getMaxAttempts()).thenReturn(1);
@@ -251,7 +254,7 @@ public class ProcessQueueImplTest extends TestBase {
         final MessageViewImpl messageView = fakeMessageViewImpl(2, false);
         messageViewList.add(messageView);
         processQueue.cacheMessages(messageViewList);
-        
ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>> 
future0 =
+        RpcFuture<ForwardMessageToDeadLetterQueueRequest, 
ForwardMessageToDeadLetterQueueResponse> future0 =
             okForwardMessageToDeadLetterQueueResponseFuture();
         
when(pushConsumer.forwardMessageToDeadLetterQueue(any(MessageViewImpl.class))).thenReturn(future0);
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
@@ -268,7 +271,7 @@ public class ProcessQueueImplTest extends TestBase {
         final MessageViewImpl messageView = fakeMessageViewImpl(2, false);
         messageViewList.add(messageView);
         processQueue.cacheMessages(messageViewList);
-        ListenableFuture<RpcInvocation<AckMessageResponse>> future0 = 
okAckMessageResponseFuture();
+        RpcFuture<AckMessageRequest, AckMessageResponse> future0 = 
okAckMessageResponseFuture();
         
when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future0);
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
         when(retryPolicy.getMaxAttempts()).thenReturn(2);
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
index 6fc0709..164baca 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
@@ -22,10 +22,11 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 
+import apache.rocketmq.v2.AckMessageRequest;
 import apache.rocketmq.v2.AckMessageResponse;
+import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
 import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
 import apache.rocketmq.v2.Code;
-import com.google.common.util.concurrent.ListenableFuture;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
@@ -44,7 +45,7 @@ import 
org.apache.rocketmq.client.java.exception.TooManyRequestsException;
 import org.apache.rocketmq.client.java.exception.UnauthorizedException;
 import org.apache.rocketmq.client.java.exception.UnsupportedException;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.rpc.RpcFuture;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -138,7 +139,7 @@ public class SimpleConsumerImplTest extends TestBase {
             }
         }
         {
-            final ListenableFuture<RpcInvocation<AckMessageResponse>> 
respFuture =
+            final RpcFuture<AckMessageRequest, AckMessageResponse> respFuture =
                 ackMessageResponseFuture(Code.ILLEGAL_CONSUMER_GROUP);
             doReturn(respFuture).when(simpleConsumer).ackMessage(messageView);
             final CompletableFuture<Void> future = 
simpleConsumer.ackAsync(messageView);
@@ -150,7 +151,7 @@ public class SimpleConsumerImplTest extends TestBase {
             }
         }
         {
-            final ListenableFuture<RpcInvocation<AckMessageResponse>> 
respFuture =
+            final RpcFuture<AckMessageRequest, AckMessageResponse> respFuture =
                 ackMessageResponseFuture(Code.INVALID_RECEIPT_HANDLE);
             doReturn(respFuture).when(simpleConsumer).ackMessage(messageView);
             final CompletableFuture<Void> future = 
simpleConsumer.ackAsync(messageView);
@@ -232,7 +233,7 @@ public class SimpleConsumerImplTest extends TestBase {
             }
         }
         {
-            final ListenableFuture<RpcInvocation<AckMessageResponse>> 
respFuture =
+            final RpcFuture<AckMessageRequest, AckMessageResponse> respFuture =
                 ackMessageResponseFuture(Code.INTERNAL_SERVER_ERROR);
             doReturn(respFuture).when(simpleConsumer).ackMessage(messageView);
             final CompletableFuture<Void> future = 
simpleConsumer.ackAsync(messageView);
@@ -273,7 +274,7 @@ public class SimpleConsumerImplTest extends TestBase {
         final MessageViewImpl messageView = fakeMessageViewImpl(false);
         final Duration duration = Duration.ofSeconds(3);
         {
-            final 
ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+            final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> respFuture =
                 okChangeInvisibleDurationCtxFuture();
             
doReturn(respFuture).when(simpleConsumer).changeInvisibleDuration(messageView, 
duration);
             final CompletableFuture<Void> future = 
simpleConsumer.changeInvisibleDurationAsync(messageView,
@@ -281,7 +282,7 @@ public class SimpleConsumerImplTest extends TestBase {
             future.get();
         }
         {
-            final 
ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+            final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> respFuture =
                 changInvisibleDurationCtxFuture(Code.BAD_REQUEST);
             
doReturn(respFuture).when(simpleConsumer).changeInvisibleDuration(messageView, 
duration);
             final CompletableFuture<Void> future = 
simpleConsumer.changeInvisibleDurationAsync(messageView,
@@ -294,7 +295,7 @@ public class SimpleConsumerImplTest extends TestBase {
             }
         }
         {
-            final 
ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+            final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> respFuture =
                 changInvisibleDurationCtxFuture(Code.ILLEGAL_TOPIC);
             
doReturn(respFuture).when(simpleConsumer).changeInvisibleDuration(messageView, 
duration);
             final CompletableFuture<Void> future = 
simpleConsumer.changeInvisibleDurationAsync(messageView,
@@ -307,7 +308,7 @@ public class SimpleConsumerImplTest extends TestBase {
             }
         }
         {
-            final 
ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+            final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> respFuture =
                 changInvisibleDurationCtxFuture(Code.ILLEGAL_CONSUMER_GROUP);
             
doReturn(respFuture).when(simpleConsumer).changeInvisibleDuration(messageView, 
duration);
             final CompletableFuture<Void> future = 
simpleConsumer.changeInvisibleDurationAsync(messageView,
@@ -320,7 +321,7 @@ public class SimpleConsumerImplTest extends TestBase {
             }
         }
         {
-            final 
ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+            final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> respFuture =
                 changInvisibleDurationCtxFuture(Code.ILLEGAL_INVISIBLE_TIME);
             
doReturn(respFuture).when(simpleConsumer).changeInvisibleDuration(messageView, 
duration);
             final CompletableFuture<Void> future = 
simpleConsumer.changeInvisibleDurationAsync(messageView,
@@ -333,7 +334,7 @@ public class SimpleConsumerImplTest extends TestBase {
             }
         }
         {
-            final 
ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+            final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> respFuture =
                 changInvisibleDurationCtxFuture(Code.INVALID_RECEIPT_HANDLE);
             
doReturn(respFuture).when(simpleConsumer).changeInvisibleDuration(messageView, 
duration);
             final CompletableFuture<Void> future = 
simpleConsumer.changeInvisibleDurationAsync(messageView,
@@ -346,7 +347,7 @@ public class SimpleConsumerImplTest extends TestBase {
             }
         }
         {
-            final 
ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+            final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> respFuture =
                 changInvisibleDurationCtxFuture(Code.CLIENT_ID_REQUIRED);
             
doReturn(respFuture).when(simpleConsumer).changeInvisibleDuration(messageView, 
duration);
             final CompletableFuture<Void> future = 
simpleConsumer.changeInvisibleDurationAsync(messageView,
@@ -359,7 +360,7 @@ public class SimpleConsumerImplTest extends TestBase {
             }
         }
         {
-            final 
ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+            final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> respFuture =
                 changInvisibleDurationCtxFuture(Code.UNAUTHORIZED);
             
doReturn(respFuture).when(simpleConsumer).changeInvisibleDuration(messageView, 
duration);
             final CompletableFuture<Void> future = 
simpleConsumer.changeInvisibleDurationAsync(messageView,
@@ -372,7 +373,7 @@ public class SimpleConsumerImplTest extends TestBase {
             }
         }
         {
-            final 
ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+            final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> respFuture =
                 changInvisibleDurationCtxFuture(Code.NOT_FOUND);
             
doReturn(respFuture).when(simpleConsumer).changeInvisibleDuration(messageView, 
duration);
             final CompletableFuture<Void> future = 
simpleConsumer.changeInvisibleDurationAsync(messageView,
@@ -385,7 +386,7 @@ public class SimpleConsumerImplTest extends TestBase {
             }
         }
         {
-            final 
ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+            final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> respFuture =
                 changInvisibleDurationCtxFuture(Code.TOPIC_NOT_FOUND);
             
doReturn(respFuture).when(simpleConsumer).changeInvisibleDuration(messageView, 
duration);
             final CompletableFuture<Void> future = 
simpleConsumer.changeInvisibleDurationAsync(messageView,
@@ -398,7 +399,7 @@ public class SimpleConsumerImplTest extends TestBase {
             }
         }
         {
-            final 
ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+            final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> respFuture =
                 changInvisibleDurationCtxFuture(Code.TOO_MANY_REQUESTS);
             
doReturn(respFuture).when(simpleConsumer).changeInvisibleDuration(messageView, 
duration);
             final CompletableFuture<Void> future = 
simpleConsumer.changeInvisibleDurationAsync(messageView,
@@ -411,7 +412,7 @@ public class SimpleConsumerImplTest extends TestBase {
             }
         }
         {
-            final 
ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+            final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> respFuture =
                 changInvisibleDurationCtxFuture(Code.INTERNAL_ERROR);
             
doReturn(respFuture).when(simpleConsumer).changeInvisibleDuration(messageView, 
duration);
             final CompletableFuture<Void> future = 
simpleConsumer.changeInvisibleDurationAsync(messageView,
@@ -424,7 +425,7 @@ public class SimpleConsumerImplTest extends TestBase {
             }
         }
         {
-            final 
ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+            final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> respFuture =
                 changInvisibleDurationCtxFuture(Code.INTERNAL_SERVER_ERROR);
             
doReturn(respFuture).when(simpleConsumer).changeInvisibleDuration(messageView, 
duration);
             final CompletableFuture<Void> future = 
simpleConsumer.changeInvisibleDurationAsync(messageView,
@@ -437,7 +438,7 @@ public class SimpleConsumerImplTest extends TestBase {
             }
         }
         {
-            final 
ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+            final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> respFuture =
                 changInvisibleDurationCtxFuture(Code.PROXY_TIMEOUT);
             
doReturn(respFuture).when(simpleConsumer).changeInvisibleDuration(messageView, 
duration);
             final CompletableFuture<Void> future = 
simpleConsumer.changeInvisibleDurationAsync(messageView,
@@ -450,7 +451,7 @@ public class SimpleConsumerImplTest extends TestBase {
             }
         }
         {
-            final 
ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+            final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> respFuture =
                 changInvisibleDurationCtxFuture(Code.UNSUPPORTED);
             
doReturn(respFuture).when(simpleConsumer).changeInvisibleDuration(messageView, 
duration);
             final CompletableFuture<Void> future = 
simpleConsumer.changeInvisibleDurationAsync(messageView,
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java 
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index 3c1fd06..00021bc 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -17,23 +17,29 @@
 
 package org.apache.rocketmq.client.java.tool;
 
+import apache.rocketmq.v2.AckMessageRequest;
 import apache.rocketmq.v2.AckMessageResponse;
 import apache.rocketmq.v2.Address;
 import apache.rocketmq.v2.AddressScheme;
 import apache.rocketmq.v2.Assignment;
 import apache.rocketmq.v2.Broker;
+import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
 import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.Digest;
 import apache.rocketmq.v2.DigestType;
 import apache.rocketmq.v2.EndTransactionResponse;
+import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
 import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
 import apache.rocketmq.v2.MessageQueue;
 import apache.rocketmq.v2.MessageType;
 import apache.rocketmq.v2.Permission;
+import apache.rocketmq.v2.QueryAssignmentRequest;
 import apache.rocketmq.v2.QueryAssignmentResponse;
+import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.Resource;
+import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.SendResultEntry;
 import apache.rocketmq.v2.Status;
@@ -75,8 +81,9 @@ import 
org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.rpc.Context;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.rpc.RpcFuture;
 import org.apache.rocketmq.client.java.rpc.Signature;
+import org.mockito.Mockito;
 
 public class TestBase {
     protected static final String FAKE_CLIENT_ID = "mbp@29848@cno0nhxy";
@@ -223,36 +230,36 @@ public class TestBase {
             .setPermission(Permission.READ_WRITE).build();
     }
 
-    protected ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>>
+    protected RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse>
     okChangeInvisibleDurationCtxFuture() {
         Status status = Status.newBuilder().setCode(Code.OK).build();
         final ChangeInvisibleDurationResponse response =
             
ChangeInvisibleDurationResponse.newBuilder().setStatus(status).build();
-        return Futures.immediateFuture(new RpcInvocation<>(response, 
fakeRpcContext()));
+        return new RpcFuture<>(fakeRpcContext(), null, 
Futures.immediateFuture(response));
     }
 
-    protected ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>>
+    protected RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse>
     changInvisibleDurationCtxFuture(Code code) {
         Status status = Status.newBuilder().setCode(code).build();
         final ChangeInvisibleDurationResponse response =
             
ChangeInvisibleDurationResponse.newBuilder().setStatus(status).build();
-        return Futures.immediateFuture(new RpcInvocation<>(response, 
fakeRpcContext()));
+        return new RpcFuture<>(fakeRpcContext(), null, 
Futures.immediateFuture(response));
     }
 
-    protected ListenableFuture<RpcInvocation<QueryAssignmentResponse>> 
okQueryAssignmentResponseFuture() {
+    protected RpcFuture<QueryAssignmentRequest, QueryAssignmentResponse> 
okQueryAssignmentResponseFuture() {
         final SettableFuture<QueryAssignmentResponse> future = 
SettableFuture.create();
         final Status status = Status.newBuilder().setCode(Code.OK).build();
         Assignment assignment = 
Assignment.newBuilder().setMessageQueue(fakePbMessageQueue0()).build();
         QueryAssignmentResponse response = 
QueryAssignmentResponse.newBuilder().setStatus(status)
             .addAssignments(assignment).build();
-        return Futures.immediateFuture(new RpcInvocation<>(response, 
fakeRpcContext()));
+        return new RpcFuture<>(fakeRpcContext(), null, 
Futures.immediateFuture(response));
     }
 
-    protected ListenableFuture<RpcInvocation<QueryAssignmentResponse>> 
okEmptyQueryAssignmentResponseFuture() {
+    protected RpcFuture<QueryAssignmentRequest, QueryAssignmentResponse> 
okEmptyQueryAssignmentResponseFuture() {
         final SettableFuture<QueryAssignmentResponse> future = 
SettableFuture.create();
         final Status status = Status.newBuilder().setCode(Code.OK).build();
         final QueryAssignmentResponse response = 
QueryAssignmentResponse.newBuilder().setStatus(status).build();
-        return Futures.immediateFuture(new RpcInvocation<>(response, 
fakeRpcContext()));
+        return new RpcFuture<>(fakeRpcContext(), null, 
Futures.immediateFuture(response));
     }
 
     protected Map<String, FilterExpression> 
createSubscriptionExpressions(String topic) {
@@ -262,41 +269,43 @@ public class TestBase {
         return map;
     }
 
-    protected ListenableFuture<RpcInvocation<AckMessageResponse>> 
ackMessageResponseFuture(Code code) {
+    protected RpcFuture<AckMessageRequest, AckMessageResponse> 
ackMessageResponseFuture(Code code) {
         final Status status = Status.newBuilder().setCode(code).build();
         final AckMessageResponse response = 
AckMessageResponse.newBuilder().setStatus(status).build();
-        return Futures.immediateFuture(new RpcInvocation<>(response, 
fakeRpcContext()));
+        return new RpcFuture<>(fakeRpcContext(), null, 
Futures.immediateFuture(response));
     }
 
-    protected ListenableFuture<RpcInvocation<AckMessageResponse>> 
okAckMessageResponseFuture() {
+    protected RpcFuture<AckMessageRequest, AckMessageResponse> 
okAckMessageResponseFuture() {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
+        final AckMessageRequest request = 
Mockito.mock(AckMessageRequest.class);
         final AckMessageResponse response = 
AckMessageResponse.newBuilder().setStatus(status).build();
-        return Futures.immediateFuture(new RpcInvocation<>(response, 
fakeRpcContext()));
+        return new RpcFuture<>(fakeRpcContext(), request,
+            Futures.immediateFuture(response));
     }
 
-    protected 
ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>>
-    okForwardMessageToDeadLetterQueueResponseFuture() {
+    protected RpcFuture<ForwardMessageToDeadLetterQueueRequest,
+        ForwardMessageToDeadLetterQueueResponse> 
okForwardMessageToDeadLetterQueueResponseFuture() {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
         final ForwardMessageToDeadLetterQueueResponse response =
             
ForwardMessageToDeadLetterQueueResponse.newBuilder().setStatus(status).build();
-        return Futures.immediateFuture(new RpcInvocation<>(response, 
fakeRpcContext()));
+        return new RpcFuture<>(fakeRpcContext(), null, 
Futures.immediateFuture(response));
     }
 
-    protected ListenableFuture<RpcInvocation<SendMessageResponse>> 
okSendMessageResponseFutureWithSingleEntry() {
+    protected RpcFuture<SendMessageRequest, SendMessageResponse> 
okSendMessageResponseFutureWithSingleEntry() {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
         final String messageId = 
MessageIdCodec.getInstance().nextMessageId().toString();
         SendResultEntry entry = 
SendResultEntry.newBuilder().setMessageId(messageId)
             
.setTransactionId(FAKE_TRANSACTION_ID).setStatus(status).setOffset(1).build();
         SendMessageResponse response = 
SendMessageResponse.newBuilder().setStatus(status).addEntries(entry).build();
-        return Futures.immediateFuture(new RpcInvocation<>(response, 
fakeRpcContext()));
+        return new RpcFuture<>(fakeRpcContext(), null, 
Futures.immediateFuture(response));
     }
 
-    protected ListenableFuture<RpcInvocation<SendMessageResponse>> 
failureSendMessageResponseFuture() {
+    protected RpcFuture<SendMessageRequest, SendMessageResponse> 
failureSendMessageResponseFuture() {
         final Status status = 
Status.newBuilder().setCode(Code.FORBIDDEN).build();
         SendResultEntry sendResultEntry = 
SendResultEntry.newBuilder().setStatus(status).setStatus(status).build();
         SendMessageResponse response = 
SendMessageResponse.newBuilder().setStatus(status)
             .addEntries(sendResultEntry).build();
-        return Futures.immediateFuture(new RpcInvocation<>(response, 
fakeRpcContext()));
+        return new RpcFuture<>(fakeRpcContext(), null, 
Futures.immediateFuture(response));
     }
 
     protected ListenableFuture<SendMessageResponse> 
okBatchSendMessageResponseFuture() {
@@ -326,7 +335,7 @@ public class TestBase {
             .setSystemProperties(systemProperties).build();
     }
 
-    protected 
ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> 
okReceiveMessageResponsesFuture(
+    protected RpcFuture<ReceiveMessageRequest, 
Iterator<ReceiveMessageResponse>> okReceiveMessageResponsesFuture(
         String topic, int messageCount) {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
         final apache.rocketmq.v2.Message message = fakePbMessage(topic);
@@ -337,7 +346,7 @@ public class TestBase {
             ReceiveMessageResponse messageResponse = 
ReceiveMessageResponse.newBuilder().setMessage(message).build();
             responses.add(messageResponse);
         }
-        return Futures.immediateFuture(new 
RpcInvocation<>(responses.iterator(), fakeRpcContext()));
+        return new RpcFuture<>(fakeRpcContext(), null, 
Futures.immediateFuture(responses.iterator()));
     }
 
     protected ListenableFuture<EndTransactionResponse> 
okEndTransactionResponseFuture() {
@@ -365,9 +374,9 @@ public class TestBase {
 
     protected SendReceiptImpl fakeSendReceiptImpl(
         MessageQueueImpl mq) throws ExecutionException, InterruptedException, 
ClientException {
-        final ListenableFuture<RpcInvocation<SendMessageResponse>> future =
+        final RpcFuture<SendMessageRequest, SendMessageResponse> future =
             okSendMessageResponseFutureWithSingleEntry();
-        final List<SendReceiptImpl> receipts = 
SendReceiptImpl.processResponseInvocation(mq, future.get());
+        final List<SendReceiptImpl> receipts = 
SendReceiptImpl.processResponseInvocation(mq, future.get(), future);
         return receipts.iterator().next();
     }
 }

Reply via email to