This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4606385f4e3 [fix][client] Fix wrong start message id when it's a
chunked message id (#23713)
4606385f4e3 is described below
commit 4606385f4e30392119b813326d493245a3504aac
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Dec 13 14:38:03 2024 +0800
[fix][client] Fix wrong start message id when it's a chunked message id
(#23713)
---
.../pulsar/client/impl/MessageChunkingTest.java | 25 ++++++++++++++--------
.../apache/pulsar/client/impl/ConsumerImpl.java | 8 ++++++-
2 files changed, 23 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 8df5a38bb46..18ba6a5ab5b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -561,8 +561,12 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
clientBuilder.memoryLimit(10000L, SizeUnit.BYTES);
}
+ interface ThrowingBiConsumer<T, U> {
+ void accept(T t, U u) throws Exception;
+ }
+
@Test
- public void testSeekChunkMessages() throws PulsarClientException {
+ public void testSeekChunkMessages() throws Exception {
log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(50);
final int totalMessages = 5;
@@ -612,14 +616,17 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
assertEquals(msgIds.get(i), msgAfterSeek.getMessageId());
}
- Reader<byte[]> reader = pulsarClient.newReader()
- .topic(topicName)
- .startMessageIdInclusive()
- .startMessageId(msgIds.get(1))
- .create();
-
- Message<byte[]> readMsg = reader.readNext(5, TimeUnit.SECONDS);
- assertEquals(msgIds.get(1), readMsg.getMessageId());
+ ThrowingBiConsumer<Boolean, MessageId> assertStartMessageId =
(inclusive, expectedFirstMsgId) -> {
+ final var builder =
pulsarClient.newReader().topic(topicName).startMessageId(msgIds.get(1));
+ if (inclusive) {
+ builder.startMessageIdInclusive();
+ }
+ @Cleanup final var reader = builder.create();
+ final var readMsg = reader.readNext(5, TimeUnit.SECONDS);
+ assertEquals(expectedFirstMsgId, readMsg.getMessageId());
+ };
+ assertStartMessageId.accept(true, msgIds.get(1));
+ assertStartMessageId.accept(false, msgIds.get(2));
consumer1.close();
consumer2.close();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index d2753856264..e01c6d4643b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -298,7 +298,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
this.subscriptionMode = conf.getSubscriptionMode();
if (startMessageId != null) {
MessageIdAdv firstChunkMessageId = ((MessageIdAdv)
startMessageId).getFirstChunkMessageId();
- this.startMessageId = (firstChunkMessageId == null) ?
(MessageIdAdv) startMessageId : firstChunkMessageId;
+ if (conf.isResetIncludeHead() && firstChunkMessageId != null) {
+ // The chunk message id's ledger id and entry id are the last
chunk's ledger id and entry id. When
+ // startMessageIdInclusive() is enabled, we need to start from
the first chunk's message id.
+ this.startMessageId = firstChunkMessageId;
+ } else {
+ this.startMessageId = (MessageIdAdv) startMessageId;
+ }
}
this.initialStartMessageId = this.startMessageId;
this.startMessageRollbackDurationInSec =
startMessageRollbackDurationInSec;