This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new b4a95c2  Can't create functions with m-TLS (#9553)
b4a95c2 is described below

commit b4a95c2f83e0a7620e65a16a312a40380995de76
Author: Yong Zhang <[email protected]>
AuthorDate: Sun Feb 21 20:37:00 2021 +0800

    Can't create functions with m-TLS (#9553)
    
    * Can't create functions with m-TLS
    ---
    
    *Motivation*
    
    We need the same fix on the branch 2.7.0. When cherry-picking the PR #9260
    to the branch 2.7, we have a few conflict to fix it in the branch 2.7. So
    we do the same fix on the branch 2.7.
---
 .github/workflows/ci-build-multi-os.yaml           |   2 +-
 .github/workflows/ci-go-functions-style.yaml       |   1 -
 .github/workflows/ci-go-functions-test.yaml        |   1 -
 .github/workflows/ci-license.yaml                  |   1 +
 .github/workflows/ci-unit-broker-broker-gp2.yaml   |   1 +
 .github/workflows/ci-unit-broker-client-api.yaml   |   1 +
 .github/workflows/ci-unit-broker-client-impl.yaml  |   1 +
 .github/workflows/ci-unit-broker-other.yaml        |   1 +
 .github/workflows/ci-unit-proxy.yaml               |   1 +
 .github/workflows/ci-unit.yaml                     |   1 +
 .../org/apache/pulsar/PulsarBrokerStarter.java     |   4 +
 .../java/org/apache/pulsar/PulsarStandalone.java   |   4 +
 .../org/apache/pulsar/broker/PulsarService.java    |   7 +
 .../functions/worker/PulsarFunctionTlsTest.java    | 306 +++++++++++++++++++++
 .../pulsar/client/admin/internal/BaseResource.java |   4 +
 .../apache/pulsar/common/functions/WorkerInfo.java |  12 +-
 .../pulsar/functions/worker/WorkerConfig.java      |   6 +-
 .../pulsar/functions/worker/LeaderService.java     |   2 +-
 .../pulsar/functions/worker/WorkerService.java     |   3 +-
 .../pulsar/functions/worker/WorkerUtils.java       |   5 +
 .../pulsar/functions/worker/rest/WorkerServer.java |   2 +-
 21 files changed, 356 insertions(+), 10 deletions(-)

diff --git a/.github/workflows/ci-build-multi-os.yaml 
b/.github/workflows/ci-build-multi-os.yaml
index ec6a67e..9a2e11b 100644
--- a/.github/workflows/ci-build-multi-os.yaml
+++ b/.github/workflows/ci-build-multi-os.yaml
@@ -68,4 +68,4 @@ jobs:
 
       - name: build package
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn clean install -DskipTests
\ No newline at end of file
+        run: mvn clean install -DskipTests
diff --git a/.github/workflows/ci-go-functions-style.yaml 
b/.github/workflows/ci-go-functions-style.yaml
index 450b098..05ed581 100644
--- a/.github/workflows/ci-go-functions-style.yaml
+++ b/.github/workflows/ci-go-functions-style.yaml
@@ -23,7 +23,6 @@ on:
     branches:
       - master
     paths:
-      - '.github/workflows/**'
       - 'pulsar-function-go/**'
   push:
     branches:
diff --git a/.github/workflows/ci-go-functions-test.yaml 
b/.github/workflows/ci-go-functions-test.yaml
index e14b4b3..41151b1 100644
--- a/.github/workflows/ci-go-functions-test.yaml
+++ b/.github/workflows/ci-go-functions-test.yaml
@@ -23,7 +23,6 @@ on:
     branches:
       - master
     paths:
-      - '.github/workflows/**'
       - 'pulsar-function-go/**'
   push:
     branches:
diff --git a/.github/workflows/ci-license.yaml 
b/.github/workflows/ci-license.yaml
index 2cfcaf2..298a2f5 100644
--- a/.github/workflows/ci-license.yaml
+++ b/.github/workflows/ci-license.yaml
@@ -22,6 +22,7 @@ on:
   pull_request:
     branches:
       - master
+      - branch-*
   push:
     branches:
       - branch-*
