This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 761f8e4c084 [fix][broker]After the broker is restarted, the cache
dynamic configuration is invalid (#17035)
761f8e4c084 is described below
commit 761f8e4c084785308abc3f8bcddae7f5f0136ddf
Author: LinChen <[email protected]>
AuthorDate: Tue Sep 6 16:14:07 2022 +0800
[fix][broker]After the broker is restarted, the cache dynamic configuration
is invalid (#17035)
---
.../pulsar/broker/service/BrokerService.java | 86 +++++++++++++++-------
.../apache/pulsar/broker/admin/AdminApiTest.java | 8 ++
2 files changed, 66 insertions(+), 28 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 34a8063af4f..a45b75a2284 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -103,6 +103,7 @@ import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.NamespaceResources;
import
org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
@@ -2082,33 +2083,44 @@ public class BrokerService implements Closeable {
}
private void handleDynamicConfigurationUpdates() {
-
pulsar().getPulsarResources().getDynamicConfigResources().getDynamicConfigurationAsync()
- .thenAccept(optMap -> {
- if (!optMap.isPresent()) {
- return;
- }
- Map<String, String> data = optMap.get();
- data.forEach((configKey, value) -> {
- Field configField =
dynamicConfigurationMap.get(configKey).field;
- Object newValue =
FieldParser.value(data.get(configKey), configField);
- if (configField != null) {
- Consumer listener =
configRegisteredListeners.get(configKey);
- try {
- Object existingValue =
configField.get(pulsar.getConfiguration());
- configField.set(pulsar.getConfiguration(),
newValue);
- log.info("Successfully updated configuration
{}/{}", configKey,
- data.get(configKey));
- if (listener != null &&
!existingValue.equals(newValue)) {
- listener.accept(newValue);
+ DynamicConfigurationResources dynamicConfigResources = null;
+ try {
+ dynamicConfigResources = pulsar()
+ .getPulsarResources()
+ .getDynamicConfigResources();
+ } catch (Exception e) {
+ log.warn("Failed to read dynamic broker configuration", e);
+ }
+
+ if (dynamicConfigResources != null) {
+ dynamicConfigResources.getDynamicConfigurationAsync()
+ .thenAccept(optMap -> {
+ if (!optMap.isPresent()) {
+ return;
+ }
+ Map<String, String> data = optMap.get();
+ data.forEach((configKey, value) -> {
+ Field configField =
dynamicConfigurationMap.get(configKey).field;
+ Object newValue =
FieldParser.value(data.get(configKey), configField);
+ if (configField != null) {
+ Consumer listener =
configRegisteredListeners.get(configKey);
+ try {
+ Object existingValue =
configField.get(pulsar.getConfiguration());
+ configField.set(pulsar.getConfiguration(),
newValue);
+ log.info("Successfully updated
configuration {}/{}", configKey,
+ data.get(configKey));
+ if (listener != null &&
!existingValue.equals(newValue)) {
+ listener.accept(newValue);
+ }
+ } catch (Exception e) {
+ log.error("Failed to update config {}/{}",
configKey, newValue);
}
- } catch (Exception e) {
- log.error("Failed to update config {}/{}",
configKey, newValue);
+ } else {
+ log.error("Found non-dynamic field in
dynamicConfigMap {}/{}", configKey, newValue);
}
- } else {
- log.error("Found non-dynamic field in
dynamicConfigMap {}/{}", configKey, newValue);
- }
+ });
});
- });
+ }
}
/**
@@ -2215,10 +2227,7 @@ public class BrokerService implements Closeable {
return true;
});
- // (2) update ServiceConfiguration value by reading
zk-configuration-map
- updateDynamicServiceConfiguration();
-
- // (3) Listener Registration
+ // (2) Listener Registration
// add listener on "maxConcurrentLookupRequest" value change
registerConfigurationListener("maxConcurrentLookupRequest",
(maxConcurrentLookupRequest) -> lookupRequestSemaphore.set(
@@ -2355,6 +2364,12 @@ public class BrokerService implements Closeable {
});
// add more listeners here
+
+ // (3) create dynamic-config if not exist.
+ createDynamicConfigPathIfNotExist();
+
+ // (4) update ServiceConfiguration value by reading
zk-configuration-map and trigger corresponding listeners.
+ handleDynamicConfigurationUpdates();
}
private void updateDefaultNumPartitions(int numPartitions) {
@@ -2541,6 +2556,21 @@ public class BrokerService implements Closeable {
}
}
+ private void createDynamicConfigPathIfNotExist() {
+ try {
+ Optional<Map<String, String>> configCache =
+
pulsar().getPulsarResources().getDynamicConfigResources().getDynamicConfiguration();
+
+ // create dynamic-config if not exist.
+ if (!configCache.isPresent()) {
+ pulsar().getPulsarResources().getDynamicConfigResources()
+ .setDynamicConfigurationWithCreate(n ->
Maps.newHashMap());
+ }
+ } catch (Exception e) {
+ log.warn("Failed to read dynamic broker configuration", e);
+ }
+ }
+
/**
* Updates pulsar.ServiceConfiguration's dynamic field with value
persistent into zk-dynamic path. It also validates
* dynamic-value before updating it and throws {@code
IllegalArgumentException} if validation fails
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 7993750583f..d3c91914cc8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -540,6 +540,14 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(),
0.8);
assertEquals(pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold(),
TimeUnit.MILLISECONDS
.toNanos(2000));
+
+ restartBroker();
+
+ // verify value again
+
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize(),
1 * 1024L * 1024L);
+
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(),
0.8);
+
assertEquals(pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold(),
TimeUnit.MILLISECONDS
+ .toNanos(2000));
}
/**