This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e987369fa4 [feat][client] PIP-374: Visibility of messages in 
receiverQueue for the consumers (#23303)
2e987369fa4 is described below

commit 2e987369fa4444dc303ba2674e94b7a712710d64
Author: Baodi Shi <[email protected]>
AuthorDate: Tue Sep 17 20:46:09 2024 +0800

    [feat][client] PIP-374: Visibility of messages in receiverQueue for the 
consumers (#23303)
---
 pip/pip-374.md                                     |  4 +-
 .../apache/pulsar/client/api/InterceptorsTest.java | 96 ++++++++++++++++++++++
 .../pulsar/client/api/ConsumerInterceptor.java     | 38 +++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  8 ++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  5 +-
 .../pulsar/client/impl/ConsumerInterceptors.java   | 32 ++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |  5 ++
 7 files changed, 184 insertions(+), 4 deletions(-)

diff --git a/pip/pip-374.md b/pip/pip-374.md
index 42646176474..49fe3371596 100644
--- a/pip/pip-374.md
+++ b/pip/pip-374.md
@@ -67,5 +67,5 @@ Since we added a default method onArrival() in interface, one 
who has provided t
 <!--
 Updated afterwards
 -->
-* Mailing List discussion thread:
-* Mailing List voting thread:
+* Mailing List discussion thread: 
https://lists.apache.org/thread/hcfpm4j6hpwxb2olfrro8g4dls35q8rx
+* Mailing List voting thread: 
https://lists.apache.org/thread/wrr02s4cdzqmo1vonp92w6229qo0rv0z
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index afb17a18647..8115f34121d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.common.api.proto.KeyValue;
@@ -870,6 +871,101 @@ public class InterceptorsTest extends 
ProducerConsumerBase {
         Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS));
     }
 
