This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fd35094990 [fix][broker] Revert 5895: fix redeliveryCount (#17060)
2fd35094990 is described below

commit 2fd350949906fb312f6a9c6ec790100173727674
Author: Michael Marshall <[email protected]>
AuthorDate: Wed Aug 24 10:04:39 2022 -0700

    [fix][broker] Revert 5895: fix redeliveryCount (#17060)
    
    Reverts: #5881
    
    ### Motivation
    
    The `redeliveryCount` was introduced in 
[PIP 22](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) 
with this PR https://github.com/apache/pulsar/pull/2508. It is an extra field 
on a message that indicates how many times a message has been redelivered. In 
the original design, it was only incremented for shared subscriptions when the 
consumer sent `REDELIVER_UNACKNOWLEDGED_MESSAGES` to the broker.
    
    In #5881, this field's logic changed so that it is incremented each time a 
broker delivers a message to a consumer (after the initial delivery). The 
problem with this logic is that it counts messages that are sent to a 
consumer's `receiveQueue`, but not actually received by the client application, 
as "delivered" messages. This is especially problematic for the DLQ 
implementation because it relies on the counter to track deliveries, and this 
eager incrementing of the `redeliveryCount`  [...]
    
    This PR returns the broker's behavior to the original state before #5881.
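
The difference between the two counting policies described above can be illustrated with a small, self-contained simulation. This is only a sketch and not broker code: `RedeliveryCountSketch`, `Event`, and `lastStampedCount` are hypothetical names. The model assumes that the count is stamped on the message at dispatch time, that a consumer disconnect leads to a re-dispatch without any redelivery request, and that nacks and ack timeouts both reach the broker as a redelivery request.

```java
class RedeliveryCountSketch {

    /** Hypothetical events seen by the broker for a single message. */
    enum Event {
        DISPATCH,          // broker sends the message to some consumer's receiveQueue
        REDELIVER_REQUEST  // a consumer asks for redelivery (nack or ack timeout)
    }

    /**
     * Returns the redeliveryCount stamped on the message at its last dispatch.
     *
     * eager == true  models the behavior being reverted: every dispatch after
     *                the first one increments the count.
     * eager == false models the restored behavior: only an explicit redelivery
     *                request increments the count.
     */
    static int lastStampedCount(boolean eager, Event... events) {
        int count = 0;       // toy stand-in for the tracker's per-message counter
        int dispatches = 0;  // how many times the message has been dispatched
        int stamped = 0;     // count attached to the most recent dispatch
        for (Event event : events) {
            switch (event) {
                case DISPATCH:
                    if (eager && dispatches > 0) {
                        count++;
                    }
                    dispatches++;
                    stamped = count;
                    break;
                case REDELIVER_REQUEST:
                    if (!eager) {
                        count++;
                    }
                    break;
                default:
                    throw new IllegalStateException("unexpected event " + event);
            }
        }
        return stamped;
    }

    public static void main(String[] args) {
        // Two consumers disconnect before the application reads the message,
        // a third consumer receives it and nacks it once, then it is dispatched again.
        Event[] scenario = {
            Event.DISPATCH, Event.DISPATCH, Event.DISPATCH,
            Event.REDELIVER_REQUEST, Event.DISPATCH
        };
        System.out.println("eager:    " + lastStampedCount(true, scenario));
        System.out.println("restored: " + lastStampedCount(false, scenario));
    }
}
```

In this scenario the application has handled the message only once, yet the eager policy stamps a count of 3 while the restored policy stamps 1. Any threshold compared against this count is therefore reached much sooner under the eager policy.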
    
    Note that the DLQ logic is only triggered by messages that hit their ack 
timeout or are negatively acknowledged. This means that in some cases, a 
message could be delivered many times to a `receiveQueue` and once to the 
application and then sent to the DLQ. Given that our DLQ implementation has an 
intentional preference towards over-delivery instead of under-delivery, I think 
this eager incrementing should be fixed.
    
    One of the consequences of this PR is that the message filter logic for 
redelivering messages triggers this logic for incrementing `redeliveryCount`. 
See this code here: 
https://github.com/apache/pulsar/blob/b1a29b520d34d60e60160e3a7b9b0e26926063ee/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L198-L206
    
    I'll need feedback from someone more familiar with message filtering to 
understand if this is a problematic change. If it is, I think we might need to 
revisit the logic in `filterEntriesForConsumer`.
    
    ### Modifications
    
    * Revert the relevant changes from #5895. I kept the test that was added in 
the PR and modified the assertion.
    * Fix test assertion ordering and modify expected value to align with new 
paradigm.
    
    ### Verifying this change
    
    This change modifies tests and is also covered by existing tests.
    
    ### Does this pull request potentially affect one of the following parts:
    
    This change breaks current behavior, so I will send an email to the 
dev mailing list: 
https://lists.apache.org/thread/ts9d6zbtlz3y5xtv7p0c3dslk0vljpj2.
    
    ### Documentation
    
    - [x] `doc-not-needed`
---
 .../pulsar/broker/service/InMemoryRedeliveryTracker.java     | 12 ------------
 .../pulsar/broker/service/PulsarCommandSenderImpl.java       |  7 ++-----
 .../org/apache/pulsar/broker/service/RedeliveryTracker.java  |  4 ----
 .../pulsar/broker/service/RedeliveryTrackerDisabled.java     | 10 ----------
 .../persistent/PersistentDispatcherMultipleConsumers.java    | 10 +++++-----
 .../persistent/PersistentDispatcherSingleActiveConsumer.java |  1 -
 .../pulsar/client/api/ExposeMessageRedeliveryCountTest.java  |  8 ++++----
 7 files changed, 11 insertions(+), 41 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
index 92df7b3e760..19e425799d6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
@@ -61,16 +61,4 @@ public class InMemoryRedeliveryTracker implements RedeliveryTracker {
     public void clear() {
         trackerCache.clear();
     }
-
-    @Override
-    public boolean contains(Position position) {
-        PositionImpl positionImpl = (PositionImpl) position;
-        return trackerCache.containsKey(positionImpl.getLedgerId(), positionImpl.getEntryId());
-    }
-
-    @Override
-    public void addIfAbsent(Position position) {
-        PositionImpl positionImpl = (PositionImpl) position;
-        trackerCache.putIfAbsent(positionImpl.getLedgerId(), positionImpl.getEntryId(), 0, 0L);
-    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 8eb00b6c295..543739bae27 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -267,11 +267,8 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
                             topicName, subscription,  consumerId, entry.getLedgerId(), entry.getEntryId(), batchSize);
                 }
 
-                int redeliveryCount = 0;
-                PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
-                if (redeliveryTracker.contains(position)) {
-                    redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position);
-                }
+                int redeliveryCount = redeliveryTracker
+                        .getRedeliveryCount(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
 
                 ctx.write(
                         cnx.newMessageAndIntercept(consumerId, entry.getLedgerId(), entry.getEntryId(), partitionIdx,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
index cedd50626c5..8fffaa4890d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
@@ -32,8 +32,4 @@ public interface RedeliveryTracker {
     void removeBatch(List<Position> positions);
 
     void clear();
-
-    boolean contains(Position position);
-
-    void addIfAbsent(Position position);
 }
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
index 483b1cd1dd5..2c119fe3714 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
@@ -51,14 +51,4 @@ public class RedeliveryTrackerDisabled implements RedeliveryTracker {
     public void clear() {
         // no-op
     }
-
-    @Override
-    public boolean contains(Position position) {
-        return false;
-    }
-
-    @Override
-    public void addIfAbsent(Position position) {
-        // no-op
-    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 94cee65f912..a302eb8d2a5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -202,9 +202,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                     log.debug("[{}] Consumer are left, reading more entries", name);
                 }
                 consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> {
-                    if (addMessageToReplay(ledgerId, entryId, stickyKeyHash)) {
-                        redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId));
-                    }
+                    addMessageToReplay(ledgerId, entryId, stickyKeyHash);
                 });
                 totalAvailablePermits -= consumer.getAvailablePermits();
                 if (log.isDebugEnabled()) {
@@ -894,7 +892,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     @Override
     public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
         consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> {
-            addMessageToReplay(ledgerId, entryId, stickyKeyHash);
+            if (addMessageToReplay(ledgerId, entryId, stickyKeyHash)) {
+                redeliveryTracker.incrementAndGetRedeliveryCount((PositionImpl.get(ledgerId, entryId)));
+            }
         });
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer,
@@ -909,7 +909,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             // TODO: We want to pass a sticky key hash as a third argument to guarantee the order of the messages
             // on Key_Shared subscription, but it's difficult to get the sticky key here
             if (addMessageToReplay(position.getLedgerId(), position.getEntryId())) {
-                redeliveryTracker.addIfAbsent(position);
+                redeliveryTracker.incrementAndGetRedeliveryCount(position);
             }
         });
         if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index c9d810357c2..47830e669af 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -322,7 +322,6 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
     @Override
     public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
         // We cannot redeliver single messages to single consumers to preserve ordering.
-        positions.forEach(redeliveryTracker::addIfAbsent);
         redeliverUnacknowledgedMessages(consumer, DEFAULT_CONSUMER_EPOCH);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java
index ce3bbdc3c39..dae6124223f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java
@@ -103,11 +103,10 @@ public class ExposeMessageRedeliveryCountTest extends ProducerConsumerBase {
 
         do {
             Message<byte[]> message = consumer.receive();
-            message.getProperties();
             final int redeliveryCount = message.getRedeliveryCount();
             if (redeliveryCount > 2) {
                 consumer.acknowledge(message);
-                Assert.assertEquals(3, redeliveryCount);
+                Assert.assertEquals(redeliveryCount, 3);
                 break;
             }
         } while (true);
@@ -165,14 +164,15 @@ public class ExposeMessageRedeliveryCountTest extends ProducerConsumerBase {
                 receivedMessagesForConsumer1.add(msg);
             } else {
                 break;
-            }        }
+            }
+        }
 
         Assert.assertEquals(receivedMessagesForConsumer0.size() + receivedMessagesForConsumer1.size(), messages);
 
         consumer0.close();
 
         for (int i = 0; i < receivedMessagesForConsumer0.size(); i++) {
-            Assert.assertEquals(consumer1.receive().getRedeliveryCount(), 1);
+            Assert.assertEquals(consumer1.receive().getRedeliveryCount(), 0);
         }
 
     }
