This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 220a3d60160 [fix][broker]Support setting `autoSkipNonRecoverableData`
dynamically in expiryMon… (#21991)
220a3d60160 is described below
commit 220a3d601602d67f5f44516c5d9895dfaa270380
Author: atomchen <[email protected]>
AuthorDate: Sun Feb 18 15:51:49 2024 +0800
[fix][broker]Support setting `autoSkipNonRecoverableData` dynamically in
expiryMon… (#21991)
Co-authored-by: atomchchen <[email protected]>
---
.../persistent/PersistentMessageExpiryMonitor.java | 10 ++++++---
.../service/persistent/PersistentTopicTest.java | 26 ++++++++++++++++++++++
2 files changed, 33 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 978cd3f886f..5d3596d0d05 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import com.google.common.annotations.VisibleForTesting;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -48,7 +49,6 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag
private final String topicName;
private final Rate msgExpired;
private final LongAdder totalMsgExpired;
- private final boolean autoSkipNonRecoverableData;
private final PersistentSubscription subscription;
private static final int FALSE = 0;
@@ -68,8 +68,12 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag
this.subscription = subscription;
this.msgExpired = new Rate();
this.totalMsgExpired = new LongAdder();
+ }
+
+ @VisibleForTesting
+ public boolean isAutoSkipNonRecoverableData() {
// check to avoid test failures
- this.autoSkipNonRecoverableData = this.cursor.getManagedLedger() !=
null
+ return this.cursor.getManagedLedger() != null
&&
this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
}
@@ -196,7 +200,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Finding expired entry operation failed",
topicName, subName, exception);
}
- if (autoSkipNonRecoverableData && failedReadPosition.isPresent()
+ if (isAutoSkipNonRecoverableData() && failedReadPosition.isPresent()
&& (exception instanceof NonRecoverableLedgerException)) {
log.warn("[{}][{}] read failed from ledger at position:{} : {}",
topicName, subName, failedReadPosition,
exception.getMessage());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 4eb2aa15fa2..06a46f86c03 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -635,4 +635,30 @@ public class PersistentTopicTest extends BrokerTestBase {
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(),
1L);
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(),
TimeUnit.MINUTES.toMillis(1));
}
+
+ @Test
+ public void testDynamicConfigurationAutoSkipNonRecoverableData() throws
Exception {
+ pulsar.getConfiguration().setAutoSkipNonRecoverableData(false);
+ final String topicName =
"persistent://prop/ns-abc/testAutoSkipNonRecoverableData";
+ final String subName = "test_sub";
+
+ Consumer<byte[]> subscribe =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ PersistentSubscription subscription =
persistentTopic.getSubscription(subName);
+
+
assertFalse(persistentTopic.ledger.getConfig().isAutoSkipNonRecoverableData());
+
assertFalse(subscription.getExpiryMonitor().isAutoSkipNonRecoverableData());
+
+ String key = "autoSkipNonRecoverableData";
+ admin.brokers().updateDynamicConfiguration(key, "true");
+ Awaitility.await()
+ .untilAsserted(() ->
assertEquals(admin.brokers().getAllDynamicConfigurations().get(key), "true"));
+
+
assertTrue(persistentTopic.ledger.getConfig().isAutoSkipNonRecoverableData());
+
assertTrue(subscription.getExpiryMonitor().isAutoSkipNonRecoverableData());
+
+ subscribe.close();
+ admin.topics().delete(topicName);
+ }
}