This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f6bc5d71fd9 [fix][broker] Fix compaction/replication data loss when expire messages (#21865)
f6bc5d71fd9 is described below
commit f6bc5d71fd9c5777ec61608b196c9052831382e5
Author: Cong Zhao <[email protected]>
AuthorDate: Thu Jan 11 09:06:48 2024 +0800
[fix][broker] Fix compaction/replication data loss when expire messages (#21865)
---
.../broker/admin/impl/PersistentTopicsBase.java | 8 +--
.../persistent/GeoPersistentReplicator.java | 12 ----
.../persistent/PersistentMessageExpiryMonitor.java | 3 +-
.../broker/service/persistent/PersistentTopic.java | 12 ++--
.../service/persistent/ShadowReplicator.java | 12 ----
.../pulsar/broker/service/ReplicatorTest.java | 71 ++++++++++++++++++++++
.../apache/pulsar/compaction/CompactionTest.java | 64 +++++++++++++++++++
7 files changed, 147 insertions(+), 35 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index bfe4e8b8a29..2b9a34b382e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -146,6 +146,7 @@ import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.slf4j.Logger;
@@ -2179,10 +2180,9 @@ public class PersistentTopicsBase extends AdminResource {
final List<CompletableFuture<Void>> futures =
new ArrayList<>((int) topic.getReplicators().size());
List<String> subNames =
- new ArrayList<>((int) topic.getReplicators().size()
- + (int) topic.getSubscriptions().size());
- subNames.addAll(topic.getReplicators().keys());
- subNames.addAll(topic.getSubscriptions().keys());
+ new ArrayList<>((int) topic.getSubscriptions().size());
+ subNames.addAll(topic.getSubscriptions().keys().stream().filter(
+ subName -> !subName.equals(Compactor.COMPACTION_SUBSCRIPTION)).toList());
for (int i = 0; i < subNames.size(); i++) {
try {
futures.add(internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
index b8287dd2c14..082dfed10c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
@@ -123,18 +123,6 @@ public class GeoPersistentReplicator extends PersistentReplicator {
continue;
}
- if (msg.isExpired(messageTTLInSeconds)) {
- msgExpired.recordEvent(0 /* no value stat */);
- if (log.isDebugEnabled()) {
- log.debug("[{}] Discarding expired message at position {}, replicateTo {}",
- replicatorId, entry.getPosition(), msg.getReplicateTo());
- }
- cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
- entry.release();
- msg.recycle();
- continue;
- }
-
if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) {
// The producer is not ready yet after having stopped/restarted. Drop the message because it will
// recovered when the producer is ready
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index bddcb1b334d..adc86e2f658 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -171,7 +171,8 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
if (position != null) {
log.info("[{}][{}] Expiring all messages until position {}", topicName, subName, position);
Position prevMarkDeletePos = cursor.getMarkDeletedPosition();
- cursor.asyncMarkDelete(position, markDeleteCallback, cursor.getNumberOfEntriesInBacklog(false));
+ cursor.asyncMarkDelete(position, cursor.getProperties(), markDeleteCallback,
+ cursor.getNumberOfEntriesInBacklog(false));
if (!Objects.equals(cursor.getMarkDeletedPosition(), prevMarkDeletePos) && subscription != null) {
subscription.updateLastMarkDeleteAdvancedTimestamp();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index dabfd0379ec..fda80957554 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -496,7 +496,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
}
- private static boolean isCompactionSubscription(String subscriptionName) {
+ public static boolean isCompactionSubscription(String subscriptionName) {
return COMPACTION_SUBSCRIPTION.equals(subscriptionName);
}
@@ -1686,11 +1686,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
public void checkMessageExpiry() {
int messageTtlInSeconds = topicPolicies.getMessageTTLInSeconds().get();
if (messageTtlInSeconds != 0) {
- subscriptions.forEach((__, sub) -> sub.expireMessages(messageTtlInSeconds));
- replicators.forEach((__, replicator)
- -> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds));
- shadowReplicators.forEach((__, replicator)
- -> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds));
+ subscriptions.forEach((__, sub) -> {
+ if (!isCompactionSubscription(sub.getName())) {
+ sub.expireMessages(messageTtlInSeconds);
+ }
+ });
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
index fb306348bcd..b48f748bf5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
@@ -76,18 +76,6 @@ public class ShadowReplicator extends PersistentReplicator {
continue;
}
- if (msg.isExpired(messageTTLInSeconds)) {
- msgExpired.recordEvent(0 /* no value stat */);
- if (log.isDebugEnabled()) {
- log.debug("[{}] Discarding expired message at position {}, replicateTo {}",
- replicatorId, entry.getPosition(), msg.getReplicateTo());
- }
- cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
- entry.release();
- msg.recycle();
- continue;
- }
-
if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) {
// The producer is not ready yet after having stopped/restarted. Drop the message because it will
// recovered when the producer is ready
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 176eab0e94b..d0fd3365f96 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -83,6 +83,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -1773,4 +1774,74 @@ public class ReplicatorTest extends ReplicatorTestBase {
Assert.assertThrows(PulsarClientException.ProducerBusyException.class,
() -> new MessageProducer(url2, dest2));
}
+
+ @Test
+ public void testReplicatorWithTTL() throws Exception {
+ log.info("--- Starting ReplicatorTest::testReplicatorWithTTL ---");
+
+ final String cluster1 = pulsar1.getConfig().getClusterName();
+ final String cluster2 = pulsar2.getConfig().getClusterName();
+ final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");
+ final TopicName topic = TopicName
+ .get(BrokerTestUtil.newUniqueName("persistent://" + namespace + "/testReplicatorWithTTL"));
+ admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2));
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2));
+ admin1.topics().createNonPartitionedTopic(topic.toString());
+ admin1.topicPolicies().setMessageTTL(topic.toString(), 1);
+
+ @Cleanup
+ PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
+ .build();
+
+ @Cleanup
+ Producer<byte[]> persistentProducer1 = client1.newProducer().topic(topic.toString()).create();
+ persistentProducer1.send("V1".getBytes());
+
+ waitReplicateFinish(topic, admin1);
+
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topic.toString()).get();
+ persistentTopic.getReplicators().forEach((cluster, replicator) -> {
+ PersistentReplicator persistentReplicator = (PersistentReplicator) replicator;
+ // Pause replicator
+ persistentReplicator.disconnect();
+ });
+
+ persistentProducer1.send("V2".getBytes());
+ persistentProducer1.send("V3".getBytes());
+
+ Thread.sleep(1000);
+
+ admin1.topics().expireMessagesForAllSubscriptions(topic.toString(), 1);
+
+ persistentTopic.getReplicators().forEach((cluster, replicator) -> {
+ PersistentReplicator persistentReplicator = (PersistentReplicator) replicator;
+ persistentReplicator.startProducer();
+ });
+
+ waitReplicateFinish(topic, admin1);
+
+ persistentProducer1.send("V4".getBytes());
+
+ waitReplicateFinish(topic, admin1);
+
+ @Cleanup
+ PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
+ .build();
+
+ @Cleanup
+ Consumer<byte[]> consumer = client2.newConsumer().topic(topic.toString())
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscriptionName("sub").subscribe();
+
+ List<String> result = new ArrayList<>();
+ while (true) {
+ Message<byte[]> receive = consumer.receive(2, TimeUnit.SECONDS);
+ if (receive == null) {
+ break;
+ }
+ result.add(new String(receive.getValue()));
+ }
+
+ assertEquals(result, Lists.newArrayList("V1", "V2", "V3", "V4"));
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 58646105f31..050fc503df2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -2127,4 +2127,68 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
() -> pulsarTestContext.getBookKeeperClient().openLedger(
compactedLedgerId.get(), BookKeeper.DigestType.CRC32, new byte[]{})));
}
+
+ @Test
+ public void testCompactionWithTTL() throws Exception {
+ String topicName = "persistent://my-property/use/my-ns/testCompactionWithTTL";
+ String subName = "sub";
+ pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).readCompacted(true)
+ .subscribe().close();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false).topic(topicName).create();
+
+ producer.newMessage().key("K1").value("V1").send();
+ producer.newMessage().key("K2").value("V2").send();
+
+ admin.topics().triggerCompaction(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin.topics().compactionStatus(topicName).status,
+ LongRunningProcessStatus.Status.SUCCESS);
+ });
+
+ producer.newMessage().key("K1").value("V3").send();
+ producer.newMessage().key("K2").value("V4").send();
+
+ Thread.sleep(1000);
+
+ // expire messages
+ admin.topics().expireMessagesForAllSubscriptions(topicName, 1);
+
+ // trim the topic
+ admin.topics().unload(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName, false);
+ assertEquals(internalStats.numberOfEntries, 4);
+ });
+
+ producer.newMessage().key("K3").value("V5").send();
+
+ admin.topics().triggerCompaction(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin.topics().compactionStatus(topicName).status,
+ LongRunningProcessStatus.Status.SUCCESS);
+ });
+
+ @Cleanup
+ Consumer<String> consumer =
+ pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).readCompacted(true)
+ .subscribe();
+
+ List<String> result = new ArrayList<>();
+ while (true) {
+ Message<String> receive = consumer.receive(2, TimeUnit.SECONDS);
+ if (receive == null) {
+ break;
+ }
+
+ result.add(receive.getValue());
+ }
+
+ Assert.assertEquals(result, List.of("V3", "V4", "V5"));
+ }
}