This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 536c89198249f3f55136b0385b26dc19af4107c6 Author: Jiwei Guo <[email protected]> AuthorDate: Tue Apr 26 11:50:15 2022 +0800 [fix][client] Fix negative ack not redelivery. (#15312) (cherry picked from commit 9f6532a43eff5021896ed2fd8e3a771ce4d8cc7b) --- .../pulsar/client/impl/NegativeAcksTest.java | 3 +++ .../pulsar/client/impl/NegativeAcksTracker.java | 28 ++++++---------------- 2 files changed, 10 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 8ff339fed07..ba35529d024 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -106,6 +106,9 @@ public class NegativeAcksTest extends ProducerConsumerBase { log.info("Test negative acks batching={} partitions={} subType={} negAckDelayMs={}", batching, usePartitions, subscriptionType, negAcksDelayMillis); String topic = BrokerTestUtil.newUniqueName("testNegativeAcks"); + if (usePartitions) { + admin.topics().createPartitionedTopic(topic, 2); + } @Cleanup Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java index 17238ece38e..6273f4d582e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java @@ -85,29 +85,10 @@ class NegativeAcksTracker implements Closeable { } public synchronized void add(MessageId messageId) { - if (messageId instanceof BatchMessageIdImpl) { - BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; - messageId = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(), - batchMessageId.getPartitionIndex()); - } - - if (nackedMessages == null) { - nackedMessages = new HashMap<>(); - } - nackedMessages.put(messageId, System.nanoTime() + nackDelayNanos); - - if (this.timeout == null) { - // Schedule a task and group all the redeliveries for same period. Leave a small buffer to allow for - // nack immediately following the current one will be batched into the same redeliver request. - this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS); - } + add(messageId, 0); } public synchronized void add(Message<?> message) { - if (negativeAckRedeliveryBackoff == null) { - add(message.getMessageId()); - return; - } add(message.getMessageId(), message.getRedeliveryCount()); } @@ -127,7 +108,12 @@ class NegativeAcksTracker implements Closeable { nackedMessages = new HashMap<>(); } - long backoffNs = TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount)); + long backoffNs; + if (negativeAckRedeliveryBackoff != null) { + backoffNs = TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount)); + } else { + backoffNs = nackDelayNanos; + } nackedMessages.put(messageId, System.nanoTime() + backoffNs); if (this.timeout == null) {
