This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new b4a95c2 Can't create functions with m-TLS (#9553)
b4a95c2 is described below
commit b4a95c2f83e0a7620e65a16a312a40380995de76
Author: Yong Zhang <[email protected]>
AuthorDate: Sun Feb 21 20:37:00 2021 +0800
Can't create functions with m-TLS (#9553)
* Can't create functions with m-TLS
---
*Motivation*
We need the same fix on the branch 2.7.0. Cherry-picking PR #9260 onto
branch 2.7 produced a few conflicts that had to be resolved, so this commit
applies the same fix directly on branch 2.7.
---
.github/workflows/ci-build-multi-os.yaml | 2 +-
.github/workflows/ci-go-functions-style.yaml | 1 -
.github/workflows/ci-go-functions-test.yaml | 1 -
.github/workflows/ci-license.yaml | 1 +
.github/workflows/ci-unit-broker-broker-gp2.yaml | 1 +
.github/workflows/ci-unit-broker-client-api.yaml | 1 +
.github/workflows/ci-unit-broker-client-impl.yaml | 1 +
.github/workflows/ci-unit-broker-other.yaml | 1 +
.github/workflows/ci-unit-proxy.yaml | 1 +
.github/workflows/ci-unit.yaml | 1 +
.../org/apache/pulsar/PulsarBrokerStarter.java | 4 +
.../java/org/apache/pulsar/PulsarStandalone.java | 4 +
.../org/apache/pulsar/broker/PulsarService.java | 7 +
.../functions/worker/PulsarFunctionTlsTest.java | 306 +++++++++++++++++++++
.../pulsar/client/admin/internal/BaseResource.java | 4 +
.../apache/pulsar/common/functions/WorkerInfo.java | 12 +-
.../pulsar/functions/worker/WorkerConfig.java | 6 +-
.../pulsar/functions/worker/LeaderService.java | 2 +-
.../pulsar/functions/worker/WorkerService.java | 3 +-
.../pulsar/functions/worker/WorkerUtils.java | 5 +
.../pulsar/functions/worker/rest/WorkerServer.java | 2 +-
21 files changed, 356 insertions(+), 10 deletions(-)
diff --git a/.github/workflows/ci-build-multi-os.yaml
b/.github/workflows/ci-build-multi-os.yaml
index ec6a67e..9a2e11b 100644
--- a/.github/workflows/ci-build-multi-os.yaml
+++ b/.github/workflows/ci-build-multi-os.yaml
@@ -68,4 +68,4 @@ jobs:
- name: build package
if: steps.docs.outputs.changed_only == 'no'
- run: mvn clean install -DskipTests
\ No newline at end of file
+ run: mvn clean install -DskipTests
diff --git a/.github/workflows/ci-go-functions-style.yaml
b/.github/workflows/ci-go-functions-style.yaml
index 450b098..05ed581 100644
--- a/.github/workflows/ci-go-functions-style.yaml
+++ b/.github/workflows/ci-go-functions-style.yaml
@@ -23,7 +23,6 @@ on:
branches:
- master
paths:
- - '.github/workflows/**'
- 'pulsar-function-go/**'
push:
branches:
diff --git a/.github/workflows/ci-go-functions-test.yaml
b/.github/workflows/ci-go-functions-test.yaml
index e14b4b3..41151b1 100644
--- a/.github/workflows/ci-go-functions-test.yaml
+++ b/.github/workflows/ci-go-functions-test.yaml
@@ -23,7 +23,6 @@ on:
branches:
- master
paths:
- - '.github/workflows/**'
- 'pulsar-function-go/**'
push:
branches:
diff --git a/.github/workflows/ci-license.yaml
b/.github/workflows/ci-license.yaml
index 2cfcaf2..298a2f5 100644
--- a/.github/workflows/ci-license.yaml
+++ b/.github/workflows/ci-license.yaml
@@ -22,6 +22,7 @@ on:
pull_request:
branches:
- master
+ - branch-*
push:
branches:
- branch-*
diff --git a/.github/workflows/ci-unit-broker-broker-gp2.yaml
b/.github/workflows/ci-unit-broker-broker-gp2.yaml
index 00387d7..31bb6ff 100644
--- a/.github/workflows/ci-unit-broker-broker-gp2.yaml
+++ b/.github/workflows/ci-unit-broker-broker-gp2.yaml
@@ -22,6 +22,7 @@ on:
pull_request:
branches:
- master
+ - branch-*
push:
branches:
- branch-*
diff --git a/.github/workflows/ci-unit-broker-client-api.yaml
b/.github/workflows/ci-unit-broker-client-api.yaml
index b228c53..d8abb00 100644
--- a/.github/workflows/ci-unit-broker-client-api.yaml
+++ b/.github/workflows/ci-unit-broker-client-api.yaml
@@ -22,6 +22,7 @@ on:
pull_request:
branches:
- master
+ - branch-*
push:
branches:
- branch-*
diff --git a/.github/workflows/ci-unit-broker-client-impl.yaml
b/.github/workflows/ci-unit-broker-client-impl.yaml
index 4ffc83c..5563071 100644
--- a/.github/workflows/ci-unit-broker-client-impl.yaml
+++ b/.github/workflows/ci-unit-broker-client-impl.yaml
@@ -22,6 +22,7 @@ on:
pull_request:
branches:
- master
+ - branch-*
push:
branches:
- branch-*
diff --git a/.github/workflows/ci-unit-broker-other.yaml
b/.github/workflows/ci-unit-broker-other.yaml
index abc22f4..b95e590 100644
--- a/.github/workflows/ci-unit-broker-other.yaml
+++ b/.github/workflows/ci-unit-broker-other.yaml
@@ -22,6 +22,7 @@ on:
pull_request:
branches:
- master
+ - branch-*
push:
branches:
- branch-*
diff --git a/.github/workflows/ci-unit-proxy.yaml
b/.github/workflows/ci-unit-proxy.yaml
index 1fa940e..6f0b34b 100644
--- a/.github/workflows/ci-unit-proxy.yaml
+++ b/.github/workflows/ci-unit-proxy.yaml
@@ -22,6 +22,7 @@ on:
pull_request:
branches:
- master
+ - branch-*
push:
branches:
- branch-*
diff --git a/.github/workflows/ci-unit.yaml b/.github/workflows/ci-unit.yaml
index 7f96490..c7e2650 100644
--- a/.github/workflows/ci-unit.yaml
+++ b/.github/workflows/ci-unit.yaml
@@ -22,6 +22,7 @@ on:
pull_request:
branches:
- master
+ - branch-*
push:
branches:
- branch-*
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index c2f2b1c..96b2eac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -168,6 +168,10 @@ public class PulsarBrokerStarter {
} else {
workerConfig =
WorkerConfig.load(starterArguments.fnWorkerConfigFile);
}
+ brokerConfig.getWebServicePort()
+ .map(port -> workerConfig.setWorkerPort(port));
+ brokerConfig.getWebServicePortTls()
+ .map(port -> workerConfig.setWorkerPortTls(port));
// worker talks to local broker
String hostname =
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
brokerConfig.getAdvertisedAddress());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index 49877e3..d5ec2eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -277,6 +277,10 @@ public class PulsarStandalone implements AutoCloseable {
} else if (workerConfig.getStateStorageServiceUrl() == null) {
workerConfig.setStateStorageServiceUrl("bk://127.0.0.1:" +
this.getStreamStoragePort());
}
+ config.getWebServicePort()
+ .map(port -> workerConfig.setWorkerPort(port));
+ config.getWebServicePortTls()
+ .map(port -> workerConfig.setWorkerPortTls(port));
String hostname =
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
config.getAdvertisedAddress());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index eb7f849..3004adb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1235,10 +1235,17 @@ public class PulsarService implements AutoCloseable {
if (workerConfig.isUseTls()) {
workerConfig.setPulsarServiceUrl(brokerServiceUrlTls);
workerConfig.setPulsarWebServiceUrl(webServiceAddressTls);
+ workerConfig.setFunctionWebServiceUrl(webServiceAddressTls);
} else {
workerConfig.setPulsarServiceUrl(brokerServiceUrl);
workerConfig.setPulsarWebServiceUrl(webServiceAddress);
+ workerConfig.setFunctionWebServiceUrl(webServiceAddress);
}
+ LOG.info("Starting function worker service: serviceUrl = {},"
+ + " webServiceUrl = {}, functionWebServiceUrl = {}",
+ workerConfig.getPulsarServiceUrl(),
+ workerConfig.getPulsarWebServiceUrl(),
+ workerConfig.getFunctionWebServiceUrl());
String namespace = functionWorkerService.get()
.getWorkerConfig().getPulsarFunctionsNamespace();
String[] a =
functionWorkerService.get().getWorkerConfig().getPulsarFunctionsNamespace().split("/");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
new file mode 100644
index 0000000..0394196
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.junit.Assert.assertEquals;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.util.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class PulsarFunctionTlsTest {
+
+ protected static final int BROKER_COUNT = 2;
+
+ private static final String TLS_SERVER_CERT_FILE_PATH =
"./src/test/resources/authentication/tls/broker-cert.pem";
+ private static final String TLS_SERVER_KEY_FILE_PATH =
"./src/test/resources/authentication/tls/broker-key.pem";
+ private static final String TLS_CLIENT_CERT_FILE_PATH =
"./src/test/resources/authentication/tls/client-cert.pem";
+ private static final String TLS_CLIENT_KEY_FILE_PATH =
"./src/test/resources/authentication/tls/client-key.pem";
+
+ LocalBookkeeperEnsemble bkEnsemble;
+ protected PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
+ protected ServiceConfiguration[] configurations = new
ServiceConfiguration[BROKER_COUNT];
+ protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
+ protected PulsarService leaderPulsar;
+ protected PulsarAdmin leaderAdmin;
+ protected String testCluster = "my-cluster";
+ protected String testTenant = "my-tenant";
+ protected String testNamespace = testTenant + "/my-ns";
+
+ @BeforeMethod
+ void setup() throws Exception {
+ log.info("---- Initializing TopicOwnerTest -----");
+ // Start local bookkeeper ensemble
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble.start();
+
+ // start brokers
+ for (int i = 0; i < BROKER_COUNT; i++) {
+ int brokerPort = PortManager.nextFreePort();
+ int webPort = PortManager.nextFreePort();
+
+ ServiceConfiguration config = new ServiceConfiguration();
+ config.setWebServicePort(Optional.empty());
+ config.setWebServicePortTls(Optional.of(webPort));
+ config.setBrokerServicePort(Optional.empty());
+ config.setBrokerServicePortTls(Optional.of(brokerPort));
+ config.setClusterName("my-cluster");
+ config.setAdvertisedAddress("localhost");
+ config.setZookeeperServers("127.0.0.1" + ":" +
bkEnsemble.getZookeeperPort());
+ config.setDefaultNumberOfNamespaceBundles(1);
+ config.setLoadBalancerEnabled(false);
+ Set<String> superUsers = Sets.newHashSet("superUser");
+ config.setSuperUserRoles(superUsers);
+ Set<String> providers = new HashSet<>();
+ providers.add(AuthenticationProviderTls.class.getName());
+ config.setAuthenticationEnabled(true);
+ config.setAuthorizationEnabled(true);
+ config.setAuthenticationProviders(providers);
+ config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+ config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+ config.setTlsAllowInsecureConnection(true);
+ config.setBrokerClientTlsEnabled(true);
+
config.setBrokerClientTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
+
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+ config.setBrokerClientAuthenticationParameters(
+ "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + ",tlsKeyFile:" +
TLS_CLIENT_KEY_FILE_PATH);
+ config.setFunctionsWorkerEnabled(true);
+ config.setTlsEnabled(true);
+
+ WorkerConfig workerConfig = new WorkerConfig();
+ ServiceConfiguration brokerConfig = config;
+ brokerConfig.getWebServicePort()
+ .map(port -> workerConfig.setWorkerPort(port));
+ brokerConfig.getWebServicePortTls()
+ .map(port -> workerConfig.setWorkerPortTls(port));
+
+ // worker talks to local broker
+ String hostname =
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
+ brokerConfig.getAdvertisedAddress());
+ workerConfig.setWorkerHostname(hostname);
+ // inherit broker authorization setting
+
workerConfig.setAuthenticationEnabled(brokerConfig.isAuthenticationEnabled());
+
workerConfig.setAuthenticationProviders(brokerConfig.getAuthenticationProviders());
+
workerConfig.setAuthorizationEnabled(brokerConfig.isAuthorizationEnabled());
+
workerConfig.setAuthorizationProvider(brokerConfig.getAuthorizationProvider());
+
workerConfig.setConfigurationStoreServers(brokerConfig.getConfigurationStoreServers());
+
workerConfig.setZooKeeperSessionTimeoutMillis(brokerConfig.getZooKeeperSessionTimeoutMillis());
+
workerConfig.setZooKeeperOperationTimeoutSeconds(brokerConfig.getZooKeeperOperationTimeoutSeconds());
+
+
workerConfig.setTlsAllowInsecureConnection(brokerConfig.isTlsAllowInsecureConnection());
+ workerConfig.setTlsEnabled(brokerConfig.isTlsEnabled());
+ workerConfig.setTlsEnableHostnameVerification(false);
+
workerConfig.setBrokerClientTrustCertsFilePath(brokerConfig.getTlsTrustCertsFilePath());
+
+ // client in worker will use this config to authenticate with
broker
+
workerConfig.setBrokerClientAuthenticationPlugin(brokerConfig.getBrokerClientAuthenticationPlugin());
+
workerConfig.setBrokerClientAuthenticationParameters(brokerConfig.getBrokerClientAuthenticationParameters());
+
+ // inherit super users
+ workerConfig.setSuperUserRoles(brokerConfig.getSuperUserRoles());
+ workerConfig.setWorkerId(
+ "c-" + brokerConfig.getClusterName()
+ + "-fw-" + hostname
+ + "-" + (workerConfig.getTlsEnabled()
+ ? workerConfig.getWorkerPortTls() :
workerConfig.getWorkerPort()));
+
+ workerConfig.setPulsarFunctionsNamespace("public/functions");
+ workerConfig.setPulsarFunctionsCluster("my-cluster");
+ workerConfig.setSchedulerClassName(
+
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
+
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(
+ new
ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
+ workerConfig.setFailureCheckFreqMs(100);
+ workerConfig.setNumFunctionPackageReplicas(1);
+ workerConfig.setClusterCoordinationTopicName("coordinate");
+ workerConfig.setFunctionAssignmentTopicName("assignment");
+ workerConfig.setFunctionMetadataTopicName("metadata");
+ workerConfig.setInstanceLivenessCheckFreqMs(100);
+ workerConfig.setBrokerClientAuthenticationEnabled(true);
+ workerConfig.setTlsEnabled(true);
+ workerConfig.setUseTls(true);
+ WorkerService fnWorkerService = new WorkerService(workerConfig);
+
+ configurations[i] = config;
+
+ pulsarServices[i] = new PulsarService(
+ config, Optional.of(fnWorkerService), code -> {});
+ pulsarServices[i].start();
+
+ // Sleep until pulsarServices[0] becomes leader, this way we can
spy namespace bundle assignment easily.
+ if (i == 0) {
+ Awaitility.await()
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> {
+ log.info("Waiting to become to leader...");
+ return
pulsarServices[0].getLeaderElectionService().isLeader();
+ });
+ }
+
+ Map<String, String> authParams = new HashMap<>();
+ authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+ authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+ Authentication authTls = new AuthenticationTls();
+ authTls.configure(authParams);
+
+ pulsarAdmins[i] = PulsarAdmin.builder()
+ .serviceHttpUrl(pulsarServices[i].getWebServiceAddressTls())
+ .tlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH)
+ .allowTlsInsecureConnection(true)
+ .authentication(authTls)
+ .build();
+ }
+ leaderPulsar = pulsarServices[0];
+ leaderAdmin = pulsarAdmins[0];
+
+ TenantInfo tenantInfo = new TenantInfo();
+ tenantInfo.setAllowedClusters(Sets.newHashSet(testCluster));
+ pulsarAdmins[0].tenants().createTenant(testTenant, tenantInfo);
+ pulsarAdmins[0].namespaces().createNamespace(testNamespace, 16);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ void tearDown() throws Exception {
+ for (int i = 0; i < BROKER_COUNT; i++) {
+ if (pulsarServices[i] != null) {
+ pulsarServices[i].close();
+ }
+ if (pulsarAdmins[i] != null) {
+ pulsarAdmins[i].close();
+ }
+ }
+ bkEnsemble.stop();
+ }
+
+ @Test
+ public void testFunctionsCreation() throws Exception {
+ log.info("Starting functions creation testing...");
+ TimeUnit.SECONDS.sleep(10);
+ String jarFilePathUrl = String.format("%s:%s",
org.apache.pulsar.common.functions.Utils.FILE,
+
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath());
+
+ for (int i = 0; i < BROKER_COUNT; i++) {
+ pulsarServices[i].getFunctionWorkerService().ifPresent(f -> {
+ Awaitility.await().pollInterval(Duration.ofSeconds(1))
+ .until(() -> {
+ log.info("Checking the function worker service is
starting.");
+ return
f.getFunctionAdmin().functions().getFunctions(testTenant, "my-ns").size() == 0;
+ });
+ });
+ String functionName = "function-" + i;
+ FunctionConfig functionConfig =
createFunctionConfig(jarFilePathUrl, testTenant, "my-ns",
+ functionName, "my.*", "sink-topic-" + i, "sub-" + i);
+
+ log.info(" -------- Start test function : {}", functionName);
+
+ final PulsarAdmin admin = pulsarAdmins[i];
+ Awaitility.await().pollInterval(Duration.ofSeconds(1))
+ .until(() -> {
+ try {
+ admin.functions().createFunctionWithUrl(
+ functionConfig, jarFilePathUrl
+ );
+ return true;
+ } catch (Exception e) {
+ return false;
+ }
+ });
+
+ FunctionConfig config =
pulsarAdmins[i].functions().getFunction(testTenant, "my-ns", functionName);
+ assertEquals(config.getTenant(), testTenant);
+ assertEquals(config.getNamespace(), "my-ns");
+ assertEquals(config.getName(), functionName);
+
+ pulsarAdmins[i].functions().deleteFunction(config.getTenant(),
config.getNamespace(), config.getName());
+ }
+ }
+
+ protected static FunctionConfig createFunctionConfig(
+ String jarFile,
+ String tenant,
+ String namespace,
+ String functionName,
+ String sourceTopic,
+ String sinkTopic,
+ String subscriptionName
+ ) throws JsonProcessingException {
+ File file = new File(jarFile);
+ try {
+ ClassLoaderUtils.loadJar(file);
+ } catch (MalformedURLException e) {
+ throw new RuntimeException("Failed to load user jar " + file, e);
+ }
+ String sourceTopicPattern = String.format("persistent://%s/%s/%s",
tenant, namespace, sourceTopic);
+
+ FunctionConfig functionConfig = new FunctionConfig();
+ functionConfig.setTenant(tenant);
+ functionConfig.setNamespace(namespace);
+ functionConfig.setName(functionName);
+ functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+ functionConfig.setParallelism(1);
+ functionConfig.setClassName(IdentityFunction.class.getName());
+
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
+ functionConfig.setSubName(subscriptionName);
+ functionConfig.setTopicsPattern(sourceTopicPattern);
+ functionConfig.setAutoAck(true);
+ functionConfig.setOutput(sinkTopic);
+
+ log.info("Function Config: {}", new
ObjectMapper().writerWithDefaultPrettyPrinter()
+ .writeValueAsString(functionConfig));
+
+ return functionConfig;
+ }
+
+}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index 8f2517a..ac63aae 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -203,6 +203,10 @@ public abstract class BaseResource {
// Handle 5xx exceptions
if (e instanceof ServerErrorException) {
ServerErrorException see = (ServerErrorException) e;
+ if (((ServerErrorException) e).getResponse().hasEntity()) {
+ return new ServerSideErrorException(see, e.getMessage()
+ + ((ServerErrorException)
e).getResponse().getEntity().toString());
+ }
return new ServerSideErrorException(see, e.getMessage());
} else if (e instanceof ClientErrorException) {
// Handle 4xx exceptions
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WorkerInfo.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WorkerInfo.java
index 245cb7d..c0f5113 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WorkerInfo.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/WorkerInfo.java
@@ -23,6 +23,7 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
/**
* Worker information.
@@ -31,6 +32,7 @@ import lombok.ToString;
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@NoArgsConstructor
@ToString
+@Slf4j
public class WorkerInfo {
private String workerId;
private String workerHostname;
@@ -48,8 +50,12 @@ public class WorkerInfo {
String workerId = tokens[0];
String workerHostname = tokens[1];
- int port = Integer.parseInt(tokens[2]);
-
- return new WorkerInfo(workerId, workerHostname, port);
+ try {
+ int port = Integer.parseInt(tokens[2]);
+ return new WorkerInfo(workerId, workerHostname, port);
+ } catch (NumberFormatException nfe) {
+ log.warn("Invalid worker info : {}", str);
+ throw nfe;
+ }
}
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 18f1a96..222ea9a 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -395,7 +395,7 @@ public class WorkerConfig implements Serializable,
PulsarConfiguration {
private Properties properties = new Properties();
public boolean getTlsEnabled() {
- return tlsEnabled || workerPortTls != null;
+ return tlsEnabled && workerPortTls != null;
}
/******** security settings for pulsar broker client **********/
@@ -532,6 +532,10 @@ public class WorkerConfig implements Serializable,
PulsarConfiguration {
return String.format("http://%s:%d", this.getWorkerHostname(),
this.getWorkerPort());
}
+ public String getWorkerWebAddressTls() {
+ return String.format("https://%s:%d", this.getWorkerHostname(),
this.getWorkerPortTls());
+ }
+
public static String unsafeLocalhostResolve() {
try {
// Get the fully qualified hostname
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
index 9d3d469..b0d7585 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
@@ -62,7 +62,7 @@ public class LeaderService implements AutoCloseable,
ConsumerEventListener {
"%s:%s:%d",
workerConfig.getWorkerId(),
workerConfig.getWorkerHostname(),
- workerConfig.getWorkerPort()
+ workerConfig.getTlsEnabled() ? workerConfig.getWorkerPortTls()
: workerConfig.getWorkerPort()
);
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 4255d89..4af8aad 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -131,7 +131,8 @@ public class WorkerService {
final String functionWebServiceUrl =
StringUtils.isNotBlank(workerConfig.getFunctionWebServiceUrl())
? workerConfig.getFunctionWebServiceUrl()
- : workerConfig.getWorkerWebAddress();
+ : (workerConfig.getTlsEnabled()
+ ? workerConfig.getWorkerWebAddressTls() :
workerConfig.getWorkerWebAddress());
// using isBrokerClientAuthenticationEnabled instead of
isAuthenticationEnabled in function-worker
if (workerConfig.isBrokerClientAuthenticationEnabled()) {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index bbf101a..81659ad 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -190,6 +190,11 @@ public final class WorkerUtils {
public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl,
String authPlugin, String authParams,
String
tlsTrustCertsFilePath, Boolean allowTlsInsecureConnection,
Boolean
enableTlsHostnameVerificationEnable) {
+ log.info("Create Pulsar Admin to service url {}: "
+ + "authPlugin = {}, authParams = {}, "
+ + "tlsTrustCerts = {}, allowTlsInsecureConnector = {},
enableTlsHostnameVerification = {}",
+ pulsarWebServiceUrl, authPlugin, authParams,
+ tlsTrustCertsFilePath, allowTlsInsecureConnection,
enableTlsHostnameVerificationEnable);
try {
PulsarAdminBuilder adminBuilder =
PulsarAdmin.builder().serviceHttpUrl(pulsarWebServiceUrl);
if (isNotBlank(authPlugin) && isNotBlank(authParams)) {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 51ddafc..19a5581 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -132,7 +132,7 @@ public class WorkerServer {
handlers.add(stats);
server.setHandler(stats);
- if (this.workerConfig.getWorkerPortTls() != null) {
+ if (this.workerConfig.getTlsEnabled()) {
try {
SslContextFactory sslCtxFactory =
SecurityUtility.createSslContextFactory(
this.workerConfig.isTlsAllowInsecureConnection(),
this.workerConfig.getTlsTrustCertsFilePath(),