This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 05ac1f9dda5 [fix][build] Fix compatibility issue introduced by #20750
05ac1f9dda5 is described below
commit 05ac1f9dda575e668627df966504f2ba422354ea
Author: xymeng <[email protected]>
AuthorDate: Tue Jan 23 12:46:02 2024 +0800
[fix][build] Fix compatibility issue introduced by #20750
---
.../org/apache/pulsar/client/impl/NegativeAcksTest.java | 3 ++-
.../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 15 ++++++++++++++-
2 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 3b9a3afcc1f..fdefd31f1c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import static org.junit.Assert.assertTrue;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import java.util.HashSet;
@@ -143,7 +144,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
consumer.negativeAcknowledge(msg);
}
- assertTrue(consumer instanceof ConsumerBase<String>);
+ assertTrue(consumer instanceof ConsumerBase);
assertEquals(((ConsumerBase<String>) consumer).getUnAckedMessageTracker().size(), 0);
Set<String> receivedMessages = new HashSet<>();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 0b785b87452..90ac520dce6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -770,7 +770,20 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
negativeAcksTracker.add(message);
// Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
- unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId()));
+ unAckedMessageTracker.remove(discardBatch(message.getMessageId()));
+ }
+
+ static MessageIdImpl discardBatch(MessageId messageId) {
+ if (messageId instanceof ChunkMessageIdImpl) {
+ return (MessageIdImpl) messageId;
+ }
+ MessageIdImpl msgId;
+ if (messageId instanceof TopicMessageIdImpl) {
+ msgId = (MessageIdImpl) ((TopicMessageIdImpl) messageId).getInnerMessageId();
+ } else {
+ msgId = (MessageIdImpl) messageId;
+ }
+ return new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartitionIndex());
}
@Override