This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 14a5ee0  Don't pass any auth parameters when authentication is disabled (#3999)
14a5ee0 is described below

commit 14a5ee0e764cb7d76dfce8ab7ab81e827e38a9a5
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Mon Apr 8 15:49:28 2019 -0700

    Don't pass any auth parameters when authentication is disabled (#3999)
    
    * Don't pass any auth parameters when authentication is disabled
    
    * fix bug
    
    * add defensive programming
---
 .../auth/KubernetesSecretsTokenAuthProvider.java   |  10 ++
 .../functions/worker/FunctionRuntimeManager.java   |  18 ++--
 .../pulsar/functions/worker/MembershipManager.java |  23 +----
 .../org/apache/pulsar/functions/worker/Worker.java |   3 +-
 .../pulsar/functions/worker/WorkerConfig.java      |   5 +-
 .../pulsar/functions/worker/WorkerService.java     | 109 ++++++++++-----------
 .../pulsar/functions/worker/WorkerUtils.java       |  54 +++++++++-
 .../functions/worker/MembershipManagerTest.java    |  21 ++--
 8 files changed, 151 insertions(+), 92 deletions(-)

diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
index e1cd3e8..606cd86 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.net.HttpURLConnection.HTTP_CONFLICT;
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static org.apache.commons.lang3.StringUtils.isBlank;
 import static 
org.apache.pulsar.broker.authentication.AuthenticationProviderToken.getToken;
 
 @Slf4j
@@ -115,6 +116,12 @@ public class KubernetesSecretsTokenAuthProvider implements 
KubernetesFunctionAut
         String fqfn = FunctionCommon.getFullyQualifiedName(tenant, namespace, 
name);
 
         String secretName = new String(functionAuthData.getData());
+        // Make sure secretName is not empty.  Defensive programming
+        if (isBlank(secretName)) {
+            log.warn("Secret name for function {} is empty.", fqfn);
+            return;
+        }
+
         Actions.Action deleteSecrets = Actions.Action.builder()
                 .actionName(String.format("Deleting secrets for function %s", 
fqfn))
                 .numRetries(NUM_RETRIES)
@@ -125,6 +132,9 @@ public class KubernetesSecretsTokenAuthProvider implements 
KubernetesFunctionAut
                         v1DeleteOptions.setGracePeriodSeconds(0L);
                         v1DeleteOptions.setPropagationPolicy("Foreground");
 
+                        // make sure secretName is not null or empty string.
+                        // If deleteNamespacedSecret is called and secret name is null or empty string
+                        // it will delete all the secrets in the namespace
                         coreClient.deleteNamespacedSecret(secretName,
                                 kubeNamespace, v1DeleteOptions, "true",
                                 null, null, null);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index e4acc70..d55898f 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -120,12 +120,18 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         }
         
secretsProviderConfigurator.init(workerConfig.getSecretsProviderConfiguratorConfig());
 
