This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 33cf2d09502 [fix][client] Fix the startMessageId can't be respected as
the ChunkMessageID (#16154)
33cf2d09502 is described below
commit 33cf2d09502cec160dcf637786dd5b8fb5669343
Author: Zike Yang <[email protected]>
AuthorDate: Wed Jun 22 09:46:55 2022 +0800
[fix][client] Fix the startMessageId can't be respected as the
ChunkMessageID (#16154)
### Motivation
This is the same problem as when the consumer inclusive seeks the chunked
message.
See more detail in [PIP-107](https://github.com/apache/pulsar/issues/12402)
### Modifications
* Use the first chunk message id as the startMessageId when creating the
consumer/reader.
---
.../org/apache/pulsar/client/impl/MessageChunkingTest.java | 11 ++++++++++-
.../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 9 ++++++++-
2 files changed, 18 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 85d67c3de0d..00e6c2f78e3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -520,6 +520,15 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
assertEquals(msgIds.get(i), msgAfterSeek.getMessageId());
}
+ Reader<byte[]> reader = pulsarClient.newReader()
+ .topic(topicName)
+ .startMessageIdInclusive()
+ .startMessageId(msgIds.get(1))
+ .create();
+
+ Message<byte[]> readMsg = reader.readNext(5, TimeUnit.SECONDS);
+ assertEquals(msgIds.get(1), readMsg.getMessageId());
+
consumer1.close();
consumer2.close();
producer.close();
@@ -549,5 +558,5 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
}
return str.toString();
}
-
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 9e86770ee8f..3e25ba0facb 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -249,7 +249,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
interceptors);
this.consumerId = client.newConsumerId();
this.subscriptionMode = conf.getSubscriptionMode();
- this.startMessageId = startMessageId != null ? new
BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
+ if (startMessageId != null) {
+ if (startMessageId instanceof ChunkMessageIdImpl) {
+ this.startMessageId = new BatchMessageIdImpl(
+ ((ChunkMessageIdImpl)
startMessageId).getFirstChunkMessageId());
+ } else {
+ this.startMessageId = new BatchMessageIdImpl((MessageIdImpl)
startMessageId);
+ }
+ }
this.initialStartMessageId = this.startMessageId;
this.startMessageRollbackDurationInSec =
startMessageRollbackDurationInSec;
AVAILABLE_PERMITS_UPDATER.set(this, 0);