This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 137df29f857 [fix][client] fix the beforeConsume() method earlier hit with message listener (#23578)
137df29f857 is described below

commit 137df29f85798b00de75460a1acb91c7bc25453f
Author: Penghui Li <[email protected]>
AuthorDate: Fri Nov 8 10:59:06 2024 +0800

    [fix][client] fix the beforeConsume() method earlier hit with message listener (#23578)
---
 .../apache/pulsar/client/api/InterceptorsTest.java | 78 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  1 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  3 +-
 .../client/impl/MultiTopicsConsumerImpl.java       |  2 +-
 4 files changed, 82 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index 8115f34121d..f71cdc55141 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -476,6 +476,84 @@ public class InterceptorsTest extends ProducerConsumerBase {
         consumer.close();
     }
 
+    @Test(dataProvider = "topicPartition")
+    public void testDoNotEarlierHitBeforeConsumerWithMessageListener(int partitions) throws Exception {
+
+        AtomicInteger beforeConsumeCount = new AtomicInteger(0);
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(lookupUrl.toString())
+                .listenerThreads(1)
+                .build();
+
+        ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<>() {
+            @Override
+            public void close() {
+            }
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+                beforeConsumeCount.incrementAndGet();
+                log.info("beforeConsume messageId: {}", message.getMessageId());
+                return message;
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+            }
+
+            @Override
+            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+            }
+
+            @Override
+            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+            }
+        };
+
+        final String topicName = "persistent://my-property/my-ns/my-topic";
+
+        if (partitions > 0) {
+            admin.topics().createPartitionedTopic(topicName, partitions);
+        } else {
+            admin.topics().createNonPartitionedTopic(topicName);
+        }
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .intercept(interceptor)
+                .subscriptionName("my-subscription")
+                .messageListener((c, m) -> {
+                    // Simulate a long processing time
+                    try {
+                        Thread.sleep(60000);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                })
+                .subscribe();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .create();
+
+        final int messages = 10;
+        for (int i = 0; i < messages; i++) {
+            producer.newMessage().value("Hello Pulsar!").send();
+        }
+        Awaitility.await().untilAsserted(() -> {
+            // Ensure that the interceptor is not hit before the message listener
+            Assert.assertEquals(beforeConsumeCount.get(), 1);
+        });
+        producer.close();
+        consumer.close();
+        client.close();
+    }
+
     @Test
     public void testConsumerInterceptorWithPatternTopicSubscribe() throws PulsarClientException {
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 3073f3a8334..0fc906b6e7a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -1180,6 +1180,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
                 id = msg.getMessageId();
             }
             unAckedMessageTracker.add(id, msg.getRedeliveryCount());
+            beforeConsume(msg);
             listener.received(ConsumerBase.this, msg);
         } catch (Throwable t) {
             log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription,
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index be01bd00eb3..004adab56f5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -542,7 +542,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 return null;
             }
             messageProcessed(message);
-            return beforeConsume(message);
+            message = listener == null ? beforeConsume(message) : message;
+            return message;
         } catch (InterruptedException e) {
             ExceptionHandler.handleInterruptedException(e);
             State state = getState();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index ff293af2308..528a140b81c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -401,7 +401,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 decreaseIncomingMessageSize(message);
                 checkArgument(message instanceof TopicMessageImpl);
                 trackUnAckedMsgIfNoListener(message.getMessageId(), message.getRedeliveryCount());
-                message = beforeConsume(message);
+                message = listener == null ? beforeConsume(message) : message;
             }
             resumeReceivingFromPausedConsumersIfNeeded();
             return message;

Reply via email to