This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7315aeb6258 [improve][fn] Pass FunctionDetails to Go instance (#22350)
7315aeb6258 is described below
commit 7315aeb6258b7adc9d874268d50acb95ffc0cf2b
Author: jiangpengcheng <[email protected]>
AuthorDate: Fri Mar 29 13:47:29 2024 +0800
[improve][fn] Pass FunctionDetails to Go instance (#22350)
---
pulsar-function-go/conf/conf.go | 2 +
pulsar-function-go/pf/instanceConf.go | 11 ++
pulsar-function-go/pf/instanceConf_test.go | 207 +++++++++++++++++++++
.../functions/instance/go/GoInstanceConfig.java | 2 +
.../pulsar/functions/runtime/RuntimeUtils.java | 6 +
.../runtime/kubernetes/KubernetesRuntimeTest.java | 8 +
6 files changed, 236 insertions(+)
diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
index 03513648fac..1442a0f865f 100644
--- a/pulsar-function-go/conf/conf.go
+++ b/pulsar-function-go/conf/conf.go
@@ -91,6 +91,8 @@ type Conf struct {
UserConfig string `json:"userConfig" yaml:"userConfig"`
//metrics config
MetricsPort int `json:"metricsPort" yaml:"metricsPort"`
+ // FunctionDetails
+ FunctionDetails string `json:"functionDetails" yaml:"functionDetails"`
}
var (
diff --git a/pulsar-function-go/pf/instanceConf.go
b/pulsar-function-go/pf/instanceConf.go
index 4cb60dd258a..844a2bc9b89 100644
--- a/pulsar-function-go/pf/instanceConf.go
+++ b/pulsar-function-go/pf/instanceConf.go
@@ -25,7 +25,9 @@ import (
"time"
"github.com/apache/pulsar/pulsar-function-go/conf"
+ log "github.com/apache/pulsar/pulsar-function-go/logutil"
pb "github.com/apache/pulsar/pulsar-function-go/pb"
+ "google.golang.org/protobuf/encoding/protojson"
)
// This is the config passed to the Golang Instance. Contains all the
information
@@ -122,6 +124,15 @@ func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf
{
tlsAllowInsecure: cfg.TLSAllowInsecureConnection,
tlsHostnameVerification: cfg.TLSHostnameVerificationEnable,
}
+ // parse the raw function details and ignore the unmarshal
error(fallback to original way)
+ if cfg.FunctionDetails != "" {
+ functionDetails := pb.FunctionDetails{}
+ if err := protojson.Unmarshal([]byte(cfg.FunctionDetails),
&functionDetails); err != nil {
+ log.Errorf("Failed to unmarshal function details: %v",
err)
+ } else {
+ instanceConf.funcDetails = functionDetails
+ }
+ }
if instanceConf.funcDetails.ProcessingGuarantees ==
pb.ProcessingGuarantees_EFFECTIVELY_ONCE {
panic("Go instance current not support EFFECTIVELY_ONCE
processing guarantees.")
diff --git a/pulsar-function-go/pf/instanceConf_test.go
b/pulsar-function-go/pf/instanceConf_test.go
index 02aef913ebc..cc5f46e2fe1 100644
--- a/pulsar-function-go/pf/instanceConf_test.go
+++ b/pulsar-function-go/pf/instanceConf_test.go
@@ -20,6 +20,7 @@
package pf
import (
+ "fmt"
"testing"
cfg "github.com/apache/pulsar/pulsar-function-go/conf"
@@ -113,3 +114,209 @@ func TestInstanceConf_Fail(t *testing.T) {
newInstanceConfWithConf(&cfg.Conf{ProcessingGuarantees: 3})
}, "Should have a panic")
}
+
+func TestInstanceConf_WithDetails(t *testing.T) {
+ cfg := &cfg.Conf{
+ FunctionDetails:
`{"tenant":"public","namespace":"default","name":"test-function","className":"process",
+"logTopic":"test-logs","userConfig":"{\"key1\":\"value1\"}","runtime":"GO","autoAck":true,"parallelism":1,
+"source":{"configs":"{\"username\":\"admin\"}","typeClassName":"string","timeoutMs":"15000",
+"subscriptionName":"test-subscription","inputSpecs":{"input":{"schemaType":"avro","receiverQueueSize":{"value":1000},
+"schemaProperties":{"schema_prop1":"schema1"},"consumerProperties":{"consumer_prop1":"consumer1"},"cryptoSpec":
+{"cryptoKeyReaderClassName":"key-reader","producerCryptoFailureAction":"SEND","consumerCryptoFailureAction":"CONSUME"}}}
+,"negativeAckRedeliveryDelayMs":"15000"},"sink":{"configs":"{\"password\":\"admin\"}","topic":"test-output",
+"typeClassName":"string","schemaType":"avro","producerSpec":{"maxPendingMessages":2000,"useThreadLocalProducers":true,
+"cryptoSpec":{"cryptoKeyReaderClassName":"key-reader","producerCryptoFailureAction":"DISCARD"},
+"batchBuilder":"DEFAULT"}},"resources":{"cpu":2.0,"ram":"1024","disk":"1024"},"packageUrl":"/path/to/package",
+"retryDetails":{"maxMessageRetries":3,"deadLetterTopic":"test-dead-letter-topic"},"secretsMap":
+"{\"secret1\":\"secret-value1\"}","runtimeFlags":"flags","componentType":"FUNCTION","customRuntimeOptions":"options",
+"retainOrdering":true,"retainKeyOrdering":true,"subscriptionPosition":"EARLIEST"}`,
+ }
+ instanceConf := newInstanceConfWithConf(cfg)
+ assert.Equal(t, "public", instanceConf.funcDetails.Tenant)
+ assert.Equal(t, "default", instanceConf.funcDetails.Namespace)
+ assert.Equal(t, "test-function", instanceConf.funcDetails.Name)
+ assert.Equal(t, "process", instanceConf.funcDetails.ClassName)
+ assert.Equal(t, "test-logs", instanceConf.funcDetails.LogTopic)
+ assert.Equal(t, pb.ProcessingGuarantees_ATLEAST_ONCE,
instanceConf.funcDetails.ProcessingGuarantees)
+ assert.Equal(t, `{"key1":"value1"}`,
instanceConf.funcDetails.UserConfig)
+ assert.Equal(t, `{"secret1":"secret-value1"}`,
instanceConf.funcDetails.SecretsMap)
+ assert.Equal(t, pb.FunctionDetails_GO, instanceConf.funcDetails.Runtime)
+
+ assert.Equal(t, true, instanceConf.funcDetails.AutoAck)
+ assert.Equal(t, int32(1), instanceConf.funcDetails.Parallelism)
+
+ sourceSpec := pb.SourceSpec{
+ TypeClassName: "string",
+ TimeoutMs: 15000,
+ Configs: `{"username":"admin"}`,
+ SubscriptionName: "test-subscription",
+ SubscriptionType: pb.SubscriptionType_SHARED,
+ NegativeAckRedeliveryDelayMs: 15000,
+ InputSpecs: map[string]*pb.ConsumerSpec{
+ "input": {
+ SchemaType: "avro",
+ SchemaProperties: map[string]string{
+ "schema_prop1": "schema1",
+ },
+ ConsumerProperties: map[string]string{
+ "consumer_prop1": "consumer1",
+ },
+ ReceiverQueueSize:
&pb.ConsumerSpec_ReceiverQueueSize{
+ Value: 1000,
+ },
+ CryptoSpec: &pb.CryptoSpec{
+ CryptoKeyReaderClassName:
"key-reader",
+ ProducerCryptoFailureAction:
pb.CryptoSpec_SEND,
+ ConsumerCryptoFailureAction:
pb.CryptoSpec_CONSUME,
+ },
+ },
+ },
+ }
+ assert.Equal(t, sourceSpec.String(),
instanceConf.funcDetails.Source.String())
+
+ sinkSpec := pb.SinkSpec{
+ TypeClassName: "string",
+ Topic: "test-output",
+ Configs: `{"password":"admin"}`,
+ SchemaType: "avro",
+ ProducerSpec: &pb.ProducerSpec{
+ MaxPendingMessages: 2000,
+ UseThreadLocalProducers: true,
+ CryptoSpec: &pb.CryptoSpec{
+ CryptoKeyReaderClassName: "key-reader",
+ ProducerCryptoFailureAction:
pb.CryptoSpec_DISCARD,
+ ConsumerCryptoFailureAction: pb.CryptoSpec_FAIL,
+ },
+ BatchBuilder: "DEFAULT",
+ },
+ }
+ assert.Equal(t, sinkSpec.String(),
instanceConf.funcDetails.Sink.String())
+
+ resource := pb.Resources{
+ Cpu: 2.0,
+ Ram: 1024,
+ Disk: 1024,
+ }
+ assert.Equal(t, resource.String(),
instanceConf.funcDetails.Resources.String())
+ assert.Equal(t, "/path/to/package", instanceConf.funcDetails.PackageUrl)
+
+ retryDetails := pb.RetryDetails{
+ MaxMessageRetries: 3,
+ DeadLetterTopic: "test-dead-letter-topic",
+ }
+ assert.Equal(t, retryDetails.String(),
instanceConf.funcDetails.RetryDetails.String())
+
+ assert.Equal(t, "flags", instanceConf.funcDetails.RuntimeFlags)
+ assert.Equal(t, pb.FunctionDetails_FUNCTION,
instanceConf.funcDetails.ComponentType)
+ assert.Equal(t, "options",
instanceConf.funcDetails.CustomRuntimeOptions)
+ assert.Equal(t, "", instanceConf.funcDetails.Builtin)
+ assert.Equal(t, true, instanceConf.funcDetails.RetainOrdering)
+ assert.Equal(t, true, instanceConf.funcDetails.RetainKeyOrdering)
+ assert.Equal(t, pb.SubscriptionPosition_EARLIEST,
instanceConf.funcDetails.SubscriptionPosition)
+}
+
+func TestInstanceConf_WithEmptyOrInvalidDetails(t *testing.T) {
+ testCases := []struct {
+ name string
+ details string
+ }{
+ {
+ name: "empty details",
+ details: "",
+ },
+ {
+ name: "invalid details",
+ details: "error",
+ },
+ }
+
+ for i, testCase := range testCases {
+
+ t.Run(fmt.Sprintf("testCase[%d] %s", i, testCase.name), func(t
*testing.T) {
+ cfg := &cfg.Conf{
+ FunctionDetails: testCase.details,
+ Tenant: "public",
+ NameSpace: "default",
+ Name: "test-function",
+ LogTopic: "test-logs",
+ ProcessingGuarantees: 0,
+ UserConfig: `{"key1":"value1"}`,
+ SecretsMap:
`{"secret1":"secret-value1"}`,
+ Runtime: 3,
+ AutoACK: true,
+ Parallelism: 1,
+ SubscriptionType: 1,
+ TimeoutMs: 15000,
+ SubscriptionName: "test-subscription",
+ CleanupSubscription: false,
+ SubscriptionPosition: 0,
+ SinkSpecTopic: "test-output",
+ SinkSchemaType: "avro",
+ Cpu: 2.0,
+ Ram: 1024,
+ Disk: 1024,
+ MaxMessageRetries: 3,
+ DeadLetterTopic: "test-dead-letter-topic",
+ SourceInputSpecs: map[string]string{
+ "input":
`{"schemaType":"avro","receiverQueueSize":{"value":1000},"schemaProperties":
+{"schema_prop1":"schema1"},"consumerProperties":{"consumer_prop1":"consumer1"}}`,
+ },
+ }
+ instanceConf := newInstanceConfWithConf(cfg)
+
+ assert.Equal(t, "public",
instanceConf.funcDetails.Tenant)
+ assert.Equal(t, "default",
instanceConf.funcDetails.Namespace)
+ assert.Equal(t, "test-function",
instanceConf.funcDetails.Name)
+ assert.Equal(t, "test-logs",
instanceConf.funcDetails.LogTopic)
+ assert.Equal(t, pb.ProcessingGuarantees_ATLEAST_ONCE,
instanceConf.funcDetails.ProcessingGuarantees)
+ assert.Equal(t, `{"key1":"value1"}`,
instanceConf.funcDetails.UserConfig)
+ assert.Equal(t, `{"secret1":"secret-value1"}`,
instanceConf.funcDetails.SecretsMap)
+ assert.Equal(t, pb.FunctionDetails_GO,
instanceConf.funcDetails.Runtime)
+
+ assert.Equal(t, true, instanceConf.funcDetails.AutoAck)
+ assert.Equal(t, int32(1),
instanceConf.funcDetails.Parallelism)
+
+ sourceSpec := pb.SourceSpec{
+ SubscriptionType:
pb.SubscriptionType_FAILOVER,
+ TimeoutMs: 15000,
+ SubscriptionName: "test-subscription",
+ CleanupSubscription: false,
+ SubscriptionPosition:
pb.SubscriptionPosition_LATEST,
+ InputSpecs: map[string]*pb.ConsumerSpec{
+ "input": {
+ SchemaType: "avro",
+ SchemaProperties:
map[string]string{
+ "schema_prop1":
"schema1",
+ },
+ ConsumerProperties:
map[string]string{
+ "consumer_prop1":
"consumer1",
+ },
+ ReceiverQueueSize:
&pb.ConsumerSpec_ReceiverQueueSize{
+ Value: 1000,
+ },
+ },
+ },
+ }
+ assert.Equal(t, sourceSpec.String(),
instanceConf.funcDetails.Source.String())
+
+ sinkSpec := pb.SinkSpec{
+ Topic: "test-output",
+ SchemaType: "avro",
+ }
+ assert.Equal(t, sinkSpec.String(),
instanceConf.funcDetails.Sink.String())
+
+ resource := pb.Resources{
+ Cpu: 2.0,
+ Ram: 1024,
+ Disk: 1024,
+ }
+ assert.Equal(t, resource.String(),
instanceConf.funcDetails.Resources.String())
+
+ retryDetails := pb.RetryDetails{
+ MaxMessageRetries: 3,
+ DeadLetterTopic: "test-dead-letter-topic",
+ }
+ assert.Equal(t, retryDetails.String(),
instanceConf.funcDetails.RetryDetails.String())
+ })
+ }
+}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
index 599b6ed8f4f..467ec749213 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
@@ -83,4 +83,6 @@ public class GoInstanceConfig {
private String deadLetterTopic = "";
private int metricsPort;
+
+ private String functionDetails = "";
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 0214b18fb23..6160626c958 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -139,6 +139,12 @@ public class RuntimeUtils {
final List<String> args = new LinkedList<>();
GoInstanceConfig goInstanceConfig = new GoInstanceConfig();
+ // pass the raw functino details directly so that we don't need to
assemble the `instanceConf.funcDetails`
+ // manually in Go instance
+ String functionDetails =
+
JsonFormat.printer().omittingInsignificantWhitespace().print(instanceConfig.getFunctionDetails());
+ goInstanceConfig.setFunctionDetails(functionDetails);
+
if (instanceConfig.getClusterName() != null) {
goInstanceConfig.setClusterName(instanceConfig.getClusterName());
}
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 02f3c0d23fb..980f763f7c3 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -1006,6 +1006,14 @@ public class KubernetesRuntimeTest {
assertEquals(goInstanceConfig.get("expectedHealthCheckInterval"), 0);
assertEquals(goInstanceConfig.get("deadLetterTopic"), "");
assertEquals(goInstanceConfig.get("metricsPort"), 4331);
+ assertEquals(goInstanceConfig.get("functionDetails"),
"{\"tenant\":\"tenant\",\"namespace\":\"namespace\","
+ +
"\"name\":\"container\",\"className\":\"org.apache.pulsar.functions.utils.functioncache"
+ +
".AddFunction\",\"logTopic\":\"container-log\",\"runtime\":\"GO\",\"source\":{\"className\":\"org"
+ +
".pulsar.pulsar.TestSource\",\"subscriptionType\":\"FAILOVER\",\"typeClassName\":\"java.lang"
+ +
".String\",\"inputSpecs\":{\"test_src\":{}}},\"sink\":{\"className\":\"org.pulsar.pulsar"
+ +
".TestSink\",\"topic\":\"container-output\",\"serDeClassName\":\"org.apache.pulsar.functions"
+ +
".runtime.serde.Utf8Serializer\",\"typeClassName\":\"java.lang.String\"},\"resources\":{\"cpu\":1"
+ + ".0,\"ram\":\"1000\",\"disk\":\"10000\"}}");
// check padding and xmx
V1Container containerSpec =
container.getFunctionContainer(Collections.emptyList(), RESOURCES);