This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 920fac779fb [fix][broker]Support setting `autoSkipNonRecoverableData` 
dynamically in expiryMon… (#21991)
920fac779fb is described below

commit 920fac779fb1e8d5e52ad23865e198a56657e909
Author: atomchen <[email protected]>
AuthorDate: Sun Feb 18 15:51:49 2024 +0800

    [fix][broker]Support setting `autoSkipNonRecoverableData` dynamically in 
expiryMon… (#21991)
    
    Co-authored-by: atomchchen <[email protected]>
---
 .../persistent/PersistentMessageExpiryMonitor.java | 10 ++++++---
 .../service/persistent/PersistentTopicTest.java    | 26 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 978cd3f886f..5d3596d0d05 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -48,7 +49,6 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
     private final String topicName;
     private final Rate msgExpired;
     private final LongAdder totalMsgExpired;
-    private final boolean autoSkipNonRecoverableData;
     private final PersistentSubscription subscription;
 
     private static final int FALSE = 0;
@@ -68,8 +68,12 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
         this.subscription = subscription;
         this.msgExpired = new Rate();
         this.totalMsgExpired = new LongAdder();
+    }
+
+    @VisibleForTesting
+    public boolean isAutoSkipNonRecoverableData() {
         // check to avoid test failures
-        this.autoSkipNonRecoverableData = this.cursor.getManagedLedger() != 
null
+        return this.cursor.getManagedLedger() != null
                 && 
this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
     }
 
@@ -196,7 +200,7 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Finding expired entry operation failed", 
topicName, subName, exception);
         }
-        if (autoSkipNonRecoverableData && failedReadPosition.isPresent()
+        if (isAutoSkipNonRecoverableData() && failedReadPosition.isPresent()
                 && (exception instanceof NonRecoverableLedgerException)) {
             log.warn("[{}][{}] read failed from ledger at position:{} : {}", 
topicName, subName, failedReadPosition,
                     exception.getMessage());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 6f60a13fd48..8c61c0e9da8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -634,4 +634,30 @@ public class PersistentTopicTest extends BrokerTestBase {
         
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(),
 1L);
         
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(),
 TimeUnit.MINUTES.toMillis(1));
     }
+
+    @Test
+    public void testDynamicConfigurationAutoSkipNonRecoverableData() throws 
Exception {
+        pulsar.getConfiguration().setAutoSkipNonRecoverableData(false);
+        final String topicName = 
"persistent://prop/ns-abc/testAutoSkipNonRecoverableData";
+        final String subName = "test_sub";
+
+        Consumer<byte[]> subscribe = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        PersistentSubscription subscription = 
persistentTopic.getSubscription(subName);
+
+        
assertFalse(persistentTopic.ledger.getConfig().isAutoSkipNonRecoverableData());
+        
assertFalse(subscription.getExpiryMonitor().isAutoSkipNonRecoverableData());
+
+        String key = "autoSkipNonRecoverableData";
+        admin.brokers().updateDynamicConfiguration(key, "true");
+        Awaitility.await()
+                .untilAsserted(() -> 
assertEquals(admin.brokers().getAllDynamicConfigurations().get(key), "true"));
+
+        
assertTrue(persistentTopic.ledger.getConfig().isAutoSkipNonRecoverableData());
+        
assertTrue(subscription.getExpiryMonitor().isAutoSkipNonRecoverableData());
+
+        subscribe.close();
+        admin.topics().delete(topicName);
+    }
 }

Reply via email to