+    @Test(dataProvider = "topicPartition")
+    public void testConsumerInterceptorForOnArrive(int topicPartition) throws 
PulsarClientException,
+            InterruptedException, PulsarAdminException {
+        String topicName = "persistent://my-property/my-ns/on-arrive";
+        if (topicPartition > 0) {
+            admin.topics().createPartitionedTopic(topicName, topicPartition);
+        }
+
+        final int receiveQueueSize = 100;
+        final int totalNumOfMessages = receiveQueueSize * 2;
+
+        // The onArrival method is called for half of the receiveQueueSize 
messages before beforeConsume is called for all messages.
+        CountDownLatch latch = new CountDownLatch(receiveQueueSize / 2);
+        final AtomicInteger onArrivalCount = new AtomicInteger(0);
+        ConsumerInterceptor<String> interceptor = new 
ConsumerInterceptor<String>() {
+            @Override
+            public void close() {}
+
+            @Override
+            public Message<String> onArrival(Consumer<String> consumer, 
Message<String> message) {
+                MessageImpl<String> msg = (MessageImpl<String>) message;
+                
msg.getMessageBuilder().addProperty().setKey("onArrival").setValue("1");
+                latch.countDown();
+                onArrivalCount.incrementAndGet();
+                return msg;
+            }
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, 
Message<String> message) {
+                return message;
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId 
messageId, Throwable cause) {
+
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, 
MessageId messageId, Throwable cause) {
+
+            }
+
+            @Override
+            public void onNegativeAcksSend(Consumer<String> consumer, 
Set<MessageId> messageIds) {
+            }
+
+            @Override
+            public void onAckTimeoutSend(Consumer<String> consumer, 
Set<MessageId> messageIds) {
+
+            }
+        };
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("test-arrive")
+                .intercept(interceptor)
+                .receiverQueueSize(receiveQueueSize)
+                .subscribe();
+
+        for (int i = 0; i < totalNumOfMessages; i++) {
+            producer.send("Mock message");
+        }
+
+        // Not call receive message, just wait for onArrival interceptor.
+        latch.await();
+        Assert.assertEquals(latch.getCount(), 0);
+
+        for (int i = 0; i < totalNumOfMessages; i++) {
+            Message<String> message = consumer.receive();
+            MessageImpl<String> msgImpl;
+            if (message instanceof MessageImpl<String>) {
+                msgImpl = (MessageImpl<String>) message;
+            } else if (message instanceof TopicMessageImpl<String>) {
+                msgImpl = (MessageImpl<String>) ((TopicMessageImpl<String>) 
message).getMessage();
+            } else {
+                throw new ClassCastException("Message type is not expected");
+            }
+            boolean haveKey = false;
+            for (KeyValue keyValue : 
msgImpl.getMessageBuilder().getPropertiesList()) {
+                if ("onArrival".equals(keyValue.getKey())) {
+                    haveKey = true;
+                }
+            }
+            Assert.assertTrue(haveKey);
+        }
+        Assert.assertEquals(totalNumOfMessages, onArrivalCount.get());
+
+        producer.close();
+        consumer.close();
+    }
+
     private void produceAndConsume(int msgCount, Producer<byte[]> producer, 
Reader<byte[]> reader)
             throws PulsarClientException {
         for (int i = 0; i < msgCount; i++) {
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
index be2f9b0f108..1beea3adba2 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
@@ -41,6 +41,44 @@ public interface ConsumerInterceptor<T> extends 
AutoCloseable {
      */
     void close();
 
+    /**
+     * This method is called when a message arrives in the consumer.
+     *
+     * <p>This method provides visibility into the messages that have been 
received
+     * by the consumer but have not yet been processed. This can be useful for
+     * monitoring the state of the consumer's receiver queue and understanding
+     * the consumer's processing rate.
+     *
+     * <p>The method is allowed to modify the message, in which case the 
modified
+     * message will be returned.
+     *
+     * <p>Any exception thrown by this method will be caught by the caller, 
logged,
+     * but not propagated to the client.
+     *
+     * <p>Since the consumer may run multiple interceptors, a particular
+     * interceptor's <tt>onArrival</tt> callback will be called in the order
+     * specified by {@link ConsumerBuilder#intercept(ConsumerInterceptor[])}. 
The
+     * first interceptor in the list gets the consumed message, the following
+     * interceptor will be passed the message returned by the previous 
interceptor,
+     * and so on. Since interceptors are allowed to modify the message, 
interceptors
+     * may potentially get the messages already modified by other interceptors.
+     * However, building a pipeline of mutable interceptors that depend on the 
output
+     * of the previous interceptor is discouraged, because of potential 
side-effects
+     * caused by interceptors potentially failing to modify the message and 
throwing
+     * an exception. If one of the interceptors in the list throws an 
exception from
+     * <tt>onArrival</tt>, the exception is caught, logged, and the next 
interceptor
+     * is called with the message returned by the last successful interceptor 
in the
+     * list, or otherwise the original consumed message.
+     *
+     * @param consumer the consumer which contains the interceptor
+     * @param message the message that has arrived in the receiver queue
+     * @return the message that is either modified by the interceptor or the 
same
+     *         message passed into the method
+     */
+    default Message<T> onArrival(Consumer<T> consumer, Message<T> message) {
+        return message;
+    }
+
     /**
      * This is called just before the message is returned by
      * {@link Consumer#receive()}, {@link MessageListener#received(Consumer,
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 9748a42f0cb..03256a3e139 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -852,6 +852,14 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
                 + '}';
     }
 
+    protected Message<T> onArrival(Message<T> message) {
+        if (interceptors != null) {
+            return interceptors.onArrival(this, message);
+        } else {
+            return message;
+        }
+    }
+
     protected Message<T> beforeConsume(Message<T> message) {
         if (interceptors != null) {
             return interceptors.beforeConsume(this, message);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 996569704d7..60b9d145c48 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1301,9 +1301,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 increaseAvailablePermits(cnx());
                 return;
             }
+            Message<T> interceptMsg = onArrival(message);
             if (hasNextPendingReceive()) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && 
hasPendingBatchReceive()) {
+                notifyPendingReceivedCallback(interceptMsg, null);
+            } else if (enqueueMessageAndCheckBatchReceive(interceptMsg) && 
hasPendingBatchReceive()) {
                 notifyPendingBatchReceivedCallBack();
             }
         });
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
index 832dc0bacae..dd1e2cec3b3 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
@@ -44,6 +44,38 @@ public class ConsumerInterceptors<T> implements Closeable {
         this.interceptors = interceptors;
     }
 
+
+    /**
+     * This method is called when a message arrives in the consumer.
+     * <p>
+     * This method calls {@link ConsumerInterceptor#onArrival(Consumer, 
Message) method for each
+     * interceptor.
+     * <p>
+     * This method does not throw exceptions. If any of the interceptors in 
the chain throws an exception, it gets
+     * caught and logged, and next interceptor in int the chain is called with 
'messages' returned by the previous
+     * successful interceptor beforeConsume call.
+     *
+     * @param consumer the consumer which contains the interceptors
+     * @param message message to be consume by the client.
+     * @return messages that are either modified by interceptors or same as 
messages passed to this method.
+     */
+    public Message<T> onArrival(Consumer<T> consumer, Message<T> message) {
+        Message<T> interceptorMessage = message;
+        for (int i = 0, interceptorsSize = interceptors.size(); i < 
interceptorsSize; i++) {
+            try {
+                interceptorMessage = interceptors.get(i).onArrival(consumer, 
interceptorMessage);
+            } catch (Throwable e) {
+                if (consumer != null) {
+                    log.warn("Error executing interceptor beforeConsume 
callback topic: {} consumerName: {}",
+                            consumer.getTopic(), consumer.getConsumerName(), 
e);
+                } else {
+                    log.warn("Error executing interceptor beforeConsume 
callback", e);
+                }
+            }
+        }
+        return interceptorMessage;
+    }
+
     /**
      * This is called just before the message is returned by {@link 
Consumer#receive()},
      * {@link MessageListener#received(Consumer, Message)} or the {@link 
java.util.concurrent.CompletableFuture}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index bf8bd6cc951..513c0101ac6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1608,6 +1608,11 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     private ConsumerInterceptors<T> 
getInternalConsumerInterceptors(ConsumerInterceptors<T> multiTopicInterceptors) 
{
         return new ConsumerInterceptors<T>(new ArrayList<>()) {
 
+            @Override
+            public Message<T> onArrival(Consumer<T> consumer, Message<T> 
message) {
+                return multiTopicInterceptors.onArrival(consumer, message);
+            }
+
             @Override
             public Message<T> beforeConsume(Consumer<T> consumer, Message<T> 
message) {
                 return message;

Reply via email to