This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-2.10.5.3-a41ecf
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d89f7c0b03744c6401c771e58cc601ffa55113ee
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Aug 14 10:26:47 2024 +0800

    [fix][client] Fix for early hit `beforeConsume` for MultiTopicConsumer (#23141)
    
    (cherry picked from commit c07b158f003c5a5623296189f0932d7058d2e75a)
---
 .../apache/pulsar/client/api/InterceptorsTest.java | 45 +++++++++++------
 .../client/impl/MultiTopicsConsumerImpl.java       | 58 ++++++++++++++++++++--
 2 files changed, 84 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index b3f5ed3b487..8f239aea1f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.api;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -27,8 +29,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Sets;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
@@ -66,6 +66,12 @@ public class InterceptorsTest extends ProducerConsumerBase {
         return new Object[][] { { 0 }, { 1000 } };
     }
 
+    @DataProvider(name = "topics")
+    public Object[][] getTopics() {
+        return new Object[][] 
{{Lists.newArrayList("persistent://my-property/my-ns/my-topic") },
+                { 
Lists.newArrayList("persistent://my-property/my-ns/my-topic", 
"persistent://my-property/my-ns/my-topic1") }};
+    }
+
     @Test
     public void testProducerInterceptor() throws Exception {
         Map<MessageId, List<String>> ackCallback = new HashMap<>();
@@ -390,9 +396,9 @@ public class InterceptorsTest extends ProducerConsumerBase {
 
             @Override
             public Message<String> beforeConsume(Consumer<String> consumer, 
Message<String> message) {
-                MessageImpl<String> msg = (MessageImpl<String>) message;
+                MessageImpl<String> msg = ((MessageImpl<String>) 
((TopicMessageImpl<String>) message).getMessage());
                 
msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
-                return msg;
+                return message;
             }
 
             @Override
@@ -436,13 +442,19 @@ public class InterceptorsTest extends ProducerConsumerBase {
 
         int keyCount = 0;
         for (int i = 0; i < 2; i++) {
-            Message<String> received = consumer.receive();
+            Message<String> received;
+            if (i % 2 == 0) {
+                received = consumer.receive();
+            } else {
+                received = consumer.receiveAsync().join();
+            }
             MessageImpl<String> msg = (MessageImpl<String>) 
((TopicMessageImpl<String>) received).getMessage();
             for (KeyValue keyValue : 
msg.getMessageBuilder().getPropertiesList()) {
                 if ("beforeConsumer".equals(keyValue.getKey())) {
                     keyCount++;
                 }
             }
+            Assert.assertEquals(keyCount, i + 1);
             consumer.acknowledge(received);
         }
         Assert.assertEquals(2, keyCount);
@@ -462,9 +474,9 @@ public class InterceptorsTest extends ProducerConsumerBase {
 
             @Override
             public Message<String> beforeConsume(Consumer<String> consumer, 
Message<String> message) {
-                MessageImpl<String> msg = (MessageImpl<String>) message;
+                MessageImpl<String> msg = ((MessageImpl<String>) 
((TopicMessageImpl<String>) message).getMessage());
                 
msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
-                return msg;
+                return message;
             }
 
             @Override
@@ -599,8 +611,8 @@ public class InterceptorsTest extends ProducerConsumerBase {
         consumer.close();
     }
 
-    @Test
-    public void testConsumerInterceptorForNegativeAcksSend() throws 
PulsarClientException, InterruptedException {
+    @Test(dataProvider = "topics")
+    public void testConsumerInterceptorForNegativeAcksSend(List<String> 
topics) throws PulsarClientException, InterruptedException {
         final int totalNumOfMessages = 100;
         CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);
 
@@ -627,6 +639,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
 
             @Override
             public void onNegativeAcksSend(Consumer<String> consumer, 
Set<MessageId> messageIds) {
+                Assert.assertTrue(latch.getCount() > 0);
                 messageIds.forEach(messageId -> latch.countDown());
             }
 
@@ -637,7 +650,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
         };
 
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic")
+                .topics(topics)
                 .subscriptionType(SubscriptionType.Failover)
                 .intercept(interceptor)
                 .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
@@ -645,7 +658,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
                 .subscribe();
 
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic")
+                .topic(topics.get(0))
                 .create();
 
         for (int i = 0; i < totalNumOfMessages; i++) {
@@ -669,8 +682,9 @@ public class InterceptorsTest extends ProducerConsumerBase {
         consumer.close();
     }
 
-    @Test
-    public void testConsumerInterceptorForAckTimeoutSend() throws 
PulsarClientException, InterruptedException {
+    @Test(dataProvider = "topics")
+    public void testConsumerInterceptorForAckTimeoutSend(List<String> topics) 
throws PulsarClientException,
+            InterruptedException {
         final int totalNumOfMessages = 100;
         CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);
 
@@ -701,16 +715,17 @@ public class InterceptorsTest extends ProducerConsumerBase {
 
             @Override
             public void onAckTimeoutSend(Consumer<String> consumer, 
Set<MessageId> messageIds) {
+                Assert.assertTrue(latch.getCount() > 0);
                 messageIds.forEach(messageId -> latch.countDown());
             }
         };
 
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic")
+                .topic(topics.get(0))
                 .create();
 
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic")
+                .topics(topics)
                 .subscriptionName("foo")
                 .intercept(interceptor)
                 .ackTimeout(2, TimeUnit.SECONDS)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index b01c25d215b..be618744180 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.Lists;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -104,6 +105,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     private volatile BatchMessageIdImpl startMessageId = null;
     private final long startMessageRollbackDurationInSec;
+    private final ConsumerInterceptors<T> internalConsumerInterceptors;
     MultiTopicsConsumerImpl(PulsarClientImpl client, 
ConsumerConfigurationData<T> conf,
             ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> 
subscribeFuture, Schema<T> schema,
             ConsumerInterceptors<T> interceptors, boolean 
createTopicIfDoesNotExist) {
@@ -133,6 +135,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             long startMessageRollbackDurationInSec) {
         super(client, singleTopic, conf, Math.max(2, 
conf.getReceiverQueueSize()), executorProvider, subscribeFuture,
                 schema, interceptors);
+        if (interceptors != null) {
+           this.internalConsumerInterceptors = 
getInternalConsumerInterceptors(interceptors);
+        } else {
+            this.internalConsumerInterceptors = null;
+        }
 
         checkArgument(conf.getReceiverQueueSize() > 0,
             "Receiver queue size needs to be greater than 0 for Topics 
Consumer");
@@ -315,7 +322,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
         if (receivedFuture != null) {
             unAckedMessageTracker.add(topicMessage.getMessageId(), 
topicMessage.getRedeliveryCount());
-            completePendingReceive(receivedFuture, topicMessage);
+            final Message<T> interceptMessage = beforeConsume(topicMessage);
+            completePendingReceive(receivedFuture, interceptMessage);
         } else if (enqueueMessageAndCheckBatchReceive(topicMessage) && 
hasPendingBatchReceive()) {
             notifyPendingBatchReceivedCallBack();
         }
@@ -366,7 +374,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             }
             unAckedMessageTracker.add(message.getMessageId(), 
message.getRedeliveryCount());
             resumeReceivingFromPausedConsumersIfNeeded();
-            return message;
+            return beforeConsume(message);
         } catch (Exception e) {
             throw PulsarClientException.unwrap(e);
         }
@@ -393,6 +401,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                     }
                 }
                 unAckedMessageTracker.add(message.getMessageId(), 
message.getRedeliveryCount());
+                message = beforeConsume(message);
             }
             resumeReceivingFromPausedConsumersIfNeeded();
             return message;
@@ -463,7 +472,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 checkState(message instanceof TopicMessageImpl);
                 unAckedMessageTracker.add(message.getMessageId(), 
message.getRedeliveryCount());
                 resumeReceivingFromPausedConsumersIfNeeded();
-                result.complete(message);
+                result.complete(beforeConsume(message));
             }
         });
         return result;
@@ -1119,7 +1128,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         return ConsumerImpl.newConsumerImpl(client, partitionName,
                 configurationData, client.externalExecutorProvider(),
                 partitionIndex, true, listener != null, subFuture,
-                startMessageId, schema, interceptors,
+                startMessageId, schema, this.internalConsumerInterceptors,
                 createIfDoesNotExist, startMessageRollbackDurationInSec);
     }
 
