This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 14a5ee0 Don't pass any auth parameters when authentication is
disabled (#3999)
14a5ee0 is described below
commit 14a5ee0e764cb7d76dfce8ab7ab81e827e38a9a5
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Mon Apr 8 15:49:28 2019 -0700
Don't pass any auth parameters when authentication is disabled (#3999)
* Don't pass any auth parameters when authentication is disabled
* fix bug
* add defensive programming
---
.../auth/KubernetesSecretsTokenAuthProvider.java | 10 ++
.../functions/worker/FunctionRuntimeManager.java | 18 ++--
.../pulsar/functions/worker/MembershipManager.java | 23 +----
.../org/apache/pulsar/functions/worker/Worker.java | 3 +-
.../pulsar/functions/worker/WorkerConfig.java | 5 +-
.../pulsar/functions/worker/WorkerService.java | 109 ++++++++++-----------
.../pulsar/functions/worker/WorkerUtils.java | 54 +++++++++-
.../functions/worker/MembershipManagerTest.java | 21 ++--
8 files changed, 151 insertions(+), 92 deletions(-)
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
index e1cd3e8..606cd86 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static org.apache.commons.lang3.StringUtils.isBlank;
import static
org.apache.pulsar.broker.authentication.AuthenticationProviderToken.getToken;
@Slf4j
@@ -115,6 +116,12 @@ public class KubernetesSecretsTokenAuthProvider implements
KubernetesFunctionAut
String fqfn = FunctionCommon.getFullyQualifiedName(tenant, namespace,
name);
String secretName = new String(functionAuthData.getData());
+ // Make sure secretName is empty. Defensive programing
+ if (isBlank(secretName)) {
+ log.warn("Secret name for function {} is empty.", fqfn);
+ return;
+ }
+
Actions.Action deleteSecrets = Actions.Action.builder()
.actionName(String.format("Deleting secrets for function %s",
fqfn))
.numRetries(NUM_RETRIES)
@@ -125,6 +132,9 @@ public class KubernetesSecretsTokenAuthProvider implements
KubernetesFunctionAut
v1DeleteOptions.setGracePeriodSeconds(0L);
v1DeleteOptions.setPropagationPolicy("Foreground");
+ // make sure secretName is not null or empty string.
+ // If deleteNamespacedSecret is called and secret name
is null or empty string
+ // it will delete all the secrets in the namespace
coreClient.deleteNamespacedSecret(secretName,
kubeNamespace, v1DeleteOptions, "true",
null, null, null);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index e4acc70..d55898f 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -120,12 +120,18 @@ public class FunctionRuntimeManager implements
AutoCloseable{
}
secretsProviderConfigurator.init(workerConfig.getSecretsProviderConfiguratorConfig());
- AuthenticationConfig authConfig = AuthenticationConfig.builder()
-
.clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin())
-
.clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters())
- .tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath())
-
.useTls(workerConfig.isUseTls()).tlsAllowInsecureConnection(workerConfig.isTlsAllowInsecureConnection())
-
.tlsHostnameVerificationEnable(workerConfig.isTlsHostnameVerificationEnable()).build();
+
+ AuthenticationConfig authConfig = null;
+ if (workerConfig.isAuthenticationEnabled()) {
+ authConfig = AuthenticationConfig.builder()
+
.clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin())
+
.clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters())
+
.tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath())
+ .useTls(workerConfig.isUseTls())
+
.tlsAllowInsecureConnection(workerConfig.isTlsAllowInsecureConnection())
+
.tlsHostnameVerificationEnable(workerConfig.isTlsHostnameVerificationEnable())
+ .build();
+ }
if (workerConfig.getThreadContainerFactory() != null) {
this.runtimeFactory = new ThreadRuntimeFactory(
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index f78513a..50a2f97 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -59,7 +59,7 @@ public class MembershipManager implements AutoCloseable,
ConsumerEventListener {
private final String consumerName;
private final ConsumerImpl<byte[]> consumer;
private final WorkerConfig workerConfig;
- private PulsarAdmin pulsarAdminClient;
+ private PulsarAdmin pulsarAdmin;
private final CompletableFuture<Void> firstConsumerEventFuture;
private final AtomicBoolean isLeader = new AtomicBoolean();
@@ -72,9 +72,10 @@ public class MembershipManager implements AutoCloseable,
ConsumerEventListener {
@VisibleForTesting
Map<Function.Instance, Long> unsignedFunctionDurations = new HashMap<>();
- MembershipManager(WorkerService service, PulsarClient client)
+ MembershipManager(WorkerService service, PulsarClient client, PulsarAdmin
pulsarAdmin)
throws PulsarClientException {
this.workerConfig = service.getWorkerConfig();
+ this.pulsarAdmin = pulsarAdmin;
consumerName = String.format(
"%s:%s:%d",
workerConfig.getWorkerId(),
@@ -121,9 +122,8 @@ public class MembershipManager implements AutoCloseable,
ConsumerEventListener {
List<WorkerInfo> workerIds = new LinkedList<>();
TopicStats topicStats = null;
- PulsarAdmin pulsarAdmin = this.getPulsarAdminClient();
try {
- topicStats =
pulsarAdmin.topics().getStats(this.workerConfig.getClusterCoordinationTopic());
+ topicStats =
this.pulsarAdmin.topics().getStats(this.workerConfig.getClusterCoordinationTopic());
} catch (PulsarAdminException e) {
log.error("Failed to get status of coordinate topic {}",
this.workerConfig.getClusterCoordinationTopic(), e);
@@ -140,9 +140,8 @@ public class MembershipManager implements AutoCloseable,
ConsumerEventListener {
public WorkerInfo getLeader() {
TopicStats topicStats = null;
- PulsarAdmin pulsarAdmin = this.getPulsarAdminClient();
try {
- topicStats =
pulsarAdmin.topics().getStats(this.workerConfig.getClusterCoordinationTopic());
+ topicStats =
this.pulsarAdmin.topics().getStats(this.workerConfig.getClusterCoordinationTopic());
} catch (PulsarAdminException e) {
log.error("Failed to get status of coordinate topic {}",
this.workerConfig.getClusterCoordinationTopic(), e);
@@ -166,9 +165,6 @@ public class MembershipManager implements AutoCloseable,
ConsumerEventListener {
@Override
public void close() throws PulsarClientException {
consumer.close();
- if (this.pulsarAdminClient != null) {
- this.pulsarAdminClient.close();
- }
}
public void checkFailures(FunctionMetaDataManager functionMetaDataManager,
@@ -283,15 +279,6 @@ public class MembershipManager implements AutoCloseable,
ConsumerEventListener {
* Private methods
*/
- private PulsarAdmin getPulsarAdminClient() {
- if (this.pulsarAdminClient == null) {
- this.pulsarAdminClient =
WorkerUtils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl(),
- workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters(),
- workerConfig.getTlsTrustCertsFilePath(),
workerConfig.isTlsAllowInsecureConnection());
- }
- return this.pulsarAdminClient;
- }
-
private boolean checkLeader(WorkerService service, String consumerName) {
try {
TopicStats stats = service.getBrokerAdmin().topics()
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index b9814b3..52386f5 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -76,7 +76,8 @@ public class Worker {
// initializing pulsar functions namespace
PulsarAdmin admin =
WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters(),
- workerConfig.getTlsTrustCertsFilePath(),
workerConfig.isTlsAllowInsecureConnection());
+ workerConfig.getTlsTrustCertsFilePath(),
workerConfig.isTlsAllowInsecureConnection(),
+ workerConfig.isTlsHostnameVerificationEnable());
InternalConfigurationData internalConf;
// make sure pulsar broker is up
log.info("Checking if pulsar service at {} is up...",
workerConfig.getPulsarWebServiceUrl());
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 6c42a24..dd403f6 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -261,9 +261,12 @@ public class WorkerConfig implements Serializable,
PulsarConfiguration {
private boolean tlsRequireTrustedClientCertOnConnect = false;
@FieldContext(
category = CATEGORY_CLIENT_SECURITY,
- doc = "Whether to enable TLS when clients connect to broker"
+ doc = "Whether to enable TLS when clients connect to broker",
+ deprecated = true
)
// TLS for Functions -> Broker
+ // @deprecated use "pulsar+ssl://" in serviceUrl to enable
+ @Deprecated
private boolean useTls = false;
@FieldContext(
category = CATEGORY_SECURITY,
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index c3211b2..9ec9688 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -21,18 +21,9 @@ package org.apache.pulsar.functions.worker;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-
import io.netty.util.concurrent.DefaultThreadFactory;
-
-import java.net.URI;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.admin.StorageAdminClient;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
@@ -43,10 +34,13 @@ import
org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import java.net.URI;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
/**
* A service component contains everything to run a worker except rest server.
*/
@@ -92,17 +86,6 @@ public class WorkerService {
AuthorizationService authorizationService) throws
InterruptedException {
log.info("Starting worker {}...", workerConfig.getWorkerId());
- this.brokerAdmin =
WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
- workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters(),
- workerConfig.getTlsTrustCertsFilePath(),
workerConfig.isTlsAllowInsecureConnection());
-
- final String functionWebServiceUrl =
StringUtils.isNotBlank(workerConfig.getFunctionWebServiceUrl())
- ? workerConfig.getFunctionWebServiceUrl()
- : workerConfig.getWorkerWebAddress();
- this.functionAdmin =
WorkerUtils.getPulsarAdminClient(functionWebServiceUrl,
- workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters(),
- workerConfig.getTlsTrustCertsFilePath(),
workerConfig.isTlsAllowInsecureConnection());
-
try {
log.info("Worker Configs: {}", new
ObjectMapper().writerWithDefaultPrettyPrinter()
.writeValueAsString(workerConfig));
@@ -110,45 +93,59 @@ public class WorkerService {
log.warn("Failed to print worker configs with error {}",
e.getMessage(), e);
}
- // create the dlog namespace for storing function packages
- this.dlogUri = dlogUri;
- DistributedLogConfiguration dlogConf =
WorkerUtils.getDlogConf(workerConfig);
try {
- this.dlogNamespace = NamespaceBuilder.newBuilder()
- .conf(dlogConf)
- .clientId("function-worker-" + workerConfig.getWorkerId())
- .uri(this.dlogUri)
- .build();
- } catch (Exception e) {
- log.error("Failed to initialize dlog namespace {} for storing
function packages",
- dlogUri, e);
- throw new RuntimeException(e);
- }
+ // create the dlog namespace for storing function packages
+ this.dlogUri = dlogUri;
+ DistributedLogConfiguration dlogConf =
WorkerUtils.getDlogConf(workerConfig);
+ try {
+ this.dlogNamespace = NamespaceBuilder.newBuilder()
+ .conf(dlogConf)
+ .clientId("function-worker-" +
workerConfig.getWorkerId())
+ .uri(this.dlogUri)
+ .build();
+ } catch (Exception e) {
+ log.error("Failed to initialize dlog namespace {} for storing
function packages",
+ dlogUri, e);
+ throw new RuntimeException(e);
+ }
- // create the state storage client for accessing function state
- if (workerConfig.getStateStorageServiceUrl() != null) {
- StorageClientSettings clientSettings =
StorageClientSettings.newBuilder()
- .serviceUri(workerConfig.getStateStorageServiceUrl())
- .build();
- this.stateStoreAdminClient = StorageClientBuilder.newBuilder()
- .withSettings(clientSettings)
- .buildAdmin();
- }
+ // create the state storage client for accessing function state
+ if (workerConfig.getStateStorageServiceUrl() != null) {
+ StorageClientSettings clientSettings =
StorageClientSettings.newBuilder()
+ .serviceUri(workerConfig.getStateStorageServiceUrl())
+ .build();
+ this.stateStoreAdminClient = StorageClientBuilder.newBuilder()
+ .withSettings(clientSettings)
+ .buildAdmin();
+ }
- // initialize the function metadata manager
- try {
+ final String functionWebServiceUrl =
StringUtils.isNotBlank(workerConfig.getFunctionWebServiceUrl())
+ ? workerConfig.getFunctionWebServiceUrl()
+ : workerConfig.getWorkerWebAddress();
+
+ if (workerConfig.isAuthenticationEnabled()) {
+ this.brokerAdmin =
WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
+ workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters(),
+ workerConfig.getTlsTrustCertsFilePath(),
workerConfig.isTlsAllowInsecureConnection(),
+ workerConfig.isTlsHostnameVerificationEnable());
+
+ this.functionAdmin =
WorkerUtils.getPulsarAdminClient(functionWebServiceUrl,
+ workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters(),
+ workerConfig.getTlsTrustCertsFilePath(),
workerConfig.isTlsAllowInsecureConnection(),
+ workerConfig.isTlsHostnameVerificationEnable());
+
+ this.client =
WorkerUtils.getPulsarClient(this.workerConfig.getPulsarServiceUrl(),
+ workerConfig.getClientAuthenticationPlugin(),
+ workerConfig.getClientAuthenticationParameters(),
+ workerConfig.isUseTls(),
workerConfig.getTlsTrustCertsFilePath(),
+ workerConfig.isTlsAllowInsecureConnection(),
workerConfig.isTlsHostnameVerificationEnable());
+ } else {
+ this.brokerAdmin =
WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl());
+
+ this.functionAdmin =
WorkerUtils.getPulsarAdminClient(functionWebServiceUrl);
- ClientBuilder clientBuilder =
PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
- if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
- &&
isNotBlank(workerConfig.getClientAuthenticationParameters())) {
-
clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
- workerConfig.getClientAuthenticationParameters());
+ this.client =
WorkerUtils.getPulsarClient(this.workerConfig.getPulsarServiceUrl());
}
- clientBuilder.enableTls(workerConfig.isUseTls());
-
clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection());
-
clientBuilder.tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath());
-
clientBuilder.enableTlsHostnameVerification(workerConfig.isTlsHostnameVerificationEnable());
- this.client = clientBuilder.build();
log.info("Created Pulsar client");
//create scheduler manager
@@ -162,7 +159,7 @@ public class WorkerService {
this.connectorsManager = new ConnectorsManager(workerConfig);
//create membership manager
- this.membershipManager = new MembershipManager(this, this.client);
+ this.membershipManager = new MembershipManager(this, this.client,
this.brokerAdmin);
// create function runtime manager
this.functionRuntimeManager = new FunctionRuntimeManager(
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index cae82cd..afbbdd1 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -29,6 +29,8 @@ import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.functions.proto.Function;
@@ -154,7 +156,13 @@ public final class WorkerUtils {
return dlogUri;
}
- public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl,
String authPlugin, String authParams, String tlsTrustCertsFilePath, boolean
allowTlsInsecureConnection) {
+ public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl)
{
+ return getPulsarAdminClient(pulsarWebServiceUrl, null, null, null,
null, null);
+ }
+
+ public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl,
String authPlugin, String authParams,
+ String
tlsTrustCertsFilePath, Boolean allowTlsInsecureConnection,
+ Boolean
enableTlsHostnameVerificationEnable) {
try {
PulsarAdminBuilder adminBuilder =
PulsarAdmin.builder().serviceHttpUrl(pulsarWebServiceUrl);
if (isNotBlank(authPlugin) && isNotBlank(authParams)) {
@@ -163,7 +171,12 @@ public final class WorkerUtils {
if (isNotBlank(tlsTrustCertsFilePath)) {
adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
}
-
adminBuilder.allowTlsInsecureConnection(allowTlsInsecureConnection);
+ if (allowTlsInsecureConnection != null) {
+
adminBuilder.allowTlsInsecureConnection(allowTlsInsecureConnection);
+ }
+ if (enableTlsHostnameVerificationEnable != null) {
+
adminBuilder.enableTlsHostnameVerification(enableTlsHostnameVerificationEnable);
+ }
return adminBuilder.build();
} catch (PulsarClientException e) {
log.error("Error creating pulsar admin client", e);
@@ -171,6 +184,43 @@ public final class WorkerUtils {
}
}
+ public static PulsarClient getPulsarClient(String pulsarServiceUrl) {
+ return getPulsarClient(pulsarServiceUrl, null, null, null,
+ null, null, null);
+ }
+
+ public static PulsarClient getPulsarClient(String pulsarServiceUrl, String
authPlugin, String authParams,
+ Boolean useTls, String
tlsTrustCertsFilePath,
+ Boolean
allowTlsInsecureConnection,
+ Boolean
enableTlsHostnameVerificationEnable) {
+
+ try {
+ ClientBuilder clientBuilder =
PulsarClient.builder().serviceUrl(pulsarServiceUrl);
+
+ if (isNotBlank(authPlugin)
+ && isNotBlank(authParams)) {
+ clientBuilder.authentication(authPlugin, authParams);
+ }
+ if (useTls != null) {
+ clientBuilder.enableTls(useTls);
+ }
+ if (allowTlsInsecureConnection != null) {
+
clientBuilder.allowTlsInsecureConnection(allowTlsInsecureConnection);
+ }
+ if (isNotBlank(tlsTrustCertsFilePath)) {
+ clientBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
+ }
+ if (enableTlsHostnameVerificationEnable != null) {
+
clientBuilder.enableTlsHostnameVerification(enableTlsHostnameVerificationEnable);
+ }
+
+ return clientBuilder.build();
+ } catch (PulsarClientException e) {
+ log.error("Error creating pulsar client", e);
+ throw new RuntimeException(e);
+ }
+ }
+
public static FunctionStats.FunctionInstanceStats
getFunctionInstanceStats(String fullyQualifiedInstanceName,
FunctionRuntimeInfo functionRuntimeInfo,
int instanceId) {
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 527557b..cfbfa47 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -68,6 +68,7 @@ public class MembershipManagerTest {
@Test
public void testConsumerEventListener() throws Exception {
PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+ PulsarAdmin mockAdmin = mock(PulsarAdmin.class);
ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
ConsumerBuilder<byte[]> mockConsumerBuilder =
mock(ConsumerBuilder.class);
@@ -92,7 +93,7 @@ public class MembershipManagerTest {
when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
- MembershipManager membershipManager = spy(new
MembershipManager(workerService, mockClient));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerService, mockClient, mockAdmin));
assertFalse(membershipManager.isLeader());
verify(mockClient, times(1))
.newConsumer();
@@ -137,7 +138,8 @@ public class MembershipManagerTest {
WorkerService workerService = mock(WorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
doReturn(workerConfig).when(workerService).getWorkerConfig();
-
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+ PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
+ doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
@@ -147,7 +149,7 @@ public class MembershipManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
functionMetaDataManager));
- MembershipManager membershipManager = spy(new
MembershipManager(workerService, pulsarClient));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerService, pulsarClient, pulsarAdmin));
List<WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
@@ -208,7 +210,8 @@ public class MembershipManagerTest {
WorkerService workerService = mock(WorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
doReturn(workerConfig).when(workerService).getWorkerConfig();
-
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+ PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
+ doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
@@ -219,7 +222,7 @@ public class MembershipManagerTest {
mock(ConnectorsManager.class),
functionMetaDataManager));
- MembershipManager membershipManager = spy(new
MembershipManager(workerService, mockPulsarClient()));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
List<WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
@@ -304,7 +307,8 @@ public class MembershipManagerTest {
WorkerService workerService = mock(WorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
doReturn(workerConfig).when(workerService).getWorkerConfig();
-
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+ PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
+ doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
@@ -314,7 +318,7 @@ public class MembershipManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
functionMetaDataManager));
- MembershipManager membershipManager = spy(new
MembershipManager(workerService, mockPulsarClient()));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
List<WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
@@ -379,6 +383,7 @@ public class MembershipManagerTest {
WorkerService workerService = mock(WorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
doReturn(workerConfig).when(workerService).getWorkerConfig();
+ PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
@@ -389,7 +394,7 @@ public class MembershipManagerTest {
mock(MembershipManager.class),
mock(ConnectorsManager.class),
functionMetaDataManager));
- MembershipManager membershipManager = spy(new
MembershipManager(workerService, mockPulsarClient()));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
List<WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));