This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 80a8f8d307a [fix][broker] Fix compaction subscription delete by
inactive subscription check. (#20983)
80a8f8d307a is described below
commit 80a8f8d307ac2c023147410e31d567cfce8f17c5
Author: lifepuzzlefun <[email protected]>
AuthorDate: Mon Aug 21 13:44:39 2023 +0800
[fix][broker] Fix compaction subscription delete by inactive subscription
check. (#20983)
---
.../broker/service/persistent/PersistentTopic.java | 4 +-
.../pulsar/broker/service/BrokerServiceTest.java | 67 ++++++++++++++++++++++
2 files changed, 70 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 7223dbf3f64..593f741ce39 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2713,7 +2713,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
.toMillis(nsExpirationTime == null ? defaultExpirationTime
: nsExpirationTime);
if (expirationTimeMillis > 0) {
subscriptions.forEach((subName, sub) -> {
- if (sub.dispatcher != null &&
sub.dispatcher.isConsumerConnected() || sub.isReplicated()) {
+ if (sub.dispatcher != null &&
sub.dispatcher.isConsumerConnected()
+ || sub.isReplicated()
+ || isCompactionSubscription(subName)) {
return;
}
if (System.currentTimeMillis() -
sub.cursor.getLastActive() > expirationTimeMillis) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 4a0800dbafe..1eb02b71972 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -65,6 +65,7 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.http.HttpResponse;
@@ -75,6 +76,7 @@ import org.apache.pulsar.broker.PulsarService;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.namespace.NamespaceService;
import
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.client.admin.BrokerStats;
@@ -107,7 +109,9 @@ import
org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.apache.pulsar.compaction.Compactor;
import org.awaitility.Awaitility;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -1160,6 +1164,69 @@ public class BrokerServiceTest extends BrokerTestBase {
}
}
+ @Test
+ public void
testCheckInactiveSubscriptionsShouldNotDeleteCompactionCursor() throws
Exception {
+ String namespace = "prop/test";
+
+ // set up broker set compaction threshold.
+ cleanup();
+ conf.setBrokerServiceCompactionThresholdInBytes(8);
+ setup();
+
+ try {
+ admin.namespaces().createNamespace(namespace);
+ } catch (PulsarAdminException.ConflictException e) {
+ // Ok.. (if test fails intermittently and namespace is already created)
+ }
+
+ // set enable subscription expiration.
+ admin.namespaces().setSubscriptionExpirationTime(namespace, 1);
+
+ String compactionInactiveTestTopic =
"persistent://prop/test/testCompactionCursorShouldNotDelete";
+
+ admin.topics().createNonPartitionedTopic(compactionInactiveTestTopic);
+
+ CompletableFuture<Optional<Topic>> topicCf =
+
pulsar.getBrokerService().getTopic(compactionInactiveTestTopic, true);
+
+ Optional<Topic> topicOptional = topicCf.get();
+ assertTrue(topicOptional.isPresent());
+
+ PersistentTopic topic = (PersistentTopic) topicOptional.get();
+
+ PersistentSubscription sub = (PersistentSubscription)
topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION);
+ assertNotNull(sub);
+
+ topic.checkCompaction();
+
+ Field currentCompaction =
PersistentTopic.class.getDeclaredField("currentCompaction");
+ currentCompaction.setAccessible(true);
+ CompletableFuture<Long> compactionFuture =
(CompletableFuture<Long>)currentCompaction.get(topic);
+
+ compactionFuture.get();
+
+ ManagedCursorImpl cursor = (ManagedCursorImpl) sub.getCursor();
+
+ // make cursor last active time to very small to check if it will be deleted
+ Field cursorLastActiveField =
ManagedCursorImpl.class.getDeclaredField("lastActive");
+ cursorLastActiveField.setAccessible(true);
+ cursorLastActiveField.set(cursor, 0);
+
+ // replace origin object. so we can check if subscription is deleted.
+ PersistentSubscription spySubscription = Mockito.spy(sub);
+ topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION,
spySubscription);
+
+ // trigger inactive check.
+ topic.checkInactiveSubscriptions();
+
+ // Compaction subscription should not call delete method.
+ Mockito.verify(spySubscription, Mockito.never()).delete();
+
+ // check if the subscription exist.
+
assertNotNull(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION));
+
+ }
+
/**
* Verifies brokerService should not have deadlock and successfully remove topic from topicMap on topic-failure and
* it should not introduce deadlock while performing it.