This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new d9029c67dea [fix][broker] Fix compaction/replication data loss when 
expire messages (#21865)
d9029c67dea is described below

commit d9029c67dea620541d35920549a52dc68381e088
Author: Cong Zhao <zhaoc...@apache.org>
AuthorDate: Thu Jan 11 09:06:48 2024 +0800

    [fix][broker] Fix compaction/replication data loss when expire messages 
(#21865)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  8 +--
 .../persistent/GeoPersistentReplicator.java        | 12 ----
 .../persistent/PersistentMessageExpiryMonitor.java |  3 +-
 .../broker/service/persistent/PersistentTopic.java | 12 ++--
 .../service/persistent/ShadowReplicator.java       | 12 ----
 .../pulsar/broker/service/ReplicatorTest.java      | 71 ++++++++++++++++++++++
 .../apache/pulsar/compaction/CompactionTest.java   | 64 +++++++++++++++++++
 7 files changed, 147 insertions(+), 35 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index aeaad3e4424..8e7bf71b222 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -147,6 +147,7 @@ import 
org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.slf4j.Logger;
@@ -2097,10 +2098,9 @@ public class PersistentTopicsBase extends AdminResource {
                     final List<CompletableFuture<Void>> futures =
                             new ArrayList<>((int) 
topic.getReplicators().size());
                     List<String> subNames =
-                            new ArrayList<>((int) topic.getReplicators().size()
-                                    + (int) topic.getSubscriptions().size());
-                    subNames.addAll(topic.getReplicators().keys());
-                    subNames.addAll(topic.getSubscriptions().keys());
+                            new ArrayList<>((int) 
topic.getSubscriptions().size());
+                    
subNames.addAll(topic.getSubscriptions().keys().stream().filter(
+                            subName -> 
!subName.equals(Compactor.COMPACTION_SUBSCRIPTION)).toList());
                     for (int i = 0; i < subNames.size(); i++) {
                         try {
                             
futures.add(internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
index b8287dd2c14..082dfed10c6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
@@ -123,18 +123,6 @@ public class GeoPersistentReplicator extends 
PersistentReplicator {
                     continue;
                 }
 
-                if (msg.isExpired(messageTTLInSeconds)) {
-                    msgExpired.recordEvent(0 /* no value stat */);
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] Discarding expired message at position 
{}, replicateTo {}",
-                                replicatorId, entry.getPosition(), 
msg.getReplicateTo());
-                    }
-                    cursor.asyncDelete(entry.getPosition(), this, 
entry.getPosition());
-                    entry.release();
-                    msg.recycle();
-                    continue;
-                }
-
                 if (STATE_UPDATER.get(this) != State.Started || 
isLocalMessageSkippedOnce) {
                     // The producer is not ready yet after having 
stopped/restarted. Drop the message because it will
                     // recovered when the producer is ready
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 020dc5323e5..978cd3f886f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -177,7 +177,8 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
         if (position != null) {
             log.info("[{}][{}] Expiring all messages until position {}", 
topicName, subName, position);
             Position prevMarkDeletePos = cursor.getMarkDeletedPosition();
-            cursor.asyncMarkDelete(position, markDeleteCallback, 
cursor.getNumberOfEntriesInBacklog(false));
+            cursor.asyncMarkDelete(position, cursor.getProperties(), 
markDeleteCallback,
+                    cursor.getNumberOfEntriesInBacklog(false));
             if (!Objects.equals(cursor.getMarkDeletedPosition(), 
prevMarkDeletePos) && subscription != null) {
                 subscription.updateLastMarkDeleteAdvancedTimestamp();
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f34aee0c842..1619c977d1f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -498,7 +498,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         }
     }
 
-    private static boolean isCompactionSubscription(String subscriptionName) {
+    public static boolean isCompactionSubscription(String subscriptionName) {
         return COMPACTION_SUBSCRIPTION.equals(subscriptionName);
     }
 
@@ -1696,11 +1696,11 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     public void checkMessageExpiry() {
         int messageTtlInSeconds = topicPolicies.getMessageTTLInSeconds().get();
         if (messageTtlInSeconds != 0) {
-            subscriptions.forEach((__, sub) -> 
sub.expireMessages(messageTtlInSeconds));
-            replicators.forEach((__, replicator)
-                    -> ((PersistentReplicator) 
replicator).expireMessages(messageTtlInSeconds));
-            shadowReplicators.forEach((__, replicator)
-                    -> ((PersistentReplicator) 
replicator).expireMessages(messageTtlInSeconds));
+            subscriptions.forEach((__, sub) -> {
+                if (!isCompactionSubscription(sub.getName())) {
+                   sub.expireMessages(messageTtlInSeconds);
+                }
+            });
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
index fb306348bcd..b48f748bf5d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
@@ -76,18 +76,6 @@ public class ShadowReplicator extends PersistentReplicator {
                     continue;
                 }
 
-                if (msg.isExpired(messageTTLInSeconds)) {
-                    msgExpired.recordEvent(0 /* no value stat */);
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] Discarding expired message at position 
{}, replicateTo {}",
-                                replicatorId, entry.getPosition(), 
msg.getReplicateTo());
-                    }
-                    cursor.asyncDelete(entry.getPosition(), this, 
entry.getPosition());
-                    entry.release();
-                    msg.recycle();
-                    continue;
-                }
-
                 if (STATE_UPDATER.get(this) != State.Started || 
isLocalMessageSkippedOnce) {
                     // The producer is not ready yet after having 
stopped/restarted. Drop the message because it will
                     // recovered when the producer is ready
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index f710c8541d1..88a6f7c9f69 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -83,6 +83,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -1803,4 +1804,74 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         Assert.assertThrows(PulsarClientException.ProducerBusyException.class, 
() -> new MessageProducer(url2, dest2));
     }
+
+    @Test
+    public void testReplicatorWithTTL() throws Exception {
+        log.info("--- Starting ReplicatorTest::testReplicatorWithTTL ---");
+
+        final String cluster1 = pulsar1.getConfig().getClusterName();
+        final String cluster2 = pulsar2.getConfig().getClusterName();
+        final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");
+        final TopicName topic = TopicName
+                .get(BrokerTestUtil.newUniqueName("persistent://" + namespace 
+ "/testReplicatorWithTTL"));
+        admin1.namespaces().createNamespace(namespace, 
Sets.newHashSet(cluster1, cluster2));
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet(cluster1, cluster2));
+        admin1.topics().createNonPartitionedTopic(topic.toString());
+        admin1.topicPolicies().setMessageTTL(topic.toString(), 1);
+
+        @Cleanup
+        PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, 
TimeUnit.SECONDS)
+                .build();
+
+        @Cleanup
+        Producer<byte[]> persistentProducer1 = 
client1.newProducer().topic(topic.toString()).create();
+        persistentProducer1.send("V1".getBytes());
+
+        waitReplicateFinish(topic, admin1);
+
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar1.getBrokerService().getTopicReference(topic.toString()).get();
+        persistentTopic.getReplicators().forEach((cluster, replicator) -> {
+            PersistentReplicator persistentReplicator = (PersistentReplicator) 
replicator;
+            // Pause replicator
+            persistentReplicator.disconnect();
+        });
+
+        persistentProducer1.send("V2".getBytes());
+        persistentProducer1.send("V3".getBytes());
+
+        Thread.sleep(1000);
+
+        admin1.topics().expireMessagesForAllSubscriptions(topic.toString(), 1);
+
+        persistentTopic.getReplicators().forEach((cluster, replicator) -> {
+            PersistentReplicator persistentReplicator = (PersistentReplicator) 
replicator;
+            persistentReplicator.startProducer();
+        });
+
+        waitReplicateFinish(topic, admin1);
+
+        persistentProducer1.send("V4".getBytes());
+
+        waitReplicateFinish(topic, admin1);
+
+        @Cleanup
+        PulsarClient client2 = 
PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, 
TimeUnit.SECONDS)
+                .build();
+
+        @Cleanup
+        Consumer<byte[]> consumer = 
client2.newConsumer().topic(topic.toString())
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscriptionName("sub").subscribe();
+
+        List<String> result = new ArrayList<>();
+        while (true) {
+            Message<byte[]> receive = consumer.receive(2, TimeUnit.SECONDS);
+            if (receive == null) {
+                break;
+            }
+            result.add(new String(receive.getValue()));
+        }
+
+        assertEquals(result, Lists.newArrayList("V1", "V2", "V3", "V4"));
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 25fd18228dd..4e82a8958e9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -2119,4 +2119,68 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                 () -> pulsarTestContext.getBookKeeperClient().openLedger(
                         compactedLedgerId.get(), BookKeeper.DigestType.CRC32, 
new byte[]{})));
     }
