This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4700c7cf64b18eafa20b8a14591bb5d50f794b69 Author: Penghui Li <[email protected]> AuthorDate: Fri Nov 8 10:59:06 2024 +0800 [fix][client] fix the beforeConsume() method earlier hit with message listener (#23578) (cherry picked from commit 137df29f85798b00de75460a1acb91c7bc25453f) --- .../apache/pulsar/client/api/InterceptorsTest.java | 78 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerBase.java | 1 + .../apache/pulsar/client/impl/ConsumerImpl.java | 3 +- .../client/impl/MultiTopicsConsumerImpl.java | 2 +- 4 files changed, 82 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java index afb17a18647..4fac35aa78f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java @@ -475,6 +475,84 @@ public class InterceptorsTest extends ProducerConsumerBase { consumer.close(); } + @Test(dataProvider = "topicPartition") + public void testDoNotEarlierHitBeforeConsumerWithMessageListener(int partitions) throws Exception { + + AtomicInteger beforeConsumeCount = new AtomicInteger(0); + PulsarClient client = PulsarClient.builder() + .serviceUrl(lookupUrl.toString()) + .listenerThreads(1) + .build(); + + ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<>() { + @Override + public void close() { + } + + @Override + public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) { + beforeConsumeCount.incrementAndGet(); + log.info("beforeConsume messageId: {}", message.getMessageId()); + return message; + } + + @Override + public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) { + } + + @Override + public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) { + } + + @Override + public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) { + } + + @Override + public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) { + } + }; + + final String topicName = "persistent://my-property/my-ns/my-topic"; + + if (partitions > 0) { + admin.topics().createPartitionedTopic(topicName, partitions); + } else { + admin.topics().createNonPartitionedTopic(topicName); + } + + Consumer<String> consumer = client.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionType(SubscriptionType.Shared) + .intercept(interceptor) + .subscriptionName("my-subscription") + .messageListener((c, m) -> { + // Simulate a long processing time + try { + Thread.sleep(60000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }) + .subscribe(); + + Producer<String> producer = client.newProducer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic") + .create(); + + final int messages = 10; + for (int i = 0; i < messages; i++) { + producer.newMessage().value("Hello Pulsar!").send(); + } + Awaitility.await().untilAsserted(() -> { + // Ensure that the interceptor is not hit before the message listener + Assert.assertEquals(beforeConsumeCount.get(), 1); + }); + producer.close(); + consumer.close(); + client.close(); + } + @Test public void testConsumerInterceptorWithPatternTopicSubscribe() throws PulsarClientException { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 74abb82bfe8..12f0d75c3a4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -1166,6 +1166,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T id = msg.getMessageId(); } unAckedMessageTracker.add(id, msg.getRedeliveryCount()); + beforeConsume(msg); listener.received(ConsumerBase.this, msg); } catch (Throwable t) { log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index b47b3f83308..c05fd021a99 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -545,7 +545,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return null; } messageProcessed(message); - return beforeConsume(message); + message = listener == null ? beforeConsume(message) : message; + return message; } catch (InterruptedException e) { ExceptionHandler.handleInterruptedException(e); State state = getState(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index bf8bd6cc951..4270e0036af 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -396,7 +396,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { decreaseIncomingMessageSize(message); checkArgument(message instanceof TopicMessageImpl); trackUnAckedMsgIfNoListener(message.getMessageId(), message.getRedeliveryCount()); - message = beforeConsume(message); + message = listener == null ? beforeConsume(message) : message; } resumeReceivingFromPausedConsumersIfNeeded(); return message;
