merlimat closed pull request #2114: Submit and run locally builtin connectors
URL: https://github.com/apache/incubator-pulsar/pull/2114
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bin/pulsar b/bin/pulsar
index d8f6e213cb..c28bfe0210 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -19,7 +19,7 @@
 #
 
 BINDIR=$(dirname "$0")
-PULSAR_HOME=`cd $BINDIR/..;pwd`
+export PULSAR_HOME=`cd $BINDIR/..;pwd`
 
 DEFAULT_BROKER_CONF=$PULSAR_HOME/conf/broker.conf
 DEFAULT_BOOKKEEPER_CONF=$PULSAR_HOME/conf/bookkeeper.conf
diff --git a/bin/pulsar-admin b/bin/pulsar-admin
index c984617864..1a1339d8ab 100755
--- a/bin/pulsar-admin
+++ b/bin/pulsar-admin
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,7 @@
 #
 
 BINDIR=$(dirname "$0")
-PULSAR_HOME=`cd $BINDIR/..;pwd`
+export PULSAR_HOME=`cd $BINDIR/..;pwd`
 
 DEFAULT_CLIENT_CONF=$PULSAR_HOME/conf/client.conf
 DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml
@@ -55,7 +55,7 @@ fi
 
 # exclude tests jar
 BUILT_JAR=`ls $PULSAR_HOME/pulsar-client-tools/target/pulsar-*.jar 2> 
/dev/null | grep -v tests | tail -1`
-if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then 
+if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then
     echo "\nCouldn't find pulsar jar.";
     echo "Make sure you've run 'mvn package'\n";
     exit 1;
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index d34f419f92..9f7339c690 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.admin;
 
 import java.util.List;
+import java.util.Set;
 
 import 
org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
@@ -232,4 +233,22 @@
      *
      */
     List<ConnectorDefinition> getConnectorsList() throws PulsarAdminException;
+
+    /**
+     * Fetches a list of supported Pulsar IO sources currently running in 
cluster mode
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     *
+     */
+    Set<String> getSources() throws PulsarAdminException;
+
+    /**
+     * Fetches a list of supported Pulsar IO sinks currently running in 
cluster mode
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     *
+     */
+    Set<String> getSinks() throws PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 5cb9284b94..a2008cf9f3 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -27,6 +27,8 @@
 import java.io.InputStream;
 import java.nio.file.StandardCopyOption;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.client.Entity;
