This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 95df092  Refactoring Function Component implementation (#4541)
95df092 is described below

commit 95df092832a3633a81fd719a5c096919141cc4f9
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Sun Jun 23 20:01:37 2019 -0700

    Refactoring Function Component implementation (#4541)
    
    * Refactoring Function Component implementation
    
    * cleaning up
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  15 +-
 .../apache/pulsar/broker/admin/impl/SinksBase.java |  21 +-
 .../pulsar/broker/admin/impl/SourcesBase.java      |  20 +-
 .../functions/worker/rest/api/ComponentImpl.java   | 485 +--------------------
 .../functions/worker/rest/api/FunctionsImpl.java   | 394 ++++++++++++++++-
 .../functions/worker/rest/api/FunctionsImplV2.java |  19 +-
 .../functions/worker/rest/api/SinksImpl.java       | 403 ++++++++++++++++-
 .../functions/worker/rest/api/SourcesImpl.java     | 401 ++++++++++++++++-
 .../worker/rest/api/v3/FunctionsApiV3Resource.java |   8 +-
 .../worker/rest/api/v3/SinksApiV3Resource.java     |  21 +-
 .../worker/rest/api/v3/SourcesApiV3Resource.java   |  21 +-
 .../rest/api/v3/FunctionApiV3ResourceTest.java     |  14 +-
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  |  28 +-
 .../rest/api/v3/SourceApiV3ResourceTest.java       |  24 +-
 14 files changed, 1290 insertions(+), 584 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 11e20de..c6f38d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -18,7 +18,12 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
-import io.swagger.annotations.*;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Example;
+import io.swagger.annotations.ExampleProperty;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.common.functions.FunctionConfig;
@@ -163,10 +168,10 @@ public class FunctionsBase extends AdminResource 
implements Supplier<WorkerServi
                             )
                     )
             )
-            final @FormDataParam("functionConfig") String functionConfigJson) {
+            final @FormDataParam("functionConfig") FunctionConfig 
functionConfig) {
 
         functions.registerFunction(tenant, namespace, functionName, 
uploadedInputStream, fileDetail,
-            functionPkgUrl, functionConfigJson, clientAppId(), 
clientAuthData());
+            functionPkgUrl, functionConfig, clientAppId(), clientAuthData());
     }
 
     @PUT
@@ -270,12 +275,12 @@ public class FunctionsBase extends AdminResource 
implements Supplier<WorkerServi
                             )
                     )
             )
-            final @FormDataParam("functionConfig") String functionConfigJson,
+            final @FormDataParam("functionConfig") FunctionConfig 
functionConfig,
             @ApiParam(value = "The update options is for the Pulsar Function 
that needs to be updated.")
             final @FormDataParam("updateOptions") UpdateOptions updateOptions) 
throws IOException {
 
         functions.updateFunction(tenant, namespace, functionName, 
uploadedInputStream, fileDetail,
-                functionPkgUrl, functionConfigJson, clientAppId(), 
clientAuthData(), updateOptions);
+                functionPkgUrl, functionConfig, clientAppId(), 
clientAuthData(), updateOptions);
     }
 
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
index c527bd3..e21d861 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
@@ -77,7 +77,7 @@ public class SinksBase extends AdminResource implements 
Supplier<WorkerService>
                              final @PathParam("sinkName") String sinkName,
                              final @FormDataParam("data") InputStream 
uploadedInputStream,
                              final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
-                             final @FormDataParam("url") String functionPkgUrl,
+                             final @FormDataParam("url") String sinkPkgUrl,
                              @ApiParam(
                                  value =
                                      "A JSON value presenting a sink config 
playload. All available configuration options are:  \n" +
@@ -136,10 +136,9 @@ public class SinksBase extends AdminResource implements 
Supplier<WorkerService>
                                      )
                                  )
                              )
-                             final @FormDataParam("sinkConfig") String 
sinkConfigJson) {
-
-        sink.registerFunction(tenant, namespace, sinkName, 
uploadedInputStream, fileDetail,
-                functionPkgUrl, sinkConfigJson, clientAppId(), 
clientAuthData());
+                             final @FormDataParam("sinkConfig") SinkConfig 
sinkConfig) {
+        sink.registerSink(tenant, namespace, sinkName, uploadedInputStream, 
fileDetail,
+                sinkPkgUrl, sinkConfig, clientAppId(), clientAuthData());
     }
 
     @PUT
@@ -162,7 +161,8 @@ public class SinksBase extends AdminResource implements 
Supplier<WorkerService>
                            final @PathParam("sinkName") String sinkName,
                            final @FormDataParam("data") InputStream 
uploadedInputStream,
                            final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
-                           final @FormDataParam("url") String functionPkgUrl,
+                           @ApiParam(value = "URL of sink's archive")
+                           final @FormDataParam("url") String sinkPkgUrl,
                            @ApiParam(
                                value =
                                    "A JSON value presenting a sink config 
playload. All available configuration options are:  \n" +
@@ -221,12 +221,11 @@ public class SinksBase extends AdminResource implements 
Supplier<WorkerService>
                                    )
                                )
                            )
-                           final @FormDataParam("sinkConfig") String 
sinkConfigJson,
-                           @ApiParam()
+                           final @FormDataParam("sinkConfig") SinkConfig 
sinkConfig,
+                           @ApiParam(value = "Update options for sink")
                            final @FormDataParam("updateOptions") UpdateOptions 
updateOptions) {
-
-         sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, 
fileDetail,
-                functionPkgUrl, sinkConfigJson, clientAppId(), 
clientAuthData(), updateOptions);
+         sink.updateSink(tenant, namespace, sinkName, uploadedInputStream, 
fileDetail,
+                sinkPkgUrl, sinkConfig, clientAppId(), clientAuthData(), 
updateOptions);
 
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
index 4856800..8dc20c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
@@ -79,7 +79,7 @@ public class SourcesBase extends AdminResource implements 
Supplier<WorkerService
             final @PathParam("sourceName") String sourceName,
             final @FormDataParam("data") InputStream uploadedInputStream,
             final @FormDataParam("data") FormDataContentDisposition fileDetail,
-            final @FormDataParam("url") String functionPkgUrl,
+            final @FormDataParam("url") String sourcePkgUrl,
             @ApiParam(
                     value = "A JSON value presenting source configuration 
payload. An example of the expected functions can be found here.  \n" +
                             "classname  \n" +
@@ -126,10 +126,9 @@ public class SourcesBase extends AdminResource implements 
Supplier<WorkerService
                             )
                     )
             )
-            final @FormDataParam("sourceConfig") String sourceConfigJson) {
-
-        source.registerFunction(tenant, namespace, sourceName, 
uploadedInputStream, fileDetail,
-            functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData());
+            final @FormDataParam("sourceConfig") SourceConfig sourceConfig) {
+        source.registerSource(tenant, namespace, sourceName, 
uploadedInputStream, fileDetail,
+            sourcePkgUrl, sourceConfig, clientAppId(), clientAuthData());
     }
 
     @PUT
@@ -154,7 +153,8 @@ public class SourcesBase extends AdminResource implements 
Supplier<WorkerService
             final @PathParam("sourceName") String sourceName,
             final @FormDataParam("data") InputStream uploadedInputStream,
             final @FormDataParam("data") FormDataContentDisposition fileDetail,
-            final @FormDataParam("url") String functionPkgUrl,
+            @ApiParam(value = "URL of sources' archive")
+            final @FormDataParam("url") String sourcePkgUrl,
             @ApiParam(
                     value = "A JSON value presenting source configuration 
payload. An example of the expected functions can be found here.  \n" +
                             "classname  \n" +
@@ -201,11 +201,11 @@ public class SourcesBase extends AdminResource implements 
Supplier<WorkerService
                             )
                     )
             )
-            final @FormDataParam("sourceConfig") String sourceConfigJson,
+            final @FormDataParam("sourceConfig") SourceConfig sourceConfig,
+            @ApiParam(value = "Update options for source")
             final @FormDataParam("updateOptions") UpdateOptions updateOptions) 
{
-
-        source.updateFunction(tenant, namespace, sourceName, 
uploadedInputStream, fileDetail,
-            functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData(), 
updateOptions);
+        source.updateSource(tenant, namespace, sourceName, 
uploadedInputStream, fileDetail,
+            sourcePkgUrl, sourceConfig, clientAppId(), clientAuthData(), 
updateOptions);
     }
 
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 62ac9bd..345ea56 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -18,9 +18,6 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.gson.Gson;
-import com.google.protobuf.ByteString;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
@@ -34,7 +31,6 @@ import 
org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
 import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -46,19 +42,14 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.FunctionState;
-import org.apache.pulsar.common.functions.UpdateOptions;
 import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.Codec;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.functions.auth.FunctionAuthData;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -68,11 +59,9 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.utils.ComponentTypeUtils;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
-import org.apache.pulsar.functions.utils.SinkConfigUtils;
-import org.apache.pulsar.functions.utils.SourceConfigUtils;
-import org.apache.pulsar.functions.utils.ComponentTypeUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -95,12 +84,10 @@ import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
-import java.nio.file.Path;
 import java.util.Base64;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -111,10 +98,8 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static 
org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
 import static 
org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace;
 import static 
org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
 import static 
org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
@@ -285,170 +270,10 @@ public abstract class ComponentImpl {
         return true;
     }
 
