This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0b8d24917e692d714e47dd93c914b8a62530d54a
Author: Zixuan Liu <node...@gmail.com>
AuthorDate: Fri Feb 28 23:20:03 2025 +0800

    [fix][broker] Add expire check for replicator (#23975)
    
    Signed-off-by: Zixuan Liu <node...@gmail.com>
    (cherry picked from commit d0025e79e9b866b9d2b5024f16e4d631e744ecd9)
---
 .../broker/service/persistent/GeoPersistentReplicator.java | 12 ++++++++++++
 .../pulsar/broker/service/persistent/PersistentTopic.java  |  4 ++++
 .../pulsar/broker/service/persistent/ShadowReplicator.java | 12 ++++++++++++
 .../org/apache/pulsar/broker/service/ReplicatorTest.java   | 14 +++++++-------
 4 files changed, 35 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
index c81ebe8d6ff..bc480635bab 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
@@ -152,6 +152,18 @@ public class GeoPersistentReplicator extends 
PersistentReplicator {
                     continue;
                 }
 
+                if (msg.isExpired(messageTTLInSeconds)) {
+                    msgExpired.recordEvent(0 /* no value stat */);
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Discarding expired message at position 
{}, replicateTo {}",
+                                replicatorId, entry.getPosition(), 
msg.getReplicateTo());
+                    }
+                    cursor.asyncDelete(entry.getPosition(), this, 
entry.getPosition());
+                    entry.release();
+                    msg.recycle();
+                    continue;
+                }
+
                 if (STATE_UPDATER.get(this) != State.Started || 
isLocalMessageSkippedOnce) {
                     // The producer is not ready yet after having 
stopped/restarted. Drop the message because it will
                     // recovered when the producer is ready
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 084bbc9b063..34c8b535b2d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2006,6 +2006,10 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                    sub.expireMessages(messageTtlInSeconds);
                 }
             });
+            replicators.forEach((__, replicator)
+                    -> ((PersistentReplicator) 
replicator).expireMessages(messageTtlInSeconds));
+            shadowReplicators.forEach((__, replicator)
+                    -> ((PersistentReplicator) 
replicator).expireMessages(messageTtlInSeconds));
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
index f8a602f68b9..cb2e0457e36 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
@@ -77,6 +77,18 @@ public class ShadowReplicator extends PersistentReplicator {
                     continue;
                 }
 
+                if (msg.isExpired(messageTTLInSeconds)) {
+                    msgExpired.recordEvent(0 /* no value stat */);
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Discarding expired message at position 
{}, replicateTo {}",
+                                replicatorId, entry.getPosition(), 
msg.getReplicateTo());
+                    }
+                    cursor.asyncDelete(entry.getPosition(), this, 
entry.getPosition());
+                    entry.release();
+                    msg.recycle();
+                    continue;
+                }
+
                 if (STATE_UPDATER.get(this) != State.Started || 
isLocalMessageSkippedOnce) {
                     // The producer is not ready yet after having 
stopped/restarted. Drop the message because it will
                     // recovered when the producer is ready
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index bf2276fdf41..d1d7358f346 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -1781,36 +1781,36 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         @Cleanup
         Producer<byte[]> persistentProducer1 = 
client1.newProducer().topic(topic.toString()).create();
+        // Send V1 message, which will be replicated to the remote cluster by 
the replicator.
         persistentProducer1.send("V1".getBytes());
-
         waitReplicateFinish(topic, admin1);
 
+        // Pause replicator
         PersistentTopic persistentTopic =
                 (PersistentTopic) 
pulsar1.getBrokerService().getTopicReference(topic.toString()).get();
         persistentTopic.getReplicators().forEach((cluster, replicator) -> {
             PersistentReplicator persistentReplicator = (PersistentReplicator) 
replicator;
-            // Pause replicator
             pauseReplicator(persistentReplicator);
         });
 
+        // Send V2 and V3 messages, then let them expire. These messages will 
not be replicated to the remote cluster.
         persistentProducer1.send("V2".getBytes());
         persistentProducer1.send("V3".getBytes());
-
         Thread.sleep(1000);
-
         admin1.topics().expireMessagesForAllSubscriptions(topic.toString(), 1);
 
+        // Start replicator
         persistentTopic.getReplicators().forEach((cluster, replicator) -> {
             PersistentReplicator persistentReplicator = (PersistentReplicator) 
replicator;
             persistentReplicator.startProducer();
         });
-
         waitReplicateFinish(topic, admin1);
 
+        // Send V4 message, which will be replicated to the remote cluster.
         persistentProducer1.send("V4".getBytes());
-
         waitReplicateFinish(topic, admin1);
 
+        // Receive messages from the remote cluster: only V1 and V4 messages 
should be received.
         @Cleanup
         PulsarClient client2 = 
PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, 
TimeUnit.SECONDS)
                 .build();
@@ -1828,7 +1828,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
             result.add(new String(receive.getValue()));
         }
 
-        assertEquals(result, Lists.newArrayList("V1", "V2", "V3", "V4"));
+        assertEquals(result, Lists.newArrayList("V1", "V4"));
     }
 
     @Test

Reply via email to