This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 0c7d8c8e64c Revert "[fix][broker] Fix potential case cause retention 
policy not working on topic level (#21041)"
0c7d8c8e64c is described below

commit 0c7d8c8e64c380d8288c0175b88b37cc49ec6cbd
Author: Jiwe Guo <[email protected]>
AuthorDate: Tue Sep 12 20:17:18 2023 +0800

    Revert "[fix][broker] Fix potential case cause retention policy not working 
on topic level (#21041)"
    
    This reverts commit a7f0bc49fa27c9bc066b2ed52bf4be5f1bee160d.
---
 .../broker/service/persistent/PersistentTopic.java | 22 ++++++++-------
 .../service/persistent/PersistentTopicTest.java    | 31 ----------------------
 2 files changed, 12 insertions(+), 41 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 34bab21155e..4a3e38c5395 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1352,6 +1352,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
         CompletableFuture<Void> result = new CompletableFuture<Void>();
         checkReplication().thenAccept(res -> {
+            log.info("[{}] Policies updated successfully", topic);
             result.complete(null);
         }).exceptionally(th -> {
             log.error("[{}] Policies update failed {}, scheduled retry in {} 
seconds", topic, th.getMessage(),
@@ -1371,8 +1372,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return messageDeduplication.checkStatus();
     }
 
-    @VisibleForTesting
-    CompletableFuture<Void> checkPersistencePolicies() {
+    private CompletableFuture<Void> checkPersistencePolicies() {
         TopicName topicName = TopicName.get(topic);
         CompletableFuture<Void> future = new CompletableFuture<>();
         brokerService.getManagedLedgerConfig(topicName).thenAccept(config -> {
@@ -3110,14 +3110,16 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             updateSubscribeRateLimiter();
             replicators.forEach((name, replicator) -> 
replicator.updateRateLimiter());
             checkMessageExpiry();
-        })
-        .thenCompose(__ -> checkReplicationAndRetryOnFailure())
-        .thenCompose(__ -> checkDeduplicationStatus())
-        .thenCompose(__ -> preCreateSubscriptionForCompactionIfNeeded())
-        .thenCompose(__ -> checkPersistencePolicies())
-        .thenAccept(__ -> log.info("[{}] Policies updated successfully", 
topic))
-        .exceptionally(e -> {
-            Throwable t = FutureUtil.unwrapCompletionException(e);
+            checkReplicationAndRetryOnFailure();
+
+            checkDeduplicationStatus();
+
+            preCreateSubscriptionForCompactionIfNeeded();
+
+            // update managed ledger config
+            checkPersistencePolicies();
+        }).exceptionally(e -> {
+            Throwable t = e instanceof CompletionException ? e.getCause() : e;
             log.error("[{}] update topic policy error: {}", topic, 
t.getMessage(), t);
             return null;
         });
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index ca97c09bd67..50a54aaa0b7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -59,7 +59,6 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
-import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.api.Consumer;
@@ -76,9 +75,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.awaitility.Awaitility;
 import org.junit.Assert;
@@ -491,32 +488,4 @@ public class PersistentTopicTest extends BrokerTestBase {
             return !topic.getManagedLedger().getCursors().iterator().hasNext();
         });
     }
-
-    @Test
-    public void testCheckPersistencePolicies() throws Exception {
-        final String myNamespace = "prop/ns";
-        admin.namespaces().createNamespace(myNamespace, 
Sets.newHashSet("test"));
-        final String topic = "persistent://" + myNamespace + "/testConfig" + 
UUID.randomUUID();
-        conf.setForceDeleteNamespaceAllowed(true);
-        pulsarClient.newProducer().topic(topic).create().close();
-        RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1);
-        PersistentTopic persistentTopic = spy((PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topic).get().get());
-        TopicPoliciesService policiesService = 
spy(pulsar.getTopicPoliciesService());
-        doReturn(policiesService).when(pulsar).getTopicPoliciesService();
-        TopicPolicies policies = new TopicPolicies();
-        policies.setRetentionPolicies(retentionPolicies);
-        
doReturn(policies).when(policiesService).getTopicPoliciesIfExists(TopicName.get(topic));
-        persistentTopic.onUpdate(policies);
-        verify(persistentTopic, times(1)).checkPersistencePolicies();
-        Awaitility.await().untilAsserted(() -> {
-            
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(),
 1L);
-            
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(),
 TimeUnit.MINUTES.toMillis(1));
-        });
-        // throw exception
-        doReturn(CompletableFuture.failedFuture(new 
RuntimeException())).when(persistentTopic).checkPersistencePolicies();
-        policies.setRetentionPolicies(new RetentionPolicies(2, 2));
-        persistentTopic.onUpdate(policies);
-        
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(),
 1L);
-        
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(),
 TimeUnit.MINUTES.toMillis(1));
-    }
 }

Reply via email to