rdhabalia closed pull request #2213: Add authorization support on function apis
URL: https://github.com/apache/incubator-pulsar/pull/2213
This is a PR merged from a forked repository.
Because this pull request came from a fork, GitHub hides the original
diff once it is merged, so the diff is reproduced below for the sake of
provenance:
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 58bcf1dfd3..4194337e4a 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -45,3 +45,11 @@ initialBrokerReconnectMaxRetries: 60
assignmentWriteMaxRetries: 60
instanceLivenessCheckFreqMs: 30000
metricsSamplingPeriodSec: 60
+# Enforce authentication
+authenticationEnabled: false
+# Enforce authorization on accessing functions api
+authorizationEnabled: false
+# Set of authentication providers, given as a list of class names
+authenticationProviders:
+# Set of role names that are treated as "super-user", meaning they will be able to access any admin-api
+superUserRoles:
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 6338ce98f2..f97f1807c9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -82,8 +82,8 @@ public Response registerFunction(final @PathParam("tenant")
String tenant,
final @FormDataParam("url") String
functionPkgUrl,
final @FormDataParam("functionDetails")
String functionDetailsJson) {
- return functions.registerFunction(
- tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson);
+ return functions.registerFunction(tenant, namespace, functionName,
uploadedInputStream, fileDetail,
+ functionPkgUrl, functionDetailsJson, clientAppId());
}
@PUT
@@ -103,8 +103,8 @@ public Response updateFunction(final @PathParam("tenant")
String tenant,
final @FormDataParam("url") String
functionPkgUrl,
final @FormDataParam("functionDetails")
String functionDetailsJson) {
- return functions.updateFunction(
- tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson);
+ return functions.updateFunction(tenant, namespace, functionName,
uploadedInputStream, fileDetail,
+ functionPkgUrl, functionDetailsJson, clientAppId());
}
@@ -122,8 +122,7 @@ public Response updateFunction(final @PathParam("tenant")
String tenant,
public Response deregisterFunction(final @PathParam("tenant") String
tenant,
final @PathParam("namespace") String
namespace,
final @PathParam("functionName") String
functionName) {
- return functions.deregisterFunction(
- tenant, namespace, functionName);
+ return functions.deregisterFunction(tenant, namespace, functionName,
clientAppId());
}
@GET
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index a7b34ae857..16f1a76597 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -77,6 +77,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import com.google.common.collect.Maps;
@@ -99,7 +100,6 @@
PulsarAdmin admin;
PulsarClient pulsarClient;
BrokerStats brokerStatsClient;
- WorkerServer functionsWorkerServer;
WorkerService functionsWorkerService;
final String tenant = "external-repl-prop";
String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
@@ -120,6 +120,11 @@
private static final Logger log =
LoggerFactory.getLogger(PulsarSinkE2ETest.class);
+ @DataProvider(name = "validRoleName")
+ public Object[][] validRoleName() {
+ return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+ }
+
@BeforeMethod
void setup(Method method) throws Exception {
@@ -187,6 +192,7 @@ void setup(Method method) throws Exception {
pulsarClient = clientBuilder.build();
TenantInfo propAdmin = new TenantInfo();
+ propAdmin.getAdminRoles().add("superUser");
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
admin.tenants().updateTenant(tenant, propAdmin);
@@ -231,6 +237,9 @@ private WorkerService
createPulsarFunctionWorker(ServiceConfiguration config) {
workerConfig.setUseTls(true);
workerConfig.setTlsAllowInsecureConnection(true);
workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
+
+ workerConfig.setAuthenticationEnabled(true);
+ workerConfig.setAuthorizationEnabled(true);
return new WorkerService(workerConfig);
}
@@ -416,4 +425,34 @@ protected FunctionDetails createSinkConfig(String jarFile,
String tenant, String
return functionDetailsBuilder.build();
}
+
+ @Test(dataProvider = "validRoleName")
+ public void testAuthorization(boolean validRoleName) throws Exception {
+
+ final String namespacePortion = "io";
+ final String replNamespace = tenant + "/" + namespacePortion;
+ final String sinkTopic = "persistent://" + replNamespace + "/output";
+ final String functionName = "PulsarSink-test";
+ final String subscriptionName = "test-sub";
+ admin.namespaces().createNamespace(replNamespace);
+ Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+ admin.namespaces().setNamespaceReplicationClusters(replNamespace,
clusters);
+
+ String roleName = validRoleName ? "superUser" : "invalid";
+ TenantInfo propAdmin = new TenantInfo();
+ propAdmin.getAdminRoles().add(roleName);
+
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
+ admin.tenants().updateTenant(tenant, propAdmin);
+
+ String jarFilePathUrl = Utils.FILE + ":"
+ +
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+ FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl,
tenant, namespacePortion, functionName,
+ sinkTopic, subscriptionName);
+ try {
+ admin.functions().createFunctionWithUrl(functionDetails,
jarFilePathUrl);
+ Assert.assertTrue(validRoleName);
+ } catch
(org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne)
{
+ Assert.assertFalse(validRoleName);
+ }
+ }
}
\ No newline at end of file
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index 9b10cbe55a..029f573d76 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -34,6 +34,12 @@
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-broker-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-runtime</artifactId>
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 09f54ef424..eda9b1515f 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -20,14 +20,18 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Properties;
+import java.util.Set;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.configuration.PulsarConfiguration;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -42,7 +46,7 @@
@EqualsAndHashCode
@ToString
@Accessors(chain = true)
-public class WorkerConfig implements Serializable {
+public class WorkerConfig implements Serializable, PulsarConfiguration {
private static final long serialVersionUID = 1L;
@@ -74,6 +78,17 @@
private boolean tlsAllowInsecureConnection = false;
private boolean tlsHostnameVerificationEnable = false;
private int metricsSamplingPeriodSec = 60;
+ // Enforce authentication
+ private boolean authenticationEnabled = false;
+ // Authentication provider name list, which is a list of class names
+ private Set<String> authenticationProviders = Sets.newTreeSet();
+ // Enforce authorization on accessing functions admin-api
+ private boolean authorizationEnabled = false;
+ // Role names that are treated as "super-user", meaning they will be able to access any admin-api
+ private Set<String> superUserRoles = Sets.newTreeSet();
+
+ private Properties properties = new Properties();
+
@Data
@Setter
@@ -135,4 +150,9 @@ public static String unsafeLocalhostResolve() {
throw new IllegalStateException("Failed to resolve localhost
name.", ex);
}
}
+
+ @Override
+ public void setProperties(Properties properties) {
+ this.properties = properties;
+ }
}
\ No newline at end of file
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 29dff537f1..6af9c8f2bf 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -35,9 +35,12 @@
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
/**
* A service component contains everything to run a worker except rest server.
@@ -57,8 +60,9 @@
private SchedulerManager schedulerManager;
private boolean isInitialized = false;
private final ScheduledExecutorService statsUpdater;
-
+ private AuthenticationService authenticationService;
private ConnectorsManager connectorsManager;
+ private PulsarAdmin admin;
public WorkerService(WorkerConfig workerConfig) {
this.workerConfig = workerConfig;
@@ -68,6 +72,11 @@ public WorkerService(WorkerConfig workerConfig) {
public void start(URI dlogUri) throws InterruptedException {
log.info("Starting worker {}...", workerConfig.getWorkerId());
+
+ this.admin =
Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
+ workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters(),
+ workerConfig.getTlsTrustCertsFilePath(),
workerConfig.isTlsAllowInsecureConnection());
+
try {
log.info("Worker Configs: {}", new
ObjectMapper().writerWithDefaultPrettyPrinter()
.writeValueAsString(workerConfig));
@@ -128,6 +137,8 @@ public void start(URI dlogUri) throws InterruptedException {
// initialize function metadata manager
this.functionMetaDataManager.initialize();
+
+ authenticationService = new
AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig));
// Starting cluster services
log.info("Start cluster services...");
@@ -200,6 +211,10 @@ public void stop() {
if (null != schedulerManager) {
schedulerManager.close();
}
+
+ if (null != this.admin) {
+ this.admin.close();
+ }
}
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
index 1c5c739982..4673d5645f 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
@@ -18,9 +18,13 @@
*/
package org.apache.pulsar.functions.worker.rest;
+import java.util.Optional;
import java.util.function.Supplier;
import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Context;
+
+import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
@@ -32,6 +36,8 @@
private WorkerService workerService;
@Context
protected ServletContext servletContext;
+ @Context
+ protected HttpServletRequest httpRequest;
public FunctionApiResource() {
this.functions = new FunctionsImpl(this);
@@ -44,4 +50,10 @@ public synchronized WorkerService get() {
}
return this.workerService;
}
+
+ public String clientAppId() {
+ return httpRequest != null
+ ? (String)
httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName)
+ : null;
+ }
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 524a6ad295..a57a9529fa 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -19,8 +19,15 @@
package org.apache.pulsar.functions.worker.rest;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
+
+import javax.servlet.DispatcherType;
+
import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.eclipse.jetty.server.Handler;
@@ -28,9 +35,11 @@
import java.net.BindException;
import java.net.URI;
+
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.glassfish.jersey.server.ResourceConfig;
@@ -41,6 +50,7 @@
private final WorkerConfig workerConfig;
private final WorkerService workerService;
+ private static final String MATCH_ALL = "/*";
private static String getErrorMessage(Server server, int port, Exception
ex) {
if (ex instanceof BindException) {
@@ -106,7 +116,12 @@ public static ServletContextHandler
newServletContextHandler(String contextPath,
final ServletHolder apiServlet =
new ServletHolder(new ServletContainer(config));
contextHandler.addServlet(apiServlet, "/*");
+ if (workerService.getWorkerConfig().isAuthenticationEnabled()) {
+ FilterHolder filter = new FilterHolder(new
AuthenticationFilter(workerService.getAuthenticationService()));
+ contextHandler.addFilter(filter, MATCH_ALL,
EnumSet.allOf(DispatcherType.class));
+ }
return contextHandler;
}
+
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 2ac48dc960..b935bf5f58 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -57,12 +57,14 @@
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -117,11 +119,24 @@ private boolean isWorkerServiceAvailable() {
public Response registerFunction(final String tenant, final String
namespace, final String functionName,
final InputStream uploadedInputStream, final
FormDataContentDisposition fileDetail,
- final String functionPkgUrl, final String functionDetailsJson) {
+ final String functionPkgUrl, final String functionDetailsJson,
final String clientRole) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
}
+
+ try {
+ if (!isAuthorizedRole(tenant, clientRole)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to
register function", tenant, namespace, functionName,
+ clientRole);
+ return
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData("client is not authorize to
perform operation")).build();
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace,
functionName, e);
+ return
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(e.getMessage())).build();
+ }
FunctionDetails functionDetails;
boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl);
@@ -168,12 +183,25 @@ public Response registerFunction(final String tenant,
final String namespace, fi
public Response updateFunction(final String tenant, final String
namespace, final String functionName,
final InputStream uploadedInputStream, final
FormDataContentDisposition fileDetail,
- final String functionPkgUrl, final String functionDetailsJson) {
+ final String functionPkgUrl, final String functionDetailsJson,
final String clientRole) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
}
+ try {
+ if (!isAuthorizedRole(tenant, clientRole)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to
update function", tenant, namespace,
+ functionName, clientRole);
+ return
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData("client is not authorize to
perform operation")).build();
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace,
functionName, e);
+ return
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(e.getMessage())).build();
+ }
+
FunctionDetails functionDetails;
boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl);
// validate parameters
@@ -217,12 +245,26 @@ public Response updateFunction(final String tenant, final
String namespace, fina
: updateRequest(functionMetaDataBuilder.build(),
uploadedInputStream);
}
- public Response deregisterFunction(final String tenant, final String
namespace, final String functionName) {
+ public Response deregisterFunction(final String tenant, final String
namespace, final String functionName,
+ String clientRole) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
}
+ try {
+ if (!isAuthorizedRole(tenant, clientRole)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to
deregister function", tenant, namespace,
+ functionName, clientRole);
+ return
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData("client is not authorize to
perform operation")).build();
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace,
functionName, e);
+ return
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(e.getMessage())).build();
+ }
+
// validate parameters
try {
validateDeregisterRequestParams(tenant, namespace, functionName);
@@ -893,5 +935,18 @@ public static String createPackagePath(String tenant,
String namespace, String f
return String.format("%s/%s/%s/%s", tenant, namespace,
Codec.encode(functionName),
Utils.getUniquePackageName(Codec.encode(fileName)));
}
+
+ private boolean isAuthorizedRole(String tenant, String clientRole) throws
PulsarAdminException {
+ if (worker().getWorkerConfig().isAuthorizationEnabled()) {
+ // skip authorization if client role is super-user
+ if (clientRole != null &&
worker().getWorkerConfig().getSuperUserRoles().contains(clientRole)) {
+ return true;
+ }
+ TenantInfo tenantInfo =
worker().getAdmin().tenants().getTenantInfo(tenant);
+ return clientRole != null && (tenantInfo.getAdminRoles() == null
|| tenantInfo.getAdminRoles().isEmpty()
+ || tenantInfo.getAdminRoles().contains(clientRole));
+ }
+ return true;
+ }
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index a23c41bc1e..92957b32d4 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -62,8 +62,8 @@ public Response registerFunction(final @PathParam("tenant")
String tenant,
final @FormDataParam("url") String
functionPkgUrl,
final @FormDataParam("functionDetails")
String functionDetailsJson) {
- return functions.registerFunction(
- tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson);
+ return functions.registerFunction(tenant, namespace, functionName,
uploadedInputStream, fileDetail,
+ functionPkgUrl, functionDetailsJson, clientAppId());
}
@@ -78,8 +78,8 @@ public Response updateFunction(final @PathParam("tenant")
String tenant,
final @FormDataParam("url") String
functionPkgUrl,
final @FormDataParam("functionDetails")
String functionDetailsJson) {
- return functions.updateFunction(
- tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson);
+ return functions.updateFunction(tenant, namespace, functionName,
uploadedInputStream, fileDetail,
+ functionPkgUrl, functionDetailsJson, clientAppId());
}
@@ -87,10 +87,8 @@ public Response updateFunction(final @PathParam("tenant")
String tenant,
@DELETE
@Path("/{tenant}/{namespace}/{functionName}")
public Response deregisterFunction(final @PathParam("tenant") String
tenant,
- final @PathParam("namespace") String
namespace,
- final @PathParam("functionName") String
functionName) {
- return functions.deregisterFunction(
- tenant, namespace, functionName);
+ final @PathParam("namespace") String namespace, final
@PathParam("functionName") String functionName) {
+ return functions.deregisterFunction(tenant, namespace, functionName,
clientAppId());
}
@GET
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 866b92e8ed..c59d03dde4 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -314,7 +314,8 @@ private void testRegisterFunctionMissingArguments(
inputStream,
details,
null,
-
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+ null);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
if (missingFieldName.equals("parallelism")) {
@@ -342,7 +343,8 @@ private Response registerDefaultFunction() throws
IOException {
mockedInputStream,
mockedFormData,
null,
-
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+ org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+ null);
}
@Test
@@ -587,7 +589,8 @@ private void testUpdateFunctionMissingArguments(
inputStream,
details,
null,
-
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+ org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+ null);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
if (missingFieldName.equals("parallelism")) {
@@ -615,7 +618,8 @@ private Response updateDefaultFunction() throws IOException
{
mockedInputStream,
mockedFormData,
null,
-
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+ org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+ null);
}
@Test
@@ -696,7 +700,8 @@ public void testUpdateFunctionWithUrl() throws IOException {
null,
null,
filePackageUrl,
-
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+ org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+ null);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
}
@@ -783,7 +788,8 @@ private void testDeregisterFunctionMissingArguments(
Response response = resource.deregisterFunction(
tenant,
namespace,
- function);
+ function,
+ null);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
assertEquals(new ErrorData(missingFieldName + " is not
provided").reason, ((ErrorData) response.getEntity()).reason);
@@ -793,7 +799,8 @@ private Response deregisterDefaultFunction() {
return resource.deregisterFunction(
tenant,
namespace,
- function);
+ function,
+ null);
}
@Test
@@ -1043,7 +1050,7 @@ public void
testRegisterFunctionFileUrlWithValidSinkClass() throws IOException {
.setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName))
.build();
Response response = resource.registerFunction(tenant, namespace,
function, null, null, filePackageUrl,
-
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), null);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
}
@@ -1068,7 +1075,7 @@ public void
testRegisterFunctionFileUrlWithInValidSinkClass() throws IOException
.setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName))
.build();
Response response = resource.registerFunction(tenant, namespace,
function, null, null, filePackageUrl,
-
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), null);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services