This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d7f996f8ae4fb0c665b43df4fa9047dfa3912088 Author: Zike Yang <[email protected]> AuthorDate: Wed Jun 22 09:46:55 2022 +0800 [fix][client] Fix the startMessageId can't be respected as the ChunkMessageID (#16154) ### Motivation This is the same problem that occurs when a consumer performs an inclusive seek on a chunked message. See more details in [PIP-107](https://github.com/apache/pulsar/issues/12402). ### Modifications * Use the first chunk message ID as the startMessageId when creating the consumer/reader. (cherry picked from commit 33cf2d09502cec160dcf637786dd5b8fb5669343) --- .../java/org/apache/pulsar/client/impl/MessageChunkingTest.java | 9 +++++++++ .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 9 ++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java index 3504e263d51..3f3557290b5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java @@ -488,6 +488,15 @@ public class MessageChunkingTest extends ProducerConsumerBase { assertEquals(msgIds.get(i), msgAfterSeek.getMessageId()); } + Reader<byte[]> reader = pulsarClient.newReader() + .topic(topicName) + .startMessageIdInclusive() + .startMessageId(msgIds.get(1)) + .create(); + + Message<byte[]> readMsg = reader.readNext(5, TimeUnit.SECONDS); + assertEquals(msgIds.get(1), readMsg.getMessageId()); + consumer1.close(); consumer2.close(); producer.close(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 9422877ef63..40fc120464e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -251,7 +251,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle interceptors); this.consumerId = client.newConsumerId(); this.subscriptionMode = conf.getSubscriptionMode(); - this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null; + if (startMessageId != null) { + if (startMessageId instanceof ChunkMessageIdImpl) { + this.startMessageId = new BatchMessageIdImpl( + ((ChunkMessageIdImpl) startMessageId).getFirstChunkMessageId()); + } else { + this.startMessageId = new BatchMessageIdImpl((MessageIdImpl) startMessageId); + } + } this.initialStartMessageId = this.startMessageId; this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec; AVAILABLE_PERMITS_UPDATER.set(this, 0);