-    public void registerFunction(final String tenant,
-                                 final String namespace,
-                                 final String componentName,
-                                 final InputStream uploadedInputStream,
-                                 final FormDataContentDisposition fileDetail,
-                                 final String functionPkgUrl,
-                                 final String componentConfigJson,
-                                 final String clientRole,
-                                 AuthenticationDataHttps 
clientAuthenticationDataHttps) {
-
-        if (!isWorkerServiceAvailable()) {
-            throwUnavailableException();
-        }
-
-        if (tenant == null) {
-            throw new RestException(Status.BAD_REQUEST, "Tenant is not 
provided");
-        }
-        if (namespace == null) {
-            throw new RestException(Status.BAD_REQUEST, "Namespace is not 
provided");
-        }
-        if (componentName == null) {
-            throw new RestException(Status.BAD_REQUEST, 
ComponentTypeUtils.toString(componentType) + " Name is not provided");
-        }
-
-        try {
-            if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
register {}", tenant, namespace,
-                        componentName, clientRole, 
ComponentTypeUtils.toString(componentType));
-                throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
-            }
-        } catch (PulsarAdminException e) {
-            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, 
componentName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
-        }
-
-        try {
-            // Check tenant exists
-            final TenantInfo tenantInfo = 
worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
-
-            String qualifiedNamespace = tenant + "/" + namespace;
-            List<String> namespaces = 
worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
-            if (namespaces != null && 
!namespaces.contains(qualifiedNamespace)) {
-                String qualifiedNamespaceWithCluster = 
String.format("%s/%s/%s", tenant,
-                        
worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
-                if (namespaces != null && 
!namespaces.contains(qualifiedNamespaceWithCluster)) {
-                    log.error("{}/{}/{} Namespace {} does not exist", tenant, 
namespace, componentName, namespace);
-                    throw new RestException(Status.BAD_REQUEST, "Namespace 
does not exist");
-                }
-            }
-        } catch (PulsarAdminException.NotAuthorizedException e) {
-            log.error("{}/{}/{} Client [{}] is not admin and authorized to 
operate {} on tenant", tenant, namespace,
-                    componentName, clientRole, 
ComponentTypeUtils.toString(componentType));
-            throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
-        } catch (PulsarAdminException.NotFoundException e) {
-            log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, 
componentName, tenant);
-            throw new RestException(Status.BAD_REQUEST, "Tenant does not 
exist");
-        } catch (PulsarAdminException e) {
-            log.error("{}/{}/{} Issues getting tenant data", tenant, 
namespace, componentName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
-        }
-
-        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
-
-        if (functionMetaDataManager.containsFunction(tenant, namespace, 
componentName)) {
-            log.error("{} {}/{}/{} already exists", 
ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
-            throw new RestException(Status.BAD_REQUEST, String.format("%s %s 
already exists", ComponentTypeUtils.toString(componentType), componentName));
-        }
-
-        FunctionDetails functionDetails = null;
-        boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
-        File componentPackageFile = null;
-        try {
-
-            // validate parameters
-            try {
-                if (isPkgUrlProvided) {
-
-                    if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
-                        throw new IllegalArgumentException("Function Package 
url is not valid. supported url (http/https/file)");
-                    }
-                    try {
-                        componentPackageFile = 
FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
-                    } catch (Exception e) {
-                        throw new 
IllegalArgumentException(String.format("Encountered error \"%s\" when getting 
%s package from %s", e.getMessage(), 
ComponentTypeUtils.toString(componentType), functionPkgUrl));
-                    }
-                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, componentName,
-                            componentConfigJson, componentType, 
componentPackageFile);
-                } else {
-                    if (uploadedInputStream != null) {
-                        componentPackageFile = 
WorkerUtils.dumpToTmpFile(uploadedInputStream);
-                    }
-                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, componentName,
-                            componentConfigJson, componentType, 
componentPackageFile);
-                    if (!isFunctionCodeBuiltin(functionDetails) && 
(componentPackageFile == null || fileDetail == null)) {
-                        throw new 
IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package 
is not provided");
-                    }
-                }
-            } catch (Exception e) {
-                log.error("Invalid register {} request @ /{}/{}/{}", 
ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, 
e);
-                throw new RestException(Status.BAD_REQUEST, e.getMessage());
-            }
-
-            try {
-                
worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
-            } catch (Exception e) {
-                log.error("{} {}/{}/{} cannot be admitted by the runtime 
factory", ComponentTypeUtils.toString(componentType), tenant, namespace, 
componentName);
-                throw new RestException(Status.BAD_REQUEST, String.format("%s 
%s cannot be admitted:- %s", ComponentTypeUtils.toString(componentType), 
componentName, e.getMessage()));
-            }
-
-            // function state
-            FunctionMetaData.Builder functionMetaDataBuilder = 
FunctionMetaData.newBuilder()
-                    .setFunctionDetails(functionDetails)
-                    .setCreateTime(System.currentTimeMillis())
-                    .setVersion(0);
-
-            // cache auth if need
-            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
-
-                if (clientAuthenticationDataHttps != null) {
-                    try {
-                        Optional<FunctionAuthData> functionAuthData = 
worker().getFunctionRuntimeManager()
-                                .getRuntimeFactory()
-                                .getAuthProvider()
-                                .cacheAuthData(tenant, namespace, 
componentName, clientAuthenticationDataHttps);
-
-                        if (functionAuthData.isPresent()) {
-                            functionMetaDataBuilder.setFunctionAuthSpec(
-                                    
Function.FunctionAuthenticationSpec.newBuilder()
-                                            
.setData(ByteString.copyFrom(functionAuthData.get().getData()))
-                                            .build());
-                        }
-                    } catch (Exception e) {
-                        log.error("Error caching authentication data for {} 
{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, 
componentName, e);
-
-
-                        throw new RestException(Status.INTERNAL_SERVER_ERROR, 
String.format("Error caching authentication data for %s %s:- %s", 
ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
-                    }
-                }
-            }
-
-            PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
-            try {
-                packageLocationMetaDataBuilder = 
getFunctionPackageLocation(functionMetaDataBuilder.build(),
-                        functionPkgUrl, fileDetail, componentPackageFile);
-            } catch (Exception e) {
-                log.error("Failed process {} {}/{}/{} package: ", 
ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, 
e);
-                throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
-            }
-
-            
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
-            updateRequest(functionMetaDataBuilder.build());
-        } finally {
-
-            if (!(functionPkgUrl != null && 
functionPkgUrl.startsWith(Utils.FILE))
-                    && componentPackageFile != null && 
componentPackageFile.exists()) {
-                componentPackageFile.delete();
-            }
-        }
-    }
-
-    private PackageLocationMetaData.Builder getFunctionPackageLocation(final 
FunctionMetaData functionMetaData,
-                                                                       final 
String functionPkgUrl,
-                                                                       final 
FormDataContentDisposition fileDetail,
-                                                                       final 
File uploadedInputStreamAsFile) throws Exception {
+    PackageLocationMetaData.Builder getFunctionPackageLocation(final 
FunctionMetaData functionMetaData,
+                                                               final String 
functionPkgUrl,
+                                                               final 
FormDataContentDisposition fileDetail,
+                                                               final File 
uploadedInputStreamAsFile) throws Exception {
         FunctionDetails functionDetails = 
functionMetaData.getFunctionDetails();
         String tenant = functionDetails.getTenant();
         String namespace = functionDetails.getNamespace();
@@ -512,230 +337,6 @@ public abstract class ComponentImpl {
         return packageLocationMetaDataBuilder;
     }
 
-
-    public void updateFunction(final String tenant,
-                               final String namespace,
-                               final String componentName,
-                               final InputStream uploadedInputStream,
-                               final FormDataContentDisposition fileDetail,
-                               final String functionPkgUrl,
-                               final String componentConfigJson,
-                               final String clientRole,
-                               AuthenticationDataHttps 
clientAuthenticationDataHttps,
-                               UpdateOptions updateOptions) {
-
-        if (!isWorkerServiceAvailable()) {
-            throwUnavailableException();
-        }
-
-        if (tenant == null) {
-            throw new RestException(Status.BAD_REQUEST, "Tenant is not 
provided");
-        }
-        if (namespace == null) {
-            throw new RestException(Status.BAD_REQUEST, "Namespace is not 
provided");
-        }
-        if (componentName == null) {
-            throw new RestException(Status.BAD_REQUEST, 
ComponentTypeUtils.toString(componentType) + " Name is not provided");
-        }
-
-        try {
-            if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
update {}", tenant, namespace,
-                        componentName, clientRole, 
ComponentTypeUtils.toString(componentType));
-                throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
-
-            }
-        } catch (PulsarAdminException e) {
-            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, 
componentName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
-        }
-
-        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
-
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, 
componentName)) {
-            throw new RestException(Status.BAD_REQUEST, String.format("%s %s 
doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
-        }
-
-        String mergedComponentConfigJson;
-        String existingComponentConfigJson;
-
-        FunctionMetaData existingComponent = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
-
-        if 
(!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType))
 {
-            log.error("{}/{}/{} is not a {}", tenant, namespace, 
componentName, ComponentTypeUtils.toString(componentType));
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s 
doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
-        }
-
-        if (componentType.equals(FunctionDetails.ComponentType.FUNCTION)) {
-            FunctionConfig existingFunctionConfig = 
FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
-            existingComponentConfigJson = new 
Gson().toJson(existingFunctionConfig);
-            FunctionConfig functionConfig = new 
Gson().fromJson(componentConfigJson, FunctionConfig.class);
-            // The rest end points take precedence over whatever is there in 
functionconfig
-            functionConfig.setTenant(tenant);
-            functionConfig.setNamespace(namespace);
-            functionConfig.setName(componentName);
-            try {
-                FunctionConfig mergedConfig = 
FunctionConfigUtils.validateUpdate(existingFunctionConfig,
-                        functionConfig);
-                mergedComponentConfigJson = new Gson().toJson(mergedConfig);
-            } catch (Exception e) {
-                throw new RestException(Status.BAD_REQUEST, e.getMessage());
-            }
-        } else if (componentType.equals(FunctionDetails.ComponentType.SOURCE)) 
{
-            SourceConfig existingSourceConfig = 
SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
-            existingComponentConfigJson = new 
Gson().toJson(existingSourceConfig);
-            SourceConfig sourceConfig = new 
Gson().fromJson(componentConfigJson, SourceConfig.class);
-            // The rest end points take precedence over whatever is there in 
functionconfig
-            sourceConfig.setTenant(tenant);
-            sourceConfig.setNamespace(namespace);
-            sourceConfig.setName(componentName);
-            try {
-                SourceConfig mergedConfig = 
SourceConfigUtils.validateUpdate(existingSourceConfig, sourceConfig);
-                mergedComponentConfigJson = new Gson().toJson(mergedConfig);
-            } catch (Exception e) {
-                throw new RestException(Status.BAD_REQUEST, e.getMessage());
-            }
-        } else {
-            SinkConfig existingSinkConfig = 
SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
-            existingComponentConfigJson = new 
Gson().toJson(existingSinkConfig);
-            SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, 
SinkConfig.class);
-            // The rest end points take precedence over whatever is there in 
functionconfig
-            sinkConfig.setTenant(tenant);
-            sinkConfig.setNamespace(namespace);
-            sinkConfig.setName(componentName);
-            try {
-                SinkConfig mergedConfig = 
SinkConfigUtils.validateUpdate(existingSinkConfig, sinkConfig);
-                mergedComponentConfigJson = new Gson().toJson(mergedConfig);
-            } catch (Exception e) {
-                throw new RestException(Status.BAD_REQUEST, e.getMessage());
-            }
-        }
-
-        if (existingComponentConfigJson.equals(mergedComponentConfigJson) && 
isBlank(functionPkgUrl) && uploadedInputStream == null) {
-            log.error("{}/{}/{} Update contains no changes", tenant, 
namespace, componentName);
-            throw new RestException(Status.BAD_REQUEST, "Update contains no 
change");
-        }
-
-        FunctionDetails functionDetails = null;
-        File componentPackageFile = null;
-        try {
-
-            // validate parameters
-            try {
-                if (isNotBlank(functionPkgUrl)) {
-                    try {
-                        componentPackageFile = 
FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
-                    } catch (Exception e) {
-                        throw new 
IllegalArgumentException(String.format("Encountered error \"%s\" when getting 
%s package from %s", e.getMessage(), 
ComponentTypeUtils.toString(componentType), functionPkgUrl));
-                    }
-                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, componentName,
-                            mergedComponentConfigJson, componentType, 
componentPackageFile);
-
-                } else if 
(existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
-                        || 
existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)) 
{
-                    try {
-                        componentPackageFile = 
FunctionCommon.extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
-                    } catch (Exception e) {
-                        throw new 
IllegalArgumentException(String.format("Encountered error \"%s\" when getting 
%s package from %s", e.getMessage(), 
ComponentTypeUtils.toString(componentType), functionPkgUrl));
-                    }
-                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, componentName,
-                            mergedComponentConfigJson, componentType, 
componentPackageFile);
-                } else if (uploadedInputStream != null) {
-
-                    componentPackageFile = 
WorkerUtils.dumpToTmpFile(uploadedInputStream);
-                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, componentName,
-                            mergedComponentConfigJson, componentType, 
componentPackageFile);
-
-                } else if 
(existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN))
 {
-                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, componentName,
-                            mergedComponentConfigJson, componentType, 
componentPackageFile);
-                    if (!isFunctionCodeBuiltin(functionDetails) && 
(componentPackageFile == null || fileDetail == null)) {
-                        throw new 
IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package 
is not provided");
-                    }
-                } else {
-
-                    componentPackageFile = FunctionCommon.createPkgTempFile();
-                    componentPackageFile.deleteOnExit();
-                    log.info("componentPackageFile: {}", componentPackageFile);
-                    
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), 
componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
-
-                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, componentName,
-                            mergedComponentConfigJson, componentType, 
componentPackageFile);
-                }
-            } catch (Exception e) {
-                log.error("Invalid update {} request @ /{}/{}/{}", 
ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, 
e);
-                throw new RestException(Status.BAD_REQUEST, e.getMessage());
-            }
-
-            try {
-                
worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
-            } catch (Exception e) {
-                log.error("Updated {} {}/{}/{} cannot be submitted to runtime 
factory", ComponentTypeUtils.toString(componentType), tenant, namespace, 
componentName);
-                throw new RestException(Status.BAD_REQUEST, String.format("%s 
%s cannot be admitted:- %s",
-                        ComponentTypeUtils.toString(componentType), 
componentName, e.getMessage()));
-            }
-
-            // merge from existing metadata
-            FunctionMetaData.Builder functionMetaDataBuilder = 
FunctionMetaData.newBuilder().mergeFrom(existingComponent)
-                    .setFunctionDetails(functionDetails);
-
-            // update auth data if need
-            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
-                if (clientAuthenticationDataHttps != null && updateOptions != 
null && updateOptions.isUpdateAuthData()) {
-                    // get existing auth data if it exists
-                    Optional<FunctionAuthData> existingFunctionAuthData = 
Optional.empty();
-                    if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
-                        existingFunctionAuthData = 
Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
-                    }
-
-                    try {
-                        Optional<FunctionAuthData> newFunctionAuthData = 
worker().getFunctionRuntimeManager()
-                                .getRuntimeFactory()
-                                .getAuthProvider()
-                                .updateAuthData(
-                                        tenant, namespace,
-                                        componentName, 
existingFunctionAuthData,
-                                        clientAuthenticationDataHttps);
-
-                        if (newFunctionAuthData.isPresent()) {
-                            functionMetaDataBuilder.setFunctionAuthSpec(
-                                    
Function.FunctionAuthenticationSpec.newBuilder()
-                                            
.setData(ByteString.copyFrom(newFunctionAuthData.get().getData()))
-                                            .build());
-                        } else {
-                            functionMetaDataBuilder.clearFunctionAuthSpec();
-                        }
-                    } catch (Exception e) {
-                        log.error("Error updating authentication data for {} 
{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, 
componentName, e);
-                        throw new RestException(Status.INTERNAL_SERVER_ERROR, 
String.format("Error caching authentication data for %s %s:- %s", 
ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
-                    }
-                }
-            }
-
-            PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
-            if (isNotBlank(functionPkgUrl) || uploadedInputStream != null) {
-                try {
-                    packageLocationMetaDataBuilder = 
getFunctionPackageLocation(functionMetaDataBuilder.build(),
-                            functionPkgUrl, fileDetail, componentPackageFile);
-                } catch (Exception e) {
-                    log.error("Failed process {} {}/{}/{} package: ", 
ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, 
e);
-                    throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
-                }
-            } else {
-                packageLocationMetaDataBuilder = 
PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
-            }
-
-            
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
-
-            updateRequest(functionMetaDataBuilder.build());
-        } finally {
-            if (!(functionPkgUrl != null && 
functionPkgUrl.startsWith(Utils.FILE))
-                    && componentPackageFile != null && 
componentPackageFile.exists()) {
-                componentPackageFile.delete();
-            }
-        }
-    }
-
     public void deregisterFunction(final String tenant,
                                    final String namespace,
                                    final String componentName,
@@ -1269,7 +870,7 @@ public abstract class ComponentImpl {
         return retVals;
     }
 
-    private void updateRequest(final FunctionMetaData functionMetaData) {
+    void updateRequest(final FunctionMetaData functionMetaData) {
 
         // Submit to FMT
         FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
@@ -1708,80 +1309,6 @@ public abstract class ComponentImpl {
         return null;
     }
 
-private FunctionDetails validateUpdateRequestParams(final String tenant,
-                                                    final String namespace,
-                                                    final String componentName,
-                                                    final String 
componentConfigJson,
-                                                    final 
FunctionDetails.ComponentType componentType,
-                                                    final File 
componentPackageFile) throws IOException {
-        if (tenant == null) {
-            throw new IllegalArgumentException("Tenant is not provided");
-        }
-        if (namespace == null) {
-            throw new IllegalArgumentException("Namespace is not provided");
-        }
-        if (componentName == null) {
-            throw new IllegalArgumentException(String.format("%s Name is not 
provided", ComponentTypeUtils.toString(componentType)));
-        }
-
-        if (componentType.equals(FunctionDetails.ComponentType.FUNCTION) && 
!isEmpty(componentConfigJson)) {
-            FunctionConfig functionConfig = new 
Gson().fromJson(componentConfigJson, FunctionConfig.class);
-            // The rest end points take precedence over whatever is there in 
functionconfig
-            functionConfig.setTenant(tenant);
-            functionConfig.setNamespace(namespace);
-            functionConfig.setName(componentName);
-            FunctionConfigUtils.inferMissingArguments(functionConfig);
-            ClassLoader clsLoader = 
FunctionConfigUtils.validate(functionConfig, componentPackageFile);
-            return FunctionConfigUtils.convert(functionConfig, clsLoader);
-        }
-        if (componentType.equals(FunctionDetails.ComponentType.SOURCE)) {
-            Path archivePath = null;
-            SourceConfig sourceConfig = new 
Gson().fromJson(componentConfigJson, SourceConfig.class);
-            // The rest end points take precedence over whatever is there in 
sourceconfig
-            sourceConfig.setTenant(tenant);
-            sourceConfig.setNamespace(namespace);
-            sourceConfig.setName(componentName);
-            
org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
-            if (!StringUtils.isEmpty(sourceConfig.getArchive())) {
-                String builtinArchive = sourceConfig.getArchive();
-                if 
(builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
-                    builtinArchive = 
builtinArchive.replaceFirst("^builtin://", "");
-                }
-                try {
-                    archivePath = 
this.worker().getConnectorsManager().getSourceArchive(builtinArchive);
-                } catch (Exception e) {
-                    throw new IllegalArgumentException(String.format("No 
Source archive %s found", archivePath));
-                }
-            }
-            SourceConfigUtils.ExtractedSourceDetails sourceDetails = 
SourceConfigUtils.validate(sourceConfig, archivePath, componentPackageFile);
-            return SourceConfigUtils.convert(sourceConfig, sourceDetails);
-        }
-        if (componentType.equals(FunctionDetails.ComponentType.SINK)) {
-            Path archivePath = null;
-            SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, 
SinkConfig.class);
-            // The rest end points take precedence over whatever is there in 
sinkConfig
-            sinkConfig.setTenant(tenant);
-            sinkConfig.setNamespace(namespace);
-            sinkConfig.setName(componentName);
-            
org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig);
-            if (!StringUtils.isEmpty(sinkConfig.getArchive())) {
-                String builtinArchive = sinkConfig.getArchive();
-                if 
(builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
-                    builtinArchive = 
builtinArchive.replaceFirst("^builtin://", "");
-                }
-                try {
-                    archivePath = 
this.worker().getConnectorsManager().getSinkArchive(builtinArchive);
-                } catch (Exception e) {
-                    throw new IllegalArgumentException(String.format("No Sink 
archive %s found", archivePath));
-                }
-            }
-            SinkConfigUtils.ExtractedSinkDetails sinkDetails = 
SinkConfigUtils.validate(sinkConfig, archivePath, componentPackageFile);
-            return SinkConfigUtils.convert(sinkConfig, sinkDetails);
-        } else {
-            throw new IllegalArgumentException("Unrecognized component type: " 
+ ComponentTypeUtils.toString(componentType));
-        }
-    }
-
     private void validateTriggerRequestParams(final String tenant,
                                               final String namespace,
                                               final String functionName,
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index bf09fbf..da86189 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -18,27 +18,401 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
+import com.google.protobuf.ByteString;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.functions.auth.FunctionAuthData;
+import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.ComponentTypeUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.rest.RestException;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Supplier;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static 
org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static 
org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
+import static 
org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
+
 @Slf4j
 public class FunctionsImpl extends ComponentImpl {
 
+    public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
+        super(workerServiceSupplier, 
Function.FunctionDetails.ComponentType.FUNCTION);
+    }
+
+    public void registerFunction(final String tenant,
+                                 final String namespace,
+                                 final String functionName,
+                                 final InputStream uploadedInputStream,
+                                 final FormDataContentDisposition fileDetail,
+                                 final String functionPkgUrl,
+                                 final FunctionConfig functionConfig,
+                                 final String clientRole,
+                                 AuthenticationDataHttps 
clientAuthenticationDataHttps) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (tenant == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is 
not provided");
+        }
+        if (namespace == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is 
not provided");
+        }
+        if (functionName == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, 
ComponentTypeUtils.toString(componentType) + " Name is not provided");
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
register {}", tenant, namespace,
+                        functionName, clientRole, 
ComponentTypeUtils.toString(componentType));
+                throw new RestException(Response.Status.UNAUTHORIZED, "client 
is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, 
functionName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
+
+        try {
+            // Check tenant exists
+            worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
+
+            String qualifiedNamespace = tenant + "/" + namespace;
+            List<String> namespaces = 
worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
+            if (namespaces != null && 
!namespaces.contains(qualifiedNamespace)) {
+                String qualifiedNamespaceWithCluster = 
String.format("%s/%s/%s", tenant,
+                        
worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
+                if (namespaces != null && 
!namespaces.contains(qualifiedNamespaceWithCluster)) {
+                    log.error("{}/{}/{} Namespace {} does not exist", tenant, 
namespace, functionName, namespace);
+                    throw new RestException(Response.Status.BAD_REQUEST, 
"Namespace does not exist");
+                }
+            }
+        } catch (PulsarAdminException.NotAuthorizedException e) {
+            log.error("{}/{}/{} Client [{}] is not admin and authorized to 
operate {} on tenant", tenant, namespace,
+                    functionName, clientRole, 
ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.UNAUTHORIZED, "client is 
not authorize to perform operation");
+        } catch (PulsarAdminException.NotFoundException e) {
+            log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, 
functionName, tenant);
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant does 
not exist");
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Issues getting tenant data", tenant, 
namespace, functionName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
+
+        if (functionMetaDataManager.containsFunction(tenant, namespace, 
functionName)) {
+            log.error("{} {}/{}/{} already exists", 
ComponentTypeUtils.toString(componentType), tenant, namespace, functionName);
+            throw new RestException(Response.Status.BAD_REQUEST, 
String.format("%s %s already exists", 
ComponentTypeUtils.toString(componentType), functionName));
+        }
+
+        Function.FunctionDetails functionDetails = null;
+        boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
+        File componentPackageFile = null;
+        try {
+
+            // validate parameters
+            try {
+                if (isPkgUrlProvided) {
+
+                    if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
+                        throw new IllegalArgumentException("Function Package 
url is not valid. supported url (http/https/file)");
+                    }
+                    try {
+                        componentPackageFile = 
FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
+                    } catch (Exception e) {
+                        throw new 
IllegalArgumentException(String.format("Encountered error \"%s\" when getting 
%s package from %s", e.getMessage(), 
ComponentTypeUtils.toString(componentType), functionPkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, functionName,
+                            functionConfig, componentPackageFile);
+                } else {
+                    if (uploadedInputStream != null) {
+                        componentPackageFile = 
WorkerUtils.dumpToTmpFile(uploadedInputStream);
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, functionName,
+                            functionConfig, componentPackageFile);
+                    if (!isFunctionCodeBuiltin(functionDetails) && 
(componentPackageFile == null || fileDetail == null)) {
+                        throw new 
IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package 
is not provided");
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Invalid register {} request @ /{}/{}/{}", 
ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, 
e.getMessage());
+            }
+
+            try {
+                
worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+            } catch (Exception e) {
+                log.error("{} {}/{}/{} cannot be admitted by the runtime 
factory", ComponentTypeUtils.toString(componentType), tenant, namespace, 
functionName);
+                throw new RestException(Response.Status.BAD_REQUEST, 
String.format("%s %s cannot be admitted:- %s", 
ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
+            }
+
+            // function state
+            Function.FunctionMetaData.Builder functionMetaDataBuilder = 
Function.FunctionMetaData.newBuilder()
+                    .setFunctionDetails(functionDetails)
+                    .setCreateTime(System.currentTimeMillis())
+                    .setVersion(0);
+
+            // cache auth if need
+            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+
+                if (clientAuthenticationDataHttps != null) {
+                    try {
+                        Optional<FunctionAuthData> functionAuthData = 
worker().getFunctionRuntimeManager()
+                                .getRuntimeFactory()
+                                .getAuthProvider()
+                                .cacheAuthData(tenant, namespace, 
functionName, clientAuthenticationDataHttps);
+
+                        if (functionAuthData.isPresent()) {
+                            functionMetaDataBuilder.setFunctionAuthSpec(
+                                    
Function.FunctionAuthenticationSpec.newBuilder()
+                                            
.setData(ByteString.copyFrom(functionAuthData.get().getData()))
+                                            .build());
+                        }
+                    } catch (Exception e) {
+                        log.error("Error caching authentication data for {} 
{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, 
functionName, e);
+
+
+                        throw new 
RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error 
caching authentication data for %s %s:- %s", 
ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
+                    }
+                }
+            }
+
+            Function.PackageLocationMetaData.Builder 
packageLocationMetaDataBuilder;
+            try {
+                packageLocationMetaDataBuilder = 
getFunctionPackageLocation(functionMetaDataBuilder.build(),
+                        functionPkgUrl, fileDetail, componentPackageFile);
+            } catch (Exception e) {
+                log.error("Failed process {} {}/{}/{} package: ", 
ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+            }
+
+            
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+            updateRequest(functionMetaDataBuilder.build());
+        } finally {
+
+            if (!(functionPkgUrl != null && 
functionPkgUrl.startsWith(Utils.FILE))
+                    && componentPackageFile != null && 
componentPackageFile.exists()) {
+                componentPackageFile.delete();
+            }
+        }
+    }
+
+    public void updateFunction(final String tenant,
+                               final String namespace,
+                               final String functionName,
+                               final InputStream uploadedInputStream,
+                               final FormDataContentDisposition fileDetail,
+                               final String functionPkgUrl,
+                               final FunctionConfig functionConfig,
+                               final String clientRole,
+                               AuthenticationDataHttps 
clientAuthenticationDataHttps,
+                               UpdateOptions updateOptions) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (tenant == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is 
not provided");
+        }
+        if (namespace == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is 
not provided");
+        }
+        if (functionName == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, 
ComponentTypeUtils.toString(componentType) + " Name is not provided");
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
update {}", tenant, namespace,
+                        functionName, clientRole, 
ComponentTypeUtils.toString(componentType));
+                throw new RestException(Response.Status.UNAUTHORIZED, "client 
is not authorize to perform operation");
+
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, 
functionName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
+
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, 
functionName)) {
+            throw new RestException(Response.Status.BAD_REQUEST, 
String.format("%s %s doesn't exist", 
ComponentTypeUtils.toString(componentType), functionName));
+        }
+
+        Function.FunctionMetaData existingComponent = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
+
+        if 
(!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType))
 {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, functionName, 
ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.NOT_FOUND, 
String.format("%s %s doesn't exist", 
ComponentTypeUtils.toString(componentType), functionName));
+        }
+
+        FunctionConfig existingFunctionConfig = 
FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
+        // The rest end points take precedence over whatever is there in 
function config
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(functionName);
+        FunctionConfig mergedConfig;
+        try {
+            mergedConfig = 
FunctionConfigUtils.validateUpdate(existingFunctionConfig, functionConfig);
+        } catch (Exception e) {
+            throw new RestException(Response.Status.BAD_REQUEST, 
e.getMessage());
+        }
+
+        if (existingFunctionConfig.equals(mergedConfig) && 
isBlank(functionPkgUrl) && uploadedInputStream == null) {
+            log.error("{}/{}/{} Update contains no changes", tenant, 
namespace, functionName);
+            throw new RestException(Response.Status.BAD_REQUEST, "Update 
contains no change");
+        }
+
+        Function.FunctionDetails functionDetails = null;
+        File componentPackageFile = null;
+        try {
+
+            // validate parameters
+            try {
+                if (isNotBlank(functionPkgUrl)) {
+                    try {
+                        componentPackageFile = 
FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
+                    } catch (Exception e) {
+                        throw new 
IllegalArgumentException(String.format("Encountered error \"%s\" when getting 
%s package from %s", e.getMessage(), 
ComponentTypeUtils.toString(componentType), functionPkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, functionName,
+                            mergedConfig, componentPackageFile);
+
+                } else if 
(existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
+                        || 
existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)) 
{
+                    try {
+                        componentPackageFile = 
FunctionCommon.extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
+                    } catch (Exception e) {
+                        throw new 
IllegalArgumentException(String.format("Encountered error \"%s\" when getting 
%s package from %s", e.getMessage(), 
ComponentTypeUtils.toString(componentType), functionPkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, functionName,
+                            mergedConfig, componentPackageFile);
+                } else if (uploadedInputStream != null) {
+
+                    componentPackageFile = 
WorkerUtils.dumpToTmpFile(uploadedInputStream);
+                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, functionName,
+                            mergedConfig, componentPackageFile);
+
+                } else if 
(existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN))
 {
+                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, functionName,
+                            mergedConfig, componentPackageFile);
+                    if (!isFunctionCodeBuiltin(functionDetails) && 
(componentPackageFile == null || fileDetail == null)) {
+                        throw new 
IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package 
is not provided");
+                    }
+                } else {
+
+                    componentPackageFile = FunctionCommon.createPkgTempFile();
+                    componentPackageFile.deleteOnExit();
+                    log.info("componentPackageFile: {}", componentPackageFile);
+                    
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), 
componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
+
+                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, functionName,
+                            mergedConfig, componentPackageFile);
+                }
+            } catch (Exception e) {
+                log.error("Invalid update {} request @ /{}/{}/{}", 
ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, 
e.getMessage());
+            }
+
+            try {
+                
worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+            } catch (Exception e) {
+                log.error("Updated {} {}/{}/{} cannot be submitted to runtime 
factory", ComponentTypeUtils.toString(componentType), tenant, namespace, 
functionName);
+                throw new RestException(Response.Status.BAD_REQUEST, 
String.format("%s %s cannot be admitted:- %s",
+                        ComponentTypeUtils.toString(componentType), 
functionName, e.getMessage()));
+            }
+
+            // merge from existing metadata
+            Function.FunctionMetaData.Builder functionMetaDataBuilder = 
Function.FunctionMetaData.newBuilder().mergeFrom(existingComponent)
+                    .setFunctionDetails(functionDetails);
+
+            // update auth data if need
+            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+                if (clientAuthenticationDataHttps != null && updateOptions != 
null && updateOptions.isUpdateAuthData()) {
+                    // get existing auth data if it exists
+                    Optional<FunctionAuthData> existingFunctionAuthData = 
Optional.empty();
+                    if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
+                        existingFunctionAuthData = 
Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
+                    }
+
+                    try {
+                        Optional<FunctionAuthData> newFunctionAuthData = 
worker().getFunctionRuntimeManager()
+                                .getRuntimeFactory()
+                                .getAuthProvider()
+                                .updateAuthData(
+                                        tenant, namespace,
+                                        functionName, existingFunctionAuthData,
+                                        clientAuthenticationDataHttps);
+
+                        if (newFunctionAuthData.isPresent()) {
+                            functionMetaDataBuilder.setFunctionAuthSpec(
+                                    
Function.FunctionAuthenticationSpec.newBuilder()
+                                            
.setData(ByteString.copyFrom(newFunctionAuthData.get().getData()))
+                                            .build());
+                        } else {
+                            functionMetaDataBuilder.clearFunctionAuthSpec();
+                        }
+                    } catch (Exception e) {
+                        log.error("Error updating authentication data for {} 
{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, 
functionName, e);
+                        throw new 
RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error 
caching authentication data for %s %s:- %s", 
ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
+                    }
+                }
+            }
+
+            Function.PackageLocationMetaData.Builder 
packageLocationMetaDataBuilder;
+            if (isNotBlank(functionPkgUrl) || uploadedInputStream != null) {
+                try {
+                    packageLocationMetaDataBuilder = 
getFunctionPackageLocation(functionMetaDataBuilder.build(),
+                            functionPkgUrl, fileDetail, componentPackageFile);
+                } catch (Exception e) {
+                    log.error("Failed process {} {}/{}/{} package: ", 
ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+                    throw new 
RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+                }
+            } else {
+                packageLocationMetaDataBuilder = 
Function.PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
+            }
+
+            
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+
+            updateRequest(functionMetaDataBuilder.build());
+        } finally {
+            if (!(functionPkgUrl != null && 
functionPkgUrl.startsWith(Utils.FILE))
+                    && componentPackageFile != null && 
componentPackageFile.exists()) {
+                componentPackageFile.delete();
+            }
+        }
+    }
+
     private class GetFunctionStatus extends GetStatus<FunctionStatus, 
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> {
 
         @Override
@@ -195,10 +569,6 @@ public class FunctionsImpl extends ComponentImpl {
         return exceptionInformation;
     }
 
-    public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
-        super(workerServiceSupplier, 
Function.FunctionDetails.ComponentType.FUNCTION);
-    }
-
     /**
      * Get status of a function instance.  If this worker is not running the 
function instance,
      * @param tenant the tenant the function belongs to
@@ -262,4 +632,20 @@ public class FunctionsImpl extends ComponentImpl {
 
         return functionStatus;
     }
+
+    private Function.FunctionDetails validateUpdateRequestParams(final String 
tenant,
+                                                                 final String 
namespace,
+                                                                 final String 
componentName,
+                                                                 final 
FunctionConfig functionConfig,
+                                                                 final File 
componentPackageFile) throws IOException {
+        // Turns a FunctionConfig into FunctionDetails: stamp the path ids, infer, validate against the package file, convert.
+        // The rest end points take precedence over whatever is there in 
function config
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(componentName); // note: the passed-in functionConfig is mutated in place
+        FunctionConfigUtils.inferMissingArguments(functionConfig);
+        ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, 
componentPackageFile);
+        return FunctionConfigUtils.convert(functionConfig, clsLoader); // the loader returned by validate() is handed on to convert()
+
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
index b1da329..d4e6414 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -18,14 +18,12 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.FunctionState;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.utils.FunctionCommon;
@@ -103,14 +101,9 @@ public class FunctionsImplV2 {
             throw new RestException(Response.Status.BAD_REQUEST, 
e.getMessage());
         }
         FunctionConfig functionConfig = 
FunctionConfigUtils.convertFromDetails(functionDetailsBuilder.build());
-        String functionConfigJson = null;
-        try {
-            functionConfigJson = 
ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig);
-        } catch (JsonProcessingException e) {
-            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
-        }
+
         delegate.registerFunction(tenant, namespace, functionName, 
uploadedInputStream, fileDetail,
-                functionPkgUrl, functionConfigJson, clientRole, null);
+                functionPkgUrl, functionConfig, clientRole, null);
         return Response.ok().build();
     }
 
@@ -125,15 +118,9 @@ public class FunctionsImplV2 {
             throw new RestException(Response.Status.BAD_REQUEST, 
e.getMessage());
         }
         FunctionConfig functionConfig = 
FunctionConfigUtils.convertFromDetails(functionDetailsBuilder.build());
-        String functionConfigJson = null;
-        try {
-            functionConfigJson = 
ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig);
-        } catch (JsonProcessingException e) {
-            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
-        }
 
         delegate.updateFunction(tenant, namespace, functionName, 
uploadedInputStream, fileDetail,
-                functionPkgUrl, functionConfigJson, clientRole, null, null);
+                functionPkgUrl, functionConfig, clientRole, null, null);
         return Response.ok().build();
     }
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index fa07160..504722f 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -18,34 +18,406 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
+import com.google.protobuf.ByteString;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SinkStatus;
+import org.apache.pulsar.functions.auth.FunctionAuthData;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.utils.ComponentTypeUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.rest.RestException;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
+import java.nio.file.Path;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Supplier;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static 
org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static 
org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
 import static 
org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
 
 @Slf4j
 public class SinksImpl extends ComponentImpl {
 
+    public SinksImpl(Supplier<WorkerService> workerServiceSupplier) { // Hands the worker supplier to ComponentImpl with the component type fixed to SINK.
+        super(workerServiceSupplier, 
Function.FunctionDetails.ComponentType.SINK);
+    }
+
+    public void registerSink(final String tenant, // Registers a new sink; the validation, authorization and lookup failures below are thrown as RestException.
+                             final String namespace,
+                             final String sinkName,
+                             final InputStream uploadedInputStream, // package upload; dumped to a temp file when sinkPkgUrl is blank
+                             final FormDataContentDisposition fileDetail, // in the upload path, must be non-null unless the code is builtin
+                             final String sinkPkgUrl, // package URL; when not blank it is used instead of the upload
+                             final SinkConfig sinkConfig,
+                             final String clientRole, // role passed to isAuthorizedRole
+                             AuthenticationDataHttps 
clientAuthenticationDataHttps) {
+        // Step 1: the worker service must be available.
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+        // Step 2: tenant, namespace and sink name are mandatory.
+        if (tenant == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is 
not provided");
+        }
+        if (namespace == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is 
not provided");
+        }
+        if (sinkName == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, 
ComponentTypeUtils.toString(componentType) + " Name is not provided");
+        }
+        // Step 3: the caller must pass isAuthorizedRole for this tenant/namespace.
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
register {}", tenant, namespace,
+                        sinkName, clientRole, 
ComponentTypeUtils.toString(componentType));
+                throw new RestException(Response.Status.UNAUTHORIZED, "client 
is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, 
sinkName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
+        // Step 4: tenant lookup must succeed; a non-null namespace list must contain tenant/namespace or tenant/<functions cluster>/namespace.
+        try {
+            // Check tenant exists
+            worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
+
+            String qualifiedNamespace = tenant + "/" + namespace;
+            List<String> namespaces = 
worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
+            if (namespaces != null && 
!namespaces.contains(qualifiedNamespace)) {
+                String qualifiedNamespaceWithCluster = 
String.format("%s/%s/%s", tenant,
+                        
worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
+                if (namespaces != null && 
!namespaces.contains(qualifiedNamespaceWithCluster)) {
+                    log.error("{}/{}/{} Namespace {} does not exist", tenant, 
namespace, sinkName, namespace);
+                    throw new RestException(Response.Status.BAD_REQUEST, 
"Namespace does not exist");
+                }
+            } // NOTE(review): the inner namespaces != null test repeats the outer one and looks redundant
+        } catch (PulsarAdminException.NotAuthorizedException e) {
+            log.error("{}/{}/{} Client [{}] is not admin and authorized to 
operate {} on tenant", tenant, namespace,
+                    sinkName, clientRole, 
ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.UNAUTHORIZED, "client is 
not authorize to perform operation");
+        } catch (PulsarAdminException.NotFoundException e) {
+            log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, 
sinkName, tenant);
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant does 
not exist");
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Issues getting tenant data", tenant, 
namespace, sinkName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
+        // Step 5: reject the request when metadata for this tenant/namespace/name already exists.
+        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
+
+        if (functionMetaDataManager.containsFunction(tenant, namespace, 
sinkName)) {
+            log.error("{} {}/{}/{} already exists", 
ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName);
+            throw new RestException(Response.Status.BAD_REQUEST, 
String.format("%s %s already exists", 
ComponentTypeUtils.toString(componentType), sinkName));
+        }
+        // Step 6: obtain the package file (URL or upload) and validate the config into FunctionDetails.
+        Function.FunctionDetails functionDetails = null;
+        boolean isPkgUrlProvided = isNotBlank(sinkPkgUrl);
+        File componentPackageFile = null;
+        try {
+            // Everything from here on runs inside this try so the finally block can remove the package file.
+            // validate parameters
+            try {
+                if (isPkgUrlProvided) {
+                    // Only URLs accepted by Utils.isFunctionPackageUrlSupported are fetched.
+                    if (!Utils.isFunctionPackageUrlSupported(sinkPkgUrl)) {
+                        throw new IllegalArgumentException("Function Package 
url is not valid. supported url (http/https/file)");
+                    }
+                    try {
+                        componentPackageFile = 
FunctionCommon.extractFileFromPkgURL(sinkPkgUrl);
+                    } catch (Exception e) {
+                        throw new 
IllegalArgumentException(String.format("Encountered error \"%s\" when getting 
%s package from %s", e.getMessage(), 
ComponentTypeUtils.toString(componentType), sinkPkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, sinkName,
+                            sinkConfig, componentPackageFile);
+                } else {
+                    if (uploadedInputStream != null) {
+                        componentPackageFile = 
WorkerUtils.dumpToTmpFile(uploadedInputStream);
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, sinkName,
+                            sinkConfig, componentPackageFile);
+                    if (!isFunctionCodeBuiltin(functionDetails) && 
(componentPackageFile == null || fileDetail == null)) {
+                        throw new 
IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package 
is not provided");
+                    }
+                }
+            } catch (Exception e) { // any failure while fetching or validating is reported as 400 BAD_REQUEST
+                log.error("Invalid register {} request @ /{}/{}/{}", 
ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, 
e.getMessage());
+            }
+            // Step 7: the runtime factory's admission checks must pass.
+            try {
+                
worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+            } catch (Exception e) {
+                log.error("{} {}/{}/{} cannot be admitted by the runtime 
factory", ComponentTypeUtils.toString(componentType), tenant, namespace, 
sinkName);
+                throw new RestException(Response.Status.BAD_REQUEST, 
String.format("%s %s cannot be admitted:- %s", 
ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
+            }
+
+            // function state
+            Function.FunctionMetaData.Builder functionMetaDataBuilder = 
Function.FunctionMetaData.newBuilder()
+                    .setFunctionDetails(functionDetails)
+                    .setCreateTime(System.currentTimeMillis())
+                    .setVersion(0);
+
+            // cache auth if need
+            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+                // Auth data is only cached when the request carried client authentication data.
+                if (clientAuthenticationDataHttps != null) {
+                    try {
+                        Optional<FunctionAuthData> functionAuthData = 
worker().getFunctionRuntimeManager()
+                                .getRuntimeFactory()
+                                .getAuthProvider()
+                                .cacheAuthData(tenant, namespace, sinkName, 
clientAuthenticationDataHttps);
+
+                        if (functionAuthData.isPresent()) {
+                            functionMetaDataBuilder.setFunctionAuthSpec(
+                                    
Function.FunctionAuthenticationSpec.newBuilder()
+                                            
.setData(ByteString.copyFrom(functionAuthData.get().getData()))
+                                            .build());
+                        }
+                    } catch (Exception e) {
+                        log.error("Error caching authentication data for {} 
{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, 
sinkName, e);
+
+
+                        throw new 
RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error 
caching authentication data for %s %s:- %s", 
ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
+                    }
+                }
+            }
+            // Step 8: resolve the package location, attach it to the metadata and submit the request.
+            Function.PackageLocationMetaData.Builder 
packageLocationMetaDataBuilder;
+            try {
+                packageLocationMetaDataBuilder = 
getFunctionPackageLocation(functionMetaDataBuilder.build(),
+                        sinkPkgUrl, fileDetail, componentPackageFile);
+            } catch (Exception e) {
+                log.error("Failed process {} {}/{}/{} package: ", 
ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+            }
+
+            
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+            updateRequest(functionMetaDataBuilder.build());
+        } finally {
+            // Delete the package file if it exists, unless sinkPkgUrl starts with Utils.FILE.
+            if (!(sinkPkgUrl != null && sinkPkgUrl.startsWith(Utils.FILE))
+                    && componentPackageFile != null && 
componentPackageFile.exists()) {
+                componentPackageFile.delete();
+            }
+        }
+    }
+
+    public void updateSink(final String tenant,
+                           final String namespace,
+                           final String sinkName,
+                           final InputStream uploadedInputStream,
+                           final FormDataContentDisposition fileDetail,
+                           final String sinkPkgUrl,
+                           final SinkConfig sinkConfig,
+                           final String clientRole,
+                           AuthenticationDataHttps 
clientAuthenticationDataHttps,
+                           UpdateOptions updateOptions) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (tenant == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is 
not provided");
+        }
+        if (namespace == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is 
not provided");
+        }
+        if (sinkName == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, 
ComponentTypeUtils.toString(componentType) + " Name is not provided");
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
update {}", tenant, namespace,
+                        sinkName, clientRole, 
ComponentTypeUtils.toString(componentType));
+                throw new RestException(Response.Status.UNAUTHORIZED, "client 
is not authorize to perform operation");
+
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, 
sinkName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = 
worker().getFunctionMetaDataManager();
+
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, 
sinkName)) {
+            throw new RestException(Response.Status.BAD_REQUEST, 
String.format("%s %s doesn't exist", 
ComponentTypeUtils.toString(componentType), sinkName));
+        }
+
+        Function.FunctionMetaData existingComponent = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace, sinkName);
+
+        if 
(!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType))
 {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, sinkName, 
ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.NOT_FOUND, 
String.format("%s %s doesn't exist", 
ComponentTypeUtils.toString(componentType), sinkName));
+        }
+
+
+        SinkConfig existingSinkConfig = 
SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
+        // The rest end points take precedence over whatever is there in 
functionconfig
+        sinkConfig.setTenant(tenant);
+        sinkConfig.setNamespace(namespace);
+        sinkConfig.setName(sinkName);
+
+        SinkConfig mergedConfig;
+        try {
+            mergedConfig = SinkConfigUtils.validateUpdate(existingSinkConfig, 
sinkConfig);
+        } catch (Exception e) {
+            throw new RestException(Response.Status.BAD_REQUEST, 
e.getMessage());
+        }
+
+
+        if (existingSinkConfig.equals(mergedConfig) && isBlank(sinkPkgUrl) && 
uploadedInputStream == null) {
+            log.error("{}/{}/{} Update contains no changes", tenant, 
namespace, sinkName);
+            throw new RestException(Response.Status.BAD_REQUEST, "Update 
contains no change");
+        }
+
+        Function.FunctionDetails functionDetails = null;
+        File componentPackageFile = null;
+        try {
+
+            // validate parameters
+            try {
+                if (isNotBlank(sinkPkgUrl)) {
+                    try {
+                        componentPackageFile = 
FunctionCommon.extractFileFromPkgURL(sinkPkgUrl);
+                    } catch (Exception e) {
+                        throw new 
IllegalArgumentException(String.format("Encountered error \"%s\" when getting 
%s package from %s", e.getMessage(), 
ComponentTypeUtils.toString(componentType), sinkPkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, sinkName,
+                            mergedConfig, componentPackageFile);
+
+                } else if 
(existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
+                        || 
existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)) 
{
+                    try {
+                        componentPackageFile = 
FunctionCommon.extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
+                    } catch (Exception e) {
+                        throw new 
IllegalArgumentException(String.format("Encountered error \"%s\" when getting 
%s package from %s", e.getMessage(), 
ComponentTypeUtils.toString(componentType), sinkPkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, sinkName,
+                            mergedConfig, componentPackageFile);
+                } else if (uploadedInputStream != null) {
+
+                    componentPackageFile = 
WorkerUtils.dumpToTmpFile(uploadedInputStream);
+                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, sinkName,
+                            mergedConfig, componentPackageFile);
+
+                } else if 
(existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN))
 {
+                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, sinkName,
+                            mergedConfig, componentPackageFile);
+                    if (!isFunctionCodeBuiltin(functionDetails) && 
(componentPackageFile == null || fileDetail == null)) {
+                        throw new 
IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package 
is not provided");
+                    }
+                } else {
+
+                    componentPackageFile = FunctionCommon.createPkgTempFile();
+                    componentPackageFile.deleteOnExit();
+                    log.info("componentPackageFile: {}", componentPackageFile);
+                    
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), 
componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
+
+                    functionDetails = validateUpdateRequestParams(tenant, 
namespace, sinkName,
+                            mergedConfig, componentPackageFile);
+                }
+            } catch (Exception e) {
+                log.error("Invalid update {} request @ /{}/{}/{}", 
ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, 
e.getMessage());
+            }
+
+            try {
+                
worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+            } catch (Exception e) {
+                log.error("Updated {} {}/{}/{} cannot be submitted to runtime 
factory", ComponentTypeUtils.toString(componentType), tenant, namespace, 
sinkName);
+                throw new RestException(Response.Status.BAD_REQUEST, 
String.format("%s %s cannot be admitted:- %s",
+                        ComponentTypeUtils.toString(componentType), sinkName, 
e.getMessage()));
+            }
+
+            // merge from existing metadata
+            Function.FunctionMetaData.Builder functionMetaDataBuilder = 
Function.FunctionMetaData.newBuilder().mergeFrom(existingComponent)
+                    .setFunctionDetails(functionDetails);
+
+            // update auth data if need
+            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+                if (clientAuthenticationDataHttps != null && updateOptions != 
null && updateOptions.isUpdateAuthData()) {
+                    // get existing auth data if it exists
+                    Optional<FunctionAuthData> existingFunctionAuthData = 
Optional.empty();
+                    if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
+                        existingFunctionAuthData = 
Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
+                    }
+
+                    try {
+                        Optional<FunctionAuthData> newFunctionAuthData = 
worker().getFunctionRuntimeManager()
+                                .getRuntimeFactory()
+                                .getAuthProvider()
+                                .updateAuthData(
+                                        tenant, namespace,
+                                        sinkName, existingFunctionAuthData,
+                                        clientAuthenticationDataHttps);
+
+                        if (newFunctionAuthData.isPresent()) {
+                            functionMetaDataBuilder.setFunctionAuthSpec(
+                                    
Function.FunctionAuthenticationSpec.newBuilder()
+                                            
.setData(ByteString.copyFrom(newFunctionAuthData.get().getData()))
+                                            .build());
+                        } else {
+                            functionMetaDataBuilder.clearFunctionAuthSpec();
+                        }
+                    } catch (Exception e) {
+                        log.error("Error updating authentication data for {} 
{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, 
sinkName, e);
+                        throw new 
RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error 
caching authentication data for %s %s:- %s", 
ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
+                    }
+                }
+            }
+
+            Function.PackageLocationMetaData.Builder 
packageLocationMetaDataBuilder;
+            if (isNotBlank(sinkPkgUrl) || uploadedInputStream != null) {
+                try {
+                    packageLocationMetaDataBuilder = 
getFunctionPackageLocation(functionMetaDataBuilder.build(),
+                            sinkPkgUrl, fileDetail, componentPackageFile);
+                } catch (Exception e) {
+                    log.error("Failed process {} {}/{}/{} package: ", 
ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+                    throw new 
RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+                }
+            } else {
+                packageLocationMetaDataBuilder = 
Function.PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
+            }
+
+            
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+
+            updateRequest(functionMetaDataBuilder.build());
+        } finally {
+            if (!(sinkPkgUrl != null && sinkPkgUrl.startsWith(Utils.FILE))
+                    && componentPackageFile != null && 
componentPackageFile.exists()) {
+                componentPackageFile.delete();
+            }
+        }
+    }
+
     private class GetSinkStatus extends GetStatus<SinkStatus, 
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData> {
 
         @Override
@@ -206,10 +578,6 @@ public class SinksImpl extends ComponentImpl {
         return exceptionInformation;
     }
 
-    public SinksImpl(Supplier<WorkerService> workerServiceSupplier) {
-        super(workerServiceSupplier, 
Function.FunctionDetails.ComponentType.SINK);
-    }
-
     public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
getSinkInstanceStatus(final String tenant,
                                                                                
       final String namespace,
                                                                                
       final String sinkName,
@@ -287,4 +655,31 @@ public class SinksImpl extends ComponentImpl {
         SinkConfig config = 
SinkConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
         return config;
     }
+
+    /**
+     * Validates a sink configuration and converts it into the {@link Function.FunctionDetails}
+     * used by the register/update flow.
+     *
+     * <p>The tenant, namespace and name taken from the REST path overwrite whatever the supplied
+     * {@code sinkConfig} carries. When the config names an archive, the name (with any leading
+     * {@code builtin://} removed) is looked up through the worker's connectors manager.
+     *
+     * @param tenant tenant from the REST path
+     * @param namespace namespace from the REST path
+     * @param sinkName sink name from the REST path
+     * @param sinkConfig configuration to validate; its tenant/namespace/name are overwritten
+     * @param componentPackageFile package file handed to validation; callers pass {@code null}
+     *                             when no package file has been resolved
+     * @return the function details converted from the validated configuration
+     * @throws IllegalArgumentException if looking up the named archive fails
+     * @throws IOException declared by the signature; presumably raised by the config
+     *                     validation/conversion helpers
+     */
+    private Function.FunctionDetails validateUpdateRequestParams(final String tenant,
+                                                                 final String namespace,
+                                                                 final String sinkName,
+                                                                 final SinkConfig sinkConfig,
+                                                                 final File componentPackageFile) throws IOException {
+
+        Path archivePath = null;
+        // The rest end points take precedence over whatever is there in sinkConfig
+        sinkConfig.setTenant(tenant);
+        sinkConfig.setNamespace(namespace);
+        sinkConfig.setName(sinkName);
+        org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig);
+        if (!StringUtils.isEmpty(sinkConfig.getArchive())) {
+            String builtinArchive = sinkConfig.getArchive();
+            if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
+                builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
+            }
+            try {
+                archivePath = this.worker().getConnectorsManager().getSinkArchive(builtinArchive);
+            } catch (Exception e) {
+                // archivePath is still null when the lookup throws, so report the name that was
+                // looked up and keep the original failure as the cause.
+                throw new IllegalArgumentException(String.format("No Sink archive %s found", builtinArchive), e);
+            }
+        }
+        SinkConfigUtils.ExtractedSinkDetails sinkDetails =
+                SinkConfigUtils.validate(sinkConfig, archivePath, componentPackageFile);
+        return SinkConfigUtils.convert(sinkConfig, sinkDetails);
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index d35724e..15a26aa 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -18,33 +18,403 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
+import com.google.protobuf.ByteString;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.functions.auth.FunctionAuthData;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.utils.ComponentTypeUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.rest.RestException;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
+import java.nio.file.Path;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Supplier;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static 
org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static 
org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
 import static 
org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
 
 @Slf4j
 public class SourcesImpl extends ComponentImpl {
+
+    /**
+     * Builds the REST implementation for source components.
+     *
+     * @param workerServiceSupplier supplier passed through to the {@link ComponentImpl} parent,
+     *                              together with the {@code SOURCE} component type
+     */
+    public SourcesImpl(Supplier<WorkerService> workerServiceSupplier) {
+        super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SOURCE);
+    }
+
+    /**
+     * Registers a new source.
+     *
+     * <p>Steps, in order: reject the call if the worker service is unavailable; require tenant,
+     * namespace and source name; check that the caller is authorized; check through the broker
+     * admin client that the tenant exists and that the namespace is listed (either as
+     * {@code tenant/namespace} or {@code tenant/cluster/namespace}); reject a name that is already
+     * registered; resolve the package (from {@code sourcePkgUrl} when given, otherwise from the
+     * uploaded stream); validate the configuration; run the runtime factory's admission checks;
+     * cache authentication data when authentication is enabled; compute the package location and
+     * submit the metadata through {@code updateRequest}.
+     *
+     * <p>The resolved package file is deleted afterwards unless it was addressed through a
+     * {@code file://} URL.
+     *
+     * @param tenant tenant from the REST path
+     * @param namespace namespace from the REST path
+     * @param sourceName source name from the REST path
+     * @param uploadedInputStream uploaded package content, or {@code null} if none was uploaded
+     * @param fileDetail form metadata of the uploaded package, or {@code null}
+     * @param sourcePkgUrl package URL; takes precedence over the uploaded stream when not blank
+     * @param sourceConfig source configuration to register
+     * @param clientRole role of the caller, used for authorization and logging
+     * @param clientAuthenticationDataHttps authentication data of the caller, may be {@code null}
+     * @throws RestException with BAD_REQUEST for invalid input, UNAUTHORIZED when the caller is
+     *                       not authorized, INTERNAL_SERVER_ERROR for broker/auth/package failures
+     */
+    public void registerSource(final String tenant,
+                               final String namespace,
+                               final String sourceName,
+                               final InputStream uploadedInputStream,
+                               final FormDataContentDisposition fileDetail,
+                               final String sourcePkgUrl,
+                               final SourceConfig sourceConfig,
+                               final String clientRole,
+                               AuthenticationDataHttps clientAuthenticationDataHttps) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (tenant == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+        }
+        if (namespace == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+        }
+        if (sourceName == null) {
+            throw new RestException(Response.Status.BAD_REQUEST,
+                    ComponentTypeUtils.toString(componentType) + " Name is not provided");
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
+                        sourceName, clientRole, ComponentTypeUtils.toString(componentType));
+                throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            // clientRole fills the fourth placeholder; the exception stays last so its stack trace is logged
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, sourceName, clientRole, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        try {
+            // Check tenant exists
+            worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
+
+            String qualifiedNamespace = tenant + "/" + namespace;
+            List<String> namespaces = worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
+            if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
+                // fall back to the cluster-qualified form before declaring the namespace missing
+                String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant,
+                        worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
+                if (!namespaces.contains(qualifiedNamespaceWithCluster)) {
+                    log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, sourceName, namespace);
+                    throw new RestException(Response.Status.BAD_REQUEST, "Namespace does not exist");
+                }
+            }
+        } catch (PulsarAdminException.NotAuthorizedException e) {
+            log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,
+                    sourceName, clientRole, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+        } catch (PulsarAdminException.NotFoundException e) {
+            log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, sourceName, tenant);
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant does not exist");
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Issues getting tenant data", tenant, namespace, sourceName, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+        if (functionMetaDataManager.containsFunction(tenant, namespace, sourceName)) {
+            log.error("{} {}/{}/{} already exists", ComponentTypeUtils.toString(componentType),
+                    tenant, namespace, sourceName);
+            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s already exists",
+                    ComponentTypeUtils.toString(componentType), sourceName));
+        }
+
+        // Without this check a missing config only surfaces as a BAD_REQUEST with a null message
+        if (sourceConfig == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Source config is not provided");
+        }
+
+        Function.FunctionDetails functionDetails = null;
+        boolean isPkgUrlProvided = isNotBlank(sourcePkgUrl);
+        File componentPackageFile = null;
+        try {
+
+            // validate parameters
+            try {
+                if (isPkgUrlProvided) {
+
+                    if (!Utils.isFunctionPackageUrlSupported(sourcePkgUrl)) {
+                        throw new IllegalArgumentException(
+                                "Function Package url is not valid. supported url (http/https/file)");
+                    }
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(sourcePkgUrl);
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException(
+                                String.format("Encountered error \"%s\" when getting %s package from %s",
+                                        e.getMessage(), ComponentTypeUtils.toString(componentType), sourcePkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+                            sourceConfig, componentPackageFile);
+                } else {
+                    if (uploadedInputStream != null) {
+                        componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+                            sourceConfig, componentPackageFile);
+                    // a non-builtin source needs both the uploaded content and its form metadata
+                    if (!isFunctionCodeBuiltin(functionDetails)
+                            && (componentPackageFile == null || fileDetail == null)) {
+                        throw new IllegalArgumentException(
+                                ComponentTypeUtils.toString(componentType) + " Package is not provided");
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Invalid register {} request @ /{}/{}/{}",
+                        ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+            }
+
+            try {
+                worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+            } catch (Exception e) {
+                log.error("{} {}/{}/{} cannot be admitted by the runtime factory",
+                        ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s",
+                        ComponentTypeUtils.toString(componentType), sourceName, e.getMessage()));
+            }
+
+            // function state
+            Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder()
+                    .setFunctionDetails(functionDetails)
+                    .setCreateTime(System.currentTimeMillis())
+                    .setVersion(0);
+
+            // cache auth if need
+            if (worker().getWorkerConfig().isAuthenticationEnabled() && clientAuthenticationDataHttps != null) {
+                try {
+                    Optional<FunctionAuthData> functionAuthData = worker().getFunctionRuntimeManager()
+                            .getRuntimeFactory()
+                            .getAuthProvider()
+                            .cacheAuthData(tenant, namespace, sourceName, clientAuthenticationDataHttps);
+
+                    if (functionAuthData.isPresent()) {
+                        functionMetaDataBuilder.setFunctionAuthSpec(
+                                Function.FunctionAuthenticationSpec.newBuilder()
+                                        .setData(ByteString.copyFrom(functionAuthData.get().getData()))
+                                        .build());
+                    }
+                } catch (Exception e) {
+                    log.error("Error caching authentication data for {} {}/{}/{}",
+                            ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+
+                    throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
+                            String.format("Error caching authentication data for %s %s:- %s",
+                                    ComponentTypeUtils.toString(componentType), sourceName, e.getMessage()));
+                }
+            }
+
+            Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+            try {
+                packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+                        sourcePkgUrl, fileDetail, componentPackageFile);
+            } catch (Exception e) {
+                log.error("Failed process {} {}/{}/{} package: ",
+                        ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+            }
+
+            functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+            updateRequest(functionMetaDataBuilder.build());
+        } finally {
+
+            // keep the file when it was addressed through a file:// URL; otherwise clean it up
+            if (!(sourcePkgUrl != null && sourcePkgUrl.startsWith(Utils.FILE))
+                    && componentPackageFile != null && componentPackageFile.exists()) {
+                componentPackageFile.delete();
+            }
+        }
+    }
+
+    /**
+     * Updates an existing source.
+     *
+     * <p>Steps, in order: reject the call if the worker service is unavailable; require tenant,
+     * namespace and source name; check that the caller is authorized; require that the name is
+     * registered and is a source; merge the supplied configuration over the stored one and reject
+     * an update that changes nothing; resolve the package; validate the merged configuration; run
+     * the runtime factory's admission checks; optionally refresh authentication data; compute the
+     * package location and submit the metadata through {@code updateRequest}.
+     *
+     * <p>Package resolution order: the request's {@code sourcePkgUrl}, then the uploaded stream,
+     * then the stored package path ({@code file}/{@code http}, builtin, or a download from
+     * BookKeeper). The resolved file is deleted afterwards unless it was addressed through a
+     * {@code file://} URL.
+     *
+     * @param tenant tenant from the REST path
+     * @param namespace namespace from the REST path
+     * @param sourceName source name from the REST path
+     * @param uploadedInputStream uploaded package content, or {@code null} if none was uploaded
+     * @param fileDetail form metadata of the uploaded package, or {@code null}
+     * @param sourcePkgUrl new package URL, or blank to keep/replace the package by other means
+     * @param sourceConfig configuration changes to merge over the stored configuration
+     * @param clientRole role of the caller, used for authorization and logging
+     * @param clientAuthenticationDataHttps authentication data of the caller, may be {@code null}
+     * @param updateOptions update options; auth data is refreshed only when this is non-null and
+     *                      {@code isUpdateAuthData()} is true
+     * @throws RestException with BAD_REQUEST for invalid input, UNAUTHORIZED when the caller is
+     *                       not authorized, NOT_FOUND when the stored component is not a source,
+     *                       INTERNAL_SERVER_ERROR for auth/package failures
+     */
+    public void updateSource(final String tenant,
+                             final String namespace,
+                             final String sourceName,
+                             final InputStream uploadedInputStream,
+                             final FormDataContentDisposition fileDetail,
+                             final String sourcePkgUrl,
+                             final SourceConfig sourceConfig,
+                             final String clientRole,
+                             AuthenticationDataHttps clientAuthenticationDataHttps,
+                             UpdateOptions updateOptions) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (tenant == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+        }
+        if (namespace == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+        }
+        if (sourceName == null) {
+            throw new RestException(Response.Status.BAD_REQUEST,
+                    ComponentTypeUtils.toString(componentType) + " Name is not provided");
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
+                        sourceName, clientRole, ComponentTypeUtils.toString(componentType));
+                throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+
+            }
+        } catch (PulsarAdminException e) {
+            // clientRole fills the fourth placeholder; the exception stays last so its stack trace is logged
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, sourceName, clientRole, e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, sourceName)) {
+            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't exist",
+                    ComponentTypeUtils.toString(componentType), sourceName));
+        }
+
+        Function.FunctionMetaData existingComponent =
+                functionMetaDataManager.getFunctionMetaData(tenant, namespace, sourceName);
+
+        if (!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, sourceName,
+                    ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist",
+                    ComponentTypeUtils.toString(componentType), sourceName));
+        }
+
+        // The setters below would otherwise fail with a NullPointerException outside any try block
+        if (sourceConfig == null) {
+            throw new RestException(Response.Status.BAD_REQUEST, "Source config is not provided");
+        }
+
+        SourceConfig existingSourceConfig =
+                SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
+        // The rest end points take precedence over whatever is there in sourceConfig
+        sourceConfig.setTenant(tenant);
+        sourceConfig.setNamespace(namespace);
+        sourceConfig.setName(sourceName);
+        SourceConfig mergedConfig;
+        try {
+            mergedConfig = SourceConfigUtils.validateUpdate(existingSourceConfig, sourceConfig);
+        } catch (Exception e) {
+            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+        }
+
+        if (existingSourceConfig.equals(mergedConfig) && isBlank(sourcePkgUrl) && uploadedInputStream == null) {
+            log.error("{}/{}/{} Update contains no changes", tenant, namespace, sourceName);
+            throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
+        }
+
+        Function.FunctionDetails functionDetails = null;
+        File componentPackageFile = null;
+        // True when componentPackageFile was resolved from a file:// URL, whether the request's or the
+        // stored one. Such a file is presumably the user's own local file rather than a temporary
+        // copy (confirm against FunctionCommon.extractFileFromPkgURL), so it must survive the cleanup.
+        boolean packageFromFileUrl = false;
+        try {
+
+            // validate parameters
+            try {
+                final String existingPackagePath = existingComponent.getPackageLocation().getPackagePath();
+                if (isNotBlank(sourcePkgUrl)) {
+                    packageFromFileUrl = sourcePkgUrl.startsWith(Utils.FILE);
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(sourcePkgUrl);
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException(
+                                String.format("Encountered error \"%s\" when getting %s package from %s",
+                                        e.getMessage(), ComponentTypeUtils.toString(componentType), sourcePkgUrl));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+                            mergedConfig, componentPackageFile);
+
+                } else if (uploadedInputStream != null) {
+                    // A freshly uploaded package must win over the stored one; checking the stored
+                    // file/http path first would validate and re-publish the old package instead.
+                    componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+                            mergedConfig, componentPackageFile);
+
+                } else if (existingPackagePath.startsWith(Utils.FILE)
+                        || existingPackagePath.startsWith(Utils.HTTP)) {
+                    packageFromFileUrl = existingPackagePath.startsWith(Utils.FILE);
+                    try {
+                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(existingPackagePath);
+                    } catch (Exception e) {
+                        // sourcePkgUrl is blank in this branch; report the path that was actually fetched
+                        throw new IllegalArgumentException(
+                                String.format("Encountered error \"%s\" when getting %s package from %s",
+                                        e.getMessage(), ComponentTypeUtils.toString(componentType),
+                                        existingPackagePath));
+                    }
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+                            mergedConfig, componentPackageFile);
+                } else if (existingPackagePath.startsWith(Utils.BUILTIN)) {
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+                            mergedConfig, componentPackageFile);
+                    if (!isFunctionCodeBuiltin(functionDetails)
+                            && (componentPackageFile == null || fileDetail == null)) {
+                        throw new IllegalArgumentException(
+                                ComponentTypeUtils.toString(componentType) + " Package is not provided");
+                    }
+                } else {
+                    // stored package lives in BookKeeper: download it to a temp file for validation
+                    componentPackageFile = FunctionCommon.createPkgTempFile();
+                    componentPackageFile.deleteOnExit();
+                    log.info("componentPackageFile: {}", componentPackageFile);
+                    WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(),
+                            componentPackageFile, existingPackagePath);
+
+                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+                            mergedConfig, componentPackageFile);
+                }
+            } catch (Exception e) {
+                log.error("Invalid update {} request @ /{}/{}/{}",
+                        ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+            }
+
+            try {
+                worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+            } catch (Exception e) {
+                log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory",
+                        ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+                throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s",
+                        ComponentTypeUtils.toString(componentType), sourceName, e.getMessage()));
+            }
+
+            // merge from existing metadata
+            Function.FunctionMetaData.Builder functionMetaDataBuilder =
+                    Function.FunctionMetaData.newBuilder().mergeFrom(existingComponent)
+                            .setFunctionDetails(functionDetails);
+
+            // update auth data if need
+            if (worker().getWorkerConfig().isAuthenticationEnabled()
+                    && clientAuthenticationDataHttps != null
+                    && updateOptions != null && updateOptions.isUpdateAuthData()) {
+                // get existing auth data if it exists
+                Optional<FunctionAuthData> existingFunctionAuthData = Optional.empty();
+                if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
+                    existingFunctionAuthData = Optional.ofNullable(getFunctionAuthData(
+                            Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
+                }
+
+                try {
+                    Optional<FunctionAuthData> newFunctionAuthData = worker().getFunctionRuntimeManager()
+                            .getRuntimeFactory()
+                            .getAuthProvider()
+                            .updateAuthData(
+                                    tenant, namespace,
+                                    sourceName, existingFunctionAuthData,
+                                    clientAuthenticationDataHttps);
+
+                    if (newFunctionAuthData.isPresent()) {
+                        functionMetaDataBuilder.setFunctionAuthSpec(
+                                Function.FunctionAuthenticationSpec.newBuilder()
+                                        .setData(ByteString.copyFrom(newFunctionAuthData.get().getData()))
+                                        .build());
+                    } else {
+                        functionMetaDataBuilder.clearFunctionAuthSpec();
+                    }
+                } catch (Exception e) {
+                    log.error("Error updating authentication data for {} {}/{}/{}",
+                            ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+                    // this is the update path, so the response says "updating" like the log line above
+                    throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
+                            String.format("Error updating authentication data for %s %s:- %s",
+                                    ComponentTypeUtils.toString(componentType), sourceName, e.getMessage()));
+                }
+            }
+
+            Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+            if (isNotBlank(sourcePkgUrl) || uploadedInputStream != null) {
+                try {
+                    packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+                            sourcePkgUrl, fileDetail, componentPackageFile);
+                } catch (Exception e) {
+                    log.error("Failed process {} {}/{}/{} package: ",
+                            ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+                    throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+                }
+            } else {
+                // no new package supplied: keep pointing at the stored location
+                packageLocationMetaDataBuilder = Function.PackageLocationMetaData.newBuilder()
+                        .mergeFrom(existingComponent.getPackageLocation());
+            }
+
+            functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+
+            updateRequest(functionMetaDataBuilder.build());
+        } finally {
+            if (!packageFromFileUrl && componentPackageFile != null && componentPackageFile.exists()) {
+                componentPackageFile.delete();
+            }
+        }
+    }
+
     private class GetSourceStatus extends GetStatus<SourceStatus, 
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData> {
 
         @Override
@@ -208,10 +578,6 @@ public class SourcesImpl extends ComponentImpl {
         }
     }
 
