This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0f15d122fbda0e7e4f8c9dd0ef44f79bb0ba3b3f Author: Baodi Shi <[email protected]> AuthorDate: Thu Apr 28 15:06:11 2022 +0800 [fix][broker] Fix MessageDeduplication#inactiveProducers may not be persistence correctly (#15206) ### Motivation #15204 In the current implementation, the first time `purgeInactiveProducers` is executed, producers are removed directly from the collection (line 464) even though they have not expired. This results in these producers never being removed. https://github.com/apache/pulsar/blob/9861dfb1208c4b6b8a1f17ef026e9af71c3e784c/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java#L454-L472 ### Modifications 1. A producer is removed from the collection only when it is inactive. 2. Take a snapshot after each removal of an inactive producer. When `managedLedger.getLastConfirmedEntry` equals `managedCursor.getMarkDeletedPosition()`, the `deduplication-snapshot-monitor` thread does not trigger a snapshot. The removal of these producers is then persisted only the next time a message is produced, which can be confusing for users. 
``` PositionImpl position = (PositionImpl) managedLedger.getLastConfirmedEntry(); if (position == null) { return; } PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition(); if (markDeletedPosition != null && position.compareTo(markDeletedPosition) <= 0) { return; } ``` (cherry picked from commit 8e1ca487c1026510fee264d65a34067ac427ee9d) --- .../service/persistent/MessageDeduplication.java | 13 +++++++--- .../service/persistent/MessageDuplicationTest.java | 29 ++++++++++++++-------- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 90ee3b67e3c..761a8a65d2a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -35,6 +35,7 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Topic.PublishContext; @@ -401,7 +402,7 @@ public class MessageDeduplication { } } - private void takeSnapshot(PositionImpl position) { + private void takeSnapshot(Position position) { if (log.isDebugEnabled()) { log.debug("[{}] Taking snapshot of sequence ids map", topic.getName()); } @@ -412,7 +413,7 @@ public class MessageDeduplication { } }); - managedCursor.asyncMarkDelete(position, snapshot, new MarkDeleteCallback() { + getManagedCursor().asyncMarkDelete(position, snapshot, new MarkDeleteCallback() { @Override public void 
markDeleteComplete(Object ctx) { if (log.isDebugEnabled()) { @@ -456,19 +457,23 @@ public class MessageDeduplication { .toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes()); Iterator<java.util.Map.Entry<String, Long>> mapIterator = inactiveProducers.entrySet().iterator(); + boolean hasInactive = false; while (mapIterator.hasNext()) { java.util.Map.Entry<String, Long> entry = mapIterator.next(); String producerName = entry.getKey(); long lastActiveTimestamp = entry.getValue(); - mapIterator.remove(); - if (lastActiveTimestamp < minimumActiveTimestamp) { log.info("[{}] Purging dedup information for producer {}", topic.getName(), producerName); + mapIterator.remove(); highestSequencedPushed.remove(producerName); highestSequencedPersisted.remove(producerName); + hasInactive = true; } } + if (hasInactive) { + takeSnapshot(getManagedCursor().getMarkDeletedPosition()); + } } public long getLastPublishedSequenceId(String producerName) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java index f62a65ad36a..765d7463f98 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java @@ -32,12 +32,12 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; import java.lang.reflect.Field; import java.util.Map; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import 
org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -167,11 +167,14 @@ public class MessageDuplicationTest { MessageDeduplication messageDeduplication = spyWithClassAndConstructorArgs(MessageDeduplication.class, pulsarService, topic, managedLedger); doReturn(true).when(messageDeduplication).isEnabled(); + ManagedCursor managedCursor = mock(ManagedCursor.class); + doReturn(managedCursor).when(messageDeduplication).getManagedCursor(); + Topic.PublishContext publishContext = mock(Topic.PublishContext.class); Field field = MessageDeduplication.class.getDeclaredField("inactiveProducers"); field.setAccessible(true); - Map<String, Long> map = (Map<String, Long>) field.get(messageDeduplication); + Map<String, Long> inactiveProducers = (Map<String, Long>) field.get(messageDeduplication); String producerName1 = "test1"; when(publishContext.getHighestSequenceId()).thenReturn(2L); @@ -187,18 +190,23 @@ public class MessageDuplicationTest { when(publishContext.getProducerName()).thenReturn(producerName3); messageDeduplication.isDuplicate(publishContext, null); + // All 3 are added to the inactiveProducers list messageDeduplication.producerRemoved(producerName1); - assertTrue(map.containsKey(producerName1)); - messageDeduplication.producerAdded(producerName1); - assertFalse(map.containsKey(producerName1)); + messageDeduplication.producerRemoved(producerName2); + messageDeduplication.producerRemoved(producerName3); + + // Try first purgeInactive, all producer not inactive. 
messageDeduplication.purgeInactiveProducers(); + assertEquals(inactiveProducers.size(), 3); + + // Modify the inactive time of produce2 and produce3 // messageDeduplication.purgeInactiveProducers() will remove producer2 and producer3 - map.put(producerName2, System.currentTimeMillis() - 70000); - map.put(producerName3, System.currentTimeMillis() - 70000); + inactiveProducers.put(producerName2, System.currentTimeMillis() - 70000); + inactiveProducers.put(producerName3, System.currentTimeMillis() - 70000); + // Try second purgeInactive, produce2 and produce3 is inactive. messageDeduplication.purgeInactiveProducers(); - assertFalse(map.containsKey(producerName2)); - assertFalse(map.containsKey(producerName3)); - + assertFalse(inactiveProducers.containsKey(producerName2)); + assertFalse(inactiveProducers.containsKey(producerName3)); field = MessageDeduplication.class.getDeclaredField("highestSequencedPushed"); field.setAccessible(true); ConcurrentOpenHashMap<String, Long> highestSequencedPushed = (ConcurrentOpenHashMap<String, Long>) field.get(messageDeduplication); @@ -206,7 +214,6 @@ public class MessageDuplicationTest { assertEquals((long) highestSequencedPushed.get(producerName1), 2L); assertFalse(highestSequencedPushed.containsKey(producerName2)); assertFalse(highestSequencedPushed.containsKey(producerName3)); - } @Test
