This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cc60027 Fix: authorization while redirecting function admin call (#2416)
cc60027 is described below
commit cc60027c8dd74469d6e0438ef8df8ea06ada6f2d
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Wed Aug 22 10:53:30 2018 -0700
Fix: authorization while redirecting function admin call (#2416)
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 4 +-
.../apache/pulsar/io/PulsarFunctionTlsTest.java | 2 +-
.../functions/worker/FunctionRuntimeManager.java | 45 ++++++++--------------
.../pulsar/functions/worker/WorkerConfig.java | 7 +++-
.../pulsar/functions/worker/WorkerService.java | 24 +++++++++---
.../functions/worker/rest/FunctionApiResource.java | 3 ++
.../functions/worker/rest/api/FunctionsImpl.java | 17 ++++----
.../worker/rest/api/v2/FunctionApiV2Resource.java | 5 ++-
.../worker/FunctionRuntimeManagerTest.java | 18 +++++++--
.../functions/worker/MembershipManagerTest.java | 19 +++++++--
10 files changed, 90 insertions(+), 54 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 4384f50..315027f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -277,7 +277,7 @@ public class FunctionsBase extends AdminResource implements
Supplier<WorkerServi
public Response restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final
@PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- return functions.restartFunctionInstance(tenant, namespace,
functionName, instanceId);
+ return functions.restartFunctionInstance(tenant, namespace,
functionName, instanceId, uri.getRequestUri());
}
@POST
@@ -302,7 +302,7 @@ public class FunctionsBase extends AdminResource implements
Supplier<WorkerServi
public Response stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final
@PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- return functions.stopFunctionInstance(tenant, namespace, functionName,
instanceId);
+ return functions.stopFunctionInstance(tenant, namespace, functionName,
instanceId, uri.getRequestUri());
}
@POST
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 2254626..97de0b8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -116,7 +116,7 @@ public class PulsarFunctionTlsTest {
PulsarAdmin admin = mock(PulsarAdmin.class);
Tenants tenants = mock(Tenants.class);
when(admin.tenants()).thenReturn(tenants);
- when(functionsWorkerService.getAdmin()).thenReturn(admin);
+ when(functionsWorkerService.getBrokerAdmin()).thenReturn(admin);
Set<String> admins = Sets.newHashSet("superUser");
TenantInfo tenantInfo = new TenantInfo(admins, null);
when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 1016171..93828de 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -25,6 +25,7 @@ import java.net.URISyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
@@ -45,6 +46,7 @@ import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.Response.Status;
import java.util.Collection;
@@ -91,15 +93,15 @@ public class FunctionRuntimeManager implements
AutoCloseable{
private MembershipManager membershipManager;
private final ConnectorsManager connectorsManager;
- public FunctionRuntimeManager(WorkerConfig workerConfig,
- PulsarClient pulsarClient,
- Namespace dlogNamespace,
- MembershipManager membershipManager,
- ConnectorsManager connectorsManager) throws
Exception {
+ private final PulsarAdmin functionAdmin;
+
+ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService
workerService, Namespace dlogNamespace,
+ MembershipManager membershipManager, ConnectorsManager
connectorsManager) throws Exception {
this.workerConfig = workerConfig;
this.connectorsManager = connectorsManager;
+ this.functionAdmin = workerService.getFunctionAdmin();
- Reader<byte[]> reader = pulsarClient.newReader()
+ Reader<byte[]> reader = workerService.getClient().newReader()
.topic(this.workerConfig.getFunctionAssignmentTopic())
.startMessageId(MessageId.earliest)
.create();
@@ -327,7 +329,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
}
public Response stopFunctionInstance(String tenant, String namespace,
String functionName, int instanceId,
- boolean restart) throws Exception {
+ boolean restart, URI uri) throws Exception {
Assignment assignment = this.findAssignment(tenant, namespace,
functionName, instanceId);
final String fullFunctionName = String.format("%s/%s/%s/%s", tenant,
namespace, functionName, instanceId);
if (assignment == null) {
@@ -355,19 +357,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
.entity(new ErrorData(fullFunctionName + " has not
been assigned yet")).build();
}
- URI redirect = null;
- String action = restart ? "restart" : "stop";
- final String redirectUrl =
String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/%s",
- workerInfo.getWorkerHostname(), workerInfo.getPort(),
tenant, namespace, functionName, instanceId,
- action);
- try {
- redirect = new URI(redirectUrl);
- } catch (URISyntaxException e) {
- log.error("Error in preparing redirect url for {}/{}/{}/{}:
{}", tenant, namespace, functionName,
- instanceId, e.getMessage(), e);
- return
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(fullFunctionName + " invalid
redirection url")).build();
- }
+ URI redirect =
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
throw new
WebApplicationException(Response.temporaryRedirect(redirect).build());
}
}
@@ -401,14 +391,13 @@ public class FunctionRuntimeManager implements
AutoCloseable{
}
continue;
}
- Client client = ClientBuilder.newClient();
- String action = restart ? "restart" : "stop";
- // TODO: create and use pulsar-admin to support authorization
and authentication and manage redirect
- final String instanceRestartUrl =
String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/%s",
- workerInfo.getWorkerHostname(), workerInfo.getPort(),
tenant, namespace, functionName,
- assignment.getInstance().getInstanceId(), action);
-
client.target(instanceRestartUrl).request(MediaType.APPLICATION_JSON)
- .post(Entity.entity("", MediaType.APPLICATION_JSON),
ErrorData.class);
+ if (restart) {
+ this.functionAdmin.functions().restartFunction(tenant,
namespace, functionName,
+ assignment.getInstance().getInstanceId());
+ } else {
+ this.functionAdmin.functions().stopFunction(tenant,
namespace, functionName,
+ assignment.getInstance().getInstanceId());
+ }
}
}
return Response.status(Status.OK).build();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 35d3e18..38ef5d3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -56,6 +56,7 @@ public class WorkerConfig implements Serializable,
PulsarConfiguration {
private int workerPortTls;
private String connectorsDirectory = "./connectors";
private String functionMetadataTopicName;
+ private String functionWebServiceUrl;
private String pulsarServiceUrl;
private String pulsarWebServiceUrl;
private String clusterCoordinationTopicName;
@@ -153,7 +154,11 @@ public class WorkerConfig implements Serializable,
PulsarConfiguration {
}
return this.workerHostname;
}
-
+
+ public String getWorkerWebAddress() {
+ return String.format("http://%s:%d", this.getWorkerHostname(),
this.getWorkerPort());
+ }
+
public static String unsafeLocalhostResolve() {
try {
return InetAddress.getLocalHost().getHostName();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index eb543c4..0850766 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -32,6 +32,8 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
@@ -63,7 +65,8 @@ public class WorkerService {
private final ScheduledExecutorService statsUpdater;
private AuthenticationService authenticationService;
private ConnectorsManager connectorsManager;
- private PulsarAdmin admin;
+ private PulsarAdmin brokerAdmin;
+ private PulsarAdmin functionAdmin;
private final MetricsGenerator metricsGenerator;
public WorkerService(WorkerConfig workerConfig) {
@@ -76,7 +79,14 @@ public class WorkerService {
public void start(URI dlogUri) throws InterruptedException {
log.info("Starting worker {}...", workerConfig.getWorkerId());
- this.admin =
Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
+ this.brokerAdmin =
Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
+ workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters(),
+ workerConfig.getTlsTrustCertsFilePath(),
workerConfig.isTlsAllowInsecureConnection());
+
+ final String functionWebServiceUrl =
StringUtils.isNotBlank(workerConfig.getFunctionWebServiceUrl())
+ ? workerConfig.getFunctionWebServiceUrl()
+ : workerConfig.getWorkerWebAddress();
+ this.functionAdmin = Utils.getPulsarAdminClient(functionWebServiceUrl,
workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters(),
workerConfig.getTlsTrustCertsFilePath(),
workerConfig.isTlsAllowInsecureConnection());
@@ -131,7 +141,7 @@ public class WorkerService {
// create function runtime manager
this.functionRuntimeManager = new FunctionRuntimeManager(
- this.workerConfig, this.client, this.dlogNamespace,
this.membershipManager, connectorsManager);
+ this.workerConfig, this, this.dlogNamespace,
this.membershipManager, connectorsManager);
// Setting references to managers in scheduler
this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
@@ -215,8 +225,12 @@ public class WorkerService {
schedulerManager.close();
}
- if (null != this.admin) {
- this.admin.close();
+ if (null != this.brokerAdmin) {
+ this.brokerAdmin.close();
+ }
+
+ if (null != this.functionAdmin) {
+ this.functionAdmin.close();
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
index 4673d56..be97986 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
@@ -23,6 +23,7 @@ import java.util.function.Supplier;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.functions.worker.WorkerService;
@@ -38,6 +39,8 @@ public class FunctionApiResource implements
Supplier<WorkerService> {
protected ServletContext servletContext;
@Context
protected HttpServletRequest httpRequest;
+ @Context
+ protected UriInfo uri;
public FunctionApiResource() {
this.functions = new FunctionsImpl(this);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index c2747f4..d5f3f57 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -33,6 +33,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
+import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
@@ -386,17 +387,17 @@ public class FunctionsImpl {
}
public Response stopFunctionInstance(final String tenant, final String
namespace, final String functionName,
- final String instanceId) {
- return stopFunctionInstance(tenant, namespace, functionName,
instanceId, false);
+ final String instanceId, URI uri) {
+ return stopFunctionInstance(tenant, namespace, functionName,
instanceId, false, uri);
}
public Response restartFunctionInstance(final String tenant, final String
namespace, final String functionName,
- final String instanceId) {
- return stopFunctionInstance(tenant, namespace, functionName,
instanceId, true);
+ final String instanceId, URI uri) {
+ return stopFunctionInstance(tenant, namespace, functionName,
instanceId, true, uri);
}
public Response stopFunctionInstance(final String tenant, final String
namespace, final String functionName,
- final String instanceId, boolean restart) {
+ final String instanceId, boolean restart, URI uri) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
@@ -413,7 +414,7 @@ public class FunctionsImpl {
FunctionMetaDataManager functionMetaDataManager =
worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace,
functionName)) {
- log.error("Function in getFunctionStatus does not exist @
/{}/{}/{}", tenant, namespace, functionName);
+ log.error("Function does not exist @ /{}/{}/{}", tenant,
namespace, functionName);
return
Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(String.format("Function %s doesn't
exist", functionName))).build();
}
@@ -421,7 +422,7 @@ public class FunctionsImpl {
FunctionRuntimeManager functionRuntimeManager =
worker().getFunctionRuntimeManager();
try {
return functionRuntimeManager.stopFunctionInstance(tenant,
namespace, functionName,
- Integer.parseInt(instanceId), restart);
+ Integer.parseInt(instanceId), restart, uri);
} catch (WebApplicationException we) {
throw we;
} catch (Exception e) {
@@ -1057,7 +1058,7 @@ public class FunctionsImpl {
if (isSuperUser(clientRole)) {
return true;
}
- TenantInfo tenantInfo =
worker().getAdmin().tenants().getTenantInfo(tenant);
+ TenantInfo tenantInfo =
worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
return clientRole != null && (tenantInfo.getAdminRoles() == null
|| tenantInfo.getAdminRoles().isEmpty()
|| tenantInfo.getAdminRoles().contains(clientRole));
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index ddea22a..e13f69c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -25,6 +25,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
import org.apache.pulsar.common.io.ConnectorDefinition;
import java.io.IOException;
import java.io.InputStream;
+import java.net.URI;
import java.util.List;
import javax.ws.rs.Consumes;
@@ -171,7 +172,7 @@ public class FunctionApiV2Resource extends
FunctionApiResource {
public Response restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final
@PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- return functions.restartFunctionInstance(tenant, namespace,
functionName, instanceId);
+ return functions.restartFunctionInstance(tenant, namespace,
functionName, instanceId, this.uri.getRequestUri());
}
@POST
@@ -196,7 +197,7 @@ public class FunctionApiV2Resource extends
FunctionApiResource {
public Response stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final
@PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- return functions.stopFunctionInstance(tenant, namespace, functionName,
instanceId);
+ return functions.stopFunctionInstance(tenant, namespace, functionName,
instanceId, this.uri.getRequestUri());
}
@POST
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 4f618c4..85a2122 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker;
import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
@@ -86,11 +87,14 @@ public class FunctionRuntimeManagerTest {
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(mock(Reader.class)).when(readerBuilder).create();
-
+ WorkerService workerService = mock(WorkerService.class);
+ doReturn(pulsarClient).when(workerService).getClient();
+
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+
// test new assignment add functions
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
- pulsarClient,
+ workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class)
@@ -180,11 +184,14 @@ public class FunctionRuntimeManagerTest {
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(mock(Reader.class)).when(readerBuilder).create();
+ WorkerService workerService = mock(WorkerService.class);
+ doReturn(pulsarClient).when(workerService).getClient();
+
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
// test new assignment delete functions
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
- pulsarClient,
+ workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class)
@@ -278,11 +285,14 @@ public class FunctionRuntimeManagerTest {
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(mock(Reader.class)).when(readerBuilder).create();
+ WorkerService workerService = mock(WorkerService.class);
+ doReturn(pulsarClient).when(workerService).getClient();
+
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
// test new assignment update functions
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
- pulsarClient,
+ workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class)
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index e0a2428..7ed6aca 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -36,6 +36,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerEventListener;
@@ -129,9 +130,13 @@ public class MembershipManagerTest {
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(mock(Reader.class)).when(readerBuilder).create();
+ WorkerService workerService = mock(WorkerService.class);
+ doReturn(pulsarClient).when(workerService).getClient();
+
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
- pulsarClient,
+ workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class)
@@ -194,9 +199,13 @@ public class MembershipManagerTest {
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(mock(Reader.class)).when(readerBuilder).create();
+ WorkerService workerService = mock(WorkerService.class);
+ doReturn(pulsarClient).when(workerService).getClient();
+
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
- pulsarClient,
+ workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class)
@@ -284,9 +293,13 @@ public class MembershipManagerTest {
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(mock(Reader.class)).when(readerBuilder).create();
+ WorkerService workerService = mock(WorkerService.class);
+ doReturn(pulsarClient).when(workerService).getClient();
+
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
- pulsarClient,
+ workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class)