This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 4581087 Generate client's signature in ClientManager (#190)
4581087 is described below
commit 45810875ddff3e25de2e9810bd40c0b84e4397e6
Author: Aaron Ai <[email protected]>
AuthorDate: Sat Aug 27 17:49:32 2022 +0800
Generate client's signature in ClientManager (#190)
---
.../apache/rocketmq/client/java/impl/Client.java | 6 ++
.../rocketmq/client/java/impl/ClientImpl.java | 79 +++++++++----------
.../rocketmq/client/java/impl/ClientManager.java | 52 +++++-------
.../client/java/impl/ClientManagerImpl.java | 92 +++++++++++++---------
.../client/java/impl/consumer/ConsumerImpl.java | 20 ++---
.../java/impl/consumer/PushConsumerImpl.java | 17 ++--
.../client/java/impl/producer/ProducerImpl.java | 25 ++----
.../client/java/impl/ClientManagerImplTest.java | 51 +++++-------
.../java/impl/consumer/ConsumerImplTest.java | 7 +-
.../java/impl/producer/ProducerImplTest.java | 10 +--
10 files changed, 161 insertions(+), 198 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
index 08147c0..d5bb57d 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.java.impl;
+import io.grpc.Metadata;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.apache.rocketmq.client.java.route.Endpoints;
@@ -28,6 +29,11 @@ public interface Client {
*/
ClientId getClientId();
+ /**
+ * @return the client's signature, carried as gRPC request header metadata
+ */
+ Metadata sign() throws Exception;
+
/**
* Send heart beat to remote {@link Endpoints}.
*/
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 4f39801..d48ee74 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -62,6 +62,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
@@ -260,8 +261,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
public StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints,
StreamObserver<TelemetryCommand> observer) throws ClientException {
try {
- final Metadata metadata = this.sign();
- return clientManager.telemetry(endpoints, metadata,
TELEMETRY_TIMEOUT, observer);
+ return clientManager.telemetry(endpoints, TELEMETRY_TIMEOUT,
observer);
} catch (ClientException e) {
throw e;
} catch (Throwable t) {
@@ -384,27 +384,21 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
/**
* Triggered when {@link TopicRouteData} is fetched from remote.
- *
- * <p>Never thrown any exception.
*/
- public ListenableFuture<TopicRouteData> onTopicRouteDataFetched(String
topic, TopicRouteData topicRouteData) {
+ public void onTopicRouteDataFetched(String topic, TopicRouteData
topicRouteData) throws ClientException,
+ ExecutionException, InterruptedException, TimeoutException {
final Set<Endpoints> routeEndpoints = topicRouteData
.getMessageQueues().stream()
.map(mq -> mq.getBroker().getEndpoints())
.collect(Collectors.toSet());
final Set<Endpoints> existRouteEndpoints = getTotalRouteEndpoints();
final Set<Endpoints> newEndpoints = new
HashSet<>(Sets.difference(routeEndpoints, existRouteEndpoints));
- try {
- for (Endpoints endpoints : newEndpoints) {
- final ClientSessionImpl clientSession =
getClientSession(endpoints);
- clientSession.syncSettings();
- }
- topicRouteCache.put(topic, topicRouteData);
- onTopicRouteDataUpdate0(topic, topicRouteData);
- return Futures.immediateFuture(topicRouteData);
- } catch (Throwable t) {
- return Futures.immediateFailedFuture(t);
+ for (Endpoints endpoints : newEndpoints) {
+ final ClientSessionImpl clientSession =
getClientSession(endpoints);
+ clientSession.syncSettings();
}
+ topicRouteCache.put(topic, topicRouteData);
+ onTopicRouteDataUpdate0(topic, topicRouteData);
}
public void onTopicRouteDataUpdate0(String topic, TopicRouteData
topicRouteData) {
@@ -477,13 +471,13 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
final Set<Endpoints> routeEndpointsSet = getTotalRouteEndpoints();
final NotifyClientTerminationRequest notifyClientTerminationRequest =
wrapNotifyClientTerminationRequest();
try {
- final Metadata metadata = sign();
for (Endpoints endpoints : routeEndpointsSet) {
- clientManager.notifyClientTermination(endpoints, metadata,
notifyClientTerminationRequest,
+ clientManager.notifyClientTermination(endpoints,
notifyClientTerminationRequest,
clientConfiguration.getRequestTimeout());
}
} catch (Throwable t) {
- LOGGER.error("Exception raised while notifying client's
termination, clientId={}", clientId, t);
+ // Should never reach here.
+ LOGGER.error("[Bug] Exception raised while notifying client's
termination, clientId={}", clientId, t);
}
}
@@ -514,7 +508,8 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
/**
* Real-time signature generation
*/
- protected Metadata sign() throws NoSuchAlgorithmException,
InvalidKeyException {
+ @Override
+ public Metadata sign() throws NoSuchAlgorithmException,
InvalidKeyException {
return Signature.sign(clientConfiguration, clientId);
}
@@ -526,8 +521,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
*/
private void doHeartbeat(HeartbeatRequest request, final Endpoints
endpoints) {
try {
- Metadata metadata = sign();
- final RpcFuture<HeartbeatRequest, HeartbeatResponse> future =
clientManager.heartbeat(endpoints, metadata,
+ final RpcFuture<HeartbeatRequest, HeartbeatResponse> future =
clientManager.heartbeat(endpoints,
request, clientConfiguration.getRequestTimeout());
Futures.addCallback(future, new
FutureCallback<HeartbeatResponse>() {
@Override
@@ -552,9 +546,10 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
LOGGER.warn("Failed to send heartbeat, endpoints={},
clientId={}", endpoints, clientId, t);
}
}, MoreExecutors.directExecutor());
- } catch (Throwable e) {
- LOGGER.error("Exception raised while preparing heartbeat,
endpoints={}, clientId={}", endpoints, clientId,
- e);
+ } catch (Throwable t) {
+ // Should never reach here.
+ LOGGER.error("[Bug] Exception raised while preparing heartbeat,
endpoints={}, clientId={}", endpoints,
+ clientId, t);
}
}
@@ -571,8 +566,11 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
}
private ListenableFuture<TopicRouteData> fetchTopicRoute(final String
topic) {
- final ListenableFuture<TopicRouteData> future =
Futures.transformAsync(fetchTopicRoute0(topic),
- topicRouteData -> onTopicRouteDataFetched(topic, topicRouteData),
MoreExecutors.directExecutor());
+ final ListenableFuture<TopicRouteData> future0 =
fetchTopicRoute0(topic);
+ final ListenableFuture<TopicRouteData> future =
Futures.transformAsync(future0, topicRouteData -> {
+ onTopicRouteDataFetched(topic, topicRouteData);
+ return Futures.immediateFuture(topicRouteData);
+ }, MoreExecutors.directExecutor());
Futures.addCallback(future, new FutureCallback<TopicRouteData>() {
@Override
public void onSuccess(TopicRouteData topicRouteData) {
@@ -589,23 +587,18 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
}
protected ListenableFuture<TopicRouteData> fetchTopicRoute0(final String
topic) {
- try {
- Resource topicResource =
Resource.newBuilder().setName(topic).build();
- final QueryRouteRequest request =
QueryRouteRequest.newBuilder().setTopic(topicResource)
- .setEndpoints(endpoints.toProtobuf()).build();
- final Metadata metadata = sign();
- final RpcFuture<QueryRouteRequest, QueryRouteResponse> future =
- clientManager.queryRoute(endpoints, metadata, request,
clientConfiguration.getRequestTimeout());
- return Futures.transformAsync(future, response -> {
- final Status status = response.getStatus();
- StatusChecker.check(status, future);
- final List<MessageQueue> messageQueuesList =
response.getMessageQueuesList();
- final TopicRouteData topicRouteData = new
TopicRouteData(messageQueuesList);
- return Futures.immediateFuture(topicRouteData);
- }, MoreExecutors.directExecutor());
- } catch (Throwable t) {
- return Futures.immediateFailedFuture(t);
- }
+ Resource topicResource = Resource.newBuilder().setName(topic).build();
+ final QueryRouteRequest request =
QueryRouteRequest.newBuilder().setTopic(topicResource)
+ .setEndpoints(endpoints.toProtobuf()).build();
+ final RpcFuture<QueryRouteRequest, QueryRouteResponse> future =
+ clientManager.queryRoute(endpoints, request,
clientConfiguration.getRequestTimeout());
+ return Futures.transformAsync(future, response -> {
+ final Status status = response.getStatus();
+ StatusChecker.check(status, future);
+ final List<MessageQueue> messageQueuesList =
response.getMessageQueuesList();
+ final TopicRouteData topicRouteData = new
TopicRouteData(messageQueuesList);
+ return Futures.immediateFuture(topicRouteData);
+ }, MoreExecutors.directExecutor());
}
protected Set<Endpoints> getTotalRouteEndpoints() {
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
index 39db0e0..468712b 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
@@ -39,7 +39,6 @@ import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.TelemetryCommand;
import com.google.common.util.concurrent.AbstractIdleService;
-import io.grpc.Metadata;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.util.List;
@@ -67,134 +66,125 @@ public abstract class ClientManager extends
AbstractIdleService {
* Query topic route asynchronously, the method ensures no throwable.
*
* @param endpoints requested endpoints.
- * @param metadata gRPC request header metadata.
* @param request query route request.
* @param duration request max duration.
* @return invocation of response future.
*/
- public abstract RpcFuture<QueryRouteRequest, QueryRouteResponse>
- queryRoute(Endpoints endpoints, Metadata metadata, QueryRouteRequest
request, Duration duration);
+ public abstract RpcFuture<QueryRouteRequest, QueryRouteResponse>
queryRoute(Endpoints endpoints,
+ QueryRouteRequest request, Duration duration);
/**
* Heart beat asynchronously, the method ensures no throwable.
*
* @param endpoints requested endpoints.
- * @param metadata gRPC request header metadata.
* @param request heartbeat request.
* @param duration request max duration.
* @return invocation of response future.
*/
- public abstract RpcFuture<HeartbeatRequest, HeartbeatResponse>
- heartbeat(Endpoints endpoints, Metadata metadata, HeartbeatRequest
request, Duration duration);
+ public abstract RpcFuture<HeartbeatRequest, HeartbeatResponse>
heartbeat(Endpoints endpoints,
+ HeartbeatRequest request, Duration duration);
/**
* Send message asynchronously, the method ensures no throwable.
*
* @param endpoints requested endpoints.
- * @param metadata gRPC request header metadata.
* @param request send message request.
* @param duration request max duration.
* @return invocation of response future.
*/
- public abstract RpcFuture<SendMessageRequest, SendMessageResponse>
- sendMessage(Endpoints endpoints, Metadata metadata, SendMessageRequest
request, Duration duration);
+ public abstract RpcFuture<SendMessageRequest, SendMessageResponse>
sendMessage(Endpoints endpoints,
+ SendMessageRequest request, Duration duration);
/**
* Query assignment asynchronously, the method ensures no throwable.
*
* @param endpoints requested endpoints.
- * @param metadata gRPC request header metadata.
* @param request query assignment request.
* @param duration request max duration.
* @return invocation of response future.
*/
- public abstract RpcFuture<QueryAssignmentRequest, QueryAssignmentResponse>
- queryAssignment(Endpoints endpoints, Metadata metadata,
QueryAssignmentRequest request, Duration duration);
+ public abstract RpcFuture<QueryAssignmentRequest, QueryAssignmentResponse>
queryAssignment(Endpoints endpoints,
+ QueryAssignmentRequest request, Duration duration);
/**
* Receiving messages asynchronously from the server, the method ensures
no throwable.
*
* @param endpoints requested endpoints.
- * @param metadata gRPC request header metadata.
+ * @param request receive message request.
+ * @param duration request max duration.
* @return invocation of response future.
*/
- public abstract RpcFuture<ReceiveMessageRequest,
List<ReceiveMessageResponse>>
- receiveMessage(Endpoints endpoints, Metadata metadata,
ReceiveMessageRequest request, Duration duration);
+ public abstract RpcFuture<ReceiveMessageRequest,
List<ReceiveMessageResponse>> receiveMessage(Endpoints endpoints,
+ ReceiveMessageRequest request, Duration duration);
/**
* Ack message asynchronously after the success of consumption, the method
ensures no throwable.
*
* @param endpoints requested endpoints.
- * @param metadata gRPC request header metadata.
* @param request ack message request.
* @param duration request max duration.
* @return invocation of response future.
*/
- public abstract RpcFuture<AckMessageRequest, AckMessageResponse>
- ackMessage(Endpoints endpoints, Metadata metadata, AckMessageRequest
request, Duration duration);
+ public abstract RpcFuture<AckMessageRequest, AckMessageResponse>
ackMessage(Endpoints endpoints,
+ AckMessageRequest request, Duration duration);
/**
* Nack message asynchronously after the failure of consumption, the
method ensures no throwable.
*
* @param endpoints requested endpoints.
- * @param metadata gRPC request header metadata.
* @param request nack message request.
* @param duration request max duration.
* @return invocation of response future.
*/
public abstract RpcFuture<ChangeInvisibleDurationRequest,
ChangeInvisibleDurationResponse>
- changeInvisibleDuration(Endpoints endpoints, Metadata metadata,
ChangeInvisibleDurationRequest request,
+ changeInvisibleDuration(Endpoints endpoints,
ChangeInvisibleDurationRequest request,
Duration duration);
/**
* Send a message to the dead letter queue asynchronously, the method
ensures no throwable.
*
* @param endpoints requested endpoints.
- * @param metadata gRPC request header metadata.
* @param request request of sending a message to DLQ.
* @param duration request max duration.
* @return invocation of response future.
*/
public abstract RpcFuture<ForwardMessageToDeadLetterQueueRequest,
ForwardMessageToDeadLetterQueueResponse>
forwardMessageToDeadLetterQueue(Endpoints endpoints,
- Metadata metadata, ForwardMessageToDeadLetterQueueRequest request,
Duration duration);
+ ForwardMessageToDeadLetterQueueRequest request, Duration duration);
/**
* Submit transaction resolution asynchronously, the method ensures no
throwable.
*
* @param endpoints requested endpoints.
- * @param metadata gRPC request header metadata.
* @param request end transaction request.
* @param duration request max duration.
* @return invocation of response future.
*/
- public abstract RpcFuture<EndTransactionRequest, EndTransactionResponse>
- endTransaction(Endpoints endpoints, Metadata metadata,
EndTransactionRequest request, Duration duration);
+ public abstract RpcFuture<EndTransactionRequest, EndTransactionResponse>
endTransaction(Endpoints endpoints,
+ EndTransactionRequest request, Duration duration);
/**
* Asynchronously notify the server that client is terminated, the method
ensures no throwable.
*
* @param endpoints request endpoints.
- * @param metadata gRPC request header metadata.
* @param request notify client termination request.
* @param duration request max duration.
* @return response future of notification of client termination.
*/
@SuppressWarnings("UnusedReturnValue")
public abstract RpcFuture<NotifyClientTerminationRequest,
NotifyClientTerminationResponse>
- notifyClientTermination(Endpoints endpoints, Metadata metadata,
NotifyClientTerminationRequest request,
+ notifyClientTermination(Endpoints endpoints,
NotifyClientTerminationRequest request,
Duration duration);
/**
* Establish telemetry session stream to server.
*
* @param endpoints request endpoints.
- * @param metadata gRPC request header metadata.
* @param duration stream max duration.
* @param responseObserver response observer.
* @return request observer.
* @throws ClientException if failed to establish telemetry session stream.
*/
- public abstract StreamObserver<TelemetryCommand> telemetry(Endpoints
endpoints, Metadata metadata,
- Duration duration, StreamObserver<TelemetryCommand> responseObserver)
throws ClientException;
+ public abstract StreamObserver<TelemetryCommand> telemetry(Endpoints
endpoints, Duration duration,
+ StreamObserver<TelemetryCommand> responseObserver) throws
ClientException;
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index 3b84ac8..49ab080 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -38,7 +38,6 @@ import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.TelemetryCommand;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Metadata;
@@ -59,6 +58,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLException;
import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.exception.InternalErrorException;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.apache.rocketmq.client.java.misc.ExecutorServices;
import org.apache.rocketmq.client.java.misc.MetadataUtils;
@@ -194,152 +194,167 @@ public class ClientManagerImpl extends ClientManager {
}
@Override
- public RpcFuture<QueryRouteRequest, QueryRouteResponse>
queryRoute(Endpoints endpoints, Metadata metadata,
- QueryRouteRequest request, Duration duration) {
- final Context context = new Context(endpoints, metadata);
+ public RpcFuture<QueryRouteRequest, QueryRouteResponse>
queryRoute(Endpoints endpoints, QueryRouteRequest request,
+ Duration duration) {
try {
+ final Metadata metadata = client.sign();
+ final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
final ListenableFuture<QueryRouteResponse> future =
rpcClient.queryRoute(metadata, request, asyncWorker,
duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
- return new RpcFuture<>(context, request,
Futures.immediateFailedFuture(t));
+ return new RpcFuture<>(t);
}
}
@Override
- public RpcFuture<HeartbeatRequest, HeartbeatResponse> heartbeat(Endpoints
endpoints, Metadata metadata,
- HeartbeatRequest request, Duration duration) {
- final Context context = new Context(endpoints, metadata);
+ public RpcFuture<HeartbeatRequest, HeartbeatResponse> heartbeat(Endpoints
endpoints, HeartbeatRequest request,
+ Duration duration) {
try {
+ final Metadata metadata = client.sign();
+ final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
ListenableFuture<HeartbeatResponse> future =
rpcClient.heartbeat(metadata, request, asyncWorker, duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
- return new RpcFuture<>(context, request,
Futures.immediateFailedFuture(t));
+ return new RpcFuture<>(t);
}
}
@Override
- public RpcFuture<SendMessageRequest, SendMessageResponse>
sendMessage(Endpoints endpoints, Metadata metadata,
+ public RpcFuture<SendMessageRequest, SendMessageResponse>
sendMessage(Endpoints endpoints,
SendMessageRequest request, Duration duration) {
- final Context context = new Context(endpoints, metadata);
try {
+ final Metadata metadata = client.sign();
+ final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
final ListenableFuture<SendMessageResponse> future =
rpcClient.sendMessage(metadata, request, asyncWorker,
duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
- return new RpcFuture<>(context, request,
Futures.immediateFailedFuture(t));
+ return new RpcFuture<>(t);
}
}
@Override
public RpcFuture<QueryAssignmentRequest, QueryAssignmentResponse>
queryAssignment(Endpoints endpoints,
- Metadata metadata, QueryAssignmentRequest request, Duration duration) {
- final Context context = new Context(endpoints, metadata);
+ QueryAssignmentRequest request, Duration duration) {
try {
+ final Metadata metadata = client.sign();
+ final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
final ListenableFuture<QueryAssignmentResponse> future =
rpcClient.queryAssignment(metadata, request, asyncWorker,
duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
- return new RpcFuture<>(context, request,
Futures.immediateFailedFuture(t));
+ return new RpcFuture<>(t);
}
}
@Override
public RpcFuture<ReceiveMessageRequest, List<ReceiveMessageResponse>>
receiveMessage(Endpoints endpoints,
- Metadata metadata, ReceiveMessageRequest request, Duration duration) {
- final Context context = new Context(endpoints, metadata);
+ ReceiveMessageRequest request, Duration duration) {
try {
+ final Metadata metadata = client.sign();
+ final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
final ListenableFuture<List<ReceiveMessageResponse>> future =
rpcClient.receiveMessage(metadata, request, asyncWorker,
duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
- return new RpcFuture<>(context, request,
Futures.immediateFailedFuture(t));
+ return new RpcFuture<>(t);
}
}
@Override
- public RpcFuture<AckMessageRequest, AckMessageResponse>
ackMessage(Endpoints endpoints, Metadata metadata,
- AckMessageRequest request, Duration duration) {
- final Context context = new Context(endpoints, metadata);
+ public RpcFuture<AckMessageRequest, AckMessageResponse>
ackMessage(Endpoints endpoints, AckMessageRequest request,
+ Duration duration) {
try {
+ final Metadata metadata = client.sign();
+ final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
final ListenableFuture<AckMessageResponse> future =
rpcClient.ackMessage(metadata, request, asyncWorker, duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
- return new RpcFuture<>(context, request,
Futures.immediateFailedFuture(t));
+ return new RpcFuture<>(t);
}
}
@Override
public RpcFuture<ChangeInvisibleDurationRequest,
ChangeInvisibleDurationResponse>
- changeInvisibleDuration(Endpoints endpoints, Metadata metadata,
ChangeInvisibleDurationRequest request,
+ changeInvisibleDuration(Endpoints endpoints,
ChangeInvisibleDurationRequest request,
Duration duration) {
- final Context context = new Context(endpoints, metadata);
try {
+ final Metadata metadata = client.sign();
+ final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
final ListenableFuture<ChangeInvisibleDurationResponse> future =
rpcClient.changeInvisibleDuration(metadata, request,
asyncWorker, duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
- return new RpcFuture<>(context, request,
Futures.immediateFailedFuture(t));
+ return new RpcFuture<>(t);
}
}
@Override
public RpcFuture<ForwardMessageToDeadLetterQueueRequest,
ForwardMessageToDeadLetterQueueResponse>
- forwardMessageToDeadLetterQueue(Endpoints endpoints, Metadata metadata,
- ForwardMessageToDeadLetterQueueRequest request, Duration duration) {
- final Context context = new Context(endpoints, metadata);
+ forwardMessageToDeadLetterQueue(Endpoints endpoints,
ForwardMessageToDeadLetterQueueRequest request,
+ Duration duration) {
try {
+ final Metadata metadata = client.sign();
+ final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
final ListenableFuture<ForwardMessageToDeadLetterQueueResponse>
future =
rpcClient.forwardMessageToDeadLetterQueue(metadata, request,
asyncWorker, duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
- return new RpcFuture<>(context, request,
Futures.immediateFailedFuture(t));
+ return new RpcFuture<>(t);
}
}
@Override
public RpcFuture<EndTransactionRequest, EndTransactionResponse>
endTransaction(Endpoints endpoints,
- Metadata metadata, EndTransactionRequest request, Duration duration) {
- final Context context = new Context(endpoints, metadata);
+ EndTransactionRequest request, Duration duration) {
try {
+ final Metadata metadata = client.sign();
+ final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
final ListenableFuture<EndTransactionResponse> future =
rpcClient.endTransaction(metadata, request, asyncWorker,
duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
- return new RpcFuture<>(context, request,
Futures.immediateFailedFuture(t));
+ return new RpcFuture<>(t);
}
}
@Override
public RpcFuture<NotifyClientTerminationRequest,
NotifyClientTerminationResponse>
- notifyClientTermination(Endpoints endpoints, Metadata metadata,
NotifyClientTerminationRequest request,
+ notifyClientTermination(Endpoints endpoints,
NotifyClientTerminationRequest request,
Duration duration) {
- final Context context = new Context(endpoints, metadata);
try {
+ final Metadata metadata = client.sign();
+ final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
final ListenableFuture<NotifyClientTerminationResponse> future =
rpcClient.notifyClientTermination(metadata, request,
asyncWorker, duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
- return new RpcFuture<>(context, request,
Futures.immediateFailedFuture(t));
+ return new RpcFuture<>(t);
}
}
@Override
- public StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints,
Metadata metadata, Duration duration,
+ public StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints,
Duration duration,
StreamObserver<TelemetryCommand> responseObserver) throws
ClientException {
- final RpcClient rpcClient = getRpcClient(endpoints);
- return rpcClient.telemetry(metadata, asyncWorker, duration,
responseObserver);
+ try {
+ final Metadata metadata = client.sign();
+ final RpcClient rpcClient = getRpcClient(endpoints);
+ return rpcClient.telemetry(metadata, asyncWorker, duration,
responseObserver);
+ } catch (Throwable t) {
+ throw new InternalErrorException(t);
+ }
}
@Override
@@ -444,6 +459,7 @@ public class ClientManagerImpl extends ClientManager {
LOGGER.info("Shutdown the client manager successfully, clientId={}",
clientId);
}
+ @SuppressWarnings("NullableProblems")
@Override
protected String serviceName() {
return super.serviceName() + "-" + client.getClientId().getIndex();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 1a2bf16..bb06b97 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -38,7 +38,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Durations;
import com.google.protobuf.util.Timestamps;
-import io.grpc.Metadata;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
@@ -79,13 +78,12 @@ abstract class ConsumerImpl extends ClientImpl {
MessageQueueImpl mq, Duration awaitDuration) {
List<MessageViewImpl> messages = new ArrayList<>();
try {
- Metadata metadata = sign();
final Endpoints endpoints = mq.getBroker().getEndpoints();
final Duration tolerance = clientConfiguration.getRequestTimeout();
final Duration timeout = Duration.ofNanos(awaitDuration.toNanos()
+ tolerance.toNanos());
final ClientManager clientManager = this.getClientManager();
final RpcFuture<ReceiveMessageRequest,
List<ReceiveMessageResponse>> future =
- clientManager.receiveMessage(endpoints, metadata, request,
timeout);
+ clientManager.receiveMessage(endpoints, request, timeout);
return Futures.transformAsync(future, responses -> {
Status status =
Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR)
.setMessage("status was not set by server")
@@ -118,6 +116,8 @@ abstract class ConsumerImpl extends ClientImpl {
return Futures.immediateFuture(receiveMessageResult);
}, MoreExecutors.directExecutor());
} catch (Throwable t) {
+ // Should never reach here.
+ LOGGER.error("[Bug] Exception raised during message receiving,
mq={}, clientId={}", mq, clientId, t);
return Futures.immediateFailedFuture(t);
}
}
@@ -150,9 +150,8 @@ abstract class ConsumerImpl extends ClientImpl {
doBefore(context, generalMessages);
try {
final AckMessageRequest request =
wrapAckMessageRequest(messageView);
- final Metadata metadata = sign();
final Duration requestTimeout =
clientConfiguration.getRequestTimeout();
- future = this.getClientManager().ackMessage(endpoints, metadata,
request, requestTimeout);
+ future = this.getClientManager().ackMessage(endpoints, request,
requestTimeout);
} catch (Throwable t) {
future = new RpcFuture<>(t);
}
@@ -185,14 +184,9 @@ abstract class ConsumerImpl extends ClientImpl {
final MessageHandlerContextImpl context =
new
MessageHandlerContextImpl(MessageHookPoints.CHANGE_INVISIBLE_DURATION);
doBefore(context, generalMessages);
- try {
- final Metadata metadata = sign();
- final ChangeInvisibleDurationRequest request =
wrapChangeInvisibleDuration(messageView, invisibleDuration);
- final Duration requestTimeout =
clientConfiguration.getRequestTimeout();
- future =
this.getClientManager().changeInvisibleDuration(endpoints, metadata, request,
requestTimeout);
- } catch (Throwable t) {
- future = new RpcFuture<>(t);
- }
+ final ChangeInvisibleDurationRequest request =
wrapChangeInvisibleDuration(messageView, invisibleDuration);
+ final Duration requestTimeout =
clientConfiguration.getRequestTimeout();
+ future = this.getClientManager().changeInvisibleDuration(endpoints,
request, requestTimeout);
final MessageId messageId = messageView.getMessageId();
Futures.addCallback(future, new
FutureCallback<ChangeInvisibleDurationResponse>() {
@Override
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 960b48c..47119d0 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -32,7 +32,6 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-import io.grpc.Metadata;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
@@ -266,11 +265,10 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
ListenableFuture<Assignments> queryAssignment(final String topic) {
final ListenableFuture<Endpoints> future0 =
pickEndpointsToQueryAssignments(topic);
return Futures.transformAsync(future0, endpoints -> {
- final Metadata metadata = sign();
final QueryAssignmentRequest request =
wrapQueryAssignmentRequest(topic);
final Duration requestTimeout =
clientConfiguration.getRequestTimeout();
final RpcFuture<QueryAssignmentRequest, QueryAssignmentResponse>
future1 =
- this.getClientManager().queryAssignment(endpoints, metadata,
request, requestTimeout);
+ this.getClientManager().queryAssignment(endpoints, request,
requestTimeout);
return Futures.transformAsync(future1, response -> {
final Status status = response.getStatus();
StatusChecker.check(status, future1);
@@ -514,15 +512,10 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
final Endpoints endpoints = messageView.getEndpoints();
RpcFuture<ForwardMessageToDeadLetterQueueRequest,
ForwardMessageToDeadLetterQueueResponse> future;
- try {
- final ForwardMessageToDeadLetterQueueRequest request =
- wrapForwardMessageToDeadLetterQueueRequest(messageView);
- final Metadata metadata = sign();
- future =
this.getClientManager().forwardMessageToDeadLetterQueue(endpoints, metadata,
request,
- clientConfiguration.getRequestTimeout());
- } catch (Throwable t) {
- future = new RpcFuture<>(t);
- }
+ final ForwardMessageToDeadLetterQueueRequest request =
+ wrapForwardMessageToDeadLetterQueueRequest(messageView);
+ future =
this.getClientManager().forwardMessageToDeadLetterQueue(endpoints, request,
+ clientConfiguration.getRequestTimeout());
Futures.addCallback(future, new
FutureCallback<ForwardMessageToDeadLetterQueueResponse>() {
@Override
public void onSuccess(ForwardMessageToDeadLetterQueueResponse
response) {
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 58d2c23..fd87fef 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -35,7 +35,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
-import io.grpc.Metadata;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
@@ -257,12 +256,6 @@ class ProducerImpl extends ClientImpl implements Producer {
public void endTransaction(Endpoints endpoints, GeneralMessage
generalMessage, MessageId messageId,
String transactionId, final TransactionResolution resolution) throws
ClientException {
- Metadata metadata;
- try {
- metadata = sign();
- } catch (Throwable t) {
- throw new ClientException(t);
- }
final EndTransactionRequest.Builder builder =
EndTransactionRequest.newBuilder()
.setMessageId(messageId.toString()).setTransactionId(transactionId)
.setTopic(apache.rocketmq.v2.Resource.newBuilder().setName(generalMessage.getTopic()).build());
@@ -283,7 +276,7 @@ class ProducerImpl extends ClientImpl implements Producer {
doBefore(context, generalMessages);
final RpcFuture<EndTransactionRequest, EndTransactionResponse> future =
- this.getClientManager().endTransaction(endpoints, metadata,
request, requestTimeout);
+ this.getClientManager().endTransaction(endpoints, request,
requestTimeout);
Futures.addCallback(future, new
FutureCallback<EndTransactionResponse>() {
@Override
public void onSuccess(EndTransactionResponse response) {
@@ -425,11 +418,11 @@ class ProducerImpl extends ClientImpl implements Producer
{
return
SendMessageRequest.newBuilder().addAllMessages(messages).build();
}
- ListenableFuture<List<SendReceiptImpl>> send0(Metadata metadata, Endpoints
endpoints,
- List<PublishingMessageImpl> pubMessages, MessageQueueImpl mq) {
+ ListenableFuture<List<SendReceiptImpl>> send0(Endpoints endpoints,
List<PublishingMessageImpl> pubMessages,
+ MessageQueueImpl mq) {
final SendMessageRequest request = wrapSendMessageRequest(pubMessages,
mq);
final RpcFuture<SendMessageRequest, SendMessageResponse> future0 =
- this.getClientManager().sendMessage(endpoints, metadata, request,
clientConfiguration.getRequestTimeout());
+ this.getClientManager().sendMessage(endpoints, request,
clientConfiguration.getRequestTimeout());
return Futures.transformAsync(future0,
response ->
Futures.immediateFuture(SendReceiptImpl.processResponseInvocation(mq, response,
future0)),
MoreExecutors.directExecutor());
@@ -437,14 +430,6 @@ class ProducerImpl extends ClientImpl implements Producer {
private void send0(SettableFuture<List<SendReceiptImpl>> future0, String
topic, MessageType messageType,
final List<MessageQueueImpl> candidates, final
List<PublishingMessageImpl> messages, final int attempt) {
- Metadata metadata;
- try {
- metadata = sign();
- } catch (Throwable t) {
- // Failed to sign, no need to proceed.
- future0.setException(t);
- return;
- }
// Calculate the current message queue.
final MessageQueueImpl mq = candidates.get(IntMath.mod(attempt - 1,
candidates.size()));
final List<MessageType> acceptMessageTypes =
mq.getAcceptMessageTypes();
@@ -456,7 +441,7 @@ class ProducerImpl extends ClientImpl implements Producer {
return;
}
final Endpoints endpoints = mq.getBroker().getEndpoints();
- final ListenableFuture<List<SendReceiptImpl>> future = send0(metadata,
endpoints, messages, mq);
+ final ListenableFuture<List<SendReceiptImpl>> future =
send0(endpoints, messages, mq);
final int maxAttempts = this.getRetryPolicy().getMaxAttempts();
// Intercept before message publishing.
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
index a3e8666..cb348ab 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
@@ -27,7 +27,6 @@ import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.SendMessageRequest;
-import io.grpc.Metadata;
import java.time.Duration;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.apache.rocketmq.client.java.tool.TestBase;
@@ -56,91 +55,81 @@ public class ClientManagerImplTest extends TestBase {
@Test
public void testQueryRoute() {
- Metadata metadata = new Metadata();
QueryRouteRequest request = QueryRouteRequest.newBuilder().build();
- CLIENT_MANAGER.queryRoute(fakeEndpoints(), metadata, request,
Duration.ofSeconds(1));
- CLIENT_MANAGER.queryRoute(null, metadata, request,
Duration.ofSeconds(1));
+ CLIENT_MANAGER.queryRoute(fakeEndpoints(), request,
Duration.ofSeconds(1));
+ CLIENT_MANAGER.queryRoute(null, request, Duration.ofSeconds(1));
// Expect no exception thrown.
}
@Test
public void testHeartbeat() {
- Metadata metadata = new Metadata();
HeartbeatRequest request = HeartbeatRequest.newBuilder().build();
- CLIENT_MANAGER.heartbeat(fakeEndpoints(), metadata, request,
Duration.ofSeconds(1));
- CLIENT_MANAGER.heartbeat(null, metadata, request,
Duration.ofSeconds(1));
+ CLIENT_MANAGER.heartbeat(fakeEndpoints(), request,
Duration.ofSeconds(1));
+ CLIENT_MANAGER.heartbeat(null, request, Duration.ofSeconds(1));
// Expect no exception thrown.
}
@Test
public void testSendMessage() {
- Metadata metadata = new Metadata();
SendMessageRequest request = SendMessageRequest.newBuilder().build();
- CLIENT_MANAGER.sendMessage(fakeEndpoints(), metadata, request,
Duration.ofSeconds(1));
- CLIENT_MANAGER.sendMessage(null, metadata, request,
Duration.ofSeconds(1));
+ CLIENT_MANAGER.sendMessage(fakeEndpoints(), request,
Duration.ofSeconds(1));
+ CLIENT_MANAGER.sendMessage(null, request, Duration.ofSeconds(1));
// Expect no exception thrown.
}
@Test
public void testQueryAssignment() {
- Metadata metadata = new Metadata();
QueryAssignmentRequest request =
QueryAssignmentRequest.newBuilder().build();
- CLIENT_MANAGER.queryAssignment(fakeEndpoints(), metadata, request,
Duration.ofSeconds(1));
- CLIENT_MANAGER.queryAssignment(null, metadata, request,
Duration.ofSeconds(1));
+ CLIENT_MANAGER.queryAssignment(fakeEndpoints(), request,
Duration.ofSeconds(1));
+ CLIENT_MANAGER.queryAssignment(null, request, Duration.ofSeconds(1));
// Expect no exception thrown.
}
@Test
public void testReceiveMessage() {
- Metadata metadata = new Metadata();
ReceiveMessageRequest request =
ReceiveMessageRequest.newBuilder().build();
- CLIENT_MANAGER.receiveMessage(fakeEndpoints(), metadata, request,
Duration.ofSeconds(1));
- CLIENT_MANAGER.receiveMessage(null, metadata, request,
Duration.ofSeconds(1));
+ CLIENT_MANAGER.receiveMessage(fakeEndpoints(), request,
Duration.ofSeconds(1));
+ CLIENT_MANAGER.receiveMessage(null, request, Duration.ofSeconds(1));
// Expect no exception thrown.
}
@Test
public void testAckMessage() {
- Metadata metadata = new Metadata();
AckMessageRequest request = AckMessageRequest.newBuilder().build();
- CLIENT_MANAGER.ackMessage(fakeEndpoints(), metadata, request,
Duration.ofSeconds(1));
- CLIENT_MANAGER.ackMessage(null, metadata, request,
Duration.ofSeconds(1));
+ CLIENT_MANAGER.ackMessage(fakeEndpoints(), request,
Duration.ofSeconds(1));
+ CLIENT_MANAGER.ackMessage(null, request, Duration.ofSeconds(1));
// Expect no exception thrown.
}
@Test
public void testChangeInvisibleDuration() {
- Metadata metadata = new Metadata();
ChangeInvisibleDurationRequest request =
ChangeInvisibleDurationRequest.newBuilder().build();
- CLIENT_MANAGER.changeInvisibleDuration(fakeEndpoints(), metadata,
request, Duration.ofSeconds(1));
- CLIENT_MANAGER.changeInvisibleDuration(null, metadata, request,
Duration.ofSeconds(1));
+ CLIENT_MANAGER.changeInvisibleDuration(fakeEndpoints(), request,
Duration.ofSeconds(1));
+ CLIENT_MANAGER.changeInvisibleDuration(null, request,
Duration.ofSeconds(1));
// Expect no exception thrown.
}
@Test
public void testForwardMessageToDeadLetterQueue() {
- Metadata metadata = new Metadata();
ForwardMessageToDeadLetterQueueRequest request =
ForwardMessageToDeadLetterQueueRequest.newBuilder().build();
- CLIENT_MANAGER.forwardMessageToDeadLetterQueue(fakeEndpoints(),
metadata, request, Duration.ofSeconds(1));
- CLIENT_MANAGER.forwardMessageToDeadLetterQueue(null, metadata,
request, Duration.ofSeconds(1));
+ CLIENT_MANAGER.forwardMessageToDeadLetterQueue(fakeEndpoints(),
request, Duration.ofSeconds(1));
+ CLIENT_MANAGER.forwardMessageToDeadLetterQueue(null, request,
Duration.ofSeconds(1));
// Expect no exception thrown.
}
@Test
public void testEndTransaction() {
- Metadata metadata = new Metadata();
EndTransactionRequest request =
EndTransactionRequest.newBuilder().build();
- CLIENT_MANAGER.endTransaction(fakeEndpoints(), metadata, request,
Duration.ofSeconds(1));
- CLIENT_MANAGER.endTransaction(null, metadata, request,
Duration.ofSeconds(1));
+ CLIENT_MANAGER.endTransaction(fakeEndpoints(), request,
Duration.ofSeconds(1));
+ CLIENT_MANAGER.endTransaction(null, request, Duration.ofSeconds(1));
// Expect no exception thrown.
}
@Test
public void testNotifyClientTermination() {
- Metadata metadata = new Metadata();
NotifyClientTerminationRequest request =
NotifyClientTerminationRequest.newBuilder().build();
- CLIENT_MANAGER.notifyClientTermination(fakeEndpoints(), metadata,
request, Duration.ofSeconds(1));
- CLIENT_MANAGER.notifyClientTermination(null, metadata, request,
Duration.ofSeconds(1));
+ CLIENT_MANAGER.notifyClientTermination(fakeEndpoints(), request,
Duration.ofSeconds(1));
+ CLIENT_MANAGER.notifyClientTermination(null, request,
Duration.ofSeconds(1));
// Expect no exception thrown.
}
}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
index a7ae86a..619cd7d 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
@@ -26,7 +26,6 @@ import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import com.google.common.util.concurrent.ListenableFuture;
-import io.grpc.Metadata;
import java.time.Duration;
import java.util.List;
import java.util.Map;
@@ -68,7 +67,7 @@ public class ConsumerImplTest extends TestBase {
final RpcFuture<ReceiveMessageRequest, List<ReceiveMessageResponse>>
future =
okReceiveMessageResponsesFuture(FAKE_TOPIC_0,
receivedMessageCount);
future.get();
-
Mockito.doReturn(future).when(clientManager).receiveMessage(any(Endpoints.class),
any(Metadata.class),
+
Mockito.doReturn(future).when(clientManager).receiveMessage(any(Endpoints.class),
any(ReceiveMessageRequest.class), any(Duration.class));
final MessageQueueImpl mq = fakeMessageQueueImpl(FAKE_TOPIC_0);
final ReceiveMessageRequest request =
pushConsumer.wrapReceiveMessageRequest(1,
@@ -91,7 +90,7 @@ public class ConsumerImplTest extends TestBase {
Mockito.doReturn(clientManager).when(pushConsumer).getClientManager();
final RpcFuture<AckMessageRequest, AckMessageResponse> future =
okAckMessageResponseFuture();
-
Mockito.doReturn(future).when(clientManager).ackMessage(any(Endpoints.class),
any(Metadata.class),
+
Mockito.doReturn(future).when(clientManager).ackMessage(any(Endpoints.class),
any(AckMessageRequest.class), any(Duration.class));
final MessageViewImpl messageView = fakeMessageViewImpl();
final RpcFuture<AckMessageRequest, AckMessageResponse> future0 =
@@ -112,7 +111,7 @@ public class ConsumerImplTest extends TestBase {
Mockito.doReturn(clientManager).when(pushConsumer).getClientManager();
final RpcFuture<ChangeInvisibleDurationRequest,
ChangeInvisibleDurationResponse> future =
okChangeInvisibleDurationCtxFuture();
-
Mockito.doReturn(future).when(clientManager).changeInvisibleDuration(any(Endpoints.class),
any(Metadata.class),
+
Mockito.doReturn(future).when(clientManager).changeInvisibleDuration(any(Endpoints.class),
any(ChangeInvisibleDurationRequest.class), any(Duration.class));
final MessageViewImpl messageView = fakeMessageViewImpl();
final RpcFuture<ChangeInvisibleDurationRequest,
ChangeInvisibleDurationResponse> future0 =
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
index b4dae64..60bb0fe 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
@@ -30,7 +30,6 @@ import apache.rocketmq.v2.Permission;
import apache.rocketmq.v2.Resource;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Service;
-import io.grpc.Metadata;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -91,9 +90,9 @@ public class ProducerImplTest extends TestBase {
final MessageQueueImpl messageQueue =
fakeMessageQueueImpl(FAKE_TOPIC_0);
final SendReceiptImpl sendReceiptImpl =
fakeSendReceiptImpl(messageQueue);
Mockito.doReturn(Futures.immediateFuture(Collections.singletonList(sendReceiptImpl)))
- .when(producer).send0(any(Metadata.class), any(Endpoints.class),
anyList(), any(MessageQueueImpl.class));
+ .when(producer).send0(any(Endpoints.class), anyList(),
any(MessageQueueImpl.class));
producer.send(message);
- verify(producer, times(1)).send0(any(Metadata.class),
any(Endpoints.class), anyList(),
+ verify(producer, times(1)).send0(any(Endpoints.class), anyList(),
any(MessageQueueImpl.class));
producer.close();
}
@@ -104,11 +103,10 @@ public class ProducerImplTest extends TestBase {
final Message message = fakeMessage(FAKE_TOPIC_0);
final Exception exception = new IllegalArgumentException();
Mockito.doReturn(Futures.immediateFailedFuture(exception))
- .when(producer).send0(any(Metadata.class), any(Endpoints.class),
anyList(), any(MessageQueueImpl.class));
+ .when(producer).send0(any(Endpoints.class), anyList(),
any(MessageQueueImpl.class));
producer.send(message);
final int maxAttempts =
producer.publishingSettings.getRetryPolicy().getMaxAttempts();
- verify(producer, times(maxAttempts)).send0(any(Metadata.class),
any(Endpoints.class), anyList(),
- any(MessageQueueImpl.class));
+ verify(producer, times(maxAttempts)).send0(any(Endpoints.class),
anyList(), any(MessageQueueImpl.class));
producer.close();
}
}
\ No newline at end of file