This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7315aeb6258 [improve][fn] Pass FunctionDetails to Go instance (#22350)
7315aeb6258 is described below

commit 7315aeb6258b7adc9d874268d50acb95ffc0cf2b
Author: jiangpengcheng <[email protected]>
AuthorDate: Fri Mar 29 13:47:29 2024 +0800

    [improve][fn] Pass FunctionDetails to Go instance (#22350)
---
 pulsar-function-go/conf/conf.go                    |   2 +
 pulsar-function-go/pf/instanceConf.go              |  11 ++
 pulsar-function-go/pf/instanceConf_test.go         | 207 +++++++++++++++++++++
 .../functions/instance/go/GoInstanceConfig.java    |   2 +
 .../pulsar/functions/runtime/RuntimeUtils.java     |   6 +
 .../runtime/kubernetes/KubernetesRuntimeTest.java  |   8 +
 6 files changed, 236 insertions(+)

diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
index 03513648fac..1442a0f865f 100644
--- a/pulsar-function-go/conf/conf.go
+++ b/pulsar-function-go/conf/conf.go
@@ -91,6 +91,8 @@ type Conf struct {
        UserConfig                  string `json:"userConfig" yaml:"userConfig"`
        //metrics config
        MetricsPort int `json:"metricsPort" yaml:"metricsPort"`
+       // FunctionDetails is the JSON-encoded FunctionDetails proto passed by the runtime
+       FunctionDetails string `json:"functionDetails" yaml:"functionDetails"`
 }
 
 var (
diff --git a/pulsar-function-go/pf/instanceConf.go 
b/pulsar-function-go/pf/instanceConf.go
index 4cb60dd258a..844a2bc9b89 100644
--- a/pulsar-function-go/pf/instanceConf.go
+++ b/pulsar-function-go/pf/instanceConf.go
@@ -25,7 +25,9 @@ import (
        "time"
 
        "github.com/apache/pulsar/pulsar-function-go/conf"
+       log "github.com/apache/pulsar/pulsar-function-go/logutil"
        pb "github.com/apache/pulsar/pulsar-function-go/pb"
+       "google.golang.org/protobuf/encoding/protojson"
 )
 
 // This is the config passed to the Golang Instance. Contains all the 
information
@@ -122,6 +124,15 @@ func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf 
{
                tlsAllowInsecure:        cfg.TLSAllowInsecureConnection,
                tlsHostnameVerification: cfg.TLSHostnameVerificationEnable,
        }
+       // parse the raw function details; on an unmarshal error, log it and 
fall back to the original way
+       if cfg.FunctionDetails != "" {
+               functionDetails := pb.FunctionDetails{}
+               if err := protojson.Unmarshal([]byte(cfg.FunctionDetails), 
&functionDetails); err != nil {
+                       log.Errorf("Failed to unmarshal function details: %v", 
err)
+               } else {
+                       instanceConf.funcDetails = functionDetails
+               }
+       }
 
        if instanceConf.funcDetails.ProcessingGuarantees == 
pb.ProcessingGuarantees_EFFECTIVELY_ONCE {
                panic("Go instance current not support EFFECTIVELY_ONCE 
processing guarantees.")
diff --git a/pulsar-function-go/pf/instanceConf_test.go 
b/pulsar-function-go/pf/instanceConf_test.go
index 02aef913ebc..cc5f46e2fe1 100644
--- a/pulsar-function-go/pf/instanceConf_test.go
+++ b/pulsar-function-go/pf/instanceConf_test.go
@@ -20,6 +20,7 @@
 package pf
 
 import (
+       "fmt"
        "testing"
 
        cfg "github.com/apache/pulsar/pulsar-function-go/conf"
@@ -113,3 +114,209 @@ func TestInstanceConf_Fail(t *testing.T) {
                newInstanceConfWithConf(&cfg.Conf{ProcessingGuarantees: 3})
        }, "Should have a panic")
 }
+
+func TestInstanceConf_WithDetails(t *testing.T) {
+       cfg := &cfg.Conf{
+               FunctionDetails: 
`{"tenant":"public","namespace":"default","name":"test-function","className":"process",
+"logTopic":"test-logs","userConfig":"{\"key1\":\"value1\"}","runtime":"GO","autoAck":true,"parallelism":1,
+"source":{"configs":"{\"username\":\"admin\"}","typeClassName":"string","timeoutMs":"15000",
+"subscriptionName":"test-subscription","inputSpecs":{"input":{"schemaType":"avro","receiverQueueSize":{"value":1000},
+"schemaProperties":{"schema_prop1":"schema1"},"consumerProperties":{"consumer_prop1":"consumer1"},"cryptoSpec":
+{"cryptoKeyReaderClassName":"key-reader","producerCryptoFailureAction":"SEND","consumerCryptoFailureAction":"CONSUME"}}}
+,"negativeAckRedeliveryDelayMs":"15000"},"sink":{"configs":"{\"password\":\"admin\"}","topic":"test-output",
+"typeClassName":"string","schemaType":"avro","producerSpec":{"maxPendingMessages":2000,"useThreadLocalProducers":true,
+"cryptoSpec":{"cryptoKeyReaderClassName":"key-reader","producerCryptoFailureAction":"DISCARD"},
+"batchBuilder":"DEFAULT"}},"resources":{"cpu":2.0,"ram":"1024","disk":"1024"},"packageUrl":"/path/to/package",
+"retryDetails":{"maxMessageRetries":3,"deadLetterTopic":"test-dead-letter-topic"},"secretsMap":
+"{\"secret1\":\"secret-value1\"}","runtimeFlags":"flags","componentType":"FUNCTION","customRuntimeOptions":"options",
+"retainOrdering":true,"retainKeyOrdering":true,"subscriptionPosition":"EARLIEST"}`,
+       }
+       instanceConf := newInstanceConfWithConf(cfg)
+       assert.Equal(t, "public", instanceConf.funcDetails.Tenant)
+       assert.Equal(t, "default", instanceConf.funcDetails.Namespace)
+       assert.Equal(t, "test-function", instanceConf.funcDetails.Name)
+       assert.Equal(t, "process", instanceConf.funcDetails.ClassName)
+       assert.Equal(t, "test-logs", instanceConf.funcDetails.LogTopic)
+       assert.Equal(t, pb.ProcessingGuarantees_ATLEAST_ONCE, 
instanceConf.funcDetails.ProcessingGuarantees)
+       assert.Equal(t, `{"key1":"value1"}`, 
instanceConf.funcDetails.UserConfig)
+       assert.Equal(t, `{"secret1":"secret-value1"}`, 
instanceConf.funcDetails.SecretsMap)
+       assert.Equal(t, pb.FunctionDetails_GO, instanceConf.funcDetails.Runtime)
+
+       assert.Equal(t, true, instanceConf.funcDetails.AutoAck)
+       assert.Equal(t, int32(1), instanceConf.funcDetails.Parallelism)
+
+       sourceSpec := pb.SourceSpec{
+               TypeClassName:                "string",
+               TimeoutMs:                    15000,
+               Configs:                      `{"username":"admin"}`,
+               SubscriptionName:             "test-subscription",
+               SubscriptionType:             pb.SubscriptionType_SHARED,
+               NegativeAckRedeliveryDelayMs: 15000,
+               InputSpecs: map[string]*pb.ConsumerSpec{
+                       "input": {
+                               SchemaType: "avro",
+                               SchemaProperties: map[string]string{
+                                       "schema_prop1": "schema1",
+                               },
+                               ConsumerProperties: map[string]string{
+                                       "consumer_prop1": "consumer1",
+                               },
+                               ReceiverQueueSize: 
&pb.ConsumerSpec_ReceiverQueueSize{
+                                       Value: 1000,
+                               },
+                               CryptoSpec: &pb.CryptoSpec{
+                                       CryptoKeyReaderClassName:    
"key-reader",
+                                       ProducerCryptoFailureAction: 
pb.CryptoSpec_SEND,
+                                       ConsumerCryptoFailureAction: 
pb.CryptoSpec_CONSUME,
+                               },
+                       },
+               },
+       }
+       assert.Equal(t, sourceSpec.String(), 
instanceConf.funcDetails.Source.String())
+
+       sinkSpec := pb.SinkSpec{
+               TypeClassName: "string",
+               Topic:         "test-output",
+               Configs:       `{"password":"admin"}`,
+               SchemaType:    "avro",
+               ProducerSpec: &pb.ProducerSpec{
+                       MaxPendingMessages:      2000,
+                       UseThreadLocalProducers: true,
+                       CryptoSpec: &pb.CryptoSpec{
+                               CryptoKeyReaderClassName:    "key-reader",
+                               ProducerCryptoFailureAction: 
pb.CryptoSpec_DISCARD,
+                               ConsumerCryptoFailureAction: pb.CryptoSpec_FAIL,
+                       },
+                       BatchBuilder: "DEFAULT",
+               },
+       }
+       assert.Equal(t, sinkSpec.String(), 
instanceConf.funcDetails.Sink.String())
+
+       resource := pb.Resources{
+               Cpu:  2.0,
+               Ram:  1024,
+               Disk: 1024,
+       }
+       assert.Equal(t, resource.String(), 
instanceConf.funcDetails.Resources.String())
+       assert.Equal(t, "/path/to/package", instanceConf.funcDetails.PackageUrl)
+
+       retryDetails := pb.RetryDetails{
+               MaxMessageRetries: 3,
+               DeadLetterTopic:   "test-dead-letter-topic",
+       }
+       assert.Equal(t, retryDetails.String(), 
instanceConf.funcDetails.RetryDetails.String())
+
+       assert.Equal(t, "flags", instanceConf.funcDetails.RuntimeFlags)
+       assert.Equal(t, pb.FunctionDetails_FUNCTION, 
instanceConf.funcDetails.ComponentType)
+       assert.Equal(t, "options", 
instanceConf.funcDetails.CustomRuntimeOptions)
+       assert.Equal(t, "", instanceConf.funcDetails.Builtin)
+       assert.Equal(t, true, instanceConf.funcDetails.RetainOrdering)
+       assert.Equal(t, true, instanceConf.funcDetails.RetainKeyOrdering)
+       assert.Equal(t, pb.SubscriptionPosition_EARLIEST, 
instanceConf.funcDetails.SubscriptionPosition)
+}
+
+func TestInstanceConf_WithEmptyOrInvalidDetails(t *testing.T) {
+       testCases := []struct {
+               name    string
+               details string
+       }{
+               {
+                       name:    "empty details",
+                       details: "",
+               },
+               {
+                       name:    "invalid details",
+                       details: "error",
+               },
+       }
+
+       for i, testCase := range testCases {
+
+               t.Run(fmt.Sprintf("testCase[%d] %s", i, testCase.name), func(t 
*testing.T) {
+                       cfg := &cfg.Conf{
+                               FunctionDetails:      testCase.details,
+                               Tenant:               "public",
+                               NameSpace:            "default",
+                               Name:                 "test-function",
+                               LogTopic:             "test-logs",
+                               ProcessingGuarantees: 0,
+                               UserConfig:           `{"key1":"value1"}`,
+                               SecretsMap:           
`{"secret1":"secret-value1"}`,
+                               Runtime:              3,
+                               AutoACK:              true,
+                               Parallelism:          1,
+                               SubscriptionType:     1,
+                               TimeoutMs:            15000,
+                               SubscriptionName:     "test-subscription",
+                               CleanupSubscription:  false,
+                               SubscriptionPosition: 0,
+                               SinkSpecTopic:        "test-output",
+                               SinkSchemaType:       "avro",
+                               Cpu:                  2.0,
+                               Ram:                  1024,
+                               Disk:                 1024,
+                               MaxMessageRetries:    3,
+                               DeadLetterTopic:      "test-dead-letter-topic",
+                               SourceInputSpecs: map[string]string{
+                                       "input": 
`{"schemaType":"avro","receiverQueueSize":{"value":1000},"schemaProperties":
+{"schema_prop1":"schema1"},"consumerProperties":{"consumer_prop1":"consumer1"}}`,
+                               },
+                       }
+                       instanceConf := newInstanceConfWithConf(cfg)
+
+                       assert.Equal(t, "public", 
instanceConf.funcDetails.Tenant)
+                       assert.Equal(t, "default", 
instanceConf.funcDetails.Namespace)
+                       assert.Equal(t, "test-function", 
instanceConf.funcDetails.Name)
+                       assert.Equal(t, "test-logs", 
instanceConf.funcDetails.LogTopic)
+                       assert.Equal(t, pb.ProcessingGuarantees_ATLEAST_ONCE, 
instanceConf.funcDetails.ProcessingGuarantees)
+                       assert.Equal(t, `{"key1":"value1"}`, 
instanceConf.funcDetails.UserConfig)
+                       assert.Equal(t, `{"secret1":"secret-value1"}`, 
instanceConf.funcDetails.SecretsMap)
+                       assert.Equal(t, pb.FunctionDetails_GO, 
instanceConf.funcDetails.Runtime)
+
+                       assert.Equal(t, true, instanceConf.funcDetails.AutoAck)
+                       assert.Equal(t, int32(1), 
instanceConf.funcDetails.Parallelism)
+
+                       sourceSpec := pb.SourceSpec{
+                               SubscriptionType:     
pb.SubscriptionType_FAILOVER,
+                               TimeoutMs:            15000,
+                               SubscriptionName:     "test-subscription",
+                               CleanupSubscription:  false,
+                               SubscriptionPosition: 
pb.SubscriptionPosition_LATEST,
+                               InputSpecs: map[string]*pb.ConsumerSpec{
+                                       "input": {
+                                               SchemaType: "avro",
+                                               SchemaProperties: 
map[string]string{
+                                                       "schema_prop1": 
"schema1",
+                                               },
+                                               ConsumerProperties: 
map[string]string{
+                                                       "consumer_prop1": 
"consumer1",
+                                               },
+                                               ReceiverQueueSize: 
&pb.ConsumerSpec_ReceiverQueueSize{
+                                                       Value: 1000,
+                                               },
+                                       },
+                               },
+                       }
+                       assert.Equal(t, sourceSpec.String(), 
instanceConf.funcDetails.Source.String())
+
+                       sinkSpec := pb.SinkSpec{
+                               Topic:      "test-output",
+                               SchemaType: "avro",
+                       }
+                       assert.Equal(t, sinkSpec.String(), 
instanceConf.funcDetails.Sink.String())
+
+                       resource := pb.Resources{
+                               Cpu:  2.0,
+                               Ram:  1024,
+                               Disk: 1024,
+                       }
+                       assert.Equal(t, resource.String(), 
instanceConf.funcDetails.Resources.String())
+
+                       retryDetails := pb.RetryDetails{
+                               MaxMessageRetries: 3,
+                               DeadLetterTopic:   "test-dead-letter-topic",
+                       }
+                       assert.Equal(t, retryDetails.String(), 
instanceConf.funcDetails.RetryDetails.String())
+               })
+       }
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
index 599b6ed8f4f..467ec749213 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
@@ -83,4 +83,6 @@ public class GoInstanceConfig {
     private String deadLetterTopic = "";
 
     private int metricsPort;
+
+    private String functionDetails = "";
 }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 0214b18fb23..6160626c958 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -139,6 +139,12 @@ public class RuntimeUtils {
         final List<String> args = new LinkedList<>();
         GoInstanceConfig goInstanceConfig = new GoInstanceConfig();
 
+        // pass the raw function details directly so that we don't need to 
assemble the `instanceConf.funcDetails`
+        // manually in Go instance
+        String functionDetails =
+                
JsonFormat.printer().omittingInsignificantWhitespace().print(instanceConfig.getFunctionDetails());
+        goInstanceConfig.setFunctionDetails(functionDetails);
+
         if (instanceConfig.getClusterName() != null) {
             goInstanceConfig.setClusterName(instanceConfig.getClusterName());
         }
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 02f3c0d23fb..980f763f7c3 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -1006,6 +1006,14 @@ public class KubernetesRuntimeTest {
         assertEquals(goInstanceConfig.get("expectedHealthCheckInterval"), 0);
         assertEquals(goInstanceConfig.get("deadLetterTopic"), "");
         assertEquals(goInstanceConfig.get("metricsPort"), 4331);
+        assertEquals(goInstanceConfig.get("functionDetails"), 
"{\"tenant\":\"tenant\",\"namespace\":\"namespace\","
+                + 
"\"name\":\"container\",\"className\":\"org.apache.pulsar.functions.utils.functioncache"
+                + 
".AddFunction\",\"logTopic\":\"container-log\",\"runtime\":\"GO\",\"source\":{\"className\":\"org"
+                + 
".pulsar.pulsar.TestSource\",\"subscriptionType\":\"FAILOVER\",\"typeClassName\":\"java.lang"
+                + 
".String\",\"inputSpecs\":{\"test_src\":{}}},\"sink\":{\"className\":\"org.pulsar.pulsar"
+                + 
".TestSink\",\"topic\":\"container-output\",\"serDeClassName\":\"org.apache.pulsar.functions"
+                + 
".runtime.serde.Utf8Serializer\",\"typeClassName\":\"java.lang.String\"},\"resources\":{\"cpu\":1"
+                + ".0,\"ram\":\"1000\",\"disk\":\"10000\"}}");
 
         // check padding and xmx
         V1Container containerSpec = 
container.getFunctionContainer(Collections.emptyList(), RESOURCES);

Reply via email to