This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ce8f4505cfc799a74d15f786542cf9639ef33393 Author: Jason918 <[email protected]> AuthorDate: Thu Nov 4 23:18:10 2021 +0800 [ISSUE-12291][Client] 'StartMessageId' and 'RollbackDuration' not working in MultiTopicsReader for non-partitioned topics (#12308) Fixes #12291 ### Motivation Bug fix. 'StartMessageId' and 'RollbackDuration' is not working in MultiTopicsReader for non-partitioned topics. ### Modifications This fix is quite simple. Just add `startMessageId` and `startMessageRollbackDurationInSec` when creating underlying consumer with `ConsumerImpl.newConsumerImpl` (cherry picked from commit cb48152254b8c16596e7251ef9a7229d918d2e90) --- .../pulsar/client/impl/MultiTopicsReaderTest.java | 102 ++++++++++++++++++++- .../apache/pulsar/client/impl/ConsumerImpl.java | 2 +- .../client/impl/MultiTopicsConsumerImpl.java | 8 +- 3 files changed, 106 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java index 31a426e..a8a6ced 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java @@ -48,7 +48,6 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.RetentionPolicies; @@ -378,6 +377,107 @@ public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest { Awaitility.await().untilAsserted(() -> assertEquals(client.consumersCount(), 0)); } + @Test(timeOut = 20000) + public void testMultiNonPartitionedTopicWithStartMessageId() throws Exception { + final String topic1 = "persistent://my-property/my-ns/topic1" + UUID.randomUUID(); + final String topic2 = "persistent://my-property/my-ns/topic2" + UUID.randomUUID(); + List<String> topics = Arrays.asList(topic1, topic2); + PulsarClientImpl client = (PulsarClientImpl) pulsarClient; + + // create producer and send msg + List<Producer<String>> producerList = new ArrayList<>(); + for (String topicName : topics) { + producerList.add(pulsarClient.newProducer(Schema.STRING).topic(topicName).create()); + } + int msgNum = 10; + Set<String> messages = new HashSet<>(); + for (int i = 0; i < producerList.size(); i++) { + Producer<String> producer = producerList.get(i); + for (int j = 0; j < msgNum; j++) { + String msg = i + "msg" + j; + producer.send(msg); + messages.add(msg); + } + } + Reader<String> reader = pulsarClient.newReader(Schema.STRING) + .startMessageId(MessageId.earliest) + .topics(topics).readerName("my-reader").create(); + // receive messages + while (reader.hasMessageAvailable()) { + messages.remove(reader.readNext(5, TimeUnit.SECONDS).getValue()); + } + assertEquals(messages.size(), 0); + assertEquals(client.consumersCount(), 1); + // clean up + for (Producer<String> producer : producerList) { + producer.close(); + } + reader.close(); + Awaitility.await().untilAsserted(() -> assertEquals(client.consumersCount(), 0)); + } + + @Test(timeOut = 20000) + public void testMultiNonPartitionedTopicWithRollbackDuration() throws Exception { + final String topic1 = "persistent://my-property/my-ns/topic1" + UUID.randomUUID(); + final String topic2 = "persistent://my-property/my-ns/topic2" + UUID.randomUUID(); + List<String> topics = Arrays.asList(topic1, topic2); + PulsarClientImpl client = (PulsarClientImpl) pulsarClient; + + // create producer and send msg + List<Producer<String>> producerList = new ArrayList<>(); + for (String topicName : topics) { + producerList.add(pulsarClient.newProducer(Schema.STRING).topic(topicName).create()); + } + int totalMsg = 10; + Set<String> messages = new HashSet<>(); + long oldMsgPublishTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(5); // 5 hours old + long newMsgPublishTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1); // 5 hours old + for (int i = 0; i < producerList.size(); i++) { + Producer<String> producer = producerList.get(i); + // (1) Publish 10 messages with publish-time 5 HOUR back + for (int j = 0; j < totalMsg; j++) { + TypedMessageBuilderImpl<String> msg = (TypedMessageBuilderImpl<String>) producer.newMessage() + .value(i + "-old-msg-" + j); + msg.getMetadataBuilder() + .setPublishTime(oldMsgPublishTime) + .setProducerName(producer.getProducerName()) + .setReplicatedFrom("us-west1"); + msg.send(); + messages.add(msg.getMessage().getValue()); + } + // (2) Publish 10 messages with publish-time 1 HOUR back + for (int j = 0; j < totalMsg; j++) { + TypedMessageBuilderImpl<String> msg = (TypedMessageBuilderImpl<String>) producer.newMessage() + .value(i + "-new-msg-" + j); + msg.getMetadataBuilder() + .setPublishTime(newMsgPublishTime) + .setProducerName(producer.getProducerName()) + .setReplicatedFrom("us-west1"); + msg.send(); + messages.add(msg.getMessage().getValue()); + } + } + + Reader<String> reader = pulsarClient.newReader(Schema.STRING) + .startMessageFromRollbackDuration(2, TimeUnit.HOURS) + .topics(topics).readerName("my-reader").create(); + // receive messages + while (reader.hasMessageAvailable()) { + messages.remove(reader.readNext(5, TimeUnit.SECONDS).getValue()); + } + assertEquals(messages.size(), 2 * totalMsg); + for (String message : messages) { + assertTrue(message.contains("old-msg")); + } + assertEquals(client.consumersCount(), 1); + // clean up + for (Producer<String> producer : producerList) { + producer.close(); + } + reader.close(); + Awaitility.await().untilAsserted(() -> assertEquals(client.consumersCount(), 0)); + } + @Test(timeOut = 10000) public void testKeyHashRangeReader() throws Exception { final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index b29d6c7..88b6df1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2000,7 +2000,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle if (lastDequeuedMessageId == MessageId.earliest) { // if we are starting from latest, we should seek to the actual last message first. // allow the last one to be read when read head inclusively. - if (startMessageId.equals(MessageId.latest)) { + if (MessageId.latest.equals(startMessageId)) { CompletableFuture<GetLastMessageIdResponse> future = internalGetLastMessageIdAsync(); // if the consumer is configured to read inclusive then we need to seek to the last message diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index fa7c4a2..18bd7bc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -998,8 +998,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } else { ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig, client.externalExecutorProvider(), -1, - true, subFuture, null, schema, interceptors, - createIfDoesNotExist); + true, subFuture, startMessageId, schema, interceptors, + createIfDoesNotExist, startMessageRollbackDurationInSec); synchronized (pauseMutex) { if (paused) { @@ -1294,8 +1294,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl( client, partitionName, configurationData, client.externalExecutorProvider(), - partitionIndex, true, subFuture, null, schema, interceptors, - true /* createTopicIfDoesNotExist */); + partitionIndex, true, subFuture, startMessageId, schema, interceptors, + true /* createTopicIfDoesNotExist */, startMessageRollbackDurationInSec); synchronized (pauseMutex) { if (paused) { newConsumer.pause();