-    public SourcesImpl(Supplier<WorkerService> workerServiceSupplier) {
-        super(workerServiceSupplier, 
Function.FunctionDetails.ComponentType.SOURCE);
-    }
-
     public SourceStatus getSourceStatus(final String tenant,
                                         final String namespace,
                                         final String componentName,
@@ -285,4 +651,50 @@ public class SourcesImpl extends ComponentImpl {
         SourceConfig config = SourceConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
         return config;
     }
+
+    /**
+     * Validates the parameters of a source update request and converts them into
+     * {@link Function.FunctionDetails}.
+     *
+     * <p>The tenant, namespace and name from the REST path overwrite the values carried
+     * by {@code sourceConfig}. When the config names an archive, a leading
+     * {@code builtin://} scheme is stripped and the archive is looked up through the
+     * worker's connectors manager; the resulting path is handed to validation.
+     *
+     * @param tenant            tenant taken from the REST path
+     * @param namespace         namespace taken from the REST path
+     * @param sourceName        source name taken from the REST path
+     * @param sourceConfig      submitted source configuration; mutated in place
+     * @param sourcePackageFile package file passed through to {@code SourceConfigUtils.validate}
+     * @return the details produced by {@code SourceConfigUtils.convert}
+     * @throws IOException              if one of the delegated utility calls raises it
+     * @throws IllegalArgumentException if the archive lookup fails
+     */
+    private Function.FunctionDetails validateUpdateRequestParams(final String tenant,
+                                                                 final String namespace,
+                                                                 final String sourceName,
+                                                                 final SourceConfig sourceConfig,
+                                                                 final File sourcePackageFile) throws IOException {
+
+        Path archivePath = null;
+        // The rest end points take precedence over whatever is there in sourceconfig
+        sourceConfig.setTenant(tenant);
+        sourceConfig.setNamespace(namespace);
+        sourceConfig.setName(sourceName);
+        org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
+        if (!StringUtils.isEmpty(sourceConfig.getArchive())) {
+            String builtinArchive = sourceConfig.getArchive();
+            if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
+                builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
+            }
+            try {
+                archivePath = this.worker().getConnectorsManager().getSourceArchive(builtinArchive);
+            } catch (Exception e) {
+                // archivePath is still null when the lookup throws; report the name that was looked up
+                throw new IllegalArgumentException(String.format("No Source archive %s found", builtinArchive), e);
+            }
+        }
+        SourceConfigUtils.ExtractedSourceDetails sourceDetails = SourceConfigUtils.validate(sourceConfig, archivePath, sourcePackageFile);
+        return SourceConfigUtils.convert(sourceConfig, sourceDetails);
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
index 2f1a87f..0bbacaa 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
@@ -67,10 +67,10 @@ public class FunctionsApiV3Resource extends 
FunctionApiResource {
                                  final @FormDataParam("data") InputStream 
uploadedInputStream,
                                  final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
                                  final @FormDataParam("url") String 
functionPkgUrl,
-                                 final @FormDataParam("functionConfig") String 
functionConfigJson) {
+                                 final @FormDataParam("functionConfig") 
FunctionConfig functionConfig) {
 
         functions.registerFunction(tenant, namespace, functionName, 
uploadedInputStream, fileDetail,
-                functionPkgUrl, functionConfigJson, clientAppId(), 
clientAuthData());
+                functionPkgUrl, functionConfig, clientAppId(), 
clientAuthData());
 
     }
 
