This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 003efdd734e [fix][client] Fix multi-topics consumer could receive old
messages after seek (#21945)
003efdd734e is described below
commit 003efdd734ee3a373bf86bdfcd740f2c9ab83771
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Jan 31 00:31:15 2024 +0800
[fix][client] Fix multi-topics consumer could receive old messages after
seek (#21945)
---
.../pulsar/client/impl/TopicsConsumerImplTest.java | 80 +++++++++++++++++++++-
.../client/impl/MultiTopicsConsumerImpl.java | 66 ++++++++++++------
2 files changed, 125 insertions(+), 21 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 51b32c2b44e..c343ab0d6e2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
+import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
@@ -57,22 +58,27 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
import java.util.Set;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -1394,4 +1400,76 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
}
}
+ @DataProvider
+ public static Object[][] seekByFunction() {
+ return new Object[][] {
+ { true }, { false }
+ };
+ }
+
+ @Test(timeOut = 30000, dataProvider = "seekByFunction")
+ public void testSeekToNewerPosition(boolean seekByFunction) throws
Exception {
+ final var topic1 = TopicName.get(newTopicName()).toString()
+ .replace("my-property", "public").replace("my-ns", "default");
+ final var topic2 = TopicName.get(newTopicName()).toString()
+ .replace("my-property", "public").replace("my-ns", "default");
+ @Cleanup final var producer1 =
pulsarClient.newProducer(Schema.STRING).topic(topic1).create();
+ @Cleanup final var producer2 =
pulsarClient.newProducer(Schema.STRING).topic(topic2).create();
+ producer1.send("1-0");
+ producer2.send("2-0");
+ producer1.send("1-1");
+ producer2.send("2-1");
+ final var consumer1 = pulsarClient.newConsumer(Schema.STRING)
+ .topics(Arrays.asList(topic1, topic2)).subscriptionName("sub")
+ .ackTimeout(1, TimeUnit.SECONDS)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+ final var timestamps = new ArrayList<Long>();
+ for (int i = 0; i < 4; i++) {
+ timestamps.add(consumer1.receive().getPublishTime());
+ }
+ timestamps.sort(Comparator.naturalOrder());
+ final var timestamp = timestamps.get(2);
+ consumer1.close();
+
+ final Function<Consumer<String>, CompletableFuture<Void>> seekAsync =
consumer -> {
+ final var future = seekByFunction ? consumer.seekAsync(__ ->
timestamp) : consumer.seekAsync(timestamp);
+ assertEquals(((ConsumerBase<String>)
consumer).getIncomingMessageSize(), 0L);
+ assertEquals(((ConsumerBase<String>)
consumer).getTotalIncomingMessages(), 0);
+ assertTrue(((ConsumerBase<String>)
consumer).getUnAckedMessageTracker().isEmpty());
+ return future;
+ };
+
+ @Cleanup final var consumer2 = pulsarClient.newConsumer(Schema.STRING)
+ .topics(Arrays.asList(topic1,
topic2)).subscriptionName("sub-2")
+ .ackTimeout(1, TimeUnit.SECONDS)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+ seekAsync.apply(consumer2).get();
+ final var values = new TreeSet<String>();
+ for (int i = 0; i < 2; i++) {
+ values.add(consumer2.receive().getValue());
+ }
+ assertEquals(values, new TreeSet<>(Arrays.asList("1-1", "2-1")));
+
+ final var valuesInListener = new CopyOnWriteArrayList<String>();
+ @Cleanup final var consumer3 = pulsarClient.newConsumer(Schema.STRING)
+ .topics(Arrays.asList(topic1,
topic2)).subscriptionName("sub-3")
+ .messageListener((MessageListener<String>) (__, msg) ->
valuesInListener.add(msg.getValue()))
+ .ackTimeout(1, TimeUnit.SECONDS)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+ seekAsync.apply(consumer3).get();
+ if (valuesInListener.isEmpty()) {
+ Awaitility.await().untilAsserted(() ->
assertEquals(valuesInListener.size(), 2));
+ assertEquals(valuesInListener.stream().sorted().toList(),
Arrays.asList("1-1", "2-1"));
+ } // else: consumer3 has passed messages to the listener before seek,
in this case we cannot assume anything
+
+ @Cleanup final var consumer4 = pulsarClient.newConsumer(Schema.STRING)
+ .topics(Arrays.asList(topic1,
topic2)).subscriptionName("sub-4")
+ .ackTimeout(1, TimeUnit.SECONDS)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+ seekAsync.apply(consumer4).get();
+ final var valuesInReceiveAsync = new ArrayList<String>();
+ valuesInReceiveAsync.add(consumer4.receiveAsync().get().getValue());
+ valuesInReceiveAsync.add(consumer4.receiveAsync().get().getValue());
+ assertEquals(valuesInReceiveAsync.stream().sorted().toList(),
Arrays.asList("1-1", "2-1"));
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 6ba3aaaaa46..baabaf67070 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
@@ -101,7 +102,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
private final MultiTopicConsumerStatsRecorderImpl stats;
private final ConsumerConfigurationData<T> internalConfig;
- private volatile MessageIdAdv startMessageId;
+ private final MessageIdAdv startMessageId;
+ private volatile boolean duringSeek = false;
private final long startMessageRollbackDurationInSec;
MultiTopicsConsumerImpl(PulsarClientImpl client,
ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, CompletableFuture<Consumer<T>>
subscribeFuture, Schema<T> schema,
@@ -235,6 +237,10 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}
private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean
batchReceive) {
+ if (duringSeek) {
+ log.info("[{}] Pause receiving messages for topic {} due to seek",
subscription, consumer.getTopic());
+ return;
+ }
CompletableFuture<List<Message<T>>> messagesFuture;
if (batchReceive) {
messagesFuture = consumer.batchReceiveAsync().thenApply(msgs ->
((MessagesImpl<T>) msgs).getMessageList());
@@ -252,8 +258,12 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}
// Process the message, add to the queue and trigger listener or
async callback
messages.forEach(msg -> {
- if (isValidConsumerEpoch((MessageImpl<T>) msg)) {
+ final boolean skipDueToSeek = duringSeek;
+ if (isValidConsumerEpoch((MessageImpl<T>) msg) &&
!skipDueToSeek) {
messageReceived(consumer, msg);
+ } else if (skipDueToSeek) {
+ log.info("[{}] [{}] Skip processing message {} received
during seek", topic, subscription,
+ msg.getMessageId());
}
});
@@ -748,17 +758,12 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
@Override
public CompletableFuture<Void> seekAsync(Function<String, Object>
function) {
- List<CompletableFuture<Void>> futures = new
ArrayList<>(consumers.size());
- consumers.values().forEach(consumer ->
futures.add(consumer.seekAsync(function)));
- unAckedMessageTracker.clear();
- incomingMessages.clear();
- resetIncomingMessageSize();
- return FutureUtil.waitForAll(futures);
+ return seekAllAsync(consumer -> consumer.seekAsync(function));
}
@Override
public CompletableFuture<Void> seekAsync(MessageId messageId) {
- final Consumer<T> internalConsumer;
+ final ConsumerImpl<T> internalConsumer;
if (messageId instanceof TopicMessageId) {
TopicMessageId topicMessageId = (TopicMessageId) messageId;
internalConsumer = consumers.get(topicMessageId.getOwnerTopic());
@@ -775,25 +780,46 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
);
}
- final CompletableFuture<Void> seekFuture;
if (internalConsumer == null) {
- List<CompletableFuture<Void>> futures = new
ArrayList<>(consumers.size());
- consumers.values().forEach(consumerImpl ->
futures.add(consumerImpl.seekAsync(messageId)));
- seekFuture = FutureUtil.waitForAll(futures);
+ return seekAllAsync(consumer -> consumer.seekAsync(messageId));
} else {
- seekFuture = internalConsumer.seekAsync(messageId);
+ return seekAsyncInternal(Collections.singleton(internalConsumer),
__ -> __.seekAsync(messageId));
}
+ }
+
+ @Override
+ public CompletableFuture<Void> seekAsync(long timestamp) {
+ return seekAllAsync(consumer -> consumer.seekAsync(timestamp));
+ }
+ private CompletableFuture<Void>
seekAsyncInternal(Collection<ConsumerImpl<T>> consumers,
+
Function<ConsumerImpl<T>, CompletableFuture<Void>> seekFunc) {
+ beforeSeek();
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+
FutureUtil.waitForAll(consumers.stream().map(seekFunc).collect(Collectors.toList()))
+ .whenComplete((__, e) -> afterSeek(future, e));
+ return future;
+ }
+
+ private CompletableFuture<Void> seekAllAsync(Function<ConsumerImpl<T>,
CompletableFuture<Void>> seekFunc) {
+ return seekAsyncInternal(consumers.values(), seekFunc);
+ }
+
+ private void beforeSeek() {
+ duringSeek = true;
unAckedMessageTracker.clear();
clearIncomingMessages();
- return seekFuture;
}
- @Override
- public CompletableFuture<Void> seekAsync(long timestamp) {
- List<CompletableFuture<Void>> futures = new
ArrayList<>(consumers.size());
- consumers.values().forEach(consumer ->
futures.add(consumer.seekAsync(timestamp)));
- return FutureUtil.waitForAll(futures);
+ private void afterSeek(CompletableFuture<Void> seekFuture, @Nullable
Throwable throwable) {
+ duringSeek = false;
+ log.info("[{}] Resume receiving messages for {} since seek is done",
subscription, consumers.keySet());
+ startReceivingMessages(new ArrayList<>(consumers.values()));
+ if (throwable == null) {
+ seekFuture.complete(null);
+ } else {
+ seekFuture.completeExceptionally(throwable);
+ }
}
@Override