This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 003efdd734e [fix][client] Fix multi-topics consumer could receive old messages after seek (#21945)
003efdd734e is described below

commit 003efdd734ee3a373bf86bdfcd740f2c9ab83771
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Jan 31 00:31:15 2024 +0800

    [fix][client] Fix multi-topics consumer could receive old messages after seek (#21945)
---
 .../pulsar/client/impl/TopicsConsumerImplTest.java | 80 +++++++++++++++++++++-
 .../client/impl/MultiTopicsConsumerImpl.java       | 66 ++++++++++++------
 2 files changed, 125 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 51b32c2b44e..c343ab0d6e2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
+import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.MessageRouter;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
@@ -57,22 +58,27 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
-
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Set;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -1394,4 +1400,76 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         }
     }
 
+    @DataProvider
+    public static Object[][] seekByFunction() {
+        return new Object[][] {
+                { true }, { false }
+        };
+    }
+
+    @Test(timeOut = 30000, dataProvider = "seekByFunction")
+    public void testSeekToNewerPosition(boolean seekByFunction) throws 
Exception {
+        final var topic1 = TopicName.get(newTopicName()).toString()
+                .replace("my-property", "public").replace("my-ns", "default");
+        final var topic2 = TopicName.get(newTopicName()).toString()
+                .replace("my-property", "public").replace("my-ns", "default");
+        @Cleanup final var producer1 = 
pulsarClient.newProducer(Schema.STRING).topic(topic1).create();
+        @Cleanup final var producer2 = 
pulsarClient.newProducer(Schema.STRING).topic(topic2).create();
+        producer1.send("1-0");
+        producer2.send("2-0");
+        producer1.send("1-1");
+        producer2.send("2-1");
+        final var consumer1 = pulsarClient.newConsumer(Schema.STRING)
+                .topics(Arrays.asList(topic1, topic2)).subscriptionName("sub")
+                .ackTimeout(1, TimeUnit.SECONDS)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+        final var timestamps = new ArrayList<Long>();
+        for (int i = 0; i < 4; i++) {
+            timestamps.add(consumer1.receive().getPublishTime());
+        }
+        timestamps.sort(Comparator.naturalOrder());
+        final var timestamp = timestamps.get(2);
+        consumer1.close();
+
+        final Function<Consumer<String>, CompletableFuture<Void>> seekAsync = 
consumer -> {
+            final var future = seekByFunction ? consumer.seekAsync(__ -> 
timestamp) : consumer.seekAsync(timestamp);
+            assertEquals(((ConsumerBase<String>) 
consumer).getIncomingMessageSize(), 0L);
+            assertEquals(((ConsumerBase<String>) 
consumer).getTotalIncomingMessages(), 0);
+            assertTrue(((ConsumerBase<String>) 
consumer).getUnAckedMessageTracker().isEmpty());
+            return future;
+        };
+
+        @Cleanup final var consumer2 = pulsarClient.newConsumer(Schema.STRING)
+                .topics(Arrays.asList(topic1, 
topic2)).subscriptionName("sub-2")
+                .ackTimeout(1, TimeUnit.SECONDS)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+        seekAsync.apply(consumer2).get();
+        final var values = new TreeSet<String>();
+        for (int i = 0; i < 2; i++) {
+            values.add(consumer2.receive().getValue());
+        }
+        assertEquals(values, new TreeSet<>(Arrays.asList("1-1", "2-1")));
+
+        final var valuesInListener = new CopyOnWriteArrayList<String>();
+        @Cleanup final var consumer3 = pulsarClient.newConsumer(Schema.STRING)
+                .topics(Arrays.asList(topic1, 
topic2)).subscriptionName("sub-3")
+                .messageListener((MessageListener<String>) (__, msg) -> 
valuesInListener.add(msg.getValue()))
+                .ackTimeout(1, TimeUnit.SECONDS)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+        seekAsync.apply(consumer3).get();
+        if (valuesInListener.isEmpty()) {
+            Awaitility.await().untilAsserted(() -> 
assertEquals(valuesInListener.size(), 2));
+            assertEquals(valuesInListener.stream().sorted().toList(), 
Arrays.asList("1-1", "2-1"));
+        } // else: consumer3 has passed messages to the listener before seek, 
in this case we cannot assume anything
+
+        @Cleanup final var consumer4 = pulsarClient.newConsumer(Schema.STRING)
+                .topics(Arrays.asList(topic1, 
topic2)).subscriptionName("sub-4")
+                .ackTimeout(1, TimeUnit.SECONDS)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+        seekAsync.apply(consumer4).get();
+        final var valuesInReceiveAsync = new ArrayList<String>();
+        valuesInReceiveAsync.add(consumer4.receiveAsync().get().getValue());
+        valuesInReceiveAsync.add(consumer4.receiveAsync().get().getValue());
+        assertEquals(valuesInReceiveAsync.stream().sorted().toList(), 
Arrays.asList("1-1", "2-1"));
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 6ba3aaaaa46..baabaf67070 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
@@ -101,7 +102,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     private final MultiTopicConsumerStatsRecorderImpl stats;
     private final ConsumerConfigurationData<T> internalConfig;
 
-    private volatile MessageIdAdv startMessageId;
+    private final MessageIdAdv startMessageId;
+    private volatile boolean duringSeek = false;
     private final long startMessageRollbackDurationInSec;
     MultiTopicsConsumerImpl(PulsarClientImpl client, 
ConsumerConfigurationData<T> conf,
             ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> 
subscribeFuture, Schema<T> schema,
@@ -235,6 +237,10 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     }
 
     private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean 
batchReceive) {
+        if (duringSeek) {
+            log.info("[{}] Pause receiving messages for topic {} due to seek", 
subscription, consumer.getTopic());
+            return;
+        }
         CompletableFuture<List<Message<T>>> messagesFuture;
         if (batchReceive) {
             messagesFuture = consumer.batchReceiveAsync().thenApply(msgs -> 
((MessagesImpl<T>) msgs).getMessageList());
@@ -252,8 +258,12 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             }
             // Process the message, add to the queue and trigger listener or 
async callback
             messages.forEach(msg -> {
-                if (isValidConsumerEpoch((MessageImpl<T>) msg)) {
+                final boolean skipDueToSeek = duringSeek;
+                if (isValidConsumerEpoch((MessageImpl<T>) msg) && 
!skipDueToSeek) {
                     messageReceived(consumer, msg);
+                } else if (skipDueToSeek) {
+                    log.info("[{}] [{}] Skip processing message {} received 
during seek", topic, subscription,
+                            msg.getMessageId());
                 }
             });
 
@@ -748,17 +758,12 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     @Override
     public CompletableFuture<Void> seekAsync(Function<String, Object> 
function) {
-        List<CompletableFuture<Void>> futures = new 
ArrayList<>(consumers.size());
-        consumers.values().forEach(consumer -> 
futures.add(consumer.seekAsync(function)));
-        unAckedMessageTracker.clear();
-        incomingMessages.clear();
-        resetIncomingMessageSize();
-        return FutureUtil.waitForAll(futures);
+        return seekAllAsync(consumer -> consumer.seekAsync(function));
     }
 
     @Override
     public CompletableFuture<Void> seekAsync(MessageId messageId) {
-        final Consumer<T> internalConsumer;
+        final ConsumerImpl<T> internalConsumer;
         if (messageId instanceof TopicMessageId) {
             TopicMessageId topicMessageId = (TopicMessageId) messageId;
             internalConsumer = consumers.get(topicMessageId.getOwnerTopic());
@@ -775,25 +780,46 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             );
         }
 
-        final CompletableFuture<Void> seekFuture;
         if (internalConsumer == null) {
-            List<CompletableFuture<Void>> futures = new 
ArrayList<>(consumers.size());
-            consumers.values().forEach(consumerImpl -> 
futures.add(consumerImpl.seekAsync(messageId)));
-            seekFuture = FutureUtil.waitForAll(futures);
+            return seekAllAsync(consumer -> consumer.seekAsync(messageId));
         } else {
-            seekFuture = internalConsumer.seekAsync(messageId);
+            return seekAsyncInternal(Collections.singleton(internalConsumer), 
__ -> __.seekAsync(messageId));
         }
+    }
+
+    @Override
+    public CompletableFuture<Void> seekAsync(long timestamp) {
+        return seekAllAsync(consumer -> consumer.seekAsync(timestamp));
+    }
 
+    private CompletableFuture<Void> 
seekAsyncInternal(Collection<ConsumerImpl<T>> consumers,
+                                                      
Function<ConsumerImpl<T>, CompletableFuture<Void>> seekFunc) {
+        beforeSeek();
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        
FutureUtil.waitForAll(consumers.stream().map(seekFunc).collect(Collectors.toList()))
+                .whenComplete((__, e) -> afterSeek(future, e));
+        return future;
+    }
+
+    private CompletableFuture<Void> seekAllAsync(Function<ConsumerImpl<T>, 
CompletableFuture<Void>> seekFunc) {
+        return seekAsyncInternal(consumers.values(), seekFunc);
+    }
+
+    private void beforeSeek() {
+        duringSeek = true;
         unAckedMessageTracker.clear();
         clearIncomingMessages();
-        return seekFuture;
     }
 
-    @Override
-    public CompletableFuture<Void> seekAsync(long timestamp) {
-        List<CompletableFuture<Void>> futures = new 
ArrayList<>(consumers.size());
-        consumers.values().forEach(consumer -> 
futures.add(consumer.seekAsync(timestamp)));
-        return FutureUtil.waitForAll(futures);
+    private void afterSeek(CompletableFuture<Void> seekFuture, @Nullable 
Throwable throwable) {
+        duringSeek = false;
+        log.info("[{}] Resume receiving messages for {} since seek is done", 
subscription, consumers.keySet());
+        startReceivingMessages(new ArrayList<>(consumers.values()));
+        if (throwable == null) {
+            seekFuture.complete(null);
+        } else {
+            seekFuture.completeExceptionally(throwable);
+        }
     }
 
     @Override

Reply via email to