This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 761f8e4c084 [fix][broker]After the broker is restarted, the cache dynamic configuration is invalid (#17035)
761f8e4c084 is described below

commit 761f8e4c084785308abc3f8bcddae7f5f0136ddf
Author: LinChen <[email protected]>
AuthorDate: Tue Sep 6 16:14:07 2022 +0800

    [fix][broker]After the broker is restarted, the cache dynamic configuration is invalid (#17035)
---
 .../pulsar/broker/service/BrokerService.java       | 86 +++++++++++++++-------
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  8 ++
 2 files changed, 66 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 34a8063af4f..a45b75a2284 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -103,6 +103,7 @@ import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
 import org.apache.pulsar.broker.resources.LocalPoliciesResources;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import 
org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
@@ -2082,33 +2083,44 @@ public class BrokerService implements Closeable {
     }
 
     private void handleDynamicConfigurationUpdates() {
-        
pulsar().getPulsarResources().getDynamicConfigResources().getDynamicConfigurationAsync()
-                .thenAccept(optMap -> {
-                    if (!optMap.isPresent()) {
-                        return;
-                    }
-                    Map<String, String> data = optMap.get();
-                    data.forEach((configKey, value) -> {
-                        Field configField = 
dynamicConfigurationMap.get(configKey).field;
-                        Object newValue = 
FieldParser.value(data.get(configKey), configField);
-                        if (configField != null) {
-                            Consumer listener = 
configRegisteredListeners.get(configKey);
-                            try {
-                                Object existingValue = 
configField.get(pulsar.getConfiguration());
-                                configField.set(pulsar.getConfiguration(), 
newValue);
-                                log.info("Successfully updated configuration 
{}/{}", configKey,
-                                        data.get(configKey));
-                                if (listener != null && 
!existingValue.equals(newValue)) {
-                                    listener.accept(newValue);
+        DynamicConfigurationResources dynamicConfigResources = null;
+        try {
+            dynamicConfigResources = pulsar()
+                    .getPulsarResources()
+                    .getDynamicConfigResources();
+        } catch (Exception e) {
+            log.warn("Failed to read dynamic broker configuration", e);
+        }
+
+        if (dynamicConfigResources != null) {
+            dynamicConfigResources.getDynamicConfigurationAsync()
+                    .thenAccept(optMap -> {
+                        if (!optMap.isPresent()) {
+                            return;
+                        }
+                        Map<String, String> data = optMap.get();
+                        data.forEach((configKey, value) -> {
+                            Field configField = 
dynamicConfigurationMap.get(configKey).field;
+                            Object newValue = 
FieldParser.value(data.get(configKey), configField);
+                            if (configField != null) {
+                                Consumer listener = 
configRegisteredListeners.get(configKey);
+                                try {
+                                    Object existingValue = 
configField.get(pulsar.getConfiguration());
+                                    configField.set(pulsar.getConfiguration(), 
newValue);
+                                    log.info("Successfully updated 
configuration {}/{}", configKey,
+                                            data.get(configKey));
+                                    if (listener != null && 
!existingValue.equals(newValue)) {
+                                        listener.accept(newValue);
+                                    }
+                                } catch (Exception e) {
+                                    log.error("Failed to update config {}/{}", 
configKey, newValue);
                                 }
-                            } catch (Exception e) {
-                                log.error("Failed to update config {}/{}", 
configKey, newValue);
+                            } else {
+                                log.error("Found non-dynamic field in 
dynamicConfigMap {}/{}", configKey, newValue);
                             }
-                        } else {
-                            log.error("Found non-dynamic field in 
dynamicConfigMap {}/{}", configKey, newValue);
-                        }
+                        });
                     });
-                });
+        }
     }
 
     /**
@@ -2215,10 +2227,7 @@ public class BrokerService implements Closeable {
             return true;
         });
 
-        // (2) update ServiceConfiguration value by reading 
zk-configuration-map
-        updateDynamicServiceConfiguration();
-
-        // (3) Listener Registration
+        // (2) Listener Registration
         // add listener on "maxConcurrentLookupRequest" value change
         registerConfigurationListener("maxConcurrentLookupRequest",
                 (maxConcurrentLookupRequest) -> lookupRequestSemaphore.set(
@@ -2355,6 +2364,12 @@ public class BrokerService implements Closeable {
         });
 
         // add more listeners here
+
+        // (3) create dynamic-config if not exist.
+        createDynamicConfigPathIfNotExist();
+
+        // (4) update ServiceConfiguration value by reading 
zk-configuration-map and trigger corresponding listeners.
+        handleDynamicConfigurationUpdates();
     }
 
     private void updateDefaultNumPartitions(int numPartitions) {
@@ -2541,6 +2556,21 @@ public class BrokerService implements Closeable {
         }
     }
 
+    private void createDynamicConfigPathIfNotExist() {
+        try {
+            Optional<Map<String, String>> configCache =
+                    
pulsar().getPulsarResources().getDynamicConfigResources().getDynamicConfiguration();
+
+            // create dynamic-config if not exist.
+            if (!configCache.isPresent()) {
+                pulsar().getPulsarResources().getDynamicConfigResources()
+                        .setDynamicConfigurationWithCreate(n -> 
Maps.newHashMap());
+            }
+        } catch (Exception e) {
+            log.warn("Failed to read dynamic broker configuration", e);
+        }
+    }
+
     /**
      * Updates pulsar.ServiceConfiguration's dynamic field with value 
persistent into zk-dynamic path. It also validates
      * dynamic-value before updating it and throws {@code 
IllegalArgumentException} if validation fails
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 7993750583f..d3c91914cc8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -540,6 +540,14 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(),
 0.8);
         
assertEquals(pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold(), 
TimeUnit.MILLISECONDS
                 .toNanos(2000));
+
+        restartBroker();
+
+        // verify value again
+        
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize(),
 1 * 1024L * 1024L);
+        
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(),
 0.8);
+        
assertEquals(pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold(), 
TimeUnit.MILLISECONDS
+                .toNanos(2000));
     }
 
     /**

Reply via email to