This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 137df29f857 [fix][client] fix the beforeConsume() method earlier hit
with message listener (#23578)
137df29f857 is described below
commit 137df29f85798b00de75460a1acb91c7bc25453f
Author: Penghui Li <[email protected]>
AuthorDate: Fri Nov 8 10:59:06 2024 +0800
[fix][client] fix the beforeConsume() method earlier hit with message
listener (#23578)
---
.../apache/pulsar/client/api/InterceptorsTest.java | 78 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 1 +
.../apache/pulsar/client/impl/ConsumerImpl.java | 3 +-
.../client/impl/MultiTopicsConsumerImpl.java | 2 +-
4 files changed, 82 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index 8115f34121d..f71cdc55141 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -476,6 +476,84 @@ public class InterceptorsTest extends ProducerConsumerBase {
consumer.close();
}
+ @Test(dataProvider = "topicPartition")
+ public void testDoNotEarlierHitBeforeConsumerWithMessageListener(int
partitions) throws Exception {
+
+ AtomicInteger beforeConsumeCount = new AtomicInteger(0);
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(lookupUrl.toString())
+ .listenerThreads(1)
+ .build();
+
+ ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<>() {
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Message<String> beforeConsume(Consumer<String> consumer,
Message<String> message) {
+ beforeConsumeCount.incrementAndGet();
+ log.info("beforeConsume messageId: {}",
message.getMessageId());
+ return message;
+ }
+
+ @Override
+ public void onAcknowledge(Consumer<String> consumer, MessageId
messageId, Throwable cause) {
+ }
+
+ @Override
+ public void onAcknowledgeCumulative(Consumer<String> consumer,
MessageId messageId, Throwable cause) {
+ }
+
+ @Override
+ public void onNegativeAcksSend(Consumer<String> consumer,
Set<MessageId> messageIds) {
+ }
+
+ @Override
+ public void onAckTimeoutSend(Consumer<String> consumer,
Set<MessageId> messageIds) {
+ }
+ };
+
+ final String topicName = "persistent://my-property/my-ns/my-topic";
+
+ if (partitions > 0) {
+ admin.topics().createPartitionedTopic(topicName, partitions);
+ } else {
+ admin.topics().createNonPartitionedTopic(topicName);
+ }
+
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .intercept(interceptor)
+ .subscriptionName("my-subscription")
+ .messageListener((c, m) -> {
+ // Simulate a long processing time
+ try {
+ Thread.sleep(60000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .subscribe();
+
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic")
+ .create();
+
+ final int messages = 10;
+ for (int i = 0; i < messages; i++) {
+ producer.newMessage().value("Hello Pulsar!").send();
+ }
+ Awaitility.await().untilAsserted(() -> {
+ // Ensure that the interceptor is not hit before the message listener
+ Assert.assertEquals(beforeConsumeCount.get(), 1);
+ });
+ producer.close();
+ consumer.close();
+ client.close();
+ }
+
@Test
public void testConsumerInterceptorWithPatternTopicSubscribe() throws
PulsarClientException {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 3073f3a8334..0fc906b6e7a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -1180,6 +1180,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
id = msg.getMessageId();
}
unAckedMessageTracker.add(id, msg.getRedeliveryCount());
+ beforeConsume(msg);
listener.received(ConsumerBase.this, msg);
} catch (Throwable t) {
log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription,
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index be01bd00eb3..004adab56f5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -542,7 +542,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return null;
}
messageProcessed(message);
- return beforeConsume(message);
+ message = listener == null ? beforeConsume(message) : message;
+ return message;
} catch (InterruptedException e) {
ExceptionHandler.handleInterruptedException(e);
State state = getState();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index ff293af2308..528a140b81c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -401,7 +401,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
decreaseIncomingMessageSize(message);
checkArgument(message instanceof TopicMessageImpl);
trackUnAckedMsgIfNoListener(message.getMessageId(),
message.getRedeliveryCount());
- message = beforeConsume(message);
+ message = listener == null ? beforeConsume(message) : message;
}
resumeReceivingFromPausedConsumersIfNeeded();
return message;