This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 0c7d8c8e64c Revert "[fix][broker] Fix potential case cause retention
policy not working on topic level (#21041)"
0c7d8c8e64c is described below
commit 0c7d8c8e64c380d8288c0175b88b37cc49ec6cbd
Author: Jiwe Guo <[email protected]>
AuthorDate: Tue Sep 12 20:17:18 2023 +0800
Revert "[fix][broker] Fix potential case cause retention policy not working
on topic level (#21041)"
This reverts commit a7f0bc49fa27c9bc066b2ed52bf4be5f1bee160d.
---
.../broker/service/persistent/PersistentTopic.java | 22 ++++++++-------
.../service/persistent/PersistentTopicTest.java | 31 ----------------------
2 files changed, 12 insertions(+), 41 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 34bab21155e..4a3e38c5395 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1352,6 +1352,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
CompletableFuture<Void> result = new CompletableFuture<Void>();
checkReplication().thenAccept(res -> {
+ log.info("[{}] Policies updated successfully", topic);
result.complete(null);
}).exceptionally(th -> {
log.error("[{}] Policies update failed {}, scheduled retry in {}
seconds", topic, th.getMessage(),
@@ -1371,8 +1372,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return messageDeduplication.checkStatus();
}
- @VisibleForTesting
- CompletableFuture<Void> checkPersistencePolicies() {
+ private CompletableFuture<Void> checkPersistencePolicies() {
TopicName topicName = TopicName.get(topic);
CompletableFuture<Void> future = new CompletableFuture<>();
brokerService.getManagedLedgerConfig(topicName).thenAccept(config -> {
@@ -3110,14 +3110,16 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
updateSubscribeRateLimiter();
replicators.forEach((name, replicator) ->
replicator.updateRateLimiter());
checkMessageExpiry();
- })
- .thenCompose(__ -> checkReplicationAndRetryOnFailure())
- .thenCompose(__ -> checkDeduplicationStatus())
- .thenCompose(__ -> preCreateSubscriptionForCompactionIfNeeded())
- .thenCompose(__ -> checkPersistencePolicies())
- .thenAccept(__ -> log.info("[{}] Policies updated successfully",
topic))
- .exceptionally(e -> {
- Throwable t = FutureUtil.unwrapCompletionException(e);
+ checkReplicationAndRetryOnFailure();
+
+ checkDeduplicationStatus();
+
+ preCreateSubscriptionForCompactionIfNeeded();
+
+ // update managed ledger config
+ checkPersistencePolicies();
+ }).exceptionally(e -> {
+ Throwable t = e instanceof CompletionException ? e.getCause() : e;
log.error("[{}] update topic policy error: {}", topic,
t.getMessage(), t);
return null;
});
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index ca97c09bd67..50a54aaa0b7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -59,7 +59,6 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
-import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.api.Consumer;
@@ -76,9 +75,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.junit.Assert;
@@ -491,32 +488,4 @@ public class PersistentTopicTest extends BrokerTestBase {
return !topic.getManagedLedger().getCursors().iterator().hasNext();
});
}
-
- @Test
- public void testCheckPersistencePolicies() throws Exception {
- final String myNamespace = "prop/ns";
- admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
- final String topic = "persistent://" + myNamespace + "/testConfig" +
UUID.randomUUID();
- conf.setForceDeleteNamespaceAllowed(true);
- pulsarClient.newProducer().topic(topic).create().close();
- RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1);
- PersistentTopic persistentTopic = spy((PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(topic).get().get());
- TopicPoliciesService policiesService =
spy(pulsar.getTopicPoliciesService());
- doReturn(policiesService).when(pulsar).getTopicPoliciesService();
- TopicPolicies policies = new TopicPolicies();
- policies.setRetentionPolicies(retentionPolicies);
-
doReturn(policies).when(policiesService).getTopicPoliciesIfExists(TopicName.get(topic));
- persistentTopic.onUpdate(policies);
- verify(persistentTopic, times(1)).checkPersistencePolicies();
- Awaitility.await().untilAsserted(() -> {
-
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(),
1L);
-
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(),
TimeUnit.MINUTES.toMillis(1));
- });
- // throw exception
- doReturn(CompletableFuture.failedFuture(new
RuntimeException())).when(persistentTopic).checkPersistencePolicies();
- policies.setRetentionPolicies(new RetentionPolicies(2, 2));
- persistentTopic.onUpdate(policies);
-
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(),
1L);
-
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(),
TimeUnit.MINUTES.toMillis(1));
- }
}