@@ -37,6 +39,7 @@
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.Functions;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
@@ -110,7 +113,10 @@ public void createFunction(FunctionDetails 
functionDetails, String fileName) thr
         try {
             final FormDataMultiPart mp = new FormDataMultiPart();
 
-            mp.bodyPart(new FileDataBodyPart("data", new File(fileName), 
MediaType.APPLICATION_OCTET_STREAM_TYPE));
+            if (fileName != null && !fileName.startsWith("builtin://")) {
+                // If the function code is built in, we don't need to submit 
here
+                mp.bodyPart(new FileDataBodyPart("data", new File(fileName), 
MediaType.APPLICATION_OCTET_STREAM_TYPE));
+            }
 
             mp.bodyPart(new FormDataBodyPart("functionDetails",
                 printJson(functionDetails),
@@ -153,9 +159,12 @@ public void deleteFunction(String cluster, String 
namespace, String function) th
     public void updateFunction(FunctionDetails functionDetails, String 
fileName) throws PulsarAdminException {
         try {
             final FormDataMultiPart mp = new FormDataMultiPart();
-            if (fileName != null) {
+
+            if (fileName != null && !fileName.startsWith("builtin://")) {
+                // If the function code is built in, we don't need to submit 
here
                 mp.bodyPart(new FileDataBodyPart("data", new File(fileName), 
MediaType.APPLICATION_OCTET_STREAM_TYPE));
             }
+
             mp.bodyPart(new FormDataBodyPart("functionDetails",
                 printJson(functionDetails),
                 MediaType.APPLICATION_JSON_TYPE));
@@ -251,6 +260,18 @@ public void downloadFunction(String destinationPath, 
String path) throws PulsarA
         }
     }
 
+    @Override
+    public Set<String> getSources() throws PulsarAdminException {
+        return getConnectorsList().stream().filter(c -> 
!StringUtils.isEmpty(c.getSourceClass()))
+                .map(ConnectorDefinition::getName).collect(Collectors.toSet());
+    }
+
+    @Override
+    public Set<String> getSinks() throws PulsarAdminException {
+        return getConnectorsList().stream().filter(c -> 
!StringUtils.isEmpty(c.getSinkClass()))
+                .map(ConnectorDefinition::getName).collect(Collectors.toSet());
+    }
+
     public static void mergeJson(String json, Builder builder) throws 
IOException {
         JsonFormat.parser().merge(json, builder);
     }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 2c91588f05..c27b40aa8d 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -24,12 +24,26 @@
 import com.beust.jcommander.converters.StringConverter;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.text.WordUtils;
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.nar.NarClassLoader;
@@ -44,6 +58,7 @@
 import org.apache.pulsar.functions.utils.SinkConfig;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+import org.apache.pulsar.functions.utils.io.Connectors;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 
 import java.io.File;
@@ -139,6 +154,25 @@ void runCmd() throws Exception {
                             
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
                     sinkConfig.getArchive(), admin);
         }
+
+        @Override
+        protected String validateSinkType(String sinkType) throws IOException {
+            // Validate the connector sink type from the locally available 
connectors
+            String pulsarHome = System.getenv("PULSAR_HOME");
+            if (pulsarHome == null) {
+                pulsarHome = Paths.get("").toAbsolutePath().toString();
+            }
+            String connectorsDir = Paths.get(pulsarHome, 
"connectors").toString();
+            Connectors connectors = 
ConnectorUtils.searchForConnectors(connectorsDir);
+
+            if (!connectors.getSinks().containsKey(sinkType)) {
+                throw new ParameterException("Invalid sink type '" + sinkType 
+ "' -- Available sinks are: "
+                        + connectors.getSinks().keySet());
+            }
+
+            // Sink type is a valid built-in connector type. For local-run 
we'll fill it up with its own archive path
+            return connectors.getSinks().get(sinkType).toString();
+        }
     }
 
     @Parameters(commandDescription = "Submit a Pulsar IO sink connector to run 
in a Pulsar cluster")
@@ -174,6 +208,10 @@ void runCmd() throws Exception {
         protected String namespace;
         @Parameter(names = "--name", description = "The sink's name")
         protected String name;
+
+        @Parameter(names = { "-t", "--sink-type" }, description = "The sinks's 
connector provider")
+        protected String sinkType;
+
         @Parameter(names = "--inputs", description = "The sink's input topic 
or topics (multiple topics can be specified as a comma-separated list)")
         protected String inputs;
         @Parameter(names = "--topicsPattern", description = "TopicsPattern to 
consume from list of topics under a namespace that match the pattern. [--input] 
and [--topicsPattern] are mutually exclusive. Add SerDe class name for a 
pattern in --customSerdeInputs  (supported for java fun only)")
@@ -246,10 +284,18 @@ void processArguments() throws Exception {
                 sinkConfig.setParallelism(parallelism);
             }
 
+            if (archive != null && sinkType != null) {
+                throw new ParameterException("Cannot specify both archive and 
sink-type");
+            }
+
             if (null != archive) {
                 sinkConfig.setArchive(archive);
             }
 
+            if (sinkType != null) {
+                sinkConfig.setArchive(validateSinkType(sinkType));
+            }
+
             org.apache.pulsar.functions.utils.Resources resources = 
sinkConfig.getResources();
             if (resources == null) {
                 resources = new org.apache.pulsar.functions.utils.Resources();
@@ -307,6 +353,7 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) {
                 throw new ParameterException("Sink archive not specfied");
             }
 
+            boolean isConnectorBuiltin = 
sinkConfig.getArchive().startsWith(Utils.BUILTIN);
             boolean isArchivePathUrl = 
Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive());
 
             String archivePath = null;
@@ -326,6 +373,9 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) {
                                 + ", due to =" + e.getMessage());
                     }
                 }
+            } else if (isConnectorBuiltin) {
+                // Ignore local checks when submitting built-in connector
+                archivePath = null;
             } else {
                 archivePath = sinkConfig.getArchive();
             }
@@ -339,6 +389,7 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) {
                 try {
                     ConnectorDefinition connector = 
ConnectorUtils.getConnectorDefinition(archivePath);
                     log.info("Connector: {}", connector);
+
                     // Validate sink class
                     ConnectorUtils.getIOSinkClass(archivePath);
                 } catch (IOException e) {
@@ -368,16 +419,23 @@ protected FunctionDetails createSinkConfig(SinkConfig 
sinkConfig) throws IOExcep
             // check if configs are valid
             validateSinkConfigs(sinkConfig);
 
-            String sinkClassName = 
ConnectorUtils.getIOSinkClass(sinkConfig.getArchive());
+            String sinkClassName = null;
+            String typeArg = null;
+
+            FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
+
+            boolean isBuiltin = 
sinkConfig.getArchive().startsWith(Utils.BUILTIN);
 
-            String typeArg;
-            try (NarClassLoader ncl = NarClassLoader.getFromArchive(new 
File(sinkConfig.getArchive()),
-                    Collections.emptySet())) {
-                typeArg = sinkConfig.getArchive().startsWith(Utils.FILE) ? null
-                        : Utils.getSinkType(sinkClassName, ncl).getName();
+            if (!isBuiltin) {
+                sinkClassName = 
ConnectorUtils.getIOSinkClass(sinkConfig.getArchive());
+
+                try (NarClassLoader ncl = NarClassLoader.getFromArchive(new 
File(sinkConfig.getArchive()),
+                        Collections.emptySet())) {
+                    typeArg = sinkConfig.getArchive().startsWith(Utils.FILE) ? 
null
+                            : Utils.getSinkType(sinkClassName, ncl).getName();
+                }
             }
 
-            FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
             if (sinkConfig.getTenant() != null) {
                 functionDetailsBuilder.setTenant(sinkConfig.getTenant());
             }
@@ -414,7 +472,15 @@ protected FunctionDetails createSinkConfig(SinkConfig 
sinkConfig) throws IOExcep
 
             // set up sink spec
             SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
-            sinkSpecBuilder.setClassName(sinkClassName);
+            if (sinkClassName != null) {
+                sinkSpecBuilder.setClassName(sinkClassName);
+            }
+
+            if (isBuiltin) {
+                String builtin = 
sinkConfig.getArchive().replaceFirst("^builtin://", "");
+                sinkSpecBuilder.setBuiltin(builtin);
+            }
+
             if (sinkConfig.getConfigs() != null) {
                 sinkSpecBuilder.setConfigs(new 
Gson().toJson(sinkConfig.getConfigs()));
             }
@@ -438,6 +504,23 @@ protected FunctionDetails createSinkConfig(SinkConfig 
sinkConfig) throws IOExcep
             }
             return functionDetailsBuilder.build();
         }
+
+        protected String validateSinkType(String sinkType) throws IOException {
+            Set<String> availableSinks;
+            try {
+                availableSinks = admin.functions().getSinks();
+            } catch (PulsarAdminException e) {
+                throw new IOException(e);
+            }
+
+            if (!availableSinks.contains(sinkType)) {
+                throw new ParameterException(
+                        "Invalid sink type '" + sinkType + "' -- Available 
sinks are: " + availableSinks);
+            }
+
+            // Source type is a valid built-in connector type
+            return "builtin://" + sinkType;
+        }
     }
 
     @Parameters(commandDescription = "Stops a Pulsar IO sink connector")
@@ -475,7 +558,7 @@ void runCmd() throws Exception {
     }
 
     @Parameters(commandDescription = "Get the list of Pulsar IO connector 
sinks supported by Pulsar cluster")
-    public class ListSinks extends SinkCommand {
+    public class ListSinks extends BaseCommand {
         @Override
         void runCmd() throws Exception {
             admin.functions().getConnectorsList().stream().filter(x -> 
!StringUtils.isEmpty(x.getSinkClass()))
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 53609d0c0b..7c147e5f1d 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -24,12 +24,23 @@
 import com.beust.jcommander.converters.StringConverter;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.text.WordUtils;
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.nar.NarClassLoader;
@@ -43,6 +54,7 @@
 import org.apache.pulsar.functions.utils.SourceConfig;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+import org.apache.pulsar.functions.utils.io.Connectors;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 
 import java.io.File;
@@ -135,6 +147,25 @@ void runCmd() throws Exception {
                             
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
                     sourceConfig.getArchive(), admin);
         }
+
+        @Override
+        protected String validateSourceType(String sourceType) throws 
IOException {
+            // Validate the connector source type from the locally available 
connectors
+            String pulsarHome = System.getenv("PULSAR_HOME");
+            if (pulsarHome == null) {
+                pulsarHome = Paths.get("").toAbsolutePath().toString();
+            }
+            String connectorsDir = Paths.get(pulsarHome, 
"connectors").toString();
+            Connectors connectors = 
ConnectorUtils.searchForConnectors(connectorsDir);
+
+            if (!connectors.getSources().containsKey(sourceType)) {
+                throw new ParameterException("Invalid source type '" + 
sourceType + "' -- Available sources are: "
+                        + connectors.getSources().keySet());
+            }
+
+            // Source type is a valid built-in connector type. For local-run 
we'll fill it up with its own archive path
+            return connectors.getSources().get(sourceType).toString();
+        }
     }
 
     @Parameters(commandDescription = "Submit a Pulsar IO source connector to 
run in a Pulsar cluster")
@@ -170,10 +201,12 @@ void runCmd() throws Exception {
         protected String namespace;
         @Parameter(names = "--name", description = "The source's name")
         protected String name;
+
+        @Parameter(names = { "-t", "--source-type" }, description = "The 
source's connector provider")
+        protected String sourceType;
+
         @Parameter(names = "--processingGuarantees", description = "The 
processing guarantees (aka delivery semantics) applied to the Source")
         protected FunctionConfig.ProcessingGuarantees processingGuarantees;
-        @Parameter(names = "--className", description = "The source's class 
name")
-        protected String className;
         @Parameter(names = "--destinationTopicName", description = "The Pulsar 
topic to which data is sent")
         protected String destinationTopicName;
         @Parameter(names = "--deserializationClassName", description = "The 
SerDe classname for the source")
@@ -228,10 +261,18 @@ void processArguments() throws Exception {
                 sourceConfig.setParallelism(parallelism);
             }
 
+            if (archive != null && sourceType != null) {
+                throw new ParameterException("Cannot specify both archive and 
source-type");
+            }
+
             if (archive != null) {
                 sourceConfig.setArchive(archive);
             }
 
+            if (sourceType != null) {
+                sourceConfig.setArchive(validateSourceType(sourceType));
+            }
+
             org.apache.pulsar.functions.utils.Resources resources = 
sourceConfig.getResources();
             if (resources == null) {
                 resources = new org.apache.pulsar.functions.utils.Resources();
@@ -276,6 +317,7 @@ protected void validateSourceConfigs(SourceConfig 
sourceConfig) {
                 throw new ParameterException("Source archive not specfied");
             }
 
+            boolean isConnectorBuiltin = 
sourceConfig.getArchive().startsWith(Utils.BUILTIN);
             boolean isArchivePathUrl = 
Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive());
 
             String archivePath = null;
@@ -295,6 +337,9 @@ protected void validateSourceConfigs(SourceConfig 
sourceConfig) {
                                 + ", due to =" + e.getMessage());
                     }
                 }
+            } else if (isConnectorBuiltin) {
+                // Ignore local checks when submitting built-in connector
+                archivePath = null;
             } else {
                 archivePath = sourceConfig.getArchive();
             }
@@ -338,16 +383,23 @@ protected FunctionDetails createSourceConfig(SourceConfig 
sourceConfig) throws I
             // check if source configs are valid
             validateSourceConfigs(sourceConfig);
 
-            String sourceClassName = 
ConnectorUtils.getIOSourceClass(sourceConfig.getArchive());
+            String sourceClassName = null;
+            String typeArg = null;
+
+            FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
+
+            boolean isBuiltin = 
sourceConfig.getArchive().startsWith(Utils.BUILTIN);
+
+            if (!isBuiltin) {
+                sourceClassName = 
ConnectorUtils.getIOSourceClass(sourceConfig.getArchive());
 
-            String typeArg;
-            try (NarClassLoader ncl = NarClassLoader.getFromArchive(new 
File(sourceConfig.getArchive()),
-                    Collections.emptySet())) {
-                typeArg = sourceConfig.getArchive().startsWith(Utils.FILE) ? 
null
-                        : getSourceType(sourceClassName, ncl).getName();
+                try (NarClassLoader ncl = NarClassLoader.getFromArchive(new 
File(sourceConfig.getArchive()),
+                        Collections.emptySet())) {
+                    typeArg = sourceConfig.getArchive().startsWith(Utils.FILE) 
? null
+                            : getSourceType(sourceClassName, ncl).getName();
+                }
             }
 
-            FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
             if (sourceConfig.getTenant() != null) {
                 functionDetailsBuilder.setTenant(sourceConfig.getTenant());
             }
@@ -368,7 +420,14 @@ protected FunctionDetails createSourceConfig(SourceConfig 
sourceConfig) throws I
 
             // set source spec
             SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-            sourceSpecBuilder.setClassName(sourceClassName);
+            if (sourceClassName != null) {
+                sourceSpecBuilder.setClassName(sourceClassName);
+            }
+
+            if (isBuiltin) {
+                String builtin = 
sourceConfig.getArchive().replaceFirst("^builtin://", "");
+                sourceSpecBuilder.setBuiltin(builtin);
+            }
 
             if (sourceConfig.getConfigs() != null) {
                 sourceSpecBuilder.setConfigs(new 
Gson().toJson(sourceConfig.getConfigs()));
@@ -409,6 +468,23 @@ protected FunctionDetails createSourceConfig(SourceConfig 
sourceConfig) throws I
 
             return functionDetailsBuilder.build();
         }
+
+        protected String validateSourceType(String sourceType) throws 
IOException {
+            Set<String> availableSources;
+            try {
+                availableSources = admin.functions().getSources();
+            } catch (PulsarAdminException e) {
+                throw new IOException(e);
+            }
+
+            if (!availableSources.contains(sourceType)) {
+                throw new ParameterException(
+                        "Invalid source type '" + sourceType + "' -- Available 
sources are: " + availableSources);
+            }
+
+            // Source type is a valid built-in connector type
+            return "builtin://" + sourceType;
+        }
     }
 
     @Parameters(commandDescription = "Stops a Pulsar IO source connector")
@@ -446,7 +522,7 @@ void runCmd() throws Exception {
     }
 
     @Parameters(commandDescription = "Get the list of Pulsar IO connector 
sources supported by Pulsar cluster")
-    public class ListSources extends SourceCommand {
+    public class ListSources extends BaseCommand {
         @Override
         void runCmd() throws Exception {
             admin.functions().getConnectorsList().stream().filter(x -> 
!StringUtils.isEmpty(x.getSourceClass()))
@@ -457,5 +533,4 @@ void runCmd() throws Exception {
                     });
         }
     }
-
 }
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index 0f8b649d65..01b3660b6e 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -71,6 +71,10 @@ message SourceSpec {
     map<string,string> topicsToSerDeClassName = 4;
     uint64 timeoutMs = 6;
     string topicsPattern = 7;
+
+    /* If specified, this will refer to an archive that is
+     * already present in the server */
+    string builtin = 8;
 }
 
 message SinkSpec {
@@ -82,6 +86,10 @@ message SinkSpec {
     // configs used only when functions output to sink
     string topic = 3;
     string serDeClassName = 4;
+
+    /* If specified, this will refer to an archive that is
+     * already present in the server */
+    string builtin = 6;
 }
 
 message PackageLocationMetaData {
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 8b5e9fbeff..0c25be29ca 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -55,6 +55,7 @@
 
     public static String HTTP = "http";
     public static String FILE = "file";
+    public static String BUILTIN = "builtin";
 
     public static final long getSequenceId(MessageId messageId) {
         MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof 
TopicMessageIdImpl)
@@ -222,7 +223,7 @@ public static boolean fileExists(String file) {
     }
 
     public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) 
{
-        return isNotBlank(functionPkgUrl)
-                && (functionPkgUrl.startsWith(Utils.HTTP) || 
functionPkgUrl.startsWith(Utils.FILE));
+        return isNotBlank(functionPkgUrl) && 
(functionPkgUrl.startsWith(Utils.HTTP)
+                || functionPkgUrl.startsWith(Utils.FILE));
     }
 }
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index 7ff6b974de..297a2f27d0 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -643,6 +643,11 @@ public void validateField(String name, Object o) {
         @Override
         public void validateField(String name, Object o) {
             SourceConfig sourceConfig = (SourceConfig) o;
+            if (sourceConfig.getArchive().startsWith(Utils.BUILTIN)) {
+                // We don't have to check the archive, since it's provided on 
the worker itself
+                return;
+            }
+
             String sourceClassName;
             try {
                 sourceClassName = 
ConnectorUtils.getIOSourceClass(sourceConfig.getArchive());
@@ -714,6 +719,11 @@ public void validateField(String name, Object o) {
         @Override
         public void validateField(String name, Object o) {
             SinkConfig sinkConfig = (SinkConfig) o;
+            if (sinkConfig.getArchive().startsWith(Utils.BUILTIN)) {
+                // We don't have to check the archive, since it's provided on 
the worker itself
+                return;
+            }
+
             // if function-pkg url is present eg: file://xyz.jar then 
admin-tool might not have access of the file at
             // the same location so, need to rely on server side validation.
             if (Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive())) {
@@ -805,7 +815,7 @@ public void validateField(String name, Object o) {
 
             if(!Utils.isFunctionPackageUrlSupported(path)) {
                 // check file existence if path is not url and local path
-                if (!fileExists(path)) {
+                if (!path.startsWith(Utils.BUILTIN) && !fileExists(path)) {
                     throw new IllegalArgumentException
                             (String.format("File %s specified in field '%s' 
does not exist", path, name));
                 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index ef519f4dce..4741c404d6 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -20,8 +20,13 @@
 
 import static org.apache.pulsar.functions.utils.Utils.FILE;
 import static org.apache.pulsar.functions.utils.Utils.HTTP;
+import static org.apache.pulsar.functions.utils.Utils.getSourceType;
 import static 
org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.MoreFiles;
+import com.google.common.io.RecursiveDeleteOption;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
@@ -30,29 +35,32 @@
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.MoreFiles;
-import com.google.common.io.RecursiveDeleteOption;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 
 @Data
 @Setter
@@ -68,15 +76,18 @@
     private LinkedBlockingQueue<FunctionAction> actionQueue;
     private volatile boolean running;
     private Thread actioner;
+    private final ConnectorsManager connectorsManager;
 
     public FunctionActioner(WorkerConfig workerConfig,
                             RuntimeFactory runtimeFactory,
                             Namespace dlogNamespace,
-                            LinkedBlockingQueue<FunctionAction> actionQueue) {
+                            LinkedBlockingQueue<FunctionAction> actionQueue,
+                            ConnectorsManager connectorsManager) {
         this.workerConfig = workerConfig;
         this.runtimeFactory = runtimeFactory;
         this.dlogNamespace = dlogNamespace;
         this.actionQueue = actionQueue;
+        this.connectorsManager = connectorsManager;
         actioner = new Thread(() -> {
             log.info("Starting Actioner Thread...");
             while(running) {
@@ -118,30 +129,33 @@ public void join() throws InterruptedException {
     protected void startFunction(FunctionRuntimeInfo functionRuntimeInfo) 
throws Exception {
         FunctionMetaData functionMetaData = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
         int instanceId = 
functionRuntimeInfo.getFunctionInstance().getInstanceId();
-        log.info("Starting function {} - {} ...",
-                functionMetaData.getFunctionDetails().getName(), instanceId);
+
+        FunctionDetails.Builder functionDetails = 
FunctionDetails.newBuilder(functionMetaData.getFunctionDetails());
+        log.info("Starting function {} - {} ...", functionDetails.getName(), 
instanceId);
         File pkgFile = null;
-        
+
         String pkgLocation = 
functionMetaData.getPackageLocation().getPackagePath();
         boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation);
-        
-        if(isPkgUrlProvided && pkgLocation.startsWith(FILE)) {
+
+        if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) {
             URL url = new URL(pkgLocation);
             pkgFile = new File(url.toURI());
+        } else if (isFunctionCodeBuiltin(functionDetails)) {
+            pkgFile = getBuiltinArchive(functionDetails);
         } else {
             File pkgDir = new File(
                     workerConfig.getDownloadDirectory(),
                     getDownloadPackagePath(functionMetaData, instanceId));
             pkgDir.mkdirs();
-            
+
             pkgFile = new File(
                     pkgDir,
                     new 
File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails())).getName());
             downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, 
instanceId);
         }
-        
+
         InstanceConfig instanceConfig = new InstanceConfig();
-        
instanceConfig.setFunctionDetails(functionMetaData.getFunctionDetails());
+        instanceConfig.setFunctionDetails(functionDetails.build());
         // TODO: set correct function id and version when features implemented
         instanceConfig.setFunctionId(UUID.randomUUID().toString());
         instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
@@ -156,9 +170,9 @@ protected void startFunction(FunctionRuntimeInfo 
functionRuntimeInfo) throws Exc
     }
 
     private void downloadFile(File pkgFile, boolean isPkgUrlProvided, 
FunctionMetaData functionMetaData, int instanceId) throws 
FileNotFoundException, IOException {
-        
+
         File pkgDir = pkgFile.getParentFile();
-        
+
         if (pkgFile.exists()) {
             log.warn("Function package exists already {} deleting it",
                     pkgFile);
@@ -178,7 +192,7 @@ private void downloadFile(File pkgFile, boolean 
isPkgUrlProvided, FunctionMetaDa
         boolean downloadFromHttp = isPkgUrlProvided && 
pkgLocationPath.startsWith(HTTP);
         log.info("Function package file {} will be downloaded from {}", 
tempPkgFile,
                 downloadFromHttp ? pkgLocationPath : 
functionMetaData.getPackageLocation());
-        
+
         if(downloadFromHttp) {
             Utils.downloadFromHttpUrl(pkgLocationPath, new 
FileOutputStream(tempPkgFile));
         } else {
@@ -187,7 +201,7 @@ private void downloadFile(File pkgFile, boolean 
isPkgUrlProvided, FunctionMetaDa
                     new FileOutputStream(tempPkgFile),
                     pkgLocationPath);
         }
-        
+
         try {
             // create a hardlink, if there are two concurrent createLink 
operations, one will fail.
             // this ensures one instance will successfully download the 
package.
@@ -243,4 +257,70 @@ private String getDownloadPackagePath(FunctionMetaData 
functionMetaData, int ins
                 },
                 File.separatorChar);
     }
+
+    public static boolean isFunctionCodeBuiltin(FunctionDetailsOrBuilder 
functionDetails) {
+        if (functionDetails.hasSource()) {
+            SourceSpec sourceSpec = functionDetails.getSource();
+            if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
+                return true;
+            }
+        }
+
+        if (functionDetails.hasSink()) {
+            SinkSpec sinkSpec = functionDetails.getSink();
+            if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private File getBuiltinArchive(FunctionDetails.Builder functionDetails) 
throws IOException {
+        if (functionDetails.hasSource()) {
+            SourceSpec sourceSpec = functionDetails.getSource();
+            if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
+                File archive = 
connectorsManager.getSourceArchive(sourceSpec.getBuiltin()).toFile();
+                String sourceClass = 
ConnectorUtils.getConnectorDefinition(archive.toString()).getSourceClass();
+                SourceSpec.Builder builder = 
SourceSpec.newBuilder(functionDetails.getSource());
+                builder.setClassName(sourceClass);
+                functionDetails.setSource(builder);
+
+                fillSourceSinkTypeClass(functionDetails, archive, sourceClass);
+                return archive;
+            }
+        }
+
+        if (functionDetails.hasSink()) {
+            SinkSpec sinkSpec = functionDetails.getSink();
+            if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
+                File archive = 
connectorsManager.getSinkArchive(sinkSpec.getBuiltin()).toFile();
+                String sinkClass = 
ConnectorUtils.getConnectorDefinition(archive.toString()).getSinkClass();
+                SinkSpec.Builder builder = 
SinkSpec.newBuilder(functionDetails.getSink());
+                builder.setClassName(sinkClass);
+                functionDetails.setSink(builder);
+
+                fillSourceSinkTypeClass(functionDetails, archive, sinkClass);
+                return archive;
+            }
+        }
+
+        throw new IOException("Could not find built in archive definition");
+    }
+
+    private void fillSourceSinkTypeClass(FunctionDetails.Builder 
functionDetails, File archive, String className)
+            throws IOException {
+        try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, 
Collections.emptySet())) {
+            String typeArg = getSourceType(className, ncl).getName();
+
+            SourceSpec.Builder sourceBuilder = 
SourceSpec.newBuilder(functionDetails.getSource());
+            sourceBuilder.setTypeClassName(typeArg);
+            functionDetails.setSource(sourceBuilder);
+
+            SinkSpec.Builder sinkBuilder = 
SinkSpec.newBuilder(functionDetails.getSink());
+            sinkBuilder.setTypeClassName(typeArg);
+            functionDetails.setSink(sinkBuilder);
+        }
+    }
+
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 5c6184f96e..08de636d2a 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -78,13 +78,16 @@
     private RuntimeFactory runtimeFactory;
 
     private MembershipManager membershipManager;
+    private final ConnectorsManager connectorsManager;
 
 
     public FunctionRuntimeManager(WorkerConfig workerConfig,
                                   PulsarClient pulsarClient,
                                   Namespace dlogNamespace,
-                                  MembershipManager membershipManager) throws 
Exception {
+                                  MembershipManager membershipManager,
+                                  ConnectorsManager connectorsManager) throws 
Exception {
         this.workerConfig = workerConfig;
+        this.connectorsManager = connectorsManager;
 
         Reader<byte[]> reader = pulsarClient.newReader()
                 .topic(this.workerConfig.getFunctionAssignmentTopic())
@@ -99,7 +102,7 @@ public FunctionRuntimeManager(WorkerConfig workerConfig,
                 .tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath())
                 
.useTls(workerConfig.isUseTls()).tlsAllowInsecureConnection(workerConfig.isTlsAllowInsecureConnection())
                 
.tlsHostnameVerificationEnable(workerConfig.isTlsHostnameVerificationEnable()).build();
-        
+
         if (workerConfig.getThreadContainerFactory() != null) {
             this.runtimeFactory = new ThreadRuntimeFactory(
                     
workerConfig.getThreadContainerFactory().getThreadGroupName(),
@@ -121,7 +124,7 @@ public FunctionRuntimeManager(WorkerConfig workerConfig,
         this.actionQueue = new LinkedBlockingQueue<>();
 
         this.functionActioner = new FunctionActioner(this.workerConfig, 
runtimeFactory,
-                dlogNamespace, actionQueue);
+                dlogNamespace, actionQueue, connectorsManager);
 
         this.membershipManager = membershipManager;
     }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 0237700f29..439333190f 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -102,12 +102,14 @@ public void start(URI dlogUri) throws 
InterruptedException {
             this.functionMetaDataManager = new FunctionMetaDataManager(
                     this.workerConfig, this.schedulerManager, this.client);
 
+            this.connectorsManager = new ConnectorsManager(workerConfig);
+
             //create membership manager
             this.membershipManager = new MembershipManager(this.workerConfig, 
this.client);
 
             // create function runtime manager
             this.functionRuntimeManager = new FunctionRuntimeManager(
-                    this.workerConfig, this.client, this.dlogNamespace, 
this.membershipManager);
+                    this.workerConfig, this.client, this.dlogNamespace, 
this.membershipManager, connectorsManager);
 
             // Setting references to managers in scheduler
             
this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
@@ -136,8 +138,6 @@ public void start(URI dlogUri) throws InterruptedException {
             // indicate function worker service is done intializing
             this.isInitialized = true;
 
-            this.connectorsManager = new ConnectorsManager(workerConfig);
-
         } catch (Throwable t) {
             log.error("Error Starting up in worker", t);
             throw new RuntimeException(t);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index a9de4d7ba9..4ff66c6dbe 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -25,6 +25,8 @@
 import static 
org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported;
 import static 
org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders.create;
 
+import com.google.gson.Gson;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -51,6 +53,8 @@
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Message;
@@ -64,6 +68,8 @@
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders;
@@ -77,9 +83,6 @@
 import org.apache.pulsar.io.core.Source;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
-import com.google.gson.Gson;
-
-import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
 
 @Slf4j
@@ -148,12 +151,17 @@ public Response registerFunction(final String tenant, 
final String namespace, fi
         FunctionMetaData.Builder functionMetaDataBuilder = 
FunctionMetaData.newBuilder()
                 
.setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0);
 
-        PackageLocationMetaData.Builder packageLocationMetaDataBuilder = 
PackageLocationMetaData.newBuilder()
-                .setPackagePath(isPkgUrlProvided ? functionPkgUrl
-                        : createPackagePath(tenant, namespace, functionName, 
fileDetail.getFileName()));
-        
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+        PackageLocationMetaData.Builder packageLocationMetaDataBuilder = 
PackageLocationMetaData.newBuilder();
+        boolean isBuiltin = isFunctionCodeBuiltin(functionDetails);
+        if (isBuiltin) {
+            packageLocationMetaDataBuilder.setPackagePath("builtin://" + 
getFunctionCodeBuiltin(functionDetails));
+        } else {
+            packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? 
functionPkgUrl
+                    : createPackagePath(tenant, namespace, functionName, 
fileDetail.getFileName()));
+        }
 
-        return isPkgUrlProvided ? 
updateRequest(functionMetaDataBuilder.build())
+        
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+        return (isPkgUrlProvided || isBuiltin) ? 
updateRequest(functionMetaDataBuilder.build())
                 : updateRequest(functionMetaDataBuilder.build(), 
uploadedInputStream);
     }
 
@@ -193,12 +201,18 @@ public Response updateFunction(final String tenant, final 
String namespace, fina
         FunctionMetaData.Builder functionMetaDataBuilder = 
FunctionMetaData.newBuilder()
                 
.setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0);
 
-        PackageLocationMetaData.Builder packageLocationMetaDataBuilder = 
PackageLocationMetaData.newBuilder()
-                .setPackagePath(isPkgUrlProvided ? functionPkgUrl
-                        : createPackagePath(tenant, namespace, functionName, 
fileDetail.getFileName()));
-        
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+        PackageLocationMetaData.Builder packageLocationMetaDataBuilder = 
PackageLocationMetaData.newBuilder();
+
+        boolean isBuiltin = isFunctionCodeBuiltin(functionDetails);
+        if (isBuiltin) {
+            packageLocationMetaDataBuilder.setPackagePath("builtin://" + 
getFunctionCodeBuiltin(functionDetails));
+        } else {
+            packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? 
functionPkgUrl
+                    : createPackagePath(tenant, namespace, functionName, 
fileDetail.getFileName()));
+        }
 
-        return isPkgUrlProvided ? 
updateRequest(functionMetaDataBuilder.build())
+        
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+        return (isPkgUrlProvided || isBuiltin) ? 
updateRequest(functionMetaDataBuilder.build())
                 : updateRequest(functionMetaDataBuilder.build(), 
uploadedInputStream);
     }
 
@@ -659,12 +673,53 @@ private FunctionDetails 
validateUpdateRequestParamsWithPkgUrl(String tenant, Str
     private FunctionDetails validateUpdateRequestParams(String tenant, String 
namespace, String functionName,
             InputStream uploadedInputStream, FormDataContentDisposition 
fileDetail, String functionDetailsJson)
             throws IllegalArgumentException {
-        if (uploadedInputStream == null || fileDetail == null) {
+
+        FunctionDetails functionDetails = validateUpdateRequestParams(tenant, 
namespace, functionName,
+                functionDetailsJson, null);
+        if (!isFunctionCodeBuiltin(functionDetails) && (uploadedInputStream == 
null || fileDetail == null)) {
             throw new IllegalArgumentException("Function Package is not 
provided");
         }
-        return validateUpdateRequestParams(tenant, namespace, functionName, 
functionDetailsJson, null);
+
+        return functionDetails;
+    }
+
+    private boolean isFunctionCodeBuiltin(FunctionDetails functionDetails) {
+        if (functionDetails.hasSource()) {
+            SourceSpec sourceSpec = functionDetails.getSource();
+            if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
+                return true;
+            }
+        }
+
+        if (functionDetails.hasSink()) {
+            SinkSpec sinkSpec = functionDetails.getSink();
+            if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
+                return true;
+            }
+        }
+
+        return false;
     }
 
+    private String getFunctionCodeBuiltin(FunctionDetails functionDetails) {
+        if (functionDetails.hasSource()) {
+            SourceSpec sourceSpec = functionDetails.getSource();
+            if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
+                return sourceSpec.getBuiltin();
+            }
+        }
+
+        if (functionDetails.hasSink()) {
+            SinkSpec sinkSpec = functionDetails.getSink();
+            if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
+                return sinkSpec.getBuiltin();
+            }
+        }
+
+        return null;
+    }
+
+
     private FunctionDetails validateUpdateRequestParams(String tenant, String 
namespace, String functionName,
             String functionDetailsJson, File jarWithFileUrl) throws 
IllegalArgumentException {
         if (tenant == null) {
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index a1bcd4a424..575447792c 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -47,7 +47,7 @@
 
     /**
      * Validates FunctionActioner tries to download file from bk.
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -68,7 +68,8 @@ public void testStartFunctionWithDLNamespace() throws 
Exception {
         LinkedBlockingQueue<FunctionAction> queue = new 
LinkedBlockingQueue<>();
 
         @SuppressWarnings("resource")
-        FunctionActioner actioner = new FunctionActioner(workerConfig, 
factory, dlogNamespace, queue);
+        FunctionActioner actioner = new FunctionActioner(workerConfig, 
factory, dlogNamespace, queue,
+                new ConnectorsManager(workerConfig));
         Runtime runtime = mock(Runtime.class);
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
@@ -110,7 +111,8 @@ public void testStartFunctionWithPkgUrl() throws Exception {
         LinkedBlockingQueue<FunctionAction> queue = new 
LinkedBlockingQueue<>();
 
         @SuppressWarnings("resource")
-        FunctionActioner actioner = new FunctionActioner(workerConfig, 
factory, dlogNamespace, queue);
+        FunctionActioner actioner = new FunctionActioner(workerConfig, 
factory, dlogNamespace, queue,
+                new ConnectorsManager(workerConfig));
 
         // (1) test with file url. functionActioner should be able to consider 
file-url and it should be able to call
         // RuntimeSpawner
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 690f474150..8ab7473eed 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -92,7 +92,8 @@ public void testProcessAssignmentUpdateAddFunctions() throws 
Exception {
                 workerConfig,
                 pulsarClient,
                 mock(Namespace.class),
-                mock(MembershipManager.class)
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class)
         ));
 
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
@@ -185,7 +186,8 @@ public void testProcessAssignmentUpdateDeleteFunctions() 
throws Exception {
                 workerConfig,
                 pulsarClient,
                 mock(Namespace.class),
-                mock(MembershipManager.class)
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class)
         ));
 
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
@@ -282,7 +284,8 @@ public void testProcessAssignmentUpdateModifyFunctions() 
throws Exception {
                 workerConfig,
                 pulsarClient,
                 mock(Namespace.class),
-                mock(MembershipManager.class)
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class)
         ));
 
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 6dd3fa3c3c..6bf134306e 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -135,7 +135,8 @@ public void testCheckFailuresNoFailures() throws Exception {
                 workerConfig,
                 pulsarClient,
                 mock(Namespace.class),
-                mock(MembershipManager.class)
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class)
         ));
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
         MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockPulsarClient()));
@@ -199,7 +200,8 @@ public void testCheckFailuresSomeFailures() throws 
Exception {
                 workerConfig,
                 pulsarClient,
                 mock(Namespace.class),
-                mock(MembershipManager.class)
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class)
         ));
 
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
@@ -288,7 +290,8 @@ public void testCheckFailuresSomeUnassigned() throws 
Exception {
                 workerConfig,
                 pulsarClient,
                 mock(Namespace.class),
-                mock(MembershipManager.class)
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class)
         ));
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
         MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockPulsarClient()));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to