@@ -83,11 +83,11 @@ public class FunctionsApiV3Resource extends 
FunctionApiResource {
                                final @FormDataParam("data") InputStream 
uploadedInputStream,
                                final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
                                final @FormDataParam("url") String 
functionPkgUrl,
-                               final @FormDataParam("functionConfig") String 
functionConfigJson,
+                               final @FormDataParam("functionConfig") 
FunctionConfig functionConfig,
                                final @FormDataParam("updateOptions") 
UpdateOptions updateOptions) {
 
         functions.updateFunction(tenant, namespace, functionName, 
uploadedInputStream, fileDetail,
-                functionPkgUrl, functionConfigJson, clientAppId(), 
clientAuthData(), updateOptions);
+                functionPkgUrl, functionConfig, clientAppId(), 
clientAuthData(), updateOptions);
 
     }
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
index e699544..d9f788b 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
@@ -33,7 +33,14 @@ import org.apache.pulsar.functions.worker.rest.api.SinksImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import java.io.IOException;
 import java.io.InputStream;
@@ -62,10 +69,10 @@ public class SinksApiV3Resource extends FunctionApiResource 
{
                              final @FormDataParam("data") InputStream 
uploadedInputStream,
                              final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
                              final @FormDataParam("url") String functionPkgUrl,
-                             final @FormDataParam("sinkConfig") String 
sinkConfigJson) {
+                             final @FormDataParam("sinkConfig") SinkConfig 
sinkConfig) {
 
-        sink.registerFunction(tenant, namespace, sinkName, 
uploadedInputStream, fileDetail,
-                functionPkgUrl, sinkConfigJson, clientAppId(), 
clientAuthData());
+        sink.registerSink(tenant, namespace, sinkName, uploadedInputStream, 
fileDetail,
+                functionPkgUrl, sinkConfig, clientAppId(), clientAuthData());
     }
 
     @PUT
