This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 05ac1f9dda5 [fix][build] Fix compatibility issue introduced by #20750
05ac1f9dda5 is described below

commit 05ac1f9dda575e668627df966504f2ba422354ea
Author: xymeng <[email protected]>
AuthorDate: Tue Jan 23 12:46:02 2024 +0800

    [fix][build] Fix compatibility issue introduced by #20750
---
 .../org/apache/pulsar/client/impl/NegativeAcksTest.java   |  3 ++-
 .../java/org/apache/pulsar/client/impl/ConsumerImpl.java  | 15 ++++++++++++++-
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 3b9a3afcc1f..fdefd31f1c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.junit.Assert.assertTrue;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import java.util.HashSet;
@@ -143,7 +144,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
             consumer.negativeAcknowledge(msg);
         }
 
-        assertTrue(consumer instanceof ConsumerBase<String>);
+        assertTrue(consumer instanceof ConsumerBase);
         assertEquals(((ConsumerBase<String>) consumer).getUnAckedMessageTracker().size(), 0);
 
         Set<String> receivedMessages = new HashSet<>();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 0b785b87452..90ac520dce6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -770,7 +770,20 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         negativeAcksTracker.add(message);
 
         // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
-        unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId()));
+        unAckedMessageTracker.remove(discardBatch(message.getMessageId()));
+    }
+
+    static MessageIdImpl discardBatch(MessageId messageId) {
+        if (messageId instanceof ChunkMessageIdImpl) {
+            return (MessageIdImpl) messageId;
+        }
+        MessageIdImpl msgId;
+        if (messageId instanceof TopicMessageIdImpl) {
+           msgId = (MessageIdImpl) ((TopicMessageIdImpl) messageId).getInnerMessageId();
+        } else {
+            msgId = (MessageIdImpl) messageId;
+        }
+        return new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartitionIndex());
     }
 
     @Override

Reply via email to