This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 42c3bf9 [go function] support localrun and cluster mode for go
function (#4174)
42c3bf9 is described below
commit 42c3bf94920f1d177a2403e06650500509f94aaa
Author: 冉小龙 <[email protected]>
AuthorDate: Mon May 6 09:13:18 2019 +0800
[go function] support localrun and cluster mode for go function (#4174)
### Motivation
Master Issue: #3767
Support local-run and cluster mode for Go functions.
For a Go function, we can use:
```
./bin/pulsar-admin functions localrun/create
--go
/Users/wolf4j/github.com/apache/pulsar/pulsar-function-go/examples/outputFunc.go
--inputs persistent://public/default/my-topic
--output persistent://public/default/test
--tenant public
--namespace default
--name pulsarfunction
--classname hellopulsar
--log-topic logtopic
```
Unlike `--jar` or `--py`, `--go` uploads a complete executable
file (including both the instance file and the user code file).
---
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 29 +++-
.../pulsar/common/functions/FunctionConfig.java | 4 +-
pulsar-function-go/conf/conf.go | 105 +++++++-----
pulsar-function-go/examples/test/consumer.go | 2 +-
pulsar-function-go/examples/test/producer.go | 2 +-
pulsar-function-go/pf/instance.go | 5 +-
.../functions/instance/go/GoInstanceConfig.java | 66 ++++++++
pulsar-functions/runtime/pom.xml | 10 ++
.../functions/runtime/KubernetesRuntime.java | 2 +
.../runtime/KubernetesRuntimeFactory.java | 4 +-
.../pulsar/functions/runtime/LocalRunner.java | 22 ++-
.../pulsar/functions/runtime/ProcessRuntime.java | 4 +-
.../functions/runtime/ProcessRuntimeFactory.java | 4 +-
.../pulsar/functions/runtime/RuntimeUtils.java | 181 ++++++++++++++++++---
.../DefaultSecretsProviderConfigurator.java | 2 +
.../KubernetesSecretsProviderConfigurator.java | 2 +
.../functions/utils/FunctionConfigUtils.java | 40 ++++-
.../pulsar/functions/worker/FunctionActioner.java | 2 +
18 files changed, 394 insertions(+), 92 deletions(-)
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 8850619..9b72c3b 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -198,6 +198,10 @@ public class CmdFunctions extends CmdBase {
description = "Path to the main Python file/Python Wheel file
for the function (if the function is written in Python)",
listConverter = StringConverter.class)
protected String pyFile;
+ @Parameter(
+ names = "--go",
+ description = "Path to the main Go executable binary for the
function (if the function is written in Go)")
+ protected String goFile;
@Parameter(names = {"-i",
"--inputs"}, description = "The function's input topic or
topics (multiple topics can be specified as a comma-separated list)")
protected String inputs;
@@ -477,10 +481,16 @@ public class CmdFunctions extends CmdBase {
functionConfig.setPy(pyFile);
}
+ if (null != goFile) {
+ functionConfig.setGo(goFile);
+ }
+
if (functionConfig.getJar() != null) {
userCodeFile = functionConfig.getJar();
} else if (functionConfig.getPy() != null) {
userCodeFile = functionConfig.getPy();
+ } else if (functionConfig.getGo() != null) {
+ userCodeFile = functionConfig.getGo();
}
// check if configs are valid
@@ -488,8 +498,11 @@ public class CmdFunctions extends CmdBase {
}
protected void validateFunctionConfigs(FunctionConfig functionConfig) {
- if (StringUtils.isEmpty(functionConfig.getClassName())) {
- throw new IllegalArgumentException("No Function Classname
specified");
+ // go doesn't need className
+ if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON
|| functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA){
+ if (StringUtils.isEmpty(functionConfig.getClassName())) {
+ throw new IllegalArgumentException("No Function Classname
specified");
+ }
}
if (StringUtils.isEmpty(functionConfig.getName())) {
org.apache.pulsar.common.functions.Utils.inferMissingFunctionName(functionConfig);
@@ -501,13 +514,13 @@ public class CmdFunctions extends CmdBase {
org.apache.pulsar.common.functions.Utils.inferMissingNamespace(functionConfig);
}
- if (isNotBlank(functionConfig.getJar()) &&
isNotBlank(functionConfig.getPy())) {
- throw new ParameterException("Either a Java jar or a Python
file needs to"
+ if (isNotBlank(functionConfig.getJar()) &&
isNotBlank(functionConfig.getPy()) && isNotBlank(functionConfig.getGo())) {
+ throw new ParameterException("Either a Java jar or a Python
file or a Go executable binary needs to"
+ " be specified for the function. Cannot specify
both.");
}
- if (isBlank(functionConfig.getJar()) &&
isBlank(functionConfig.getPy())) {
- throw new ParameterException("Either a Java jar or a Python
file needs to"
+ if (isBlank(functionConfig.getJar()) &&
isBlank(functionConfig.getPy()) && isBlank(functionConfig.getGo())) {
+ throw new ParameterException("Either a Java jar or a Python
file or a Go executable binary needs to"
+ " be specified for the function. Please specify
one.");
}
@@ -519,6 +532,10 @@ public class CmdFunctions extends CmdBase {
!new File(functionConfig.getPy()).exists()) {
throw new ParameterException("The specified python file does
not exist");
}
+ if (!isBlank(functionConfig.getGo()) &&
!Utils.isFunctionPackageUrlSupported(functionConfig.getGo()) &&
+ !new File(functionConfig.getGo()).exists()) {
+ throw new ParameterException("The specified go executable
binary does not exist");
+ }
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index 2b8883d..1686434 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -45,7 +45,8 @@ public class FunctionConfig {
public enum Runtime {
JAVA,
- PYTHON
+ PYTHON,
+ GO
}
// Any flags that you want to pass to the runtime.
@@ -97,6 +98,7 @@ public class FunctionConfig {
private Long timeoutMs;
private String jar;
private String py;
+ private String go;
// Whether the subscriptions the functions created/used should be deleted
when the functions is deleted
private Boolean cleanupSubscription;
}
diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
index d784855..007110c 100644
--- a/pulsar-function-go/conf/conf.go
+++ b/pulsar-function-go/conf/conf.go
@@ -20,6 +20,7 @@
package conf
import (
+ "encoding/json"
"flag"
"io/ioutil"
"os"
@@ -33,60 +34,78 @@ import (
const ConfigPath = "github.com/apache/pulsar/pulsar-function-go/conf/conf.yaml"
type Conf struct {
- PulsarServiceURL string `yaml:"pulsarServiceURL"`
- InstanceID int `yaml:"instanceID"`
- FuncID string `yaml:"funcID"`
- FuncVersion string `yaml:"funcVersion"`
- MaxBufTuples int `yaml:"maxBufTuples"`
- Port int `yaml:"port"`
- ClusterName string `yaml:"clusterName"`
- KillAfterIdleMs time.Duration `yaml:"killAfterIdleMs"`
+ PulsarServiceURL string `json:"pulsarServiceURL"
yaml:"pulsarServiceURL"`
+ InstanceID int `json:"instanceID" yaml:"instanceID"`
+ FuncID string `json:"funcID" yaml:"funcID"`
+ FuncVersion string `json:"funcVersion" yaml:"funcVersion"`
+ MaxBufTuples int `json:"maxBufTuples" yaml:"maxBufTuples"`
+ Port int `json:"port" yaml:"port"`
+ ClusterName string `json:"clusterName" yaml:"clusterName"`
+ KillAfterIdleMs time.Duration `json:"killAfterIdleMs"
yaml:"killAfterIdleMs"`
// function details config
- Tenant string `yaml:"tenant"`
- NameSpace string `yaml:"nameSpace"`
- Name string `yaml:"name"`
- LogTopic string `yaml:"logTopic"`
- ProcessingGuarantees int32 `yaml:"processingGuarantees"`
- SecretsMap string `yaml:"secretsMap"`
- Runtime int32 `yaml:"runtime"`
- AutoACK bool `yaml:"autoAck"`
- Parallelism int32 `yaml:"parallelism"`
+ Tenant string `json:"tenant" yaml:"tenant"`
+ NameSpace string `json:"nameSpace" yaml:"nameSpace"`
+ Name string `json:"name" yaml:"name"`
+ LogTopic string `json:"logTopic" yaml:"logTopic"`
+ ProcessingGuarantees int32 `json:"processingGuarantees"
yaml:"processingGuarantees"`
+ SecretsMap string `json:"secretsMap" yaml:"secretsMap"`
+ Runtime int32 `json:"runtime" yaml:"runtime"`
+ AutoACK bool `json:"autoAck" yaml:"autoAck"`
+ Parallelism int32 `json:"parallelism" yaml:"parallelism"`
//source config
- SubscriptionType int32 `yaml:"subscriptionType"`
- TimeoutMs uint64 `yaml:"timeoutMs"`
- SubscriptionName string `yaml:"subscriptionName"`
- CleanupSubscription bool `yaml:"cleanupSubscription"`
+ SubscriptionType int32 `json:"subscriptionType"
yaml:"subscriptionType"`
+ TimeoutMs uint64 `json:"timeoutMs" yaml:"timeoutMs"`
+ SubscriptionName string `json:"subscriptionName"
yaml:"subscriptionName"`
+ CleanupSubscription bool `json:"cleanupSubscription"
yaml:"cleanupSubscription"`
//source input specs
- SourceSpecTopic string `yaml:"sourceSpecsTopic"`
- SourceSchemaType string `yaml:"sourceSchemaType"`
- IsRegexPatternSubscription bool `yaml:"isRegexPatternSubscription"`
- ReceiverQueueSize int32 `yaml:"receiverQueueSize"`
+ SourceSpecTopic string `json:"sourceSpecsTopic"
yaml:"sourceSpecsTopic"`
+ SourceSchemaType string `json:"sourceSchemaType"
yaml:"sourceSchemaType"`
+ IsRegexPatternSubscription bool `json:"isRegexPatternSubscription"
yaml:"isRegexPatternSubscription"`
+ ReceiverQueueSize int32 `json:"receiverQueueSize"
yaml:"receiverQueueSize"`
//sink spec config
- SinkSpecTopic string `yaml:"sinkSpecsTopic"`
- SinkSchemaType string `yaml:"sinkSchemaType"`
+ SinkSpecTopic string `json:"sinkSpecsTopic" yaml:"sinkSpecsTopic"`
+ SinkSchemaType string `json:"sinkSchemaType" yaml:"sinkSchemaType"`
//resources config
- Cpu float64 `yaml:"cpu"`
- Ram int64 `yaml:"ram"`
- Disk int64 `yaml:"disk"`
+ Cpu float64 `json:"cpu" yaml:"cpu"`
+ Ram int64 `json:"ram" yaml:"ram"`
+ Disk int64 `json:"disk" yaml:"disk"`
//retryDetails config
- MaxMessageRetries int32 `yaml:"maxMessageRetries"`
- DeadLetterTopic string `yaml:"deadLetterTopic"`
+ MaxMessageRetries int32 `json:"maxMessageRetries"
yaml:"maxMessageRetries"`
+ DeadLetterTopic string `json:"deadLetterTopic" yaml:"deadLetterTopic"`
}
-var opts string
+var (
+ help bool
+ confFilePath string
+ confContent string
+)
func (c *Conf) GetConf() *Conf {
flag.Parse()
- yamlFile, err := ioutil.ReadFile(opts)
- if err != nil {
- log.Errorf("not found conf file, err:%s", err.Error())
- return nil
+ if help {
+ flag.Usage()
+ }
+
+ if confFilePath != "" {
+ yamlFile, err := ioutil.ReadFile(confFilePath)
+ if err != nil {
+ log.Errorf("not found conf file, err:%s", err.Error())
+ return nil
+ }
+ err = yaml.Unmarshal(yamlFile, c)
+ if err != nil {
+ log.Errorf("unmarshal yaml file error:%s", err.Error())
+ return nil
+ }
}
- err = yaml.Unmarshal(yamlFile, c)
- if err != nil {
- log.Errorf("unmarshal yaml file error:%s", err.Error())
- return nil
+
+ if confContent != "" {
+ err := json.Unmarshal([]byte(confContent), c)
+ if err != nil {
+ log.Errorf("unmarshal config content error:%s",
err.Error())
+ return nil
+ }
}
return c
}
@@ -105,5 +124,7 @@ func init() {
homeDir = os.Getenv("HOME")
}
defaultPath := homeDir + "/" + ConfigPath
- flag.StringVar(&opts, "instance-conf", defaultPath, "config conf.yml
filepath")
+ flag.BoolVar(&help, "help", false, "print help cmd")
+ flag.StringVar(&confFilePath, "instance-conf-path", defaultPath,
"config conf.yml filepath")
+ flag.StringVar(&confContent, "instance-conf", "", "the string content
of Conf struct")
}
diff --git a/pulsar-function-go/examples/test/consumer.go
b/pulsar-function-go/examples/test/consumer.go
index ba5e5b5..86878a4 100644
--- a/pulsar-function-go/examples/test/consumer.go
+++ b/pulsar-function-go/examples/test/consumer.go
@@ -36,7 +36,7 @@ func main() {
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
- Topic: "topic-02",
+ Topic: "test",
SubscriptionName: "my-subscription",
Type: pulsar.Shared,
})
diff --git a/pulsar-function-go/examples/test/producer.go
b/pulsar-function-go/examples/test/producer.go
index 475f83d..99c35f8 100644
--- a/pulsar-function-go/examples/test/producer.go
+++ b/pulsar-function-go/examples/test/producer.go
@@ -40,7 +40,7 @@ func main() {
defer client.Close()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
- Topic: "topic-01",
+ Topic: "my-topic",
})
defer producer.Close()
diff --git a/pulsar-function-go/pf/instance.go
b/pulsar-function-go/pf/instance.go
index d809932..75fa43e 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -262,7 +262,7 @@ func (gi *goInstance) nackInputMessage(inputMessage
pulsar.Message) {
}
func getIdleTimeout(timeoutMilliSecond time.Duration) time.Duration {
- if timeoutMilliSecond < 0 {
+ if timeoutMilliSecond <= 0 {
return time.Duration(math.MaxInt64)
}
return timeoutMilliSecond
@@ -284,7 +284,8 @@ func (gi *goInstance) setupLogHandler() error {
func (gi *goInstance) addLogTopicHandler() {
if gi.context.logAppender == nil {
- panic("please init logAppender")
+ log.Error("the logAppender is nil, if you want to use it,
please specify `--log-topic` at startup.")
+ return
}
for _, logByte := range log.StrEntry {
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
new file mode 100644
index 0000000..5c95208
--- /dev/null
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.go;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@Setter
+@Getter
+public class GoInstanceConfig {
+ private String pulsarServiceURL = "";
+ private int instanceID;
+ private String funcID = "";
+ private String funcVersion = "";
+ private int maxBufTuples;
+ private int port;
+ private String clusterName = "";
+ private int killAfterIdleMs;
+
+ private String tenant = "";
+ private String nameSpace = "";
+ private String name = "";
+ private String className = "";
+ private String logTopic = "";
+ private int processingGuarantees;
+ private String secretsMap = "";
+ private int runtime;
+ private boolean autoAck;
+ private int parallelism;
+
+ private int subscriptionType;
+ private long timeoutMs;
+ private String subscriptionName = "";
+ private boolean cleanupSubscription;
+
+ private String sourceSpecsTopic = "";
+ private String sourceSchemaType = "";
+ private boolean isRegexPatternSubscription;
+ private int receiverQueueSize;
+
+ private String sinkSpecsTopic = "";
+ private String sinkSchemaType = "";
+
+ private double cpu;
+ private long ram;
+ private long disk;
+
+ private int maxMessageRetries;
+ private String deadLetterTopic = "";
+}
diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml
index ca363c8..1b8d179 100644
--- a/pulsar-functions/runtime/pom.xml
+++ b/pulsar-functions/runtime/pom.xml
@@ -51,6 +51,16 @@
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>2.0.0</version>
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index c3fe161..9c5c19c 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -190,6 +190,8 @@ public class KubernetesRuntime implements Runtime {
case PYTHON:
logConfigFile = pulsarRootDir +
"/conf/functions-logging/console_logging_config.ini";
break;
+ case GO:
+ throw new UnsupportedOperationException();
}
this.authConfig = authConfig;
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index 3aed50e..5736540 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -185,7 +185,7 @@ public class KubernetesRuntimeFactory implements
RuntimeFactory {
public KubernetesRuntime createContainer(InstanceConfig instanceConfig,
String codePkgUrl,
String originalCodeFileName,
Long expectedHealthCheckInterval)
throws Exception {
- String instanceFile;
+ String instanceFile = null;
switch (instanceConfig.getFunctionDetails().getRuntime()) {
case JAVA:
instanceFile = javaInstanceJarFile;
@@ -193,6 +193,8 @@ public class KubernetesRuntimeFactory implements
RuntimeFactory {
case PYTHON:
instanceFile = pythonInstanceFile;
break;
+ case GO:
+ throw new UnsupportedOperationException();
default:
throw new RuntimeException("Unsupported Runtime " +
instanceConfig.getFunctionDetails().getRuntime());
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
index 37efbee..581bbbf 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
@@ -108,8 +108,12 @@ public class LocalRunner {
}
classLoader = loadJar(file);
}
- } else {
+ } else if (functionConfig.getRuntime() ==
FunctionConfig.Runtime.GO) {
+ userCodeFile = functionConfig.getGo();
+ } else if (functionConfig.getRuntime() ==
FunctionConfig.Runtime.PYTHON){
userCodeFile = functionConfig.getPy();
+ } else {
+ throw new UnsupportedOperationException();
}
functionDetails = FunctionConfigUtils.convert(functionConfig,
classLoader);
} else if (!StringUtils.isEmpty(sourceConfigString)) {
@@ -172,14 +176,14 @@ public class LocalRunner {
}
try (ProcessRuntimeFactory containerFactory = new
ProcessRuntimeFactory(
- serviceUrl,
- stateStorageServiceUrl,
- authConfig,
- null, /* java instance jar file */
- null, /* python instance file */
- null, /* log directory */
- null, /* extra dependencies dir */
- new DefaultSecretsProviderConfigurator(), false)) {
+ serviceUrl,
+ stateStorageServiceUrl,
+ authConfig,
+ null, /* java instance jar file */
+ null, /* python instance file */
+ null, /* log directory */
+ null, /* extra dependencies dir */
+ new DefaultSecretsProviderConfigurator(), false)) {
List<RuntimeSpawner> spawners = new LinkedList<>();
for (int i = 0; i < parallelism; ++i) {
InstanceConfig instanceConfig = new InstanceConfig();
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index c202ad9..43e74e8 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -102,12 +102,14 @@ class ProcessRuntime implements Runtime {
case PYTHON:
logConfigFile = System.getenv("PULSAR_HOME") +
"/conf/functions-logging/logging_config.ini";
break;
+ case GO:
+ break;
}
this.extraDependenciesDir = extraDependenciesDir;
this.processArgs = RuntimeUtils.composeCmd(
instanceConfig,
instanceFile,
- // DONT SET extra dependencies here (for python runtime),
+ // DONT SET extra dependencies here (for python or go runtime),
// since process runtime is using Java ProcessBuilder,
// we have to set the environment variable via ProcessBuilder
FunctionDetails.Runtime.JAVA ==
instanceConfig.getFunctionDetails().getRuntime() ? extraDependenciesDir : null,
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index 6976df5..1eb3899 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -119,7 +119,7 @@ public class ProcessRuntimeFactory implements
RuntimeFactory {
public ProcessRuntime createContainer(InstanceConfig instanceConfig,
String codeFile,
String originalCodeFileName,
Long expectedHealthCheckInterval)
throws Exception {
- String instanceFile;
+ String instanceFile = null;
switch (instanceConfig.getFunctionDetails().getRuntime()) {
case JAVA:
instanceFile = javaInstanceJarFile;
@@ -127,6 +127,8 @@ public class ProcessRuntimeFactory implements
RuntimeFactory {
case PYTHON:
instanceFile = pythonInstanceFile;
break;
+ case GO:
+ break;
default:
throw new RuntimeException("Unsupported Runtime " +
instanceConfig.getFunctionDetails().getRuntime());
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 8a5ab37..86e9bdf 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -19,11 +19,12 @@
package org.apache.pulsar.functions.runtime;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.util.JsonFormat;
-import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.BufferedReader;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URL;
@@ -32,8 +33,10 @@ import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.instance.go.GoInstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
@@ -82,7 +85,7 @@ public class RuntimeUtils {
public static List<String> getArgsBeforeCmd(InstanceConfig instanceConfig,
String extraDependenciesDir) {
final List<String> args = new LinkedList<>();
- if (instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.JAVA) {
+ if (instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.JAVA) {
//no-op
} else if (instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.PYTHON) {
// add `extraDependenciesDir` to python package searching path
@@ -94,26 +97,160 @@ public class RuntimeUtils {
return args;
}
+ /**
+ * Different from python and java function, Go function uploads a complete
executable file(including:
+ * instance file + user code file). Its parameter list is provided to the
broker in the form of a yaml file,
+ * the advantage of this approach is that backward compatibility is
guaranteed.
+ *
+ * In Java and Python the instance is managed by broker (or function
worker) so the changes in command line
+ * is under control; but in Go the instance is compiled with the user
function, so pulsar doesn't have the
+ * control what instance is used in the function. Hence in order to
support BC for go function, we can't
+ * dynamically add more commandline arguments. Using an instance config to
pass the parameters from function
+ * worker to go instance is the best way for maintaining the BC.
+ * <p>
+ * When we run the go function, we only need to specify the location of
the go-function file and the yaml file.
+ * The content of the yaml file will be automatically generated according
to the content provided by instanceConfig.
+ */
+
+ public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
+ String originalCodeFileName,
+ String pulsarServiceUrl)
throws IOException {
+ final List<String> args = new LinkedList<>();
+ GoInstanceConfig goInstanceConfig = new GoInstanceConfig();
+
+ if (instanceConfig.getClusterName() != null) {
+ goInstanceConfig.setClusterName(instanceConfig.getClusterName());
+ }
+
+ if (instanceConfig.getInstanceId() != 0) {
+ goInstanceConfig.setInstanceID(instanceConfig.getInstanceId());
+ }
+
+ if (instanceConfig.getFunctionId() != null) {
+ goInstanceConfig.setFuncID(instanceConfig.getFunctionId());
+ }
+
+ if (instanceConfig.getFunctionVersion() != null) {
+
goInstanceConfig.setFuncVersion(instanceConfig.getFunctionVersion());
+ }
+
+ if (instanceConfig.getFunctionDetails().getAutoAck()) {
+
goInstanceConfig.setAutoAck(instanceConfig.getFunctionDetails().getAutoAck());
+ }
+
+ if (instanceConfig.getFunctionDetails().getTenant() != null) {
+
goInstanceConfig.setTenant(instanceConfig.getFunctionDetails().getTenant());
+ }
+
+ if (instanceConfig.getFunctionDetails().getNamespace() != null) {
+
goInstanceConfig.setNameSpace(instanceConfig.getFunctionDetails().getNamespace());
+ }
+
+ if (instanceConfig.getFunctionDetails().getName() != null) {
+
goInstanceConfig.setName(instanceConfig.getFunctionDetails().getName());
+ }
+
+ if (instanceConfig.getFunctionDetails().getLogTopic() != null) {
+
goInstanceConfig.setLogTopic(instanceConfig.getFunctionDetails().getLogTopic());
+ }
+ if (instanceConfig.getFunctionDetails().getProcessingGuarantees() !=
null) {
+
goInstanceConfig.setProcessingGuarantees(instanceConfig.getFunctionDetails().getProcessingGuaranteesValue());
+ }
+ if (instanceConfig.getFunctionDetails().getSecretsMap() != null) {
+
goInstanceConfig.setSecretsMap(instanceConfig.getFunctionDetails().getSecretsMap());
+ }
+ if (instanceConfig.getFunctionDetails().getParallelism() != 0) {
+
goInstanceConfig.setParallelism(instanceConfig.getFunctionDetails().getParallelism());
+ }
+
+ if (instanceConfig.getMaxBufferedTuples() != 0) {
+
goInstanceConfig.setMaxBufTuples(instanceConfig.getMaxBufferedTuples());
+ }
+
+ if (pulsarServiceUrl != null) {
+ goInstanceConfig.setPulsarServiceURL(pulsarServiceUrl);
+ }
+ if
(instanceConfig.getFunctionDetails().getSource().getCleanupSubscription()) {
+
goInstanceConfig.setCleanupSubscription(instanceConfig.getFunctionDetails().getSource().getCleanupSubscription());
+ }
+ if
(instanceConfig.getFunctionDetails().getSource().getSubscriptionName() != null)
{
+
goInstanceConfig.setSubscriptionName(instanceConfig.getFunctionDetails().getSource().getSubscriptionName());
+ }
+
+ if (instanceConfig.getFunctionDetails().getSource().getInputSpecsMap()
!= null) {
+ for (String inputTopic :
instanceConfig.getFunctionDetails().getSource().getInputSpecsMap().keySet()) {
+ goInstanceConfig.setSourceSpecsTopic(inputTopic);
+ }
+ }
+
+ if (instanceConfig.getFunctionDetails().getSource().getTimeoutMs() !=
0) {
+
goInstanceConfig.setTimeoutMs(instanceConfig.getFunctionDetails().getSource().getTimeoutMs());
+ }
+
+ if (instanceConfig.getFunctionDetails().getSink().getTopic() != null) {
+
goInstanceConfig.setSinkSpecsTopic(instanceConfig.getFunctionDetails().getSink().getTopic());
+ }
+
+ if (instanceConfig.getFunctionDetails().getResources().getCpu() != 0) {
+
goInstanceConfig.setCpu(instanceConfig.getFunctionDetails().getResources().getCpu());
+ }
+
+ if (instanceConfig.getFunctionDetails().getResources().getRam() != 0) {
+
goInstanceConfig.setRam(instanceConfig.getFunctionDetails().getResources().getRam());
+ }
+
+ if (instanceConfig.getFunctionDetails().getResources().getDisk() != 0)
{
+
goInstanceConfig.setDisk(instanceConfig.getFunctionDetails().getResources().getDisk());
+ }
+
+ if
(instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic() !=
null) {
+
goInstanceConfig.setDeadLetterTopic(instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
+ }
+
+ if
(instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries()
!= 0) {
+
goInstanceConfig.setMaxMessageRetries(instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
+ }
+
+ goInstanceConfig.setKillAfterIdleMs(0);
+
+ // Parse the contents of goInstanceConfig into json form string
+ ObjectMapper objectMapper = ObjectMapperFactory.getThreadLocal();
+ String configContent =
objectMapper.writeValueAsString(goInstanceConfig);
+
+ // Nit: at present, the implementation of go function depends on
pulsar-client-go,
+ // pulsar-client-go uses cgo, so the currently uploaded executable
doesn't support cross-compilation.
+ args.add(originalCodeFileName);
+ args.add("-instance-conf");
+ args.add(configContent);
+ return args;
+ }
+
+
public static List<String> getCmd(InstanceConfig instanceConfig,
- String instanceFile,
- String extraDependenciesDir, /*
extra dependencies for running instances */
- String logDirectory,
- String originalCodeFileName,
- String pulsarServiceUrl,
- String stateStorageServiceUrl,
- AuthenticationConfig authConfig,
- String shardId,
- Integer grpcPort,
- Long expectedHealthCheckInterval,
- String logConfigFile,
- String secretsProviderClassName,
- String secretsProviderConfig,
- Boolean installUserCodeDependencies,
- String pythonDependencyRepository,
- String
pythonExtraDependencyRepository,
- int metricsPort) throws Exception {
+ String instanceFile,
+ String extraDependenciesDir, /* extra
dependencies for running instances */
+ String logDirectory,
+ String originalCodeFileName,
+ String pulsarServiceUrl,
+ String stateStorageServiceUrl,
+ AuthenticationConfig authConfig,
+ String shardId,
+ Integer grpcPort,
+ Long expectedHealthCheckInterval,
+ String logConfigFile,
+ String secretsProviderClassName,
+ String secretsProviderConfig,
+ Boolean installUserCodeDependencies,
+ String pythonDependencyRepository,
+ String pythonExtraDependencyRepository,
+ int metricsPort) throws Exception {
final List<String> args = new LinkedList<>();
- if (instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.JAVA) {
+
+ if (instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.GO) {
+ return getGoInstanceCmd(instanceConfig, originalCodeFileName,
pulsarServiceUrl);
+ }
+
+ if (instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.JAVA) {
args.add("java");
args.add("-cp");
@@ -245,7 +382,7 @@ public class RuntimeUtils {
FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
}
- public static String getPrometheusMetrics(int metricsPort) throws
IOException{
+ public static String getPrometheusMetrics(int metricsPort) throws
IOException {
StringBuilder result = new StringBuilder();
URL url = new URL(String.format("http://%s:%s",
InetAddress.getLocalHost().getHostAddress(), metricsPort));
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
@@ -258,5 +395,5 @@ public class RuntimeUtils {
rd.close();
return result.toString();
}
-
+
}
diff --git
a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java
b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java
index 32c13b7..8d4dbff 100644
---
a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java
+++
b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java
@@ -40,6 +40,8 @@ public class DefaultSecretsProviderConfigurator implements
SecretsProviderConfig
return ClearTextSecretsProvider.class.getName();
case PYTHON:
return "secretsprovider.ClearTextSecretsProvider";
+ case GO:
+ return "";
default:
throw new RuntimeException("Unknown runtime " +
functionDetails.getRuntime());
}
diff --git
a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
index 6dc31c7..8d4e34e 100644
---
a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
+++
b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
@@ -47,6 +47,8 @@ public class KubernetesSecretsProviderConfigurator implements
SecretsProviderCon
return EnvironmentBasedSecretsProvider.class.getName();
case PYTHON:
return "secretsprovider.EnvironmentBasedSecretsProvider";
+ case GO:
+ throw new UnsupportedOperationException();
default:
throw new RuntimeException("Unknown function runtime " +
functionDetails.getRuntime());
}
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index ae18551..a9e5a00 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -290,7 +290,8 @@ public class FunctionConfigUtils {
}
Map<String, Object> userConfig;
if (!isEmpty(functionDetails.getUserConfig())) {
- Type type = new TypeToken<Map<String, Object>>() {}.getType();
+ Type type = new TypeToken<Map<String, Object>>() {
+ }.getType();
userConfig = new Gson().fromJson(functionDetails.getUserConfig(),
type);
} else {
userConfig = new HashMap<>();
@@ -308,7 +309,8 @@ public class FunctionConfigUtils {
functionConfig.setUserConfig(userConfig);
if (!isEmpty(functionDetails.getSecretsMap())) {
- Type type = new TypeToken<Map<String, Object>>() {}.getType();
+ Type type = new TypeToken<Map<String, Object>>() {
+ }.getType();
Map<String, Object> secretsMap = new
Gson().fromJson(functionDetails.getSecretsMap(), type);
functionConfig.setSecrets(secretsMap);
}
@@ -347,6 +349,8 @@ public class FunctionConfigUtils {
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
} else if (functionConfig.getPy() != null) {
functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON);
+ } else if (functionConfig.getGo() != null) {
+ functionConfig.setRuntime(FunctionConfig.Runtime.GO);
}
WindowConfig windowConfig = functionConfig.getWindowConfig();
@@ -429,6 +433,20 @@ public class FunctionConfigUtils {
}
}
+ /**
+  * Rejects function configurations that use features the Go runtime does not support yet.
+  *
+  * <p>Three settings are refused: effectively-once processing guarantees, any window
+  * configuration, and any non-null max-message-retries value that is {@code >= 0}.
+  *
+  * @param functionConfig the configuration of the Go function being validated
+  * @throws IllegalArgumentException if any of the unsupported settings is present
+  */
+ private static void doGolangChecks(FunctionConfig functionConfig) {
+     if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+         // Thrown as IllegalArgumentException so it matches the other two checks in this method.
+         throw new IllegalArgumentException("Effectively-once processing guarantees not yet supported in Go function");
+     }
+
+     if (functionConfig.getWindowConfig() != null) {
+         throw new IllegalArgumentException("Windowing is not supported in Go function yet");
+     }
+
+     // A null value passes; any explicit value of zero or more (including 0) is rejected.
+     if (functionConfig.getMaxMessageRetries() != null && functionConfig.getMaxMessageRetries() >= 0) {
+         throw new IllegalArgumentException("Message retries not yet supported in Go function");
+     }
+ }
+
private static void verifyNoTopicClash(Collection<String> inputTopics,
String outputTopic) throws IllegalArgumentException {
if (inputTopics.contains(outputTopic)) {
throw new IllegalArgumentException(
@@ -447,8 +465,11 @@ public class FunctionConfigUtils {
if (isEmpty(functionConfig.getName())) {
throw new IllegalArgumentException("Function name cannot be null");
}
- if (isEmpty(functionConfig.getClassName())) {
- throw new IllegalArgumentException("Function classname cannot be
null");
+ // go doesn't need className
+ if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON ||
functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA){
+ if (isEmpty(functionConfig.getClassName())) {
+ throw new IllegalArgumentException("Function classname cannot
be null");
+ }
}
Collection<String> allInputTopics =
collectAllInputTopics(functionConfig);
@@ -530,6 +551,12 @@ public class FunctionConfigUtils {
throw new IllegalArgumentException("The supplied python file
does not exist");
}
}
+ if (!isEmpty(functionConfig.getGo()) &&
!org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getGo())
+ && functionConfig.getGo().startsWith(BUILTIN)) {
+ if (!new File(functionConfig.getGo()).exists()) {
+ throw new IllegalArgumentException("The supplied go file does
not exist");
+ }
+ }
if (functionConfig.getInputSpecs() != null) {
functionConfig.getInputSpecs().forEach((topicName, conf) -> {
@@ -587,6 +614,9 @@ public class FunctionConfigUtils {
}
doJavaChecks(functionConfig, classLoader);
return classLoader;
+ } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
+ doGolangChecks(functionConfig);
+ return null;
} else {
doPythonChecks(functionConfig);
return null;
@@ -615,7 +645,7 @@ public class FunctionConfigUtils {
if (mergedConfig.getInputSpecs() == null) {
mergedConfig.setInputSpecs(new HashMap<>());
}
-
+
if (newConfig.getInputs() != null) {
newConfig.getInputs().forEach((topicName -> {
newConfig.getInputSpecs().put(topicName,
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 7140fdc..a84e2b1 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -474,6 +474,8 @@ public class FunctionActioner {
return fileName + ".jar";
case PYTHON:
return fileName + ".py";
+ case GO:
+ return fileName + ".go";
default:
throw new RuntimeException("Unknown runtime " +
FunctionDetails.getRuntime());
}