@@ -77,11 +84,11 @@ public class SinksApiV3Resource extends FunctionApiResource 
{
                            final @FormDataParam("data") InputStream 
uploadedInputStream,
                            final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
                            final @FormDataParam("url") String functionPkgUrl,
-                           final @FormDataParam("sinkConfig") String 
sinkConfigJson,
+                           final @FormDataParam("sinkConfig") SinkConfig 
sinkConfig,
                            final @FormDataParam("updateOptions") UpdateOptions 
updateOptions) {
 
-        sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, 
fileDetail,
-                functionPkgUrl, sinkConfigJson, clientAppId(), 
clientAuthData(), updateOptions);
+        sink.updateSink(tenant, namespace, sinkName, uploadedInputStream, 
fileDetail,
+                functionPkgUrl, sinkConfig, clientAppId(), clientAuthData(), 
updateOptions);
     }
 
     @DELETE
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
index de6df1a..9be1581 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
@@ -33,7 +33,14 @@ import 
org.apache.pulsar.functions.worker.rest.api.SourcesImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import java.io.IOException;
 import java.io.InputStream;
@@ -62,10 +69,10 @@ public class SourcesApiV3Resource extends 
FunctionApiResource {
                                    final @FormDataParam("data") InputStream 
uploadedInputStream,
                                    final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
                                    final @FormDataParam("url") String 
functionPkgUrl,
-                                   final @FormDataParam("sourceConfig") String 
sourceConfigJson) {
+                                   final @FormDataParam("sourceConfig") 
SourceConfig sourceConfig) {
 
-        source.registerFunction(tenant, namespace, sourceName, 
uploadedInputStream, fileDetail,
-                functionPkgUrl, sourceConfigJson, clientAppId(), 
clientAuthData());
+        source.registerSource(tenant, namespace, sourceName, 
uploadedInputStream, fileDetail,
+                functionPkgUrl, sourceConfig, clientAppId(), clientAuthData());
 
     }
 
