This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 95d4bf7 Add authorization support on function apis (#2213)
95d4bf7 is described below
commit 95d4bf7c2cd52448b5e482f6fac44b24c088d554
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Jul 23 20:18:25 2018 -0700
Add authorization support on function apis (#2213)
* Add authorization support on function apis
* fix authorization enable check
---
conf/functions_worker.yml | 8 +++
.../pulsar/broker/admin/impl/FunctionsBase.java | 11 ++--
.../org/apache/pulsar/io/PulsarSinkE2ETest.java | 41 ++++++++++++++-
pulsar-functions/worker/pom.xml | 6 +++
.../pulsar/functions/worker/WorkerConfig.java | 22 +++++++-
.../pulsar/functions/worker/WorkerService.java | 17 +++++-
.../functions/worker/rest/FunctionApiResource.java | 12 +++++
.../pulsar/functions/worker/rest/WorkerServer.java | 15 ++++++
.../functions/worker/rest/api/FunctionsImpl.java | 61 ++++++++++++++++++++--
.../worker/rest/api/v2/FunctionApiV2Resource.java | 14 +++--
.../rest/api/v2/FunctionApiV2ResourceTest.java | 25 +++++----
11 files changed, 203 insertions(+), 29 deletions(-)
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 58bcf1d..4194337 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -45,3 +45,11 @@ initialBrokerReconnectMaxRetries: 60
assignmentWriteMaxRetries: 60
instanceLivenessCheckFreqMs: 30000
metricsSamplingPeriodSec: 60
+# Enforce authentication
+authenticationEnabled: false
+# Enforce authorization on accessing functions api
+authorizationEnabled: false
+# Set of authentication provider name list, which is a list of class names
+authenticationProviders:
+# Set of role names that are treated as "super-user", meaning they will be
able to access any admin-api
+superUserRoles:
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 6338ce9..f97f180 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -82,8 +82,8 @@ public class FunctionsBase extends AdminResource implements
Supplier<WorkerServi
final @FormDataParam("url") String
functionPkgUrl,
final @FormDataParam("functionDetails")
String functionDetailsJson) {
- return functions.registerFunction(
- tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson);
+ return functions.registerFunction(tenant, namespace, functionName,
uploadedInputStream, fileDetail,
+ functionPkgUrl, functionDetailsJson, clientAppId());
}
@PUT
@@ -103,8 +103,8 @@ public class FunctionsBase extends AdminResource implements
Supplier<WorkerServi
final @FormDataParam("url") String
functionPkgUrl,
final @FormDataParam("functionDetails")
String functionDetailsJson) {
- return functions.updateFunction(
- tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson);
+ return functions.updateFunction(tenant, namespace, functionName,
uploadedInputStream, fileDetail,
+ functionPkgUrl, functionDetailsJson, clientAppId());
}
@@ -122,8 +122,7 @@ public class FunctionsBase extends AdminResource implements
Supplier<WorkerServi
public Response deregisterFunction(final @PathParam("tenant") String
tenant,
final @PathParam("namespace") String
namespace,
final @PathParam("functionName") String
functionName) {
- return functions.deregisterFunction(
- tenant, namespace, functionName);
+ return functions.deregisterFunction(tenant, namespace, functionName,
clientAppId());
}
@GET
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index a7b34ae..16f1a76 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -77,6 +77,7 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import com.google.common.collect.Maps;
@@ -99,7 +100,6 @@ public class PulsarSinkE2ETest {
PulsarAdmin admin;
PulsarClient pulsarClient;
BrokerStats brokerStatsClient;
- WorkerServer functionsWorkerServer;
WorkerService functionsWorkerService;
final String tenant = "external-repl-prop";
String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
@@ -120,6 +120,11 @@ public class PulsarSinkE2ETest {
private static final Logger log =
LoggerFactory.getLogger(PulsarSinkE2ETest.class);
+ @DataProvider(name = "validRoleName")
+ public Object[][] validRoleName() {
+ return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+ }
+
@BeforeMethod
void setup(Method method) throws Exception {
@@ -187,6 +192,7 @@ public class PulsarSinkE2ETest {
pulsarClient = clientBuilder.build();
TenantInfo propAdmin = new TenantInfo();
+ propAdmin.getAdminRoles().add("superUser");
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
admin.tenants().updateTenant(tenant, propAdmin);
@@ -231,6 +237,9 @@ public class PulsarSinkE2ETest {
workerConfig.setUseTls(true);
workerConfig.setTlsAllowInsecureConnection(true);
workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
+
+ workerConfig.setAuthenticationEnabled(true);
+ workerConfig.setAuthorizationEnabled(true);
return new WorkerService(workerConfig);
}
@@ -416,4 +425,34 @@ public class PulsarSinkE2ETest {
return functionDetailsBuilder.build();
}
+
+ @Test(dataProvider = "validRoleName")
+ public void testAuthorization(boolean validRoleName) throws Exception {
+
+ final String namespacePortion = "io";
+ final String replNamespace = tenant + "/" + namespacePortion;
+ final String sinkTopic = "persistent://" + replNamespace + "/output";
+ final String functionName = "PulsarSink-test";
+ final String subscriptionName = "test-sub";
+ admin.namespaces().createNamespace(replNamespace);
+ Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+ admin.namespaces().setNamespaceReplicationClusters(replNamespace,
clusters);
+
+ String roleName = validRoleName ? "superUser" : "invalid";
+ TenantInfo propAdmin = new TenantInfo();
+ propAdmin.getAdminRoles().add(roleName);
+
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
+ admin.tenants().updateTenant(tenant, propAdmin);
+
+ String jarFilePathUrl = Utils.FILE + ":"
+ +
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+ FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl,
tenant, namespacePortion, functionName,
+ sinkTopic, subscriptionName);
+ try {
+ admin.functions().createFunctionWithUrl(functionDetails,
jarFilePathUrl);
+ Assert.assertTrue(validRoleName);
+ } catch
(org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne)
{
+ Assert.assertFalse(validRoleName);
+ }
+ }
}
\ No newline at end of file
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index f55781e..bec8818 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -36,6 +36,12 @@
<dependency>
<groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-broker-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-runtime</artifactId>
<version>${project.version}</version>
</dependency>
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 09f54ef..eda9b15 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -20,14 +20,18 @@ package org.apache.pulsar.functions.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Properties;
+import java.util.Set;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.configuration.PulsarConfiguration;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -42,7 +46,7 @@ import lombok.experimental.Accessors;
@EqualsAndHashCode
@ToString
@Accessors(chain = true)
-public class WorkerConfig implements Serializable {
+public class WorkerConfig implements Serializable, PulsarConfiguration {
private static final long serialVersionUID = 1L;
@@ -74,6 +78,17 @@ public class WorkerConfig implements Serializable {
private boolean tlsAllowInsecureConnection = false;
private boolean tlsHostnameVerificationEnable = false;
private int metricsSamplingPeriodSec = 60;
+ // Enforce authentication
+ private boolean authenticationEnabled = false;
+ // Authentication provider name list, which is a list of class names
+ private Set<String> authenticationProviders = Sets.newTreeSet();
+ // Enforce authorization on accessing functions admin-api
+ private boolean authorizationEnabled = false;
+ // Role names that are treated as "super-user", meaning they will be able
to access any admin-api
+ private Set<String> superUserRoles = Sets.newTreeSet();
+
+ private Properties properties = new Properties();
+
@Data
@Setter
@@ -135,4 +150,9 @@ public class WorkerConfig implements Serializable {
throw new IllegalStateException("Failed to resolve localhost
name.", ex);
}
}
+
+ @Override
+ public void setProperties(Properties properties) {
+ this.properties = properties;
+ }
}
\ No newline at end of file
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 29dff53..6af9c8f 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -35,9 +35,12 @@ import static
org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
/**
* A service component contains everything to run a worker except rest server.
@@ -57,8 +60,9 @@ public class WorkerService {
private SchedulerManager schedulerManager;
private boolean isInitialized = false;
private final ScheduledExecutorService statsUpdater;
-
+ private AuthenticationService authenticationService;
private ConnectorsManager connectorsManager;
+ private PulsarAdmin admin;
public WorkerService(WorkerConfig workerConfig) {
this.workerConfig = workerConfig;
@@ -68,6 +72,11 @@ public class WorkerService {
public void start(URI dlogUri) throws InterruptedException {
log.info("Starting worker {}...", workerConfig.getWorkerId());
+
+ this.admin =
Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
+ workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters(),
+ workerConfig.getTlsTrustCertsFilePath(),
workerConfig.isTlsAllowInsecureConnection());
+
try {
log.info("Worker Configs: {}", new
ObjectMapper().writerWithDefaultPrettyPrinter()
.writeValueAsString(workerConfig));
@@ -128,6 +137,8 @@ public class WorkerService {
// initialize function metadata manager
this.functionMetaDataManager.initialize();
+
+ authenticationService = new
AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig));
// Starting cluster services
log.info("Start cluster services...");
@@ -200,6 +211,10 @@ public class WorkerService {
if (null != schedulerManager) {
schedulerManager.close();
}
+
+ if (null != this.admin) {
+ this.admin.close();
+ }
}
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
index 1c5c739..4673d56 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
@@ -18,9 +18,13 @@
*/
package org.apache.pulsar.functions.worker.rest;
+import java.util.Optional;
import java.util.function.Supplier;
import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Context;
+
+import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
@@ -32,6 +36,8 @@ public class FunctionApiResource implements
Supplier<WorkerService> {
private WorkerService workerService;
@Context
protected ServletContext servletContext;
+ @Context
+ protected HttpServletRequest httpRequest;
public FunctionApiResource() {
this.functions = new FunctionsImpl(this);
@@ -44,4 +50,10 @@ public class FunctionApiResource implements
Supplier<WorkerService> {
}
return this.workerService;
}
+
+ public String clientAppId() {
+ return httpRequest != null
+ ? (String)
httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName)
+ : null;
+ }
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 524a6ad..a57a952 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -19,8 +19,15 @@
package org.apache.pulsar.functions.worker.rest;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
+
+import javax.servlet.DispatcherType;
+
import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.eclipse.jetty.server.Handler;
@@ -28,9 +35,11 @@ import org.eclipse.jetty.server.Server;
import java.net.BindException;
import java.net.URI;
+
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.glassfish.jersey.server.ResourceConfig;
@@ -41,6 +50,7 @@ public class WorkerServer implements Runnable {
private final WorkerConfig workerConfig;
private final WorkerService workerService;
+ private static final String MATCH_ALL = "/*";
private static String getErrorMessage(Server server, int port, Exception
ex) {
if (ex instanceof BindException) {
@@ -106,7 +116,12 @@ public class WorkerServer implements Runnable {
final ServletHolder apiServlet =
new ServletHolder(new ServletContainer(config));
contextHandler.addServlet(apiServlet, "/*");
+ if (workerService.getWorkerConfig().isAuthenticationEnabled()) {
+ FilterHolder filter = new FilterHolder(new
AuthenticationFilter(workerService.getAuthenticationService()));
+ contextHandler.addFilter(filter, MATCH_ALL,
EnumSet.allOf(DispatcherType.class));
+ }
return contextHandler;
}
+
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 2ac48dc..b935bf5 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -57,12 +57,14 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -117,11 +119,24 @@ public class FunctionsImpl {
public Response registerFunction(final String tenant, final String
namespace, final String functionName,
final InputStream uploadedInputStream, final
FormDataContentDisposition fileDetail,
- final String functionPkgUrl, final String functionDetailsJson) {
+ final String functionPkgUrl, final String functionDetailsJson,
final String clientRole) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
}
+
+ try {
+ if (!isAuthorizedRole(tenant, clientRole)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to
register function", tenant, namespace, functionName,
+ clientRole);
+ return
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData("client is not authorize to
perform operation")).build();
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace,
functionName, e);
+ return
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(e.getMessage())).build();
+ }
FunctionDetails functionDetails;
boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl);
@@ -168,12 +183,25 @@ public class FunctionsImpl {
public Response updateFunction(final String tenant, final String
namespace, final String functionName,
final InputStream uploadedInputStream, final
FormDataContentDisposition fileDetail,
- final String functionPkgUrl, final String functionDetailsJson) {
+ final String functionPkgUrl, final String functionDetailsJson,
final String clientRole) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
}
+ try {
+ if (!isAuthorizedRole(tenant, clientRole)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to
update function", tenant, namespace,
+ functionName, clientRole);
+ return
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData("client is not authorize to
perform operation")).build();
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace,
functionName, e);
+ return
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(e.getMessage())).build();
+ }
+
FunctionDetails functionDetails;
boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl);
// validate parameters
@@ -217,12 +245,26 @@ public class FunctionsImpl {
: updateRequest(functionMetaDataBuilder.build(),
uploadedInputStream);
}
- public Response deregisterFunction(final String tenant, final String
namespace, final String functionName) {
+ public Response deregisterFunction(final String tenant, final String
namespace, final String functionName,
+ String clientRole) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
}
+ try {
+ if (!isAuthorizedRole(tenant, clientRole)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to
deregister function", tenant, namespace,
+ functionName, clientRole);
+ return
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData("client is not authorize to
perform operation")).build();
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace,
functionName, e);
+ return
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(e.getMessage())).build();
+ }
+
// validate parameters
try {
validateDeregisterRequestParams(tenant, namespace, functionName);
@@ -893,5 +935,18 @@ public class FunctionsImpl {
return String.format("%s/%s/%s/%s", tenant, namespace,
Codec.encode(functionName),
Utils.getUniquePackageName(Codec.encode(fileName)));
}
+
+ private boolean isAuthorizedRole(String tenant, String clientRole) throws
PulsarAdminException {
+ if (worker().getWorkerConfig().isAuthorizationEnabled()) {
+ // skip authorization if client role is super-user
+ if (clientRole != null &&
worker().getWorkerConfig().getSuperUserRoles().contains(clientRole)) {
+ return true;
+ }
+ TenantInfo tenantInfo =
worker().getAdmin().tenants().getTenantInfo(tenant);
+ return clientRole != null && (tenantInfo.getAdminRoles() == null
|| tenantInfo.getAdminRoles().isEmpty()
+ || tenantInfo.getAdminRoles().contains(clientRole));
+ }
+ return true;
+ }
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index a23c41b..92957b3 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -62,8 +62,8 @@ public class FunctionApiV2Resource extends
FunctionApiResource {
final @FormDataParam("url") String
functionPkgUrl,
final @FormDataParam("functionDetails")
String functionDetailsJson) {
- return functions.registerFunction(
- tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson);
+ return functions.registerFunction(tenant, namespace, functionName,
uploadedInputStream, fileDetail,
+ functionPkgUrl, functionDetailsJson, clientAppId());
}
@@ -78,8 +78,8 @@ public class FunctionApiV2Resource extends
FunctionApiResource {
final @FormDataParam("url") String
functionPkgUrl,
final @FormDataParam("functionDetails")
String functionDetailsJson) {
- return functions.updateFunction(
- tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson);
+ return functions.updateFunction(tenant, namespace, functionName,
uploadedInputStream, fileDetail,
+ functionPkgUrl, functionDetailsJson, clientAppId());
}
@@ -87,10 +87,8 @@ public class FunctionApiV2Resource extends
FunctionApiResource {
@DELETE
@Path("/{tenant}/{namespace}/{functionName}")
public Response deregisterFunction(final @PathParam("tenant") String
tenant,
- final @PathParam("namespace") String
namespace,
- final @PathParam("functionName") String
functionName) {
- return functions.deregisterFunction(
- tenant, namespace, functionName);
+ final @PathParam("namespace") String namespace, final
@PathParam("functionName") String functionName) {
+ return functions.deregisterFunction(tenant, namespace, functionName,
clientAppId());
}
@GET
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 866b92e..c59d03d 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -314,7 +314,8 @@ public class FunctionApiV2ResourceTest {
inputStream,
details,
null,
-
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+ null);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
if (missingFieldName.equals("parallelism")) {
@@ -342,7 +343,8 @@ public class FunctionApiV2ResourceTest {
mockedInputStream,
mockedFormData,
null,
-
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+ org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+ null);
}
@Test
@@ -587,7 +589,8 @@ public class FunctionApiV2ResourceTest {
inputStream,
details,
null,
-
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+ org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+ null);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
if (missingFieldName.equals("parallelism")) {
@@ -615,7 +618,8 @@ public class FunctionApiV2ResourceTest {
mockedInputStream,
mockedFormData,
null,
-
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+ org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+ null);
}
@Test
@@ -696,7 +700,8 @@ public class FunctionApiV2ResourceTest {
null,
null,
filePackageUrl,
-
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+ org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+ null);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
}
@@ -783,7 +788,8 @@ public class FunctionApiV2ResourceTest {
Response response = resource.deregisterFunction(
tenant,
namespace,
- function);
+ function,
+ null);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
assertEquals(new ErrorData(missingFieldName + " is not
provided").reason, ((ErrorData) response.getEntity()).reason);
@@ -793,7 +799,8 @@ public class FunctionApiV2ResourceTest {
return resource.deregisterFunction(
tenant,
namespace,
- function);
+ function,
+ null);
}
@Test
@@ -1043,7 +1050,7 @@ public class FunctionApiV2ResourceTest {
.setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName))
.build();
Response response = resource.registerFunction(tenant, namespace,
function, null, null, filePackageUrl,
-
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), null);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
}
@@ -1068,7 +1075,7 @@ public class FunctionApiV2ResourceTest {
.setSubscriptionType(subscriptionType).putAllTopicsToSerDeClassName(topicsToSerDeClassName))
.build();
Response response = resource.registerFunction(tenant, namespace,
function, null, null, filePackageUrl,
-
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
+
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails), null);
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
}