This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 421253790eb4a424d2098ccc0c5e8cd9516b7c63 Author: Baodi Shi <[email protected]> AuthorDate: Thu Apr 28 15:06:11 2022 +0800 [fix][broker] Fix MessageDeduplication#inactiveProducers may not be persistence correctly (#15206) In the current implementation, the first time `purgeInactiveProducers` is executed, each producer is removed directly from the collection (line 464) even though it has not expired yet. As a result, these producers will never be purged. https://github.com/apache/pulsar/blob/9861dfb1208c4b6b8a1f17ef026e9af71c3e784c/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java#L454-L472 1. A producer is removed from the collection only when it is inactive. 2. Take a snapshot whenever inactive producers have been removed. When `managedLedger.getLastConfirmedEntry` equals `managedCursor.getMarkDeletedPosition()`, the `deduplication-snapshot-monitor` thread does not trigger a snapshot. Without this change, the removal of these producers would be persisted only the next time a message is produced, which can be confusing for users. 
``` PositionImpl position = (PositionImpl) managedLedger.getLastConfirmedEntry(); if (position == null) { return; } PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition(); if (markDeletedPosition != null && position.compareTo(markDeletedPosition) <= 0) { return; } ``` (cherry picked from commit 8e1ca487c1026510fee264d65a34067ac427ee9d) --- .../service/persistent/MessageDeduplication.java | 13 +++-- .../service/persistent/MessageDuplicationTest.java | 58 ++++++++++++---------- 2 files changed, 40 insertions(+), 31 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index ff44fa1ed7f..5d0d8f8b1ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -37,6 +37,7 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.admin.AdminResource; @@ -409,7 +410,7 @@ public class MessageDeduplication { } } - private void takeSnapshot(PositionImpl position) { + private void takeSnapshot(Position position) { if (log.isDebugEnabled()) { log.debug("[{}] Taking snapshot of sequence ids map", topic.getName()); } @@ -420,7 +421,7 @@ public class MessageDeduplication { } }); - managedCursor.asyncMarkDelete(position, snapshot, new MarkDeleteCallback() { + getManagedCursor().asyncMarkDelete(position, snapshot, new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) { 
if (log.isDebugEnabled()) { @@ -478,19 +479,23 @@ public class MessageDeduplication { .toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes()); Iterator<java.util.Map.Entry<String, Long>> mapIterator = inactiveProducers.entrySet().iterator(); + boolean hasInactive = false; while (mapIterator.hasNext()) { java.util.Map.Entry<String, Long> entry = mapIterator.next(); String producerName = entry.getKey(); long lastActiveTimestamp = entry.getValue(); - mapIterator.remove(); - if (lastActiveTimestamp < minimumActiveTimestamp) { log.info("[{}] Purging dedup information for producer {}", topic.getName(), producerName); + mapIterator.remove(); highestSequencedPushed.remove(producerName); highestSequencedPersisted.remove(producerName); + hasInactive = true; } } + if (hasInactive) { + takeSnapshot(getManagedCursor().getMarkDeletedPosition()); + } } public long getLastPublishedSequenceId(String producerName) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java index 56f833f7592..5c2598ceac2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java @@ -19,9 +19,23 @@ package org.apache.pulsar.broker.service.persistent; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; +import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static 
org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -34,27 +48,10 @@ import org.apache.pulsar.common.protocol.Commands; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; -import org.awaitility.Awaitility; import org.testng.annotations.Test; - import java.lang.reflect.Field; import java.util.Map; -import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; - @Slf4j @Test(groups = "broker") public class MessageDuplicationTest { @@ -170,11 +167,14 @@ public class MessageDuplicationTest { MessageDeduplication messageDeduplication = spyWithClassAndConstructorArgs(MessageDeduplication.class, pulsarService, topic, managedLedger); doReturn(true).when(messageDeduplication).isEnabled(); + ManagedCursor managedCursor = mock(ManagedCursor.class); + 
doReturn(managedCursor).when(messageDeduplication).getManagedCursor(); + Topic.PublishContext publishContext = mock(Topic.PublishContext.class); Field field = MessageDeduplication.class.getDeclaredField("inactiveProducers"); field.setAccessible(true); - Map<String, Long> map = (Map<String, Long>) field.get(messageDeduplication); + Map<String, Long> inactiveProducers = (Map<String, Long>) field.get(messageDeduplication); String producerName1 = "test1"; when(publishContext.getHighestSequenceId()).thenReturn(2L); @@ -190,18 +190,23 @@ public class MessageDuplicationTest { when(publishContext.getProducerName()).thenReturn(producerName3); messageDeduplication.isDuplicate(publishContext, null); + // All 3 are added to the inactiveProducers list messageDeduplication.producerRemoved(producerName1); - assertTrue(map.containsKey(producerName1)); - messageDeduplication.producerAdded(producerName1); - assertFalse(map.containsKey(producerName1)); + messageDeduplication.producerRemoved(producerName2); + messageDeduplication.producerRemoved(producerName3); + + // Try first purgeInactive, all producer not inactive. messageDeduplication.purgeInactiveProducers(); + assertEquals(inactiveProducers.size(), 3); + + // Modify the inactive time of produce2 and produce3 // messageDeduplication.purgeInactiveProducers() will remove producer2 and producer3 - map.put(producerName2, System.currentTimeMillis() - 70000); - map.put(producerName3, System.currentTimeMillis() - 70000); + inactiveProducers.put(producerName2, System.currentTimeMillis() - 70000); + inactiveProducers.put(producerName3, System.currentTimeMillis() - 70000); + // Try second purgeInactive, produce2 and produce3 is inactive. 
messageDeduplication.purgeInactiveProducers(); - assertFalse(map.containsKey(producerName2)); - assertFalse(map.containsKey(producerName3)); - + assertFalse(inactiveProducers.containsKey(producerName2)); + assertFalse(inactiveProducers.containsKey(producerName3)); field = MessageDeduplication.class.getDeclaredField("highestSequencedPushed"); field.setAccessible(true); ConcurrentOpenHashMap<String, Long> highestSequencedPushed = (ConcurrentOpenHashMap<String, Long>) field.get(messageDeduplication); @@ -209,7 +214,6 @@ public class MessageDuplicationTest { assertEquals((long) highestSequencedPushed.get(producerName1), 2L); assertFalse(highestSequencedPushed.containsKey(producerName2)); assertFalse(highestSequencedPushed.containsKey(producerName3)); - } @Test
