This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4606385f4e3 [fix][client] Fix wrong start message id when it's a chunked message id (#23713)
4606385f4e3 is described below

commit 4606385f4e30392119b813326d493245a3504aac
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Dec 13 14:38:03 2024 +0800

    [fix][client] Fix wrong start message id when it's a chunked message id (#23713)
---
 .../pulsar/client/impl/MessageChunkingTest.java    | 25 ++++++++++++++--------
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  8 ++++++-
 2 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 8df5a38bb46..18ba6a5ab5b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -561,8 +561,12 @@ public class MessageChunkingTest extends ProducerConsumerBase {
         clientBuilder.memoryLimit(10000L, SizeUnit.BYTES);
     }
 
+    interface ThrowingBiConsumer<T, U> {
+        void accept(T t, U u) throws Exception;
+    }
+
     @Test
-    public void testSeekChunkMessages() throws PulsarClientException {
+    public void testSeekChunkMessages() throws Exception {
         log.info("-- Starting {} test --", methodName);
         this.conf.setMaxMessageSize(50);
         final int totalMessages = 5;
@@ -612,14 +616,17 @@ public class MessageChunkingTest extends ProducerConsumerBase {
             assertEquals(msgIds.get(i), msgAfterSeek.getMessageId());
         }
 
-        Reader<byte[]> reader = pulsarClient.newReader()
-                .topic(topicName)
-                .startMessageIdInclusive()
-                .startMessageId(msgIds.get(1))
-                .create();
-
-        Message<byte[]> readMsg = reader.readNext(5, TimeUnit.SECONDS);
-        assertEquals(msgIds.get(1), readMsg.getMessageId());
+        ThrowingBiConsumer<Boolean, MessageId> assertStartMessageId = 
(inclusive, expectedFirstMsgId) -> {
+            final var builder = 
pulsarClient.newReader().topic(topicName).startMessageId(msgIds.get(1));
+            if (inclusive) {
+                builder.startMessageIdInclusive();
+            }
+            @Cleanup final var reader = builder.create();
+            final var readMsg = reader.readNext(5, TimeUnit.SECONDS);
+            assertEquals(expectedFirstMsgId, readMsg.getMessageId());
+        };
+        assertStartMessageId.accept(true, msgIds.get(1));
+        assertStartMessageId.accept(false, msgIds.get(2));
 
         consumer1.close();
         consumer2.close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index d2753856264..e01c6d4643b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -298,7 +298,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         this.subscriptionMode = conf.getSubscriptionMode();
         if (startMessageId != null) {
             MessageIdAdv firstChunkMessageId = ((MessageIdAdv) 
startMessageId).getFirstChunkMessageId();
-            this.startMessageId = (firstChunkMessageId == null) ? 
(MessageIdAdv) startMessageId : firstChunkMessageId;
+            if (conf.isResetIncludeHead() && firstChunkMessageId != null) {
+                // The chunk message id's ledger id and entry id are the last 
chunk's ledger id and entry id, when
+                // startMessageIdInclusive() is enabled, we need to start from 
the first chunk's message id
+                this.startMessageId = firstChunkMessageId;
+            } else {
+                this.startMessageId = (MessageIdAdv) startMessageId;
+            }
         }
         this.initialStartMessageId = this.startMessageId;
         this.startMessageRollbackDurationInSec = 
startMessageRollbackDurationInSec;

Reply via email to