This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch java in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit ceceb124cadee9080356bf06ac02c52b4ba69596 Author: Aaron Ai <[email protected]> AuthorDate: Wed Jul 6 20:08:02 2022 +0800 Java: Add invocation context --- .../rocketmq/client/java/impl/ClientImpl.java | 19 ++-- .../rocketmq/client/java/impl/ClientManager.java | 48 +++++----- .../client/java/impl/ClientManagerImpl.java | 71 ++++++--------- .../client/java/impl/consumer/ConsumerImpl.java | 41 +++++---- .../java/impl/consumer/ProcessQueueImpl.java | 33 ++++--- .../java/impl/consumer/PushConsumerImpl.java | 27 +++--- .../java/impl/consumer/ReceiveMessageResult.java | 8 +- .../java/impl/consumer/SimpleConsumerImpl.java | 19 ++-- .../client/java/impl/producer/ProducerImpl.java | 21 +++-- .../client/java/rpc/InvocationContext.java | 46 ++++++++++ .../apache/rocketmq/client/java/rpc/RpcClient.java | 51 ++++++----- .../rocketmq/client/java/rpc/RpcClientImpl.java | 100 ++++++++++++++------- .../rocketmq/client/java/rpc/RpcContext.java | 39 ++++++++ .../apache/rocketmq/client/java/rpc/Signature.java | 2 +- .../apache/rocketmq/client/java/rpc/TLSHelper.java | 7 +- .../java/impl/consumer/ProcessQueueImplTest.java | 15 ++-- .../java/impl/consumer/PushConsumerImplTest.java | 6 +- .../java/impl/consumer/SimpleConsumerImplTest.java | 13 +-- .../java/impl/producer/ProducerImplTest.java | 21 +++-- .../apache/rocketmq/client/java/tool/TestBase.java | 97 +++++++++----------- 20 files changed, 409 insertions(+), 275 deletions(-) diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java index a4b144d..590bbee 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java @@ -81,6 +81,7 @@ import org.apache.rocketmq.client.java.misc.Utilities; import org.apache.rocketmq.client.java.route.Endpoints; import org.apache.rocketmq.client.java.route.TopicRouteData; import org.apache.rocketmq.client.java.route.TopicRouteDataResult; +import org.apache.rocketmq.client.java.rpc.InvocationContext; import org.apache.rocketmq.client.java.rpc.Signature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -555,7 +556,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, /** * Real-time signature generation */ - protected Metadata sign() throws UnsupportedEncodingException, NoSuchAlgorithmException, InvalidKeyException { + protected Metadata sign() throws NoSuchAlgorithmException, InvalidKeyException { return Signature.sign(clientConfiguration, clientId); } @@ -568,11 +569,12 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, private void doHeartbeat(HeartbeatRequest request, final Endpoints endpoints) { try { Metadata metadata = sign(); - final ListenableFuture<HeartbeatResponse> future = clientManager + final ListenableFuture<InvocationContext<HeartbeatResponse>> future = clientManager .heartbeat(endpoints, metadata, request, clientConfiguration.getRequestTimeout()); - Futures.addCallback(future, new FutureCallback<HeartbeatResponse>() { + Futures.addCallback(future, new FutureCallback<InvocationContext<HeartbeatResponse>>() { @Override - public void onSuccess(HeartbeatResponse response) { + public void onSuccess(InvocationContext<HeartbeatResponse> context) { + final HeartbeatResponse response = context.getResp(); final Status status = response.getStatus(); final Code code = status.getCode(); if (Code.OK != code) { @@ -612,15 +614,15 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, } private ListenableFuture<TopicRouteDataResult> fetchTopicRoute(final String topic) { - final SettableFuture<TopicRouteDataResult> future = SettableFuture.create(); try { Resource topicResource = Resource.newBuilder().setName(topic).build(); final QueryRouteRequest request = QueryRouteRequest.newBuilder().setTopic(topicResource) .setEndpoints(accessEndpoints.toProtobuf()).build(); final Metadata metadata = sign(); - final ListenableFuture<QueryRouteResponse> responseFuture = + final ListenableFuture<InvocationContext<QueryRouteResponse>> contextFuture = clientManager.queryRoute(accessEndpoints, metadata, request, clientConfiguration.getRequestTimeout()); - return Futures.transform(responseFuture, response -> { + return Futures.transform(contextFuture, context -> { + final QueryRouteResponse response = context.getResp(); final Status status = response.getStatus(); final Code code = status.getCode(); if (Code.OK != code) { @@ -631,8 +633,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, return new TopicRouteDataResult(new TopicRouteData(response.getMessageQueuesList()), status); }, MoreExecutors.directExecutor()); } catch (Throwable t) { - future.setException(t); - return future; + return Futures.immediateFailedFuture(t); } } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java index 9f40a6d..9901ac9 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java @@ -46,6 +46,7 @@ import java.util.Iterator; import java.util.concurrent.ScheduledExecutorService; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.java.route.Endpoints; +import org.apache.rocketmq.client.java.rpc.InvocationContext; /** * Client manager supplies a series of unified APIs to execute remote procedure calls for each {@link Client}. @@ -90,9 +91,10 @@ public interface ClientManager { * @param metadata gRPC request header metadata. * @param request query route request. * @param duration request max duration. - * @return response future of the topic route. + * @return invocation of response future. */ - ListenableFuture<QueryRouteResponse> queryRoute(Endpoints endpoints, Metadata metadata, QueryRouteRequest request, + ListenableFuture<InvocationContext<QueryRouteResponse>> queryRoute(Endpoints endpoints, Metadata metadata, + QueryRouteRequest request, Duration duration); /** @@ -102,9 +104,10 @@ public interface ClientManager { * @param metadata gRPC request header metadata. * @param request heartbeat request. * @param duration request max duration. - * @return response future of heartbeat. + * @return invocation of response future. */ - ListenableFuture<HeartbeatResponse> heartbeat(Endpoints endpoints, Metadata metadata, HeartbeatRequest request, + ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Endpoints endpoints, Metadata metadata, + HeartbeatRequest request, Duration duration); /** @@ -114,9 +117,9 @@ public interface ClientManager { * @param metadata gRPC request header metadata. * @param request send message request. * @param duration request max duration. - * @return response future of the sending message. + * @return invocation of response future. */ - ListenableFuture<SendMessageResponse> sendMessage(Endpoints endpoints, Metadata metadata, + ListenableFuture<InvocationContext<SendMessageResponse>> sendMessage(Endpoints endpoints, Metadata metadata, SendMessageRequest request, Duration duration); /** @@ -126,9 +129,9 @@ public interface ClientManager { * @param metadata gRPC request header metadata. * @param request query assignment request. * @param duration request max duration. - * @return response future of query assignment. + * @return invocation of response future. */ - ListenableFuture<QueryAssignmentResponse> queryAssignment(Endpoints endpoints, Metadata metadata, + ListenableFuture<InvocationContext<QueryAssignmentResponse>> queryAssignment(Endpoints endpoints, Metadata metadata, QueryAssignmentRequest request, Duration duration); /** @@ -136,9 +139,10 @@ public interface ClientManager { * * @param endpoints requested endpoints. * @param metadata gRPC request header metadata. + * @return invocation of response future. */ - ListenableFuture<Iterator<ReceiveMessageResponse>> receiveMessage(Endpoints endpoints, Metadata metadata, - ReceiveMessageRequest request, Duration duration); + ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> receiveMessage(Endpoints endpoints, + Metadata metadata, ReceiveMessageRequest request, Duration duration); /** * Ack message asynchronously after the success of consumption, the method ensures no throwable. @@ -147,10 +151,10 @@ public interface ClientManager { * @param metadata gRPC request header metadata. * @param request ack message request. * @param duration request max duration. - * @return response future of ack message. + * @return invocation of response future. */ - ListenableFuture<AckMessageResponse> ackMessage(Endpoints endpoints, Metadata metadata, AckMessageRequest request, - Duration duration); + ListenableFuture<InvocationContext<AckMessageResponse>> ackMessage(Endpoints endpoints, Metadata metadata, + AckMessageRequest request, Duration duration); /** * Nack message asynchronously after the failure of consumption, the method ensures no throwable. @@ -159,10 +163,10 @@ public interface ClientManager { * @param metadata gRPC request header metadata. * @param request nack message request. * @param duration request max duration. - * @return response future of nack message. + * @return invocation of response future. */ - ListenableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuration(Endpoints endpoints, Metadata metadata, - ChangeInvisibleDurationRequest request, Duration duration); + ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> changeInvisibleDuration(Endpoints endpoints, + Metadata metadata, ChangeInvisibleDurationRequest request, Duration duration); /** * Send a message to the dead letter queue asynchronously, the method ensures no throwable. @@ -171,9 +175,9 @@ public interface ClientManager { * @param metadata gRPC request header metadata. * @param request request of sending a message to DLQ. * @param duration request max duration. - * @return response future of sending a message to DLQ. + * @return invocation of response future. */ - ListenableFuture<ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue( + ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue( Endpoints endpoints, Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, Duration duration); /** @@ -183,9 +187,9 @@ public interface ClientManager { * @param metadata gRPC request header metadata. * @param request end transaction request. * @param duration request max duration. - * @return response future of submitting transaction resolution. + * @return invocation of response future. */ - ListenableFuture<EndTransactionResponse> endTransaction(Endpoints endpoints, Metadata metadata, + ListenableFuture<InvocationContext<EndTransactionResponse>> endTransaction(Endpoints endpoints, Metadata metadata, EndTransactionRequest request, Duration duration); /** @@ -198,8 +202,8 @@ public interface ClientManager { * @return response future of notification of client termination. */ @SuppressWarnings("UnusedReturnValue") - ListenableFuture<NotifyClientTerminationResponse> notifyClientTermination(Endpoints endpoints, Metadata metadata, - NotifyClientTerminationRequest request, Duration duration); + ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> notifyClientTermination(Endpoints endpoints, + Metadata metadata, NotifyClientTerminationRequest request, Duration duration); StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, Metadata metadata, Duration duration, StreamObserver<TelemetryCommand> responseObserver) throws ClientException; diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java index 4d33ab0..337edcc 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java @@ -39,8 +39,8 @@ import apache.rocketmq.v2.SendMessageRequest; import apache.rocketmq.v2.SendMessageResponse; import apache.rocketmq.v2.TelemetryCommand; import com.google.common.util.concurrent.AbstractIdleService; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import com.google.errorprone.annotations.concurrent.GuardedBy; import io.grpc.Metadata; import io.grpc.stub.StreamObserver; @@ -65,6 +65,7 @@ import org.apache.rocketmq.client.java.misc.ExecutorServices; import org.apache.rocketmq.client.java.misc.MetadataUtils; import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl; import org.apache.rocketmq.client.java.route.Endpoints; +import org.apache.rocketmq.client.java.rpc.InvocationContext; import org.apache.rocketmq.client.java.rpc.RpcClient; import org.apache.rocketmq.client.java.rpc.RpcClientImpl; import org.slf4j.Logger; @@ -235,132 +236,112 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana } @Override - public ListenableFuture<QueryRouteResponse> queryRoute(Endpoints endpoints, Metadata metadata, + public ListenableFuture<InvocationContext<QueryRouteResponse>> queryRoute(Endpoints endpoints, Metadata metadata, QueryRouteRequest request, Duration duration) { try { final RpcClient rpcClient = getRpcClient(endpoints); return rpcClient.queryRoute(metadata, request, asyncWorker, duration); } catch (Throwable t) { - final SettableFuture<QueryRouteResponse> future = SettableFuture.create(); - future.setException(t); - return future; + return Futures.immediateFailedFuture(t); } } @Override - public ListenableFuture<HeartbeatResponse> heartbeat(Endpoints endpoints, Metadata metadata, + public ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Endpoints endpoints, Metadata metadata, HeartbeatRequest request, Duration duration) { try { final RpcClient rpcClient = getRpcClient(endpoints); return rpcClient.heartbeat(metadata, request, asyncWorker, duration); } catch (Throwable t) { - final SettableFuture<HeartbeatResponse> future = SettableFuture.create(); - future.setException(t); - return future; + return Futures.immediateFailedFuture(t); } } @Override - public ListenableFuture<SendMessageResponse> sendMessage(Endpoints endpoints, Metadata metadata, + public ListenableFuture<InvocationContext<SendMessageResponse>> sendMessage(Endpoints endpoints, Metadata metadata, SendMessageRequest request, Duration duration) { try { final RpcClient rpcClient = getRpcClient(endpoints); return rpcClient.sendMessage(metadata, request, asyncWorker, duration); } catch (Throwable t) { - final SettableFuture<SendMessageResponse> future = SettableFuture.create(); - future.setException(t); - return future; + return Futures.immediateFailedFuture(t); } } @Override - public ListenableFuture<QueryAssignmentResponse> queryAssignment(Endpoints endpoints, Metadata metadata, - QueryAssignmentRequest request, Duration duration) { + public ListenableFuture<InvocationContext<QueryAssignmentResponse>> queryAssignment(Endpoints endpoints, + Metadata metadata, QueryAssignmentRequest request, Duration duration) { try { final RpcClient rpcClient = getRpcClient(endpoints); return rpcClient.queryAssignment(metadata, request, asyncWorker, duration); } catch (Throwable t) { - final SettableFuture<QueryAssignmentResponse> future = SettableFuture.create(); - future.setException(t); - return future; + return Futures.immediateFailedFuture(t); } } @Override - public ListenableFuture<Iterator<ReceiveMessageResponse>> receiveMessage(Endpoints endpoints, Metadata metadata, - ReceiveMessageRequest request, Duration duration) { + public ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> receiveMessage(Endpoints endpoints, + Metadata metadata, ReceiveMessageRequest request, Duration duration) { try { final RpcClient rpcClient = getRpcClient(endpoints); return rpcClient.receiveMessage(metadata, request, asyncWorker, duration); } catch (Throwable t) { - SettableFuture<Iterator<ReceiveMessageResponse>> future = SettableFuture.create(); - future.setException(t); - return future; + return Futures.immediateFailedFuture(t); } } @Override - public ListenableFuture<AckMessageResponse> ackMessage(Endpoints endpoints, Metadata metadata, + public ListenableFuture<InvocationContext<AckMessageResponse>> ackMessage(Endpoints endpoints, Metadata metadata, AckMessageRequest request, Duration duration) { try { final RpcClient rpcClient = getRpcClient(endpoints); return rpcClient.ackMessage(metadata, request, asyncWorker, duration); } catch (Throwable t) { - final SettableFuture<AckMessageResponse> future = SettableFuture.create(); - future.setException(t); - return future; + return Futures.immediateFailedFuture(t); } } @Override - public ListenableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuration(Endpoints endpoints, - Metadata metadata, ChangeInvisibleDurationRequest request, Duration duration) { + public ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> changeInvisibleDuration( + Endpoints endpoints, Metadata metadata, ChangeInvisibleDurationRequest request, Duration duration) { try { final RpcClient rpcClient = getRpcClient(endpoints); return rpcClient.changeInvisibleDuration(metadata, request, asyncWorker, duration); } catch (Throwable t) { - final SettableFuture<ChangeInvisibleDurationResponse> future = SettableFuture.create(); - future.setException(t); - return future; + return Futures.immediateFailedFuture(t); } } @Override - public ListenableFuture<ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue( + public ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue( Endpoints endpoints, Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, Duration duration) { try { final RpcClient rpcClient = getRpcClient(endpoints); return rpcClient.forwardMessageToDeadLetterQueue(metadata, request, asyncWorker, duration); } catch (Throwable t) { - final SettableFuture<ForwardMessageToDeadLetterQueueResponse> future = SettableFuture.create(); - future.setException(t); - return future; + return Futures.immediateFailedFuture(t); } } @Override - public ListenableFuture<EndTransactionResponse> endTransaction(Endpoints endpoints, Metadata metadata, - EndTransactionRequest request, Duration duration) { + public ListenableFuture<InvocationContext<EndTransactionResponse>> endTransaction(Endpoints endpoints, + Metadata metadata, EndTransactionRequest request, Duration duration) { try { final RpcClient rpcClient = getRpcClient(endpoints); return rpcClient.endTransaction(metadata, request, asyncWorker, duration); } catch (Throwable t) { - SettableFuture<EndTransactionResponse> future = SettableFuture.create(); - future.setException(t); - return future; + return Futures.immediateFailedFuture(t); } } @Override - public ListenableFuture<NotifyClientTerminationResponse> notifyClientTermination( + public ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> notifyClientTermination( Endpoints endpoints, Metadata metadata, NotifyClientTerminationRequest request, Duration duration) { try { final RpcClient rpcClient = getRpcClient(endpoints); return rpcClient.notifyClientTermination(metadata, request, asyncWorker, duration); } catch (Throwable t) { - final SettableFuture<NotifyClientTerminationResponse> future = SettableFuture.create(); - future.setException(t); - return future; + return Futures.immediateFailedFuture(t); } } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java index 539b876..a8b04ec 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java @@ -56,6 +56,7 @@ import org.apache.rocketmq.client.java.message.MessageCommon; import org.apache.rocketmq.client.java.message.MessageViewImpl; import org.apache.rocketmq.client.java.route.Endpoints; import org.apache.rocketmq.client.java.route.MessageQueueImpl; +import org.apache.rocketmq.client.java.rpc.InvocationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,16 +72,18 @@ abstract class ConsumerImpl extends ClientImpl { } @SuppressWarnings("SameParameterValue") - protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRequest request, MessageQueueImpl mq, - Duration timeout) { + protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRequest request, + MessageQueueImpl mq, Duration timeout) { List<MessageViewImpl> messages = new ArrayList<>(); final SettableFuture<ReceiveMessageResult> future0 = SettableFuture.create(); try { Metadata metadata = sign(); final Endpoints endpoints = mq.getBroker().getEndpoints(); - final ListenableFuture<Iterator<ReceiveMessageResponse>> future = clientManager.receiveMessage(endpoints, - metadata, request, timeout); - return Futures.transform(future, it -> { + final ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> future = + clientManager.receiveMessage(endpoints, + metadata, request, timeout); + return Futures.transform(future, context -> { + final Iterator<ReceiveMessageResponse> it = context.getResp(); // Null here means status not set yet. Status status = null; Timestamp deliveryTimestampFromRemote = null; @@ -106,7 +109,7 @@ abstract class ConsumerImpl extends ClientImpl { final MessageViewImpl view = MessageViewImpl.fromProtobuf(message, mq, deliveryTimestampFromRemote); messages.add(view); } - return new ReceiveMessageResult(endpoints, status, messages); + return new ReceiveMessageResult(endpoints, context.getRpcContext().getRequestId(), status, messages); }, MoreExecutors.directExecutor()); } catch (Throwable t) { future0.setException(t); @@ -134,9 +137,9 @@ abstract class ConsumerImpl extends ClientImpl { } - public ListenableFuture<AckMessageResponse> ackMessage(MessageViewImpl messageView) { + public ListenableFuture<InvocationContext<AckMessageResponse>> ackMessage(MessageViewImpl messageView) { final Endpoints endpoints = messageView.getEndpoints(); - ListenableFuture<AckMessageResponse> future; + ListenableFuture<InvocationContext<AckMessageResponse>> future; final Stopwatch stopwatch = Stopwatch.createStarted(); final List<MessageCommon> messageCommons = Collections.singletonList(messageView.getMessageCommon()); @@ -146,11 +149,14 @@ abstract class ConsumerImpl extends ClientImpl { final Metadata metadata = sign(); future = clientManager.ackMessage(endpoints, metadata, request, clientConfiguration.getRequestTimeout()); } catch (Throwable t) { - return Futures.immediateFailedFuture(t); + final SettableFuture<InvocationContext<AckMessageResponse>> future0 = SettableFuture.create(); + future0.setException(t); + future = future0; } - Futures.addCallback(future, new FutureCallback<AckMessageResponse>() { + Futures.addCallback(future, new FutureCallback<InvocationContext<AckMessageResponse>>() { @Override - public void onSuccess(AckMessageResponse response) { + public void onSuccess(InvocationContext<AckMessageResponse> context) { + final AckMessageResponse response = context.getResp(); final Status status = response.getStatus(); final Code code = status.getCode(); final Duration duration = stopwatch.elapsed(); @@ -168,10 +174,10 @@ abstract class ConsumerImpl extends ClientImpl { return future; } - public ListenableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuration(MessageViewImpl messageView, - Duration invisibleDuration) { + public ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> changeInvisibleDuration( + MessageViewImpl messageView, Duration invisibleDuration) { final Endpoints endpoints = messageView.getEndpoints(); - ListenableFuture<ChangeInvisibleDurationResponse> future; + ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future; final Stopwatch stopwatch = Stopwatch.createStarted(); final List<MessageCommon> messageCommons = Collections.singletonList(messageView.getMessageCommon()); @@ -182,14 +188,15 @@ abstract class ConsumerImpl extends ClientImpl { future = clientManager.changeInvisibleDuration(endpoints, metadata, request, clientConfiguration.getRequestTimeout()); } catch (Throwable t) { - final SettableFuture<ChangeInvisibleDurationResponse> future0 = SettableFuture.create(); + final SettableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future0 = SettableFuture.create(); future0.setException(t); future = future0; } final MessageId messageId = messageView.getMessageId(); - Futures.addCallback(future, new FutureCallback<ChangeInvisibleDurationResponse>() { + Futures.addCallback(future, new FutureCallback<InvocationContext<ChangeInvisibleDurationResponse>>() { @Override - public void onSuccess(ChangeInvisibleDurationResponse response) { + public void onSuccess(InvocationContext<ChangeInvisibleDurationResponse> context) { + final ChangeInvisibleDurationResponse response = context.getResp(); final Status status = response.getStatus(); final Code code = status.getCode(); final Duration duration = stopwatch.elapsed(); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java index f36012f..c914dce 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java @@ -51,6 +51,7 @@ import org.apache.rocketmq.client.java.message.MessageViewImpl; import org.apache.rocketmq.client.java.retry.RetryPolicy; import org.apache.rocketmq.client.java.route.Endpoints; import org.apache.rocketmq.client.java.route.MessageQueueImpl; +import org.apache.rocketmq.client.java.rpc.InvocationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -387,11 +388,12 @@ class ProcessQueueImpl implements ProcessQueue { final String consumerGroup = consumer.getConsumerGroup(); final MessageId messageId = messageView.getMessageId(); final Endpoints endpoints = messageView.getEndpoints(); - final ListenableFuture<AckMessageResponse> future = consumer.ackMessage(messageView); - Futures.addCallback(future, new FutureCallback<AckMessageResponse>() { + final ListenableFuture<InvocationContext<AckMessageResponse>> future = consumer.ackMessage(messageView); + Futures.addCallback(future, new FutureCallback<InvocationContext<AckMessageResponse>>() { @Override - public void onSuccess(AckMessageResponse response) { - final Status status = response.getStatus(); + public void onSuccess(InvocationContext<AckMessageResponse> context) { + final AckMessageResponse resp = context.getResp(); + final Status status = resp.getStatus(); final Code code = status.getCode(); if (Code.OK.equals(code)) { LOGGER.debug("Ack message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, " @@ -478,16 +480,17 @@ class ProcessQueueImpl implements ProcessQueue { private void forwardToDeadLetterQueue(final MessageViewImpl messageView, final int attempt, final SettableFuture<Void> future0) { - final ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future = + final ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> future = consumer.forwardMessageToDeadLetterQueue(messageView); final String clientId = consumer.getClientId(); final String consumerGroup = consumer.getConsumerGroup(); final MessageId messageId = messageView.getMessageId(); final Endpoints endpoints = messageView.getEndpoints(); - Futures.addCallback(future, new FutureCallback<ForwardMessageToDeadLetterQueueResponse>() { + Futures.addCallback(future, new FutureCallback<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>() { @Override - public void onSuccess(ForwardMessageToDeadLetterQueueResponse response) { - final Status status = response.getStatus(); + public void onSuccess(InvocationContext<ForwardMessageToDeadLetterQueueResponse> context) { + final ForwardMessageToDeadLetterQueueResponse resp = context.getResp(); + final Status status = resp.getStatus(); final Code code = status.getCode(); // Log failure and retry later. if (!Code.OK.equals(code)) { @@ -559,17 +562,19 @@ class ProcessQueueImpl implements ProcessQueue { final String consumerGroup = consumer.getConsumerGroup(); final MessageId messageId = messageView.getMessageId(); final Endpoints endpoints = messageView.getEndpoints(); - final ListenableFuture<AckMessageResponse> future = consumer.ackMessage(messageView); - Futures.addCallback(future, new FutureCallback<AckMessageResponse>() { + final ListenableFuture<InvocationContext<AckMessageResponse>> future = consumer.ackMessage(messageView); + Futures.addCallback(future, new FutureCallback<InvocationContext<AckMessageResponse>>() { @Override - public void onSuccess(AckMessageResponse response) { - final Status status = response.getStatus(); + public void onSuccess(InvocationContext<AckMessageResponse> context) { + final AckMessageResponse resp = context.getResp(); + final String requestId = context.getRpcContext().getRequestId(); + final Status status = resp.getStatus(); final Code code = status.getCode(); // Log failure and retry later. if (!Code.OK.equals(code)) { LOGGER.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, " - + "consumerGroup={}, attempt={}, messageId={}, mq={}, code={}, endpoints={}, status " - + "message=[{}]", clientId, consumerGroup, attempt, messageId, mq, code, + + "consumerGroup={}, attempt={}, messageId={}, mq={}, code={}, requestId={}, endpoints={}, " + + "status message=[{}]", clientId, consumerGroup, attempt, messageId, mq, code, requestId, endpoints, status.getMessage()); ackFifoMessageLater(messageView, 1 + attempt, future0); return; diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java index 4a80377..41648bd 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java @@ -71,6 +71,7 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy; import org.apache.rocketmq.client.java.route.Endpoints; import org.apache.rocketmq.client.java.route.MessageQueueImpl; import org.apache.rocketmq.client.java.route.TopicRouteDataResult; +import org.apache.rocketmq.client.java.rpc.InvocationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -264,15 +265,16 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach private ListenableFuture<Assignments> queryAssignment(final String topic) { final ListenableFuture<Endpoints> future = pickEndpointsToQueryAssignments(topic); - final ListenableFuture<QueryAssignmentResponse> responseFuture = + final ListenableFuture<InvocationContext<QueryAssignmentResponse>> responseFuture = Futures.transformAsync(future, endpoints -> { final Metadata metadata = sign(); final QueryAssignmentRequest request = wrapQueryAssignmentRequest(topic); return clientManager.queryAssignment(endpoints, metadata, request, clientConfiguration.getRequestTimeout()); }, MoreExecutors.directExecutor()); - return Futures.transformAsync(responseFuture, response -> { - final Status status = response.getStatus(); + return Futures.transformAsync(responseFuture, context -> { + final QueryAssignmentResponse resp = context.getResp(); + final Status status = resp.getStatus(); final Code code = status.getCode(); if (!Code.OK.equals(code)) { final String message = String.format("Failed to query assignment, code=%d, status message=[{%s}]", @@ -280,7 +282,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach throw new RuntimeException(message); } SettableFuture<Assignments> future0 = SettableFuture.create(); - final List<Assignment> assignmentList = response.getAssignmentsList().stream().map(assignment -> + final List<Assignment> assignmentList = resp.getAssignmentsList().stream().map(assignment -> new Assignment(new MessageQueueImpl(assignment.getMessageQueue()))).collect(Collectors.toList()); final Assignments assignments = new Assignments(assignmentList); future0.set(assignments); @@ -398,7 +400,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach cacheAssignments.put(topic, latest); return; } - LOGGER.info("Assignments of topic={} remains the same, assignments={}, clientId={}", topic, + LOGGER.debug("Assignments of topic={} remains the same, assignments={}, clientId={}", topic, existed, clientId); // Process queue may be dropped, need to be synchronized anyway. syncProcessQueue(topic, latest, filterExpression); @@ -509,7 +511,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach .setMaxDeliveryAttempts(getRetryPolicy().getMaxAttempts()).build(); } - public ListenableFuture<ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue( + public ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue( final MessageViewImpl messageView) { // Intercept before forwarding message to DLQ. final Stopwatch stopwatch = Stopwatch.createStarted(); @@ -517,7 +519,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach doBefore(MessageHookPoints.FORWARD_TO_DLQ, messageCommons); final Endpoints endpoints = messageView.getEndpoints(); - ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future; + ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> future; try { final ForwardMessageToDeadLetterQueueRequest request = wrapForwardMessageToDeadLetterQueueRequest(messageView); @@ -525,15 +527,14 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach future = clientManager.forwardMessageToDeadLetterQueue(endpoints, metadata, request, clientConfiguration.getRequestTimeout()); } catch (Throwable t) { - final SettableFuture<ForwardMessageToDeadLetterQueueResponse> future0 = SettableFuture.create(); - future0.setException(t); - future = future0; + future = Futures.immediateFailedFuture(t); } - Futures.addCallback(future, new FutureCallback<ForwardMessageToDeadLetterQueueResponse>() { + Futures.addCallback(future, new FutureCallback<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>() { @Override - public void onSuccess(ForwardMessageToDeadLetterQueueResponse response) { + public void onSuccess(InvocationContext<ForwardMessageToDeadLetterQueueResponse> context) { + final ForwardMessageToDeadLetterQueueResponse resp = context.getResp(); final Duration duration = stopwatch.elapsed(); - MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(response.getStatus().getCode()) ? + MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(resp.getStatus().getCode()) ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR; // Intercept after forwarding message to DLQ. doAfter(MessageHookPoints.FORWARD_TO_DLQ, messageCommons, duration, messageHookPointsStatus); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java index 09b8d12..79d15a8 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java @@ -36,12 +36,14 @@ import org.apache.rocketmq.client.java.route.Endpoints; public class ReceiveMessageResult { private final Endpoints endpoints; + private final String requestId; private final ClientException exception; private final List<MessageViewImpl> messages; - public ReceiveMessageResult(Endpoints endpoints, Status status, List<MessageViewImpl> messages) { + public ReceiveMessageResult(Endpoints endpoints, String requestId, Status status, List<MessageViewImpl> messages) { this.endpoints = endpoints; + this.requestId = requestId; final Code code = status.getCode(); switch (code) { case OK: @@ -108,4 +110,8 @@ public class ReceiveMessageResult { public List<MessageViewImpl> getMessages() { return messages; } + + public String getRequestId() { + return requestId; + } } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java index b0ccfd9..77750e1 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java @@ -56,6 +56,7 @@ import org.apache.rocketmq.client.java.message.MessageViewImpl; import org.apache.rocketmq.client.java.message.protocol.Resource; import org.apache.rocketmq.client.java.route.MessageQueueImpl; import org.apache.rocketmq.client.java.route.TopicRouteDataResult; +import org.apache.rocketmq.client.java.rpc.InvocationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -240,9 +241,10 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer { return future0; } MessageViewImpl impl = (MessageViewImpl) messageView; - final ListenableFuture<AckMessageResponse> future = ackMessage(impl); - return Futures.transformAsync(future, response -> { - final Status status = response.getStatus(); + final ListenableFuture<InvocationContext<AckMessageResponse>> future = ackMessage(impl); + return Futures.transformAsync(future, context -> { + final AckMessageResponse resp = context.getResp(); + final Status status = resp.getStatus(); final Code code = status.getCode(); switch (code) { case OK: @@ -300,12 +302,13 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer { return future0; } MessageViewImpl impl = (MessageViewImpl) messageView; - final ListenableFuture<ChangeInvisibleDurationResponse> future = changeInvisibleDuration(impl, - invisibleDuration); - return Futures.transformAsync(future, response -> { + final ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future = + changeInvisibleDuration(impl, invisibleDuration); + return Futures.transformAsync(future, context -> { + final ChangeInvisibleDurationResponse resp = context.getResp(); // Refresh receipt handle manually. - impl.setReceiptHandle(response.getReceiptHandle()); - final Status status = response.getStatus(); + impl.setReceiptHandle(resp.getReceiptHandle()); + final Status status = resp.getStatus(); final Code code = status.getCode(); switch (code) { case OK: diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java index 6434cca..427d318 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java @@ -72,6 +72,7 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy; import org.apache.rocketmq.client.java.route.Endpoints; import org.apache.rocketmq.client.java.route.MessageQueueImpl; import org.apache.rocketmq.client.java.route.TopicRouteDataResult; +import org.apache.rocketmq.client.java.rpc.InvocationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -280,13 +281,14 @@ class ProducerImpl extends ClientImpl implements Producer { MessageHookPoints.COMMIT_TRANSACTION : MessageHookPoints.ROLLBACK_TRANSACTION; doBefore(messageHookPoints, messageCommons); - final ListenableFuture<EndTransactionResponse> future = + final ListenableFuture<InvocationContext<EndTransactionResponse>> future = clientManager.endTransaction(endpoints, metadata, request, requestTimeout); - Futures.addCallback(future, new FutureCallback<EndTransactionResponse>() { + Futures.addCallback(future, new FutureCallback<InvocationContext<EndTransactionResponse>>() { @Override - public void onSuccess(EndTransactionResponse result) { + public void onSuccess(InvocationContext<EndTransactionResponse> context) { final Duration duration = stopwatch.elapsed(); - final Status status = result.getStatus(); + final EndTransactionResponse resp = context.getResp(); + final Status status = resp.getStatus(); final Code code = status.getCode(); MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR; @@ -299,8 +301,9 @@ class ProducerImpl extends ClientImpl implements Producer { doAfter(messageHookPoints, messageCommons, duration, MessageHookPointsStatus.ERROR); } }, MoreExecutors.directExecutor()); - final EndTransactionResponse response = handleClientFuture(future); - final Status status = response.getStatus(); + final InvocationContext<EndTransactionResponse> context = handleClientFuture(future); + final EndTransactionResponse resp = context.getResp(); + final Status status = resp.getStatus(); final Code code = status.getCode(); if (!Code.OK.equals(code)) { throw new ClientException(code.getNumber(), status.getMessage()); @@ -442,13 +445,13 @@ class ProducerImpl extends ClientImpl implements Producer { final Endpoints endpoints = messageQueue.getBroker().getEndpoints(); final SendMessageRequest request = wrapSendMessageRequest(messages); - final ListenableFuture<SendMessageResponse> responseFuture = clientManager.sendMessage(endpoints, metadata, - request, clientConfiguration.getRequestTimeout()); + final ListenableFuture<InvocationContext<SendMessageResponse>> responseFuture = + clientManager.sendMessage(endpoints, metadata, request, clientConfiguration.getRequestTimeout()); final ListenableFuture<List<SendReceiptImpl>> attemptFuture = Futures.transformAsync(responseFuture, response -> { final SettableFuture<List<SendReceiptImpl>> future0 = SettableFuture.create(); - future0.set(SendReceiptImpl.processSendResponse(messageQueue, response)); + future0.set(SendReceiptImpl.processSendResponse(messageQueue, response.getResp())); return future0; }, MoreExecutors.directExecutor()); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/InvocationContext.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/InvocationContext.java new file mode 100644 index 0000000..eb5f487 --- /dev/null +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/InvocationContext.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.java.rpc; + +import com.google.common.base.MoreObjects; + +public class InvocationContext<T> { + private final T t; + private final RpcContext rpcContext; + + public InvocationContext(T t, RpcContext rpcContext) { + this.t = t; + this.rpcContext = rpcContext; + } + + public T getResp() { + return t; + } + + public RpcContext getRpcContext() { + return rpcContext; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("resp", t) + .add("rpcContext", rpcContext) + .toString(); + } +} diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java index 415d228..70744f3 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java @@ -72,9 +72,10 @@ public interface RpcClient { * @param request query route request. * @param executor gRPC asynchronous executor. * @param duration request max duration. - * @return response future of topic route. + * @return invocation of response future. */ - ListenableFuture<QueryRouteResponse> queryRoute(Metadata metadata, QueryRouteRequest request, Executor executor, + ListenableFuture<InvocationContext<QueryRouteResponse>> queryRoute(Metadata metadata, QueryRouteRequest request, + Executor executor, Duration duration); /** @@ -84,9 +85,10 @@ public interface RpcClient { * @param request heart beat request. * @param executor gRPC asynchronous executor. * @param duration request max duration. - * @return response future of heart beat. + * @return invocation of response future. */ - ListenableFuture<HeartbeatResponse> heartbeat(Metadata metadata, HeartbeatRequest request, Executor executor, + ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Metadata metadata, HeartbeatRequest request, + Executor executor, Duration duration); /** @@ -96,10 +98,10 @@ public interface RpcClient { * @param request send message request. * @param executor gRPC asynchronous executor. * @param duration request max duration. - * @return response future of sending message. + * @return invocation of response future. */ - ListenableFuture<SendMessageResponse> sendMessage(Metadata metadata, SendMessageRequest request, Executor executor, - Duration duration); + ListenableFuture<InvocationContext<SendMessageResponse>> sendMessage(Metadata metadata, + SendMessageRequest request, Executor executor, Duration duration); /** * Query assignment asynchronously. @@ -108,10 +110,10 @@ public interface RpcClient { * @param request query assignment request. * @param executor gRPC asynchronous executor. * @param duration request max duration. - * @return response future of query assignment. + * @return invocation of response future. */ - ListenableFuture<QueryAssignmentResponse> queryAssignment(Metadata metadata, QueryAssignmentRequest request, - Executor executor, Duration duration); + ListenableFuture<InvocationContext<QueryAssignmentResponse>> queryAssignment(Metadata metadata, + QueryAssignmentRequest request, Executor executor, Duration duration); /** * Receiving message asynchronously from server. @@ -119,9 +121,10 @@ public interface RpcClient { * @param metadata gRPC request header metadata. * @param request receiving message request. * @param executor gRPC asynchronous executor. + * @return invocation of response future. */ - ListenableFuture<Iterator<ReceiveMessageResponse>> receiveMessage(Metadata metadata, ReceiveMessageRequest request, - ExecutorService executor, Duration duration); + ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> receiveMessage(Metadata metadata, + ReceiveMessageRequest request, ExecutorService executor, Duration duration); /** * Ack message asynchronously after success of consumption. @@ -130,10 +133,10 @@ public interface RpcClient { * @param request ack message request. * @param executor gRPC asynchronous executor. * @param duration request max duration. - * @return response future of ack message. + * @return invocation of response future. */ - ListenableFuture<AckMessageResponse> ackMessage(Metadata metadata, AckMessageRequest request, Executor executor, - Duration duration); + ListenableFuture<InvocationContext<AckMessageResponse>> ackMessage(Metadata metadata, AckMessageRequest request, + Executor executor, Duration duration); /** * Change message invisible duration. @@ -142,9 +145,9 @@ public interface RpcClient { * @param request change invisible duration request. * @param executor gRPC asynchronous executor. * @param duration request max duration. - * @return response future of change message invisible duration. + * @return invocation of response future. */ - ListenableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuration(Metadata metadata, + ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> changeInvisibleDuration(Metadata metadata, ChangeInvisibleDurationRequest request, Executor executor, Duration duration); /** @@ -154,9 +157,9 @@ public interface RpcClient { * @param request request of sending message to DLQ. * @param executor gRPC asynchronous executor. * @param duration request max duration. - * @return response future of sending message to DLQ. + * @return invocation of response future. */ - ListenableFuture<ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue( + ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue( Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, Executor executor, Duration duration); /** @@ -166,10 +169,10 @@ public interface RpcClient { * @param request end transaction request. * @param executor gRPC asynchronous executor. * @param duration request max duration. - * @return response future of submitting transaction resolution. + * @return invocation of response future. */ - ListenableFuture<EndTransactionResponse> endTransaction(Metadata metadata, EndTransactionRequest request, - Executor executor, Duration duration); + ListenableFuture<InvocationContext<EndTransactionResponse>> endTransaction(Metadata metadata, + EndTransactionRequest request, Executor executor, Duration duration); /** * Asynchronously notify server that client is terminated. @@ -178,9 +181,9 @@ public interface RpcClient { * @param request notify client termination request. * @param executor gRPC asynchronous executor. * @param duration request max duration. - * @return response future of notification of client termination. + * @return invocation of response future. */ - ListenableFuture<NotifyClientTerminationResponse> notifyClientTermination(Metadata metadata, + ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> notifyClientTermination(Metadata metadata, NotifyClientTerminationRequest request, Executor executor, Duration duration); StreamObserver<TelemetryCommand> telemetry(Metadata metadata, Executor executor, Duration duration, diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java index 15b94aa..ffba65e 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java @@ -39,6 +39,7 @@ import apache.rocketmq.v2.ReceiveMessageResponse; import apache.rocketmq.v2.SendMessageRequest; import apache.rocketmq.v2.SendMessageResponse; import apache.rocketmq.v2.TelemetryCommand; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import io.grpc.ClientInterceptor; @@ -62,10 +63,12 @@ import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLException; import org.apache.rocketmq.client.java.route.Endpoints; +@SuppressWarnings("UnstableApiUsage") public class RpcClientImpl implements RpcClient { private static final Duration KEEP_ALIVE_DURATION = Duration.ofSeconds(30); private static final int GRPC_MAX_MESSAGE_SIZE = Integer.MAX_VALUE; + private final Endpoints endpoints; private final ManagedChannel channel; private final MessagingServiceGrpc.MessagingServiceFutureStub futureStub; private final MessagingServiceGrpc.MessagingServiceBlockingStub blockingStub; @@ -75,6 +78,7 @@ public class RpcClientImpl implements RpcClient { @SuppressWarnings("deprecation") public RpcClientImpl(Endpoints endpoints) throws SSLException { + this.endpoints = endpoints; final SslContextBuilder builder = GrpcSslContexts.forClient(); builder.trustManager(InsecureTrustManagerFactory.INSTANCE); SslContext sslContext = builder.build(); @@ -102,6 +106,14 @@ public class RpcClientImpl implements RpcClient { this.activityNanoTime = System.nanoTime(); } + private <T> ListenableFuture<InvocationContext<T>> wrapInvocationContext(ListenableFuture<T> future, + Metadata header) { + return Futures.transformAsync(future, response -> { + final RpcContext rpcContext = new RpcContext(endpoints, header); + return Futures.immediateFuture(new InvocationContext<>(response, rpcContext)); + }, MoreExecutors.directExecutor()); + } + @Override public Duration idleDuration() { return Duration.ofNanos(System.nanoTime() - activityNanoTime); @@ -113,85 +125,105 @@ public class RpcClientImpl implements RpcClient { } @Override - public ListenableFuture<QueryRouteResponse> queryRoute(Metadata metadata, QueryRouteRequest request, - Executor executor, Duration duration) { + public ListenableFuture<InvocationContext<QueryRouteResponse>> queryRoute(Metadata metadata, + QueryRouteRequest request, Executor executor, Duration duration) { this.activityNanoTime = System.nanoTime(); - return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) + final ListenableFuture<QueryRouteResponse> future = futureStub + .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).queryRoute(request); + return wrapInvocationContext(future, metadata); } @Override - public ListenableFuture<HeartbeatResponse> heartbeat(Metadata metadata, HeartbeatRequest request, Executor executor, - Duration duration) { + public ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Metadata metadata, HeartbeatRequest request, + Executor executor, Duration duration) { this.activityNanoTime = System.nanoTime(); - return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) - .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).heartbeat(request); + final ListenableFuture<HeartbeatResponse> future = + futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) + .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).heartbeat(request); + return wrapInvocationContext(future, metadata); } @Override - public ListenableFuture<SendMessageResponse> sendMessage(Metadata metadata, SendMessageRequest request, - Executor executor, Duration duration) { + public ListenableFuture<InvocationContext<SendMessageResponse>> sendMessage(Metadata metadata, + SendMessageRequest request, Executor executor, Duration duration) { this.activityNanoTime = System.nanoTime(); - return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) - .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).sendMessage(request); + final ListenableFuture<SendMessageResponse> future = + futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) + .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).sendMessage(request); + return wrapInvocationContext(future, metadata); } @Override - public ListenableFuture<QueryAssignmentResponse> queryAssignment(Metadata metadata, QueryAssignmentRequest request, - Executor executor, Duration duration) { + public ListenableFuture<InvocationContext<QueryAssignmentResponse>> queryAssignment(Metadata metadata, + QueryAssignmentRequest request, Executor executor, Duration duration) { this.activityNanoTime = System.nanoTime(); - return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) - .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).queryAssignment(request); + final ListenableFuture<QueryAssignmentResponse> future = + futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) + .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).queryAssignment(request); + return wrapInvocationContext(future, metadata); } @Override - public ListenableFuture<Iterator<ReceiveMessageResponse>> receiveMessage(Metadata metadata, + public ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> receiveMessage(Metadata metadata, ReceiveMessageRequest request, ExecutorService executor, Duration duration) { this.activityNanoTime = System.nanoTime(); final Callable<Iterator<ReceiveMessageResponse>> callable = () -> blockingStub .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).receiveMessage(request); - return MoreExecutors.listeningDecorator(executor).submit(callable); + final ListenableFuture<Iterator<ReceiveMessageResponse>> future = + MoreExecutors.listeningDecorator(executor).submit(callable); + return wrapInvocationContext(future, metadata); } @Override - public ListenableFuture<AckMessageResponse> ackMessage(Metadata metadata, AckMessageRequest request, - Executor executor, Duration duration) { + public ListenableFuture<InvocationContext<AckMessageResponse>> ackMessage(Metadata metadata, + AckMessageRequest request, Executor executor, Duration duration) { this.activityNanoTime = System.nanoTime(); - return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) - .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).ackMessage(request); + final ListenableFuture<AckMessageResponse> future = + futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) + .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).ackMessage(request); + return wrapInvocationContext(future, metadata); } @Override - public ListenableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuration(Metadata metadata, - ChangeInvisibleDurationRequest request, Executor executor, Duration duration) { + public ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> changeInvisibleDuration( + Metadata metadata, ChangeInvisibleDurationRequest request, Executor executor, Duration duration) { this.activityNanoTime = System.nanoTime(); - return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) - .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).changeInvisibleDuration(request); + final ListenableFuture<ChangeInvisibleDurationResponse> future = + futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) + .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).changeInvisibleDuration(request); + return wrapInvocationContext(future, metadata); } @Override - public ListenableFuture<ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue( + public ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue( Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, Executor executor, Duration duration) { this.activityNanoTime = System.nanoTime(); - return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) + final ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future = futureStub + .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).forwardMessageToDeadLetterQueue(request); + return wrapInvocationContext(future, metadata); } @Override - public ListenableFuture<EndTransactionResponse> endTransaction(Metadata metadata, EndTransactionRequest request, - Executor executor, Duration duration) { + public ListenableFuture<InvocationContext<EndTransactionResponse>> endTransaction(Metadata metadata, + EndTransactionRequest request, Executor executor, Duration duration) { this.activityNanoTime = System.nanoTime(); - return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) - .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).endTransaction(request); + final ListenableFuture<EndTransactionResponse> future = + futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) + .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).endTransaction(request); + return wrapInvocationContext(future, metadata); } @Override - public ListenableFuture<NotifyClientTerminationResponse> notifyClientTermination( + public ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> notifyClientTermination( Metadata metadata, NotifyClientTerminationRequest request, Executor executor, Duration duration) { this.activityNanoTime = System.nanoTime(); - return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) - .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).notifyClientTermination(request); + final ListenableFuture<NotifyClientTerminationResponse> future = + futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) + .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).notifyClientTermination(request); + return wrapInvocationContext(future, metadata); } @Override diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcContext.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcContext.java new file mode 100644 index 0000000..5def58b --- /dev/null +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcContext.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.java.rpc; + +import io.grpc.Metadata; +import org.apache.rocketmq.client.java.route.Endpoints; + +public class RpcContext { + private final Endpoints endpoints; + private final Metadata header; + + public RpcContext(Endpoints endpoints, Metadata header) { + this.endpoints = endpoints; + this.header = header; + } + + public String getRequestId() { + return header.get(Metadata.Key.of(Signature.REQUEST_ID_KEY, Metadata.ASCII_STRING_MARSHALLER)); + } + + public Endpoints getEndpoints() { + return endpoints; + } +} diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java index 33b9f76..5c0a2cb 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java @@ -53,7 +53,7 @@ public class Signature { private Signature() { } - public static Metadata sign(ClientConfiguration config, String clientId) throws UnsupportedEncodingException, + public static Metadata sign(ClientConfiguration config, String clientId) throws NoSuchAlgorithmException, InvalidKeyException { Metadata metadata = new Metadata(); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java index ca2f720..9b60aa6 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java @@ -31,11 +31,10 @@ public class TLSHelper { private TLSHelper() { } - public static String sign(String accessSecret, String dateTime) throws UnsupportedEncodingException, - NoSuchAlgorithmException, - InvalidKeyException { + public static String sign(String accessSecret, String dateTime) throws NoSuchAlgorithmException, + InvalidKeyException { SecretKeySpec signingKey = new SecretKeySpec(accessSecret.getBytes(StandardCharsets.UTF_8), - HMAC_SHA1_ALGORITHM); + HMAC_SHA1_ALGORITHM); Mac mac; mac = Mac.getInstance(HMAC_SHA1_ALGORITHM); mac.init(signingKey); diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java index 401ce38..5ef44bc 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java @@ -44,8 +44,10 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.java.message.MessageViewImpl; +import org.apache.rocketmq.client.java.misc.RequestIdGenerator; import org.apache.rocketmq.client.java.retry.RetryPolicy; import org.apache.rocketmq.client.java.route.MessageQueueImpl; +import org.apache.rocketmq.client.java.rpc.InvocationContext; import org.apache.rocketmq.client.java.tool.TestBase; import org.junit.Before; import org.junit.Test; @@ -117,7 +119,7 @@ public class ProcessQueueImplTest extends TestBase { when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy); when(pushConsumerSettings.isFifo()).thenReturn(false); when(pushConsumer.changeInvisibleDuration(any(MessageViewImpl.class), any(Duration.class))) - .thenReturn(okChangeInvisibleDurationFuture()); + .thenReturn(okChangeInvisibleDurationCtxFuture()); processQueue.cacheMessages(messageViewList); verify(pushConsumer, times(1)) .changeInvisibleDuration(any(MessageViewImpl.class), any(Duration.class)); @@ -148,7 +150,8 @@ public class ProcessQueueImplTest extends TestBase { List<MessageViewImpl> messageViewList = new ArrayList<>(); final MessageViewImpl messageView = fakeMessageViewImpl(); messageViewList.add(messageView); - ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(fakeEndpoints(), status, messageViewList); + ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(fakeEndpoints(), + RequestIdGenerator.getInstance().next(), status, messageViewList); SettableFuture<ReceiveMessageResult> future0 = SettableFuture.create(); future0.set(receiveMessageResult); when(pushConsumer.receiveMessage(any(ReceiveMessageRequest.class), any(MessageQueueImpl.class), @@ -178,7 +181,7 @@ public class ProcessQueueImplTest extends TestBase { assertEquals(cachedMessageCount, processQueue.cachedMessagesCount()); assertEquals(1, processQueue.inflightMessagesCount()); - final ListenableFuture<AckMessageResponse> future = okAckMessageResponseFuture(); + final ListenableFuture<InvocationContext<AckMessageResponse>> future = okAckMessageResponseFuture(); when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future); processQueue.eraseMessage(optionalMessageView.get(), ConsumeResult.SUCCESS); future.addListener(() -> verify(pushConsumer, times(1)) @@ -221,7 +224,7 @@ public class ProcessQueueImplTest extends TestBase { final MessageViewImpl messageView = fakeMessageViewImpl(2, false); messageViewList.add(messageView); processQueue.cacheMessages(messageViewList); - ListenableFuture<AckMessageResponse> future0 = okAckMessageResponseFuture(); + ListenableFuture<InvocationContext<AckMessageResponse>> future0 = okAckMessageResponseFuture(); when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future0); when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy); when(retryPolicy.getMaxAttempts()).thenReturn(1); @@ -237,7 +240,7 @@ public class ProcessQueueImplTest extends TestBase { final MessageViewImpl messageView = fakeMessageViewImpl(2, false); messageViewList.add(messageView); processQueue.cacheMessages(messageViewList); - ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future0 = + ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> future0 = okForwardMessageToDeadLetterQueueResponseFuture(); when(pushConsumer.forwardMessageToDeadLetterQueue(any(MessageViewImpl.class))).thenReturn(future0); when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy); @@ -254,7 +257,7 @@ public class ProcessQueueImplTest extends TestBase { final MessageViewImpl messageView = fakeMessageViewImpl(2, false); messageViewList.add(messageView); processQueue.cacheMessages(messageViewList); - ListenableFuture<AckMessageResponse> future0 = okAckMessageResponseFuture(); + ListenableFuture<InvocationContext<AckMessageResponse>> future0 = okAckMessageResponseFuture(); when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future0); when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy); when(retryPolicy.getMaxAttempts()).thenReturn(2); diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java index 54e507c..a7b5357 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java @@ -124,7 +124,8 @@ public class PushConsumerImplTest extends TestBase { any(QueryRouteRequest.class), any(Duration.class)); verify(clientManager, atLeast(1)).queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class), any(Duration.class)); - Assert.assertEquals(okQueryAssignmentResponseFuture().get().getAssignmentsCount(), pushConsumer.getQueueSize()); + Assert.assertEquals(okQueryAssignmentResponseFuture().get().getResp().getAssignmentsCount(), + pushConsumer.getQueueSize()); when(clientManager.queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class), any(Duration.class))).thenReturn(okEmptyQueryAssignmentResponseFuture()); pushConsumer.scanAssignments(); @@ -148,7 +149,8 @@ public class PushConsumerImplTest extends TestBase { @Test public void testSubscribeWithSubscriptionOverwriting() throws ClientException { - pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener, + pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, + messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount); start(pushConsumer); final FilterExpression filterExpression = new FilterExpression(FAKE_TAG_0); diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java index 9568a81..88445d1 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java @@ -63,6 +63,7 @@ import org.apache.rocketmq.client.java.impl.TelemetrySession; import org.apache.rocketmq.client.java.message.MessageViewImpl; import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl; import org.apache.rocketmq.client.java.route.Endpoints; +import org.apache.rocketmq.client.java.rpc.InvocationContext; import org.apache.rocketmq.client.java.tool.TestBase; import org.junit.Test; import org.junit.runner.RunWith; @@ -87,7 +88,7 @@ public class SimpleConsumerImplTest extends TestBase { private SimpleConsumerImpl simpleConsumer; private void start(SimpleConsumerImpl simpleConsumer) throws ClientException { - SettableFuture<QueryRouteResponse> future0 = SettableFuture.create(); + SettableFuture<InvocationContext<QueryRouteResponse>> future0 = SettableFuture.create(); Status status = Status.newBuilder().setCode(Code.OK).build(); List<MessageQueue> messageQueueList = new ArrayList<>(); MessageQueue mq = MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0)) @@ -97,7 +98,9 @@ public class SimpleConsumerImplTest extends TestBase { messageQueueList.add(mq); QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status) .addAllMessageQueues(messageQueueList).build(); - future0.set(response); + final InvocationContext<QueryRouteResponse> invocationContext = new InvocationContext<>(response, + fakeRpcContext()); + future0.set(invocationContext); when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class))) .thenReturn(future0); @@ -180,7 +183,7 @@ public class SimpleConsumerImplTest extends TestBase { simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions); start(simpleConsumer); int receivedMessageCount = 16; - final ListenableFuture<Iterator<ReceiveMessageResponse>> future = + final ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> future = okReceiveMessageResponsesFuture(FAKE_TOPIC_0, receivedMessageCount); when(clientManager.receiveMessage(any(Endpoints.class), any(Metadata.class), any(ReceiveMessageRequest.class), any(Duration.class))).thenReturn(future); @@ -197,7 +200,7 @@ public class SimpleConsumerImplTest extends TestBase { start(simpleConsumer); try { final MessageViewImpl messageView = fakeMessageViewImpl(); - final ListenableFuture<AckMessageResponse> future = okAckMessageResponseFuture(); + final ListenableFuture<InvocationContext<AckMessageResponse>> future = okAckMessageResponseFuture(); when(clientManager.ackMessage(any(Endpoints.class), any(Metadata.class), any(AckMessageRequest.class), any(Duration.class))).thenReturn(future); simpleConsumer.ack(messageView); @@ -212,7 +215,7 @@ public class SimpleConsumerImplTest extends TestBase { start(simpleConsumer); try { final MessageViewImpl messageView = fakeMessageViewImpl(); - final ListenableFuture<ChangeInvisibleDurationResponse> future = + final ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future = okChangeInvisibleDurationResponseFuture(FAKE_RECEIPT_HANDLE_1); when(clientManager.changeInvisibleDuration(any(Endpoints.class), any(Metadata.class), any(ChangeInvisibleDurationRequest.class), any(Duration.class))).thenReturn(future); diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java index 907a6fc..619538e 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java @@ -61,6 +61,7 @@ import org.apache.rocketmq.client.java.impl.ClientManagerRegistry; import org.apache.rocketmq.client.java.impl.TelemetrySession; import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl; import org.apache.rocketmq.client.java.route.Endpoints; +import org.apache.rocketmq.client.java.rpc.InvocationContext; import org.apache.rocketmq.client.java.tool.TestBase; import org.junit.Test; import org.junit.runner.RunWith; @@ -94,7 +95,7 @@ public class ProducerImplTest extends TestBase { null); private void start(ProducerImpl producer) throws ClientException { - SettableFuture<QueryRouteResponse> future0 = SettableFuture.create(); + SettableFuture<InvocationContext<QueryRouteResponse>> future0 = SettableFuture.create(); Status status = Status.newBuilder().setCode(Code.OK).build(); List<MessageQueue> messageQueueList = new ArrayList<>(); MessageQueue mq = MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0)) @@ -102,9 +103,11 @@ public class ProducerImplTest extends TestBase { .setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0())) .setId(0).build(); messageQueueList.add(mq); - QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status) + QueryRouteResponse resp = QueryRouteResponse.newBuilder().setStatus(status) .addAllMessageQueues(messageQueueList).build(); - future0.set(response); + final InvocationContext<QueryRouteResponse> invocationContext = + new InvocationContext<>(resp, fakeRpcContext()); + future0.set(invocationContext); when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class))) .thenReturn(future0); @@ -144,10 +147,11 @@ public class ProducerImplTest extends TestBase { verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class)); final Message message = fakeMessage(FAKE_TOPIC_0); - final ListenableFuture<SendMessageResponse> future = okSendMessageResponseFutureWithSingleEntry(); + final ListenableFuture<InvocationContext<SendMessageResponse>> future = + okSendMessageResponseFutureWithSingleEntry(); when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class), any(Duration.class))).thenReturn(future); - final SendMessageResponse response = future.get(); + final SendMessageResponse response = future.get().getResp(); assertEquals(1, response.getEntriesCount()); final apache.rocketmq.v2.SendResultEntry receipt = response.getEntriesList().iterator().next(); final SendReceipt sendReceipt = producer.send(message); @@ -163,10 +167,11 @@ public class ProducerImplTest extends TestBase { verify(clientManager, never()).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class)); final Message message = fakeMessage(FAKE_TOPIC_0); - final ListenableFuture<SendMessageResponse> future = okSendMessageResponseFutureWithSingleEntry(); + final ListenableFuture<InvocationContext<SendMessageResponse>> future = + okSendMessageResponseFutureWithSingleEntry(); when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class), any(Duration.class))).thenReturn(future); - final SendMessageResponse response = future.get(); + final SendMessageResponse response = future.get().getResp(); assertEquals(1, response.getEntriesCount()); final SendReceipt sendReceipt = producerWithoutTopicBinding.send(message); verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), @@ -185,7 +190,7 @@ public class ProducerImplTest extends TestBase { any(QueryRouteRequest.class), any(Duration.class)); verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class)); - final ListenableFuture<SendMessageResponse> future = failureSendMessageResponseFuture(); + final ListenableFuture<InvocationContext<SendMessageResponse>> future = failureSendMessageResponseFuture(); when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class), any(Duration.class))).thenReturn(future); Message message0 = fakeMessage(FAKE_TOPIC_0); diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java index 212cb06..f3bcbb2 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java @@ -39,9 +39,11 @@ import apache.rocketmq.v2.SendMessageResponse; import apache.rocketmq.v2.SendResultEntry; import apache.rocketmq.v2.Status; import apache.rocketmq.v2.SystemProperties; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.ByteString; +import io.grpc.Metadata; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; @@ -72,6 +74,8 @@ import org.apache.rocketmq.client.java.retry.CustomizedBackoffRetryPolicy; import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy; import org.apache.rocketmq.client.java.route.Endpoints; import org.apache.rocketmq.client.java.route.MessageQueueImpl; +import org.apache.rocketmq.client.java.rpc.InvocationContext; +import org.apache.rocketmq.client.java.rpc.RpcContext; public class TestBase { protected static final String FAKE_CLIENT_ID = "mbp@29848@cno0nhxy"; @@ -137,6 +141,10 @@ public class TestBase { return new Endpoints(fakePbEndpoints0()); } + protected RpcContext fakeRpcContext() { + return new RpcContext(fakeEndpoints(), new Metadata()); + } + protected Message fakeMessage(String topic) { return new MessageBuilderImpl().setTopic(topic).setBody(RandomUtils.nextBytes(1)).build(); } @@ -207,40 +215,35 @@ public class TestBase { .setPermission(Permission.READ_WRITE).build(); } - protected ListenableFuture<QueryRouteResponse> okQueryRouteResponseFuture() { - SettableFuture<QueryRouteResponse> future = SettableFuture.create(); + protected ListenableFuture<InvocationContext<QueryRouteResponse>> okQueryRouteResponseFuture() { Status status = Status.newBuilder().setCode(Code.OK).build(); - final QueryRouteResponse response = + final QueryRouteResponse resp = QueryRouteResponse.newBuilder().setStatus(status).addMessageQueues(fakePbMessageQueue0()).build(); - future.set(response); - return future; + return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext())); } - protected ListenableFuture<ChangeInvisibleDurationResponse> okChangeInvisibleDurationFuture() { - SettableFuture<ChangeInvisibleDurationResponse> future = SettableFuture.create(); + protected ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> + okChangeInvisibleDurationCtxFuture() { Status status = Status.newBuilder().setCode(Code.OK).build(); - final ChangeInvisibleDurationResponse response = + final ChangeInvisibleDurationResponse resp = ChangeInvisibleDurationResponse.newBuilder().setStatus(status).build(); - future.set(response); - return future; + return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext())); } - protected ListenableFuture<QueryAssignmentResponse> okQueryAssignmentResponseFuture() { + protected ListenableFuture<InvocationContext<QueryAssignmentResponse>> okQueryAssignmentResponseFuture() { final SettableFuture<QueryAssignmentResponse> future = SettableFuture.create(); final Status status = Status.newBuilder().setCode(Code.OK).build(); Assignment assignment = Assignment.newBuilder().setMessageQueue(fakePbMessageQueue0()).build(); - QueryAssignmentResponse response = QueryAssignmentResponse.newBuilder().setStatus(status) + QueryAssignmentResponse resp = QueryAssignmentResponse.newBuilder().setStatus(status) .addAssignments(assignment).build(); - future.set(response); - return future; + return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext())); } - protected ListenableFuture<QueryAssignmentResponse> okEmptyQueryAssignmentResponseFuture() { + protected ListenableFuture<InvocationContext<QueryAssignmentResponse>> okEmptyQueryAssignmentResponseFuture() { final SettableFuture<QueryAssignmentResponse> future = SettableFuture.create(); final Status status = Status.newBuilder().setCode(Code.OK).build(); - final QueryAssignmentResponse response = QueryAssignmentResponse.newBuilder().setStatus(status).build(); - future.set(response); - return future; + final QueryAssignmentResponse resp = QueryAssignmentResponse.newBuilder().setStatus(status).build(); + return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext())); } protected Map<String, FilterExpression> createSubscriptionExpressions(String topic) { @@ -250,53 +253,44 @@ public class TestBase { return map; } - protected ListenableFuture<AckMessageResponse> okAckMessageResponseFuture() { + protected ListenableFuture<InvocationContext<AckMessageResponse>> okAckMessageResponseFuture() { final Status status = Status.newBuilder().setCode(Code.OK).build(); - SettableFuture<AckMessageResponse> future0 = SettableFuture.create(); - final AckMessageResponse response = AckMessageResponse.newBuilder().setStatus(status).build(); - future0.set(response); - return future0; + final AckMessageResponse resp = AckMessageResponse.newBuilder().setStatus(status).build(); + return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext())); } - protected ListenableFuture<ChangeInvisibleDurationResponse> okChangeInvisibleDurationResponseFuture( - String receiptHandle) { + protected ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> + okChangeInvisibleDurationResponseFuture(String receiptHandle) { final Status status = Status.newBuilder().setCode(Code.OK).build(); SettableFuture<ChangeInvisibleDurationResponse> future = SettableFuture.create(); - ChangeInvisibleDurationResponse response = ChangeInvisibleDurationResponse.newBuilder().setStatus(status) + ChangeInvisibleDurationResponse resp = ChangeInvisibleDurationResponse.newBuilder().setStatus(status) .setReceiptHandle(receiptHandle).build(); - future.set(response); - return future; + return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext())); } - protected ListenableFuture<ForwardMessageToDeadLetterQueueResponse> - okForwardMessageToDeadLetterQueueResponseFuture() { + protected ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> + okForwardMessageToDeadLetterQueueResponseFuture() { final Status status = Status.newBuilder().setCode(Code.OK).build(); - SettableFuture<ForwardMessageToDeadLetterQueueResponse> future0 = SettableFuture.create(); - final ForwardMessageToDeadLetterQueueResponse response = + final ForwardMessageToDeadLetterQueueResponse resp = ForwardMessageToDeadLetterQueueResponse.newBuilder().setStatus(status).build(); - future0.set(response); - return future0; + return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext())); } - protected ListenableFuture<SendMessageResponse> okSendMessageResponseFutureWithSingleEntry() { + protected ListenableFuture<InvocationContext<SendMessageResponse>> okSendMessageResponseFutureWithSingleEntry() { final Status status = Status.newBuilder().setCode(Code.OK).build(); - SettableFuture<SendMessageResponse> future0 = SettableFuture.create(); final String messageId = MessageIdCodec.getInstance().nextMessageId().toString(); SendResultEntry entry = SendResultEntry.newBuilder().setMessageId(messageId) .setTransactionId(FAKE_TRANSACTION_ID).setStatus(status).setOffset(1).build(); - SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status).addEntries(entry).build(); - future0.set(response); - return future0; + SendMessageResponse resp = SendMessageResponse.newBuilder().setStatus(status).addEntries(entry).build(); + return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext())); } - protected ListenableFuture<SendMessageResponse> failureSendMessageResponseFuture() { + protected ListenableFuture<InvocationContext<SendMessageResponse>> failureSendMessageResponseFuture() { final Status status = Status.newBuilder().setCode(Code.FORBIDDEN).build(); - SettableFuture<SendMessageResponse> future0 = SettableFuture.create(); SendResultEntry sendResultEntry = SendResultEntry.newBuilder().setStatus(status).setStatus(status).build(); - SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status) + SendMessageResponse resp = SendMessageResponse.newBuilder().setStatus(status) .addEntries(sendResultEntry).build(); - future0.set(response); - return future0; + return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext())); } protected ListenableFuture<SendMessageResponse> okBatchSendMessageResponseFuture() { @@ -326,11 +320,9 @@ public class TestBase { .setSystemProperties(systemProperties).build(); } - protected ListenableFuture<Iterator<ReceiveMessageResponse>> okReceiveMessageResponsesFuture(String topic, - int messageCount) { + protected ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> okReceiveMessageResponsesFuture( + String topic, int messageCount) { final Status status = Status.newBuilder().setCode(Code.OK).build(); - SettableFuture<Iterator<ReceiveMessageResponse>> future = SettableFuture.create(); - final apache.rocketmq.v2.Message message = fakePbMessage(topic); List<ReceiveMessageResponse> responses = new ArrayList<>(); ReceiveMessageResponse statusResponse = ReceiveMessageResponse.newBuilder().setStatus(status).build(); @@ -339,9 +331,7 @@ public class TestBase { ReceiveMessageResponse messageResponse = ReceiveMessageResponse.newBuilder().setMessage(message).build(); responses.add(messageResponse); } - - future.set(responses.iterator()); - return future; + return Futures.immediateFuture(new InvocationContext<>(responses.iterator(), fakeRpcContext())); } protected ListenableFuture<EndTransactionResponse> okEndTransactionResponseFuture() { @@ -369,8 +359,9 @@ public class TestBase { protected SendReceiptImpl fakeSendReceiptImpl( MessageQueueImpl mq) throws ExecutionException, InterruptedException, ClientException { - final ListenableFuture<SendMessageResponse> future = okSendMessageResponseFutureWithSingleEntry(); - final SendMessageResponse response = future.get(); + final ListenableFuture<InvocationContext<SendMessageResponse>> future = + okSendMessageResponseFutureWithSingleEntry(); + final SendMessageResponse response = future.get().getResp(); final List<SendReceiptImpl> receipts = SendReceiptImpl.processSendResponse(mq, response); return receipts.iterator().next(); }