-        AuthenticationConfig authConfig = AuthenticationConfig.builder()
-                
.clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin())
-                
.clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters())
-                .tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath())
-                
.useTls(workerConfig.isUseTls()).tlsAllowInsecureConnection(workerConfig.isTlsAllowInsecureConnection())
-                
.tlsHostnameVerificationEnable(workerConfig.isTlsHostnameVerificationEnable()).build();
+
+        AuthenticationConfig authConfig = null;
+        if (workerConfig.isAuthenticationEnabled()) {
+            authConfig = AuthenticationConfig.builder()
+                    
.clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin())
+                    
.clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters())
+                    
.tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath())
+                    .useTls(workerConfig.isUseTls())
+                    
.tlsAllowInsecureConnection(workerConfig.isTlsAllowInsecureConnection())
+                    
.tlsHostnameVerificationEnable(workerConfig.isTlsHostnameVerificationEnable())
+                    .build();
+        }
 
         if (workerConfig.getThreadContainerFactory() != null) {
             this.runtimeFactory = new ThreadRuntimeFactory(
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index f78513a..50a2f97 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -59,7 +59,7 @@ public class MembershipManager implements AutoCloseable, 
ConsumerEventListener {
     private final String consumerName;
     private final ConsumerImpl<byte[]> consumer;
     private final WorkerConfig workerConfig;
-    private PulsarAdmin pulsarAdminClient;
+    private PulsarAdmin pulsarAdmin;
     private final CompletableFuture<Void> firstConsumerEventFuture;
     private final AtomicBoolean isLeader = new AtomicBoolean();
 
@@ -72,9 +72,10 @@ public class MembershipManager implements AutoCloseable, 
ConsumerEventListener {
     @VisibleForTesting
     Map<Function.Instance, Long> unsignedFunctionDurations = new HashMap<>();
 
-    MembershipManager(WorkerService service, PulsarClient client)
+    MembershipManager(WorkerService service, PulsarClient client, PulsarAdmin 
pulsarAdmin)
             throws PulsarClientException {
         this.workerConfig = service.getWorkerConfig();
+        this.pulsarAdmin = pulsarAdmin;
         consumerName = String.format(
             "%s:%s:%d",
             workerConfig.getWorkerId(),
@@ -121,9 +122,8 @@ public class MembershipManager implements AutoCloseable, 
ConsumerEventListener {
 
         List<WorkerInfo> workerIds = new LinkedList<>();
         TopicStats topicStats = null;
-        PulsarAdmin pulsarAdmin = this.getPulsarAdminClient();
         try {
-            topicStats = 
pulsarAdmin.topics().getStats(this.workerConfig.getClusterCoordinationTopic());
+            topicStats = 
this.pulsarAdmin.topics().getStats(this.workerConfig.getClusterCoordinationTopic());
         } catch (PulsarAdminException e) {
             log.error("Failed to get status of coordinate topic {}",
                     this.workerConfig.getClusterCoordinationTopic(), e);
@@ -140,9 +140,8 @@ public class MembershipManager implements AutoCloseable, 
ConsumerEventListener {
 
     public WorkerInfo getLeader() {
         TopicStats topicStats = null;
-        PulsarAdmin pulsarAdmin = this.getPulsarAdminClient();
         try {
-            topicStats = 
pulsarAdmin.topics().getStats(this.workerConfig.getClusterCoordinationTopic());
+            topicStats = 
this.pulsarAdmin.topics().getStats(this.workerConfig.getClusterCoordinationTopic());
         } catch (PulsarAdminException e) {
             log.error("Failed to get status of coordinate topic {}",
                     this.workerConfig.getClusterCoordinationTopic(), e);
@@ -166,9 +165,6 @@ public class MembershipManager implements AutoCloseable, 
ConsumerEventListener {
     @Override
     public void close() throws PulsarClientException {
         consumer.close();
-        if (this.pulsarAdminClient != null) {
-            this.pulsarAdminClient.close();
-        }
     }
 
     public void checkFailures(FunctionMetaDataManager functionMetaDataManager,
@@ -283,15 +279,6 @@ public class MembershipManager implements AutoCloseable, 
ConsumerEventListener {
      * Private methods
      */
 
-    private PulsarAdmin getPulsarAdminClient() {
-        if (this.pulsarAdminClient == null) {
-            this.pulsarAdminClient = 
WorkerUtils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl(),
-                    workerConfig.getClientAuthenticationPlugin(), 
workerConfig.getClientAuthenticationParameters(),
-                    workerConfig.getTlsTrustCertsFilePath(), 
workerConfig.isTlsAllowInsecureConnection());
-        }
-        return this.pulsarAdminClient;
-    }
-
     private boolean checkLeader(WorkerService service, String consumerName) {
         try {
             TopicStats stats = service.getBrokerAdmin().topics()
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index b9814b3..52386f5 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -76,7 +76,8 @@ public class Worker {
         // initializing pulsar functions namespace
         PulsarAdmin admin = 
WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
                 workerConfig.getClientAuthenticationPlugin(), 
workerConfig.getClientAuthenticationParameters(),
-                workerConfig.getTlsTrustCertsFilePath(), 
workerConfig.isTlsAllowInsecureConnection());
+                workerConfig.getTlsTrustCertsFilePath(), 
workerConfig.isTlsAllowInsecureConnection(),
+                workerConfig.isTlsHostnameVerificationEnable());
         InternalConfigurationData internalConf;
         // make sure pulsar broker is up
         log.info("Checking if pulsar service at {} is up...", 
workerConfig.getPulsarWebServiceUrl());
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 6c42a24..dd403f6 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -261,9 +261,12 @@ public class WorkerConfig implements Serializable, 
PulsarConfiguration {
     private boolean tlsRequireTrustedClientCertOnConnect = false;
     @FieldContext(
         category = CATEGORY_CLIENT_SECURITY,
-        doc = "Whether to enable TLS when clients connect to broker"
+        doc = "Whether to enable TLS when clients connect to broker",
+        deprecated = true
     )
     // TLS for Functions -> Broker
+    // @deprecated use "pulsar+ssl://" in serviceUrl to enable
+    @Deprecated
     private boolean useTls = false;
     @FieldContext(
         category = CATEGORY_SECURITY,
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index c3211b2..9ec9688 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -21,18 +21,9 @@ package org.apache.pulsar.functions.worker;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-
 import io.netty.util.concurrent.DefaultThreadFactory;
-
-import java.net.URI;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
 import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
@@ -43,10 +34,13 @@ import 
org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 
+import java.net.URI;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
 /**
  * A service component contains everything to run a worker except rest server.
  */
@@ -92,17 +86,6 @@ public class WorkerService {
                       AuthorizationService authorizationService) throws 
InterruptedException {
         log.info("Starting worker {}...", workerConfig.getWorkerId());
 
-        this.brokerAdmin = 
WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
-                workerConfig.getClientAuthenticationPlugin(), 
workerConfig.getClientAuthenticationParameters(),
-                workerConfig.getTlsTrustCertsFilePath(), 
workerConfig.isTlsAllowInsecureConnection());
-        
-        final String functionWebServiceUrl = 
StringUtils.isNotBlank(workerConfig.getFunctionWebServiceUrl())
-                ? workerConfig.getFunctionWebServiceUrl()
-                : workerConfig.getWorkerWebAddress(); 
-        this.functionAdmin = 
WorkerUtils.getPulsarAdminClient(functionWebServiceUrl,
-                workerConfig.getClientAuthenticationPlugin(), 
workerConfig.getClientAuthenticationParameters(),
-                workerConfig.getTlsTrustCertsFilePath(), 
workerConfig.isTlsAllowInsecureConnection());
-
         try {
             log.info("Worker Configs: {}", new 
ObjectMapper().writerWithDefaultPrettyPrinter()
                     .writeValueAsString(workerConfig));
@@ -110,45 +93,59 @@ public class WorkerService {
             log.warn("Failed to print worker configs with error {}", 
e.getMessage(), e);
         }
 
-        // create the dlog namespace for storing function packages
-        this.dlogUri = dlogUri;
-        DistributedLogConfiguration dlogConf = 
WorkerUtils.getDlogConf(workerConfig);
         try {
-            this.dlogNamespace = NamespaceBuilder.newBuilder()
-                    .conf(dlogConf)
-                    .clientId("function-worker-" + workerConfig.getWorkerId())
-                    .uri(this.dlogUri)
-                    .build();
-        } catch (Exception e) {
-            log.error("Failed to initialize dlog namespace {} for storing 
function packages",
-                    dlogUri, e);
-            throw new RuntimeException(e);
-        }
+            // create the dlog namespace for storing function packages
+            this.dlogUri = dlogUri;
+            DistributedLogConfiguration dlogConf = 
WorkerUtils.getDlogConf(workerConfig);
+            try {
+                this.dlogNamespace = NamespaceBuilder.newBuilder()
+                        .conf(dlogConf)
+                        .clientId("function-worker-" + 
workerConfig.getWorkerId())
+                        .uri(this.dlogUri)
+                        .build();
+            } catch (Exception e) {
+                log.error("Failed to initialize dlog namespace {} for storing 
function packages",
+                        dlogUri, e);
+                throw new RuntimeException(e);
+            }
 
-        // create the state storage client for accessing function state
-        if (workerConfig.getStateStorageServiceUrl() != null) {
-            StorageClientSettings clientSettings = 
StorageClientSettings.newBuilder()
-                .serviceUri(workerConfig.getStateStorageServiceUrl())
-                .build();
-            this.stateStoreAdminClient = StorageClientBuilder.newBuilder()
-                .withSettings(clientSettings)
-                .buildAdmin();
-        }
+            // create the state storage client for accessing function state
+            if (workerConfig.getStateStorageServiceUrl() != null) {
+                StorageClientSettings clientSettings = 
StorageClientSettings.newBuilder()
+                        .serviceUri(workerConfig.getStateStorageServiceUrl())
+                        .build();
+                this.stateStoreAdminClient = StorageClientBuilder.newBuilder()
+                        .withSettings(clientSettings)
+                        .buildAdmin();
+            }
 
-        // initialize the function metadata manager
-        try {
+            final String functionWebServiceUrl = 
StringUtils.isNotBlank(workerConfig.getFunctionWebServiceUrl())
+                    ? workerConfig.getFunctionWebServiceUrl()
+                    : workerConfig.getWorkerWebAddress();
+
+            if (workerConfig.isAuthenticationEnabled()) {
+                this.brokerAdmin = 
WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
+                    workerConfig.getClientAuthenticationPlugin(), 
workerConfig.getClientAuthenticationParameters(),
+                    workerConfig.getTlsTrustCertsFilePath(), 
workerConfig.isTlsAllowInsecureConnection(),
+                    workerConfig.isTlsHostnameVerificationEnable());
+
+                this.functionAdmin = 
WorkerUtils.getPulsarAdminClient(functionWebServiceUrl,
+                    workerConfig.getClientAuthenticationPlugin(), 
workerConfig.getClientAuthenticationParameters(),
+                    workerConfig.getTlsTrustCertsFilePath(), 
workerConfig.isTlsAllowInsecureConnection(),
+                    workerConfig.isTlsHostnameVerificationEnable());
+
+                this.client = 
WorkerUtils.getPulsarClient(this.workerConfig.getPulsarServiceUrl(),
+                        workerConfig.getClientAuthenticationPlugin(),
+                        workerConfig.getClientAuthenticationParameters(),
+                        workerConfig.isUseTls(), 
workerConfig.getTlsTrustCertsFilePath(),
+                        workerConfig.isTlsAllowInsecureConnection(), 
workerConfig.isTlsHostnameVerificationEnable());
+            } else {
+                this.brokerAdmin = 
WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl());
+
+                this.functionAdmin = 
WorkerUtils.getPulsarAdminClient(functionWebServiceUrl);
 
-            ClientBuilder clientBuilder = 
PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
-            if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
-                    && 
isNotBlank(workerConfig.getClientAuthenticationParameters())) {
-                
clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
-                        workerConfig.getClientAuthenticationParameters());
+                this.client = 
WorkerUtils.getPulsarClient(this.workerConfig.getPulsarServiceUrl());
             }
-            clientBuilder.enableTls(workerConfig.isUseTls());
-            
clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection());
-            
clientBuilder.tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath());
-            
clientBuilder.enableTlsHostnameVerification(workerConfig.isTlsHostnameVerificationEnable());
-            this.client = clientBuilder.build();
             log.info("Created Pulsar client");
 
             //create scheduler manager
@@ -162,7 +159,7 @@ public class WorkerService {
             this.connectorsManager = new ConnectorsManager(workerConfig);
 
             //create membership manager
-            this.membershipManager = new MembershipManager(this, this.client);
+            this.membershipManager = new MembershipManager(this, this.client, 
this.brokerAdmin);
 
             // create function runtime manager
             this.functionRuntimeManager = new FunctionRuntimeManager(
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index cae82cd..afbbdd1 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -29,6 +29,8 @@ import org.apache.distributedlog.impl.metadata.BKDLConfig;
 import org.apache.distributedlog.metadata.DLMetadata;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.functions.proto.Function;
@@ -154,7 +156,13 @@ public final class WorkerUtils {
         return dlogUri;
     }
 
-    public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, 
String authPlugin, String authParams, String tlsTrustCertsFilePath, boolean 
allowTlsInsecureConnection) {
+    public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl) 
{
+        return getPulsarAdminClient(pulsarWebServiceUrl, null, null, null, 
null, null);
+    }
+
+    public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, 
String authPlugin, String authParams,
+                                                   String 
tlsTrustCertsFilePath, Boolean allowTlsInsecureConnection,
+                                                   Boolean 
enableTlsHostnameVerificationEnable) {
         try {
             PulsarAdminBuilder adminBuilder = 
PulsarAdmin.builder().serviceHttpUrl(pulsarWebServiceUrl);
             if (isNotBlank(authPlugin) && isNotBlank(authParams)) {
@@ -163,7 +171,12 @@ public final class WorkerUtils {
             if (isNotBlank(tlsTrustCertsFilePath)) {
                 adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
             }
-            
adminBuilder.allowTlsInsecureConnection(allowTlsInsecureConnection);
+            if (allowTlsInsecureConnection != null) {
+                
adminBuilder.allowTlsInsecureConnection(allowTlsInsecureConnection);
+            }
+            if (enableTlsHostnameVerificationEnable != null) {
+                
adminBuilder.enableTlsHostnameVerification(enableTlsHostnameVerificationEnable);
+            }
             return adminBuilder.build();
         } catch (PulsarClientException e) {
             log.error("Error creating pulsar admin client", e);
@@ -171,6 +184,43 @@ public final class WorkerUtils {
         }
     }
 
+    public static PulsarClient getPulsarClient(String pulsarServiceUrl) {
+        return getPulsarClient(pulsarServiceUrl, null, null, null,
+                null, null, null);
+    }
+
+    public static PulsarClient getPulsarClient(String pulsarServiceUrl, String 
authPlugin, String authParams,
+                                               Boolean useTls, String 
tlsTrustCertsFilePath,
+                                               Boolean 
allowTlsInsecureConnection,
+                                               Boolean 
enableTlsHostnameVerificationEnable) {
+
+        try {
+            ClientBuilder clientBuilder = 
PulsarClient.builder().serviceUrl(pulsarServiceUrl);
+
+            if (isNotBlank(authPlugin)
+                    && isNotBlank(authParams)) {
+                clientBuilder.authentication(authPlugin, authParams);
+            }
+            if (useTls != null) {
+                clientBuilder.enableTls(useTls);
+            }
+            if (allowTlsInsecureConnection != null) {
+                
clientBuilder.allowTlsInsecureConnection(allowTlsInsecureConnection);
+            }
+            if (isNotBlank(tlsTrustCertsFilePath)) {
+                clientBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
+            }
+            if (enableTlsHostnameVerificationEnable != null) {
+                
clientBuilder.enableTlsHostnameVerification(enableTlsHostnameVerificationEnable);
+            }
+
+            return clientBuilder.build();
+        } catch (PulsarClientException e) {
+            log.error("Error creating pulsar client", e);
+            throw new RuntimeException(e);
+        }
+    }
+
     public static FunctionStats.FunctionInstanceStats 
getFunctionInstanceStats(String fullyQualifiedInstanceName,
                                                                                
FunctionRuntimeInfo functionRuntimeInfo,
                                                                                
int instanceId) {
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 527557b..cfbfa47 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -68,6 +68,7 @@ public class MembershipManagerTest {
     @Test
     public void testConsumerEventListener() throws Exception {
         PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+        PulsarAdmin mockAdmin = mock(PulsarAdmin.class);
 
         ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
         ConsumerBuilder<byte[]> mockConsumerBuilder = 
mock(ConsumerBuilder.class);
@@ -92,7 +93,7 @@ public class MembershipManagerTest {
 
         when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
 
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockClient));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockClient, mockAdmin));
         assertFalse(membershipManager.isLeader());
         verify(mockClient, times(1))
             .newConsumer();
@@ -137,7 +138,8 @@ public class MembershipManagerTest {
         WorkerService workerService = mock(WorkerService.class);
         doReturn(pulsarClient).when(workerService).getClient();
         doReturn(workerConfig).when(workerService).getWorkerConfig();
-        
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+        PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
+        doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
 
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
         FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
@@ -147,7 +149,7 @@ public class MembershipManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 functionMetaDataManager));
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerService, pulsarClient));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerService, pulsarClient, pulsarAdmin));
 
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
@@ -208,7 +210,8 @@ public class MembershipManagerTest {
         WorkerService workerService = mock(WorkerService.class);
         doReturn(pulsarClient).when(workerService).getClient();
         doReturn(workerConfig).when(workerService).getWorkerConfig();
-        
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+        PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
+        doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
 
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
         FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
@@ -219,7 +222,7 @@ public class MembershipManagerTest {
                 mock(ConnectorsManager.class),
                 functionMetaDataManager));
 
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockPulsarClient()));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
 
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
@@ -304,7 +307,8 @@ public class MembershipManagerTest {
         WorkerService workerService = mock(WorkerService.class);
         doReturn(pulsarClient).when(workerService).getClient();
         doReturn(workerConfig).when(workerService).getWorkerConfig();
-        
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+        PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
+        doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
 
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
         FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
@@ -314,7 +318,7 @@ public class MembershipManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 functionMetaDataManager));
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockPulsarClient()));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
 
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
@@ -379,6 +383,7 @@ public class MembershipManagerTest {
         WorkerService workerService = mock(WorkerService.class);
         doReturn(pulsarClient).when(workerService).getClient();
         doReturn(workerConfig).when(workerService).getWorkerConfig();
+        PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
         
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
 
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
@@ -389,7 +394,7 @@ public class MembershipManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 functionMetaDataManager));
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockPulsarClient()));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
 
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));

Reply via email to