This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 510744c20d0 [feat][fn] Add stateStorageURL and pulsarWebService URL to
go InstanceConfig (#20443)
510744c20d0 is described below
commit 510744c20d0339c4b29366f07bf1849cabb81dec
Author: Andy Walker <[email protected]>
AuthorDate: Sat Jun 3 03:01:18 2023 -0400
[feat][fn] Add stateStorageURL and pulsarWebService URL to go
InstanceConfig (#20443)
Co-authored-by: Andy Walker <[email protected]>
---
pulsar-function-go/conf/conf.go | 18 ++++++++++--------
pulsar-function-go/pf/instanceConf.go | 4 ++++
.../pulsar/functions/instance/go/GoInstanceConfig.java | 2 ++
.../apache/pulsar/functions/runtime/RuntimeUtils.java | 15 +++++++++++++--
.../pulsar/functions/runtime/RuntimeUtilsTest.java | 5 ++++-
5 files changed, 33 insertions(+), 11 deletions(-)
diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
index 639da71d25a..03513648fac 100644
--- a/pulsar-function-go/conf/conf.go
+++ b/pulsar-function-go/conf/conf.go
@@ -33,14 +33,16 @@ import (
const ConfigPath = "conf/conf.yaml"
type Conf struct {
- PulsarServiceURL string `json:"pulsarServiceURL"
yaml:"pulsarServiceURL"`
- InstanceID int `json:"instanceID" yaml:"instanceID"`
- FuncID string `json:"funcID" yaml:"funcID"`
- FuncVersion string `json:"funcVersion" yaml:"funcVersion"`
- MaxBufTuples int `json:"maxBufTuples" yaml:"maxBufTuples"`
- Port int `json:"port" yaml:"port"`
- ClusterName string `json:"clusterName" yaml:"clusterName"`
- KillAfterIdleMs time.Duration `json:"killAfterIdleMs"
yaml:"killAfterIdleMs"`
+ PulsarServiceURL string `json:"pulsarServiceURL"
yaml:"pulsarServiceURL"`
+ StateStorageServiceURL string `json:"stateStorageServiceUrl"
yaml:"stateStorageServiceUrl"`
+ PulsarWebServiceURL string `json:"pulsarWebServiceUrl"
yaml:"pulsarWebServiceUrl"`
+ InstanceID int `json:"instanceID"
yaml:"instanceID"`
+ FuncID string `json:"funcID" yaml:"funcID"`
+ FuncVersion string `json:"funcVersion"
yaml:"funcVersion"`
+ MaxBufTuples int `json:"maxBufTuples"
yaml:"maxBufTuples"`
+ Port int `json:"port" yaml:"port"`
+ ClusterName string `json:"clusterName"
yaml:"clusterName"`
+ KillAfterIdleMs time.Duration `json:"killAfterIdleMs"
yaml:"killAfterIdleMs"`
// function details config
Tenant string `json:"tenant" yaml:"tenant"`
NameSpace string `json:"nameSpace" yaml:"nameSpace"`
diff --git a/pulsar-function-go/pf/instanceConf.go
b/pulsar-function-go/pf/instanceConf.go
index 72bcaf9bbb2..4cb60dd258a 100644
--- a/pulsar-function-go/pf/instanceConf.go
+++ b/pulsar-function-go/pf/instanceConf.go
@@ -39,6 +39,8 @@ type instanceConf struct {
port int
clusterName string
pulsarServiceURL string
+ stateServiceURL string
+ pulsarWebServiceURL string
killAfterIdle time.Duration
expectedHealthCheckInterval int32
metricsPort int
@@ -76,6 +78,8 @@ func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf {
port: cfg.Port,
clusterName: cfg.ClusterName,
pulsarServiceURL: cfg.PulsarServiceURL,
+ stateServiceURL: cfg.StateStorageServiceURL,
+ pulsarWebServiceURL: cfg.PulsarWebServiceURL,
killAfterIdle: cfg.KillAfterIdleMs,
expectedHealthCheckInterval: cfg.ExpectedHealthCheckInterval,
metricsPort: cfg.MetricsPort,
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
index bcbc7fa057f..599b6ed8f4f 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
@@ -27,6 +27,8 @@ import org.apache.pulsar.functions.proto.Function;
@Getter
public class GoInstanceConfig {
private String pulsarServiceURL = "";
+ private String stateStorageServiceUrl = "";
+ private String pulsarWebServiceUrl = "";
private int instanceID;
private String funcID = "";
private String funcVersion = "";
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 1b6b6946163..67469041c94 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -132,6 +132,8 @@ public class RuntimeUtils {
AuthenticationConfig
authConfig,
String originalCodeFileName,
String pulsarServiceUrl,
+ String stateStorageServiceUrl,
+ String pulsarWebServiceUrl,
boolean k8sRuntime) throws
IOException {
final List<String> args = new LinkedList<>();
GoInstanceConfig goInstanceConfig = new GoInstanceConfig();
@@ -140,6 +142,14 @@ public class RuntimeUtils {
goInstanceConfig.setClusterName(instanceConfig.getClusterName());
}
+ if (null != stateStorageServiceUrl) {
+ goInstanceConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
+ }
+
+ if (instanceConfig.isExposePulsarAdminClientEnabled() &&
StringUtils.isNotBlank(pulsarWebServiceUrl)) {
+ goInstanceConfig.setPulsarWebServiceUrl(pulsarWebServiceUrl);
+ }
+
if (instanceConfig.getInstanceId() != 0) {
goInstanceConfig.setInstanceID(instanceConfig.getInstanceId());
}
@@ -310,8 +320,9 @@ public class RuntimeUtils {
final List<String> args = new LinkedList<>();
if (instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.GO) {
- return getGoInstanceCmd(instanceConfig, authConfig,
- originalCodeFileName, pulsarServiceUrl, k8sRuntime);
+ return getGoInstanceCmd(instanceConfig, authConfig,
originalCodeFileName,
+ pulsarServiceUrl, stateStorageServiceUrl,
pulsarWebServiceUrl,
+ k8sRuntime);
}
if (instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.JAVA) {
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
index a0d11b551f3..a9dfcd7ffc7 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
@@ -122,8 +122,9 @@ public class RuntimeUtilsTest {
.build();
instanceConfig.setFunctionDetails(functionDetails);
+ instanceConfig.setExposePulsarAdminClientEnabled(true);
- List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig,
authConfig,"config", "pulsar://localhost:6650", k8sRuntime);
+ List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig,
authConfig, "config", "pulsar://localhost:6650", "bk://localhost:4181",
"http://localhost:8080", k8sRuntime);
if (k8sRuntime) {
goInstanceConfig = new
ObjectMapper().readValue(commands.get(2).replaceAll("^\'|\'$", ""),
HashMap.class);
} else {
@@ -151,6 +152,8 @@ public class RuntimeUtilsTest {
Assert.assertEquals(goInstanceConfig.get("autoAck"), true);
Assert.assertEquals(goInstanceConfig.get("regexPatternSubscription"),
false);
Assert.assertEquals(goInstanceConfig.get("pulsarServiceURL"),
"pulsar://localhost:6650");
+ Assert.assertEquals(goInstanceConfig.get("stateStorageServiceUrl"),
"bk://localhost:4181");
+ Assert.assertEquals(goInstanceConfig.get("pulsarWebServiceUrl"),
"http://localhost:8080");
Assert.assertEquals(goInstanceConfig.get("runtime"), 3);
Assert.assertEquals(goInstanceConfig.get("cpu"), 2.0);
Assert.assertEquals(goInstanceConfig.get("funcID"), "func-7734");