rdhabalia closed pull request #2416: Fix: authorization while redirecting
function admin call
URL: https://github.com/apache/incubator-pulsar/pull/2416
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 4384f50688..315027fd15 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -277,7 +277,7 @@ public Response triggerFunction(final @PathParam("tenant")
String tenant,
public Response restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final
@PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- return functions.restartFunctionInstance(tenant, namespace,
functionName, instanceId);
+ return functions.restartFunctionInstance(tenant, namespace,
functionName, instanceId, uri.getRequestUri());
}
@POST
@@ -302,7 +302,7 @@ public Response restartFunction(final @PathParam("tenant")
String tenant,
public Response stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final
@PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- return functions.stopFunctionInstance(tenant, namespace, functionName,
instanceId);
+ return functions.stopFunctionInstance(tenant, namespace, functionName,
instanceId, uri.getRequestUri());
}
@POST
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 2254626804..97de0b88ca 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -116,7 +116,7 @@ void setup(Method method) throws Exception {
PulsarAdmin admin = mock(PulsarAdmin.class);
Tenants tenants = mock(Tenants.class);
when(admin.tenants()).thenReturn(tenants);
- when(functionsWorkerService.getAdmin()).thenReturn(admin);
+ when(functionsWorkerService.getBrokerAdmin()).thenReturn(admin);
Set<String> admins = Sets.newHashSet("superUser");
TenantInfo tenantInfo = new TenantInfo(admins, null);
when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 1016171502..93828de40d 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -25,6 +25,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
@@ -45,6 +46,7 @@
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.Response.Status;
import java.util.Collection;
@@ -91,15 +93,15 @@
private MembershipManager membershipManager;
private final ConnectorsManager connectorsManager;
- public FunctionRuntimeManager(WorkerConfig workerConfig,
- PulsarClient pulsarClient,
- Namespace dlogNamespace,
- MembershipManager membershipManager,
- ConnectorsManager connectorsManager) throws
Exception {
+ private final PulsarAdmin functionAdmin;
+
+ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService
workerService, Namespace dlogNamespace,
+ MembershipManager membershipManager, ConnectorsManager
connectorsManager) throws Exception {
this.workerConfig = workerConfig;
this.connectorsManager = connectorsManager;
+ this.functionAdmin = workerService.getFunctionAdmin();
- Reader<byte[]> reader = pulsarClient.newReader()
+ Reader<byte[]> reader = workerService.getClient().newReader()
.topic(this.workerConfig.getFunctionAssignmentTopic())
.startMessageId(MessageId.earliest)
.create();
@@ -327,7 +329,7 @@ public synchronized void
removeAssignments(Collection<Assignment> assignments) {
}
public Response stopFunctionInstance(String tenant, String namespace,
String functionName, int instanceId,
- boolean restart) throws Exception {
+ boolean restart, URI uri) throws Exception {
Assignment assignment = this.findAssignment(tenant, namespace,
functionName, instanceId);
final String fullFunctionName = String.format("%s/%s/%s/%s", tenant,
namespace, functionName, instanceId);
if (assignment == null) {
@@ -355,19 +357,7 @@ public Response stopFunctionInstance(String tenant, String
namespace, String fun
.entity(new ErrorData(fullFunctionName + " has not
been assigned yet")).build();
}
- URI redirect = null;
- String action = restart ? "restart" : "stop";
- final String redirectUrl =
String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/%s",
- workerInfo.getWorkerHostname(), workerInfo.getPort(),
tenant, namespace, functionName, instanceId,
- action);
- try {
- redirect = new URI(redirectUrl);
- } catch (URISyntaxException e) {
- log.error("Error in preparing redirect url for {}/{}/{}/{}:
{}", tenant, namespace, functionName,
- instanceId, e.getMessage(), e);
- return
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(fullFunctionName + " invalid
redirection url")).build();
- }
+ URI redirect =
UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
throw new
WebApplicationException(Response.temporaryRedirect(redirect).build());
}
}
@@ -401,14 +391,13 @@ public Response stopFunctionInstances(String tenant,
String namespace, String fu
}
continue;
}
- Client client = ClientBuilder.newClient();
- String action = restart ? "restart" : "stop";
- // TODO: create and use pulsar-admin to support authorization
and authentication and manage redirect
- final String instanceRestartUrl =
String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/%s",
- workerInfo.getWorkerHostname(), workerInfo.getPort(),
tenant, namespace, functionName,
- assignment.getInstance().getInstanceId(), action);
-
client.target(instanceRestartUrl).request(MediaType.APPLICATION_JSON)
- .post(Entity.entity("", MediaType.APPLICATION_JSON),
ErrorData.class);
+ if (restart) {
+ this.functionAdmin.functions().restartFunction(tenant,
namespace, functionName,
+ assignment.getInstance().getInstanceId());
+ } else {
+ this.functionAdmin.functions().stopFunction(tenant,
namespace, functionName,
+ assignment.getInstance().getInstanceId());
+ }
}
}
return Response.status(Status.OK).build();
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 35d3e18307..38ef5d3a6c 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -56,6 +56,7 @@
private int workerPortTls;
private String connectorsDirectory = "./connectors";
private String functionMetadataTopicName;
+ private String functionWebServiceUrl;
private String pulsarServiceUrl;
private String pulsarWebServiceUrl;
private String clusterCoordinationTopicName;
@@ -153,7 +154,11 @@ public String getWorkerHostname() {
}
return this.workerHostname;
}
-
+
+ public String getWorkerWebAddress() {
+ return String.format("http://%s:%d", this.getWorkerHostname(),
this.getWorkerPort());
+ }
+
public static String unsafeLocalhostResolve() {
try {
return InetAddress.getLocalHost().getHostName();
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index eb543c4668..0850766e8d 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -32,6 +32,8 @@
import lombok.extern.slf4j.Slf4j;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
@@ -63,7 +65,8 @@
private final ScheduledExecutorService statsUpdater;
private AuthenticationService authenticationService;
private ConnectorsManager connectorsManager;
- private PulsarAdmin admin;
+ private PulsarAdmin brokerAdmin;
+ private PulsarAdmin functionAdmin;
private final MetricsGenerator metricsGenerator;
public WorkerService(WorkerConfig workerConfig) {
@@ -76,7 +79,14 @@ public WorkerService(WorkerConfig workerConfig) {
public void start(URI dlogUri) throws InterruptedException {
log.info("Starting worker {}...", workerConfig.getWorkerId());
- this.admin =
Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
+ this.brokerAdmin =
Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
+ workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters(),
+ workerConfig.getTlsTrustCertsFilePath(),
workerConfig.isTlsAllowInsecureConnection());
+
+ final String functionWebServiceUrl =
StringUtils.isNotBlank(workerConfig.getFunctionWebServiceUrl())
+ ? workerConfig.getFunctionWebServiceUrl()
+ : workerConfig.getWorkerWebAddress();
+ this.functionAdmin = Utils.getPulsarAdminClient(functionWebServiceUrl,
workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters(),
workerConfig.getTlsTrustCertsFilePath(),
workerConfig.isTlsAllowInsecureConnection());
@@ -131,7 +141,7 @@ public void start(URI dlogUri) throws InterruptedException {
// create function runtime manager
this.functionRuntimeManager = new FunctionRuntimeManager(
- this.workerConfig, this.client, this.dlogNamespace,
this.membershipManager, connectorsManager);
+ this.workerConfig, this, this.dlogNamespace,
this.membershipManager, connectorsManager);
// Setting references to managers in scheduler
this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
@@ -215,8 +225,12 @@ public void stop() {
schedulerManager.close();
}
- if (null != this.admin) {
- this.admin.close();
+ if (null != this.brokerAdmin) {
+ this.brokerAdmin.close();
+ }
+
+ if (null != this.functionAdmin) {
+ this.functionAdmin.close();
}
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
index 4673d5645f..be979864b2 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
@@ -23,6 +23,7 @@
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.functions.worker.WorkerService;
@@ -38,6 +39,8 @@
protected ServletContext servletContext;
@Context
protected HttpServletRequest httpRequest;
+ @Context
+ protected UriInfo uri;
public FunctionApiResource() {
this.functions = new FunctionsImpl(this);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index c2747f4320..d5f3f57ec5 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -33,6 +33,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
+import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
@@ -386,17 +387,17 @@ public Response getFunctionInstanceStatus(final String
tenant, final String name
}
public Response stopFunctionInstance(final String tenant, final String
namespace, final String functionName,
- final String instanceId) {
- return stopFunctionInstance(tenant, namespace, functionName,
instanceId, false);
+ final String instanceId, URI uri) {
+ return stopFunctionInstance(tenant, namespace, functionName,
instanceId, false, uri);
}
public Response restartFunctionInstance(final String tenant, final String
namespace, final String functionName,
- final String instanceId) {
- return stopFunctionInstance(tenant, namespace, functionName,
instanceId, true);
+ final String instanceId, URI uri) {
+ return stopFunctionInstance(tenant, namespace, functionName,
instanceId, true, uri);
}
public Response stopFunctionInstance(final String tenant, final String
namespace, final String functionName,
- final String instanceId, boolean restart) {
+ final String instanceId, boolean restart, URI uri) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
@@ -413,7 +414,7 @@ public Response stopFunctionInstance(final String tenant,
final String namespace
FunctionMetaDataManager functionMetaDataManager =
worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace,
functionName)) {
- log.error("Function in getFunctionStatus does not exist @
/{}/{}/{}", tenant, namespace, functionName);
+ log.error("Function does not exist @ /{}/{}/{}", tenant,
namespace, functionName);
return
Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(String.format("Function %s doesn't
exist", functionName))).build();
}
@@ -421,7 +422,7 @@ public Response stopFunctionInstance(final String tenant,
final String namespace
FunctionRuntimeManager functionRuntimeManager =
worker().getFunctionRuntimeManager();
try {
return functionRuntimeManager.stopFunctionInstance(tenant,
namespace, functionName,
- Integer.parseInt(instanceId), restart);
+ Integer.parseInt(instanceId), restart, uri);
} catch (WebApplicationException we) {
throw we;
} catch (Exception e) {
@@ -1057,7 +1058,7 @@ public boolean isAuthorizedRole(String tenant, String
clientRole) throws PulsarA
if (isSuperUser(clientRole)) {
return true;
}
- TenantInfo tenantInfo =
worker().getAdmin().tenants().getTenantInfo(tenant);
+ TenantInfo tenantInfo =
worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
return clientRole != null && (tenantInfo.getAdminRoles() == null
|| tenantInfo.getAdminRoles().isEmpty()
|| tenantInfo.getAdminRoles().contains(clientRole));
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index ddea22ace0..e13f69cb5a 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -25,6 +25,7 @@
import org.apache.pulsar.common.io.ConnectorDefinition;
import java.io.IOException;
import java.io.InputStream;
+import java.net.URI;
import java.util.List;
import javax.ws.rs.Consumes;
@@ -171,7 +172,7 @@ public Response triggerFunction(final @PathParam("tenant")
String tenant,
public Response restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final
@PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- return functions.restartFunctionInstance(tenant, namespace,
functionName, instanceId);
+ return functions.restartFunctionInstance(tenant, namespace,
functionName, instanceId, this.uri.getRequestUri());
}
@POST
@@ -196,7 +197,7 @@ public Response restartFunction(final @PathParam("tenant")
String tenant,
public Response stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final
@PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- return functions.stopFunctionInstance(tenant, namespace, functionName,
instanceId);
+ return functions.stopFunctionInstance(tenant, namespace, functionName,
instanceId, this.uri.getRequestUri());
}
@POST
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 4f618c41b4..85a2122dd0 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker;
import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
@@ -86,11 +87,14 @@ public void testProcessAssignmentUpdateAddFunctions()
throws Exception {
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(mock(Reader.class)).when(readerBuilder).create();
-
+ WorkerService workerService = mock(WorkerService.class);
+ doReturn(pulsarClient).when(workerService).getClient();
+
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+
// test new assignment add functions
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
- pulsarClient,
+ workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class)
@@ -180,11 +184,14 @@ public void testProcessAssignmentUpdateDeleteFunctions()
throws Exception {
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(mock(Reader.class)).when(readerBuilder).create();
+ WorkerService workerService = mock(WorkerService.class);
+ doReturn(pulsarClient).when(workerService).getClient();
+
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
// test new assignment delete functions
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
- pulsarClient,
+ workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class)
@@ -278,11 +285,14 @@ public void testProcessAssignmentUpdateModifyFunctions()
throws Exception {
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(mock(Reader.class)).when(readerBuilder).create();
+ WorkerService workerService = mock(WorkerService.class);
+ doReturn(pulsarClient).when(workerService).getClient();
+
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
// test new assignment update functions
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
- pulsarClient,
+ workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class)
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index e0a24285d5..7ed6acad86 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -36,6 +36,7 @@
import java.util.concurrent.atomic.AtomicReference;
import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerEventListener;
@@ -129,9 +130,13 @@ public void testCheckFailuresNoFailures() throws Exception
{
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(mock(Reader.class)).when(readerBuilder).create();
+ WorkerService workerService = mock(WorkerService.class);
+ doReturn(pulsarClient).when(workerService).getClient();
+
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
- pulsarClient,
+ workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class)
@@ -194,9 +199,13 @@ public void testCheckFailuresSomeFailures() throws
Exception {
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(mock(Reader.class)).when(readerBuilder).create();
+ WorkerService workerService = mock(WorkerService.class);
+ doReturn(pulsarClient).when(workerService).getClient();
+
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
- pulsarClient,
+ workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class)
@@ -284,9 +293,13 @@ public void testCheckFailuresSomeUnassigned() throws
Exception {
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(mock(Reader.class)).when(readerBuilder).create();
+ WorkerService workerService = mock(WorkerService.class);
+ doReturn(pulsarClient).when(workerService).getClient();
+
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
- pulsarClient,
+ workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services