wolfstudy commented on issue #3854: [issue #3767] support go function for pulsar
URL: https://github.com/apache/pulsar/pull/3854#issuecomment-474240003

@jerrypeng

> java -cp /Users/jerrypeng/workspace/incubator-pulsar/pulsar-functions/runtime-all/target/java-instance.jar:/Users/jerrypeng/workspace/incubator-pulsar/instances/deps/* -Dpulsar.functions.java.instance.jar=/Users/jerrypeng/workspace/incubator-pulsar/pulsar-functions/runtime-all/target/java-instance.jar -Dpulsar.functions.extra.dependencies.dir=/Users/jerrypeng/workspace/incubator-pulsar/instances/deps -Dlog4j.configurationFile=java_instance_log4j2.yml -Dpulsar.function.log.dir=/Users/jerrypeng/workspace/incubator-pulsar/logs/functions/public/default/java-jerry-test -Dpulsar.function.log.file=java-jerry-test-1 -Xmx1073741824 org.apache.pulsar.functions.runtime.JavaInstanceMain --jar /tmp/pulsar_functions/public/default/java-jerry-test/1/pulsar-functions-api-examples.jar --instance_id 1 --function_id ed07d96a-ea90-4212-b9bb-440ac825f26d --function_version 1dc83fa9-5a72-4640-8b57-491bee7860e4 --function_details '{"tenant":"public","namespace":"default","name":"java-jerry-test","className":"org.apache.pulsar.functions.api.examples.ExclamationFunction","autoAck":true,"parallelism":3,"source":{"typeClassName":"java.lang.String","timeoutMs":"60000","inputSpecs":{"persistent://public/default/java-input":{"schemaType":"NONE","receiverQueueSize":{"value":100}}},"cleanupSubscription":true},"sink":{"topic":"persistent://public/default/java-output","typeClassName":"java.lang.String"},"resources":{"cpu":1.0,"ram":"1073741824","disk":"10737418240"}}' --pulsar_serviceurl pulsar://127.0.0.1:6650 --use_tls false --tls_allow_insecure false --hostname_verification_enabled false --max_buffered_tuples 1024 --port 53904 --metrics_port 53905 --state_storage_serviceurl bk://127.0.0.1:4181 --expected_healthcheck_interval 30 --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider --cluster_name standalone

In the Go function, the parameters above will be provided in the form of a config file.

### Specific examples are as follows:

First, configure the parameters mentioned above in `conf.yml`.

```
pulsarServiceURL: "pulsar://localhost:6650"
intopics: [topic-1, topic-2]
instanceID: 101
funcID: "pulsar-function"
funcVersion: "1.0.0"
name: "go-function"
maxBufTuples: 10
port: 8091
clusterName: "pulsar-function-go"
isRegexPattern: false
receiverQueueVal: 10
inputSpecs: topic-01
autoAck: true
sinkSpec: topic-02
killAfterIdleMs: 50000
```

Then users can write the code they need, e.g.:

```
package main

import (
	"context"

	"github.com/apache/pulsar/pulsar-function-go/pf"
)

// HandleResponse appends byte 110 (ASCII 'n') to each input payload.
func HandleResponse(ctx context.Context, in []byte) ([]byte, error) {
	res := append(in, 110)
	return res, nil
}

func main() {
	// Register the handler and start the function instance.
	pf.Start(HandleResponse)
}
```

To facilitate testing, we can write a producer program that publishes data to the input topic, e.g.:

```
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/apache/pulsar/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: "topic-01",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Publish ten test messages: hello-0 through hello-9.
	ctx := context.Background()
	for i := 0; i < 10; i++ {
		if err := producer.Send(ctx, pulsar.ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		}); err != nil {
			log.Fatal(err)
		}
	}
}
```

For now, the path to `conf.yml` is fixed; later I will make it configurable through a parameter.
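As a rough illustration of what a configurable config path could look like, here is a minimal sketch using only the standard `flag` package. It is not the implementation in this PR: the `-conf` flag name, the `confPath` helper, and the `conf.yml` default value are all hypothetical.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// defaultConfPath stands in for the currently fixed location.
// The exact value is an assumption made for this sketch.
const defaultConfPath = "conf.yml"

// confPath is a hypothetical helper: it parses args and returns the
// config file path, falling back to defaultConfPath when -conf is absent.
func confPath(args []string) (string, error) {
	fs := flag.NewFlagSet("go-function", flag.ContinueOnError)
	path := fs.String("conf", defaultConfPath, "path to the function config file")
	if err := fs.Parse(args); err != nil {
		return "", err
	}
	return *path, nil
}

func main() {
	path, err := confPath(os.Args[1:])
	if err != nil {
		// The flag package has already printed the error and usage text.
		os.Exit(2)
	}
	fmt.Println("loading config from", path)
}
```

With something like this in place, the function could be started as `go run main.go -conf /path/to/conf.yml`, and running it without the flag would keep today's behaviour of reading the default file.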
