This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e5d035ae413cbe67e0efa8afb7d62d979cc58659
Author: Michael Marshall <[email protected]>
AuthorDate: Wed Jun 1 02:14:49 2022 -0500

    Configure DLog Bookie, Pulsar, and Admin clients via pass through config (#15818)
    
    (cherry picked from commit aa673498f88d0ed4f9d5788a5036355834ea5119)
---
 conf/broker.conf                                   |  14 ++-
 conf/functions_worker.yml                          |  20 ++++
 conf/proxy.conf                                    |   4 +
 conf/websocket.conf                                |   4 +
 .../pulsar/broker/BookKeeperClientFactoryImpl.java |  16 ++--
 .../org/apache/pulsar/broker/PulsarService.java    |  32 ++++---
 .../pulsar/broker/namespace/NamespaceService.java  |  10 +-
 .../pulsar/broker/service/BrokerService.java       |  21 ++++-
 .../apache/pulsar/compaction/CompactorTool.java    |   6 ++
 ...kerInternalClientConfigurationOverrideTest.java | 103 +++++++++++++++++++++
 .../PulsarClientConfigurationOverrideTest.java     |  56 +++++++++++
 .../websocket/proxy/ProxyConfigurationTest.java    |   8 +-
 .../pulsar/client/admin/PulsarAdminBuilder.java    |  23 +++++
 .../admin/internal/PulsarAdminBuilderImpl.java     |   9 +-
 .../pulsar/client/internal/PropertiesUtils.java    |  64 +++++++++++++
 .../src/test/resources/test_worker_config.yml      |   3 +
 .../pulsar/functions/worker/WorkerService.java     |   7 +-
 .../pulsar/functions/worker/WorkerUtils.java       |  45 ++++++++-
 .../pulsar/functions/worker/WorkerUtilsTest.java   |  44 +++++++++
 .../pulsar/proxy/server/ProxyConnection.java       |  16 +++-
 .../apache/pulsar/websocket/WebSocketService.java  |   7 +-
 site2/docs/reference-configuration.md              |  22 +++++
 22 files changed, 489 insertions(+), 45 deletions(-)
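
Note on the prefix convention used throughout this patch: keys that start with "brokerClient_" or "bookkeeper_" are selected from the loaded properties, the prefix is stripped, and the remaining key/value pairs are handed to the relevant client builder. The following is a minimal sketch of that filter-and-strip step, assuming the behavior asserted in PulsarClientConfigurationOverrideTest below. The class PrefixFilterSketch and the method filterAndStrip are hypothetical names for illustration only; this is not the PropertiesUtils code added by the commit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class PrefixFilterSketch {

    // Hypothetical helper (an assumption, not the commit's PropertiesUtils):
    // keep only the keys that start with the prefix and strip the prefix.
    static Map<String, Object> filterAndStrip(Properties props, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // The unprefixed key is ignored by the filter.
        props.setProperty("keepAliveIntervalSeconds", "15");
        // The prefixed key is kept, with "brokerClient_" removed.
        props.setProperty("brokerClient_keepAliveIntervalSeconds", "25");

        Map<String, Object> overrides = filterAndStrip(props, "brokerClient_");
        System.out.println(overrides); // prints {keepAliveIntervalSeconds=25}
    }
}
```

In the patch itself, a map of this shape is passed to loadConf on the client builder before authentication and other @Secret fields are set.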

diff --git a/conf/broker.conf b/conf/broker.conf
index 9534ed73c63..0c554ada913 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -570,6 +570,9 @@ brokerClientTlsCiphers=
 # used by the internal client to authenticate with Pulsar brokers
 brokerClientTlsProtocols=
 
+# You can add extra configuration options for the Pulsar Client and the Pulsar Admin Client
+# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
+# and before the above brokerClient configurations named above.
 
 ### --- Authentication --- ###
 
@@ -798,8 +801,11 @@ managedLedgerDefaultWriteQuorum=2
 # Number of guaranteed copies (acks to wait before write is complete)
 managedLedgerDefaultAckQuorum=2
 
-# you can add other configuration options for the BookKeeper client
-# by prefixing them with bookkeeper_
+# You can add other configuration options for the BookKeeper client
+# by prefixing them with "bookkeeper_". These configurations are applied
+# to all bookkeeper clients started by the broker (including the managed ledger bookkeeper clients as well as
+# the BookkeeperPackagesStorage bookkeeper client), except the distributed log bookkeeper client.
+# The dlog bookkeeper client is configured in the functions worker configuration file.
 
 # How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds).
 # Default is 60 seconds
@@ -1175,6 +1181,10 @@ fileSystemURI=
 
 ### --- Deprecated config variables --- ###
 
+# When using BookKeeperPackagesStorageProvider, you can configure the
+# bookkeeper client by prefixing configurations with "bookkeeper_".
+# This config applies to managed ledger bookkeeper clients, as well.
+
 # Deprecated. Use configurationStoreServers
 globalZookeeperServers=
 
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 4dfcf586307..2376edbd2a3 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -243,3 +243,23 @@ functionsDirectory: ./functions
 
 # Should connector config be validated during during submission
 validateConnectorConfig: false
+###########################
+# Arbitrary Configuration
+###########################
+# When a configuration parameter is not explicitly named in the WorkerConfig class, it is only accessible from the
+# properties map. This map can be configured by supplying values to the properties map in this config file.
+
+# Configure the DLog bookkeeper client by prefixing configurations with "bookkeeper_". Because these are arbitrary, they
+# must be added to the properties map to get correctly applied. This configuration applies to the Dlog bookkeeper client
+# in both the standalone function workers and function workers initialized in the broker.
+
+# You can add extra configuration options for the Pulsar Client and the Pulsar Admin Client
+# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
+# and before the above brokerClient configurations named above.
+
+## For example, when using the token authentication provider (AuthenticationProviderToken), you must configure several
+## custom configurations. Here is a sample for configuring one of the necessary configs:
+#properties:
+#    tokenPublicKey: "file:///path/to/my/key"
+#    tokenPublicAlg: "RSA256"
+
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 5ac664ef48b..f0fa60483af 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -126,6 +126,10 @@ tlsEnabledWithBroker=false
 # Tls cert refresh duration in seconds (set 0 to check on every new connection)
 tlsCertRefreshCheckDurationSec=300
 
+# You can add extra configuration options for the Pulsar Client
+# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
+# and before the above brokerClient configurations named above.
+
 ##### --- Rate Limiting --- #####
 
 # Max concurrent inbound connections. The proxy will reject requests beyond that.
diff --git a/conf/websocket.conf b/conf/websocket.conf
index 8e4a59be945..85005bab46c 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -92,6 +92,10 @@ brokerClientAuthenticationPlugin=
 brokerClientAuthenticationParameters=
 brokerClientTrustCertsFilePath=
 
+# You can add extra configuration options for the Pulsar Client
+# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration
+# and before the above brokerClient configurations named above.
+
 # When this parameter is not empty, unauthenticated users perform as anonymousUserRole
 anonymousUserRole=
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index f00c9050d5f..e2bf0d91729 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -27,7 +27,6 @@ import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.RE
 import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import com.google.common.annotations.VisibleForTesting;
@@ -41,6 +40,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
 import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
@@ -141,15 +141,11 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
                 conf.getBookkeeperClientGetBookieInfoIntervalSeconds(), TimeUnit.SECONDS);
         bkConf.setGetBookieInfoRetryIntervalSeconds(
                 conf.getBookkeeperClientGetBookieInfoRetryIntervalSeconds(), TimeUnit.SECONDS);
-        Properties allProps = conf.getProperties();
-        allProps.forEach((key, value) -> {
-            String sKey = key.toString();
-            if (sKey.startsWith("bookkeeper_") && value != null) {
-                String bkExtraConfigKey = sKey.substring(11);
-                log.info("Extra BookKeeper client configuration {}, setting {}={}", sKey, bkExtraConfigKey, value);
-                bkConf.setProperty(bkExtraConfigKey, value);
-            }
-        });
+        PropertiesUtils.filterAndMapProperties(conf.getProperties(), "bookkeeper_")
+                .forEach((key, value) -> {
+                    log.info("Applying BookKeeper client configuration setting {}={}", key, value);
+                    bkConf.setProperty(key, value);
+                });
         return bkConf;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 3cfdaf73366..4492c1d1e0b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -22,21 +22,17 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.admin.impl.NamespacesBase.getBundles;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.DefaultThreadFactory;
-
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.URI;
-
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -55,7 +51,6 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
-
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
@@ -68,12 +63,13 @@ import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.bookkeeper.mledger.offload.Offloaders;
-import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.mledger.offload.OffloadersCache;
+import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.ZookeeperSessionExpiredHandlers;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
@@ -108,6 +104,7 @@ import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
@@ -141,9 +138,8 @@ import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
-import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
 import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
-import org.apache.pulsar.ZookeeperSessionExpiredHandlers;
+import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
 import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -1060,6 +1056,12 @@ public class PulsarService implements AutoCloseable {
                     .enableTls(this.getConfiguration().isTlsEnabled())
                     .allowTlsInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection())
                     .tlsTrustCertsFilePath(this.getConfiguration().getTlsCertificateFilePath());
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                Map<String, Object> overrides = PropertiesUtils
+                        .filterAndMapProperties(this.getConfiguration().getProperties(), "brokerClient_");
+                builder.loadConf(overrides);
 
                 if (this.getConfiguration().isBrokerClientTlsEnabled()) {
                     if (this.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
@@ -1098,10 +1100,16 @@ public class PulsarService implements AutoCloseable {
                             + ", webServiceAddressTls: " + webServiceAddressTls
                             + ", webServiceAddress: " + webServiceAddress);
                 }
-                PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl) //
-                        .authentication( //
-                                conf.getBrokerClientAuthenticationPlugin(), //
-                                conf.getBrokerClientAuthenticationParameters());
+                PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
+
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                builder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));
+
+                builder.authentication(
+                        conf.getBrokerClientAuthenticationPlugin(),
+                        conf.getBrokerClientAuthenticationParameters());
 
                 if (conf.isBrokerClientTlsEnabled()) {
                     if (conf.isBrokerClientTlsEnabledWithKeyStore()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 52cce387d55..72f233836c5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -29,13 +29,10 @@ import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_PO
 import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
 import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
 import static org.apache.pulsar.common.util.Codec.decode;
-
 import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
-
 import io.netty.channel.EventLoopGroup;
 import io.prometheus.client.Counter;
-
 import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
@@ -53,7 +50,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -77,6 +73,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -1256,6 +1253,11 @@ public class NamespaceService {
                     .enableTcpNoDelay(false)
                     .statsInterval(0, TimeUnit.SECONDS);
 
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));
+
                 if (pulsar.getConfiguration().isAuthenticationEnabled()) {
                     clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
                         pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 47ecd6e7cbf..29d1001e4bd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -126,6 +126,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.events.EventsTopicNames;
@@ -943,6 +944,12 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                         .enableTcpNoDelay(false)
                         .connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker())
                         .statsInterval(0, TimeUnit.SECONDS);
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(pulsar.getConfiguration().getProperties(),
+                    "brokerClient_"));
+
                 if (pulsar.getConfiguration().isAuthenticationEnabled()) {
                     clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
                             pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
@@ -996,10 +1003,16 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
 
                 boolean isTlsUrl = conf.isBrokerClientTlsEnabled() && isNotBlank(data.getServiceUrlTls());
                 String adminApiUrl = isTlsUrl ? data.getServiceUrlTls() : data.getServiceUrl();
-                PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl)
-                        .authentication(
-                                conf.getBrokerClientAuthenticationPlugin(),
-                                conf.getBrokerClientAuthenticationParameters());
+                PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
+
+                // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more information.
+                builder.loadConf(PropertiesUtils.filterAndMapProperties(conf.getProperties(), "brokerClient_"));
+
+                builder.authentication(
+                        conf.getBrokerClientAuthenticationPlugin(),
+                        conf.getBrokerClientAuthenticationParameters());
 
                 if (isTlsUrl) {
                     builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index 4e3b76e57de..88609157cd5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
@@ -80,6 +81,11 @@ public class CompactorTool {
 
         ClientBuilder clientBuilder = PulsarClient.builder();
 
+        // Apply all arbitrary configuration. This must be called before setting any fields annotated as
+        // @Secret on the ClientConfigurationData object because of the way they are serialized.
+        // See https://github.com/apache/pulsar/issues/8509 for more information.
+        clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(brokerConfig.getProperties(), "brokerClient_"));
+
         if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) {
             clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
                     brokerConfig.getBrokerClientAuthenticationParameters());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
new file mode 100644
index 00000000000..407e9ea05fd
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.Properties;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class BrokerInternalClientConfigurationOverrideTest extends BrokerTestBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testPulsarServiceAdminClientConfiguration() throws PulsarServerException {
+        Properties config = pulsar.getConfiguration().getProperties();
+        config.setProperty("brokerClient_operationTimeoutMs", "60000");
+        config.setProperty("brokerClient_statsIntervalSeconds", "10");
+        ClientConfigurationData clientConf = pulsar.getAdminClient().getClientConfigData();
+        Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000);
+        Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10);
+    }
+
+    @Test
+    public void testPulsarServicePulsarClientConfiguration() throws PulsarServerException {
+        Properties config = pulsar.getConfiguration().getProperties();
+        config.setProperty("brokerClient_operationTimeoutMs", "60000");
+        config.setProperty("brokerClient_statsIntervalSeconds", "10");
+        pulsar.getConfiguration().setBrokerClientAuthenticationParameters("sensitive");
+        ClientConfigurationData clientConf = ((PulsarClientImpl) pulsar.getClient()).getConfiguration();
+        Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000);
+        // Config should override internal default, which is 0.
+        Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10);
+        Assert.assertEquals(clientConf.getAuthParams(), "sensitive");
+    }
+
+    @Test
+    public void testBrokerServicePulsarClientConfiguration() {
+        // This data only needs to have the service url for this test.
+        ClusterData data = new ClusterData("http://localhost:8080");
+
+        // Set the configs and set some configs that won't apply
+        Properties config = pulsar.getConfiguration().getProperties();
+        config.setProperty("brokerClient_operationTimeoutMs", "60000");
+        config.setProperty("brokerClient_statsIntervalSeconds", "10");
+
+        PulsarClientImpl client = (PulsarClientImpl) pulsar.getBrokerService()
+                .getReplicationClient("test");
+        ClientConfigurationData clientConf = client.getConfiguration();
+        Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000);
+        // Config should override internal default, which is 0.
+        Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10);
+    }
+
+    @Test
+    public void testNamespaceServicePulsarClientConfiguration() {
+        // This data only needs to have the service url for this test.
+        ClusterData data = new ClusterData("http://localhost:8080");
+
+        // Set the configs and set some configs that won't apply
+        Properties config = pulsar.getConfiguration().getProperties();
+        config.setProperty("brokerClient_operationTimeoutMs", "60000");
+        config.setProperty("brokerClient_statsIntervalSeconds", "10");
+
+        PulsarClientImpl client = pulsar.getNamespaceService().getNamespaceClient(data);
+        ClientConfigurationData clientConf = client.getConfiguration();
+        Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000);
+        // Config should override internal default, which is 0.
+        Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10);
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarClientConfigurationOverrideTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarClientConfigurationOverrideTest.java
new file mode 100644
index 00000000000..4f885ecc46b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarClientConfigurationOverrideTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.internal.PropertiesUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+public class PulsarClientConfigurationOverrideTest {
+    @Test
+    public void testFilterAndMapProperties() {
+        // Create a default config
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.getProperties().setProperty("keepAliveIntervalSeconds", "15");
+        conf.getProperties().setProperty("brokerClient_keepAliveIntervalSeconds", "25");
+
+        // Apply the filtering and mapping logic
+        Map<String, Object> result = PropertiesUtils.filterAndMapProperties(conf.getProperties(), "brokerClient_");
+
+        // Ensure the results match expectations
+        Assert.assertEquals(result.size(), 1, "The filtered map should have one entry.");
+        Assert.assertNull(result.get("brokerClient_keepAliveIntervalSeconds"),
+                "The mapped prop should not be in the result.");
+        Assert.assertEquals(result.get("keepAliveIntervalSeconds"), "25", "The original value is overridden.");
+
+        // Create sample ClientBuilder
+        ClientBuilder builder = PulsarClient.builder();
+        Assert.assertEquals(
+                ((ClientBuilderImpl) builder).getClientConfigurationData().getKeepAliveIntervalSeconds(), 30);
+        // Note: this test would fail if any @Secret fields were set before the loadConf and the accessed afterwards.
+        builder.loadConf(result);
+        Assert.assertEquals(
+                ((ClientBuilderImpl) builder).getClientConfigurationData().getKeepAliveIntervalSeconds(), 25);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
index afe91a77e3e..af2bd957c8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
@@ -21,9 +21,7 @@ package org.apache.pulsar.websocket.proxy;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
-
 import java.util.Optional;
-
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.websocket.WebSocketService;
@@ -61,6 +59,9 @@ public class ProxyConfigurationTest extends 
ProducerConsumerBase {
     public void configTest(int numIoThreads, int connectionsPerBroker) throws 
Exception {
         config.setWebSocketNumIoThreads(numIoThreads);
         config.setWebSocketConnectionsPerBroker(connectionsPerBroker);
+        config.getProperties().setProperty("brokerClient_serviceUrl", 
"https://broker.com:8080");
+        config.setServiceUrl("http://localhost:8080");
+        config.getProperties().setProperty("brokerClient_requestTimeoutMs", 
"100");
         WebSocketService service = spy(new WebSocketService(config));
         
doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
         service.start();
@@ -68,6 +69,9 @@ public class ProxyConfigurationTest extends 
ProducerConsumerBase {
         PulsarClientImpl client = (PulsarClientImpl) service.getPulsarClient();
         assertEquals(client.getConfiguration().getNumIoThreads(), 
numIoThreads);
         assertEquals(client.getConfiguration().getConnectionsPerBroker(), 
connectionsPerBroker);
+        assertEquals(client.getConfiguration().getServiceUrl(), 
"http://localhost:8080",
+                "brokerClient_ configs take precedence");
+        assertEquals(client.getConfiguration().getRequestTimeoutMs(), 100);
 
         service.close();
     }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
index 9de2d39a7ca..c1c9b800db2 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
@@ -37,6 +37,29 @@ public interface PulsarAdminBuilder {
      */
     PulsarAdmin build() throws PulsarClientException;
 
+    /**
+     * Load the configuration from provided <tt>config</tt> map.
+     *
+     * <p>Example:
+     *
+     * <pre>
+     * {@code
+     * Map<String, Object> config = new HashMap<>();
+     * config.put("serviceHttpUrl", "http://localhost:6650");
+     *
+     * PulsarAdminBuilder builder = ...;
+     * builder = builder.loadConf(config);
+     *
+     * PulsarAdmin client = builder.build();
+     * }
+     * </pre>
+     *
+     * @param config
+     *            configuration to load
+     * @return the client builder instance
+     */
+    PulsarAdminBuilder loadConf(Map<String, Object> config);
+
     /**
      * Create a copy of the current client builder.
      * <p/>
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
index 146faaf29d7..7e552adfa49 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
@@ -29,10 +29,11 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 
 public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
 
-    protected final ClientConfigurationData conf;
+    protected ClientConfigurationData conf;
     private int connectTimeout = PulsarAdmin.DEFAULT_CONNECT_TIMEOUT_SECONDS;
     private int readTimeout = PulsarAdmin.DEFAULT_READ_TIMEOUT_SECONDS;
     private int requestTimeout = PulsarAdmin.DEFAULT_REQUEST_TIMEOUT_SECONDS;
@@ -61,6 +62,12 @@ public class PulsarAdminBuilderImpl implements 
PulsarAdminBuilder {
         return new PulsarAdminBuilderImpl(conf.clone());
     }
 
+    @Override
+    public PulsarAdminBuilder loadConf(Map<String, Object> config) {
+        conf = ConfigurationDataUtils.loadData(config, conf, 
ClientConfigurationData.class);
+        return this;
+    }
+
     @Override
     public PulsarAdminBuilder serviceHttpUrl(String serviceHttpUrl) {
         conf.setServiceUrl(serviceHttpUrl);
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PropertiesUtils.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PropertiesUtils.java
new file mode 100644
index 00000000000..4a418b1d515
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PropertiesUtils.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.internal;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Internal utility methods for filtering and mapping {@link Properties} 
objects.
+ */
+public class PropertiesUtils {
+
+    /**
+     * Filters the {@link Properties} object so that only properties with the 
configured prefix are retained,
+     * and then removes that prefix and puts the key value pairs into the 
result map.
+     * @param props - the properties object to filter
+     * @param prefix - the prefix to filter against and then remove for keys 
in the resulting map
+     * @return a map of properties
+     */
+    public static Map<String, Object> filterAndMapProperties(Properties props, 
String prefix) {
+        return filterAndMapProperties(props, prefix, "");
+    }
+
+    /**
+     * Filters the {@link Properties} object so that only properties with the 
configured prefix are retained,
+     * and then replaces the srcPrefix with the targetPrefix when putting the 
key value pairs in the resulting map.
+     * @param props - the properties object to filter
+     * @param srcPrefix - the prefix to filter against and then remove for 
keys in the resulting map
+     * @param targetPrefix - the prefix to add to keys in the result map
+     * @return a map of properties
+     */
+    public static Map<String, Object> filterAndMapProperties(Properties props, 
String srcPrefix, String targetPrefix) {
+        Map<String, Object> result = new HashMap<>();
+        int prefixLength = srcPrefix.length();
+        props.forEach((keyObject, value) -> {
+            if (!(keyObject instanceof String)) {
+                return;
+            }
+            String key = (String) keyObject;
+            if (key.startsWith(srcPrefix) && value != null) {
+                String truncatedKey = key.substring(prefixLength);
+                result.put(targetPrefix + truncatedKey, value);
+            }
+        });
+        return result;
+    }
+}
diff --git a/pulsar-functions/src/test/resources/test_worker_config.yml 
b/pulsar-functions/src/test/resources/test_worker_config.yml
index f2645f61e8d..36c762af45c 100644
--- a/pulsar-functions/src/test/resources/test_worker_config.yml
+++ b/pulsar-functions/src/test/resources/test_worker_config.yml
@@ -22,4 +22,7 @@ workerPort: 7654
 pulsarServiceUrl: pulsar://localhost:6650
 functionMetadataTopicName: test-function-metadata-topic
 numFunctionPackageReplicas: 3
+properties:
+  # Fake Bookkeeper Client config to be applied to the DLog Bookkeeper Client
+  bookkeeper_testKey: "fakeValue"
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index d0218b45790..b660ff64823 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -148,18 +148,19 @@ public class WorkerService {
                 this.brokerAdmin = 
WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
                     workerConfig.getBrokerClientAuthenticationPlugin(), 
workerConfig.getBrokerClientAuthenticationParameters(),
                     pulsarClientTlsTrustCertsFilePath, 
workerConfig.isTlsAllowInsecureConnection(),
-                    workerConfig.isTlsEnableHostnameVerification());
+                    workerConfig.isTlsEnableHostnameVerification(), 
workerConfig);
 
                 this.functionAdmin = 
WorkerUtils.getPulsarAdminClient(functionWebServiceUrl,
                     workerConfig.getBrokerClientAuthenticationPlugin(), 
workerConfig.getBrokerClientAuthenticationParameters(),
                     workerConfig.getTlsTrustCertsFilePath(), 
workerConfig.isTlsAllowInsecureConnection(),
-                    workerConfig.isTlsEnableHostnameVerification());
+                    workerConfig.isTlsEnableHostnameVerification(), 
workerConfig);
 
                 this.client = 
WorkerUtils.getPulsarClient(workerConfig.getPulsarServiceUrl(),
                         workerConfig.getBrokerClientAuthenticationPlugin(),
                         workerConfig.getBrokerClientAuthenticationParameters(),
                         workerConfig.isUseTls(), 
pulsarClientTlsTrustCertsFilePath,
-                        workerConfig.isTlsAllowInsecureConnection(), 
workerConfig.isTlsEnableHostnameVerification());
+                        workerConfig.isTlsAllowInsecureConnection(), 
workerConfig.isTlsEnableHostnameVerification(),
+                        workerConfig);
             } else {
                 this.brokerAdmin = 
WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl());
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 81659ad8095..05eb809337f 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.functions.proto.Function;
@@ -152,6 +153,13 @@ public final class WorkerUtils {
                         
workerConfig.getBookkeeperClientAuthenticationParameters());
             }
         }
+        // Map arbitrary bookkeeper client configuration into DLog Config. 
Note that this only configures the
+        // bookie client.
+        PropertiesUtils.filterAndMapProperties(workerConfig.getProperties(), 
"bookkeeper_", "bkc.")
+                .forEach((key, value) -> {
+                    log.info("Applying DLog BookKeeper client configuration 
setting {}={}", key, value);
+                    conf.setProperty(key, value);
+                });
         return conf;
     }
 
@@ -184,12 +192,20 @@ public final class WorkerUtils {
     }
 
     public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl) 
{
-        return getPulsarAdminClient(pulsarWebServiceUrl, null, null, null, 
null, null);
+        return getPulsarAdminClient(pulsarWebServiceUrl, null, null, null, 
null, null, null);
     }
 
     public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, 
String authPlugin, String authParams,
                                                    String 
tlsTrustCertsFilePath, Boolean allowTlsInsecureConnection,
                                                    Boolean 
enableTlsHostnameVerificationEnable) {
+        return getPulsarAdminClient(pulsarWebServiceUrl, authPlugin, 
authParams, tlsTrustCertsFilePath,
+                allowTlsInsecureConnection, 
enableTlsHostnameVerificationEnable, null);
+    }
+
+    public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, 
String authPlugin, String authParams,
+                                                   String 
tlsTrustCertsFilePath, Boolean allowTlsInsecureConnection,
+                                                   Boolean 
enableTlsHostnameVerificationEnable,
+                                                   WorkerConfig workerConfig) {
         log.info("Create Pulsar Admin to service url {}: "
                 + "authPlugin = {}, authParams = {}, "
                 + "tlsTrustCerts = {}, allowTlsInsecureConnector = {}, 
enableTlsHostnameVerification = {}",
@@ -197,6 +213,13 @@ public final class WorkerUtils {
             tlsTrustCertsFilePath, allowTlsInsecureConnection, 
enableTlsHostnameVerificationEnable);
         try {
             PulsarAdminBuilder adminBuilder = 
PulsarAdmin.builder().serviceHttpUrl(pulsarWebServiceUrl);
+            if (workerConfig != null) {
+                // Apply all arbitrary configuration. This must be called 
before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of 
the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more 
information.
+                adminBuilder.loadConf(
+                        
PropertiesUtils.filterAndMapProperties(workerConfig.getProperties(), 
"brokerClient_"));
+            }
             if (isNotBlank(authPlugin) && isNotBlank(authParams)) {
                 adminBuilder.authentication(authPlugin, authParams);
             }
@@ -209,6 +232,7 @@ public final class WorkerUtils {
             if (enableTlsHostnameVerificationEnable != null) {
                 
adminBuilder.enableTlsHostnameVerification(enableTlsHostnameVerificationEnable);
             }
+
             return adminBuilder.build();
         } catch (PulsarClientException e) {
             log.error("Error creating pulsar admin client", e);
@@ -218,17 +242,33 @@ public final class WorkerUtils {
 
     public static PulsarClient getPulsarClient(String pulsarServiceUrl) {
         return getPulsarClient(pulsarServiceUrl, null, null, null,
-                null, null, null);
+                null, null, null, null);
     }
 
     public static PulsarClient getPulsarClient(String pulsarServiceUrl, String 
authPlugin, String authParams,
                                                Boolean useTls, String 
tlsTrustCertsFilePath,
                                                Boolean 
allowTlsInsecureConnection,
                                                Boolean 
enableTlsHostnameVerificationEnable) {
+        return getPulsarClient(pulsarServiceUrl, authPlugin, authParams, 
useTls, tlsTrustCertsFilePath,
+                allowTlsInsecureConnection, 
enableTlsHostnameVerificationEnable, null);
+    }
+
+    public static PulsarClient getPulsarClient(String pulsarServiceUrl, String 
authPlugin, String authParams,
+                                               Boolean useTls, String 
tlsTrustCertsFilePath,
+                                               Boolean 
allowTlsInsecureConnection,
+                                               Boolean 
enableTlsHostnameVerificationEnable,
+                                               WorkerConfig workerConfig) {
 
         try {
             ClientBuilder clientBuilder = 
PulsarClient.builder().serviceUrl(pulsarServiceUrl);
 
+            if (workerConfig != null) {
+                // Apply all arbitrary configuration. This must be called 
before setting any fields annotated as
+                // @Secret on the ClientConfigurationData object because of 
the way they are serialized.
+                // See https://github.com/apache/pulsar/issues/8509 for more 
information.
+                clientBuilder.loadConf(
+                        
PropertiesUtils.filterAndMapProperties(workerConfig.getProperties(), 
"brokerClient_"));
+            }
             if (isNotBlank(authPlugin)
                     && isNotBlank(authParams)) {
                 clientBuilder.authentication(authPlugin, authParams);
@@ -245,7 +285,6 @@ public final class WorkerUtils {
             if (enableTlsHostnameVerificationEnable != null) {
                 
clientBuilder.enableTlsHostnameVerification(enableTlsHostnameVerificationEnable);
             }
-
             return clientBuilder.build();
         } catch (PulsarClientException e) {
             log.error("Error creating pulsar client", e);
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
new file mode 100644
index 00000000000..042b505472d
--- /dev/null
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import static org.testng.Assert.assertEquals;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.testng.annotations.Test;
+
+public class WorkerUtilsTest {
+
+
+    @Test
+    public void testDLogConfiguration() throws URISyntaxException, IOException 
{
+        // The config yml is seeded with a fake bookie config.
+        URL yamlUrl = 
getClass().getClassLoader().getResource("test_worker_config.yml");
+        WorkerConfig config = WorkerConfig.load(yamlUrl.toURI().getPath());
+
+        // Map the config.
+        DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(config);
+
+        // Verify the outcome.
+        assertEquals(dlogConf.getString("bkc.testKey"), "fakeValue",
+                "The bookkeeper client config mapping should apply.");
+    }
+}
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index bdfb11a9f0e..3c73284e7ed 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -27,6 +27,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
@@ -47,6 +48,8 @@ import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarChannelInitializer;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarHandler;
@@ -524,10 +527,17 @@ public class ProxyConnection extends PulsarHandler {
     }
 
     ClientConfigurationData createClientConfiguration()
-            throws PulsarClientException.UnsupportedAuthenticationException {
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl(service.getServiceUrl());
+        throws PulsarClientException.UnsupportedAuthenticationException {
+        ClientConfigurationData initialConf = new ClientConfigurationData();
+        initialConf.setServiceUrl(service.getServiceUrl());
         ProxyConfiguration proxyConfig = service.getConfiguration();
+        // Apply all arbitrary configuration. This must be called before 
setting any fields annotated as
+        // @Secret on the ClientConfigurationData object because of the way 
they are serialized.
+        // See https://github.com/apache/pulsar/issues/8509 for more 
information.
+        Map<String, Object> overrides = PropertiesUtils
+                .filterAndMapProperties(proxyConfig.getProperties(), 
"brokerClient_");
+        ClientConfigurationData clientConf = ConfigurationDataUtils
+                .loadData(overrides, initialConf, 
ClientConfigurationData.class);
         if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
             
clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
                     proxyConfig.getBrokerClientAuthenticationParameters()));
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 7b7ba26f44e..c36ad98740a 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -40,6 +40,7 @@ import 
org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -186,6 +187,11 @@ public class WebSocketService implements Closeable {
                 .ioThreads(config.getWebSocketNumIoThreads()) //
                 
.connectionsPerBroker(config.getWebSocketConnectionsPerBroker());
 
+        // Apply all arbitrary configuration. This must be called before 
setting any fields annotated as
+        // @Secret on the ClientConfigurationData object because of the way 
they are serialized.
+        // See https://github.com/apache/pulsar/issues/8509 for more 
information.
+        
clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(),
 "brokerClient_"));
+
         if (isNotBlank(config.getBrokerClientAuthenticationPlugin())
                 && 
isNotBlank(config.getBrokerClientAuthenticationParameters())) {
             
clientBuilder.authentication(config.getBrokerClientAuthenticationPlugin(),
@@ -203,7 +209,6 @@ public class WebSocketService implements Closeable {
         } else {
             clientBuilder.serviceUrl(clusterData.getServiceUrl());
         }
-
         return clientBuilder.build();
     }
 
diff --git a/site2/docs/reference-configuration.md 
b/site2/docs/reference-configuration.md
index d03cb2eae9d..7c55b744e03 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -345,6 +345,15 @@ subscriptionExpirationTimeMinutes | How long to delete 
inactive subscriptions fr
 | preciseTopicPublishRateLimiterEnable | Enable precise topic publish rate 
limiting. | false |
 | lazyCursorRecovery | Whether to recover cursors lazily when trying to 
recover a managed ledger backing a persistent topic. It can improve write 
availability of topics. The caveat is now when recovered ledger is ready to 
write we're not sure if all old consumers' last mark delete position(ack 
position) can be recovered or not. So user can make the trade off or have 
custom logic in application to checkpoint consumer state.| false |  
 
+#### Configuration Override For Clients Internal to Broker
+
+It's possible to configure some clients by using the appropriate prefix.
+
+|Prefix|Description|
+|brokerClient_| Configure **all** the broker's Pulsar Clients and Pulsar Admin 
Clients. These configurations are applied after hard-coded configuration and 
before the brokerClient configurations named above.|
+|bookkeeper_| Configure the broker's bookkeeper clients used by managed 
ledgers and the BookkeeperPackagesStorage bookkeeper client. Takes precedence 
over most other configuration values.|
+
+Note: when running the function worker within the broker, these prefixed 
configurations do not apply to any of those clients. You must instead configure 
those clients using the `functions_worker.yml`.
 
 
 ## Client
@@ -682,6 +691,12 @@ The value of 0 disables message-byte 
dispatch-throttling.|0|
 |tlsKeyFilePath |||
 |tlsTrustCertsFilePath|||
 
+#### Configuration Override For Clients Internal to WebSocket
+
+It's possible to configure some clients by using the appropriate prefix.
+
+|Prefix|Description|
+|brokerClient_| Configure **all** the broker's Pulsar Clients. These 
configurations are applied after hard-coded configuration and before the 
brokerClient configurations named above.|
 
 ## Pulsar proxy
 
@@ -746,6 +761,13 @@ The [Pulsar 
proxy](concepts-architecture-overview.md#pulsar-proxy) can be config
 | tokenAudience | The token audience stands for this broker. The field 
`tokenAudienceClaim` of a valid token need contains this parameter.| |
 | proxyLogLevel | Set the Pulsar Proxy log level. <li> If the value is set to 
0, no TCP channel information is logged. <li> If the value is set to 1, only 
the TCP channel information and command information (without message body) are 
parsed and logged. <li> If the value is set to 2, all TCP channel information, 
command information, and message body are parsed and logged. | 0 |
 
+#### Configuration Override For Clients Internal to Proxy
+
+It's possible to configure some clients by using the appropriate prefix.
+
+|Prefix|Description|
+|brokerClient_| Configure **all** the proxy's Pulsar Clients. These 
configurations are applied after hard-coded configuration and before the 
brokerClient configurations named above.|
+
 ## ZooKeeper
 
 ZooKeeper handles a broad range of essential configuration- and 
coordination-related tasks for Pulsar. The default configuration file for 
ZooKeeper is in the `conf/zookeeper.conf` file in your Pulsar installation. The 
following parameters are available:
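
For readers who want to try the prefix mapping described in this commit without building Pulsar, the following is a minimal standalone sketch. It re-implements the same filter-and-rename idea as `PropertiesUtils.filterAndMapProperties` using only `java.util`; the class name `PrefixOverrideSketch`, the method name `filterAndMap`, and the sample keys are assumptions made for illustration and are not part of the commit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical standalone sketch; not the class added by this commit.
public class PrefixOverrideSketch {

    // Keep only String keys that start with srcPrefix, then swap srcPrefix for targetPrefix.
    static Map<String, Object> filterAndMap(Properties props, String srcPrefix, String targetPrefix) {
        Map<String, Object> result = new HashMap<>();
        props.forEach((key, value) -> {
            if (key instanceof String && ((String) key).startsWith(srcPrefix) && value != null) {
                String stripped = ((String) key).substring(srcPrefix.length());
                result.put(targetPrefix + stripped, value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        // Sample properties, as they might be read from a conf file (values are made up).
        Properties props = new Properties();
        props.setProperty("brokerClient_requestTimeoutMs", "100");
        props.setProperty("bookkeeper_testKey", "fakeValue");
        props.setProperty("webServicePort", "8080");

        // Client overrides: the prefix is stripped entirely.
        System.out.println(filterAndMap(props, "brokerClient_", "")); // prints {requestTimeoutMs=100}

        // DLog BookKeeper client overrides: "bookkeeper_" is replaced by "bkc.".
        System.out.println(filterAndMap(props, "bookkeeper_", "bkc.")); // prints {bkc.testKey=fakeValue}
    }
}
```

In the commit itself, the resulting map is handed to `loadConf` on the client or admin builder before any authentication settings are applied, so the sketch covers only the filtering step.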
