This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2e987369fa4 [feat][client] PIP-374: Visibility of messages in
receiverQueue for the consumers (#23303)
2e987369fa4 is described below
commit 2e987369fa4444dc303ba2674e94b7a712710d64
Author: Baodi Shi <[email protected]>
AuthorDate: Tue Sep 17 20:46:09 2024 +0800
[feat][client] PIP-374: Visibility of messages in receiverQueue for the
consumers (#23303)
---
pip/pip-374.md | 4 +-
.../apache/pulsar/client/api/InterceptorsTest.java | 96 ++++++++++++++++++++++
.../pulsar/client/api/ConsumerInterceptor.java | 38 +++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 8 ++
.../apache/pulsar/client/impl/ConsumerImpl.java | 5 +-
.../pulsar/client/impl/ConsumerInterceptors.java | 32 ++++++++
.../client/impl/MultiTopicsConsumerImpl.java | 5 ++
7 files changed, 184 insertions(+), 4 deletions(-)
diff --git a/pip/pip-374.md b/pip/pip-374.md
index 42646176474..49fe3371596 100644
--- a/pip/pip-374.md
+++ b/pip/pip-374.md
@@ -67,5 +67,5 @@ Since we added a default method onArrival() in interface, one
who has provided t
<!--
Updated afterwards
-->
-* Mailing List discussion thread:
-* Mailing List voting thread:
+* Mailing List discussion thread:
https://lists.apache.org/thread/hcfpm4j6hpwxb2olfrro8g4dls35q8rx
+* Mailing List voting thread:
https://lists.apache.org/thread/wrr02s4cdzqmo1vonp92w6229qo0rv0z
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index afb17a18647..8115f34121d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.KeyValue;
@@ -870,6 +871,101 @@ public class InterceptorsTest extends
ProducerConsumerBase {
Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS));
}
+ @Test(dataProvider = "topicPartition")
+ public void testConsumerInterceptorForOnArrive(int topicPartition) throws
PulsarClientException,
+ InterruptedException, PulsarAdminException {
+ String topicName = "persistent://my-property/my-ns/on-arrive";
+ if (topicPartition > 0) {
+ admin.topics().createPartitionedTopic(topicName, topicPartition);
+ }
+
+ final int receiveQueueSize = 100;
+ final int totalNumOfMessages = receiveQueueSize * 2;
+
+ // The onArrival method is called for half of the receiveQueueSize
messages before beforeConsume is called for all messages.
+ CountDownLatch latch = new CountDownLatch(receiveQueueSize / 2);
+ final AtomicInteger onArrivalCount = new AtomicInteger(0);
+ ConsumerInterceptor<String> interceptor = new
ConsumerInterceptor<String>() {
+ @Override
+ public void close() {}
+
+ @Override
+ public Message<String> onArrival(Consumer<String> consumer,
Message<String> message) {
+ MessageImpl<String> msg = (MessageImpl<String>) message;
+
msg.getMessageBuilder().addProperty().setKey("onArrival").setValue("1");
+ latch.countDown();
+ onArrivalCount.incrementAndGet();
+ return msg;
+ }
+
+ @Override
+ public Message<String> beforeConsume(Consumer<String> consumer,
Message<String> message) {
+ return message;
+ }
+
+ @Override
+ public void onAcknowledge(Consumer<String> consumer, MessageId
messageId, Throwable cause) {
+
+ }
+
+ @Override
+ public void onAcknowledgeCumulative(Consumer<String> consumer,
MessageId messageId, Throwable cause) {
+
+ }
+
+ @Override
+ public void onNegativeAcksSend(Consumer<String> consumer,
Set<MessageId> messageIds) {
+ }
+
+ @Override
+ public void onAckTimeoutSend(Consumer<String> consumer,
Set<MessageId> messageIds) {
+
+ }
+ };
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .create();
+
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-arrive")
+ .intercept(interceptor)
+ .receiverQueueSize(receiveQueueSize)
+ .subscribe();
+
+ for (int i = 0; i < totalNumOfMessages; i++) {
+ producer.send("Mock message");
+ }
+
+ // Not call receive message, just wait for onArrival interceptor.
+ latch.await();
+ Assert.assertEquals(latch.getCount(), 0);
+
+ for (int i = 0; i < totalNumOfMessages; i++) {
+ Message<String> message = consumer.receive();
+ MessageImpl<String> msgImpl;
+ if (message instanceof MessageImpl<String>) {
+ msgImpl = (MessageImpl<String>) message;
+ } else if (message instanceof TopicMessageImpl<String>) {
+ msgImpl = (MessageImpl<String>) ((TopicMessageImpl<String>)
message).getMessage();
+ } else {
+ throw new ClassCastException("Message type is not expected");
+ }
+ boolean haveKey = false;
+ for (KeyValue keyValue :
msgImpl.getMessageBuilder().getPropertiesList()) {
+ if ("onArrival".equals(keyValue.getKey())) {
+ haveKey = true;
+ }
+ }
+ Assert.assertTrue(haveKey);
+ }
+ Assert.assertEquals(totalNumOfMessages, onArrivalCount.get());
+
+ producer.close();
+ consumer.close();
+ }
+
private void produceAndConsume(int msgCount, Producer<byte[]> producer,
Reader<byte[]> reader)
throws PulsarClientException {
for (int i = 0; i < msgCount; i++) {
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
index be2f9b0f108..1beea3adba2 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
@@ -41,6 +41,44 @@ public interface ConsumerInterceptor<T> extends
AutoCloseable {
*/
void close();
+ /**
+ * This method is called when a message arrives in the consumer.
+ *
+ * <p>This method provides visibility into the messages that have been
received
+ * by the consumer but have not yet been processed. This can be useful for
+ * monitoring the state of the consumer's receiver queue and understanding
+ * the consumer's processing rate.
+ *
+ * <p>The method is allowed to modify the message, in which case the
modified
+ * message will be returned.
+ *
+ * <p>Any exception thrown by this method will be caught by the caller,
logged,
+ * but not propagated to the client.
+ *
+ * <p>Since the consumer may run multiple interceptors, a particular
+ * interceptor's <tt>onArrival</tt> callback will be called in the order
+ * specified by {@link ConsumerBuilder#intercept(ConsumerInterceptor[])}.
The
+ * first interceptor in the list gets the consumed message, the following
+ * interceptor will be passed the message returned by the previous
interceptor,
+ * and so on. Since interceptors are allowed to modify the message,
interceptors
+ * may potentially get the messages already modified by other interceptors.
+ * However, building a pipeline of mutable interceptors that depend on the
output
+ * of the previous interceptor is discouraged, because of potential
side-effects
+ * caused by interceptors potentially failing to modify the message and
throwing
+ * an exception. If one of the interceptors in the list throws an
exception from
+ * <tt>onArrival</tt>, the exception is caught, logged, and the next
interceptor
+ * is called with the message returned by the last successful interceptor
in the
+ * list, or otherwise the original consumed message.
+ *
+ * @param consumer the consumer which contains the interceptor
+ * @param message the message that has arrived in the receiver queue
+ * @return the message that is either modified by the interceptor or the
same
+ * message passed into the method
+ */
+ default Message<T> onArrival(Consumer<T> consumer, Message<T> message) {
+ return message;
+ }
+
/**
* This is called just before the message is returned by
* {@link Consumer#receive()}, {@link MessageListener#received(Consumer,
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 9748a42f0cb..03256a3e139 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -852,6 +852,14 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
+ '}';
}
+ protected Message<T> onArrival(Message<T> message) {
+ if (interceptors != null) {
+ return interceptors.onArrival(this, message);
+ } else {
+ return message;
+ }
+ }
+
protected Message<T> beforeConsume(Message<T> message) {
if (interceptors != null) {
return interceptors.beforeConsume(this, message);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 996569704d7..60b9d145c48 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1301,9 +1301,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
increaseAvailablePermits(cnx());
return;
}
+ Message<T> interceptMsg = onArrival(message);
if (hasNextPendingReceive()) {
- notifyPendingReceivedCallback(message, null);
- } else if (enqueueMessageAndCheckBatchReceive(message) &&
hasPendingBatchReceive()) {
+ notifyPendingReceivedCallback(interceptMsg, null);
+ } else if (enqueueMessageAndCheckBatchReceive(interceptMsg) &&
hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
});
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
index 832dc0bacae..dd1e2cec3b3 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
@@ -44,6 +44,38 @@ public class ConsumerInterceptors<T> implements Closeable {
this.interceptors = interceptors;
}
+
+ /**
+ * This method is called when a message arrives in the consumer.
+ * <p>
+ * This method calls {@link ConsumerInterceptor#onArrival(Consumer,
Message) method for each
+ * interceptor.
+ * <p>
+ * This method does not throw exceptions. If any of the interceptors in
the chain throws an exception, it gets
+ * caught and logged, and next interceptor in int the chain is called with
'messages' returned by the previous
+ * successful interceptor beforeConsume call.
+ *
+ * @param consumer the consumer which contains the interceptors
+ * @param message message to be consume by the client.
+ * @return messages that are either modified by interceptors or same as
messages passed to this method.
+ */
+ public Message<T> onArrival(Consumer<T> consumer, Message<T> message) {
+ Message<T> interceptorMessage = message;
+ for (int i = 0, interceptorsSize = interceptors.size(); i <
interceptorsSize; i++) {
+ try {
+ interceptorMessage = interceptors.get(i).onArrival(consumer,
interceptorMessage);
+ } catch (Throwable e) {
+ if (consumer != null) {
+ log.warn("Error executing interceptor beforeConsume
callback topic: {} consumerName: {}",
+ consumer.getTopic(), consumer.getConsumerName(),
e);
+ } else {
+ log.warn("Error executing interceptor beforeConsume
callback", e);
+ }
+ }
+ }
+ return interceptorMessage;
+ }
+
/**
* This is called just before the message is returned by {@link
Consumer#receive()},
* {@link MessageListener#received(Consumer, Message)} or the {@link
java.util.concurrent.CompletableFuture}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index bf8bd6cc951..513c0101ac6 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1608,6 +1608,11 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
private ConsumerInterceptors<T>
getInternalConsumerInterceptors(ConsumerInterceptors<T> multiTopicInterceptors)
{
return new ConsumerInterceptors<T>(new ArrayList<>()) {
+ @Override
+ public Message<T> onArrival(Consumer<T> consumer, Message<T>
message) {
+ return multiTopicInterceptors.onArrival(consumer, message);
+ }
+
@Override
public Message<T> beforeConsume(Consumer<T> consumer, Message<T>
message) {
return message;