@@ -1503,4 +1512,45 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             acknowledgeCumulativeAsync(msg);
         }
     }
+
+    private ConsumerInterceptors<T> 
getInternalConsumerInterceptors(ConsumerInterceptors<T> multiTopicInterceptors) 
{
+        return new ConsumerInterceptors<T>(new ArrayList<>()) {
+
+            @Override
+            public Message<T> beforeConsume(Consumer<T> consumer, Message<T> 
message) {
+                return message;
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<T> consumer, MessageId 
messageId, Throwable exception) {
+                multiTopicInterceptors.onAcknowledge(consumer, messageId, 
exception);
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<T> consumer,
+                                                MessageId messageId, Throwable 
exception) {
+                multiTopicInterceptors.onAcknowledgeCumulative(consumer, 
messageId, exception);
+            }
+
+            @Override
+            public void onNegativeAcksSend(Consumer<T> consumer, 
Set<MessageId> set) {
+                multiTopicInterceptors.onNegativeAcksSend(consumer, set);
+            }
+
+            @Override
+            public void onAckTimeoutSend(Consumer<T> consumer, Set<MessageId> 
set) {
+                multiTopicInterceptors.onAckTimeoutSend(consumer, set);
+            }
+
+            @Override
+            public void onPartitionsChange(String topicName, int partitions) {
+                multiTopicInterceptors.onPartitionsChange(topicName, 
partitions);
+            }
+
+            @Override
+            public void close() throws IOException {
+                multiTopicInterceptors.close();
+            }
+        };
+    }
 }

Reply via email to