@@ -78,11 +85,11 @@ public class SourcesApiV3Resource extends 
FunctionApiResource {
                              final @FormDataParam("data") InputStream 
uploadedInputStream,
                              final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
                              final @FormDataParam("url") String functionPkgUrl,
-                             final @FormDataParam("sourceConfig") String 
sourceConfigJson,
+                             final @FormDataParam("sourceConfig") SourceConfig 
sourceConfig,
                              final @FormDataParam("updateOptions") 
UpdateOptions updateOptions) {
 
-        source.updateFunction(tenant, namespace, sourceName, 
uploadedInputStream, fileDetail,
-                functionPkgUrl, sourceConfigJson, clientAppId(), 
clientAuthData(), updateOptions);
+        source.updateSource(tenant, namespace, sourceName, 
uploadedInputStream, fileDetail,
+                functionPkgUrl, sourceConfig, clientAppId(), clientAuthData(), 
updateOptions);
     }
 
 
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 434cfc2..b3b673b 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -484,7 +484,7 @@ public class FunctionApiV3ResourceTest {
                 inputStream,
                 details,
                 functionPkgUrl,
-                new Gson().toJson(functionConfig),
+                functionConfig,
                 null, null);
 
     }
@@ -498,7 +498,7 @@ public class FunctionApiV3ResourceTest {
             mockedInputStream,
             mockedFormData,
             null,
-            new Gson().toJson(functionConfig),
+            functionConfig,
                 null, null);
     }
 
