This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5ce12583439f2dc1ee80b815c6fbfae4643f7ccd Author: Zixuan Liu <[email protected]> AuthorDate: Fri Dec 24 00:44:29 2021 +0800 [Broker] Fix create the dynamic configuration resource if not exist (#13420) ### Motivation A request to `DELETE: /admin/brokers/configuration/dispatcherMinReadBatchSize` fails with `org.apache.pulsar.metadata.api.MetadataStoreException$NotFoundException`. Pulsar does not create the dynamic configuration resource path in the metadata store if it does not exist; this behavior differs from Pulsar 2.8. (cherry picked from commit 3a1e8da9230343a9f16da503e27525263369743b) --- .../resources/DynamicConfigurationResources.java | 4 +- .../pulsar/broker/admin/impl/BrokersBase.java | 3 +- .../pulsar/broker/service/BrokerService.java | 11 +++- .../admin/AdminApiDynamicConfigurationsTest.java | 72 ++++++++++++++++++++++ 4 files changed, 85 insertions(+), 5 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/DynamicConfigurationResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/DynamicConfigurationResources.java index 8137dd8..d918dc8 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/DynamicConfigurationResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/DynamicConfigurationResources.java @@ -40,8 +40,8 @@ public class DynamicConfigurationResources extends BaseResources<Map<String, Str return getAsync(BROKER_SERVICE_CONFIGURATION_PATH); } - public Map<String, String> getDynamicConfiguration() throws MetadataStoreException { - return get(BROKER_SERVICE_CONFIGURATION_PATH).orElse(Collections.emptyMap()); + public Optional<Map<String, String>> getDynamicConfiguration() throws MetadataStoreException { + return get(BROKER_SERVICE_CONFIGURATION_PATH); } public void setDynamicConfigurationWithCreate( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 17f497c..756d141 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -24,6 +24,7 @@ import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import java.time.Duration; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -181,7 +182,7 @@ public class BrokersBase extends PulsarWebResource { public Map<String, String> getAllDynamicConfigurations() throws Exception { validateSuperUserAccess(); try { - return dynamicConfigurationResources().getDynamicConfiguration(); + return dynamicConfigurationResources().getDynamicConfiguration().orElseGet(Collections::emptyMap); } catch (RestException e) { LOG.error("[{}] couldn't find any configuration in zk {}", clientAppId(), e.getMessage(), e); throw e; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b3fca06..cbe02c8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2236,9 +2236,16 @@ public class BrokerService implements Closeable { */ private void updateDynamicServiceConfiguration() { Optional<Map<String, String>> configCache = Optional.empty(); + try { - configCache = - Optional.of(pulsar().getPulsarResources().getDynamicConfigResources().getDynamicConfiguration()); + configCache = + pulsar().getPulsarResources().getDynamicConfigResources().getDynamicConfiguration(); + + // create dynamic-config if not exist. 
+ if (!configCache.isPresent()) { + pulsar().getPulsarResources().getDynamicConfigResources() + .setDynamicConfigurationWithCreate(n -> Maps.newHashMap()); + } } catch (Exception e) { log.warn("Failed to read dynamic broker configuration", e); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java new file mode 100644 index 0000000..d3e4b2a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.admin; + +import static org.junit.Assert.fail; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import java.util.Map; +import javax.ws.rs.core.Response; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class AdminApiDynamicConfigurationsTest extends MockedPulsarServiceBaseTest { + @BeforeMethod + @Override + public void setup() throws Exception { + super.internalSetup(); + } + + @AfterMethod(alwaysRun = true) + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void TestGetAllDynamicConfigurations() throws Exception { + Map<String,String> configs = admin.brokers().getAllDynamicConfigurations(); + assertNotNull(configs); + } + + @Test + public void TestDeleteDynamicConfiguration() throws Exception { + admin.brokers().deleteDynamicConfiguration("dispatcherMinReadBatchSize"); + } + + @Test + public void TestDeleteInvalidDynamicConfiguration() { + try { + admin.brokers().deleteDynamicConfiguration("errorName"); + fail("exception should be thrown"); + } catch (Exception e) { + if (e instanceof PulsarAdminException) { + assertEquals(((PulsarAdminException) e).getStatusCode(), Response.Status.PRECONDITION_FAILED.getStatusCode()); + } else { + fail("PulsarAdminException should be thrown"); + } + } + } +}
