This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 4581087  Generate client's signature in ClientManager (#190)
4581087 is described below

commit 45810875ddff3e25de2e9810bd40c0b84e4397e6
Author: Aaron Ai <[email protected]>
AuthorDate: Sat Aug 27 17:49:32 2022 +0800

    Generate client's signature in ClientManager (#190)
---
 .../apache/rocketmq/client/java/impl/Client.java   |  6 ++
 .../rocketmq/client/java/impl/ClientImpl.java      | 79 +++++++++----------
 .../rocketmq/client/java/impl/ClientManager.java   | 52 +++++-------
 .../client/java/impl/ClientManagerImpl.java        | 92 +++++++++++++---------
 .../client/java/impl/consumer/ConsumerImpl.java    | 20 ++---
 .../java/impl/consumer/PushConsumerImpl.java       | 17 ++--
 .../client/java/impl/producer/ProducerImpl.java    | 25 ++----
 .../client/java/impl/ClientManagerImplTest.java    | 51 +++++-------
 .../java/impl/consumer/ConsumerImplTest.java       |  7 +-
 .../java/impl/producer/ProducerImplTest.java       | 10 +--
 10 files changed, 161 insertions(+), 198 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
index 08147c0..d5bb57d 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.client.java.impl;
 
+import io.grpc.Metadata;
 import org.apache.rocketmq.client.java.misc.ClientId;
 import org.apache.rocketmq.client.java.route.Endpoints;
 
