This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 42c3bf9  [go function] support localrun and cluster mode for go 
function (#4174)
42c3bf9 is described below

commit 42c3bf94920f1d177a2403e06650500509f94aaa
Author: 冉小龙 <[email protected]>
AuthorDate: Mon May 6 09:13:18 2019 +0800

    [go function] support localrun and cluster mode for go function (#4174)
    
    ### Motivation
    
    Master Issue: #3767
    
    Support local-run and cluster mode for Go functions.
    
    For a Go function, we can use:
    
    ```
    ./bin/pulsar-admin functions localrun/create
    --go 
/Users/wolf4j/github.com/apache/pulsar/pulsar-function-go/examples/outputFunc.go
    --inputs persistent://public/default/my-topic
    --output persistent://public/default/test
    --tenant public
    --namespace default
    --name pulsarfunction
    --classname hellopulsar
    --log-topic logtopic
    ```
    
    Unlike `--jar` or `--py`, `--go` uploads a complete executable file (including the instance file and the user code file).
---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |  29 +++-
 .../pulsar/common/functions/FunctionConfig.java    |   4 +-
 pulsar-function-go/conf/conf.go                    | 105 +++++++-----
 pulsar-function-go/examples/test/consumer.go       |   2 +-
 pulsar-function-go/examples/test/producer.go       |   2 +-
 pulsar-function-go/pf/instance.go                  |   5 +-
 .../functions/instance/go/GoInstanceConfig.java    |  66 ++++++++
 pulsar-functions/runtime/pom.xml                   |  10 ++
 .../functions/runtime/KubernetesRuntime.java       |   2 +
 .../runtime/KubernetesRuntimeFactory.java          |   4 +-
 .../pulsar/functions/runtime/LocalRunner.java      |  22 ++-
 .../pulsar/functions/runtime/ProcessRuntime.java   |   4 +-
 .../functions/runtime/ProcessRuntimeFactory.java   |   4 +-
 .../pulsar/functions/runtime/RuntimeUtils.java     | 181 ++++++++++++++++++---
 .../DefaultSecretsProviderConfigurator.java        |   2 +
 .../KubernetesSecretsProviderConfigurator.java     |   2 +
 .../functions/utils/FunctionConfigUtils.java       |  40 ++++-
 .../pulsar/functions/worker/FunctionActioner.java  |   2 +
 18 files changed, 394 insertions(+), 92 deletions(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 8850619..9b72c3b 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -198,6 +198,10 @@ public class CmdFunctions extends CmdBase {
                 description = "Path to the main Python file/Python Wheel file 
for the function (if the function is written in Python)",
                 listConverter = StringConverter.class)
         protected String pyFile;
+        @Parameter(
+                names = "--go",
+                description = "Path to the main Go executable binary for the 
function (if the function is written in Go)")
+        protected String goFile;
         @Parameter(names = {"-i",
                 "--inputs"}, description = "The function's input topic or 
topics (multiple topics can be specified as a comma-separated list)")
         protected String inputs;
@@ -477,10 +481,16 @@ public class CmdFunctions extends CmdBase {
                 functionConfig.setPy(pyFile);
             }
 
+            if (null != goFile) {
+                functionConfig.setGo(goFile);
+            }
+
             if (functionConfig.getJar() != null) {
                 userCodeFile = functionConfig.getJar();
             } else if (functionConfig.getPy() != null) {
                 userCodeFile = functionConfig.getPy();
+            } else if (functionConfig.getGo() != null) {
+                userCodeFile = functionConfig.getGo();
             }
 
             // check if configs are valid
@@ -488,8 +498,11 @@ public class CmdFunctions extends CmdBase {
         }
 
         protected void validateFunctionConfigs(FunctionConfig functionConfig) {
-            if (StringUtils.isEmpty(functionConfig.getClassName())) {
-                throw new IllegalArgumentException("No Function Classname 
specified");
+            // go doesn't need className
+            if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON 
|| functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA){
+                if (StringUtils.isEmpty(functionConfig.getClassName())) {
+                    throw new IllegalArgumentException("No Function Classname 
specified");
+                }
             }
             if (StringUtils.isEmpty(functionConfig.getName())) {
                 
org.apache.pulsar.common.functions.Utils.inferMissingFunctionName(functionConfig);
@@ -501,13 +514,13 @@ public class CmdFunctions extends CmdBase {
                 
org.apache.pulsar.common.functions.Utils.inferMissingNamespace(functionConfig);
             }
 
-            if (isNotBlank(functionConfig.getJar()) && 
isNotBlank(functionConfig.getPy())) {
-                throw new ParameterException("Either a Java jar or a Python 
file needs to"
+            if (isNotBlank(functionConfig.getJar()) && 
isNotBlank(functionConfig.getPy()) && isNotBlank(functionConfig.getGo())) {
+                throw new ParameterException("Either a Java jar or a Python 
file or a Go executable binary needs to"
                         + " be specified for the function. Cannot specify 
both.");
             }
 
-            if (isBlank(functionConfig.getJar()) && 
isBlank(functionConfig.getPy())) {
-                throw new ParameterException("Either a Java jar or a Python 
file needs to"
+            if (isBlank(functionConfig.getJar()) && 
isBlank(functionConfig.getPy()) && isBlank(functionConfig.getGo())) {
+                throw new ParameterException("Either a Java jar or a Python 
file or a Go executable binary needs to"
                         + " be specified for the function. Please specify 
one.");
             }
 
@@ -519,6 +532,10 @@ public class CmdFunctions extends CmdBase {
                     !new File(functionConfig.getPy()).exists()) {
                 throw new ParameterException("The specified python file does 
not exist");
             }
+            if (!isBlank(functionConfig.getGo()) && 
!Utils.isFunctionPackageUrlSupported(functionConfig.getGo()) &&
+                    !new File(functionConfig.getGo()).exists()) {
+                throw new ParameterException("The specified go executable 
binary does not exist");
+            }
         }
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index 2b8883d..1686434 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -45,7 +45,8 @@ public class FunctionConfig {
 
     public enum Runtime {
         JAVA,
-        PYTHON
+        PYTHON,
+        GO
     }
 
     // Any flags that you want to pass to the runtime.
@@ -97,6 +98,7 @@ public class FunctionConfig {
     private Long timeoutMs;
     private String jar;
     private String py;
+    private String go;
     // Whether the subscriptions the functions created/used should be deleted 
when the functions is deleted
     private Boolean cleanupSubscription;
 }
diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
index d784855..007110c 100644
--- a/pulsar-function-go/conf/conf.go
+++ b/pulsar-function-go/conf/conf.go
@@ -20,6 +20,7 @@
 package conf
 
 import (
+       "encoding/json"
        "flag"
        "io/ioutil"
        "os"
@@ -33,60 +34,78 @@ import (
 const ConfigPath = "github.com/apache/pulsar/pulsar-function-go/conf/conf.yaml"
 
 type Conf struct {
-       PulsarServiceURL string        `yaml:"pulsarServiceURL"`
-       InstanceID       int           `yaml:"instanceID"`
-       FuncID           string        `yaml:"funcID"`
-       FuncVersion      string        `yaml:"funcVersion"`
-       MaxBufTuples     int           `yaml:"maxBufTuples"`
-       Port             int           `yaml:"port"`
-       ClusterName      string        `yaml:"clusterName"`
-       KillAfterIdleMs  time.Duration `yaml:"killAfterIdleMs"`
+       PulsarServiceURL string        `json:"pulsarServiceURL" 
yaml:"pulsarServiceURL"`
+       InstanceID       int           `json:"instanceID" yaml:"instanceID"`
+       FuncID           string        `json:"funcID" yaml:"funcID"`
+       FuncVersion      string        `json:"funcVersion" yaml:"funcVersion"`
+       MaxBufTuples     int           `json:"maxBufTuples" yaml:"maxBufTuples"`
+       Port             int           `json:"port" yaml:"port"`
+       ClusterName      string        `json:"clusterName" yaml:"clusterName"`
+       KillAfterIdleMs  time.Duration `json:"killAfterIdleMs" 
yaml:"killAfterIdleMs"`
        // function details config
-       Tenant               string `yaml:"tenant"`
-       NameSpace            string `yaml:"nameSpace"`
-       Name                 string `yaml:"name"`
-       LogTopic             string `yaml:"logTopic"`
-       ProcessingGuarantees int32  `yaml:"processingGuarantees"`
-       SecretsMap           string `yaml:"secretsMap"`
-       Runtime              int32  `yaml:"runtime"`
-       AutoACK              bool   `yaml:"autoAck"`
-       Parallelism          int32  `yaml:"parallelism"`
+       Tenant               string `json:"tenant" yaml:"tenant"`
+       NameSpace            string `json:"nameSpace" yaml:"nameSpace"`
+       Name                 string `json:"name" yaml:"name"`
+       LogTopic             string `json:"logTopic" yaml:"logTopic"`
+       ProcessingGuarantees int32  `json:"processingGuarantees" 
yaml:"processingGuarantees"`
+       SecretsMap           string `json:"secretsMap" yaml:"secretsMap"`
+       Runtime              int32  `json:"runtime" yaml:"runtime"`
+       AutoACK              bool   `json:"autoAck" yaml:"autoAck"`
+       Parallelism          int32  `json:"parallelism" yaml:"parallelism"`
        //source config
-       SubscriptionType    int32  `yaml:"subscriptionType"`
-       TimeoutMs           uint64 `yaml:"timeoutMs"`
-       SubscriptionName    string `yaml:"subscriptionName"`
-       CleanupSubscription bool   `yaml:"cleanupSubscription"`
+       SubscriptionType    int32  `json:"subscriptionType" 
yaml:"subscriptionType"`
+       TimeoutMs           uint64 `json:"timeoutMs" yaml:"timeoutMs"`
+       SubscriptionName    string `json:"subscriptionName" 
yaml:"subscriptionName"`
+       CleanupSubscription bool   `json:"cleanupSubscription"  
yaml:"cleanupSubscription"`
        //source input specs
-       SourceSpecTopic            string `yaml:"sourceSpecsTopic"`
-       SourceSchemaType           string `yaml:"sourceSchemaType"`
-       IsRegexPatternSubscription bool   `yaml:"isRegexPatternSubscription"`
-       ReceiverQueueSize          int32  `yaml:"receiverQueueSize"`
+       SourceSpecTopic            string `json:"sourceSpecsTopic" 
yaml:"sourceSpecsTopic"`
+       SourceSchemaType           string `json:"sourceSchemaType" 
yaml:"sourceSchemaType"`
+       IsRegexPatternSubscription bool   `json:"isRegexPatternSubscription" 
yaml:"isRegexPatternSubscription"`
+       ReceiverQueueSize          int32  `json:"receiverQueueSize" 
yaml:"receiverQueueSize"`
        //sink spec config
-       SinkSpecTopic  string `yaml:"sinkSpecsTopic"`
-       SinkSchemaType string `yaml:"sinkSchemaType"`
+       SinkSpecTopic  string `json:"sinkSpecsTopic" yaml:"sinkSpecsTopic"`
+       SinkSchemaType string `json:"sinkSchemaType" yaml:"sinkSchemaType"`
        //resources config
-       Cpu  float64 `yaml:"cpu"`
-       Ram  int64   `yaml:"ram"`
-       Disk int64   `yaml:"disk"`
+       Cpu  float64 `json:"cpu" yaml:"cpu"`
+       Ram  int64   `json:"ram" yaml:"ram"`
+       Disk int64   `json:"disk" yaml:"disk"`
        //retryDetails config
-       MaxMessageRetries int32  `yaml:"maxMessageRetries"`
-       DeadLetterTopic   string `yaml:"deadLetterTopic"`
+       MaxMessageRetries int32  `json:"maxMessageRetries" 
yaml:"maxMessageRetries"`
+       DeadLetterTopic   string `json:"deadLetterTopic" yaml:"deadLetterTopic"`
 }
 
-var opts string
+var (
+       help         bool
+       confFilePath string
+       confContent  string
+)
 
 func (c *Conf) GetConf() *Conf {
        flag.Parse()
 
-       yamlFile, err := ioutil.ReadFile(opts)
-       if err != nil {
-               log.Errorf("not found conf file, err:%s", err.Error())
-               return nil
+       if help {
+               flag.Usage()
+       }
+
+       if confFilePath != "" {
+               yamlFile, err := ioutil.ReadFile(confFilePath)
+               if err != nil {
+                       log.Errorf("not found conf file, err:%s", err.Error())
+                       return nil
+               }
+               err = yaml.Unmarshal(yamlFile, c)
+               if err != nil {
+                       log.Errorf("unmarshal yaml file error:%s", err.Error())
+                       return nil
+               }
        }
-       err = yaml.Unmarshal(yamlFile, c)
-       if err != nil {
-               log.Errorf("unmarshal yaml file error:%s", err.Error())
-               return nil
+
+       if confContent != "" {
+               err := json.Unmarshal([]byte(confContent), c)
+               if err != nil {
+                       log.Errorf("unmarshal config content error:%s", 
err.Error())
+                       return nil
+               }
        }
        return c
 }
@@ -105,5 +124,7 @@ func init() {
                homeDir = os.Getenv("HOME")
        }
        defaultPath := homeDir + "/" + ConfigPath
-       flag.StringVar(&opts, "instance-conf", defaultPath, "config conf.yml 
filepath")
+       flag.BoolVar(&help, "help", false, "print help cmd")
+       flag.StringVar(&confFilePath, "instance-conf-path", defaultPath, 
"config conf.yml filepath")
+       flag.StringVar(&confContent, "instance-conf", "", "the string content 
of Conf struct")
 }
diff --git a/pulsar-function-go/examples/test/consumer.go 
b/pulsar-function-go/examples/test/consumer.go
index ba5e5b5..86878a4 100644
--- a/pulsar-function-go/examples/test/consumer.go
+++ b/pulsar-function-go/examples/test/consumer.go
@@ -36,7 +36,7 @@ func main() {
        defer client.Close()
 
        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
-               Topic:            "topic-02",
+               Topic:            "test",
                SubscriptionName: "my-subscription",
                Type:             pulsar.Shared,
        })
diff --git a/pulsar-function-go/examples/test/producer.go 
b/pulsar-function-go/examples/test/producer.go
index 475f83d..99c35f8 100644
--- a/pulsar-function-go/examples/test/producer.go
+++ b/pulsar-function-go/examples/test/producer.go
@@ -40,7 +40,7 @@ func main() {
        defer client.Close()
 
        producer, err := client.CreateProducer(pulsar.ProducerOptions{
-               Topic: "topic-01",
+               Topic: "my-topic",
        })
 
        defer producer.Close()
diff --git a/pulsar-function-go/pf/instance.go 
b/pulsar-function-go/pf/instance.go
index d809932..75fa43e 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -262,7 +262,7 @@ func (gi *goInstance) nackInputMessage(inputMessage 
pulsar.Message) {
 }
 
 func getIdleTimeout(timeoutMilliSecond time.Duration) time.Duration {
-       if timeoutMilliSecond < 0 {
+       if timeoutMilliSecond <= 0 {
                return time.Duration(math.MaxInt64)
        }
        return timeoutMilliSecond
@@ -284,7 +284,8 @@ func (gi *goInstance) setupLogHandler() error {
 
 func (gi *goInstance) addLogTopicHandler() {
        if gi.context.logAppender == nil {
-               panic("please init logAppender")
+               log.Error("the logAppender is nil, if you want to use it, 
please specify `--log-topic` at startup.")
+               return
        }
 
        for _, logByte := range log.StrEntry {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
new file mode 100644
index 0000000..5c95208
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.go;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@Setter
+@Getter
+public class GoInstanceConfig {
+    private String pulsarServiceURL = "";
+    private int instanceID;
+    private String funcID = "";
+    private String funcVersion = "";
+    private int maxBufTuples;
+    private int port;
+    private String clusterName = "";
+    private int killAfterIdleMs;
+
+    private String tenant = "";
+    private String nameSpace = "";
+    private String name = "";
+    private String className = "";
+    private String logTopic = "";
+    private int processingGuarantees;
+    private String secretsMap = "";
+    private int runtime;
+    private boolean autoAck;
+    private int parallelism;
+
+    private int subscriptionType;
+    private long timeoutMs;
+    private String subscriptionName = "";
+    private boolean cleanupSubscription;
+
+    private String sourceSpecsTopic = "";
+    private String sourceSchemaType = "";
+    private boolean isRegexPatternSubscription;
+    private int receiverQueueSize;
+
+    private String sinkSpecsTopic = "";
+    private String sinkSchemaType = "";
+
+    private double cpu;
+    private long ram;
+    private long disk;
+
+    private int maxMessageRetries;
+    private String deadLetterTopic = "";
+}
diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml
index ca363c8..1b8d179 100644
--- a/pulsar-functions/runtime/pom.xml
+++ b/pulsar-functions/runtime/pom.xml
@@ -51,6 +51,16 @@
     </dependency>
 
     <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>io.kubernetes</groupId>
       <artifactId>client-java</artifactId>
       <version>2.0.0</version>
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index c3fe161..9c5c19c 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -190,6 +190,8 @@ public class KubernetesRuntime implements Runtime {
             case PYTHON:
                 logConfigFile = pulsarRootDir + 
"/conf/functions-logging/console_logging_config.ini";
                 break;
+            case GO:
+                throw new UnsupportedOperationException();
         }
 
         this.authConfig = authConfig;
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index 3aed50e..5736540 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -185,7 +185,7 @@ public class KubernetesRuntimeFactory implements 
RuntimeFactory {
     public KubernetesRuntime createContainer(InstanceConfig instanceConfig, 
String codePkgUrl,
                                              String originalCodeFileName,
                                              Long expectedHealthCheckInterval) 
throws Exception {
-        String instanceFile;
+        String instanceFile = null;
         switch (instanceConfig.getFunctionDetails().getRuntime()) {
             case JAVA:
                 instanceFile = javaInstanceJarFile;
@@ -193,6 +193,8 @@ public class KubernetesRuntimeFactory implements 
RuntimeFactory {
             case PYTHON:
                 instanceFile = pythonInstanceFile;
                 break;
+            case GO:
+                throw new UnsupportedOperationException();
             default:
                 throw new RuntimeException("Unsupported Runtime " + 
instanceConfig.getFunctionDetails().getRuntime());
         }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
index 37efbee..581bbbf 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
@@ -108,8 +108,12 @@ public class LocalRunner {
                     }
                     classLoader = loadJar(file);
                 }
-            } else {
+            } else if (functionConfig.getRuntime() == 
FunctionConfig.Runtime.GO) {
+                userCodeFile = functionConfig.getGo();
+            } else if (functionConfig.getRuntime() == 
FunctionConfig.Runtime.PYTHON){
                 userCodeFile = functionConfig.getPy();
+            } else {
+                throw new UnsupportedOperationException();
             }
             functionDetails = FunctionConfigUtils.convert(functionConfig, 
classLoader);
         } else if (!StringUtils.isEmpty(sourceConfigString)) {
@@ -172,14 +176,14 @@ public class LocalRunner {
         }
 
         try (ProcessRuntimeFactory containerFactory = new 
ProcessRuntimeFactory(
-            serviceUrl,
-            stateStorageServiceUrl,
-            authConfig,
-            null, /* java instance jar file */
-            null, /* python instance file */
-            null, /* log directory */
-            null, /* extra dependencies dir */
-            new DefaultSecretsProviderConfigurator(), false)) {
+                serviceUrl,
+                stateStorageServiceUrl,
+                authConfig,
+                null, /* java instance jar file */
+                null, /* python instance file */
+                null, /* log directory */
+                null, /* extra dependencies dir */
+                new DefaultSecretsProviderConfigurator(), false)) {
             List<RuntimeSpawner> spawners = new LinkedList<>();
             for (int i = 0; i < parallelism; ++i) {
                 InstanceConfig instanceConfig = new InstanceConfig();
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index c202ad9..43e74e8 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -102,12 +102,14 @@ class ProcessRuntime implements Runtime {
             case PYTHON:
                 logConfigFile = System.getenv("PULSAR_HOME") + 
"/conf/functions-logging/logging_config.ini";
                 break;
+            case GO:
+                break;
         }
         this.extraDependenciesDir = extraDependenciesDir;
         this.processArgs = RuntimeUtils.composeCmd(
             instanceConfig,
             instanceFile,
-            // DONT SET extra dependencies here (for python runtime),
+            // DONT SET extra dependencies here (for python or go runtime),
             // since process runtime is using Java ProcessBuilder,
             // we have to set the environment variable via ProcessBuilder
             FunctionDetails.Runtime.JAVA == 
instanceConfig.getFunctionDetails().getRuntime() ? extraDependenciesDir : null,
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index 6976df5..1eb3899 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -119,7 +119,7 @@ public class ProcessRuntimeFactory implements 
RuntimeFactory {
     public ProcessRuntime createContainer(InstanceConfig instanceConfig, 
String codeFile,
                                           String originalCodeFileName,
                                           Long expectedHealthCheckInterval) 
throws Exception {
-        String instanceFile;
+        String instanceFile = null;
         switch (instanceConfig.getFunctionDetails().getRuntime()) {
             case JAVA:
                 instanceFile = javaInstanceJarFile;
@@ -127,6 +127,8 @@ public class ProcessRuntimeFactory implements 
RuntimeFactory {
             case PYTHON:
                 instanceFile = pythonInstanceFile;
                 break;
+            case GO:
+                break;
             default:
                 throw new RuntimeException("Unsupported Runtime " + 
instanceConfig.getFunctionDetails().getRuntime());
         }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 8a5ab37..86e9bdf 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -19,11 +19,12 @@
 
 package org.apache.pulsar.functions.runtime;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.util.JsonFormat;
 
-import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.BufferedReader;
 import java.net.HttpURLConnection;
 import java.net.InetAddress;
 import java.net.URL;
@@ -32,8 +33,10 @@ import java.util.List;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.instance.go.GoInstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
@@ -82,7 +85,7 @@ public class RuntimeUtils {
     public static List<String> getArgsBeforeCmd(InstanceConfig instanceConfig, 
String extraDependenciesDir) {
 
         final List<String> args = new LinkedList<>();
-        if (instanceConfig.getFunctionDetails().getRuntime() ==  
Function.FunctionDetails.Runtime.JAVA) {
+        if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.JAVA) {
             //no-op
         } else if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.PYTHON) {
             // add `extraDependenciesDir` to python package searching path
@@ -94,26 +97,160 @@ public class RuntimeUtils {
         return args;
     }
 
+    /**
+     * Different from python and java function, Go function uploads a complete 
executable file(including:
+     * instance file + user code file). Its parameter list is provided to the 
broker in the form of a yaml file,
+     * the advantage of this approach is that backward compatibility is 
guaranteed.
+     *
+     * In Java and Python the instance is managed by broker (or function 
worker) so the changes in command line
+     * is under control; but in Go the instance is compiled with the user 
function, so pulsar doesn't have the
+     * control what instance is used in the function. Hence in order to 
support BC for go function, we can't
+     * dynamically add more commandline arguments. Using an instance config to 
pass the parameters from function
+     * worker to go instance is the best way for maintaining the BC.
+     * <p>
+     * When we run the go function, we only need to specify the location of 
the go-function file and the yaml file.
+     * The content of the yaml file will be automatically generated according 
to the content provided by instanceConfig.
+     */
+
+    public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
+                                                String originalCodeFileName,
+                                                String pulsarServiceUrl) 
throws IOException {
+        final List<String> args = new LinkedList<>();
+        GoInstanceConfig goInstanceConfig = new GoInstanceConfig();
+
+        if (instanceConfig.getClusterName() != null) {
+            goInstanceConfig.setClusterName(instanceConfig.getClusterName());
+        }
+
+        if (instanceConfig.getInstanceId() != 0) {
+            goInstanceConfig.setInstanceID(instanceConfig.getInstanceId());
+        }
+
+        if (instanceConfig.getFunctionId() != null) {
+            goInstanceConfig.setFuncID(instanceConfig.getFunctionId());
+        }
+
+        if (instanceConfig.getFunctionVersion() != null) {
+            
goInstanceConfig.setFuncVersion(instanceConfig.getFunctionVersion());
+        }
+
+        if (instanceConfig.getFunctionDetails().getAutoAck()) {
+            
goInstanceConfig.setAutoAck(instanceConfig.getFunctionDetails().getAutoAck());
+        }
+
+        if (instanceConfig.getFunctionDetails().getTenant() != null) {
+            
goInstanceConfig.setTenant(instanceConfig.getFunctionDetails().getTenant());
+        }
+
+        if (instanceConfig.getFunctionDetails().getNamespace() != null) {
+            
goInstanceConfig.setNameSpace(instanceConfig.getFunctionDetails().getNamespace());
+        }
+
+        if (instanceConfig.getFunctionDetails().getName() != null) {
+            
goInstanceConfig.setName(instanceConfig.getFunctionDetails().getName());
+        }
+
+        if (instanceConfig.getFunctionDetails().getLogTopic() != null) {
+            
goInstanceConfig.setLogTopic(instanceConfig.getFunctionDetails().getLogTopic());
+        }
+        if (instanceConfig.getFunctionDetails().getProcessingGuarantees() != 
null) {
+            
goInstanceConfig.setProcessingGuarantees(instanceConfig.getFunctionDetails().getProcessingGuaranteesValue());
+        }
+        if (instanceConfig.getFunctionDetails().getSecretsMap() != null) {
+            
goInstanceConfig.setSecretsMap(instanceConfig.getFunctionDetails().getSecretsMap());
+        }
+        if (instanceConfig.getFunctionDetails().getParallelism() != 0) {
+            
goInstanceConfig.setParallelism(instanceConfig.getFunctionDetails().getParallelism());
+        }
+
+        if (instanceConfig.getMaxBufferedTuples() != 0) {
+            
goInstanceConfig.setMaxBufTuples(instanceConfig.getMaxBufferedTuples());
+        }
+
+        if (pulsarServiceUrl != null) {
+            goInstanceConfig.setPulsarServiceURL(pulsarServiceUrl);
+        }
+        if 
(instanceConfig.getFunctionDetails().getSource().getCleanupSubscription()) {
+            
goInstanceConfig.setCleanupSubscription(instanceConfig.getFunctionDetails().getSource().getCleanupSubscription());
+        }
+        if 
(instanceConfig.getFunctionDetails().getSource().getSubscriptionName() != null) 
{
+            
goInstanceConfig.setSubscriptionName(instanceConfig.getFunctionDetails().getSource().getSubscriptionName());
+        }
+
+        if (instanceConfig.getFunctionDetails().getSource().getInputSpecsMap() 
!= null) {
+            for (String inputTopic : 
instanceConfig.getFunctionDetails().getSource().getInputSpecsMap().keySet()) {
+                goInstanceConfig.setSourceSpecsTopic(inputTopic);
+            }
+        }
+
+        if (instanceConfig.getFunctionDetails().getSource().getTimeoutMs() != 
0) {
+            
goInstanceConfig.setTimeoutMs(instanceConfig.getFunctionDetails().getSource().getTimeoutMs());
+        }
+
+        if (instanceConfig.getFunctionDetails().getSink().getTopic() != null) {
+            
goInstanceConfig.setSinkSpecsTopic(instanceConfig.getFunctionDetails().getSink().getTopic());
+        }
+
+        if (instanceConfig.getFunctionDetails().getResources().getCpu() != 0) {
+            
goInstanceConfig.setCpu(instanceConfig.getFunctionDetails().getResources().getCpu());
+        }
+
+        if (instanceConfig.getFunctionDetails().getResources().getRam() != 0) {
+            
goInstanceConfig.setRam(instanceConfig.getFunctionDetails().getResources().getRam());
+        }
+
+        if (instanceConfig.getFunctionDetails().getResources().getDisk() != 0) 
{
+            
goInstanceConfig.setDisk(instanceConfig.getFunctionDetails().getResources().getDisk());
+        }
+
+        if 
(instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic() != 
null) {
+            
goInstanceConfig.setDeadLetterTopic(instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
+        }
+
+        if 
(instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries() 
!= 0) {
+            
goInstanceConfig.setMaxMessageRetries(instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
+        }
+
+        goInstanceConfig.setKillAfterIdleMs(0);
+
+        // Parse the contents of goInstanceConfig into json form string
+        ObjectMapper objectMapper = ObjectMapperFactory.getThreadLocal();
+        String configContent = 
objectMapper.writeValueAsString(goInstanceConfig);
+
+        // Nit: at present, the implementation of go function depends on 
pulsar-client-go,
+        // pulsar-client-go uses cgo, so the currently uploaded executable 
doesn't support cross-compilation.
+        args.add(originalCodeFileName);
+        args.add("-instance-conf");
+        args.add(configContent);
+        return args;
+    }
+
+
     public static List<String> getCmd(InstanceConfig instanceConfig,
-                                          String instanceFile,
-                                          String extraDependenciesDir, /* 
extra dependencies for running instances */
-                                           String logDirectory,
-                                          String originalCodeFileName,
-                                          String pulsarServiceUrl,
-                                          String stateStorageServiceUrl,
-                                          AuthenticationConfig authConfig,
-                                          String shardId,
-                                          Integer grpcPort,
-                                          Long expectedHealthCheckInterval,
-                                          String logConfigFile,
-                                          String secretsProviderClassName,
-                                          String secretsProviderConfig,
-                                          Boolean installUserCodeDependencies,
-                                          String pythonDependencyRepository,
-                                          String 
pythonExtraDependencyRepository,
-                                          int metricsPort) throws Exception {
+                                      String instanceFile,
+                                      String extraDependenciesDir, /* extra 
dependencies for running instances */
+                                      String logDirectory,
+                                      String originalCodeFileName,
+                                      String pulsarServiceUrl,
+                                      String stateStorageServiceUrl,
+                                      AuthenticationConfig authConfig,
+                                      String shardId,
+                                      Integer grpcPort,
+                                      Long expectedHealthCheckInterval,
+                                      String logConfigFile,
+                                      String secretsProviderClassName,
+                                      String secretsProviderConfig,
+                                      Boolean installUserCodeDependencies,
+                                      String pythonDependencyRepository,
+                                      String pythonExtraDependencyRepository,
+                                      int metricsPort) throws Exception {
         final List<String> args = new LinkedList<>();
-        if (instanceConfig.getFunctionDetails().getRuntime() ==  
Function.FunctionDetails.Runtime.JAVA) {
+
+        if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.GO) {
+            return getGoInstanceCmd(instanceConfig, originalCodeFileName, 
pulsarServiceUrl);
+        }
+
+        if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.JAVA) {
             args.add("java");
             args.add("-cp");
 
@@ -245,7 +382,7 @@ public class RuntimeUtils {
                 
FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
     }
 
-    public static String getPrometheusMetrics(int metricsPort) throws 
IOException{
+    public static String getPrometheusMetrics(int metricsPort) throws 
IOException {
         StringBuilder result = new StringBuilder();
         URL url = new URL(String.format("http://%s:%s", 
InetAddress.getLocalHost().getHostAddress(), metricsPort));
         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
@@ -258,5 +395,5 @@ public class RuntimeUtils {
         rd.close();
         return result.toString();
     }
-  
+
 }
diff --git 
a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java
 
b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java
index 32c13b7..8d4dbff 100644
--- 
a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java
+++ 
b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java
@@ -40,6 +40,8 @@ public class DefaultSecretsProviderConfigurator implements 
SecretsProviderConfig
                 return ClearTextSecretsProvider.class.getName();
             case PYTHON:
                 return "secretsprovider.ClearTextSecretsProvider";
+            case GO:
+                return "";
             default:
                 throw new RuntimeException("Unknown runtime " + 
functionDetails.getRuntime());
         }
diff --git 
a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
 
b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
index 6dc31c7..8d4e34e 100644
--- 
a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
+++ 
b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
@@ -47,6 +47,8 @@ public class KubernetesSecretsProviderConfigurator implements 
SecretsProviderCon
                 return EnvironmentBasedSecretsProvider.class.getName();
             case PYTHON:
                 return "secretsprovider.EnvironmentBasedSecretsProvider";
+            case GO:
+                throw new UnsupportedOperationException();
             default:
                 throw new RuntimeException("Unknown function runtime " + 
functionDetails.getRuntime());
         }
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index ae18551..a9e5a00 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -290,7 +290,8 @@ public class FunctionConfigUtils {
         }
         Map<String, Object> userConfig;
         if (!isEmpty(functionDetails.getUserConfig())) {
-            Type type = new TypeToken<Map<String, Object>>() {}.getType();
+            Type type = new TypeToken<Map<String, Object>>() {
+            }.getType();
             userConfig = new Gson().fromJson(functionDetails.getUserConfig(), 
type);
         } else {
             userConfig = new HashMap<>();
@@ -308,7 +309,8 @@ public class FunctionConfigUtils {
         functionConfig.setUserConfig(userConfig);
 
         if (!isEmpty(functionDetails.getSecretsMap())) {
-            Type type = new TypeToken<Map<String, Object>>() {}.getType();
+            Type type = new TypeToken<Map<String, Object>>() {
+            }.getType();
             Map<String, Object> secretsMap = new 
Gson().fromJson(functionDetails.getSecretsMap(), type);
             functionConfig.setSecrets(secretsMap);
         }
@@ -347,6 +349,8 @@ public class FunctionConfigUtils {
             functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
         } else if (functionConfig.getPy() != null) {
             functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON);
+        } else if (functionConfig.getGo() != null) {
+            functionConfig.setRuntime(FunctionConfig.Runtime.GO);
         }
 
         WindowConfig windowConfig = functionConfig.getWindowConfig();
@@ -429,6 +433,20 @@ public class FunctionConfigUtils {
         }
     }
 
+    private static void doGolangChecks(FunctionConfig functionConfig) {
+        if (functionConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+            throw new RuntimeException("Effectively-once processing guarantees 
not yet supported in Go function");
+        }
+
+        if (functionConfig.getWindowConfig() != null) {
+            throw new IllegalArgumentException("Windowing is not supported in 
Go function yet");
+        }
+
+        if (functionConfig.getMaxMessageRetries() != null && 
functionConfig.getMaxMessageRetries() >= 0) {
+            throw new IllegalArgumentException("Message retries not yet 
supported in Go function");
+        }
+    }
+
     private static void verifyNoTopicClash(Collection<String> inputTopics, 
String outputTopic) throws IllegalArgumentException {
         if (inputTopics.contains(outputTopic)) {
             throw new IllegalArgumentException(
@@ -447,8 +465,11 @@ public class FunctionConfigUtils {
         if (isEmpty(functionConfig.getName())) {
             throw new IllegalArgumentException("Function name cannot be null");
         }
-        if (isEmpty(functionConfig.getClassName())) {
-            throw new IllegalArgumentException("Function classname cannot be 
null");
+        // go doesn't need className
+        if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON || 
functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA){
+            if (isEmpty(functionConfig.getClassName())) {
+                throw new IllegalArgumentException("Function classname cannot 
be null");
+            }
         }
 
         Collection<String> allInputTopics = 
collectAllInputTopics(functionConfig);
@@ -530,6 +551,12 @@ public class FunctionConfigUtils {
                 throw new IllegalArgumentException("The supplied python file 
does not exist");
             }
         }
+        if (!isEmpty(functionConfig.getGo()) && 
!org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getGo())
+                && functionConfig.getGo().startsWith(BUILTIN)) {
+            if (!new File(functionConfig.getGo()).exists()) {
+                throw new IllegalArgumentException("The supplied go file does 
not exist");
+            }
+        }
 
         if (functionConfig.getInputSpecs() != null) {
             functionConfig.getInputSpecs().forEach((topicName, conf) -> {
@@ -587,6 +614,9 @@ public class FunctionConfigUtils {
             }
             doJavaChecks(functionConfig, classLoader);
             return classLoader;
+        } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
+            doGolangChecks(functionConfig);
+            return null;
         } else {
             doPythonChecks(functionConfig);
             return null;
@@ -615,7 +645,7 @@ public class FunctionConfigUtils {
         if (mergedConfig.getInputSpecs() == null) {
             mergedConfig.setInputSpecs(new HashMap<>());
         }
-        
+
         if (newConfig.getInputs() != null) {
             newConfig.getInputs().forEach((topicName -> {
                 newConfig.getInputSpecs().put(topicName,
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 7140fdc..a84e2b1 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -474,6 +474,8 @@ public class FunctionActioner {
                 return fileName + ".jar";
             case PYTHON:
                 return fileName + ".py";
+            case GO:
+                return fileName + ".go";
             default:
                 throw new RuntimeException("Unknown runtime " + 
FunctionDetails.getRuntime());
         }

Reply via email to