This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 06e4147b68b [fix][broker] Fix potential case cause retention policy
not working on topic level (#21041)
06e4147b68b is described below
commit 06e4147b68bb92ff84e34ebb4925399287e52d06
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Aug 22 22:08:09 2023 +0800
[fix][broker] Fix potential case cause retention policy not working on
topic level (#21041)
---
.../broker/service/persistent/PersistentTopic.java | 22 +++++++--------
.../service/persistent/PersistentTopicTest.java | 31 ++++++++++++++++++++++
2 files changed, 41 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 410e67c6858..38fe0639cfa 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1547,7 +1547,6 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
CompletableFuture<Void> result = new CompletableFuture<Void>();
checkReplication().thenAccept(res -> {
- log.info("[{}] Policies updated successfully", topic);
result.complete(null);
}).exceptionally(th -> {
log.error("[{}] Policies update failed {}, scheduled retry in {}
seconds", topic, th.getMessage(),
@@ -1567,7 +1566,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return messageDeduplication.checkStatus();
}
- private CompletableFuture<Void> checkPersistencePolicies() {
+ @VisibleForTesting
+ CompletableFuture<Void> checkPersistencePolicies() {
TopicName topicName = TopicName.get(topic);
CompletableFuture<Void> future = new CompletableFuture<>();
brokerService.getManagedLedgerConfig(topicName).thenAccept(config -> {
@@ -3500,16 +3500,14 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
replicators.forEach((name, replicator) ->
replicator.updateRateLimiter());
shadowReplicators.forEach((name, replicator) ->
replicator.updateRateLimiter());
checkMessageExpiry();
- checkReplicationAndRetryOnFailure();
-
- checkDeduplicationStatus();
-
- preCreateSubscriptionForCompactionIfNeeded();
-
- // update managed ledger config
- checkPersistencePolicies();
- }).exceptionally(e -> {
- Throwable t = e instanceof CompletionException ? e.getCause() : e;
+ })
+ .thenCompose(__ -> checkReplicationAndRetryOnFailure())
+ .thenCompose(__ -> checkDeduplicationStatus())
+ .thenCompose(__ -> preCreateSubscriptionForCompactionIfNeeded())
+ .thenCompose(__ -> checkPersistencePolicies())
+ .thenAccept(__ -> log.info("[{}] Policies updated successfully",
topic))
+ .exceptionally(e -> {
+ Throwable t = FutureUtil.unwrapCompletionException(e);
log.error("[{}] update topic policy error: {}", topic,
t.getMessage(), t);
return null;
});
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index bda6d0bdceb..e29d015c45d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -60,6 +60,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -77,7 +78,9 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.testng.Assert;
@@ -603,4 +606,32 @@ public class PersistentTopicTest extends BrokerTestBase {
return !topic.getManagedLedger().getCursors().iterator().hasNext();
});
}
+
+ @Test
+ public void testCheckPersistencePolicies() throws Exception {
+ final String myNamespace = "prop/ns";
+ admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
+ final String topic = "persistent://" + myNamespace + "/testConfig" +
UUID.randomUUID();
+ conf.setForceDeleteNamespaceAllowed(true);
+ pulsarClient.newProducer().topic(topic).create().close();
+ RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1);
+ PersistentTopic persistentTopic = spy((PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(topic).get().get());
+ TopicPoliciesService policiesService =
spy(pulsar.getTopicPoliciesService());
+ doReturn(policiesService).when(pulsar).getTopicPoliciesService();
+ TopicPolicies policies = new TopicPolicies();
+ policies.setRetentionPolicies(retentionPolicies);
+
doReturn(policies).when(policiesService).getTopicPoliciesIfExists(TopicName.get(topic));
+ persistentTopic.onUpdate(policies);
+ verify(persistentTopic, times(1)).checkPersistencePolicies();
+ Awaitility.await().untilAsserted(() -> {
+
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(),
1L);
+
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(),
TimeUnit.MINUTES.toMillis(1));
+ });
+ // throw exception
+ doReturn(CompletableFuture.failedFuture(new
RuntimeException())).when(persistentTopic).checkPersistencePolicies();
+ policies.setRetentionPolicies(new RetentionPolicies(2, 2));
+ persistentTopic.onUpdate(policies);
+
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(),
1L);
+
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(),
TimeUnit.MINUTES.toMillis(1));
+ }
}