This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0b8d24917e692d714e47dd93c914b8a62530d54a Author: Zixuan Liu <node...@gmail.com> AuthorDate: Fri Feb 28 23:20:03 2025 +0800 [fix][broker] Add expire check for replicator (#23975) Signed-off-by: Zixuan Liu <node...@gmail.com> (cherry picked from commit d0025e79e9b866b9d2b5024f16e4d631e744ecd9) --- .../broker/service/persistent/GeoPersistentReplicator.java | 12 ++++++++++++ .../pulsar/broker/service/persistent/PersistentTopic.java | 4 ++++ .../pulsar/broker/service/persistent/ShadowReplicator.java | 12 ++++++++++++ .../org/apache/pulsar/broker/service/ReplicatorTest.java | 14 +++++++------- 4 files changed, 35 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java index c81ebe8d6ff..bc480635bab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java @@ -152,6 +152,18 @@ public class GeoPersistentReplicator extends PersistentReplicator { continue; } + if (msg.isExpired(messageTTLInSeconds)) { + msgExpired.recordEvent(0 /* no value stat */); + if (log.isDebugEnabled()) { + log.debug("[{}] Discarding expired message at position {}, replicateTo {}", + replicatorId, entry.getPosition(), msg.getReplicateTo()); + } + cursor.asyncDelete(entry.getPosition(), this, entry.getPosition()); + entry.release(); + msg.recycle(); + continue; + } + if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) { // The producer is not ready yet after having stopped/restarted. Drop the message because it will // recovered when the producer is ready diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 084bbc9b063..34c8b535b2d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2006,6 +2006,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal sub.expireMessages(messageTtlInSeconds); } }); + replicators.forEach((__, replicator) + -> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds)); + shadowReplicators.forEach((__, replicator) + -> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds)); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java index f8a602f68b9..cb2e0457e36 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java @@ -77,6 +77,18 @@ public class ShadowReplicator extends PersistentReplicator { continue; } + if (msg.isExpired(messageTTLInSeconds)) { + msgExpired.recordEvent(0 /* no value stat */); + if (log.isDebugEnabled()) { + log.debug("[{}] Discarding expired message at position {}, replicateTo {}", + replicatorId, entry.getPosition(), msg.getReplicateTo()); + } + cursor.asyncDelete(entry.getPosition(), this, entry.getPosition()); + entry.release(); + msg.recycle(); + continue; + } + if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) { // The producer is not ready yet after having stopped/restarted. Drop the message because it will // recovered when the producer is ready diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index bf2276fdf41..d1d7358f346 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -1781,36 +1781,36 @@ public class ReplicatorTest extends ReplicatorTestBase { @Cleanup Producer<byte[]> persistentProducer1 = client1.newProducer().topic(topic.toString()).create(); + // Send V1 message, which will be replicated to the remote cluster by the replicator. persistentProducer1.send("V1".getBytes()); - waitReplicateFinish(topic, admin1); + // Pause replicator PersistentTopic persistentTopic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topic.toString()).get(); persistentTopic.getReplicators().forEach((cluster, replicator) -> { PersistentReplicator persistentReplicator = (PersistentReplicator) replicator; - // Pause replicator pauseReplicator(persistentReplicator); }); + // Send V2 and V3 messages, then let them expire. These messages will not be replicated to the remote cluster. persistentProducer1.send("V2".getBytes()); persistentProducer1.send("V3".getBytes()); - Thread.sleep(1000); - admin1.topics().expireMessagesForAllSubscriptions(topic.toString(), 1); + // Start replicator persistentTopic.getReplicators().forEach((cluster, replicator) -> { PersistentReplicator persistentReplicator = (PersistentReplicator) replicator; persistentReplicator.startProducer(); }); - waitReplicateFinish(topic, admin1); + // Send V4 message, which will be replicated to the remote cluster. persistentProducer1.send("V4".getBytes()); - waitReplicateFinish(topic, admin1); + // Receive messages from the remote cluster: only V1 and V4 messages should be received. @Cleanup PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) .build(); @@ -1828,7 +1828,7 @@ public class ReplicatorTest extends ReplicatorTestBase { result.add(new String(receive.getValue())); } - assertEquals(result, Lists.newArrayList("V1", "V2", "V3", "V4")); + assertEquals(result, Lists.newArrayList("V1", "V4")); } @Test