This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 290a4da Polish code (#194)
290a4da is described below
commit 290a4da4aa609e7778089a3c09aa3cde58def496
Author: Aaron Ai <[email protected]>
AuthorDate: Sun Aug 28 17:18:36 2022 +0800
Polish code (#194)
---
.../java/example/ProducerFifoMessageExample.java | 2 +-
...dler.java => CompositedMessageInterceptor.java} | 38 +++++++++++-----------
...MessageHandler.java => MessageInterceptor.java} | 8 ++---
...Context.java => MessageInterceptorContext.java} | 2 +-
...mpl.java => MessageInterceptorContextImpl.java} | 10 +++---
.../rocketmq/client/java/impl/ClientImpl.java | 29 +++++++++--------
.../rocketmq/client/java/impl/ClientManager.java | 6 ++--
.../client/java/impl/consumer/ConsumeService.java | 10 +++---
.../client/java/impl/consumer/ConsumeTask.java | 18 +++++-----
.../client/java/impl/consumer/ConsumerImpl.java | 23 ++++++-------
.../java/impl/consumer/FifoConsumeService.java | 7 ++--
.../client/java/impl/consumer/ProcessQueue.java | 16 ++++++---
.../java/impl/consumer/ProcessQueueImpl.java | 32 +++++++++---------
.../java/impl/consumer/PushConsumerImpl.java | 12 ++++---
.../java/impl/consumer/StandardConsumeService.java | 7 ++--
.../client/java/impl/producer/ProducerImpl.java | 20 ++++++------
...erHandler.java => MessageMeterInterceptor.java} | 22 ++++++-------
.../java/impl/consumer/ConsumeServiceTest.java | 4 +--
.../client/java/impl/consumer/ConsumeTaskTest.java | 10 +++---
.../java/metrics/MessageMeterHandlerTest.java | 34 +++++++++----------
20 files changed, 160 insertions(+), 150 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
index 823aab4..6af757f 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
@@ -69,7 +69,7 @@ public class ProducerFifoMessageExample {
// Key(s) of the message, another way to mark message besides
message id.
.setKeys("yourMessageKey-1ff69ada8e0e")
// Message group decides the message delivery order.
- .setMessageGroup("youMessageGroup0")
+ .setMessageGroup("yourMessageGroup0")
.setBody(body)
.build();
try {
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageHandler.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java
similarity index 62%
rename from
java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageHandler.java
rename to
java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java
index 7a021ab..c0ddaef 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageHandler.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java
@@ -25,48 +25,48 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("rawtypes")
-public class CompositedMessageHandler implements MessageHandler {
- private static final Logger LOGGER =
LoggerFactory.getLogger(MessageHandler.class);
- private static final AttributeKey<Map<Integer, Map<AttributeKey,
Attribute>>> HANDLER_ATTRIBUTES_KEY =
- AttributeKey.create("composited_handler_attributes");
- private final List<MessageHandler> handlers;
+public class CompositedMessageInterceptor implements MessageInterceptor {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MessageInterceptor.class);
+ private static final AttributeKey<Map<Integer, Map<AttributeKey,
Attribute>>> INTERCEPTOR_ATTRIBUTES_KEY =
+ AttributeKey.create("composited_interceptor_attributes");
+ private final List<MessageInterceptor> interceptors;
- public CompositedMessageHandler(List<MessageHandler> handlers) {
- this.handlers = handlers;
+ public CompositedMessageInterceptor(List<MessageInterceptor> interceptors)
{
+ this.interceptors = interceptors;
}
@Override
- public void doBefore(MessageHandlerContext context0, List<GeneralMessage>
messages) {
+ public void doBefore(MessageInterceptorContext context0,
List<GeneralMessage> messages) {
final HashMap<Integer, Map<AttributeKey, Attribute>> attributeMap =
new HashMap<>();
- for (int index = 0; index < handlers.size(); index++) {
- MessageHandler handler = handlers.get(index);
+ for (int index = 0; index < interceptors.size(); index++) {
+ MessageInterceptor interceptor = interceptors.get(index);
final MessageHookPoints messageHookPoints =
context0.getMessageHookPoints();
final MessageHookPointsStatus status = context0.getStatus();
- final MessageHandlerContextImpl context = new
MessageHandlerContextImpl(messageHookPoints, status);
+ final MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(messageHookPoints, status);
try {
- handler.doBefore(context, messages);
+ interceptor.doBefore(context, messages);
} catch (Throwable t) {
LOGGER.error("Exception raised while handing messages", t);
}
final Map<AttributeKey, Attribute> attributes =
context.getAttributes();
attributeMap.put(index, attributes);
}
- context0.putAttribute(HANDLER_ATTRIBUTES_KEY,
Attribute.create(attributeMap));
+ context0.putAttribute(INTERCEPTOR_ATTRIBUTES_KEY,
Attribute.create(attributeMap));
}
@Override
- public void doAfter(MessageHandlerContext context0, List<GeneralMessage>
messages) {
- for (int index = handlers.size() - 1; index >= 0; index--) {
+ public void doAfter(MessageInterceptorContext context0,
List<GeneralMessage> messages) {
+ for (int index = interceptors.size() - 1; index >= 0; index--) {
final Map<Integer, Map<AttributeKey, Attribute>> attributeMap =
- context0.getAttribute(HANDLER_ATTRIBUTES_KEY).get();
+ context0.getAttribute(INTERCEPTOR_ATTRIBUTES_KEY).get();
final Map<AttributeKey, Attribute> attributes =
attributeMap.get(index);
final MessageHookPoints messageHookPoints =
context0.getMessageHookPoints();
final MessageHookPointsStatus status = context0.getStatus();
- final MessageHandlerContextImpl context = new
MessageHandlerContextImpl(messageHookPoints, status,
+ final MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(messageHookPoints, status,
attributes);
- MessageHandler handler = handlers.get(index);
+ MessageInterceptor interceptor = interceptors.get(index);
try {
- handler.doAfter(context, messages);
+ interceptor.doAfter(context, messages);
} catch (Throwable t) {
LOGGER.error("Exception raised while handing messages", t);
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHandler.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptor.java
similarity index 76%
rename from
java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHandler.java
rename to
java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptor.java
index b2c5087..593c408 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHandler.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptor.java
@@ -22,11 +22,11 @@ import java.util.List;
import org.apache.rocketmq.client.java.message.GeneralMessage;
/**
- * Message interceptor based on {@link MessageHookPoints}.
+ * Interface for intercepting ingoing/outgoing message before/after they are
dispatched by client.
*/
@Beta
-public interface MessageHandler {
- void doBefore(MessageHandlerContext context, List<GeneralMessage>
messages);
+public interface MessageInterceptor {
+ void doBefore(MessageInterceptorContext context, List<GeneralMessage>
messages);
- void doAfter(MessageHandlerContext context, List<GeneralMessage> messages);
+ void doAfter(MessageInterceptorContext context, List<GeneralMessage>
messages);
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHandlerContext.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptorContext.java
similarity index 95%
rename from
java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHandlerContext.java
rename to
java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptorContext.java
index e38aaec..6769ae9 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHandlerContext.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptorContext.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.client.java.hook;
-public interface MessageHandlerContext {
+public interface MessageInterceptorContext {
MessageHookPoints getMessageHookPoints();
MessageHookPointsStatus getStatus();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHandlerContextImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptorContextImpl.java
similarity index 82%
rename from
java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHandlerContextImpl.java
rename to
java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptorContextImpl.java
index 486cd30..2565eb7 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHandlerContextImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptorContextImpl.java
@@ -21,31 +21,31 @@ import java.util.HashMap;
import java.util.Map;
@SuppressWarnings("rawtypes")
-public class MessageHandlerContextImpl implements MessageHandlerContext {
+public class MessageInterceptorContextImpl implements
MessageInterceptorContext {
private final MessageHookPoints messageHookPoints;
private MessageHookPointsStatus status;
private final Map<AttributeKey, Attribute> attributes;
- public MessageHandlerContextImpl(MessageHookPoints messageHookPoints) {
+ public MessageInterceptorContextImpl(MessageHookPoints messageHookPoints) {
this.messageHookPoints = messageHookPoints;
this.status = MessageHookPointsStatus.UNSET;
this.attributes = new HashMap<>();
}
- public MessageHandlerContextImpl(MessageHookPoints messageHookPoints,
MessageHookPointsStatus status) {
+ public MessageInterceptorContextImpl(MessageHookPoints messageHookPoints,
MessageHookPointsStatus status) {
this.messageHookPoints = messageHookPoints;
this.status = status;
this.attributes = new HashMap<>();
}
- public MessageHandlerContextImpl(MessageHookPoints messageHookPoints,
MessageHookPointsStatus status,
+ public MessageInterceptorContextImpl(MessageHookPoints messageHookPoints,
MessageHookPointsStatus status,
Map<AttributeKey, Attribute> attributes) {
this.messageHookPoints = messageHookPoints;
this.status = status;
this.attributes = attributes;
}
- public MessageHandlerContextImpl(MessageHandlerContextImpl context,
MessageHookPointsStatus status) {
+ public MessageInterceptorContextImpl(MessageInterceptorContextImpl
context, MessageHookPointsStatus status) {
this.messageHookPoints = context.messageHookPoints;
this.status = status;
this.attributes = context.attributes;
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index d48ee74..4745ec5 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -72,13 +72,13 @@ import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.exception.InternalErrorException;
import org.apache.rocketmq.client.java.exception.StatusChecker;
-import org.apache.rocketmq.client.java.hook.CompositedMessageHandler;
-import org.apache.rocketmq.client.java.hook.MessageHandler;
-import org.apache.rocketmq.client.java.hook.MessageHandlerContext;
+import org.apache.rocketmq.client.java.hook.CompositedMessageInterceptor;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
+import org.apache.rocketmq.client.java.hook.MessageInterceptorContext;
import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
import org.apache.rocketmq.client.java.message.GeneralMessage;
import org.apache.rocketmq.client.java.metrics.ClientMeterManager;
-import org.apache.rocketmq.client.java.metrics.MessageMeterHandler;
+import org.apache.rocketmq.client.java.metrics.MessageMeterInterceptor;
import org.apache.rocketmq.client.java.metrics.Metric;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.apache.rocketmq.client.java.misc.ExecutorServices;
@@ -93,7 +93,7 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
public abstract class ClientImpl extends AbstractIdleService implements
Client, ClientSessionHandler,
- MessageHandler {
+ MessageInterceptor {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClientImpl.class);
/**
* The telemetry timeout should not be too long, otherwise
@@ -126,7 +126,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
private final Map<Endpoints, ClientSessionImpl> sessionsTable;
private final ReadWriteLock sessionsLock;
- private final CompositedMessageHandler compositedMessageHandler;
+ private final CompositedMessageInterceptor compositedMessageInterceptor;
public ClientImpl(ClientConfiguration clientConfiguration, Set<String>
topics) {
this.clientConfiguration = checkNotNull(clientConfiguration,
"clientConfiguration should not be null");
@@ -158,8 +158,9 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
this.clientMeterManager = new ClientMeterManager(clientId,
clientConfiguration);
- this.compositedMessageHandler =
- new CompositedMessageHandler(Collections.singletonList(new
MessageMeterHandler(this, clientMeterManager)));
+ this.compositedMessageInterceptor =
+ new CompositedMessageInterceptor(Collections.singletonList(new
MessageMeterInterceptor(this,
+ clientMeterManager)));
this.telemetryCommandExecutor = new ThreadPoolExecutor(
1,
@@ -232,22 +233,22 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
}
@Override
- public void doBefore(MessageHandlerContext context, List<GeneralMessage>
generalMessages) {
+ public void doBefore(MessageInterceptorContext context,
List<GeneralMessage> generalMessages) {
try {
- compositedMessageHandler.doBefore(context, generalMessages);
+ compositedMessageInterceptor.doBefore(context, generalMessages);
} catch (Throwable t) {
// Should never reach here.
- LOGGER.error("[BUG] Exception raised while handling messages,
clientId={}", clientId, t);
+ LOGGER.error("[Bug] Exception raised while handling messages,
clientId={}", clientId, t);
}
}
@Override
- public void doAfter(MessageHandlerContext context, List<GeneralMessage>
generalMessages) {
+ public void doAfter(MessageInterceptorContext context,
List<GeneralMessage> generalMessages) {
try {
- compositedMessageHandler.doAfter(context, generalMessages);
+ compositedMessageInterceptor.doAfter(context, generalMessages);
} catch (Throwable t) {
// Should never reach here.
- LOGGER.error("[BUG] Exception raised while handling messages,
clientId={}", clientId, t);
+ LOGGER.error("[Bug] Exception raised while handling messages,
clientId={}", clientId, t);
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
index 468712b..ebc6f78 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
@@ -137,8 +137,7 @@ public abstract class ClientManager extends
AbstractIdleService {
* @return invocation of response future.
*/
public abstract RpcFuture<ChangeInvisibleDurationRequest,
ChangeInvisibleDurationResponse>
- changeInvisibleDuration(Endpoints endpoints,
ChangeInvisibleDurationRequest request,
- Duration duration);
+ changeInvisibleDuration(Endpoints endpoints,
ChangeInvisibleDurationRequest request, Duration duration);
/**
* Send a message to the dead letter queue asynchronously, the method
ensures no throwable.
@@ -173,8 +172,7 @@ public abstract class ClientManager extends
AbstractIdleService {
*/
@SuppressWarnings("UnusedReturnValue")
public abstract RpcFuture<NotifyClientTerminationRequest,
NotifyClientTerminationResponse>
- notifyClientTermination(Endpoints endpoints,
NotifyClientTerminationRequest request,
- Duration duration);
+ notifyClientTermination(Endpoints endpoints,
NotifyClientTerminationRequest request, Duration duration);
/**
* Establish telemetry session stream to server.
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
index ac653fc..1e9f585 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
@@ -30,7 +30,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.java.hook.MessageHandler;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.slf4j.Logger;
@@ -43,15 +43,15 @@ public abstract class ConsumeService {
protected final ClientId clientId;
private final MessageListener messageListener;
private final ThreadPoolExecutor consumptionExecutor;
- private final MessageHandler messageHandler;
+ private final MessageInterceptor messageInterceptor;
private final ScheduledExecutorService scheduler;
public ConsumeService(ClientId clientId, MessageListener messageListener,
ThreadPoolExecutor consumptionExecutor,
- MessageHandler messageHandler, ScheduledExecutorService scheduler) {
+ MessageInterceptor messageInterceptor, ScheduledExecutorService
scheduler) {
this.clientId = clientId;
this.messageListener = messageListener;
this.consumptionExecutor = consumptionExecutor;
- this.messageHandler = messageHandler;
+ this.messageInterceptor = messageInterceptor;
this.scheduler = scheduler;
}
@@ -63,7 +63,7 @@ public abstract class ConsumeService {
public ListenableFuture<ConsumeResult> consume(MessageViewImpl
messageView, Duration delay) {
final ListeningExecutorService executorService =
MoreExecutors.listeningDecorator(consumptionExecutor);
- final ConsumeTask task = new ConsumeTask(clientId, messageListener,
messageView, messageHandler);
+ final ConsumeTask task = new ConsumeTask(clientId, messageListener,
messageView, messageInterceptor);
// Consume message with no delay.
if (Duration.ZERO.compareTo(delay) >= 0) {
return executorService.submit(task);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
index 001ad5a..45fd9c9 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
@@ -22,10 +22,10 @@ import java.util.List;
import java.util.concurrent.Callable;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.java.hook.MessageHandler;
-import org.apache.rocketmq.client.java.hook.MessageHandlerContextImpl;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
+import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
import org.apache.rocketmq.client.java.message.GeneralMessage;
import org.apache.rocketmq.client.java.message.GeneralMessageImpl;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
@@ -39,14 +39,14 @@ public class ConsumeTask implements Callable<ConsumeResult>
{
private final ClientId clientId;
private final MessageListener messageListener;
private final MessageViewImpl messageView;
- private final MessageHandler messageHandler;
+ private final MessageInterceptor messageInterceptor;
public ConsumeTask(ClientId clientId, MessageListener messageListener,
MessageViewImpl messageView,
- MessageHandler messageHandler) {
+ MessageInterceptor messageInterceptor) {
this.clientId = clientId;
this.messageListener = messageListener;
this.messageView = messageView;
- this.messageHandler = messageHandler;
+ this.messageInterceptor = messageInterceptor;
}
/**
@@ -58,8 +58,8 @@ public class ConsumeTask implements Callable<ConsumeResult> {
public ConsumeResult call() {
ConsumeResult consumeResult;
final List<GeneralMessage> generalMessages =
Collections.singletonList(new GeneralMessageImpl(messageView));
- MessageHandlerContextImpl context = new
MessageHandlerContextImpl(MessageHookPoints.CONSUME);
- messageHandler.doBefore(context, generalMessages);
+ MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(MessageHookPoints.CONSUME);
+ messageInterceptor.doBefore(context, generalMessages);
try {
consumeResult = messageListener.consume(messageView);
} catch (Throwable t) {
@@ -69,8 +69,8 @@ public class ConsumeTask implements Callable<ConsumeResult> {
}
MessageHookPointsStatus status =
ConsumeResult.SUCCESS.equals(consumeResult) ? MessageHookPointsStatus.OK :
MessageHookPointsStatus.ERROR;
- context = new MessageHandlerContextImpl(context, status);
- messageHandler.doAfter(context, generalMessages);
+ context = new MessageInterceptorContextImpl(context, status);
+ messageInterceptor.doAfter(context, generalMessages);
// Make sure that the return value is the subset of messageViews.
return consumeResult;
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index bb06b97..4add788 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -48,9 +48,9 @@ import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.java.exception.StatusChecker;
-import org.apache.rocketmq.client.java.hook.MessageHandlerContextImpl;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
+import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
import org.apache.rocketmq.client.java.impl.ClientImpl;
import org.apache.rocketmq.client.java.impl.ClientManager;
import org.apache.rocketmq.client.java.message.GeneralMessage;
@@ -80,7 +80,7 @@ abstract class ConsumerImpl extends ClientImpl {
try {
final Endpoints endpoints = mq.getBroker().getEndpoints();
final Duration tolerance = clientConfiguration.getRequestTimeout();
- final Duration timeout = Duration.ofNanos(awaitDuration.toNanos()
+ tolerance.toNanos());
+ final Duration timeout = awaitDuration.plus(tolerance);
final ClientManager clientManager = this.getClientManager();
final RpcFuture<ReceiveMessageRequest,
List<ReceiveMessageResponse>> future =
clientManager.receiveMessage(endpoints, request, timeout);
@@ -146,7 +146,7 @@ abstract class ConsumerImpl extends ClientImpl {
final Endpoints endpoints = messageView.getEndpoints();
RpcFuture<AckMessageRequest, AckMessageResponse> future;
final List<GeneralMessage> generalMessages =
Collections.singletonList(new GeneralMessageImpl(messageView));
- final MessageHandlerContextImpl context = new
MessageHandlerContextImpl(MessageHookPoints.ACK);
+ final MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(MessageHookPoints.ACK);
doBefore(context, generalMessages);
try {
final AckMessageRequest request =
wrapAckMessageRequest(messageView);
@@ -160,15 +160,15 @@ abstract class ConsumerImpl extends ClientImpl {
public void onSuccess(AckMessageResponse response) {
final Status status = response.getStatus();
final Code code = status.getCode();
- MessageHookPointsStatus messageHookPointsStatus =
Code.OK.equals(code) ?
+ MessageHookPointsStatus hookPointsStatus =
Code.OK.equals(code) ?
MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
- MessageHandlerContextImpl context0 = new
MessageHandlerContextImpl(context, messageHookPointsStatus);
+ MessageInterceptorContextImpl context0 = new
MessageInterceptorContextImpl(context, hookPointsStatus);
doAfter(context0, generalMessages);
}
@Override
public void onFailure(Throwable t) {
- MessageHandlerContextImpl context0 = new
MessageHandlerContextImpl(context,
+ MessageInterceptorContextImpl context0 = new
MessageInterceptorContextImpl(context,
MessageHookPointsStatus.ERROR);
doAfter(context0, generalMessages);
}
@@ -181,8 +181,8 @@ abstract class ConsumerImpl extends ClientImpl {
final Endpoints endpoints = messageView.getEndpoints();
RpcFuture<ChangeInvisibleDurationRequest,
ChangeInvisibleDurationResponse> future;
final List<GeneralMessage> generalMessages =
Collections.singletonList(new GeneralMessageImpl(messageView));
- final MessageHandlerContextImpl context =
- new
MessageHandlerContextImpl(MessageHookPoints.CHANGE_INVISIBLE_DURATION);
+ final MessageInterceptorContextImpl context =
+ new
MessageInterceptorContextImpl(MessageHookPoints.CHANGE_INVISIBLE_DURATION);
doBefore(context, generalMessages);
final ChangeInvisibleDurationRequest request =
wrapChangeInvisibleDuration(messageView, invisibleDuration);
final Duration requestTimeout =
clientConfiguration.getRequestTimeout();
@@ -193,19 +193,20 @@ abstract class ConsumerImpl extends ClientImpl {
public void onSuccess(ChangeInvisibleDurationResponse response) {
final Status status = response.getStatus();
final Code code = status.getCode();
- MessageHookPointsStatus messageHookPointsStatus =
Code.OK.equals(code) ?
+ MessageHookPointsStatus hookPointsStatus =
Code.OK.equals(code) ?
MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
if (!Code.OK.equals(code)) {
LOGGER.error("Failed to change message invisible duration,
messageId={}, endpoints={}, code={}, " +
"status message=[{}], clientId={}", messageId,
endpoints, code, status.getMessage(), clientId);
}
- MessageHandlerContextImpl context0 = new
MessageHandlerContextImpl(context, messageHookPointsStatus);
+ MessageInterceptorContextImpl context0 = new
MessageInterceptorContextImpl(context,
+ hookPointsStatus);
doAfter(context0, generalMessages);
}
@Override
public void onFailure(Throwable t) {
- MessageHandlerContextImpl context0 = new
MessageHandlerContextImpl(context,
+ MessageInterceptorContextImpl context0 = new
MessageInterceptorContextImpl(context,
MessageHookPointsStatus.ERROR);
doAfter(context0, generalMessages);
LOGGER.error("Exception raised while changing message
invisible duration, messageId={}, endpoints={}, "
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
index a8fa8d0..8a6c7ca 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.java.hook.MessageHandler;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.slf4j.Logger;
@@ -37,8 +37,9 @@ class FifoConsumeService extends ConsumeService {
private static final Logger LOGGER =
LoggerFactory.getLogger(FifoConsumeService.class);
public FifoConsumeService(ClientId clientId, MessageListener
messageListener,
- ThreadPoolExecutor consumptionExecutor, MessageHandler messageHandler,
ScheduledExecutorService scheduler) {
- super(clientId, messageListener, consumptionExecutor, messageHandler,
scheduler);
+ ThreadPoolExecutor consumptionExecutor, MessageInterceptor
messageInterceptor,
+ ScheduledExecutorService scheduler) {
+ super(clientId, messageListener, consumptionExecutor,
messageInterceptor, scheduler);
}
@Override
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
index 0376d66..48b5612 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
@@ -36,9 +36,9 @@ import org.apache.rocketmq.client.java.route.MessageQueueImpl;
* <p>
* phase 1: Fetch 32 messages successfully from remote.
* <pre>
- * 32 in ┌─────────────────────────┐
- * ────────► 32 │
- * └─────────────────────────┘
+ * 32 in ┌─────────────────────────┐
+ * ───────► 32 │
+ * └─────────────────────────┘
* cached messages = 32
* </pre>
* phase 2: consuming 1 message.
@@ -56,7 +56,15 @@ import
org.apache.rocketmq.client.java.route.MessageQueueImpl;
* cached messages = 31
* </pre>
*
- * <p>Especially, there are some different processing procedures for FIFO
consumption. // TODO
+ * <p>Especially, there are some different processing procedures for FIFO
consumption. The server ensures that the
+ * next batch of messages will not be obtained by the client until the
previous batch of messages is confirmed to be
+ * consumed successfully or not. In detail, the server confirms the success of
consumption by message being
+ * successfully acknowledged, and confirms the consumption failure by being
successfully forwarding to the dead
+ * letter queue, thus the client should try to ensure it succeeded in
acknowledgement or forwarding to the dead
+ * letter queue as possible.
+ *
+ * <p>Considering the different workflow of FIFO consumption, {@link
#eraseFifoMessage(MessageViewImpl, ConsumeResult)}
+ * and {@link #discardFifoMessage(MessageViewImpl)} is provided.
*/
public interface ProcessQueue {
/**
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index c817bf3..4e31ff9 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -49,9 +49,9 @@ import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.java.exception.BadRequestException;
import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
-import org.apache.rocketmq.client.java.hook.MessageHandlerContextImpl;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
+import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
import org.apache.rocketmq.client.java.message.GeneralMessage;
import org.apache.rocketmq.client.java.message.GeneralMessageImpl;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
@@ -197,14 +197,13 @@ class ProcessQueueImpl implements ProcessQueue {
}
public void receiveMessage() {
+ final ClientId clientId = consumer.getClientId();
if (dropped) {
- LOGGER.info("Process queue has been dropped, no longer receive
message, mq={}, clientId={}", mq,
- consumer.getClientId());
+ LOGGER.info("Process queue has been dropped, no longer receive
message, mq={}, clientId={}", mq, clientId);
return;
}
if (this.isCacheFull()) {
- LOGGER.warn("Process queue cache is full, would receive message
later, mq={}, clientId={}", mq,
- consumer.getClientId());
+ LOGGER.warn("Process queue cache is full, would receive message
later, mq={}, clientId={}", mq, clientId);
receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL);
return;
}
@@ -214,8 +213,7 @@ class ProcessQueueImpl implements ProcessQueue {
private void receiveMessageImmediately() {
final ClientId clientId = consumer.getClientId();
if (!consumer.isRunning()) {
- LOGGER.info("Stop to receive message because consumer is not
running, mq={}, clientId={}", mq,
- clientId);
+ LOGGER.info("Stop to receive message because consumer is not
running, mq={}, clientId={}", mq, clientId);
return;
}
try {
@@ -225,7 +223,7 @@ class ProcessQueueImpl implements ProcessQueue {
activityNanoTime = System.nanoTime();
// Intercept before message reception.
- final MessageHandlerContextImpl context = new
MessageHandlerContextImpl(MessageHookPoints.RECEIVE);
+ final MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);
consumer.doBefore(context, Collections.emptyList());
final ListenableFuture<ReceiveMessageResult> future =
consumer.receiveMessage(request, mq,
@@ -237,8 +235,8 @@ class ProcessQueueImpl implements ProcessQueue {
final List<GeneralMessage> generalMessages =
result.getMessageViewImpls().stream()
.map((Function<MessageView, GeneralMessage>)
GeneralMessageImpl::new)
.collect(Collectors.toList());
- final MessageHandlerContextImpl context0 =
- new MessageHandlerContextImpl(context,
MessageHookPointsStatus.OK);
+ final MessageInterceptorContextImpl context0 =
+ new MessageInterceptorContextImpl(context,
MessageHookPointsStatus.OK);
consumer.doAfter(context0, generalMessages);
try {
@@ -254,8 +252,8 @@ class ProcessQueueImpl implements ProcessQueue {
@Override
public void onFailure(Throwable t) {
// Intercept after message reception.
- final MessageHandlerContextImpl context0 =
- new MessageHandlerContextImpl(context,
MessageHookPointsStatus.ERROR);
+ final MessageInterceptorContextImpl context0 =
+ new MessageInterceptorContextImpl(context,
MessageHookPointsStatus.ERROR);
consumer.doAfter(context0, Collections.emptyList());
LOGGER.error("Exception raised during message reception,
mq={}, endpoints={}, clientId={}", mq,
@@ -274,10 +272,10 @@ class ProcessQueueImpl implements ProcessQueue {
public boolean isCacheFull() {
final int cacheMessageCountThresholdPerQueue =
consumer.cacheMessageCountThresholdPerQueue();
final long actualMessagesQuantity = this.cachedMessagesCount();
+ final ClientId clientId = consumer.getClientId();
if (cacheMessageCountThresholdPerQueue <= actualMessagesQuantity) {
LOGGER.warn("Process queue total cached messages quantity exceeds
the threshold, threshold={}, actual={}," +
- " mq={}, clientId={}", cacheMessageCountThresholdPerQueue,
actualMessagesQuantity, mq,
- consumer.getClientId());
+ " mq={}, clientId={}", cacheMessageCountThresholdPerQueue,
actualMessagesQuantity, mq, clientId);
cacheFullNanoTime = System.nanoTime();
return true;
}
@@ -286,7 +284,7 @@ class ProcessQueueImpl implements ProcessQueue {
if (cacheMessageBytesThresholdPerQueue <= actualCachedMessagesBytes) {
LOGGER.warn("Process queue total cached messages memory exceeds
the threshold, threshold={} bytes," +
" actual={} bytes, mq={}, clientId={}",
cacheMessageBytesThresholdPerQueue,
- actualCachedMessagesBytes, mq, consumer.getClientId());
+ actualCachedMessagesBytes, mq, clientId);
cacheFullNanoTime = System.nanoTime();
return true;
}
@@ -641,7 +639,7 @@ class ProcessQueueImpl implements ProcessQueue {
final long receptionTimes = this.receptionTimes.getAndSet(0);
final long receivedMessagesQuantity =
this.receivedMessagesQuantity.getAndSet(0);
LOGGER.info("Process queue stats: clientId={}, mq={},
receptionTimes={}, receivedMessageQuantity={}, "
- + "cachedMessageCount={}, cachedMessageBytes={}",
consumer.getClientId(), mq,
- receptionTimes, receivedMessagesQuantity,
this.getCachedMessageCount(), this.getCachedMessageBytes());
+ + "cachedMessageCount={}, cachedMessageBytes={}",
consumer.getClientId(), mq, receptionTimes,
+ receivedMessagesQuantity, this.getCachedMessageCount(),
this.getCachedMessageBytes());
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 47119d0..8194227 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -57,10 +57,10 @@ import
org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.java.exception.StatusChecker;
-import org.apache.rocketmq.client.java.hook.MessageHandlerContext;
-import org.apache.rocketmq.client.java.hook.MessageHandlerContextImpl;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
+import org.apache.rocketmq.client.java.hook.MessageInterceptorContext;
+import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
import org.apache.rocketmq.client.java.impl.Settings;
import org.apache.rocketmq.client.java.message.GeneralMessage;
import org.apache.rocketmq.client.java.message.GeneralMessageImpl;
@@ -191,8 +191,10 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
private ConsumeService createConsumeService() {
final ScheduledExecutorService scheduler =
this.getClientManager().getScheduler();
if (pushSubscriptionSettings.isFifo()) {
+ LOGGER.info("Create FIFO consume service, consumerGroup={},
clientId={}", consumerGroup, clientId);
return new FifoConsumeService(clientId, messageListener,
consumptionExecutor, this, scheduler);
}
+ LOGGER.info("Create standard consume service, consumerGroup={},
clientId={}", consumerGroup, clientId);
return new StandardConsumeService(clientId, messageListener,
consumptionExecutor, this, scheduler);
}
@@ -507,7 +509,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
forwardMessageToDeadLetterQueue(final MessageViewImpl messageView) {
// Intercept before forwarding message to DLQ.
final List<GeneralMessage> generalMessages =
Collections.singletonList(new GeneralMessageImpl(messageView));
- final MessageHandlerContextImpl context = new
MessageHandlerContextImpl(MessageHookPoints.FORWARD_TO_DLQ);
+ MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(MessageHookPoints.FORWARD_TO_DLQ);
doBefore(context, generalMessages);
final Endpoints endpoints = messageView.getEndpoints();
@@ -522,14 +524,14 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
// Intercept after forwarding message to DLQ.
MessageHookPointsStatus status =
Code.OK.equals(response.getStatus().getCode()) ?
MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
- final MessageHandlerContext context0 = new
MessageHandlerContextImpl(context, status);
+ final MessageInterceptorContext context0 = new
MessageInterceptorContextImpl(context, status);
doAfter(context0, generalMessages);
}
@Override
public void onFailure(Throwable t) {
// Intercept after forwarding message to DLQ.
- final MessageHandlerContext context0 = new
MessageHandlerContextImpl(context,
+ final MessageInterceptorContext context0 = new
MessageInterceptorContextImpl(context,
MessageHookPointsStatus.ERROR);
doAfter(context0, generalMessages);
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
index ae80cbe..eef6c7b 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.java.hook.MessageHandler;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.slf4j.Logger;
@@ -37,8 +37,9 @@ public class StandardConsumeService extends ConsumeService {
private static final Logger LOGGER =
LoggerFactory.getLogger(StandardConsumeService.class);
public StandardConsumeService(ClientId clientId, MessageListener
messageListener,
- ThreadPoolExecutor consumptionExecutor, MessageHandler messageHandler,
ScheduledExecutorService scheduler) {
- super(clientId, messageListener, consumptionExecutor, messageHandler,
scheduler);
+ ThreadPoolExecutor consumptionExecutor, MessageInterceptor
messageInterceptor,
+ ScheduledExecutorService scheduler) {
+ super(clientId, messageListener, consumptionExecutor,
messageInterceptor, scheduler);
}
@Override
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index fd87fef..40a0e13 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -60,9 +60,9 @@ import
org.apache.rocketmq.client.apis.producer.TransactionChecker;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.java.exception.InternalErrorException;
import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
-import org.apache.rocketmq.client.java.hook.MessageHandlerContextImpl;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
+import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
import org.apache.rocketmq.client.java.impl.ClientImpl;
import org.apache.rocketmq.client.java.impl.Settings;
import org.apache.rocketmq.client.java.message.GeneralMessage;
@@ -272,7 +272,7 @@ class ProducerImpl extends ClientImpl implements Producer {
final List<GeneralMessage> generalMessages =
Collections.singletonList(generalMessage);
MessageHookPoints messageHookPoints =
TransactionResolution.COMMIT.equals(resolution) ?
MessageHookPoints.COMMIT_TRANSACTION :
MessageHookPoints.ROLLBACK_TRANSACTION;
- final MessageHandlerContextImpl context = new
MessageHandlerContextImpl(messageHookPoints);
+ final MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(messageHookPoints);
doBefore(context, generalMessages);
final RpcFuture<EndTransactionRequest, EndTransactionResponse> future =
@@ -282,16 +282,16 @@ class ProducerImpl extends ClientImpl implements Producer
{
public void onSuccess(EndTransactionResponse response) {
final Status status = response.getStatus();
final Code code = status.getCode();
- MessageHookPointsStatus messageHookPointsStatus =
Code.OK.equals(code) ? MessageHookPointsStatus.OK :
+ MessageHookPointsStatus hookPointsStatus =
Code.OK.equals(code) ? MessageHookPointsStatus.OK :
MessageHookPointsStatus.ERROR;
- final MessageHandlerContextImpl context0 = new
MessageHandlerContextImpl(context,
- messageHookPointsStatus);
+ final MessageInterceptorContextImpl context0 = new
MessageInterceptorContextImpl(context,
+ hookPointsStatus);
doAfter(context0, generalMessages);
}
@Override
public void onFailure(Throwable t) {
- final MessageHandlerContextImpl context0 = new
MessageHandlerContextImpl(context,
+ final MessageInterceptorContextImpl context0 = new
MessageInterceptorContextImpl(context,
MessageHookPointsStatus.ERROR);
doAfter(context0, generalMessages);
}
@@ -447,7 +447,7 @@ class ProducerImpl extends ClientImpl implements Producer {
// Intercept before message publishing.
final List<GeneralMessage> generalMessages =
messages.stream().map((Function<PublishingMessageImpl,
GeneralMessage>)
GeneralMessageImpl::new).collect(Collectors.toList());
- final MessageHandlerContextImpl context = new
MessageHandlerContextImpl(MessageHookPoints.SEND);
+ final MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(MessageHookPoints.SEND);
doBefore(context, generalMessages);
Futures.addCallback(future, new
FutureCallback<List<SendReceiptImpl>>() {
@@ -461,14 +461,14 @@ class ProducerImpl extends ClientImpl implements Producer
{
future0.setException(e);
// Intercept after message publishing.
- final MessageHandlerContextImpl context0 = new
MessageHandlerContextImpl(context,
+ final MessageInterceptorContextImpl context0 = new
MessageInterceptorContextImpl(context,
MessageHookPointsStatus.ERROR);
doAfter(context0, generalMessages);
return;
}
// Intercept after message publishing.
- final MessageHandlerContextImpl context0 = new
MessageHandlerContextImpl(context,
+ final MessageInterceptorContextImpl context0 = new
MessageInterceptorContextImpl(context,
MessageHookPointsStatus.OK);
doAfter(context0, generalMessages);
@@ -491,7 +491,7 @@ class ProducerImpl extends ClientImpl implements Producer {
@Override
public void onFailure(Throwable t) {
// Intercept after message publishing.
- final MessageHandlerContextImpl context0 = new
MessageHandlerContextImpl(context,
+ final MessageInterceptorContextImpl context0 = new
MessageInterceptorContextImpl(context,
MessageHookPointsStatus.ERROR);
doAfter(context0, generalMessages);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandler.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
similarity index 90%
rename from
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandler.java
rename to
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
index 0c70214..b68dd55 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandler.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
@@ -26,35 +26,35 @@ import
org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.java.hook.Attribute;
import org.apache.rocketmq.client.java.hook.AttributeKey;
-import org.apache.rocketmq.client.java.hook.MessageHandler;
-import org.apache.rocketmq.client.java.hook.MessageHandlerContext;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
+import org.apache.rocketmq.client.java.hook.MessageInterceptorContext;
import org.apache.rocketmq.client.java.impl.Client;
import org.apache.rocketmq.client.java.message.GeneralMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MessageMeterHandler implements MessageHandler {
+public class MessageMeterInterceptor implements MessageInterceptor {
static final AttributeKey<Stopwatch> SEND_STOPWATCH_KEY =
AttributeKey.create("send_stopwatch");
static final AttributeKey<Stopwatch> CONSUME_STOPWATCH_KEY =
AttributeKey.create("consume_stopwatch");
- private static final Logger LOGGER =
LoggerFactory.getLogger(MessageMeterHandler.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MessageMeterInterceptor.class);
private final Client client;
private final ClientMeterManager meterManager;
- public MessageMeterHandler(Client client, ClientMeterManager meterManager)
{
+ public MessageMeterInterceptor(Client client, ClientMeterManager
meterManager) {
this.client = client;
this.meterManager = meterManager;
}
- private void doBeforeSendMessage(MessageHandlerContext context) {
+ private void doBeforeSendMessage(MessageInterceptorContext context) {
// Record the time before sending message.
context.putAttribute(SEND_STOPWATCH_KEY,
Attribute.create(Stopwatch.createStarted()));
}
- private void doAfterSendMessage(MessageHandlerContext context,
List<GeneralMessage> messages) {
+ private void doAfterSendMessage(MessageInterceptorContext context,
List<GeneralMessage> messages) {
final Attribute<Stopwatch> stopwatchAttr =
context.getAttribute(SEND_STOPWATCH_KEY);
if (null == stopwatchAttr) {
// Should never reach here.
@@ -105,7 +105,7 @@ public class MessageMeterHandler implements MessageHandler {
meterManager.record(HistogramEnum.DELIVERY_LATENCY, attributes,
latency);
}
- private void doBeforeConsumeMessage(MessageHandlerContext context,
List<GeneralMessage> messages) {
+ private void doBeforeConsumeMessage(MessageInterceptorContext context,
List<GeneralMessage> messages) {
if (messages.isEmpty()) {
// Should never reach here.
return;
@@ -133,7 +133,7 @@ public class MessageMeterHandler implements MessageHandler {
context.putAttribute(CONSUME_STOPWATCH_KEY,
Attribute.create(Stopwatch.createStarted()));
}
- private void doAfterConsumeMessage(MessageHandlerContext context,
List<GeneralMessage> messages) {
+ private void doAfterConsumeMessage(MessageInterceptorContext context,
List<GeneralMessage> messages) {
if (!(client instanceof PushConsumer)) {
// Should never reach here.
LOGGER.error("[Bug] current client is not push consumer,
clientId={}", client.getClientId());
@@ -160,7 +160,7 @@ public class MessageMeterHandler implements MessageHandler {
}
@Override
- public void doBefore(MessageHandlerContext context, List<GeneralMessage>
messages) {
+ public void doBefore(MessageInterceptorContext context,
List<GeneralMessage> messages) {
if (!meterManager.isEnabled()) {
return;
}
@@ -180,7 +180,7 @@ public class MessageMeterHandler implements MessageHandler {
}
@Override
- public void doAfter(MessageHandlerContext context, List<GeneralMessage>
messages) {
+ public void doAfter(MessageInterceptorContext context,
List<GeneralMessage> messages) {
if (!meterManager.isEnabled()) {
return;
}
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeServiceTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeServiceTest.java
index 034c0d7..dbf8c89 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeServiceTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeServiceTest.java
@@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.java.hook.MessageHandler;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
@@ -41,7 +41,7 @@ import org.mockito.Mockito;
public class ConsumeServiceTest extends TestBase {
private final ClientId clientId = new ClientId();
- private final MessageHandler interceptor =
Mockito.mock(MessageHandler.class);
+ private final MessageInterceptor interceptor =
Mockito.mock(MessageInterceptor.class);
private final ThreadPoolExecutor consumptionExecutor = new
ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new
ThreadFactoryImpl("TestMessageConsumption"));
private final ScheduledExecutorService scheduler = new
ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTaskTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTaskTest.java
index 70dcef2..2331214 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTaskTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTaskTest.java
@@ -21,7 +21,7 @@ import static org.junit.Assert.assertEquals;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.java.hook.MessageHandler;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.apache.rocketmq.client.java.tool.TestBase;
@@ -36,8 +36,8 @@ public class ConsumeTaskTest extends TestBase {
final MessageViewImpl messageView = fakeMessageViewImpl();
final MessageListener messageListener =
Mockito.mock(MessageListener.class);
Mockito.when(messageListener.consume(messageView)).thenReturn(ConsumeResult.SUCCESS);
- final MessageHandler messageHandler =
Mockito.mock(MessageHandler.class);
- final ConsumeTask consumeTask = new ConsumeTask(clientId,
messageListener, messageView, messageHandler);
+ final MessageInterceptor messageInterceptor =
Mockito.mock(MessageInterceptor.class);
+ final ConsumeTask consumeTask = new ConsumeTask(clientId,
messageListener, messageView, messageInterceptor);
final ConsumeResult consumeResult = consumeTask.call();
assertEquals(ConsumeResult.SUCCESS, consumeResult);
}
@@ -48,8 +48,8 @@ public class ConsumeTaskTest extends TestBase {
final MessageViewImpl messageView = fakeMessageViewImpl();
final MessageListener messageListener =
Mockito.mock(MessageListener.class);
Mockito.when(messageListener.consume(messageView)).thenThrow(new
RuntimeException());
- final MessageHandler messageHandler =
Mockito.mock(MessageHandler.class);
- final ConsumeTask consumeTask = new ConsumeTask(clientId,
messageListener, messageView, messageHandler);
+ final MessageInterceptor messageInterceptor =
Mockito.mock(MessageInterceptor.class);
+ final ConsumeTask consumeTask = new ConsumeTask(clientId,
messageListener, messageView, messageInterceptor);
final ConsumeResult consumeResult = consumeTask.call();
assertEquals(ConsumeResult.FAILURE, consumeResult);
}
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandlerTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandlerTest.java
index 569c2bd..7f01b06 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandlerTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandlerTest.java
@@ -27,9 +27,9 @@ import java.util.List;
import java.util.Optional;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
-import org.apache.rocketmq.client.java.hook.MessageHandlerContextImpl;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
+import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
import org.apache.rocketmq.client.java.impl.Client;
import org.apache.rocketmq.client.java.impl.ClientImpl;
import org.apache.rocketmq.client.java.message.GeneralMessage;
@@ -48,15 +48,15 @@ public class MessageMeterHandlerTest extends TestBase {
Mockito.doReturn(true).when(meterManager).isEnabled();
ClientId clientId = new ClientId();
Mockito.doReturn(clientId).when(producer).getClientId();
- final MessageMeterHandler meterHandler = new
MessageMeterHandler(producer, meterManager);
+ final MessageMeterInterceptor meterHandler = new
MessageMeterInterceptor(producer, meterManager);
final GeneralMessage message = Mockito.mock(GeneralMessage.class);
String topic = FAKE_TOPIC_0;
Mockito.doReturn(topic).when(message).getTopic();
List<GeneralMessage> messageList = new ArrayList<>();
messageList.add(message);
- final MessageHandlerContextImpl context = new
MessageHandlerContextImpl(MessageHookPoints.SEND);
+ final MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(MessageHookPoints.SEND);
meterHandler.doBefore(context, messageList);
-
assertNotNull(context.getAttributes().get(MessageMeterHandler.SEND_STOPWATCH_KEY));
+
assertNotNull(context.getAttributes().get(MessageMeterInterceptor.SEND_STOPWATCH_KEY));
context.setStatus(MessageHookPointsStatus.OK);
meterHandler.doAfter(context, messageList);
ArgumentCaptor<Attributes> attributesArgumentCaptor =
ArgumentCaptor.forClass(Attributes.class);
@@ -75,13 +75,13 @@ public class MessageMeterHandlerTest extends TestBase {
Mockito.doReturn(true).when(meterManager).isEnabled();
ClientId clientId = new ClientId();
Mockito.doReturn(clientId).when(producer).getClientId();
- final MessageMeterHandler meterHandler = new
MessageMeterHandler(producer, meterManager);
+ final MessageMeterInterceptor meterHandler = new
MessageMeterInterceptor(producer, meterManager);
final GeneralMessage message = Mockito.mock(GeneralMessage.class);
String topic = FAKE_TOPIC_0;
Mockito.doReturn(topic).when(message).getTopic();
List<GeneralMessage> messageList = new ArrayList<>();
messageList.add(message);
- final MessageHandlerContextImpl context = new
MessageHandlerContextImpl(MessageHookPoints.SEND);
+ final MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(MessageHookPoints.SEND);
meterHandler.doBefore(context, messageList);
context.setStatus(MessageHookPointsStatus.UNSET);
meterHandler.doAfter(context, messageList);
@@ -101,13 +101,13 @@ public class MessageMeterHandlerTest extends TestBase {
Mockito.doReturn(true).when(meterManager).isEnabled();
ClientId clientId = new ClientId();
Mockito.doReturn(clientId).when(producer).getClientId();
- final MessageMeterHandler meterHandler = new
MessageMeterHandler(producer, meterManager);
+ final MessageMeterInterceptor meterHandler = new
MessageMeterInterceptor(producer, meterManager);
final GeneralMessage message = Mockito.mock(GeneralMessage.class);
String topic = FAKE_TOPIC_0;
Mockito.doReturn(topic).when(message).getTopic();
List<GeneralMessage> messageList = new ArrayList<>();
messageList.add(message);
- final MessageHandlerContextImpl context = new
MessageHandlerContextImpl(MessageHookPoints.SEND);
+ final MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(MessageHookPoints.SEND);
meterHandler.doBefore(context, messageList);
context.setStatus(MessageHookPointsStatus.ERROR);
meterHandler.doAfter(context, messageList);
@@ -135,7 +135,7 @@ public class MessageMeterHandlerTest extends TestBase {
Mockito.doReturn(true).when(meterManager).isEnabled();
ClientId clientId = new ClientId();
Mockito.doReturn(clientId).when(pushConsumer).getClientId();
- final MessageMeterHandler meterHandler = new
MessageMeterHandler(pushConsumer, meterManager);
+ final MessageMeterInterceptor meterHandler = new
MessageMeterInterceptor(pushConsumer, meterManager);
final GeneralMessage message = Mockito.mock(GeneralMessage.class);
String topic = FAKE_TOPIC_0;
Mockito.doReturn(topic).when(message).getTopic();
@@ -143,7 +143,7 @@ public class MessageMeterHandlerTest extends TestBase {
messageList.add(message);
final Optional<Long> optionalTransportDeliveryTimestamp =
Optional.of(System.currentTimeMillis());
Mockito.doReturn(optionalTransportDeliveryTimestamp).when(message).getTransportDeliveryTimestamp();
- final MessageHandlerContextImpl context = new
MessageHandlerContextImpl(MessageHookPoints.RECEIVE);
+ final MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);
meterHandler.doAfter(context, messageList);
ArgumentCaptor<Attributes> attributesArgumentCaptor =
ArgumentCaptor.forClass(Attributes.class);
Mockito.verify(meterManager, Mockito.times(1))
@@ -163,13 +163,13 @@ public class MessageMeterHandlerTest extends TestBase {
Mockito.doReturn(true).when(meterManager).isEnabled();
ClientId clientId = new ClientId();
Mockito.doReturn(clientId).when(pushConsumer).getClientId();
- final MessageMeterHandler meterHandler = new
MessageMeterHandler(pushConsumer, meterManager);
+ final MessageMeterInterceptor meterHandler = new
MessageMeterInterceptor(pushConsumer, meterManager);
final GeneralMessage message = Mockito.mock(GeneralMessage.class);
Mockito.doReturn(FAKE_TOPIC_0).when(message).getTopic();
List<GeneralMessage> messageList = new ArrayList<>();
final Optional<Long> optionalTransportDeliveryTimestamp =
Optional.of(System.currentTimeMillis());
Mockito.doReturn(optionalTransportDeliveryTimestamp).when(message).getTransportDeliveryTimestamp();
- final MessageHandlerContextImpl context = new
MessageHandlerContextImpl(MessageHookPoints.RECEIVE);
+ final MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);
meterHandler.doAfter(context, messageList);
Mockito.verify(meterManager, Mockito.never())
.record(any(HistogramEnum.class), any(Attributes.class),
Mockito.anyDouble());
@@ -184,7 +184,7 @@ public class MessageMeterHandlerTest extends TestBase {
Mockito.doReturn(true).when(meterManager).isEnabled();
ClientId clientId = new ClientId();
Mockito.doReturn(clientId).when(simpleConsumer).getClientId();
- final MessageMeterHandler meterHandler = new
MessageMeterHandler(simpleConsumer, meterManager);
+ final MessageMeterInterceptor meterHandler = new
MessageMeterInterceptor(simpleConsumer, meterManager);
final GeneralMessage message = Mockito.mock(GeneralMessage.class);
String topic = FAKE_TOPIC_0;
Mockito.doReturn(topic).when(message).getTopic();
@@ -192,7 +192,7 @@ public class MessageMeterHandlerTest extends TestBase {
messageList.add(message);
final Optional<Long> optionalTransportDeliveryTimestamp =
Optional.of(System.currentTimeMillis());
Mockito.doReturn(optionalTransportDeliveryTimestamp).when(message).getTransportDeliveryTimestamp();
- final MessageHandlerContextImpl context = new
MessageHandlerContextImpl(MessageHookPoints.RECEIVE);
+ final MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);
meterHandler.doAfter(context, messageList);
ArgumentCaptor<Attributes> attributesArgumentCaptor =
ArgumentCaptor.forClass(Attributes.class);
Mockito.verify(meterManager, Mockito.times(1))
@@ -213,7 +213,7 @@ public class MessageMeterHandlerTest extends TestBase {
Mockito.doReturn(true).when(meterManager).isEnabled();
ClientId clientId = new ClientId();
Mockito.doReturn(clientId).when(pushConsumer).getClientId();
- final MessageMeterHandler meterHandler = new
MessageMeterHandler(pushConsumer, meterManager);
+ final MessageMeterInterceptor meterHandler = new
MessageMeterInterceptor(pushConsumer, meterManager);
List<GeneralMessage> generalMessages = new ArrayList<>();
final GeneralMessage message = Mockito.mock(GeneralMessage.class);
generalMessages.add(message);
@@ -222,7 +222,7 @@ public class MessageMeterHandlerTest extends TestBase {
long awaitTimeMills = 3000;
long decodeTimestamp = System.currentTimeMillis() - awaitTimeMills;
Mockito.doReturn(Optional.of(decodeTimestamp)).when(message).getDecodeTimestamp();
- final MessageHandlerContextImpl context = new
MessageHandlerContextImpl(MessageHookPoints.CONSUME);
+ final MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(MessageHookPoints.CONSUME);
meterHandler.doBefore(context, generalMessages);
ArgumentCaptor<Attributes> attributes0ArgumentCaptor =
ArgumentCaptor.forClass(Attributes.class);
final ArgumentCaptor<Double> awaitTimeArgumentCaptor =
ArgumentCaptor.forClass(Double.class);
@@ -234,7 +234,7 @@ public class MessageMeterHandlerTest extends TestBase {
assertEquals(topic, attributes0.get(MetricLabels.TOPIC));
assertEquals(consumerGroup,
attributes0.get(MetricLabels.CONSUMER_GROUP));
assertEquals(clientId.toString(),
attributes0.get(MetricLabels.CLIENT_ID));
-
assertNotNull(context.getAttributes().get(MessageMeterHandler.CONSUME_STOPWATCH_KEY));
+
assertNotNull(context.getAttributes().get(MessageMeterInterceptor.CONSUME_STOPWATCH_KEY));
context.setStatus(MessageHookPointsStatus.OK);
meterHandler.doAfter(context, generalMessages);