This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 510744c20d0 [feat][fn] Add stateStorageURL and pulsarWebService URL to 
go InstanceConfig (#20443)
510744c20d0 is described below

commit 510744c20d0339c4b29366f07bf1849cabb81dec
Author: Andy Walker <[email protected]>
AuthorDate: Sat Jun 3 03:01:18 2023 -0400

    [feat][fn] Add stateStorageURL and pulsarWebService URL to go 
InstanceConfig (#20443)
    
    Co-authored-by: Andy Walker <[email protected]>
---
 pulsar-function-go/conf/conf.go                        | 18 ++++++++++--------
 pulsar-function-go/pf/instanceConf.go                  |  4 ++++
 .../pulsar/functions/instance/go/GoInstanceConfig.java |  2 ++
 .../apache/pulsar/functions/runtime/RuntimeUtils.java  | 15 +++++++++++++--
 .../pulsar/functions/runtime/RuntimeUtilsTest.java     |  5 ++++-
 5 files changed, 33 insertions(+), 11 deletions(-)

diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
index 639da71d25a..03513648fac 100644
--- a/pulsar-function-go/conf/conf.go
+++ b/pulsar-function-go/conf/conf.go
@@ -33,14 +33,16 @@ import (
 const ConfigPath = "conf/conf.yaml"
 
 type Conf struct {
-       PulsarServiceURL string        `json:"pulsarServiceURL" 
yaml:"pulsarServiceURL"`
-       InstanceID       int           `json:"instanceID" yaml:"instanceID"`
-       FuncID           string        `json:"funcID" yaml:"funcID"`
-       FuncVersion      string        `json:"funcVersion" yaml:"funcVersion"`
-       MaxBufTuples     int           `json:"maxBufTuples" yaml:"maxBufTuples"`
-       Port             int           `json:"port" yaml:"port"`
-       ClusterName      string        `json:"clusterName" yaml:"clusterName"`
-       KillAfterIdleMs  time.Duration `json:"killAfterIdleMs" 
yaml:"killAfterIdleMs"`
+       PulsarServiceURL       string        `json:"pulsarServiceURL" 
yaml:"pulsarServiceURL"`
+       StateStorageServiceURL string        `json:"stateStorageServiceUrl" 
yaml:"stateStorageServiceUrl"`
+       PulsarWebServiceURL    string        `json:"pulsarWebServiceUrl" 
yaml:"pulsarWebServiceUrl"`
+       InstanceID             int           `json:"instanceID" 
yaml:"instanceID"`
+       FuncID                 string        `json:"funcID" yaml:"funcID"`
+       FuncVersion            string        `json:"funcVersion" 
yaml:"funcVersion"`
+       MaxBufTuples           int           `json:"maxBufTuples" 
yaml:"maxBufTuples"`
+       Port                   int           `json:"port" yaml:"port"`
+       ClusterName            string        `json:"clusterName" 
yaml:"clusterName"`
+       KillAfterIdleMs        time.Duration `json:"killAfterIdleMs" 
yaml:"killAfterIdleMs"`
        // function details config
        Tenant               string `json:"tenant" yaml:"tenant"`
        NameSpace            string `json:"nameSpace" yaml:"nameSpace"`
diff --git a/pulsar-function-go/pf/instanceConf.go 
b/pulsar-function-go/pf/instanceConf.go
index 72bcaf9bbb2..4cb60dd258a 100644
--- a/pulsar-function-go/pf/instanceConf.go
+++ b/pulsar-function-go/pf/instanceConf.go
@@ -39,6 +39,8 @@ type instanceConf struct {
        port                        int
        clusterName                 string
        pulsarServiceURL            string
+       stateServiceURL             string
+       pulsarWebServiceURL         string
        killAfterIdle               time.Duration
        expectedHealthCheckInterval int32
        metricsPort                 int
@@ -76,6 +78,8 @@ func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf {
                port:                        cfg.Port,
                clusterName:                 cfg.ClusterName,
                pulsarServiceURL:            cfg.PulsarServiceURL,
+               stateServiceURL:             cfg.StateStorageServiceURL,
+               pulsarWebServiceURL:         cfg.PulsarWebServiceURL,
                killAfterIdle:               cfg.KillAfterIdleMs,
                expectedHealthCheckInterval: cfg.ExpectedHealthCheckInterval,
                metricsPort:                 cfg.MetricsPort,
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
index bcbc7fa057f..599b6ed8f4f 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
@@ -27,6 +27,8 @@ import org.apache.pulsar.functions.proto.Function;
 @Getter
 public class GoInstanceConfig {
     private String pulsarServiceURL = "";
+    private String stateStorageServiceUrl = "";
+    private String pulsarWebServiceUrl = "";
     private int instanceID;
     private String funcID = "";
     private String funcVersion = "";
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 1b6b6946163..67469041c94 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -132,6 +132,8 @@ public class RuntimeUtils {
                                                 AuthenticationConfig 
authConfig,
                                                 String originalCodeFileName,
                                                 String pulsarServiceUrl,
+                                                String stateStorageServiceUrl,
+                                                String pulsarWebServiceUrl,
                                                 boolean k8sRuntime) throws 
IOException {
         final List<String> args = new LinkedList<>();
         GoInstanceConfig goInstanceConfig = new GoInstanceConfig();
@@ -140,6 +142,14 @@ public class RuntimeUtils {
             goInstanceConfig.setClusterName(instanceConfig.getClusterName());
         }
 
+        if (null != stateStorageServiceUrl) {
+            goInstanceConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
+        }
+
+        if (instanceConfig.isExposePulsarAdminClientEnabled() && 
StringUtils.isNotBlank(pulsarWebServiceUrl)) {
+            goInstanceConfig.setPulsarWebServiceUrl(pulsarWebServiceUrl);
+        }
+
         if (instanceConfig.getInstanceId() != 0) {
             goInstanceConfig.setInstanceID(instanceConfig.getInstanceId());
         }
@@ -310,8 +320,9 @@ public class RuntimeUtils {
         final List<String> args = new LinkedList<>();
 
         if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.GO) {
-            return getGoInstanceCmd(instanceConfig, authConfig,
-                    originalCodeFileName, pulsarServiceUrl, k8sRuntime);
+            return getGoInstanceCmd(instanceConfig, authConfig, 
originalCodeFileName,
+                    pulsarServiceUrl, stateStorageServiceUrl, 
pulsarWebServiceUrl,
+                    k8sRuntime);
         }
 
         if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.JAVA) {
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
index a0d11b551f3..a9dfcd7ffc7 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
@@ -122,8 +122,9 @@ public class RuntimeUtilsTest {
                 .build();
 
         instanceConfig.setFunctionDetails(functionDetails);
+        instanceConfig.setExposePulsarAdminClientEnabled(true);
 
-        List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, 
authConfig,"config", "pulsar://localhost:6650", k8sRuntime);
+        List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, 
authConfig, "config", "pulsar://localhost:6650", "bk://localhost:4181",  
"http://localhost:8080";, k8sRuntime);
         if (k8sRuntime) {
             goInstanceConfig = new 
ObjectMapper().readValue(commands.get(2).replaceAll("^\'|\'$", ""), 
HashMap.class);
         } else {
@@ -151,6 +152,8 @@ public class RuntimeUtilsTest {
         Assert.assertEquals(goInstanceConfig.get("autoAck"), true);
         Assert.assertEquals(goInstanceConfig.get("regexPatternSubscription"), 
false);
         Assert.assertEquals(goInstanceConfig.get("pulsarServiceURL"), 
"pulsar://localhost:6650");
+        Assert.assertEquals(goInstanceConfig.get("stateStorageServiceUrl"), 
"bk://localhost:4181");
+        Assert.assertEquals(goInstanceConfig.get("pulsarWebServiceUrl"), 
"http://localhost:8080";);
         Assert.assertEquals(goInstanceConfig.get("runtime"), 3);
         Assert.assertEquals(goInstanceConfig.get("cpu"), 2.0);
         Assert.assertEquals(goInstanceConfig.get("funcID"), "func-7734");

Reply via email to