This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2fd35094990 [fix][broker] Revert 5895: fix redeliveryCount (#17060)
2fd35094990 is described below
commit 2fd350949906fb312f6a9c6ec790100173727674
Author: Michael Marshall <[email protected]>
AuthorDate: Wed Aug 24 10:04:39 2022 -0700
[fix][broker] Revert 5895: fix redeliveryCount (#17060)
Reverts: #5881
### Motivation
The `redeliveryCount` was introduced in [PIP
22](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic)
with this PR https://github.com/apache/pulsar/pull/2508. It is an extra field
on a message that indicates how many times a message has been redelivered. In
the original design, it was only incremented for shared subscriptions when the
consumer sent `REDELIVER_UNACKNOWLEDGED_MESSAGES` to the broker.
In #5881, this field's logic changed so that it is incremented each time a
broker delivers a message to a consumer (after the initial delivery). The
problem with this logic is that it counts messages that are sent to a
consumer's `receiveQueue`, but not actually received by the client application,
as "delivered" messages. This is especially problematic for the DLQ
implementation because it relies on the counter to track deliveries, and this
eager incrementing of the `redeliveryCount` [...]
This PR returns the broker's behavior to the original state before #5881.
Note that the DLQ logic is only triggered by messages that hit their ack
timeout or are negatively acknowledged. This means that in some cases, a
message could be delivered many times to a `receiveQueue` and once to the
application and then sent to the DLQ. Given that our DLQ implementation has an
intentional preference towards over delivery instead of under delivery, I think
this logic should be fixed.
One of the consequences of this PR is that the message filter logic for
redelivering messages triggers this logic for incrementing `redeliveryCount`.
See this code here:
https://github.com/apache/pulsar/blob/b1a29b520d34d60e60160e3a7b9b0e26926063ee/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L198-L206
I'll need feedback from someone more familiar with message filtering to
understand if this is a problematic change. If it is, I think we might need to
revisit the logic in `filterEntriesForConsumer`.
### Modifications
* Revert the relevant changes from #5895. I kept the test that was added in
the PR and modified the assertion.
* Fix test assertion ordering and modify expected value to align with new
paradigm.
### Verifying this change
This change includes modifications to tests as well as existing test
coverage.
### Does this pull request potentially affect one of the following parts:
This change is a break in current behavior, so I will send an email to the
dev mailing list:
https://lists.apache.org/thread/ts9d6zbtlz3y5xtv7p0c3dslk0vljpj2.
### Documentation
- [x] `doc-not-needed`
---
.../pulsar/broker/service/InMemoryRedeliveryTracker.java | 12 ------------
.../pulsar/broker/service/PulsarCommandSenderImpl.java | 7 ++-----
.../org/apache/pulsar/broker/service/RedeliveryTracker.java | 4 ----
.../pulsar/broker/service/RedeliveryTrackerDisabled.java | 10 ----------
.../persistent/PersistentDispatcherMultipleConsumers.java | 10 +++++-----
.../persistent/PersistentDispatcherSingleActiveConsumer.java | 1 -
.../pulsar/client/api/ExposeMessageRedeliveryCountTest.java | 8 ++++----
7 files changed, 11 insertions(+), 41 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
index 92df7b3e760..19e425799d6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
@@ -61,16 +61,4 @@ public class InMemoryRedeliveryTracker implements
RedeliveryTracker {
public void clear() {
trackerCache.clear();
}
-
- @Override
- public boolean contains(Position position) {
- PositionImpl positionImpl = (PositionImpl) position;
- return trackerCache.containsKey(positionImpl.getLedgerId(),
positionImpl.getEntryId());
- }
-
- @Override
- public void addIfAbsent(Position position) {
- PositionImpl positionImpl = (PositionImpl) position;
- trackerCache.putIfAbsent(positionImpl.getLedgerId(),
positionImpl.getEntryId(), 0, 0L);
- }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 8eb00b6c295..543739bae27 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -267,11 +267,8 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
topicName, subscription, consumerId,
entry.getLedgerId(), entry.getEntryId(), batchSize);
}
- int redeliveryCount = 0;
- PositionImpl position = PositionImpl.get(entry.getLedgerId(),
entry.getEntryId());
- if (redeliveryTracker.contains(position)) {
- redeliveryCount =
redeliveryTracker.incrementAndGetRedeliveryCount(position);
- }
+ int redeliveryCount = redeliveryTracker
+
.getRedeliveryCount(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
ctx.write(
cnx.newMessageAndIntercept(consumerId,
entry.getLedgerId(), entry.getEntryId(), partitionIdx,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
index cedd50626c5..8fffaa4890d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
@@ -32,8 +32,4 @@ public interface RedeliveryTracker {
void removeBatch(List<Position> positions);
void clear();
-
- boolean contains(Position position);
-
- void addIfAbsent(Position position);
}
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
index 483b1cd1dd5..2c119fe3714 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
@@ -51,14 +51,4 @@ public class RedeliveryTrackerDisabled implements
RedeliveryTracker {
public void clear() {
// no-op
}
-
- @Override
- public boolean contains(Position position) {
- return false;
- }
-
- @Override
- public void addIfAbsent(Position position) {
- // no-op
- }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 94cee65f912..a302eb8d2a5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -202,9 +202,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
log.debug("[{}] Consumer are left, reading more entries",
name);
}
consumer.getPendingAcks().forEach((ledgerId, entryId,
batchSize, stickyKeyHash) -> {
- if (addMessageToReplay(ledgerId, entryId, stickyKeyHash)) {
-
redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId));
- }
+ addMessageToReplay(ledgerId, entryId, stickyKeyHash);
});
totalAvailablePermits -= consumer.getAvailablePermits();
if (log.isDebugEnabled()) {
@@ -894,7 +892,9 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer
consumer, long consumerEpoch) {
consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize,
stickyKeyHash) -> {
- addMessageToReplay(ledgerId, entryId, stickyKeyHash);
+ if (addMessageToReplay(ledgerId, entryId, stickyKeyHash)) {
+
redeliveryTracker.incrementAndGetRedeliveryCount((PositionImpl.get(ledgerId,
entryId)));
+ }
});
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Redelivering unacknowledged messages for
consumer {}", name, consumer,
@@ -909,7 +909,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
// TODO: We want to pass a sticky key hash as a third argument to
guarantee the order of the messages
// on Key_Shared subscription, but it's difficult to get the
sticky key here
if (addMessageToReplay(position.getLedgerId(),
position.getEntryId())) {
- redeliveryTracker.addIfAbsent(position);
+ redeliveryTracker.incrementAndGetRedeliveryCount(position);
}
});
if (log.isDebugEnabled()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index c9d810357c2..47830e669af 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -322,7 +322,6 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
@Override
public void redeliverUnacknowledgedMessages(Consumer consumer,
List<PositionImpl> positions) {
// We cannot redeliver single messages to single consumers to preserve
ordering.
- positions.forEach(redeliveryTracker::addIfAbsent);
redeliverUnacknowledgedMessages(consumer, DEFAULT_CONSUMER_EPOCH);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java
index ce3bbdc3c39..dae6124223f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java
@@ -103,11 +103,10 @@ public class ExposeMessageRedeliveryCountTest extends
ProducerConsumerBase {
do {
Message<byte[]> message = consumer.receive();
- message.getProperties();
final int redeliveryCount = message.getRedeliveryCount();
if (redeliveryCount > 2) {
consumer.acknowledge(message);
- Assert.assertEquals(3, redeliveryCount);
+ Assert.assertEquals(redeliveryCount, 3);
break;
}
} while (true);
@@ -165,14 +164,15 @@ public class ExposeMessageRedeliveryCountTest extends
ProducerConsumerBase {
receivedMessagesForConsumer1.add(msg);
} else {
break;
- } }
+ }
+ }
Assert.assertEquals(receivedMessagesForConsumer0.size() +
receivedMessagesForConsumer1.size(), messages);
consumer0.close();
for (int i = 0; i < receivedMessagesForConsumer0.size(); i++) {
- Assert.assertEquals(consumer1.receive().getRedeliveryCount(), 1);
+ Assert.assertEquals(consumer1.receive().getRedeliveryCount(), 0);
}
}