This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 95df092 Refactoring Function Component implementation (#4541)
95df092 is described below
commit 95df092832a3633a81fd719a5c096919141cc4f9
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Sun Jun 23 20:01:37 2019 -0700
Refactoring Function Component implementation (#4541)
* Refactoring Function Component implementation
* cleaning up
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 15 +-
.../apache/pulsar/broker/admin/impl/SinksBase.java | 21 +-
.../pulsar/broker/admin/impl/SourcesBase.java | 20 +-
.../functions/worker/rest/api/ComponentImpl.java | 485 +--------------------
.../functions/worker/rest/api/FunctionsImpl.java | 394 ++++++++++++++++-
.../functions/worker/rest/api/FunctionsImplV2.java | 19 +-
.../functions/worker/rest/api/SinksImpl.java | 403 ++++++++++++++++-
.../functions/worker/rest/api/SourcesImpl.java | 401 ++++++++++++++++-
.../worker/rest/api/v3/FunctionsApiV3Resource.java | 8 +-
.../worker/rest/api/v3/SinksApiV3Resource.java | 21 +-
.../worker/rest/api/v3/SourcesApiV3Resource.java | 21 +-
.../rest/api/v3/FunctionApiV3ResourceTest.java | 14 +-
.../worker/rest/api/v3/SinkApiV3ResourceTest.java | 28 +-
.../rest/api/v3/SourceApiV3ResourceTest.java | 24 +-
14 files changed, 1290 insertions(+), 584 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 11e20de..c6f38d0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -18,7 +18,12 @@
*/
package org.apache.pulsar.broker.admin.impl;
-import io.swagger.annotations.*;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Example;
+import io.swagger.annotations.ExampleProperty;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.functions.FunctionConfig;
@@ -163,10 +168,10 @@ public class FunctionsBase extends AdminResource
implements Supplier<WorkerServi
)
)
)
- final @FormDataParam("functionConfig") String functionConfigJson) {
+ final @FormDataParam("functionConfig") FunctionConfig
functionConfig) {
functions.registerFunction(tenant, namespace, functionName,
uploadedInputStream, fileDetail,
- functionPkgUrl, functionConfigJson, clientAppId(),
clientAuthData());
+ functionPkgUrl, functionConfig, clientAppId(), clientAuthData());
}
@PUT
@@ -270,12 +275,12 @@ public class FunctionsBase extends AdminResource
implements Supplier<WorkerServi
)
)
)
- final @FormDataParam("functionConfig") String functionConfigJson,
+ final @FormDataParam("functionConfig") FunctionConfig
functionConfig,
@ApiParam(value = "The update options is for the Pulsar Function
that needs to be updated.")
final @FormDataParam("updateOptions") UpdateOptions updateOptions)
throws IOException {
functions.updateFunction(tenant, namespace, functionName,
uploadedInputStream, fileDetail,
- functionPkgUrl, functionConfigJson, clientAppId(),
clientAuthData(), updateOptions);
+ functionPkgUrl, functionConfig, clientAppId(),
clientAuthData(), updateOptions);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
index c527bd3..e21d861 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
@@ -77,7 +77,7 @@ public class SinksBase extends AdminResource implements
Supplier<WorkerService>
final @PathParam("sinkName") String sinkName,
final @FormDataParam("data") InputStream
uploadedInputStream,
final @FormDataParam("data")
FormDataContentDisposition fileDetail,
- final @FormDataParam("url") String functionPkgUrl,
+ final @FormDataParam("url") String sinkPkgUrl,
@ApiParam(
value =
"A JSON value presenting a sink config
playload. All available configuration options are: \n" +
@@ -136,10 +136,9 @@ public class SinksBase extends AdminResource implements
Supplier<WorkerService>
)
)
)
- final @FormDataParam("sinkConfig") String
sinkConfigJson) {
-
- sink.registerFunction(tenant, namespace, sinkName,
uploadedInputStream, fileDetail,
- functionPkgUrl, sinkConfigJson, clientAppId(),
clientAuthData());
+ final @FormDataParam("sinkConfig") SinkConfig
sinkConfig) {
+ sink.registerSink(tenant, namespace, sinkName, uploadedInputStream,
fileDetail,
+ sinkPkgUrl, sinkConfig, clientAppId(), clientAuthData());
}
@PUT
@@ -162,7 +161,8 @@ public class SinksBase extends AdminResource implements
Supplier<WorkerService>
final @PathParam("sinkName") String sinkName,
final @FormDataParam("data") InputStream
uploadedInputStream,
final @FormDataParam("data")
FormDataContentDisposition fileDetail,
- final @FormDataParam("url") String functionPkgUrl,
+ @ApiParam(value = "URL of sink's archive")
+ final @FormDataParam("url") String sinkPkgUrl,
@ApiParam(
value =
"A JSON value presenting a sink config
playload. All available configuration options are: \n" +
@@ -221,12 +221,11 @@ public class SinksBase extends AdminResource implements
Supplier<WorkerService>
)
)
)
- final @FormDataParam("sinkConfig") String
sinkConfigJson,
- @ApiParam()
+ final @FormDataParam("sinkConfig") SinkConfig
sinkConfig,
+ @ApiParam(value = "Update options for sink")
final @FormDataParam("updateOptions") UpdateOptions
updateOptions) {
-
- sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream,
fileDetail,
- functionPkgUrl, sinkConfigJson, clientAppId(),
clientAuthData(), updateOptions);
+ sink.updateSink(tenant, namespace, sinkName, uploadedInputStream,
fileDetail,
+ sinkPkgUrl, sinkConfig, clientAppId(), clientAuthData(),
updateOptions);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
index 4856800..8dc20c9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
@@ -79,7 +79,7 @@ public class SourcesBase extends AdminResource implements
Supplier<WorkerService
final @PathParam("sourceName") String sourceName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
- final @FormDataParam("url") String functionPkgUrl,
+ final @FormDataParam("url") String sourcePkgUrl,
@ApiParam(
value = "A JSON value presenting source configuration
payload. An example of the expected functions can be found here. \n" +
"classname \n" +
@@ -126,10 +126,9 @@ public class SourcesBase extends AdminResource implements
Supplier<WorkerService
)
)
)
- final @FormDataParam("sourceConfig") String sourceConfigJson) {
-
- source.registerFunction(tenant, namespace, sourceName,
uploadedInputStream, fileDetail,
- functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData());
+ final @FormDataParam("sourceConfig") SourceConfig sourceConfig) {
+ source.registerSource(tenant, namespace, sourceName,
uploadedInputStream, fileDetail,
+ sourcePkgUrl, sourceConfig, clientAppId(), clientAuthData());
}
@PUT
@@ -154,7 +153,8 @@ public class SourcesBase extends AdminResource implements
Supplier<WorkerService
final @PathParam("sourceName") String sourceName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
- final @FormDataParam("url") String functionPkgUrl,
+ @ApiParam(value = "URL of sources' archive")
+ final @FormDataParam("url") String sourcePkgUrl,
@ApiParam(
value = "A JSON value presenting source configuration
payload. An example of the expected functions can be found here. \n" +
"classname \n" +
@@ -201,11 +201,11 @@ public class SourcesBase extends AdminResource implements
Supplier<WorkerService
)
)
)
- final @FormDataParam("sourceConfig") String sourceConfigJson,
+ final @FormDataParam("sourceConfig") SourceConfig sourceConfig,
+ @ApiParam(value = "Update options for source")
final @FormDataParam("updateOptions") UpdateOptions updateOptions)
{
-
- source.updateFunction(tenant, namespace, sourceName,
uploadedInputStream, fileDetail,
- functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData(),
updateOptions);
+ source.updateSource(tenant, namespace, sourceName,
uploadedInputStream, fileDetail,
+ sourcePkgUrl, sourceConfig, clientAppId(), clientAuthData(),
updateOptions);
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 62ac9bd..345ea56 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -18,9 +18,6 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.gson.Gson;
-import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
@@ -34,7 +31,6 @@ import
org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -46,19 +42,14 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionState;
-import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.Codec;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.functions.auth.FunctionAuthData;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -68,11 +59,9 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
-import org.apache.pulsar.functions.utils.SinkConfigUtils;
-import org.apache.pulsar.functions.utils.SourceConfigUtils;
-import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -95,12 +84,10 @@ import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
-import java.nio.file.Path;
import java.util.Base64;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -111,10 +98,8 @@ import static
com.google.common.base.Preconditions.checkNotNull;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static
org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
import static
org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace;
import static
org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
import static
org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
@@ -285,170 +270,10 @@ public abstract class ComponentImpl {
return true;
}
- public void registerFunction(final String tenant,
- final String namespace,
- final String componentName,
- final InputStream uploadedInputStream,
- final FormDataContentDisposition fileDetail,
- final String functionPkgUrl,
- final String componentConfigJson,
- final String clientRole,
- AuthenticationDataHttps
clientAuthenticationDataHttps) {
-
- if (!isWorkerServiceAvailable()) {
- throwUnavailableException();
- }
-
- if (tenant == null) {
- throw new RestException(Status.BAD_REQUEST, "Tenant is not
provided");
- }
- if (namespace == null) {
- throw new RestException(Status.BAD_REQUEST, "Namespace is not
provided");
- }
- if (componentName == null) {
- throw new RestException(Status.BAD_REQUEST,
ComponentTypeUtils.toString(componentType) + " Name is not provided");
- }
-
- try {
- if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
register {}", tenant, namespace,
- componentName, clientRole,
ComponentTypeUtils.toString(componentType));
- throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
- }
- } catch (PulsarAdminException e) {
- log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace,
componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
e.getMessage());
- }
-
- try {
- // Check tenant exists
- final TenantInfo tenantInfo =
worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
-
- String qualifiedNamespace = tenant + "/" + namespace;
- List<String> namespaces =
worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
- if (namespaces != null &&
!namespaces.contains(qualifiedNamespace)) {
- String qualifiedNamespaceWithCluster =
String.format("%s/%s/%s", tenant,
-
worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
- if (namespaces != null &&
!namespaces.contains(qualifiedNamespaceWithCluster)) {
- log.error("{}/{}/{} Namespace {} does not exist", tenant,
namespace, componentName, namespace);
- throw new RestException(Status.BAD_REQUEST, "Namespace
does not exist");
- }
- }
- } catch (PulsarAdminException.NotAuthorizedException e) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
operate {} on tenant", tenant, namespace,
- componentName, clientRole,
ComponentTypeUtils.toString(componentType));
- throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
- } catch (PulsarAdminException.NotFoundException e) {
- log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace,
componentName, tenant);
- throw new RestException(Status.BAD_REQUEST, "Tenant does not
exist");
- } catch (PulsarAdminException e) {
- log.error("{}/{}/{} Issues getting tenant data", tenant,
namespace, componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
e.getMessage());
- }
-
- FunctionMetaDataManager functionMetaDataManager =
worker().getFunctionMetaDataManager();
-
- if (functionMetaDataManager.containsFunction(tenant, namespace,
componentName)) {
- log.error("{} {}/{}/{} already exists",
ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
- throw new RestException(Status.BAD_REQUEST, String.format("%s %s
already exists", ComponentTypeUtils.toString(componentType), componentName));
- }
-
- FunctionDetails functionDetails = null;
- boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
- File componentPackageFile = null;
- try {
-
- // validate parameters
- try {
- if (isPkgUrlProvided) {
-
- if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
- throw new IllegalArgumentException("Function Package
url is not valid. supported url (http/https/file)");
- }
- try {
- componentPackageFile =
FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
- } catch (Exception e) {
- throw new
IllegalArgumentException(String.format("Encountered error \"%s\" when getting
%s package from %s", e.getMessage(),
ComponentTypeUtils.toString(componentType), functionPkgUrl));
- }
- functionDetails = validateUpdateRequestParams(tenant,
namespace, componentName,
- componentConfigJson, componentType,
componentPackageFile);
- } else {
- if (uploadedInputStream != null) {
- componentPackageFile =
WorkerUtils.dumpToTmpFile(uploadedInputStream);
- }
- functionDetails = validateUpdateRequestParams(tenant,
namespace, componentName,
- componentConfigJson, componentType,
componentPackageFile);
- if (!isFunctionCodeBuiltin(functionDetails) &&
(componentPackageFile == null || fileDetail == null)) {
- throw new
IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package
is not provided");
- }
- }
- } catch (Exception e) {
- log.error("Invalid register {} request @ /{}/{}/{}",
ComponentTypeUtils.toString(componentType), tenant, namespace, componentName,
e);
- throw new RestException(Status.BAD_REQUEST, e.getMessage());
- }
-
- try {
-
worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
- } catch (Exception e) {
- log.error("{} {}/{}/{} cannot be admitted by the runtime
factory", ComponentTypeUtils.toString(componentType), tenant, namespace,
componentName);
- throw new RestException(Status.BAD_REQUEST, String.format("%s
%s cannot be admitted:- %s", ComponentTypeUtils.toString(componentType),
componentName, e.getMessage()));
- }
-
- // function state
- FunctionMetaData.Builder functionMetaDataBuilder =
FunctionMetaData.newBuilder()
- .setFunctionDetails(functionDetails)
- .setCreateTime(System.currentTimeMillis())
- .setVersion(0);
-
- // cache auth if need
- if (worker().getWorkerConfig().isAuthenticationEnabled()) {
-
- if (clientAuthenticationDataHttps != null) {
- try {
- Optional<FunctionAuthData> functionAuthData =
worker().getFunctionRuntimeManager()
- .getRuntimeFactory()
- .getAuthProvider()
- .cacheAuthData(tenant, namespace,
componentName, clientAuthenticationDataHttps);
-
- if (functionAuthData.isPresent()) {
- functionMetaDataBuilder.setFunctionAuthSpec(
-
Function.FunctionAuthenticationSpec.newBuilder()
-
.setData(ByteString.copyFrom(functionAuthData.get().getData()))
- .build());
- }
- } catch (Exception e) {
- log.error("Error caching authentication data for {}
{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace,
componentName, e);
-
-
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
String.format("Error caching authentication data for %s %s:- %s",
ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
- }
- }
- }
-
- PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
- try {
- packageLocationMetaDataBuilder =
getFunctionPackageLocation(functionMetaDataBuilder.build(),
- functionPkgUrl, fileDetail, componentPackageFile);
- } catch (Exception e) {
- log.error("Failed process {} {}/{}/{} package: ",
ComponentTypeUtils.toString(componentType), tenant, namespace, componentName,
e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
e.getMessage());
- }
-
-
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
- updateRequest(functionMetaDataBuilder.build());
- } finally {
-
- if (!(functionPkgUrl != null &&
functionPkgUrl.startsWith(Utils.FILE))
- && componentPackageFile != null &&
componentPackageFile.exists()) {
- componentPackageFile.delete();
- }
- }
- }
-
- private PackageLocationMetaData.Builder getFunctionPackageLocation(final
FunctionMetaData functionMetaData,
- final
String functionPkgUrl,
- final
FormDataContentDisposition fileDetail,
- final
File uploadedInputStreamAsFile) throws Exception {
+ PackageLocationMetaData.Builder getFunctionPackageLocation(final
FunctionMetaData functionMetaData,
+ final String
functionPkgUrl,
+ final
FormDataContentDisposition fileDetail,
+ final File
uploadedInputStreamAsFile) throws Exception {
FunctionDetails functionDetails =
functionMetaData.getFunctionDetails();
String tenant = functionDetails.getTenant();
String namespace = functionDetails.getNamespace();
@@ -512,230 +337,6 @@ public abstract class ComponentImpl {
return packageLocationMetaDataBuilder;
}
-
- public void updateFunction(final String tenant,
- final String namespace,
- final String componentName,
- final InputStream uploadedInputStream,
- final FormDataContentDisposition fileDetail,
- final String functionPkgUrl,
- final String componentConfigJson,
- final String clientRole,
- AuthenticationDataHttps
clientAuthenticationDataHttps,
- UpdateOptions updateOptions) {
-
- if (!isWorkerServiceAvailable()) {
- throwUnavailableException();
- }
-
- if (tenant == null) {
- throw new RestException(Status.BAD_REQUEST, "Tenant is not
provided");
- }
- if (namespace == null) {
- throw new RestException(Status.BAD_REQUEST, "Namespace is not
provided");
- }
- if (componentName == null) {
- throw new RestException(Status.BAD_REQUEST,
ComponentTypeUtils.toString(componentType) + " Name is not provided");
- }
-
- try {
- if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
update {}", tenant, namespace,
- componentName, clientRole,
ComponentTypeUtils.toString(componentType));
- throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
-
- }
- } catch (PulsarAdminException e) {
- log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace,
componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
e.getMessage());
- }
-
- FunctionMetaDataManager functionMetaDataManager =
worker().getFunctionMetaDataManager();
-
- if (!functionMetaDataManager.containsFunction(tenant, namespace,
componentName)) {
- throw new RestException(Status.BAD_REQUEST, String.format("%s %s
doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
- }
-
- String mergedComponentConfigJson;
- String existingComponentConfigJson;
-
- FunctionMetaData existingComponent =
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
-
- if
(!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType))
{
- log.error("{}/{}/{} is not a {}", tenant, namespace,
componentName, ComponentTypeUtils.toString(componentType));
- throw new RestException(Status.NOT_FOUND, String.format("%s %s
doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
- }
-
- if (componentType.equals(FunctionDetails.ComponentType.FUNCTION)) {
- FunctionConfig existingFunctionConfig =
FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
- existingComponentConfigJson = new
Gson().toJson(existingFunctionConfig);
- FunctionConfig functionConfig = new
Gson().fromJson(componentConfigJson, FunctionConfig.class);
- // The rest end points take precedence over whatever is there in
functionconfig
- functionConfig.setTenant(tenant);
- functionConfig.setNamespace(namespace);
- functionConfig.setName(componentName);
- try {
- FunctionConfig mergedConfig =
FunctionConfigUtils.validateUpdate(existingFunctionConfig,
- functionConfig);
- mergedComponentConfigJson = new Gson().toJson(mergedConfig);
- } catch (Exception e) {
- throw new RestException(Status.BAD_REQUEST, e.getMessage());
- }
- } else if (componentType.equals(FunctionDetails.ComponentType.SOURCE))
{
- SourceConfig existingSourceConfig =
SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
- existingComponentConfigJson = new
Gson().toJson(existingSourceConfig);
- SourceConfig sourceConfig = new
Gson().fromJson(componentConfigJson, SourceConfig.class);
- // The rest end points take precedence over whatever is there in
functionconfig
- sourceConfig.setTenant(tenant);
- sourceConfig.setNamespace(namespace);
- sourceConfig.setName(componentName);
- try {
- SourceConfig mergedConfig =
SourceConfigUtils.validateUpdate(existingSourceConfig, sourceConfig);
- mergedComponentConfigJson = new Gson().toJson(mergedConfig);
- } catch (Exception e) {
- throw new RestException(Status.BAD_REQUEST, e.getMessage());
- }
- } else {
- SinkConfig existingSinkConfig =
SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
- existingComponentConfigJson = new
Gson().toJson(existingSinkConfig);
- SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson,
SinkConfig.class);
- // The rest end points take precedence over whatever is there in
functionconfig
- sinkConfig.setTenant(tenant);
- sinkConfig.setNamespace(namespace);
- sinkConfig.setName(componentName);
- try {
- SinkConfig mergedConfig =
SinkConfigUtils.validateUpdate(existingSinkConfig, sinkConfig);
- mergedComponentConfigJson = new Gson().toJson(mergedConfig);
- } catch (Exception e) {
- throw new RestException(Status.BAD_REQUEST, e.getMessage());
- }
- }
-
- if (existingComponentConfigJson.equals(mergedComponentConfigJson) &&
isBlank(functionPkgUrl) && uploadedInputStream == null) {
- log.error("{}/{}/{} Update contains no changes", tenant,
namespace, componentName);
- throw new RestException(Status.BAD_REQUEST, "Update contains no
change");
- }
-
- FunctionDetails functionDetails = null;
- File componentPackageFile = null;
- try {
-
- // validate parameters
- try {
- if (isNotBlank(functionPkgUrl)) {
- try {
- componentPackageFile =
FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
- } catch (Exception e) {
- throw new
IllegalArgumentException(String.format("Encountered error \"%s\" when getting
%s package from %s", e.getMessage(),
ComponentTypeUtils.toString(componentType), functionPkgUrl));
- }
- functionDetails = validateUpdateRequestParams(tenant,
namespace, componentName,
- mergedComponentConfigJson, componentType,
componentPackageFile);
-
- } else if
(existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
- ||
existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP))
{
- try {
- componentPackageFile =
FunctionCommon.extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
- } catch (Exception e) {
- throw new
IllegalArgumentException(String.format("Encountered error \"%s\" when getting
%s package from %s", e.getMessage(),
ComponentTypeUtils.toString(componentType), functionPkgUrl));
- }
- functionDetails = validateUpdateRequestParams(tenant,
namespace, componentName,
- mergedComponentConfigJson, componentType,
componentPackageFile);
- } else if (uploadedInputStream != null) {
-
- componentPackageFile =
WorkerUtils.dumpToTmpFile(uploadedInputStream);
- functionDetails = validateUpdateRequestParams(tenant,
namespace, componentName,
- mergedComponentConfigJson, componentType,
componentPackageFile);
-
- } else if
(existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN))
{
- functionDetails = validateUpdateRequestParams(tenant,
namespace, componentName,
- mergedComponentConfigJson, componentType,
componentPackageFile);
- if (!isFunctionCodeBuiltin(functionDetails) &&
(componentPackageFile == null || fileDetail == null)) {
- throw new
IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package
is not provided");
- }
- } else {
-
- componentPackageFile = FunctionCommon.createPkgTempFile();
- componentPackageFile.deleteOnExit();
- log.info("componentPackageFile: {}", componentPackageFile);
-
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(),
componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
-
- functionDetails = validateUpdateRequestParams(tenant,
namespace, componentName,
- mergedComponentConfigJson, componentType,
componentPackageFile);
- }
- } catch (Exception e) {
- log.error("Invalid update {} request @ /{}/{}/{}",
ComponentTypeUtils.toString(componentType), tenant, namespace, componentName,
e);
- throw new RestException(Status.BAD_REQUEST, e.getMessage());
- }
-
- try {
-
worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
- } catch (Exception e) {
- log.error("Updated {} {}/{}/{} cannot be submitted to runtime
factory", ComponentTypeUtils.toString(componentType), tenant, namespace,
componentName);
- throw new RestException(Status.BAD_REQUEST, String.format("%s
%s cannot be admitted:- %s",
- ComponentTypeUtils.toString(componentType),
componentName, e.getMessage()));
- }
-
- // merge from existing metadata
- FunctionMetaData.Builder functionMetaDataBuilder =
FunctionMetaData.newBuilder().mergeFrom(existingComponent)
- .setFunctionDetails(functionDetails);
-
- // update auth data if need
- if (worker().getWorkerConfig().isAuthenticationEnabled()) {
- if (clientAuthenticationDataHttps != null && updateOptions !=
null && updateOptions.isUpdateAuthData()) {
- // get existing auth data if it exists
- Optional<FunctionAuthData> existingFunctionAuthData =
Optional.empty();
- if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
- existingFunctionAuthData =
Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
- }
-
- try {
- Optional<FunctionAuthData> newFunctionAuthData =
worker().getFunctionRuntimeManager()
- .getRuntimeFactory()
- .getAuthProvider()
- .updateAuthData(
- tenant, namespace,
- componentName,
existingFunctionAuthData,
- clientAuthenticationDataHttps);
-
- if (newFunctionAuthData.isPresent()) {
- functionMetaDataBuilder.setFunctionAuthSpec(
-
Function.FunctionAuthenticationSpec.newBuilder()
-
.setData(ByteString.copyFrom(newFunctionAuthData.get().getData()))
- .build());
- } else {
- functionMetaDataBuilder.clearFunctionAuthSpec();
- }
- } catch (Exception e) {
- log.error("Error updating authentication data for {}
{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace,
componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
String.format("Error caching authentication data for %s %s:- %s",
ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
- }
- }
- }
-
- PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
- if (isNotBlank(functionPkgUrl) || uploadedInputStream != null) {
- try {
- packageLocationMetaDataBuilder =
getFunctionPackageLocation(functionMetaDataBuilder.build(),
- functionPkgUrl, fileDetail, componentPackageFile);
- } catch (Exception e) {
- log.error("Failed process {} {}/{}/{} package: ",
ComponentTypeUtils.toString(componentType), tenant, namespace, componentName,
e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
e.getMessage());
- }
- } else {
- packageLocationMetaDataBuilder =
PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
- }
-
-
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
-
- updateRequest(functionMetaDataBuilder.build());
- } finally {
- if (!(functionPkgUrl != null &&
functionPkgUrl.startsWith(Utils.FILE))
- && componentPackageFile != null &&
componentPackageFile.exists()) {
- componentPackageFile.delete();
- }
- }
- }
-
public void deregisterFunction(final String tenant,
final String namespace,
final String componentName,
@@ -1269,7 +870,7 @@ public abstract class ComponentImpl {
return retVals;
}
- private void updateRequest(final FunctionMetaData functionMetaData) {
+ void updateRequest(final FunctionMetaData functionMetaData) {
// Submit to FMT
FunctionMetaDataManager functionMetaDataManager =
worker().getFunctionMetaDataManager();
@@ -1708,80 +1309,6 @@ public abstract class ComponentImpl {
return null;
}
-private FunctionDetails validateUpdateRequestParams(final String tenant,
- final String namespace,
- final String componentName,
- final String
componentConfigJson,
- final
FunctionDetails.ComponentType componentType,
- final File
componentPackageFile) throws IOException {
- if (tenant == null) {
- throw new IllegalArgumentException("Tenant is not provided");
- }
- if (namespace == null) {
- throw new IllegalArgumentException("Namespace is not provided");
- }
- if (componentName == null) {
- throw new IllegalArgumentException(String.format("%s Name is not
provided", ComponentTypeUtils.toString(componentType)));
- }
-
- if (componentType.equals(FunctionDetails.ComponentType.FUNCTION) &&
!isEmpty(componentConfigJson)) {
- FunctionConfig functionConfig = new
Gson().fromJson(componentConfigJson, FunctionConfig.class);
- // The rest end points take precedence over whatever is there in
functionconfig
- functionConfig.setTenant(tenant);
- functionConfig.setNamespace(namespace);
- functionConfig.setName(componentName);
- FunctionConfigUtils.inferMissingArguments(functionConfig);
- ClassLoader clsLoader =
FunctionConfigUtils.validate(functionConfig, componentPackageFile);
- return FunctionConfigUtils.convert(functionConfig, clsLoader);
- }
- if (componentType.equals(FunctionDetails.ComponentType.SOURCE)) {
- Path archivePath = null;
- SourceConfig sourceConfig = new
Gson().fromJson(componentConfigJson, SourceConfig.class);
- // The rest end points take precedence over whatever is there in
sourceconfig
- sourceConfig.setTenant(tenant);
- sourceConfig.setNamespace(namespace);
- sourceConfig.setName(componentName);
-
org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
- if (!StringUtils.isEmpty(sourceConfig.getArchive())) {
- String builtinArchive = sourceConfig.getArchive();
- if
(builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
- builtinArchive =
builtinArchive.replaceFirst("^builtin://", "");
- }
- try {
- archivePath =
this.worker().getConnectorsManager().getSourceArchive(builtinArchive);
- } catch (Exception e) {
- throw new IllegalArgumentException(String.format("No
Source archive %s found", archivePath));
- }
- }
- SourceConfigUtils.ExtractedSourceDetails sourceDetails =
SourceConfigUtils.validate(sourceConfig, archivePath, componentPackageFile);
- return SourceConfigUtils.convert(sourceConfig, sourceDetails);
- }
- if (componentType.equals(FunctionDetails.ComponentType.SINK)) {
- Path archivePath = null;
- SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson,
SinkConfig.class);
- // The rest end points take precedence over whatever is there in
sinkConfig
- sinkConfig.setTenant(tenant);
- sinkConfig.setNamespace(namespace);
- sinkConfig.setName(componentName);
-
org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig);
- if (!StringUtils.isEmpty(sinkConfig.getArchive())) {
- String builtinArchive = sinkConfig.getArchive();
- if
(builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
- builtinArchive =
builtinArchive.replaceFirst("^builtin://", "");
- }
- try {
- archivePath =
this.worker().getConnectorsManager().getSinkArchive(builtinArchive);
- } catch (Exception e) {
- throw new IllegalArgumentException(String.format("No Sink
archive %s found", archivePath));
- }
- }
- SinkConfigUtils.ExtractedSinkDetails sinkDetails =
SinkConfigUtils.validate(sinkConfig, archivePath, componentPackageFile);
- return SinkConfigUtils.convert(sinkConfig, sinkDetails);
- } else {
- throw new IllegalArgumentException("Unrecognized component type: "
+ ComponentTypeUtils.toString(componentType));
- }
- }
-
private void validateTriggerRequestParams(final String tenant,
final String namespace,
final String functionName,
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index bf09fbf..da86189 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -18,27 +18,401 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
+import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.functions.auth.FunctionAuthData;
+import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.ComponentTypeUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.rest.RestException;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
import java.util.function.Supplier;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static
org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static
org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
+import static
org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
+
@Slf4j
public class FunctionsImpl extends ComponentImpl {
+ /**
+ * Creates the REST implementation for components of type
+ * {@code ComponentType.FUNCTION}.
+ *
+ * @param workerServiceSupplier supplier of the worker service; handed unchanged to the
+ * {@code ComponentImpl} constructor together with the component type
+ */
+ public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
+ super(workerServiceSupplier,
Function.FunctionDetails.ComponentType.FUNCTION);
+ }
+
+    /**
+     * Registers a new function under {@code tenant/namespace/functionName}.
+     *
+     * <p>Processing order: request validation, authorization of {@code clientRole},
+     * tenant/namespace existence checks against the broker admin client, duplicate check,
+     * package resolution (package URL or uploaded stream), config validation, runtime
+     * admission checks, optional caching of the caller's auth data, package location
+     * resolution and finally submission of the metadata through {@code updateRequest}.
+     *
+     * @param tenant                        tenant the function belongs to; must not be null
+     * @param namespace                     namespace the function belongs to; must not be null
+     * @param functionName                  name of the function; must not be null
+     * @param uploadedInputStream           uploaded package content, or null when a package URL
+     *                                      or builtin code is used
+     * @param fileDetail                    form-data details of the upload; required together with
+     *                                      the package file unless the function code is builtin
+     * @param functionPkgUrl                package URL (http/https/file), or blank
+     * @param functionConfig                function configuration; must not be null
+     * @param clientRole                    role of the calling client, used for authorization
+     * @param clientAuthenticationDataHttps caller's authentication data, may be null
+     * @throws RestException with {@code BAD_REQUEST} for invalid input, {@code UNAUTHORIZED} when
+     *                       the role is not allowed, {@code INTERNAL_SERVER_ERROR} on admin,
+     *                       auth-caching or packaging failures
+     */
+    public void registerFunction(final String tenant,
+                                 final String namespace,
+                                 final String functionName,
+                                 final InputStream uploadedInputStream,
+                                 final FormDataContentDisposition fileDetail,
+                                 final String functionPkgUrl,
+                                 final FunctionConfig functionConfig,
+                                 final String clientRole,
+                                 AuthenticationDataHttps clientAuthenticationDataHttps) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (tenant == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+        }
+        if (namespace == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+        }
+        if (functionName == null) {
+            throw new RestException(Response.Status.BAD_REQUEST,
+                    ComponentTypeUtils.toString(componentType) + " Name is not provided");
+        }
+        // Fail with an explicit message instead of a NullPointerException surfacing later
+        // (previously reported as BAD_REQUEST with a null message).
+        if (functionConfig == null) {
+            throw new RestException(Response.Status.BAD_REQUEST,
+                    ComponentTypeUtils.toString(componentType) + " config is not provided");
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
+                        functionName, clientRole, ComponentTypeUtils.toString(componentType));
+                throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            // clientRole fills the [{}] placeholder; the trailing exception is logged as the cause.
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, clientRole, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        try {
+            // Check tenant exists
+            worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
+
+            String qualifiedNamespace = tenant + "/" + namespace;
+            List<String> namespaces = worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
+            if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
+                // Also accept the tenant/cluster/namespace form of the name.
+                String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant,
+                        worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
+                if (!namespaces.contains(qualifiedNamespaceWithCluster)) {
+                    log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, functionName, namespace);
+                    throw new RestException(Response.Status.BAD_REQUEST, "Namespace does not exist");
+                }
+            }
+        } catch (PulsarAdminException.NotAuthorizedException e) {
+            log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,
+                    functionName, clientRole, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+        } catch (PulsarAdminException.NotFoundException e) {
+            log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, functionName, tenant);
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant does not exist");
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Issues getting tenant data", tenant, namespace, functionName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+        if (functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
+            log.error("{} {}/{}/{} already exists", ComponentTypeUtils.toString(componentType), tenant, namespace,
+                    functionName);
+            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s already exists",
+                    ComponentTypeUtils.toString(componentType), functionName));
+        }
+
+        Function.FunctionDetails functionDetails = null;
+        boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
+        File componentPackageFile = null;
+        try {
+
+            // validate parameters
+            try {
+                if (isPkgUrlProvided) {
+
+                    if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
+                        throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
+                    }
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s",
+                                e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+                            functionConfig, componentPackageFile);
+                } else {
+                    if (uploadedInputStream != null) {
+                        componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+                            functionConfig, componentPackageFile);
+                    if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
+                        throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Invalid register {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant,
+                        namespace, functionName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+            }
+
+            try {
+                worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+            } catch (Exception e) {
+                log.error("{} {}/{}/{} cannot be admitted by the runtime factory",
+                        ComponentTypeUtils.toString(componentType), tenant, namespace, functionName);
+                throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s",
+                        ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
+            }
+
+            // function state
+            Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder()
+                    .setFunctionDetails(functionDetails)
+                    .setCreateTime(System.currentTimeMillis())
+                    .setVersion(0);
+
+            // cache auth if need
+            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+
+                if (clientAuthenticationDataHttps != null) {
+                    try {
+                        Optional<FunctionAuthData> functionAuthData = worker().getFunctionRuntimeManager()
+                                .getRuntimeFactory()
+                                .getAuthProvider()
+                                .cacheAuthData(tenant, namespace, functionName, clientAuthenticationDataHttps);
+
+                        if (functionAuthData.isPresent()) {
+                            functionMetaDataBuilder.setFunctionAuthSpec(
+                                    Function.FunctionAuthenticationSpec.newBuilder()
+                                            .setData(ByteString.copyFrom(functionAuthData.get().getData()))
+                                            .build());
+                        }
+                    } catch (Exception e) {
+                        log.error("Error caching authentication data for {} {}/{}/{}",
+                                ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+
+                        throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
+                                String.format("Error caching authentication data for %s %s:- %s",
+                                        ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
+                    }
+                }
+            }
+
+            Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+            try {
+                packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+                        functionPkgUrl, fileDetail, componentPackageFile);
+            } catch (Exception e) {
+                log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant,
+                        namespace, functionName, e);
+                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+            }
+
+            functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+            updateRequest(functionMetaDataBuilder.build());
+        } finally {
+
+            // Remove the local package copy unless the package URL is a file:// URL.
+            if (!(functionPkgUrl != null && functionPkgUrl.startsWith(Utils.FILE))
+                    && componentPackageFile != null && componentPackageFile.exists()) {
+                componentPackageFile.delete();
+            }
+        }
+    }
+
+    /**
+     * Updates an existing function identified by {@code tenant/namespace/functionName}.
+     *
+     * <p>The submitted config is merged with the stored one via
+     * {@code FunctionConfigUtils.validateUpdate}; an update that changes neither the config
+     * nor the package is rejected. The package used for validation is taken, in order of
+     * precedence, from {@code functionPkgUrl}, from the stored file/http package path, from
+     * the uploaded stream, from builtin code, or is downloaded from the stored package path.
+     *
+     * @param tenant                        tenant the function belongs to; must not be null
+     * @param namespace                     namespace the function belongs to; must not be null
+     * @param functionName                  name of the function; must not be null
+     * @param uploadedInputStream           new package content, or null
+     * @param fileDetail                    form-data details of the upload, or null
+     * @param functionPkgUrl                new package URL, or blank
+     * @param functionConfig                new function configuration; must not be null
+     * @param clientRole                    role of the calling client, used for authorization
+     * @param clientAuthenticationDataHttps caller's authentication data, may be null
+     * @param updateOptions                 update options; auth data is only refreshed when
+     *                                      {@code isUpdateAuthData()} is true
+     * @throws RestException with {@code BAD_REQUEST} for invalid input or a no-op update,
+     *                       {@code UNAUTHORIZED} when the role is not allowed,
+     *                       {@code NOT_FOUND} when the stored component is of another type,
+     *                       {@code INTERNAL_SERVER_ERROR} on auth or packaging failures
+     */
+    public void updateFunction(final String tenant,
+                               final String namespace,
+                               final String functionName,
+                               final InputStream uploadedInputStream,
+                               final FormDataContentDisposition fileDetail,
+                               final String functionPkgUrl,
+                               final FunctionConfig functionConfig,
+                               final String clientRole,
+                               AuthenticationDataHttps clientAuthenticationDataHttps,
+                               UpdateOptions updateOptions) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (tenant == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+        }
+        if (namespace == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+        }
+        if (functionName == null) {
+            throw new RestException(Response.Status.BAD_REQUEST,
+                    ComponentTypeUtils.toString(componentType) + " Name is not provided");
+        }
+        // The config is dereferenced below outside of any try block; without this guard a
+        // missing config would escape as a NullPointerException (HTTP 500) instead of a 400.
+        if (functionConfig == null) {
+            throw new RestException(Response.Status.BAD_REQUEST,
+                    ComponentTypeUtils.toString(componentType) + " config is not provided");
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
+                        functionName, clientRole, ComponentTypeUtils.toString(componentType));
+                throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+
+            }
+        } catch (PulsarAdminException e) {
+            // clientRole fills the [{}] placeholder; the trailing exception is logged as the cause.
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, clientRole, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
+            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't exist",
+                    ComponentTypeUtils.toString(componentType), functionName));
+        }
+
+        Function.FunctionMetaData existingComponent =
+                functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
+
+        if (!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, functionName,
+                    ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist",
+                    ComponentTypeUtils.toString(componentType), functionName));
+        }
+
+        FunctionConfig existingFunctionConfig =
+                FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
+        // The rest end points take precedence over whatever is there in function config
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(functionName);
+        FunctionConfig mergedConfig;
+        try {
+            mergedConfig = FunctionConfigUtils.validateUpdate(existingFunctionConfig, functionConfig);
+        } catch (Exception e) {
+            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+        }
+
+        if (existingFunctionConfig.equals(mergedConfig) && isBlank(functionPkgUrl) && uploadedInputStream == null) {
+            log.error("{}/{}/{} Update contains no changes", tenant, namespace, functionName);
+            throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
+        }
+
+        Function.FunctionDetails functionDetails = null;
+        File componentPackageFile = null;
+        try {
+
+            // validate parameters
+            try {
+                final String existingPackagePath = existingComponent.getPackageLocation().getPackagePath();
+
+                if (isNotBlank(functionPkgUrl)) {
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s",
+                                e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+                            mergedConfig, componentPackageFile);
+
+                } else if (existingPackagePath.startsWith(Utils.FILE)
+                        || existingPackagePath.startsWith(Utils.HTTP)) {
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(existingPackagePath);
+                    } catch (Exception e) {
+                        // Report the stored path that was actually fetched; functionPkgUrl is
+                        // blank in this branch.
+                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s",
+                                e.getMessage(), ComponentTypeUtils.toString(componentType), existingPackagePath));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+                            mergedConfig, componentPackageFile);
+                } else if (uploadedInputStream != null) {
+
+                    componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+                            mergedConfig, componentPackageFile);
+
+                } else if (existingPackagePath.startsWith(Utils.BUILTIN)) {
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+                            mergedConfig, componentPackageFile);
+                    if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
+                        throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
+                    }
+                } else {
+
+                    // No new package and no URL/builtin location: fetch the stored package
+                    // into a temp file so the merged config can be validated against it.
+                    componentPackageFile = FunctionCommon.createPkgTempFile();
+                    componentPackageFile.deleteOnExit();
+                    log.info("componentPackageFile: {}", componentPackageFile);
+                    WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile,
+                            existingPackagePath);
+
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+                            mergedConfig, componentPackageFile);
+                }
+            } catch (Exception e) {
+                log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant,
+                        namespace, functionName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+            }
+
+            try {
+                worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+            } catch (Exception e) {
+                log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory",
+                        ComponentTypeUtils.toString(componentType), tenant, namespace, functionName);
+                throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s",
+                        ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
+            }
+
+            // merge from existing metadata
+            Function.FunctionMetaData.Builder functionMetaDataBuilder =
+                    Function.FunctionMetaData.newBuilder().mergeFrom(existingComponent)
+                            .setFunctionDetails(functionDetails);
+
+            // update auth data if need
+            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+                if (clientAuthenticationDataHttps != null && updateOptions != null
+                        && updateOptions.isUpdateAuthData()) {
+                    // get existing auth data if it exists
+                    Optional<FunctionAuthData> existingFunctionAuthData = Optional.empty();
+                    if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
+                        existingFunctionAuthData = Optional.ofNullable(getFunctionAuthData(
+                                Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
+                    }
+
+                    try {
+                        Optional<FunctionAuthData> newFunctionAuthData = worker().getFunctionRuntimeManager()
+                                .getRuntimeFactory()
+                                .getAuthProvider()
+                                .updateAuthData(
+                                        tenant, namespace,
+                                        functionName, existingFunctionAuthData,
+                                        clientAuthenticationDataHttps);
+
+                        if (newFunctionAuthData.isPresent()) {
+                            functionMetaDataBuilder.setFunctionAuthSpec(
+                                    Function.FunctionAuthenticationSpec.newBuilder()
+                                            .setData(ByteString.copyFrom(newFunctionAuthData.get().getData()))
+                                            .build());
+                        } else {
+                            functionMetaDataBuilder.clearFunctionAuthSpec();
+                        }
+                    } catch (Exception e) {
+                        log.error("Error updating authentication data for {} {}/{}/{}",
+                                ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+                        throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
+                                String.format("Error caching authentication data for %s %s:- %s",
+                                        ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
+                    }
+                }
+            }
+
+            Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+            if (isNotBlank(functionPkgUrl) || uploadedInputStream != null) {
+                try {
+                    packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+                            functionPkgUrl, fileDetail, componentPackageFile);
+                } catch (Exception e) {
+                    log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType),
+                            tenant, namespace, functionName, e);
+                    throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+                }
+            } else {
+                // Package unchanged: carry over the stored location.
+                packageLocationMetaDataBuilder = Function.PackageLocationMetaData.newBuilder()
+                        .mergeFrom(existingComponent.getPackageLocation());
+            }
+
+            functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+
+            updateRequest(functionMetaDataBuilder.build());
+        } finally {
+            // Remove the local package copy unless the package URL is a file:// URL.
+            if (!(functionPkgUrl != null && functionPkgUrl.startsWith(Utils.FILE))
+                    && componentPackageFile != null && componentPackageFile.exists()) {
+                componentPackageFile.delete();
+            }
+        }
+    }
+
private class GetFunctionStatus extends GetStatus<FunctionStatus,
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> {
@Override
@@ -195,10 +569,6 @@ public class FunctionsImpl extends ComponentImpl {
return exceptionInformation;
}
- public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier,
Function.FunctionDetails.ComponentType.FUNCTION);
- }
-
/**
* Get status of a function instance. If this worker is not running the
function instance,
* @param tenant the tenant the function belongs to
@@ -262,4 +632,20 @@ public class FunctionsImpl extends ComponentImpl {
return functionStatus;
}
+
+    /**
+     * Validates {@code functionConfig} against the given package and converts it into
+     * {@code FunctionDetails}.
+     *
+     * <p>The passed config object is modified in place: its tenant, namespace and name are
+     * overwritten with the method arguments before validation.
+     *
+     * @param tenant               tenant to set on the config
+     * @param namespace            namespace to set on the config
+     * @param componentName        name to set on the config
+     * @param functionConfig       config to validate and convert
+     * @param componentPackageFile package file handed to {@code FunctionConfigUtils.validate};
+     *                             callers in this class may pass null
+     * @return the details produced by {@code FunctionConfigUtils.convert}
+     * @throws IOException declared by this method's signature
+     */
+    private Function.FunctionDetails validateUpdateRequestParams(
+            final String tenant,
+            final String namespace,
+            final String componentName,
+            final FunctionConfig functionConfig,
+            final File componentPackageFile) throws IOException {
+
+        // Identity from the REST path wins over whatever the submitted config carries.
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(componentName);
+
+        // Order matters: infer missing arguments first, then validate, then convert using
+        // the class loader returned by validation.
+        FunctionConfigUtils.inferMissingArguments(functionConfig);
+        final ClassLoader functionClassLoader =
+                FunctionConfigUtils.validate(functionConfig, componentPackageFile);
+        return FunctionConfigUtils.convert(functionConfig, functionClassLoader);
+    }
}
\ No newline at end of file
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
index b1da329..d4e6414 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -18,14 +18,12 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.FunctionStatus;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.FunctionCommon;
@@ -103,14 +101,9 @@ public class FunctionsImplV2 {
throw new RestException(Response.Status.BAD_REQUEST,
e.getMessage());
}
FunctionConfig functionConfig =
FunctionConfigUtils.convertFromDetails(functionDetailsBuilder.build());
- String functionConfigJson = null;
- try {
- functionConfigJson =
ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig);
- } catch (JsonProcessingException e) {
- throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
e.getMessage());
- }
+
delegate.registerFunction(tenant, namespace, functionName,
uploadedInputStream, fileDetail,
- functionPkgUrl, functionConfigJson, clientRole, null);
+ functionPkgUrl, functionConfig, clientRole, null);
return Response.ok().build();
}
@@ -125,15 +118,9 @@ public class FunctionsImplV2 {
throw new RestException(Response.Status.BAD_REQUEST,
e.getMessage());
}
FunctionConfig functionConfig =
FunctionConfigUtils.convertFromDetails(functionDetailsBuilder.build());
- String functionConfigJson = null;
- try {
- functionConfigJson =
ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig);
- } catch (JsonProcessingException e) {
- throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
e.getMessage());
- }
delegate.updateFunction(tenant, namespace, functionName,
uploadedInputStream, fileDetail,
- functionPkgUrl, functionConfigJson, clientRole, null, null);
+ functionPkgUrl, functionConfig, clientRole, null, null);
return Response.ok().build();
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index fa07160..504722f 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -18,34 +18,406 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
+import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SinkStatus;
+import org.apache.pulsar.functions.auth.FunctionAuthData;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.rest.RestException;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
+import java.nio.file.Path;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
import java.util.function.Supplier;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static
org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static
org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
import static
org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
@Slf4j
public class SinksImpl extends ComponentImpl {
+    /**
+     * Creates the sink REST implementation.
+     *
+     * @param workerServiceSupplier supplier of the worker service, handed to the
+     *                              {@code ComponentImpl} base class together with the
+     *                              {@code SINK} component type
+     */
+    public SinksImpl(Supplier<WorkerService> workerServiceSupplier) {
+        super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SINK);
+    }
+
+    /**
+     * Registers a new sink under {@code tenant/namespace/sinkName}.
+     *
+     * <p>Steps, in order: check worker availability; reject missing tenant / namespace / name;
+     * authorize the caller; verify through the broker admin client that the tenant and namespace
+     * exist; reject a name that is already registered; resolve the package (from
+     * {@code sinkPkgUrl} or from the uploaded stream) and validate the config; run the runtime
+     * factory admission checks; build the metadata (version 0, create time = now); cache auth
+     * data when authentication is enabled; resolve the package location; submit the request.
+     *
+     * @param tenant                        tenant of the sink; must not be null
+     * @param namespace                     namespace of the sink; must not be null
+     * @param sinkName                      name of the sink; must not be null
+     * @param uploadedInputStream           uploaded package contents, or null when none was uploaded
+     * @param fileDetail                    multipart details of the uploaded package, may be null
+     * @param sinkPkgUrl                    package URL; when not blank it is used instead of the upload
+     * @param sinkConfig                    sink configuration to validate and convert
+     * @param clientRole                    role of the caller, used for authorization
+     * @param clientAuthenticationDataHttps authentication data of the caller, may be null
+     * @throws RestException BAD_REQUEST for invalid input or failed admission, UNAUTHORIZED when
+     *                       the caller is not authorized, INTERNAL_SERVER_ERROR for admin-client,
+     *                       auth-caching or package-location failures
+     */
+    public void registerSink(final String tenant,
+                             final String namespace,
+                             final String sinkName,
+                             final InputStream uploadedInputStream,
+                             final FormDataContentDisposition fileDetail,
+                             final String sinkPkgUrl,
+                             final SinkConfig sinkConfig,
+                             final String clientRole,
+                             AuthenticationDataHttps clientAuthenticationDataHttps) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (tenant == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+        }
+        if (namespace == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+        }
+        if (sinkName == null) {
+            throw new RestException(Response.Status.BAD_REQUEST,
+                    ComponentTypeUtils.toString(componentType) + " Name is not provided");
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
+                        sinkName, clientRole, ComponentTypeUtils.toString(componentType));
+                throw new RestException(Response.Status.UNAUTHORIZED,
+                        "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            // clientRole fills the "[{}]" placeholder; the trailing exception is logged as the cause.
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, sinkName, clientRole, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        try {
+            // Check tenant exists
+            worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
+
+            String qualifiedNamespace = tenant + "/" + namespace;
+            List<String> namespaces = worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
+            if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
+                // Fall back to the tenant/cluster/namespace form before rejecting the request.
+                String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant,
+                        worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
+                if (!namespaces.contains(qualifiedNamespaceWithCluster)) {
+                    log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, sinkName, namespace);
+                    throw new RestException(Response.Status.BAD_REQUEST, "Namespace does not exist");
+                }
+            }
+        } catch (PulsarAdminException.NotAuthorizedException e) {
+            log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,
+                    sinkName, clientRole, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+        } catch (PulsarAdminException.NotFoundException e) {
+            log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, sinkName, tenant);
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant does not exist");
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Issues getting tenant data", tenant, namespace, sinkName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+        if (functionMetaDataManager.containsFunction(tenant, namespace, sinkName)) {
+            log.error("{} {}/{}/{} already exists",
+                    ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName);
+            throw new RestException(Response.Status.BAD_REQUEST,
+                    String.format("%s %s already exists", ComponentTypeUtils.toString(componentType), sinkName));
+        }
+
+        Function.FunctionDetails functionDetails = null;
+        boolean isPkgUrlProvided = isNotBlank(sinkPkgUrl);
+        File componentPackageFile = null;
+        try {
+
+            // validate parameters
+            try {
+                if (isPkgUrlProvided) {
+
+                    if (!Utils.isFunctionPackageUrlSupported(sinkPkgUrl)) {
+                        throw new IllegalArgumentException(
+                                "Function Package url is not valid. supported url (http/https/file)");
+                    }
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(sinkPkgUrl);
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException(
+                                String.format("Encountered error \"%s\" when getting %s package from %s",
+                                        e.getMessage(), ComponentTypeUtils.toString(componentType), sinkPkgUrl), e);
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+                            sinkConfig, componentPackageFile);
+                } else {
+                    if (uploadedInputStream != null) {
+                        componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+                            sinkConfig, componentPackageFile);
+                    // A non-builtin sink must come with both the uploaded file and its details.
+                    if (!isFunctionCodeBuiltin(functionDetails)
+                            && (componentPackageFile == null || fileDetail == null)) {
+                        throw new IllegalArgumentException(
+                                ComponentTypeUtils.toString(componentType) + " Package is not provided");
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Invalid register {} request @ /{}/{}/{}",
+                        ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+            }
+
+            try {
+                worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+            } catch (Exception e) {
+                log.error("{} {}/{}/{} cannot be admitted by the runtime factory",
+                        ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+                throw new RestException(Response.Status.BAD_REQUEST,
+                        String.format("%s %s cannot be admitted:- %s",
+                                ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
+            }
+
+            // function state
+            Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder()
+                    .setFunctionDetails(functionDetails)
+                    .setCreateTime(System.currentTimeMillis())
+                    .setVersion(0);
+
+            // cache auth if need
+            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+
+                if (clientAuthenticationDataHttps != null) {
+                    try {
+                        Optional<FunctionAuthData> functionAuthData = worker().getFunctionRuntimeManager()
+                                .getRuntimeFactory()
+                                .getAuthProvider()
+                                .cacheAuthData(tenant, namespace, sinkName, clientAuthenticationDataHttps);
+
+                        if (functionAuthData.isPresent()) {
+                            functionMetaDataBuilder.setFunctionAuthSpec(
+                                    Function.FunctionAuthenticationSpec.newBuilder()
+                                            .setData(ByteString.copyFrom(functionAuthData.get().getData()))
+                                            .build());
+                        }
+                    } catch (Exception e) {
+                        log.error("Error caching authentication data for {} {}/{}/{}",
+                                ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+
+                        throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
+                                String.format("Error caching authentication data for %s %s:- %s",
+                                        ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
+                    }
+                }
+            }
+
+            Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+            try {
+                packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+                        sinkPkgUrl, fileDetail, componentPackageFile);
+            } catch (Exception e) {
+                log.error("Failed process {} {}/{}/{} package: ",
+                        ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+            }
+
+            functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+            updateRequest(functionMetaDataBuilder.build());
+        } finally {
+
+            // Remove the working copy of the package, except when it was addressed through a
+            // file URL (presumably the caller's own file rather than a temporary copy - confirm
+            // against FunctionCommon.extractFileFromPkgURL).
+            if (!(sinkPkgUrl != null && sinkPkgUrl.startsWith(Utils.FILE))
+                    && componentPackageFile != null && componentPackageFile.exists()) {
+                componentPackageFile.delete();
+            }
+        }
+    }
+
+    /**
+     * Updates an existing sink registered under {@code tenant/namespace/sinkName}.
+     *
+     * <p>Steps, in order: check worker availability; reject missing tenant / namespace / name;
+     * authorize the caller; require that the component exists and is a sink; merge the supplied
+     * config into the existing one and reject an update with no changes; resolve the package
+     * (new URL, existing file/http URL, uploaded stream, builtin, or download of the stored
+     * package) and validate the merged config; run the runtime factory admission checks;
+     * optionally update auth data; resolve the package location; submit the request.
+     *
+     * @param tenant                        tenant of the sink; must not be null
+     * @param namespace                     namespace of the sink; must not be null
+     * @param sinkName                      name of the sink; must not be null
+     * @param uploadedInputStream           uploaded package contents, or null when none was uploaded
+     * @param fileDetail                    multipart details of the uploaded package, may be null
+     * @param sinkPkgUrl                    new package URL, may be blank
+     * @param sinkConfig                    requested configuration; tenant, namespace and name are
+     *                                      overwritten with the path values
+     * @param clientRole                    role of the caller, used for authorization
+     * @param clientAuthenticationDataHttps authentication data of the caller, may be null
+     * @param updateOptions                 update options; auth data is only updated when
+     *                                      {@code isUpdateAuthData()} is true
+     * @throws RestException BAD_REQUEST for invalid input, a missing component, a no-op update or
+     *                       failed admission; NOT_FOUND when the component is not a sink;
+     *                       UNAUTHORIZED when the caller is not authorized; INTERNAL_SERVER_ERROR
+     *                       for authorization, auth-data or package-location failures
+     */
+    public void updateSink(final String tenant,
+                           final String namespace,
+                           final String sinkName,
+                           final InputStream uploadedInputStream,
+                           final FormDataContentDisposition fileDetail,
+                           final String sinkPkgUrl,
+                           final SinkConfig sinkConfig,
+                           final String clientRole,
+                           AuthenticationDataHttps clientAuthenticationDataHttps,
+                           UpdateOptions updateOptions) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (tenant == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+        }
+        if (namespace == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+        }
+        if (sinkName == null) {
+            throw new RestException(Response.Status.BAD_REQUEST,
+                    ComponentTypeUtils.toString(componentType) + " Name is not provided");
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
+                        sinkName, clientRole, ComponentTypeUtils.toString(componentType));
+                throw new RestException(Response.Status.UNAUTHORIZED,
+                        "client is not authorize to perform operation");
+
+            }
+        } catch (PulsarAdminException e) {
+            // clientRole fills the "[{}]" placeholder; the trailing exception is logged as the cause.
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, sinkName, clientRole, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, sinkName)) {
+            throw new RestException(Response.Status.BAD_REQUEST,
+                    String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), sinkName));
+        }
+
+        Function.FunctionMetaData existingComponent =
+                functionMetaDataManager.getFunctionMetaData(tenant, namespace, sinkName);
+
+        if (!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, sinkName,
+                    ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.NOT_FOUND,
+                    String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), sinkName));
+        }
+
+        final String existingPkgPath = existingComponent.getPackageLocation().getPackagePath();
+
+        SinkConfig existingSinkConfig = SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
+        // The rest end points take precedence over whatever is there in functionconfig
+        sinkConfig.setTenant(tenant);
+        sinkConfig.setNamespace(namespace);
+        sinkConfig.setName(sinkName);
+
+        SinkConfig mergedConfig;
+        try {
+            mergedConfig = SinkConfigUtils.validateUpdate(existingSinkConfig, sinkConfig);
+        } catch (Exception e) {
+            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+        }
+
+        if (existingSinkConfig.equals(mergedConfig) && isBlank(sinkPkgUrl) && uploadedInputStream == null) {
+            log.error("{}/{}/{} Update contains no changes", tenant, namespace, sinkName);
+            throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
+        }
+
+        Function.FunctionDetails functionDetails = null;
+        File componentPackageFile = null;
+        // URL that componentPackageFile was resolved from, when it came from a URL at all.
+        // Used by the cleanup below to avoid deleting a file that was addressed via a file URL.
+        String resolvedPkgUrl = null;
+        try {
+
+            // validate parameters
+            try {
+                if (isNotBlank(sinkPkgUrl)) {
+                    resolvedPkgUrl = sinkPkgUrl;
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(sinkPkgUrl);
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException(
+                                String.format("Encountered error \"%s\" when getting %s package from %s",
+                                        e.getMessage(), ComponentTypeUtils.toString(componentType), sinkPkgUrl), e);
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+                            mergedConfig, componentPackageFile);
+
+                } else if (existingPkgPath.startsWith(Utils.FILE) || existingPkgPath.startsWith(Utils.HTTP)) {
+                    // NOTE(review): this branch is evaluated before the uploaded-stream branch, so an
+                    // upload is not used for validation when the stored package is a file/http URL -
+                    // confirm this ordering is intended.
+                    resolvedPkgUrl = existingPkgPath;
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(existingPkgPath);
+                    } catch (Exception e) {
+                        // Report the URL that was actually fetched; sinkPkgUrl is blank in this branch.
+                        throw new IllegalArgumentException(
+                                String.format("Encountered error \"%s\" when getting %s package from %s",
+                                        e.getMessage(), ComponentTypeUtils.toString(componentType),
+                                        existingPkgPath), e);
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+                            mergedConfig, componentPackageFile);
+                } else if (uploadedInputStream != null) {
+
+                    componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+                            mergedConfig, componentPackageFile);
+
+                } else if (existingPkgPath.startsWith(Utils.BUILTIN)) {
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+                            mergedConfig, componentPackageFile);
+                    if (!isFunctionCodeBuiltin(functionDetails)
+                            && (componentPackageFile == null || fileDetail == null)) {
+                        throw new IllegalArgumentException(
+                                ComponentTypeUtils.toString(componentType) + " Package is not provided");
+                    }
+                } else {
+
+                    // Stored package: download it into a temp file so the merged config can be validated.
+                    componentPackageFile = FunctionCommon.createPkgTempFile();
+                    componentPackageFile.deleteOnExit();
+                    log.info("componentPackageFile: {}", componentPackageFile);
+                    WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(),
+                            componentPackageFile, existingPkgPath);
+
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+                            mergedConfig, componentPackageFile);
+                }
+            } catch (Exception e) {
+                log.error("Invalid update {} request @ /{}/{}/{}",
+                        ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+            }
+
+            try {
+                worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+            } catch (Exception e) {
+                log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory",
+                        ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+                throw new RestException(Response.Status.BAD_REQUEST,
+                        String.format("%s %s cannot be admitted:- %s",
+                                ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
+            }
+
+            // merge from existing metadata
+            Function.FunctionMetaData.Builder functionMetaDataBuilder =
+                    Function.FunctionMetaData.newBuilder().mergeFrom(existingComponent)
+                            .setFunctionDetails(functionDetails);
+
+            // update auth data if need
+            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+                if (clientAuthenticationDataHttps != null && updateOptions != null
+                        && updateOptions.isUpdateAuthData()) {
+                    // get existing auth data if it exists
+                    Optional<FunctionAuthData> existingFunctionAuthData = Optional.empty();
+                    if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
+                        existingFunctionAuthData = Optional.ofNullable(getFunctionAuthData(
+                                Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
+                    }
+
+                    try {
+                        Optional<FunctionAuthData> newFunctionAuthData = worker().getFunctionRuntimeManager()
+                                .getRuntimeFactory()
+                                .getAuthProvider()
+                                .updateAuthData(
+                                        tenant, namespace,
+                                        sinkName, existingFunctionAuthData,
+                                        clientAuthenticationDataHttps);
+
+                        if (newFunctionAuthData.isPresent()) {
+                            functionMetaDataBuilder.setFunctionAuthSpec(
+                                    Function.FunctionAuthenticationSpec.newBuilder()
+                                            .setData(ByteString.copyFrom(newFunctionAuthData.get().getData()))
+                                            .build());
+                        } else {
+                            functionMetaDataBuilder.clearFunctionAuthSpec();
+                        }
+                    } catch (Exception e) {
+                        log.error("Error updating authentication data for {} {}/{}/{}",
+                                ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+                        throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
+                                String.format("Error caching authentication data for %s %s:- %s",
+                                        ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
+                    }
+                }
+            }
+
+            Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+            if (isNotBlank(sinkPkgUrl) || uploadedInputStream != null) {
+                try {
+                    packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+                            sinkPkgUrl, fileDetail, componentPackageFile);
+                } catch (Exception e) {
+                    log.error("Failed process {} {}/{}/{} package: ",
+                            ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+                    throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+                }
+            } else {
+                // No new package supplied: keep the existing package location.
+                packageLocationMetaDataBuilder =
+                        Function.PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
+            }
+
+            functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+
+            updateRequest(functionMetaDataBuilder.build());
+        } finally {
+            // Remove the working copy of the package, except when it was resolved from a file URL -
+            // either the new sinkPkgUrl or the existing component's package path. Such a file is
+            // presumably the original on local disk rather than a temporary copy (confirm against
+            // FunctionCommon.extractFileFromPkgURL), so it must not be deleted.
+            boolean resolvedFromFileUrl = resolvedPkgUrl != null && resolvedPkgUrl.startsWith(Utils.FILE);
+            if (!resolvedFromFileUrl
+                    && componentPackageFile != null && componentPackageFile.exists()) {
+                componentPackageFile.delete();
+            }
+        }
+    }
+
private class GetSinkStatus extends GetStatus<SinkStatus,
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData> {
@Override
@@ -206,10 +578,6 @@ public class SinksImpl extends ComponentImpl {
return exceptionInformation;
}
- public SinksImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier,
Function.FunctionDetails.ComponentType.SINK);
- }
-
public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData
getSinkInstanceStatus(final String tenant,
final String namespace,
final String sinkName,
@@ -287,4 +655,31 @@ public class SinksImpl extends ComponentImpl {
SinkConfig config =
SinkConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
return config;
}
+
+    /**
+     * Validates a sink configuration and converts it into function details.
+     *
+     * <p>The tenant, namespace and name from the REST path overwrite whatever the config carries.
+     * When the config names an archive, a leading {@code builtin://} prefix is stripped and the
+     * archive path is looked up through the connectors manager.
+     *
+     * @param tenant               tenant taken from the REST path
+     * @param namespace            namespace taken from the REST path
+     * @param sinkName             sink name taken from the REST path
+     * @param sinkConfig           configuration to validate; it is mutated in place
+     * @param componentPackageFile package file to validate against, may be null
+     * @return the function details converted from the validated config
+     * @throws IOException              declared by the signature; source not visible here
+     * @throws IllegalArgumentException when the archive lookup fails
+     */
+    private Function.FunctionDetails validateUpdateRequestParams(final String tenant,
+                                                                 final String namespace,
+                                                                 final String sinkName,
+                                                                 final SinkConfig sinkConfig,
+                                                                 final File componentPackageFile) throws IOException {
+
+        Path archivePath = null;
+        // The rest end points take precedence over whatever is there in sinkConfig
+        sinkConfig.setTenant(tenant);
+        sinkConfig.setNamespace(namespace);
+        sinkConfig.setName(sinkName);
+        org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig);
+        if (!StringUtils.isEmpty(sinkConfig.getArchive())) {
+            String builtinArchive = sinkConfig.getArchive();
+            if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
+                builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
+            }
+            try {
+                archivePath = this.worker().getConnectorsManager().getSinkArchive(builtinArchive);
+            } catch (Exception e) {
+                // Report the archive name that was looked up; archivePath is still null here.
+                throw new IllegalArgumentException(String.format("No Sink archive %s found", builtinArchive), e);
+            }
+        }
+        SinkConfigUtils.ExtractedSinkDetails sinkDetails =
+                SinkConfigUtils.validate(sinkConfig, archivePath, componentPackageFile);
+        return SinkConfigUtils.convert(sinkConfig, sinkDetails);
+    }
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index d35724e..15a26aa 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -18,33 +18,403 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
+import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.functions.auth.FunctionAuthData;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.rest.RestException;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
+import java.nio.file.Path;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
import java.util.function.Supplier;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static
org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static
org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
import static
org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
@Slf4j
public class SourcesImpl extends ComponentImpl {
+
+    /**
+     * Creates the source REST implementation.
+     *
+     * @param workerServiceSupplier supplier of the worker service, handed to the
+     *                              {@code ComponentImpl} base class together with the
+     *                              {@code SOURCE} component type
+     */
+    public SourcesImpl(Supplier<WorkerService> workerServiceSupplier) {
+        super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SOURCE);
+    }
+
+    /**
+     * Registers a new source under {@code tenant/namespace/sourceName}.
+     *
+     * <p>Steps, in order: check worker availability; reject missing tenant / namespace / name;
+     * authorize the caller; verify through the broker admin client that the tenant and namespace
+     * exist; reject a name that is already registered; resolve the package (from
+     * {@code sourcePkgUrl} or from the uploaded stream) and validate the config; run the runtime
+     * factory admission checks; build the metadata (version 0, create time = now); cache auth
+     * data when authentication is enabled; resolve the package location; submit the request.
+     *
+     * @param tenant                        tenant of the source; must not be null
+     * @param namespace                     namespace of the source; must not be null
+     * @param sourceName                    name of the source; must not be null
+     * @param uploadedInputStream           uploaded package contents, or null when none was uploaded
+     * @param fileDetail                    multipart details of the uploaded package, may be null
+     * @param sourcePkgUrl                  package URL; when not blank it is used instead of the upload
+     * @param sourceConfig                  source configuration to validate and convert
+     * @param clientRole                    role of the caller, used for authorization
+     * @param clientAuthenticationDataHttps authentication data of the caller, may be null
+     * @throws RestException BAD_REQUEST for invalid input or failed admission, UNAUTHORIZED when
+     *                       the caller is not authorized, INTERNAL_SERVER_ERROR for admin-client,
+     *                       auth-caching or package-location failures
+     */
+    public void registerSource(final String tenant,
+                               final String namespace,
+                               final String sourceName,
+                               final InputStream uploadedInputStream,
+                               final FormDataContentDisposition fileDetail,
+                               final String sourcePkgUrl,
+                               final SourceConfig sourceConfig,
+                               final String clientRole,
+                               AuthenticationDataHttps clientAuthenticationDataHttps) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (tenant == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+        }
+        if (namespace == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+        }
+        if (sourceName == null) {
+            throw new RestException(Response.Status.BAD_REQUEST,
+                    ComponentTypeUtils.toString(componentType) + " Name is not provided");
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
+                        sourceName, clientRole, ComponentTypeUtils.toString(componentType));
+                throw new RestException(Response.Status.UNAUTHORIZED,
+                        "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            // clientRole fills the "[{}]" placeholder; the trailing exception is logged as the cause.
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, sourceName, clientRole, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        try {
+            // Check tenant exists
+            worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
+
+            String qualifiedNamespace = tenant + "/" + namespace;
+            List<String> namespaces = worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
+            if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
+                // Fall back to the tenant/cluster/namespace form before rejecting the request.
+                String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant,
+                        worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
+                if (!namespaces.contains(qualifiedNamespaceWithCluster)) {
+                    log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, sourceName, namespace);
+                    throw new RestException(Response.Status.BAD_REQUEST, "Namespace does not exist");
+                }
+            }
+        } catch (PulsarAdminException.NotAuthorizedException e) {
+            log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,
+                    sourceName, clientRole, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+        } catch (PulsarAdminException.NotFoundException e) {
+            log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, sourceName, tenant);
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant does not exist");
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Issues getting tenant data", tenant, namespace, sourceName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+        if (functionMetaDataManager.containsFunction(tenant, namespace, sourceName)) {
+            log.error("{} {}/{}/{} already exists",
+                    ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName);
+            throw new RestException(Response.Status.BAD_REQUEST,
+                    String.format("%s %s already exists", ComponentTypeUtils.toString(componentType), sourceName));
+        }
+
+        Function.FunctionDetails functionDetails = null;
+        boolean isPkgUrlProvided = isNotBlank(sourcePkgUrl);
+        File componentPackageFile = null;
+        try {
+
+            // validate parameters
+            try {
+                if (isPkgUrlProvided) {
+
+                    if (!Utils.isFunctionPackageUrlSupported(sourcePkgUrl)) {
+                        throw new IllegalArgumentException(
+                                "Function Package url is not valid. supported url (http/https/file)");
+                    }
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(sourcePkgUrl);
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException(
+                                String.format("Encountered error \"%s\" when getting %s package from %s",
+                                        e.getMessage(), ComponentTypeUtils.toString(componentType), sourcePkgUrl), e);
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+                            sourceConfig, componentPackageFile);
+                } else {
+                    if (uploadedInputStream != null) {
+                        componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+                            sourceConfig, componentPackageFile);
+                    // A non-builtin source must come with both the uploaded file and its details.
+                    if (!isFunctionCodeBuiltin(functionDetails)
+                            && (componentPackageFile == null || fileDetail == null)) {
+                        throw new IllegalArgumentException(
+                                ComponentTypeUtils.toString(componentType) + " Package is not provided");
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Invalid register {} request @ /{}/{}/{}",
+                        ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+            }
+
+            try {
+                worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+            } catch (Exception e) {
+                log.error("{} {}/{}/{} cannot be admitted by the runtime factory",
+                        ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+                throw new RestException(Response.Status.BAD_REQUEST,
+                        String.format("%s %s cannot be admitted:- %s",
+                                ComponentTypeUtils.toString(componentType), sourceName, e.getMessage()));
+            }
+
+            // function state
+            Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder()
+                    .setFunctionDetails(functionDetails)
+                    .setCreateTime(System.currentTimeMillis())
+                    .setVersion(0);
+
+            // cache auth if need
+            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+
+                if (clientAuthenticationDataHttps != null) {
+                    try {
+                        Optional<FunctionAuthData> functionAuthData = worker().getFunctionRuntimeManager()
+                                .getRuntimeFactory()
+                                .getAuthProvider()
+                                .cacheAuthData(tenant, namespace, sourceName, clientAuthenticationDataHttps);
+
+                        if (functionAuthData.isPresent()) {
+                            functionMetaDataBuilder.setFunctionAuthSpec(
+                                    Function.FunctionAuthenticationSpec.newBuilder()
+                                            .setData(ByteString.copyFrom(functionAuthData.get().getData()))
+                                            .build());
+                        }
+                    } catch (Exception e) {
+                        log.error("Error caching authentication data for {} {}/{}/{}",
+                                ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+
+                        throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
+                                String.format("Error caching authentication data for %s %s:- %s",
+                                        ComponentTypeUtils.toString(componentType), sourceName, e.getMessage()));
+                    }
+                }
+            }
+
+            Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+            try {
+                packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+                        sourcePkgUrl, fileDetail, componentPackageFile);
+            } catch (Exception e) {
+                log.error("Failed process {} {}/{}/{} package: ",
+                        ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+            }
+
+            functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+            updateRequest(functionMetaDataBuilder.build());
+        } finally {
+
+            // Remove the working copy of the package, except when it was addressed through a
+            // file URL (presumably the caller's own file rather than a temporary copy - confirm
+            // against FunctionCommon.extractFileFromPkgURL).
+            if (!(sourcePkgUrl != null && sourcePkgUrl.startsWith(Utils.FILE))
+                    && componentPackageFile != null && componentPackageFile.exists()) {
+                componentPackageFile.delete();
+            }
+        }
+    }
+
+ public void updateSource(final String tenant,
+ final String namespace,
+ final String sourceName,
+ final InputStream uploadedInputStream,
+ final FormDataContentDisposition fileDetail,
+ final String sourcePkgUrl,
+ final SourceConfig sourceConfig,
+ final String clientRole,
+ AuthenticationDataHttps
clientAuthenticationDataHttps,
+ UpdateOptions updateOptions) {
+
+ if (!isWorkerServiceAvailable()) {
+ throwUnavailableException();
+ }
+
+ if (tenant == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Tenant is
not provided");
+ }
+ if (namespace == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Namespace is
not provided");
+ }
+ if (sourceName == null) {
+ throw new RestException(Response.Status.BAD_REQUEST,
ComponentTypeUtils.toString(componentType) + " Name is not provided");
+ }
+
+ try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to
update {}", tenant, namespace,
+ sourceName, clientRole,
ComponentTypeUtils.toString(componentType));
+ throw new RestException(Response.Status.UNAUTHORIZED, "client
is not authorize to perform operation");
+
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace,
sourceName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
e.getMessage());
+ }
+
+ FunctionMetaDataManager functionMetaDataManager =
worker().getFunctionMetaDataManager();
+
+ if (!functionMetaDataManager.containsFunction(tenant, namespace,
sourceName)) {
+ throw new RestException(Response.Status.BAD_REQUEST,
String.format("%s %s doesn't exist",
ComponentTypeUtils.toString(componentType), sourceName));
+ }
+
+ Function.FunctionMetaData existingComponent =
functionMetaDataManager.getFunctionMetaData(tenant, namespace, sourceName);
+
+ if
(!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType))
{
+ log.error("{}/{}/{} is not a {}", tenant, namespace, sourceName,
ComponentTypeUtils.toString(componentType));
+ throw new RestException(Response.Status.NOT_FOUND,
String.format("%s %s doesn't exist",
ComponentTypeUtils.toString(componentType), sourceName));
+ }
+
+ SourceConfig existingSourceConfig =
SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
+ // The rest end points take precedence over whatever is there in
functionconfig
+ sourceConfig.setTenant(tenant);
+ sourceConfig.setNamespace(namespace);
+ sourceConfig.setName(sourceName);
+ SourceConfig mergedConfig;
+ try {
+ mergedConfig =
SourceConfigUtils.validateUpdate(existingSourceConfig, sourceConfig);
+ } catch (Exception e) {
+ throw new RestException(Response.Status.BAD_REQUEST,
e.getMessage());
+ }
+
+ if (existingSourceConfig.equals(mergedConfig) && isBlank(sourcePkgUrl)
&& uploadedInputStream == null) {
+ log.error("{}/{}/{} Update contains no changes", tenant,
namespace, sourceName);
+ throw new RestException(Response.Status.BAD_REQUEST, "Update
contains no change");
+ }
+
+ Function.FunctionDetails functionDetails = null;
+ File componentPackageFile = null;
+ try {
+
+ // validate parameters
+ try {
+ if (isNotBlank(sourcePkgUrl)) {
+ try {
+ componentPackageFile =
FunctionCommon.extractFileFromPkgURL(sourcePkgUrl);
+ } catch (Exception e) {
+ throw new
IllegalArgumentException(String.format("Encountered error \"%s\" when getting
%s package from %s", e.getMessage(),
ComponentTypeUtils.toString(componentType), sourcePkgUrl));
+ }
+ functionDetails = validateUpdateRequestParams(tenant,
namespace, sourceName,
+ mergedConfig, componentPackageFile);
+
+ } else if
(existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
+ ||
existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP))
{
+ try {
+ componentPackageFile =
FunctionCommon.extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
+ } catch (Exception e) {
+ throw new
IllegalArgumentException(String.format("Encountered error \"%s\" when getting
%s package from %s", e.getMessage(),
ComponentTypeUtils.toString(componentType), sourcePkgUrl));
+ }
+ functionDetails = validateUpdateRequestParams(tenant,
namespace, sourceName,
+ mergedConfig, componentPackageFile);
+ } else if (uploadedInputStream != null) {
+
+ componentPackageFile =
WorkerUtils.dumpToTmpFile(uploadedInputStream);
+ functionDetails = validateUpdateRequestParams(tenant,
namespace, sourceName,
+ mergedConfig, componentPackageFile);
+
+ } else if
(existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN))
{
+ functionDetails = validateUpdateRequestParams(tenant,
namespace, sourceName,
+ mergedConfig, componentPackageFile);
+ if (!isFunctionCodeBuiltin(functionDetails) &&
(componentPackageFile == null || fileDetail == null)) {
+ throw new
IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package
is not provided");
+ }
+ } else {
+
+ componentPackageFile = FunctionCommon.createPkgTempFile();
+ componentPackageFile.deleteOnExit();
+ log.info("componentPackageFile: {}", componentPackageFile);
+
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(),
componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
+
+ functionDetails = validateUpdateRequestParams(tenant,
namespace, sourceName,
+ mergedConfig, componentPackageFile);
+ }
+ } catch (Exception e) {
+ log.error("Invalid update {} request @ /{}/{}/{}",
ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+ throw new RestException(Response.Status.BAD_REQUEST,
e.getMessage());
+ }
+
+ try {
+
worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+ } catch (Exception e) {
+ log.error("Updated {} {}/{}/{} cannot be submitted to runtime
factory", ComponentTypeUtils.toString(componentType), tenant, namespace,
sourceName);
+ throw new RestException(Response.Status.BAD_REQUEST,
String.format("%s %s cannot be admitted:- %s",
+ ComponentTypeUtils.toString(componentType),
sourceName, e.getMessage()));
+ }
+
+ // merge from existing metadata
+ Function.FunctionMetaData.Builder functionMetaDataBuilder =
Function.FunctionMetaData.newBuilder().mergeFrom(existingComponent)
+ .setFunctionDetails(functionDetails);
+
+ // update auth data if need
+ if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+ if (clientAuthenticationDataHttps != null && updateOptions !=
null && updateOptions.isUpdateAuthData()) {
+ // get existing auth data if it exists
+ Optional<FunctionAuthData> existingFunctionAuthData =
Optional.empty();
+ if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
+ existingFunctionAuthData =
Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
+ }
+
+ try {
+ Optional<FunctionAuthData> newFunctionAuthData =
worker().getFunctionRuntimeManager()
+ .getRuntimeFactory()
+ .getAuthProvider()
+ .updateAuthData(
+ tenant, namespace,
+ sourceName, existingFunctionAuthData,
+ clientAuthenticationDataHttps);
+
+ if (newFunctionAuthData.isPresent()) {
+ functionMetaDataBuilder.setFunctionAuthSpec(
+
Function.FunctionAuthenticationSpec.newBuilder()
+
.setData(ByteString.copyFrom(newFunctionAuthData.get().getData()))
+ .build());
+ } else {
+ functionMetaDataBuilder.clearFunctionAuthSpec();
+ }
+ } catch (Exception e) {
+ log.error("Error updating authentication data for {}
{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace,
sourceName, e);
+ throw new
RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error
caching authentication data for %s %s:- %s",
ComponentTypeUtils.toString(componentType), sourceName, e.getMessage()));
+ }
+ }
+ }
+
+ Function.PackageLocationMetaData.Builder
packageLocationMetaDataBuilder;
+ if (isNotBlank(sourcePkgUrl) || uploadedInputStream != null) {
+ try {
+ packageLocationMetaDataBuilder =
getFunctionPackageLocation(functionMetaDataBuilder.build(),
+ sourcePkgUrl, fileDetail, componentPackageFile);
+ } catch (Exception e) {
+ log.error("Failed process {} {}/{}/{} package: ",
ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+ throw new
RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+ } else {
+ packageLocationMetaDataBuilder =
Function.PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
+ }
+
+
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+
+ updateRequest(functionMetaDataBuilder.build());
+ } finally {
+ if (!(sourcePkgUrl != null && sourcePkgUrl.startsWith(Utils.FILE))
+ && componentPackageFile != null &&
componentPackageFile.exists()) {
+ componentPackageFile.delete();
+ }
+ }
+ }
+
private class GetSourceStatus extends GetStatus<SourceStatus,
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData> {
@Override
@@ -208,10 +578,6 @@ public class SourcesImpl extends ComponentImpl {
}
}
- public SourcesImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier,
Function.FunctionDetails.ComponentType.SOURCE);
- }
-
public SourceStatus getSourceStatus(final String tenant,
final String namespace,
final String componentName,
@@ -285,4 +651,31 @@ public class SourcesImpl extends ComponentImpl {
SourceConfig config =
SourceConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
return config;
}
+
+    /** Stamps the REST path identity onto {@code sourceConfig}, resolves its archive (if one is set) and validates it.
+     *  @return the FunctionDetails converted from the validated source config
+     *  @throws IllegalArgumentException if the configured archive cannot be resolved by the connectors manager */
+    private Function.FunctionDetails validateUpdateRequestParams(final String tenant, final String namespace,
+            final String sourceName, final SourceConfig sourceConfig, final File sourcePackageFile) throws IOException {
+        Path archivePath = null;
+        // The rest end points take precedence over whatever is there in sourceconfig
+        sourceConfig.setTenant(tenant);
+        sourceConfig.setNamespace(namespace);
+        sourceConfig.setName(sourceName);
+        org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
+        if (!StringUtils.isEmpty(sourceConfig.getArchive())) {
+            String builtinArchive = sourceConfig.getArchive();
+            if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
+                builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
+            }
+            try {
+                archivePath = this.worker().getConnectorsManager().getSourceArchive(builtinArchive);
+            } catch (Exception e) {
+                // archivePath is still null when the lookup throws, so report the archive name that was requested
+                throw new IllegalArgumentException(String.format("No Source archive %s found", builtinArchive), e);
+            }
+        }
+        SourceConfigUtils.ExtractedSourceDetails sourceDetails = SourceConfigUtils.validate(sourceConfig, archivePath, sourcePackageFile);
+        return SourceConfigUtils.convert(sourceConfig, sourceDetails);
+    }
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
index 2f1a87f..0bbacaa 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
@@ -67,10 +67,10 @@ public class FunctionsApiV3Resource extends
FunctionApiResource {
final @FormDataParam("data") InputStream
uploadedInputStream,
final @FormDataParam("data")
FormDataContentDisposition fileDetail,
final @FormDataParam("url") String
functionPkgUrl,
- final @FormDataParam("functionConfig") String
functionConfigJson) {
+ final @FormDataParam("functionConfig")
FunctionConfig functionConfig) {
functions.registerFunction(tenant, namespace, functionName,
uploadedInputStream, fileDetail,
- functionPkgUrl, functionConfigJson, clientAppId(),
clientAuthData());
+ functionPkgUrl, functionConfig, clientAppId(),
clientAuthData());
}
@@ -83,11 +83,11 @@ public class FunctionsApiV3Resource extends
FunctionApiResource {
final @FormDataParam("data") InputStream
uploadedInputStream,
final @FormDataParam("data")
FormDataContentDisposition fileDetail,
final @FormDataParam("url") String
functionPkgUrl,
- final @FormDataParam("functionConfig") String
functionConfigJson,
+ final @FormDataParam("functionConfig")
FunctionConfig functionConfig,
final @FormDataParam("updateOptions")
UpdateOptions updateOptions) {
functions.updateFunction(tenant, namespace, functionName,
uploadedInputStream, fileDetail,
- functionPkgUrl, functionConfigJson, clientAppId(),
clientAuthData(), updateOptions);
+ functionPkgUrl, functionConfig, clientAppId(),
clientAuthData(), updateOptions);
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
index e699544..d9f788b 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
@@ -33,7 +33,14 @@ import org.apache.pulsar.functions.worker.rest.api.SinksImpl;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.io.InputStream;
@@ -62,10 +69,10 @@ public class SinksApiV3Resource extends FunctionApiResource
{
final @FormDataParam("data") InputStream
uploadedInputStream,
final @FormDataParam("data")
FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
- final @FormDataParam("sinkConfig") String
sinkConfigJson) {
+ final @FormDataParam("sinkConfig") SinkConfig
sinkConfig) {
- sink.registerFunction(tenant, namespace, sinkName,
uploadedInputStream, fileDetail,
- functionPkgUrl, sinkConfigJson, clientAppId(),
clientAuthData());
+ sink.registerSink(tenant, namespace, sinkName, uploadedInputStream,
fileDetail,
+ functionPkgUrl, sinkConfig, clientAppId(), clientAuthData());
}
@PUT
@@ -77,11 +84,11 @@ public class SinksApiV3Resource extends FunctionApiResource
{
final @FormDataParam("data") InputStream
uploadedInputStream,
final @FormDataParam("data")
FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
- final @FormDataParam("sinkConfig") String
sinkConfigJson,
+ final @FormDataParam("sinkConfig") SinkConfig
sinkConfig,
final @FormDataParam("updateOptions") UpdateOptions
updateOptions) {
- sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream,
fileDetail,
- functionPkgUrl, sinkConfigJson, clientAppId(),
clientAuthData(), updateOptions);
+ sink.updateSink(tenant, namespace, sinkName, uploadedInputStream,
fileDetail,
+ functionPkgUrl, sinkConfig, clientAppId(), clientAuthData(),
updateOptions);
}
@DELETE
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
index de6df1a..9be1581 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
@@ -33,7 +33,14 @@ import
org.apache.pulsar.functions.worker.rest.api.SourcesImpl;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.io.InputStream;
@@ -62,10 +69,10 @@ public class SourcesApiV3Resource extends
FunctionApiResource {
final @FormDataParam("data") InputStream
uploadedInputStream,
final @FormDataParam("data")
FormDataContentDisposition fileDetail,
final @FormDataParam("url") String
functionPkgUrl,
- final @FormDataParam("sourceConfig") String
sourceConfigJson) {
+ final @FormDataParam("sourceConfig")
SourceConfig sourceConfig) {
- source.registerFunction(tenant, namespace, sourceName,
uploadedInputStream, fileDetail,
- functionPkgUrl, sourceConfigJson, clientAppId(),
clientAuthData());
+ source.registerSource(tenant, namespace, sourceName,
uploadedInputStream, fileDetail,
+ functionPkgUrl, sourceConfig, clientAppId(), clientAuthData());
}
@@ -78,11 +85,11 @@ public class SourcesApiV3Resource extends
FunctionApiResource {
final @FormDataParam("data") InputStream
uploadedInputStream,
final @FormDataParam("data")
FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
- final @FormDataParam("sourceConfig") String
sourceConfigJson,
+ final @FormDataParam("sourceConfig") SourceConfig
sourceConfig,
final @FormDataParam("updateOptions")
UpdateOptions updateOptions) {
- source.updateFunction(tenant, namespace, sourceName,
uploadedInputStream, fileDetail,
- functionPkgUrl, sourceConfigJson, clientAppId(),
clientAuthData(), updateOptions);
+ source.updateSource(tenant, namespace, sourceName,
uploadedInputStream, fileDetail,
+ functionPkgUrl, sourceConfig, clientAppId(), clientAuthData(),
updateOptions);
}
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 434cfc2..b3b673b 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -484,7 +484,7 @@ public class FunctionApiV3ResourceTest {
inputStream,
details,
functionPkgUrl,
- new Gson().toJson(functionConfig),
+ functionConfig,
null, null);
}
@@ -498,7 +498,7 @@ public class FunctionApiV3ResourceTest {
mockedInputStream,
mockedFormData,
null,
- new Gson().toJson(functionConfig),
+ functionConfig,
null, null);
}
@@ -912,7 +912,7 @@ public class FunctionApiV3ResourceTest {
inputStream,
details,
null,
- new Gson().toJson(functionConfig),
+ functionConfig,
null, null, null);
}
@@ -936,7 +936,7 @@ public class FunctionApiV3ResourceTest {
mockedInputStream,
mockedFormData,
null,
- new Gson().toJson(functionConfig),
+ functionConfig,
null, null, null);
}
@@ -1024,7 +1024,7 @@ public class FunctionApiV3ResourceTest {
null,
null,
filePackageUrl,
- new Gson().toJson(functionConfig),
+ functionConfig,
null, null, null);
}
@@ -1469,7 +1469,7 @@ public class FunctionApiV3ResourceTest {
functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
functionConfig.setOutput(outputTopic);
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
- resource.registerFunction(tenant, namespace, function, null, null,
filePackageUrl, new Gson().toJson(functionConfig), null, null);
+ resource.registerFunction(tenant, namespace, function, null, null,
filePackageUrl, functionConfig, null, null);
}
@@ -1500,7 +1500,7 @@ public class FunctionApiV3ResourceTest {
functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
functionConfig.setOutput(outputTopic);
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
- resource.registerFunction(actualTenant, actualNamespace, actualName,
null, null, filePackageUrl, new Gson().toJson(functionConfig), null, null);
+ resource.registerFunction(actualTenant, actualNamespace, actualName,
null, null, filePackageUrl, functionConfig, null, null);
}
public static FunctionConfig createDefaultFunctionConfig() {
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 3b3548a..c884835 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -434,28 +434,28 @@ public class SinkApiV3ResourceTest {
sinkConfig.setParallelism(parallelism);
}
- resource.registerFunction(
+ resource.registerSink(
tenant,
namespace,
sink,
inputStream,
details,
pkgUrl,
- new Gson().toJson(sinkConfig),
+ sinkConfig,
null, null);
}
private void registerDefaultSink() throws IOException {
SinkConfig sinkConfig = createDefaultSinkConfig();
- resource.registerFunction(
+ resource.registerSink(
tenant,
namespace,
sink,
new FileInputStream(JAR_FILE_PATH),
mockedFormData,
null,
- new Gson().toJson(sinkConfig),
+ sinkConfig,
null, null);
}
@@ -548,14 +548,14 @@ public class SinkApiV3ResourceTest {
sinkConfig.setClassName(className);
sinkConfig.setParallelism(parallelism);
sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
- resource.registerFunction(
+ resource.registerSink(
actualTenant,
actualNamespace,
actualName,
new FileInputStream(JAR_FILE_PATH),
mockedFormData,
null,
- new Gson().toJson(sinkConfig),
+ sinkConfig,
null, null);
}
@@ -829,14 +829,14 @@ public class SinkApiV3ResourceTest {
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
}
- resource.updateFunction(
+ resource.updateSink(
tenant,
namespace,
sink,
inputStream,
details,
null,
- new Gson().toJson(sinkConfig),
+ sinkConfig,
null, null, null);
}
@@ -870,14 +870,14 @@ public class SinkApiV3ResourceTest {
this.mockedFunctionMetaData =
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(),
any())).thenReturn(mockedFunctionMetaData);
- resource.updateFunction(
+ resource.updateSink(
tenant,
namespace,
sink,
new FileInputStream(JAR_FILE_PATH),
mockedFormData,
null,
- new Gson().toJson(sinkConfig),
+ sinkConfig,
null, null, null);
}
@@ -972,14 +972,14 @@ public class SinkApiV3ResourceTest {
CompletableFuture<RequestResult> requestResult =
CompletableFuture.completedFuture(rr);
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
- resource.updateFunction(
+ resource.updateSink(
tenant,
namespace,
sink,
null,
null,
filePackageUrl,
- new Gson().toJson(sinkConfig),
+ sinkConfig,
null, null, null);
}
@@ -1364,7 +1364,7 @@ public class SinkApiV3ResourceTest {
}
@Test(expectedExceptions = RestException.class,
expectedExceptionsMessageRegExp = "Namespace does not exist")
- public void testRegisterFunctionNonExistingNamespace() throws Exception {
+ public void testregisterSinkNonExistingNamespace() throws Exception {
try {
this.namespaceList.clear();
registerDefaultSink();
@@ -1375,7 +1375,7 @@ public class SinkApiV3ResourceTest {
}
@Test(expectedExceptions = RestException.class,
expectedExceptionsMessageRegExp = "Tenant does not exist")
- public void testRegisterFunctionNonExistingTenant() throws Exception {
+ public void testregisterSinkNonExistingTenant() throws Exception {
try {
when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class);
registerDefaultSink();
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index cd55369..875e2ef 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -404,28 +404,28 @@ public class SourceApiV3ResourceTest {
sourceConfig.setParallelism(parallelism);
}
- resource.registerFunction(
+ resource.registerSource(
tenant,
namespace,
function,
inputStream,
details,
pkgUrl,
- new Gson().toJson(sourceConfig),
+ sourceConfig,
null, null);
}
private void registerDefaultSource() throws IOException {
SourceConfig sourceConfig = createDefaultSourceConfig();
- resource.registerFunction(
+ resource.registerSource(
tenant,
namespace,
source,
new FileInputStream(JAR_FILE_PATH),
mockedFormData,
null,
- new Gson().toJson(sourceConfig),
+ sourceConfig,
null, null);
}
@@ -518,14 +518,14 @@ public class SourceApiV3ResourceTest {
sourceConfig.setParallelism(parallelism);
sourceConfig.setTopicName(outputTopic);
sourceConfig.setSerdeClassName(outputSerdeClassName);
- resource.registerFunction(
+ resource.registerSource(
actualTenant,
actualNamespace,
actualName,
new FileInputStream(JAR_FILE_PATH),
mockedFormData,
null,
- new Gson().toJson(sourceConfig),
+ sourceConfig,
null, null);
}
@@ -850,14 +850,14 @@ public class SourceApiV3ResourceTest {
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
}
- resource.updateFunction(
+ resource.updateSource(
tenant,
namespace,
function,
inputStream,
details,
null,
- new Gson().toJson(sourceConfig),
+ sourceConfig,
null, null, null);
}
@@ -887,14 +887,14 @@ public class SourceApiV3ResourceTest {
this.mockedFunctionMetaData =
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(),
any())).thenReturn(mockedFunctionMetaData);
- resource.updateFunction(
+ resource.updateSource(
tenant,
namespace,
source,
new FileInputStream(JAR_FILE_PATH),
mockedFormData,
null,
- new Gson().toJson(sourceConfig),
+ sourceConfig,
null, null, null);
}
@@ -988,14 +988,14 @@ public class SourceApiV3ResourceTest {
CompletableFuture<RequestResult> requestResult =
CompletableFuture.completedFuture(rr);
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
- resource.updateFunction(
+ resource.updateSource(
tenant,
namespace,
source,
null,
null,
filePackageUrl,
- new Gson().toJson(sourceConfig),
+ sourceConfig,
null, null, null);
}