+
+    @Test
+    public void testCompactionWithTTL() throws Exception {
+        String topicName = 
"persistent://my-property/use/my-ns/testCompactionWithTTL";
+        String subName = "sub";
+        
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).readCompacted(true)
+                .subscribe().close();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topicName).create();
+
+        producer.newMessage().key("K1").value("V1").send();
+        producer.newMessage().key("K2").value("V2").send();
+
+        admin.topics().triggerCompaction(topicName);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin.topics().compactionStatus(topicName).status,
+                    LongRunningProcessStatus.Status.SUCCESS);
+        });
+
+        producer.newMessage().key("K1").value("V3").send();
+        producer.newMessage().key("K2").value("V4").send();
+
+        Thread.sleep(1000);
+
+        // expire messages
+        admin.topics().expireMessagesForAllSubscriptions(topicName, 1);
+
+        // trim the topic
+        admin.topics().unload(topicName);
+
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(topicName, false);
+            assertEquals(internalStats.numberOfEntries, 4);
+        });
+
+        producer.newMessage().key("K3").value("V5").send();
+
+        admin.topics().triggerCompaction(topicName);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin.topics().compactionStatus(topicName).status,
+                    LongRunningProcessStatus.Status.SUCCESS);
+        });
+
+        @Cleanup
+        Consumer<String> consumer =
+                
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).readCompacted(true)
+                        .subscribe();
+
+        List<String> result = new ArrayList<>();
+        while (true) {
+            Message<String> receive = consumer.receive(2, TimeUnit.SECONDS);
+            if (receive == null) {
+                break;
+            }
+
+            result.add(receive.getValue());
+        }
+
+        Assert.assertEquals(result, List.of("V3", "V4", "V5"));
+    }
 }

Reply via email to