This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9994614 [Broker] Synchronize updates to the inactiveProducers map in
MessageDeduplication (#12820)
9994614 is described below
commit 9994614173205abd075fcc670396cebd71227047
Author: Michael Marshall <[email protected]>
AuthorDate: Mon Nov 15 15:02:36 2021 -0600
[Broker] Synchronize updates to the inactiveProducers map in
MessageDeduplication (#12820)
---
.../apache/pulsar/broker/service/persistent/MessageDeduplication.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index b2c42b0..e2436bb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -138,7 +138,7 @@ public class MessageDeduplication {
private CompletableFuture<Void> recoverSequenceIdsMap() {
// Load the sequence ids from the snapshot in the cursor properties
managedCursor.getProperties().forEach((k, v) -> {
- inactiveProducers.put(k, System.currentTimeMillis());
+ producerRemoved(k);
highestSequencedPushed.put(k, v);
highestSequencedPersisted.put(k, v);
});
@@ -169,7 +169,7 @@ public class MessageDeduplication {
long sequenceId = Math.max(md.getHighestSequenceId(),
md.getSequenceId());
highestSequencedPushed.put(producerName, sequenceId);
highestSequencedPersisted.put(producerName, sequenceId);
- inactiveProducers.put(producerName, System.currentTimeMillis());
+ producerRemoved(producerName);
entry.release();
}