This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 220a3d60160 [fix][broker]Support setting `autoSkipNonRecoverableData` 
dynamically in expiryMon… (#21991)
220a3d60160 is described below

commit 220a3d601602d67f5f44516c5d9895dfaa270380
Author: atomchen <[email protected]>
AuthorDate: Sun Feb 18 15:51:49 2024 +0800

    [fix][broker]Support setting `autoSkipNonRecoverableData` dynamically in 
expiryMon… (#21991)
    
    Co-authored-by: atomchchen <[email protected]>
---
 .../persistent/PersistentMessageExpiryMonitor.java | 10 ++++++---
 .../service/persistent/PersistentTopicTest.java    | 26 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 978cd3f886f..5d3596d0d05 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -48,7 +49,6 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
     private final String topicName;
     private final Rate msgExpired;
     private final LongAdder totalMsgExpired;
-    private final boolean autoSkipNonRecoverableData;
     private final PersistentSubscription subscription;
 
     private static final int FALSE = 0;
@@ -68,8 +68,12 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
         this.subscription = subscription;
         this.msgExpired = new Rate();
         this.totalMsgExpired = new LongAdder();
+    }
+
+    @VisibleForTesting
+    public boolean isAutoSkipNonRecoverableData() {
         // check to avoid test failures
-        this.autoSkipNonRecoverableData = this.cursor.getManagedLedger() != 
null
+        return this.cursor.getManagedLedger() != null
                 && 
this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
     }
 
@@ -196,7 +200,7 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Finding expired entry operation failed", 
topicName, subName, exception);
         }
-        if (autoSkipNonRecoverableData && failedReadPosition.isPresent()
+        if (isAutoSkipNonRecoverableData() && failedReadPosition.isPresent()
                 && (exception instanceof NonRecoverableLedgerException)) {
             log.warn("[{}][{}] read failed from ledger at position:{} : {}", 
topicName, subName, failedReadPosition,
                     exception.getMessage());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 4eb2aa15fa2..06a46f86c03 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -635,4 +635,30 @@ public class PersistentTopicTest extends BrokerTestBase {
         
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(),
 1L);
         
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(),
 TimeUnit.MINUTES.toMillis(1));
     }
+
+    @Test
+    public void testDynamicConfigurationAutoSkipNonRecoverableData() throws 
Exception {
+        pulsar.getConfiguration().setAutoSkipNonRecoverableData(false);
+        final String topicName = 
"persistent://prop/ns-abc/testAutoSkipNonRecoverableData";
+        final String subName = "test_sub";
+
+        Consumer<byte[]> subscribe = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        PersistentSubscription subscription = 
persistentTopic.getSubscription(subName);
+
+        
assertFalse(persistentTopic.ledger.getConfig().isAutoSkipNonRecoverableData());
+        
assertFalse(subscription.getExpiryMonitor().isAutoSkipNonRecoverableData());
+
+        String key = "autoSkipNonRecoverableData";
+        admin.brokers().updateDynamicConfiguration(key, "true");
+        Awaitility.await()
+                .untilAsserted(() -> 
assertEquals(admin.brokers().getAllDynamicConfigurations().get(key), "true"));
+
+        
assertTrue(persistentTopic.ledger.getConfig().isAutoSkipNonRecoverableData());
+        
assertTrue(subscription.getExpiryMonitor().isAutoSkipNonRecoverableData());
+
+        subscribe.close();
+        admin.topics().delete(topicName);
+    }
 }

Reply via email to