@@ -912,7 +912,7 @@ public class FunctionApiV3ResourceTest {
             inputStream,
             details,
             null,
-            new Gson().toJson(functionConfig),
+            functionConfig,
                 null, null, null);
 
     }
@@ -936,7 +936,7 @@ public class FunctionApiV3ResourceTest {
             mockedInputStream,
             mockedFormData,
             null,
-            new Gson().toJson(functionConfig),
+            functionConfig,
                 null, null, null);
     }
 
@@ -1024,7 +1024,7 @@ public class FunctionApiV3ResourceTest {
             null,
             null,
             filePackageUrl,
-            new Gson().toJson(functionConfig),
+            functionConfig,
                 null, null, null);
 
     }
@@ -1469,7 +1469,7 @@ public class FunctionApiV3ResourceTest {
         functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
         functionConfig.setOutput(outputTopic);
         functionConfig.setOutputSerdeClassName(outputSerdeClassName);
-        resource.registerFunction(tenant, namespace, function, null, null, 
filePackageUrl, new Gson().toJson(functionConfig), null, null);
+        resource.registerFunction(tenant, namespace, function, null, null, 
filePackageUrl, functionConfig, null, null);
 
     }
 
@@ -1500,7 +1500,7 @@ public class FunctionApiV3ResourceTest {
         functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
         functionConfig.setOutput(outputTopic);
         functionConfig.setOutputSerdeClassName(outputSerdeClassName);
-        resource.registerFunction(actualTenant, actualNamespace, actualName, 
null, null, filePackageUrl, new Gson().toJson(functionConfig), null, null);
+        resource.registerFunction(actualTenant, actualNamespace, actualName, 
null, null, filePackageUrl, functionConfig, null, null);
     }
 
     public static FunctionConfig createDefaultFunctionConfig() {
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 3b3548a..c884835 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -434,28 +434,28 @@ public class SinkApiV3ResourceTest {
             sinkConfig.setParallelism(parallelism);
         }
 
-        resource.registerFunction(
+        resource.registerSink(
                 tenant,
                 namespace,
                 sink,
                 inputStream,
                 details,
                 pkgUrl,
-                new Gson().toJson(sinkConfig),
+                sinkConfig,
                 null, null);
 
     }
 
     private void registerDefaultSink() throws IOException {
         SinkConfig sinkConfig = createDefaultSinkConfig();
-        resource.registerFunction(
+        resource.registerSink(
             tenant,
             namespace,
                 sink,
                 new FileInputStream(JAR_FILE_PATH),
             mockedFormData,
             null,
-            new Gson().toJson(sinkConfig),
+            sinkConfig,
                 null, null);
     }
 
@@ -548,14 +548,14 @@ public class SinkApiV3ResourceTest {
         sinkConfig.setClassName(className);
         sinkConfig.setParallelism(parallelism);
         sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
-        resource.registerFunction(
+        resource.registerSink(
                 actualTenant,
                 actualNamespace,
                 actualName,
                 new FileInputStream(JAR_FILE_PATH),
                 mockedFormData,
                 null,
-                new Gson().toJson(sinkConfig),
+                sinkConfig,
                 null, null);
     }
 
@@ -829,14 +829,14 @@ public class SinkApiV3ResourceTest {
             
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
         }
 
-        resource.updateFunction(
+        resource.updateSink(
             tenant,
             namespace,
             sink,
             inputStream,
             details,
             null,
-            new Gson().toJson(sinkConfig),
+            sinkConfig,
                 null, null, null);
 
     }
@@ -870,14 +870,14 @@ public class SinkApiV3ResourceTest {
         this.mockedFunctionMetaData = 
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetaData);
 
-        resource.updateFunction(
+        resource.updateSink(
             tenant,
             namespace,
                 sink,
                 new FileInputStream(JAR_FILE_PATH),
             mockedFormData,
             null,
-            new Gson().toJson(sinkConfig),
+            sinkConfig,
                 null, null, null);
     }
 
@@ -972,14 +972,14 @@ public class SinkApiV3ResourceTest {
             CompletableFuture<RequestResult> requestResult = 
CompletableFuture.completedFuture(rr);
             
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        resource.updateFunction(
+        resource.updateSink(
             tenant,
             namespace,
                 sink,
             null,
             null,
             filePackageUrl,
-            new Gson().toJson(sinkConfig),
+            sinkConfig,
                 null, null, null);
     }
 
@@ -1364,7 +1364,7 @@ public class SinkApiV3ResourceTest {
     }
 
     @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Namespace does not exist")
-    public void testRegisterFunctionNonExistingNamespace() throws Exception {
+    public void testregisterSinkNonExistingNamespace() throws Exception {
         try {
             this.namespaceList.clear();
             registerDefaultSink();
@@ -1375,7 +1375,7 @@ public class SinkApiV3ResourceTest {
     }
 
     @Test(expectedExceptions = RestException.class, 
expectedExceptionsMessageRegExp = "Tenant does not exist")
-    public void testRegisterFunctionNonExistingTenant() throws Exception {
+    public void testregisterSinkNonExistingTenant() throws Exception {
         try {
             
when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class);
             registerDefaultSink();
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index cd55369..875e2ef 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -404,28 +404,28 @@ public class SourceApiV3ResourceTest {
             sourceConfig.setParallelism(parallelism);
         }
 
-        resource.registerFunction(
+        resource.registerSource(
                 tenant,
                 namespace,
                 function,
                 inputStream,
                 details,
                 pkgUrl,
-                new Gson().toJson(sourceConfig),
+                sourceConfig,
                 null, null);
 
     }
 
     private void registerDefaultSource() throws IOException {
         SourceConfig sourceConfig = createDefaultSourceConfig();
-        resource.registerFunction(
+        resource.registerSource(
             tenant,
             namespace,
                 source,
             new FileInputStream(JAR_FILE_PATH),
             mockedFormData,
             null,
-            new Gson().toJson(sourceConfig),
+            sourceConfig,
                 null, null);
     }
 
@@ -518,14 +518,14 @@ public class SourceApiV3ResourceTest {
         sourceConfig.setParallelism(parallelism);
         sourceConfig.setTopicName(outputTopic);
         sourceConfig.setSerdeClassName(outputSerdeClassName);
-        resource.registerFunction(
+        resource.registerSource(
                 actualTenant,
                 actualNamespace,
                 actualName,
                 new FileInputStream(JAR_FILE_PATH),
                 mockedFormData,
                 null,
-                new Gson().toJson(sourceConfig),
+                sourceConfig,
                 null, null);
     }
 
@@ -850,14 +850,14 @@ public class SourceApiV3ResourceTest {
             
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
         }
 
-        resource.updateFunction(
+        resource.updateSource(
             tenant,
             namespace,
             function,
             inputStream,
             details,
             null,
-            new Gson().toJson(sourceConfig),
+            sourceConfig,
                 null, null, null);
 
     }
@@ -887,14 +887,14 @@ public class SourceApiV3ResourceTest {
         this.mockedFunctionMetaData = 
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetaData);
 
-        resource.updateFunction(
+        resource.updateSource(
             tenant,
             namespace,
                 source,
                 new FileInputStream(JAR_FILE_PATH),
             mockedFormData,
             null,
-            new Gson().toJson(sourceConfig),
+            sourceConfig,
                 null, null, null);
     }
 
@@ -988,14 +988,14 @@ public class SourceApiV3ResourceTest {
             CompletableFuture<RequestResult> requestResult = 
CompletableFuture.completedFuture(rr);
             
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        resource.updateFunction(
+        resource.updateSource(
             tenant,
             namespace,
                 source,
             null,
             null,
             filePackageUrl,
-            new Gson().toJson(sourceConfig),
+            sourceConfig,
                 null, null, null);
 
     }

Reply via email to