@@ -28,6 +29,11 @@ public interface Client {
      */
     ClientId getClientId();
 
+    /**
+     * @return signature for tls
+     */
+    Metadata sign() throws Exception;
+
     /**
      * Send heart beat to remote {@link Endpoints}.
      */
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 4f39801..d48ee74 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -62,6 +62,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -260,8 +261,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     public StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints,
         StreamObserver<TelemetryCommand> observer) throws ClientException {
         try {
-            final Metadata metadata = this.sign();
-            return clientManager.telemetry(endpoints, metadata, 
TELEMETRY_TIMEOUT, observer);
+            return clientManager.telemetry(endpoints, TELEMETRY_TIMEOUT, 
observer);
         } catch (ClientException e) {
             throw e;
         } catch (Throwable t) {
@@ -384,27 +384,21 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
 
     /**
      * Triggered when {@link TopicRouteData} is fetched from remote.
-     *
-     * <p>Never thrown any exception.
      */
-    public ListenableFuture<TopicRouteData> onTopicRouteDataFetched(String 
topic, TopicRouteData topicRouteData) {
+    public void onTopicRouteDataFetched(String topic, TopicRouteData 
topicRouteData) throws ClientException,
+        ExecutionException, InterruptedException, TimeoutException {
         final Set<Endpoints> routeEndpoints = topicRouteData
             .getMessageQueues().stream()
             .map(mq -> mq.getBroker().getEndpoints())
             .collect(Collectors.toSet());
         final Set<Endpoints> existRouteEndpoints = getTotalRouteEndpoints();
         final Set<Endpoints> newEndpoints = new 
HashSet<>(Sets.difference(routeEndpoints, existRouteEndpoints));
-        try {
-            for (Endpoints endpoints : newEndpoints) {
-                final ClientSessionImpl clientSession = 
getClientSession(endpoints);
-                clientSession.syncSettings();
-            }
-            topicRouteCache.put(topic, topicRouteData);
-            onTopicRouteDataUpdate0(topic, topicRouteData);
-            return Futures.immediateFuture(topicRouteData);
-        } catch (Throwable t) {
-            return Futures.immediateFailedFuture(t);
+        for (Endpoints endpoints : newEndpoints) {
+            final ClientSessionImpl clientSession = 
getClientSession(endpoints);
+            clientSession.syncSettings();
         }
+        topicRouteCache.put(topic, topicRouteData);
+        onTopicRouteDataUpdate0(topic, topicRouteData);
     }
 
     public void onTopicRouteDataUpdate0(String topic, TopicRouteData 
topicRouteData) {
@@ -477,13 +471,13 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         final Set<Endpoints> routeEndpointsSet = getTotalRouteEndpoints();
         final NotifyClientTerminationRequest notifyClientTerminationRequest = 
wrapNotifyClientTerminationRequest();
         try {
-            final Metadata metadata = sign();
             for (Endpoints endpoints : routeEndpointsSet) {
-                clientManager.notifyClientTermination(endpoints, metadata, 
notifyClientTerminationRequest,
+                clientManager.notifyClientTermination(endpoints, 
notifyClientTerminationRequest,
                     clientConfiguration.getRequestTimeout());
             }
         } catch (Throwable t) {
-            LOGGER.error("Exception raised while notifying client's 
termination, clientId={}", clientId, t);
+            // Should never reach here.
+            LOGGER.error("[Bug] Exception raised while notifying client's 
termination, clientId={}", clientId, t);
         }
     }
 
@@ -514,7 +508,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     /**
      * Real-time signature generation
      */
-    protected Metadata sign() throws NoSuchAlgorithmException, 
InvalidKeyException {
+    @Override
+    public Metadata sign() throws NoSuchAlgorithmException, 
InvalidKeyException {
         return Signature.sign(clientConfiguration, clientId);
     }
 
@@ -526,8 +521,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
      */
     private void doHeartbeat(HeartbeatRequest request, final Endpoints 
endpoints) {
         try {
-            Metadata metadata = sign();
-            final RpcFuture<HeartbeatRequest, HeartbeatResponse> future = 
clientManager.heartbeat(endpoints, metadata,
+            final RpcFuture<HeartbeatRequest, HeartbeatResponse> future = 
clientManager.heartbeat(endpoints,
                 request, clientConfiguration.getRequestTimeout());
             Futures.addCallback(future, new 
FutureCallback<HeartbeatResponse>() {
                 @Override
@@ -552,9 +546,10 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                     LOGGER.warn("Failed to send heartbeat, endpoints={}, 
clientId={}", endpoints, clientId, t);
                 }
             }, MoreExecutors.directExecutor());
-        } catch (Throwable e) {
-            LOGGER.error("Exception raised while preparing heartbeat, 
endpoints={}, clientId={}", endpoints, clientId,
-                e);
+        } catch (Throwable t) {
+            // Should never reach here.
+            LOGGER.error("[Bug] Exception raised while preparing heartbeat, 
endpoints={}, clientId={}", endpoints,
+                clientId, t);
         }
     }
 
@@ -571,8 +566,11 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     }
 
     private ListenableFuture<TopicRouteData> fetchTopicRoute(final String 
topic) {
-        final ListenableFuture<TopicRouteData> future = 
Futures.transformAsync(fetchTopicRoute0(topic),
-            topicRouteData -> onTopicRouteDataFetched(topic, topicRouteData), 
MoreExecutors.directExecutor());
+        final ListenableFuture<TopicRouteData> future0 = 
fetchTopicRoute0(topic);
+        final ListenableFuture<TopicRouteData> future = 
Futures.transformAsync(future0, topicRouteData -> {
+            onTopicRouteDataFetched(topic, topicRouteData);
+            return Futures.immediateFuture(topicRouteData);
+        }, MoreExecutors.directExecutor());
         Futures.addCallback(future, new FutureCallback<TopicRouteData>() {
             @Override
             public void onSuccess(TopicRouteData topicRouteData) {
@@ -589,23 +587,18 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     }
 
     protected ListenableFuture<TopicRouteData> fetchTopicRoute0(final String 
topic) {
-        try {
-            Resource topicResource = 
Resource.newBuilder().setName(topic).build();
-            final QueryRouteRequest request = 
QueryRouteRequest.newBuilder().setTopic(topicResource)
-                .setEndpoints(endpoints.toProtobuf()).build();
-            final Metadata metadata = sign();
-            final RpcFuture<QueryRouteRequest, QueryRouteResponse> future =
-                clientManager.queryRoute(endpoints, metadata, request, 
clientConfiguration.getRequestTimeout());
-            return Futures.transformAsync(future, response -> {
-                final Status status = response.getStatus();
-                StatusChecker.check(status, future);
-                final List<MessageQueue> messageQueuesList = 
response.getMessageQueuesList();
-                final TopicRouteData topicRouteData = new 
TopicRouteData(messageQueuesList);
-                return Futures.immediateFuture(topicRouteData);
-            }, MoreExecutors.directExecutor());
-        } catch (Throwable t) {
-            return Futures.immediateFailedFuture(t);
-        }
+        Resource topicResource = Resource.newBuilder().setName(topic).build();
+        final QueryRouteRequest request = 
QueryRouteRequest.newBuilder().setTopic(topicResource)
+            .setEndpoints(endpoints.toProtobuf()).build();
+        final RpcFuture<QueryRouteRequest, QueryRouteResponse> future =
+            clientManager.queryRoute(endpoints, request, 
clientConfiguration.getRequestTimeout());
+        return Futures.transformAsync(future, response -> {
+            final Status status = response.getStatus();
+            StatusChecker.check(status, future);
+            final List<MessageQueue> messageQueuesList = 
response.getMessageQueuesList();
+            final TopicRouteData topicRouteData = new 
TopicRouteData(messageQueuesList);
+            return Futures.immediateFuture(topicRouteData);
+        }, MoreExecutors.directExecutor());
     }
 
     protected Set<Endpoints> getTotalRouteEndpoints() {
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
index 39db0e0..468712b 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
@@ -39,7 +39,6 @@ import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.TelemetryCommand;
 import com.google.common.util.concurrent.AbstractIdleService;
-import io.grpc.Metadata;
 import io.grpc.stub.StreamObserver;
 import java.time.Duration;
 import java.util.List;
@@ -67,134 +66,125 @@ public abstract class ClientManager extends AbstractIdleService {
      * Query topic route asynchronously, the method ensures no throwable.
      *
      * @param endpoints requested endpoints.
-     * @param metadata  gRPC request header metadata.
      * @param request   query route request.
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    public abstract RpcFuture<QueryRouteRequest, QueryRouteResponse>
-    queryRoute(Endpoints endpoints, Metadata metadata, QueryRouteRequest 
request, Duration duration);
+    public abstract RpcFuture<QueryRouteRequest, QueryRouteResponse> 
queryRoute(Endpoints endpoints,
+        QueryRouteRequest request, Duration duration);
 
     /**
      * Heart beat asynchronously, the method ensures no throwable.
      *
      * @param endpoints requested endpoints.
-     * @param metadata  gRPC request header metadata.
      * @param request   heartbeat request.
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    public abstract RpcFuture<HeartbeatRequest, HeartbeatResponse>
-    heartbeat(Endpoints endpoints, Metadata metadata, HeartbeatRequest 
request, Duration duration);
+    public abstract RpcFuture<HeartbeatRequest, HeartbeatResponse> 
heartbeat(Endpoints endpoints,
+        HeartbeatRequest request, Duration duration);
 
     /**
      * Send message asynchronously, the method ensures no throwable.
      *
      * @param endpoints requested endpoints.
-     * @param metadata  gRPC request header metadata.
      * @param request   send message request.
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    public abstract RpcFuture<SendMessageRequest, SendMessageResponse>
-    sendMessage(Endpoints endpoints, Metadata metadata, SendMessageRequest 
request, Duration duration);
+    public abstract RpcFuture<SendMessageRequest, SendMessageResponse> 
sendMessage(Endpoints endpoints,
+        SendMessageRequest request, Duration duration);
 
     /**
      * Query assignment asynchronously, the method ensures no throwable.
      *
      * @param endpoints requested endpoints.
-     * @param metadata  gRPC request header metadata.
      * @param request   query assignment request.
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    public abstract RpcFuture<QueryAssignmentRequest, QueryAssignmentResponse>
-    queryAssignment(Endpoints endpoints, Metadata metadata, 
QueryAssignmentRequest request, Duration duration);
+    public abstract RpcFuture<QueryAssignmentRequest, QueryAssignmentResponse> 
queryAssignment(Endpoints endpoints,
+        QueryAssignmentRequest request, Duration duration);
 
     /**
      * Receiving messages asynchronously from the server, the method ensures 
no throwable.
      *
      * @param endpoints requested endpoints.
-     * @param metadata  gRPC request header metadata.
+     * @param request   receive message request.
+     * @param duration  request max duration.
      * @return invocation of response future.
      */
-    public abstract RpcFuture<ReceiveMessageRequest, 
List<ReceiveMessageResponse>>
-    receiveMessage(Endpoints endpoints, Metadata metadata, 
ReceiveMessageRequest request, Duration duration);
+    public abstract RpcFuture<ReceiveMessageRequest, 
List<ReceiveMessageResponse>> receiveMessage(Endpoints endpoints,
+        ReceiveMessageRequest request, Duration duration);
 
     /**
      * Ack message asynchronously after the success of consumption, the method 
ensures no throwable.
      *
      * @param endpoints requested endpoints.
-     * @param metadata  gRPC request header metadata.
      * @param request   ack message request.
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    public abstract RpcFuture<AckMessageRequest, AckMessageResponse>
-    ackMessage(Endpoints endpoints, Metadata metadata, AckMessageRequest 
request, Duration duration);
+    public abstract RpcFuture<AckMessageRequest, AckMessageResponse> 
ackMessage(Endpoints endpoints,
+        AckMessageRequest request, Duration duration);
 
     /**
      * Nack message asynchronously after the failure of consumption, the 
method ensures no throwable.
      *
      * @param endpoints requested endpoints.
-     * @param metadata  gRPC request header metadata.
      * @param request   nack message request.
      * @param duration  request max duration.
      * @return invocation of response future.
      */
     public abstract RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse>
-    changeInvisibleDuration(Endpoints endpoints, Metadata metadata, 
ChangeInvisibleDurationRequest request,
+    changeInvisibleDuration(Endpoints endpoints, 
ChangeInvisibleDurationRequest request,
         Duration duration);
 
     /**
      * Send a message to the dead letter queue asynchronously, the method 
ensures no throwable.
      *
      * @param endpoints requested endpoints.
-     * @param metadata  gRPC request header metadata.
      * @param request   request of sending a message to DLQ.
      * @param duration  request max duration.
      * @return invocation of response future.
      */
     public abstract RpcFuture<ForwardMessageToDeadLetterQueueRequest,
         ForwardMessageToDeadLetterQueueResponse> 
forwardMessageToDeadLetterQueue(Endpoints endpoints,
-        Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, 
Duration duration);
+        ForwardMessageToDeadLetterQueueRequest request, Duration duration);
 
     /**
      * Submit transaction resolution asynchronously, the method ensures no 
throwable.
      *
      * @param endpoints requested endpoints.
-     * @param metadata  gRPC request header metadata.
      * @param request   end transaction request.
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    public abstract RpcFuture<EndTransactionRequest, EndTransactionResponse>
-    endTransaction(Endpoints endpoints, Metadata metadata, 
EndTransactionRequest request, Duration duration);
+    public abstract RpcFuture<EndTransactionRequest, EndTransactionResponse> 
endTransaction(Endpoints endpoints,
+        EndTransactionRequest request, Duration duration);
 
     /**
      * Asynchronously notify the server that client is terminated, the method 
ensures no throwable.
      *
      * @param endpoints request endpoints.
-     * @param metadata  gRPC request header metadata.
      * @param request   notify client termination request.
      * @param duration  request max duration.
      * @return response future of notification of client termination.
      */
     @SuppressWarnings("UnusedReturnValue")
     public abstract RpcFuture<NotifyClientTerminationRequest, 
NotifyClientTerminationResponse>
-    notifyClientTermination(Endpoints endpoints, Metadata metadata, 
NotifyClientTerminationRequest request,
+    notifyClientTermination(Endpoints endpoints, 
NotifyClientTerminationRequest request,
         Duration duration);
 
     /**
      * Establish telemetry session stream to server.
      *
      * @param endpoints        request endpoints.
-     * @param metadata         gRPC request header metadata.
      * @param duration         stream max duration.
      * @param responseObserver response observer.
      * @return request observer.
      * @throws ClientException if failed to establish telemetry session stream.
      */
-    public abstract StreamObserver<TelemetryCommand> telemetry(Endpoints 
endpoints, Metadata metadata,
-        Duration duration, StreamObserver<TelemetryCommand> responseObserver) 
throws ClientException;
+    public abstract StreamObserver<TelemetryCommand> telemetry(Endpoints 
endpoints, Duration duration,
+        StreamObserver<TelemetryCommand> responseObserver) throws 
ClientException;
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index 3b84ac8..49ab080 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -38,7 +38,6 @@ import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.TelemetryCommand;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import io.grpc.Metadata;
@@ -59,6 +58,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.net.ssl.SSLException;
 import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.exception.InternalErrorException;
 import org.apache.rocketmq.client.java.misc.ClientId;
 import org.apache.rocketmq.client.java.misc.ExecutorServices;
 import org.apache.rocketmq.client.java.misc.MetadataUtils;
@@ -194,152 +194,167 @@ public class ClientManagerImpl extends ClientManager {
     }
 
     @Override
-    public RpcFuture<QueryRouteRequest, QueryRouteResponse> 
queryRoute(Endpoints endpoints, Metadata metadata,
-        QueryRouteRequest request, Duration duration) {
-        final Context context = new Context(endpoints, metadata);
+    public RpcFuture<QueryRouteRequest, QueryRouteResponse> 
queryRoute(Endpoints endpoints, QueryRouteRequest request,
+        Duration duration) {
         try {
+            final Metadata metadata = client.sign();
+            final Context context = new Context(endpoints, metadata);
             final RpcClient rpcClient = getRpcClient(endpoints);
             final ListenableFuture<QueryRouteResponse> future = 
rpcClient.queryRoute(metadata, request, asyncWorker,
                 duration);
             return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
+            return new RpcFuture<>(t);
         }
     }
 
     @Override
-    public RpcFuture<HeartbeatRequest, HeartbeatResponse> heartbeat(Endpoints 
endpoints, Metadata metadata,
-        HeartbeatRequest request, Duration duration) {
-        final Context context = new Context(endpoints, metadata);
+    public RpcFuture<HeartbeatRequest, HeartbeatResponse> heartbeat(Endpoints 
endpoints, HeartbeatRequest request,
+        Duration duration) {
         try {
+            final Metadata metadata = client.sign();
+            final Context context = new Context(endpoints, metadata);
             final RpcClient rpcClient = getRpcClient(endpoints);
             ListenableFuture<HeartbeatResponse> future = 
rpcClient.heartbeat(metadata, request, asyncWorker, duration);
             return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
+            return new RpcFuture<>(t);
         }
     }
 
     @Override
-    public RpcFuture<SendMessageRequest, SendMessageResponse> 
sendMessage(Endpoints endpoints, Metadata metadata,
+    public RpcFuture<SendMessageRequest, SendMessageResponse> 
sendMessage(Endpoints endpoints,
         SendMessageRequest request, Duration duration) {
-        final Context context = new Context(endpoints, metadata);
         try {
+            final Metadata metadata = client.sign();
+            final Context context = new Context(endpoints, metadata);
             final RpcClient rpcClient = getRpcClient(endpoints);
             final ListenableFuture<SendMessageResponse> future =
                 rpcClient.sendMessage(metadata, request, asyncWorker, 
duration);
             return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
+            return new RpcFuture<>(t);
         }
     }
 
     @Override
     public RpcFuture<QueryAssignmentRequest, QueryAssignmentResponse> 
queryAssignment(Endpoints endpoints,
-        Metadata metadata, QueryAssignmentRequest request, Duration duration) {
-        final Context context = new Context(endpoints, metadata);
+        QueryAssignmentRequest request, Duration duration) {
         try {
+            final Metadata metadata = client.sign();
+            final Context context = new Context(endpoints, metadata);
             final RpcClient rpcClient = getRpcClient(endpoints);
             final ListenableFuture<QueryAssignmentResponse> future =
                 rpcClient.queryAssignment(metadata, request, asyncWorker, 
duration);
             return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
+            return new RpcFuture<>(t);
         }
     }
 
     @Override
     public RpcFuture<ReceiveMessageRequest, List<ReceiveMessageResponse>> 
receiveMessage(Endpoints endpoints,
-        Metadata metadata, ReceiveMessageRequest request, Duration duration) {
-        final Context context = new Context(endpoints, metadata);
+        ReceiveMessageRequest request, Duration duration) {
         try {
+            final Metadata metadata = client.sign();
+            final Context context = new Context(endpoints, metadata);
             final RpcClient rpcClient = getRpcClient(endpoints);
             final ListenableFuture<List<ReceiveMessageResponse>> future =
                 rpcClient.receiveMessage(metadata, request, asyncWorker, 
duration);
             return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
+            return new RpcFuture<>(t);
         }
     }
 
     @Override
-    public RpcFuture<AckMessageRequest, AckMessageResponse> 
ackMessage(Endpoints endpoints, Metadata metadata,
-        AckMessageRequest request, Duration duration) {
-        final Context context = new Context(endpoints, metadata);
+    public RpcFuture<AckMessageRequest, AckMessageResponse> 
ackMessage(Endpoints endpoints, AckMessageRequest request,
+        Duration duration) {
         try {
+            final Metadata metadata = client.sign();
+            final Context context = new Context(endpoints, metadata);
             final RpcClient rpcClient = getRpcClient(endpoints);
             final ListenableFuture<AckMessageResponse> future =
                 rpcClient.ackMessage(metadata, request, asyncWorker, duration);
             return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
+            return new RpcFuture<>(t);
         }
     }
 
     @Override
     public RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse>
-    changeInvisibleDuration(Endpoints endpoints, Metadata metadata, 
ChangeInvisibleDurationRequest request,
+    changeInvisibleDuration(Endpoints endpoints, 
ChangeInvisibleDurationRequest request,
         Duration duration) {
-        final Context context = new Context(endpoints, metadata);
         try {
+            final Metadata metadata = client.sign();
+            final Context context = new Context(endpoints, metadata);
             final RpcClient rpcClient = getRpcClient(endpoints);
             final ListenableFuture<ChangeInvisibleDurationResponse> future =
                 rpcClient.changeInvisibleDuration(metadata, request, 
asyncWorker, duration);
             return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
+            return new RpcFuture<>(t);
         }
     }
 
     @Override
     public RpcFuture<ForwardMessageToDeadLetterQueueRequest, 
ForwardMessageToDeadLetterQueueResponse>
-    forwardMessageToDeadLetterQueue(Endpoints endpoints, Metadata metadata,
-        ForwardMessageToDeadLetterQueueRequest request, Duration duration) {
-        final Context context = new Context(endpoints, metadata);
+    forwardMessageToDeadLetterQueue(Endpoints endpoints, 
ForwardMessageToDeadLetterQueueRequest request,
+        Duration duration) {
         try {
+            final Metadata metadata = client.sign();
+            final Context context = new Context(endpoints, metadata);
             final RpcClient rpcClient = getRpcClient(endpoints);
             final ListenableFuture<ForwardMessageToDeadLetterQueueResponse> 
future =
                 rpcClient.forwardMessageToDeadLetterQueue(metadata, request, 
asyncWorker, duration);
             return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
+            return new RpcFuture<>(t);
         }
     }
 
     @Override
     public RpcFuture<EndTransactionRequest, EndTransactionResponse> 
endTransaction(Endpoints endpoints,
-        Metadata metadata, EndTransactionRequest request, Duration duration) {
-        final Context context = new Context(endpoints, metadata);
+        EndTransactionRequest request, Duration duration) {
         try {
+            final Metadata metadata = client.sign();
+            final Context context = new Context(endpoints, metadata);
             final RpcClient rpcClient = getRpcClient(endpoints);
             final ListenableFuture<EndTransactionResponse> future =
                 rpcClient.endTransaction(metadata, request, asyncWorker, 
duration);
             return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
+            return new RpcFuture<>(t);
         }
     }
 
     @Override
     public RpcFuture<NotifyClientTerminationRequest, 
NotifyClientTerminationResponse>
-    notifyClientTermination(Endpoints endpoints, Metadata metadata, 
NotifyClientTerminationRequest request,
+    notifyClientTermination(Endpoints endpoints, 
NotifyClientTerminationRequest request,
         Duration duration) {
-        final Context context = new Context(endpoints, metadata);
         try {
+            final Metadata metadata = client.sign();
+            final Context context = new Context(endpoints, metadata);
             final RpcClient rpcClient = getRpcClient(endpoints);
             final ListenableFuture<NotifyClientTerminationResponse> future =
                 rpcClient.notifyClientTermination(metadata, request, 
asyncWorker, duration);
             return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
-            return new RpcFuture<>(context, request, 
Futures.immediateFailedFuture(t));
+            return new RpcFuture<>(t);
         }
     }
 
     @Override
-    public StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, 
Metadata metadata, Duration duration,
+    public StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, 
Duration duration,
         StreamObserver<TelemetryCommand> responseObserver) throws 
ClientException {
-        final RpcClient rpcClient = getRpcClient(endpoints);
-        return rpcClient.telemetry(metadata, asyncWorker, duration, 
responseObserver);
+        try {
+            final Metadata metadata = client.sign();
+            final RpcClient rpcClient = getRpcClient(endpoints);
+            return rpcClient.telemetry(metadata, asyncWorker, duration, 
responseObserver);
+        } catch (Throwable t) {
+            throw new InternalErrorException(t);
+        }
     }
 
     @Override
@@ -444,6 +459,7 @@ public class ClientManagerImpl extends ClientManager {
         LOGGER.info("Shutdown the client manager successfully, clientId={}", 
clientId);
     }
 
+    @SuppressWarnings("NullableProblems")
     @Override
     protected String serviceName() {
         return super.serviceName() + "-" + client.getClientId().getIndex();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 1a2bf16..bb06b97 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -38,7 +38,6 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.protobuf.Timestamp;
 import com.google.protobuf.util.Durations;
 import com.google.protobuf.util.Timestamps;
-import io.grpc.Metadata;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -79,13 +78,12 @@ abstract class ConsumerImpl extends ClientImpl {
         MessageQueueImpl mq, Duration awaitDuration) {
         List<MessageViewImpl> messages = new ArrayList<>();
         try {
-            Metadata metadata = sign();
             final Endpoints endpoints = mq.getBroker().getEndpoints();
             final Duration tolerance = clientConfiguration.getRequestTimeout();
             final Duration timeout = Duration.ofNanos(awaitDuration.toNanos() + tolerance.toNanos());
             final ClientManager clientManager = this.getClientManager();
             final RpcFuture<ReceiveMessageRequest, 
List<ReceiveMessageResponse>> future =
-                clientManager.receiveMessage(endpoints, metadata, request, 
timeout);
+                clientManager.receiveMessage(endpoints, request, timeout);
             return Futures.transformAsync(future, responses -> {
                 Status status = 
Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR)
                     .setMessage("status was not set by server")
@@ -118,6 +116,8 @@ abstract class ConsumerImpl extends ClientImpl {
                 return Futures.immediateFuture(receiveMessageResult);
             }, MoreExecutors.directExecutor());
         } catch (Throwable t) {
+            // Should never reach here.
+            LOGGER.error("[Bug] Exception raised during message receiving, mq={}, clientId={}", mq, clientId, t);
             return Futures.immediateFailedFuture(t);
         }
     }
@@ -150,9 +150,8 @@ abstract class ConsumerImpl extends ClientImpl {
         doBefore(context, generalMessages);
         try {
             final AckMessageRequest request = 
wrapAckMessageRequest(messageView);
-            final Metadata metadata = sign();
             final Duration requestTimeout = 
clientConfiguration.getRequestTimeout();
-            future = this.getClientManager().ackMessage(endpoints, metadata, 
request, requestTimeout);
+            future = this.getClientManager().ackMessage(endpoints, request, 
requestTimeout);
         } catch (Throwable t) {
             future = new RpcFuture<>(t);
         }
@@ -185,14 +184,9 @@ abstract class ConsumerImpl extends ClientImpl {
         final MessageHandlerContextImpl context =
             new 
MessageHandlerContextImpl(MessageHookPoints.CHANGE_INVISIBLE_DURATION);
         doBefore(context, generalMessages);
-        try {
-            final Metadata metadata = sign();
-            final ChangeInvisibleDurationRequest request = 
wrapChangeInvisibleDuration(messageView, invisibleDuration);
-            final Duration requestTimeout = 
clientConfiguration.getRequestTimeout();
-            future = 
this.getClientManager().changeInvisibleDuration(endpoints, metadata, request, 
requestTimeout);
-        } catch (Throwable t) {
-            future = new RpcFuture<>(t);
-        }
+        final ChangeInvisibleDurationRequest request = 
wrapChangeInvisibleDuration(messageView, invisibleDuration);
+        final Duration requestTimeout = 
clientConfiguration.getRequestTimeout();
+        future = this.getClientManager().changeInvisibleDuration(endpoints, 
request, requestTimeout);
         final MessageId messageId = messageView.getMessageId();
         Futures.addCallback(future, new 
FutureCallback<ChangeInvisibleDurationResponse>() {
             @Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 960b48c..47119d0 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -32,7 +32,6 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import io.grpc.Metadata;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
@@ -266,11 +265,10 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
     ListenableFuture<Assignments> queryAssignment(final String topic) {
         final ListenableFuture<Endpoints> future0 = 
pickEndpointsToQueryAssignments(topic);
         return Futures.transformAsync(future0, endpoints -> {
-            final Metadata metadata = sign();
             final QueryAssignmentRequest request = 
wrapQueryAssignmentRequest(topic);
             final Duration requestTimeout = 
clientConfiguration.getRequestTimeout();
             final RpcFuture<QueryAssignmentRequest, QueryAssignmentResponse> 
future1 =
-                this.getClientManager().queryAssignment(endpoints, metadata, 
request, requestTimeout);
+                this.getClientManager().queryAssignment(endpoints, request, 
requestTimeout);
             return Futures.transformAsync(future1, response -> {
                 final Status status = response.getStatus();
                 StatusChecker.check(status, future1);
@@ -514,15 +512,10 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
 
         final Endpoints endpoints = messageView.getEndpoints();
         RpcFuture<ForwardMessageToDeadLetterQueueRequest, 
ForwardMessageToDeadLetterQueueResponse> future;
-        try {
-            final ForwardMessageToDeadLetterQueueRequest request =
-                wrapForwardMessageToDeadLetterQueueRequest(messageView);
-            final Metadata metadata = sign();
-            future = 
this.getClientManager().forwardMessageToDeadLetterQueue(endpoints, metadata, 
request,
-                clientConfiguration.getRequestTimeout());
-        } catch (Throwable t) {
-            future = new RpcFuture<>(t);
-        }
+        final ForwardMessageToDeadLetterQueueRequest request =
+            wrapForwardMessageToDeadLetterQueueRequest(messageView);
+        future = 
this.getClientManager().forwardMessageToDeadLetterQueue(endpoints, request,
+            clientConfiguration.getRequestTimeout());
         Futures.addCallback(future, new 
FutureCallback<ForwardMessageToDeadLetterQueueResponse>() {
             @Override
             public void onSuccess(ForwardMessageToDeadLetterQueueResponse 
response) {
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 58d2c23..fd87fef 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -35,7 +35,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import io.grpc.Metadata;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -257,12 +256,6 @@ class ProducerImpl extends ClientImpl implements Producer {
 
     public void endTransaction(Endpoints endpoints, GeneralMessage 
generalMessage, MessageId messageId,
         String transactionId, final TransactionResolution resolution) throws 
ClientException {
-        Metadata metadata;
-        try {
-            metadata = sign();
-        } catch (Throwable t) {
-            throw new ClientException(t);
-        }
         final EndTransactionRequest.Builder builder = 
EndTransactionRequest.newBuilder()
             .setMessageId(messageId.toString()).setTransactionId(transactionId)
             
.setTopic(apache.rocketmq.v2.Resource.newBuilder().setName(generalMessage.getTopic()).build());
@@ -283,7 +276,7 @@ class ProducerImpl extends ClientImpl implements Producer {
         doBefore(context, generalMessages);
 
         final RpcFuture<EndTransactionRequest, EndTransactionResponse> future =
-            this.getClientManager().endTransaction(endpoints, metadata, 
request, requestTimeout);
+            this.getClientManager().endTransaction(endpoints, request, 
requestTimeout);
         Futures.addCallback(future, new 
FutureCallback<EndTransactionResponse>() {
             @Override
             public void onSuccess(EndTransactionResponse response) {
@@ -425,11 +418,11 @@ class ProducerImpl extends ClientImpl implements Producer {
         return 
SendMessageRequest.newBuilder().addAllMessages(messages).build();
     }
 
-    ListenableFuture<List<SendReceiptImpl>> send0(Metadata metadata, Endpoints 
endpoints,
-        List<PublishingMessageImpl> pubMessages, MessageQueueImpl mq) {
+    ListenableFuture<List<SendReceiptImpl>> send0(Endpoints endpoints, 
List<PublishingMessageImpl> pubMessages,
+        MessageQueueImpl mq) {
         final SendMessageRequest request = wrapSendMessageRequest(pubMessages, 
mq);
         final RpcFuture<SendMessageRequest, SendMessageResponse> future0 =
-            this.getClientManager().sendMessage(endpoints, metadata, request, 
clientConfiguration.getRequestTimeout());
+            this.getClientManager().sendMessage(endpoints, request, 
clientConfiguration.getRequestTimeout());
         return Futures.transformAsync(future0,
             response -> 
Futures.immediateFuture(SendReceiptImpl.processResponseInvocation(mq, response, 
future0)),
             MoreExecutors.directExecutor());
@@ -437,14 +430,6 @@ class ProducerImpl extends ClientImpl implements Producer {
 
     private void send0(SettableFuture<List<SendReceiptImpl>> future0, String 
topic, MessageType messageType,
         final List<MessageQueueImpl> candidates, final 
List<PublishingMessageImpl> messages, final int attempt) {
-        Metadata metadata;
-        try {
-            metadata = sign();
-        } catch (Throwable t) {
-            // Failed to sign, no need to proceed.
-            future0.setException(t);
-            return;
-        }
         // Calculate the current message queue.
         final MessageQueueImpl mq = candidates.get(IntMath.mod(attempt - 1, 
candidates.size()));
         final List<MessageType> acceptMessageTypes = 
mq.getAcceptMessageTypes();
@@ -456,7 +441,7 @@ class ProducerImpl extends ClientImpl implements Producer {
             return;
         }
         final Endpoints endpoints = mq.getBroker().getEndpoints();
-        final ListenableFuture<List<SendReceiptImpl>> future = send0(metadata, 
endpoints, messages, mq);
+        final ListenableFuture<List<SendReceiptImpl>> future = 
send0(endpoints, messages, mq);
         final int maxAttempts = this.getRetryPolicy().getMaxAttempts();
 
         // Intercept before message publishing.
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
index a3e8666..cb348ab 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
@@ -27,7 +27,6 @@ import apache.rocketmq.v2.QueryAssignmentRequest;
 import apache.rocketmq.v2.QueryRouteRequest;
 import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.SendMessageRequest;
-import io.grpc.Metadata;
 import java.time.Duration;
 import org.apache.rocketmq.client.java.misc.ClientId;
 import org.apache.rocketmq.client.java.tool.TestBase;
@@ -56,91 +55,81 @@ public class ClientManagerImplTest extends TestBase {
 
     @Test
     public void testQueryRoute() {
-        Metadata metadata = new Metadata();
         QueryRouteRequest request = QueryRouteRequest.newBuilder().build();
-        CLIENT_MANAGER.queryRoute(fakeEndpoints(), metadata, request, 
Duration.ofSeconds(1));
-        CLIENT_MANAGER.queryRoute(null, metadata, request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.queryRoute(fakeEndpoints(), request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.queryRoute(null, request, Duration.ofSeconds(1));
         // Expect no exception thrown.
     }
 
     @Test
     public void testHeartbeat() {
-        Metadata metadata = new Metadata();
         HeartbeatRequest request = HeartbeatRequest.newBuilder().build();
-        CLIENT_MANAGER.heartbeat(fakeEndpoints(), metadata, request, 
Duration.ofSeconds(1));
-        CLIENT_MANAGER.heartbeat(null, metadata, request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.heartbeat(fakeEndpoints(), request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.heartbeat(null, request, Duration.ofSeconds(1));
         // Expect no exception thrown.
     }
 
     @Test
     public void testSendMessage() {
-        Metadata metadata = new Metadata();
         SendMessageRequest request = SendMessageRequest.newBuilder().build();
-        CLIENT_MANAGER.sendMessage(fakeEndpoints(), metadata, request, 
Duration.ofSeconds(1));
-        CLIENT_MANAGER.sendMessage(null, metadata, request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.sendMessage(fakeEndpoints(), request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.sendMessage(null, request, Duration.ofSeconds(1));
         // Expect no exception thrown.
     }
 
     @Test
     public void testQueryAssignment() {
-        Metadata metadata = new Metadata();
         QueryAssignmentRequest request = 
QueryAssignmentRequest.newBuilder().build();
-        CLIENT_MANAGER.queryAssignment(fakeEndpoints(), metadata, request, 
Duration.ofSeconds(1));
-        CLIENT_MANAGER.queryAssignment(null, metadata, request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.queryAssignment(fakeEndpoints(), request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.queryAssignment(null, request, Duration.ofSeconds(1));
         // Expect no exception thrown.
     }
 
     @Test
     public void testReceiveMessage() {
-        Metadata metadata = new Metadata();
         ReceiveMessageRequest request = 
ReceiveMessageRequest.newBuilder().build();
-        CLIENT_MANAGER.receiveMessage(fakeEndpoints(), metadata, request, 
Duration.ofSeconds(1));
-        CLIENT_MANAGER.receiveMessage(null, metadata, request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.receiveMessage(fakeEndpoints(), request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.receiveMessage(null, request, Duration.ofSeconds(1));
         // Expect no exception thrown.
     }
 
     @Test
     public void testAckMessage() {
-        Metadata metadata = new Metadata();
         AckMessageRequest request = AckMessageRequest.newBuilder().build();
-        CLIENT_MANAGER.ackMessage(fakeEndpoints(), metadata, request, 
Duration.ofSeconds(1));
-        CLIENT_MANAGER.ackMessage(null, metadata, request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.ackMessage(fakeEndpoints(), request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.ackMessage(null, request, Duration.ofSeconds(1));
         // Expect no exception thrown.
     }
 
     @Test
     public void testChangeInvisibleDuration() {
-        Metadata metadata = new Metadata();
         ChangeInvisibleDurationRequest request = 
ChangeInvisibleDurationRequest.newBuilder().build();
-        CLIENT_MANAGER.changeInvisibleDuration(fakeEndpoints(), metadata, 
request, Duration.ofSeconds(1));
-        CLIENT_MANAGER.changeInvisibleDuration(null, metadata, request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.changeInvisibleDuration(fakeEndpoints(), request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.changeInvisibleDuration(null, request, 
Duration.ofSeconds(1));
         // Expect no exception thrown.
     }
 
     @Test
     public void testForwardMessageToDeadLetterQueue() {
-        Metadata metadata = new Metadata();
         ForwardMessageToDeadLetterQueueRequest request = 
ForwardMessageToDeadLetterQueueRequest.newBuilder().build();
-        CLIENT_MANAGER.forwardMessageToDeadLetterQueue(fakeEndpoints(), 
metadata, request, Duration.ofSeconds(1));
-        CLIENT_MANAGER.forwardMessageToDeadLetterQueue(null, metadata, 
request, Duration.ofSeconds(1));
+        CLIENT_MANAGER.forwardMessageToDeadLetterQueue(fakeEndpoints(), 
request, Duration.ofSeconds(1));
+        CLIENT_MANAGER.forwardMessageToDeadLetterQueue(null, request, 
Duration.ofSeconds(1));
         // Expect no exception thrown.
     }
 
     @Test
     public void testEndTransaction() {
-        Metadata metadata = new Metadata();
         EndTransactionRequest request = 
EndTransactionRequest.newBuilder().build();
-        CLIENT_MANAGER.endTransaction(fakeEndpoints(), metadata, request, 
Duration.ofSeconds(1));
-        CLIENT_MANAGER.endTransaction(null, metadata, request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.endTransaction(fakeEndpoints(), request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.endTransaction(null, request, Duration.ofSeconds(1));
         // Expect no exception thrown.
     }
 
     @Test
     public void testNotifyClientTermination() {
-        Metadata metadata = new Metadata();
         NotifyClientTerminationRequest request = 
NotifyClientTerminationRequest.newBuilder().build();
-        CLIENT_MANAGER.notifyClientTermination(fakeEndpoints(), metadata, 
request, Duration.ofSeconds(1));
-        CLIENT_MANAGER.notifyClientTermination(null, metadata, request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.notifyClientTermination(fakeEndpoints(), request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.notifyClientTermination(null, request, 
Duration.ofSeconds(1));
         // Expect no exception thrown.
     }
 }
\ No newline at end of file
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
index a7ae86a..619cd7d 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
@@ -26,7 +26,6 @@ import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
 import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.ReceiveMessageResponse;
 import com.google.common.util.concurrent.ListenableFuture;
-import io.grpc.Metadata;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
@@ -68,7 +67,7 @@ public class ConsumerImplTest extends TestBase {
         final RpcFuture<ReceiveMessageRequest, List<ReceiveMessageResponse>> 
future =
             okReceiveMessageResponsesFuture(FAKE_TOPIC_0, 
receivedMessageCount);
         future.get();
-        Mockito.doReturn(future).when(clientManager).receiveMessage(any(Endpoints.class), any(Metadata.class),
+        Mockito.doReturn(future).when(clientManager).receiveMessage(any(Endpoints.class),
             any(ReceiveMessageRequest.class), any(Duration.class));
         final MessageQueueImpl mq = fakeMessageQueueImpl(FAKE_TOPIC_0);
         final ReceiveMessageRequest request = 
pushConsumer.wrapReceiveMessageRequest(1,
@@ -91,7 +90,7 @@ public class ConsumerImplTest extends TestBase {
         Mockito.doReturn(clientManager).when(pushConsumer).getClientManager();
         final RpcFuture<AckMessageRequest, AckMessageResponse> future =
             okAckMessageResponseFuture();
-        Mockito.doReturn(future).when(clientManager).ackMessage(any(Endpoints.class), any(Metadata.class),
+        Mockito.doReturn(future).when(clientManager).ackMessage(any(Endpoints.class),
             any(AckMessageRequest.class), any(Duration.class));
         final MessageViewImpl messageView = fakeMessageViewImpl();
         final RpcFuture<AckMessageRequest, AckMessageResponse> future0 =
@@ -112,7 +111,7 @@ public class ConsumerImplTest extends TestBase {
         Mockito.doReturn(clientManager).when(pushConsumer).getClientManager();
         final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> future =
             okChangeInvisibleDurationCtxFuture();
-        Mockito.doReturn(future).when(clientManager).changeInvisibleDuration(any(Endpoints.class), any(Metadata.class),
+        Mockito.doReturn(future).when(clientManager).changeInvisibleDuration(any(Endpoints.class),
             any(ChangeInvisibleDurationRequest.class), any(Duration.class));
         final MessageViewImpl messageView = fakeMessageViewImpl();
         final RpcFuture<ChangeInvisibleDurationRequest, 
ChangeInvisibleDurationResponse> future0 =
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
index b4dae64..60bb0fe 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
@@ -30,7 +30,6 @@ import apache.rocketmq.v2.Permission;
 import apache.rocketmq.v2.Resource;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Service;
-import io.grpc.Metadata;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -91,9 +90,9 @@ public class ProducerImplTest extends TestBase {
         final MessageQueueImpl messageQueue = 
fakeMessageQueueImpl(FAKE_TOPIC_0);
         final SendReceiptImpl sendReceiptImpl = 
fakeSendReceiptImpl(messageQueue);
         
Mockito.doReturn(Futures.immediateFuture(Collections.singletonList(sendReceiptImpl)))
-            .when(producer).send0(any(Metadata.class), any(Endpoints.class), 
anyList(), any(MessageQueueImpl.class));
+            .when(producer).send0(any(Endpoints.class), anyList(), 
any(MessageQueueImpl.class));
         producer.send(message);
-        verify(producer, times(1)).send0(any(Metadata.class), 
any(Endpoints.class), anyList(),
+        verify(producer, times(1)).send0(any(Endpoints.class), anyList(),
             any(MessageQueueImpl.class));
         producer.close();
     }
@@ -104,11 +103,10 @@ public class ProducerImplTest extends TestBase {
         final Message message = fakeMessage(FAKE_TOPIC_0);
         final Exception exception = new IllegalArgumentException();
         Mockito.doReturn(Futures.immediateFailedFuture(exception))
-            .when(producer).send0(any(Metadata.class), any(Endpoints.class), 
anyList(), any(MessageQueueImpl.class));
+            .when(producer).send0(any(Endpoints.class), anyList(), 
any(MessageQueueImpl.class));
         producer.send(message);
         final int maxAttempts = 
producer.publishingSettings.getRetryPolicy().getMaxAttempts();
-        verify(producer, times(maxAttempts)).send0(any(Metadata.class), 
any(Endpoints.class), anyList(),
-            any(MessageQueueImpl.class));
+        verify(producer, times(maxAttempts)).send0(any(Endpoints.class), 
anyList(), any(MessageQueueImpl.class));
         producer.close();
     }
 }
\ No newline at end of file

Reply via email to