diff --git a/.github/workflows/ci-unit-broker-broker-gp2.yaml 
b/.github/workflows/ci-unit-broker-broker-gp2.yaml
index 00387d7..31bb6ff 100644
--- a/.github/workflows/ci-unit-broker-broker-gp2.yaml
+++ b/.github/workflows/ci-unit-broker-broker-gp2.yaml
@@ -22,6 +22,7 @@ on:
   pull_request:
     branches:
       - master
+      - branch-*
   push:
     branches:
       - branch-*
diff --git a/.github/workflows/ci-unit-broker-client-api.yaml 
b/.github/workflows/ci-unit-broker-client-api.yaml
index b228c53..d8abb00 100644
--- a/.github/workflows/ci-unit-broker-client-api.yaml
+++ b/.github/workflows/ci-unit-broker-client-api.yaml
@@ -22,6 +22,7 @@ on:
   pull_request:
     branches:
       - master
+      - branch-*
   push:
     branches:
       - branch-*
diff --git a/.github/workflows/ci-unit-broker-client-impl.yaml 
b/.github/workflows/ci-unit-broker-client-impl.yaml
index 4ffc83c..5563071 100644
--- a/.github/workflows/ci-unit-broker-client-impl.yaml
+++ b/.github/workflows/ci-unit-broker-client-impl.yaml
@@ -22,6 +22,7 @@ on:
   pull_request:
     branches:
       - master
+      - branch-*
   push:
     branches:
       - branch-*
diff --git a/.github/workflows/ci-unit-broker-other.yaml 
b/.github/workflows/ci-unit-broker-other.yaml
index abc22f4..b95e590 100644
--- a/.github/workflows/ci-unit-broker-other.yaml
+++ b/.github/workflows/ci-unit-broker-other.yaml
@@ -22,6 +22,7 @@ on:
   pull_request:
     branches:
       - master
+      - branch-*
   push:
     branches:
       - branch-*
diff --git a/.github/workflows/ci-unit-proxy.yaml 
b/.github/workflows/ci-unit-proxy.yaml
index 1fa940e..6f0b34b 100644
--- a/.github/workflows/ci-unit-proxy.yaml
+++ b/.github/workflows/ci-unit-proxy.yaml
@@ -22,6 +22,7 @@ on:
   pull_request:
     branches:
       - master
+      - branch-*
   push:
     branches:
       - branch-*
diff --git a/.github/workflows/ci-unit.yaml b/.github/workflows/ci-unit.yaml
index 7f96490..c7e2650 100644
--- a/.github/workflows/ci-unit.yaml
+++ b/.github/workflows/ci-unit.yaml
@@ -22,6 +22,7 @@ on:
   pull_request:
     branches:
       - master
+      - branch-*
   push:
     branches:
       - branch-*
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index c2f2b1c..96b2eac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -168,6 +168,10 @@ public class PulsarBrokerStarter {
                 } else {
                     workerConfig = 
WorkerConfig.load(starterArguments.fnWorkerConfigFile);
                 }
+                brokerConfig.getWebServicePort()
+                    .map(port -> workerConfig.setWorkerPort(port));
+                brokerConfig.getWebServicePortTls()
+                    .map(port -> workerConfig.setWorkerPortTls(port));
                 // worker talks to local broker
                 String hostname = 
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
                     brokerConfig.getAdvertisedAddress());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index 49877e3..d5ec2eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -277,6 +277,10 @@ public class PulsarStandalone implements AutoCloseable {
             } else if (workerConfig.getStateStorageServiceUrl() == null) {
                 workerConfig.setStateStorageServiceUrl("bk://127.0.0.1:" + 
this.getStreamStoragePort());
             }
+            config.getWebServicePort()
+                .map(port -> workerConfig.setWorkerPort(port));
+            config.getWebServicePortTls()
+                .map(port -> workerConfig.setWorkerPortTls(port));
 
             String hostname = 
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
                 config.getAdvertisedAddress());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index eb7f849..3004adb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1235,10 +1235,17 @@ public class PulsarService implements AutoCloseable {
             if (workerConfig.isUseTls()) {
                 workerConfig.setPulsarServiceUrl(brokerServiceUrlTls);
                 workerConfig.setPulsarWebServiceUrl(webServiceAddressTls);
+                workerConfig.setFunctionWebServiceUrl(webServiceAddressTls);
             } else {
                 workerConfig.setPulsarServiceUrl(brokerServiceUrl);
                 workerConfig.setPulsarWebServiceUrl(webServiceAddress);
+                workerConfig.setFunctionWebServiceUrl(webServiceAddress);
             }
+            LOG.info("Starting function worker service: serviceUrl = {},"
+                    + " webServiceUrl = {}, functionWebServiceUrl = {}",
+                workerConfig.getPulsarServiceUrl(),
+                workerConfig.getPulsarWebServiceUrl(),
+                workerConfig.getFunctionWebServiceUrl());
             String namespace = functionWorkerService.get()
                     .getWorkerConfig().getPulsarFunctionsNamespace();
             String[] a = 
functionWorkerService.get().getWorkerConfig().getPulsarFunctionsNamespace().split("/");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
new file mode 100644
index 0000000..0394196
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.junit.Assert.assertEquals;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.util.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class PulsarFunctionTlsTest {
+
+    protected static final int BROKER_COUNT = 2;
+
+    private static final String TLS_SERVER_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/broker-cert.pem";
+    private static final String TLS_SERVER_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/broker-key.pem";
+    private static final String TLS_CLIENT_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/client-cert.pem";
+    private static final String TLS_CLIENT_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/client-key.pem";
+
+    LocalBookkeeperEnsemble bkEnsemble;
+    protected PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
+    protected ServiceConfiguration[] configurations = new 
ServiceConfiguration[BROKER_COUNT];
+    protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
+    protected PulsarService leaderPulsar;
+    protected PulsarAdmin leaderAdmin;
+    protected String testCluster = "my-cluster";
+    protected String testTenant = "my-tenant";
+    protected String testNamespace = testTenant + "/my-ns";
+
+    @BeforeMethod
+    void setup() throws Exception {
+        log.info("---- Initializing TopicOwnerTest -----");
+        // Start local bookkeeper ensemble
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble.start();
+
+        // start brokers
+        for (int i = 0; i < BROKER_COUNT; i++) {
+            int brokerPort = PortManager.nextFreePort();
+            int webPort = PortManager.nextFreePort();
+
+            ServiceConfiguration config = new ServiceConfiguration();
+            config.setWebServicePort(Optional.empty());
+            config.setWebServicePortTls(Optional.of(webPort));
+            config.setBrokerServicePort(Optional.empty());
+            config.setBrokerServicePortTls(Optional.of(brokerPort));
+            config.setClusterName("my-cluster");
+            config.setAdvertisedAddress("localhost");
+            config.setZookeeperServers("127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+            config.setDefaultNumberOfNamespaceBundles(1);
+            config.setLoadBalancerEnabled(false);
+            Set<String> superUsers = Sets.newHashSet("superUser");
+            config.setSuperUserRoles(superUsers);
+            Set<String> providers = new HashSet<>();
+            providers.add(AuthenticationProviderTls.class.getName());
+            config.setAuthenticationEnabled(true);
+            config.setAuthorizationEnabled(true);
+            config.setAuthenticationProviders(providers);
+            config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+            config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+            config.setTlsAllowInsecureConnection(true);
+            config.setBrokerClientTlsEnabled(true);
+            
config.setBrokerClientTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
+            
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+            config.setBrokerClientAuthenticationParameters(
+                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + ",tlsKeyFile:" + 
TLS_CLIENT_KEY_FILE_PATH);
+            config.setFunctionsWorkerEnabled(true);
+            config.setTlsEnabled(true);
+
+            WorkerConfig workerConfig = new WorkerConfig();
+            ServiceConfiguration brokerConfig = config;
+            brokerConfig.getWebServicePort()
+                .map(port -> workerConfig.setWorkerPort(port));
+            brokerConfig.getWebServicePortTls()
+                .map(port -> workerConfig.setWorkerPortTls(port));
+
+            // worker talks to local broker
+            String hostname = 
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
+                brokerConfig.getAdvertisedAddress());
+            workerConfig.setWorkerHostname(hostname);
+            // inherit broker authorization setting
+            
workerConfig.setAuthenticationEnabled(brokerConfig.isAuthenticationEnabled());
+            
workerConfig.setAuthenticationProviders(brokerConfig.getAuthenticationProviders());
+            
workerConfig.setAuthorizationEnabled(brokerConfig.isAuthorizationEnabled());
+            
workerConfig.setAuthorizationProvider(brokerConfig.getAuthorizationProvider());
+            
workerConfig.setConfigurationStoreServers(brokerConfig.getConfigurationStoreServers());
+            
workerConfig.setZooKeeperSessionTimeoutMillis(brokerConfig.getZooKeeperSessionTimeoutMillis());
+            
workerConfig.setZooKeeperOperationTimeoutSeconds(brokerConfig.getZooKeeperOperationTimeoutSeconds());
+
+            
workerConfig.setTlsAllowInsecureConnection(brokerConfig.isTlsAllowInsecureConnection());
+            workerConfig.setTlsEnabled(brokerConfig.isTlsEnabled());
+            workerConfig.setTlsEnableHostnameVerification(false);
+            
workerConfig.setBrokerClientTrustCertsFilePath(brokerConfig.getTlsTrustCertsFilePath());
+
+            // client in worker will use this config to authenticate with 
broker
+            
workerConfig.setBrokerClientAuthenticationPlugin(brokerConfig.getBrokerClientAuthenticationPlugin());
+            
workerConfig.setBrokerClientAuthenticationParameters(brokerConfig.getBrokerClientAuthenticationParameters());
+
+            // inherit super users
+            workerConfig.setSuperUserRoles(brokerConfig.getSuperUserRoles());
+            workerConfig.setWorkerId(
+                "c-" + brokerConfig.getClusterName()
+                    + "-fw-" + hostname
+                    + "-" + (workerConfig.getTlsEnabled()
+                    ? workerConfig.getWorkerPortTls() : 
workerConfig.getWorkerPort()));
+
+            workerConfig.setPulsarFunctionsNamespace("public/functions");
+            workerConfig.setPulsarFunctionsCluster("my-cluster");
+            workerConfig.setSchedulerClassName(
+                
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
+            
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+            workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                    new 
ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
+            workerConfig.setFailureCheckFreqMs(100);
+            workerConfig.setNumFunctionPackageReplicas(1);
+            workerConfig.setClusterCoordinationTopicName("coordinate");
+            workerConfig.setFunctionAssignmentTopicName("assignment");
+            workerConfig.setFunctionMetadataTopicName("metadata");
+            workerConfig.setInstanceLivenessCheckFreqMs(100);
+            workerConfig.setBrokerClientAuthenticationEnabled(true);
+            workerConfig.setTlsEnabled(true);
+            workerConfig.setUseTls(true);
+            WorkerService fnWorkerService = new WorkerService(workerConfig);
+
+            configurations[i] = config;
+
+            pulsarServices[i] = new PulsarService(
+                config, Optional.of(fnWorkerService), code -> {});
+            pulsarServices[i].start();
+
+            // Sleep until pulsarServices[0] becomes leader, this way we can 
spy namespace bundle assignment easily.
+            if (i == 0) {
+                Awaitility.await()
+                    .pollInterval(Duration.ofSeconds(1))
+                    .until(() -> {
+                        log.info("Waiting to become to leader...");
+                        return 
pulsarServices[0].getLeaderElectionService().isLeader();
+                    });
+            }
+
+            Map<String, String> authParams = new HashMap<>();
+            authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+            authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+            Authentication authTls = new AuthenticationTls();
+            authTls.configure(authParams);
+
+            pulsarAdmins[i] = PulsarAdmin.builder()
+                .serviceHttpUrl(pulsarServices[i].getWebServiceAddressTls())
+                .tlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH)
+                .allowTlsInsecureConnection(true)
+                .authentication(authTls)
+                .build();
+        }
+        leaderPulsar = pulsarServices[0];
+        leaderAdmin = pulsarAdmins[0];
+
+        TenantInfo tenantInfo = new TenantInfo();
+        tenantInfo.setAllowedClusters(Sets.newHashSet(testCluster));
+        pulsarAdmins[0].tenants().createTenant(testTenant, tenantInfo);
+        pulsarAdmins[0].namespaces().createNamespace(testNamespace, 16);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    void tearDown() throws Exception {
+        for (int i = 0; i < BROKER_COUNT; i++) {
+            if (pulsarServices[i] != null) {
+                pulsarServices[i].close();
+            }
+            if (pulsarAdmins[i] != null) {
+                pulsarAdmins[i].close();
+            }
+        }
+        bkEnsemble.stop();
+    }
+
+    @Test
+    public void testFunctionsCreation() throws Exception {
+        log.info("Starting functions creation testing...");
+        TimeUnit.SECONDS.sleep(10);
+        String jarFilePathUrl = String.format("%s:%s", 
org.apache.pulsar.common.functions.Utils.FILE,
+                
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath());
+
+        for (int i = 0; i < BROKER_COUNT; i++) {
+            pulsarServices[i].getFunctionWorkerService().ifPresent(f -> {
+                Awaitility.await().pollInterval(Duration.ofSeconds(1))
+                    .until(() -> {
+                        log.info("Checking the function worker service is 
starting.");
+                        return 
f.getFunctionAdmin().functions().getFunctions(testTenant, "my-ns").size() == 0;
+                    });
+            });
+            String functionName = "function-" + i;
+            FunctionConfig functionConfig = 
createFunctionConfig(jarFilePathUrl, testTenant, "my-ns",
+                functionName, "my.*", "sink-topic-" + i, "sub-" + i);
+
+            log.info(" -------- Start test function : {}", functionName);
+
+            final PulsarAdmin admin = pulsarAdmins[i];
+            Awaitility.await().pollInterval(Duration.ofSeconds(1))
+                .until(() -> {
+                    try {
+                        admin.functions().createFunctionWithUrl(
+                            functionConfig, jarFilePathUrl
+                        );
+                        return true;
+                    } catch (Exception e) {
+                        return false;
+                    }
+                });
+
+            FunctionConfig config = 
pulsarAdmins[i].functions().getFunction(testTenant, "my-ns", functionName);
+            assertEquals(config.getTenant(), testTenant);
+            assertEquals(config.getNamespace(), "my-ns");
+            assertEquals(config.getName(), functionName);
+
+            pulsarAdmins[i].functions().deleteFunction(config.getTenant(), 
config.getNamespace(), config.getName());
+        }
+    }
+
+    protected static FunctionConfig createFunctionConfig(
+        String jarFile,
+        String tenant,
+        String namespace,
+        String functionName,
+        String sourceTopic,
+        String sinkTopic,
+        String subscriptionName
+    ) throws JsonProcessingException {
+        File file = new File(jarFile);
+        try {
+            ClassLoaderUtils.loadJar(file);
+        } catch (MalformedURLException e) {
+            throw new RuntimeException("Failed to load user jar " + file, e);
+        }
+        String sourceTopicPattern = String.format("persistent://%s/%s/%s", 
tenant, namespace, sourceTopic);
+
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(functionName);
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setParallelism(1);
+        functionConfig.setClassName(IdentityFunction.class.getName());
+        
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
+        functionConfig.setSubName(subscriptionName);
+        functionConfig.setTopicsPattern(sourceTopicPattern);
+        functionConfig.setAutoAck(true);
+        functionConfig.setOutput(sinkTopic);
+
+        log.info("Function Config: {}", new 
ObjectMapper().writerWithDefaultPrettyPrinter()
+            .writeValueAsString(functionConfig));
+
+        return functionConfig;
+    }
+
+}
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index 8f2517a..ac63aae 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -203,6 +203,10 @@ public abstract class BaseResource {
             // Handle 5xx exceptions
             if (e instanceof ServerErrorException) {
                 ServerErrorException see = (ServerErrorException) e;
+                if (((ServerErrorException) e).getResponse().hasEntity()) {
+                    return new ServerSideErrorException(see, e.getMessage()
+                        + ((ServerErrorException) 
e).getResponse().getEntity().toString());
+                }
                 return new ServerSideErrorException(see, e.getMessage());
             } else if (e instanceof ClientErrorException) {
                 // Handle 4xx exceptions
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WorkerInfo.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WorkerInfo.java
index 245cb7d..c0f5113 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WorkerInfo.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WorkerInfo.java
@@ -23,6 +23,7 @@ import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * Worker information.
@@ -31,6 +32,7 @@ import lombok.ToString;
 @AllArgsConstructor(access = AccessLevel.PRIVATE)
 @NoArgsConstructor
 @ToString
+@Slf4j
 public class WorkerInfo {
     private String workerId;
     private String workerHostname;
@@ -48,8 +50,12 @@ public class WorkerInfo {
 
         String workerId = tokens[0];
         String workerHostname = tokens[1];
-        int port = Integer.parseInt(tokens[2]);
-
-        return new WorkerInfo(workerId, workerHostname, port);
+        try {
+            int port = Integer.parseInt(tokens[2]);
+            return new WorkerInfo(workerId, workerHostname, port);
+        } catch (NumberFormatException nfe) {
+            log.warn("Invalid worker info : {}", str);
+            throw nfe;
+        }
     }
 }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 18f1a96..222ea9a 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -395,7 +395,7 @@ public class WorkerConfig implements Serializable, 
PulsarConfiguration {
     private Properties properties = new Properties();
 
     public boolean getTlsEnabled() {
-       return tlsEnabled || workerPortTls != null;
+       return tlsEnabled && workerPortTls != null;
     }
 
     /******** security settings for pulsar broker client **********/
@@ -532,6 +532,10 @@ public class WorkerConfig implements Serializable, 
PulsarConfiguration {
         return String.format("http://%s:%d";, this.getWorkerHostname(), 
this.getWorkerPort());
     }
 
+    public String getWorkerWebAddressTls() {
+        return String.format("https://%s:%d";, this.getWorkerHostname(), 
this.getWorkerPortTls());
+    }
+
     public static String unsafeLocalhostResolve() {
         try {
             // Get the fully qualified hostname
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
index 9d3d469..b0d7585 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
@@ -62,7 +62,7 @@ public class LeaderService implements AutoCloseable, 
ConsumerEventListener {
                 "%s:%s:%d",
                 workerConfig.getWorkerId(),
                 workerConfig.getWorkerHostname(),
-                workerConfig.getWorkerPort()
+                workerConfig.getTlsEnabled() ? workerConfig.getWorkerPortTls() 
: workerConfig.getWorkerPort()
         );
 
     }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 4255d89..4af8aad 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -131,7 +131,8 @@ public class WorkerService {
 
             final String functionWebServiceUrl = 
StringUtils.isNotBlank(workerConfig.getFunctionWebServiceUrl())
                     ? workerConfig.getFunctionWebServiceUrl()
-                    : workerConfig.getWorkerWebAddress();
+                    : (workerConfig.getTlsEnabled()
+                ? workerConfig.getWorkerWebAddressTls() : 
workerConfig.getWorkerWebAddress());
 
              // using isBrokerClientAuthenticationEnabled instead of 
isAuthenticationEnabled in function-worker
             if (workerConfig.isBrokerClientAuthenticationEnabled()) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index bbf101a..81659ad 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -190,6 +190,11 @@ public final class WorkerUtils {
     public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, 
String authPlugin, String authParams,
                                                    String 
tlsTrustCertsFilePath, Boolean allowTlsInsecureConnection,
                                                    Boolean 
enableTlsHostnameVerificationEnable) {
+        log.info("Create Pulsar Admin to service url {}: "
+                + "authPlugin = {}, authParams = {}, "
+                + "tlsTrustCerts = {}, allowTlsInsecureConnector = {}, 
enableTlsHostnameVerification = {}",
+            pulsarWebServiceUrl, authPlugin, authParams,
+            tlsTrustCertsFilePath, allowTlsInsecureConnection, 
enableTlsHostnameVerificationEnable);
         try {
             PulsarAdminBuilder adminBuilder = 
PulsarAdmin.builder().serviceHttpUrl(pulsarWebServiceUrl);
             if (isNotBlank(authPlugin) && isNotBlank(authParams)) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 51ddafc..19a5581 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -132,7 +132,7 @@ public class WorkerServer {
         handlers.add(stats);
         server.setHandler(stats);
 
-        if (this.workerConfig.getWorkerPortTls() != null) {
+        if (this.workerConfig.getTlsEnabled()) {
             try {
                 SslContextFactory sslCtxFactory = 
SecurityUtility.createSslContextFactory(
                         this.workerConfig.isTlsAllowInsecureConnection(), 
this.workerConfig.getTlsTrustCertsFilePath(),

Reply via email to