This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f229d3b [issue #3767] support go function for pulsar (#3854)
f229d3b is described below
commit f229d3b34c6d6d26ee3ddf32bdc0e5f568d92bfd
Author: 冉小龙 <[email protected]>
AuthorDate: Tue Apr 9 16:09:46 2019 +0800
[issue #3767] support go function for pulsar (#3854)
Master Issue: #3767
### Motivation
At present, go function only supports the simplest function. Input and
output only allow []byte. For details, refer to:
[PIP32](https://github.com/apache/pulsar/wiki/PIP-32%3A-Go-Function-API%2C-Instance-and-LocalRun)
---
pulsar-function-go/README.md | 0
pulsar-function-go/conf/conf.go | 109 +++
pulsar-function-go/conf/conf.yaml | 57 ++
pulsar-function-go/examples/contextFunc.go | 38 +
pulsar-function-go/examples/hello.go | 34 +
pulsar-function-go/examples/inputFunc.go | 36 +
pulsar-function-go/examples/outputFunc.go | 35 +
pulsar-function-go/examples/test/consumer.go | 61 ++
pulsar-function-go/examples/test/producer.go | 57 ++
pulsar-function-go/go.mod | 11 +
pulsar-function-go/go.sum | 36 +
pulsar-function-go/pb/Function.pb.go | 1025 ++++++++++++++++++++
pulsar-function-go/pb/InstanceCommunication.pb.go | 648 +++++++++++++
pulsar-function-go/pb/Request.pb.go | 153 +++
pulsar-function-go/pb/doc.go | 34 +
pulsar-function-go/pb/generate.sh | 81 ++
pulsar-function-go/pf/context.go | 98 ++
pulsar-function-go/pf/context_test.go | 45 +
pulsar-function-go/pf/function.go | 172 ++++
pulsar-function-go/pf/function_test.go | 182 ++++
pulsar-function-go/pf/instance.go | 275 ++++++
pulsar-function-go/pf/instanceConf.go | 104 ++
pulsar-function-go/pf/instanceConf_test.go | 32 +
pulsar-function-go/pf/util.go | 41 +
pulsar-function-go/pf/util_test.go | 52 +
.../proto/src/main/proto/Function.proto | 21 +-
26 files changed, 3427 insertions(+), 10 deletions(-)
diff --git a/pulsar-function-go/README.md b/pulsar-function-go/README.md
new file mode 100644
index 0000000..e69de29
diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
new file mode 100644
index 0000000..d784855
--- /dev/null
+++ b/pulsar-function-go/conf/conf.go
@@ -0,0 +1,109 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package conf
+
+import (
+ "flag"
+ "io/ioutil"
+ "os"
+ "os/user"
+ "time"
+
+ "github.com/apache/pulsar/pulsar-function-go/log"
+ "gopkg.in/yaml.v2"
+)
+
+const ConfigPath = "github.com/apache/pulsar/pulsar-function-go/conf/conf.yaml"
+
+type Conf struct {
+ PulsarServiceURL string `yaml:"pulsarServiceURL"`
+ InstanceID int `yaml:"instanceID"`
+ FuncID string `yaml:"funcID"`
+ FuncVersion string `yaml:"funcVersion"`
+ MaxBufTuples int `yaml:"maxBufTuples"`
+ Port int `yaml:"port"`
+ ClusterName string `yaml:"clusterName"`
+ KillAfterIdleMs time.Duration `yaml:"killAfterIdleMs"`
+ // function details config
+ Tenant string `yaml:"tenant"`
+ NameSpace string `yaml:"nameSpace"`
+ Name string `yaml:"name"`
+ LogTopic string `yaml:"logTopic"`
+ ProcessingGuarantees int32 `yaml:"processingGuarantees"`
+ SecretsMap string `yaml:"secretsMap"`
+ Runtime int32 `yaml:"runtime"`
+ AutoACK bool `yaml:"autoAck"`
+ Parallelism int32 `yaml:"parallelism"`
+ //source config
+ SubscriptionType int32 `yaml:"subscriptionType"`
+ TimeoutMs uint64 `yaml:"timeoutMs"`
+ SubscriptionName string `yaml:"subscriptionName"`
+ CleanupSubscription bool `yaml:"cleanupSubscription"`
+ //source input specs
+ SourceSpecTopic string `yaml:"sourceSpecsTopic"`
+ SourceSchemaType string `yaml:"sourceSchemaType"`
+ IsRegexPatternSubscription bool `yaml:"isRegexPatternSubscription"`
+ ReceiverQueueSize int32 `yaml:"receiverQueueSize"`
+ //sink spec config
+ SinkSpecTopic string `yaml:"sinkSpecsTopic"`
+ SinkSchemaType string `yaml:"sinkSchemaType"`
+ //resources config
+ Cpu float64 `yaml:"cpu"`
+ Ram int64 `yaml:"ram"`
+ Disk int64 `yaml:"disk"`
+ //retryDetails config
+ MaxMessageRetries int32 `yaml:"maxMessageRetries"`
+ DeadLetterTopic string `yaml:"deadLetterTopic"`
+}
+
+var opts string
+
+func (c *Conf) GetConf() *Conf {
+ flag.Parse()
+
+ yamlFile, err := ioutil.ReadFile(opts)
+ if err != nil {
+ log.Errorf("not found conf file, err:%s", err.Error())
+ return nil
+ }
+ err = yaml.Unmarshal(yamlFile, c)
+ if err != nil {
+ log.Errorf("unmarshal yaml file error:%s", err.Error())
+ return nil
+ }
+ return c
+}
+
+func init() {
+ var homeDir string
+ usr, err := user.Current()
+ if err == nil {
+ homeDir = usr.HomeDir
+ }
+
+ // Fall back to standard HOME environment variable that works
+ // for most POSIX OSes if the directory from the Go standard
+ // lib failed.
+ if err != nil || homeDir == "" {
+ homeDir = os.Getenv("HOME")
+ }
+ defaultPath := homeDir + "/" + ConfigPath
+ flag.StringVar(&opts, "instance-conf", defaultPath, "config conf.yml
filepath")
+}
diff --git a/pulsar-function-go/conf/conf.yaml
b/pulsar-function-go/conf/conf.yaml
new file mode 100644
index 0000000..f786cca
--- /dev/null
+++ b/pulsar-function-go/conf/conf.yaml
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+pulsarServiceURL: "pulsar://localhost:6650"
+instanceID: 101
+funcID: "pulsar-function"
+funcVersion: "1.0.0"
+maxBufTuples: 10
+port: 8091
+clusterName: "pulsar-function-go"
+killAfterIdleMs: 50000
+# function details config
+tenant: ""
+nameSpace: ""
+name: "go-function"
+logTopic: ""
+processingGuarantees: 0
+secretsMap: ""
+runtime: 0
+autoAck: true
+parallelism: 0
+# source config
+subscriptionType: 0
+timeoutMs: 0
+subscriptionName: ""
+cleanupSubscription: false
+# source input specs
+sourceSpecsTopic: persistent://public/default/topic-01
+sourceSchemaType: ""
+isRegexPatternSubscription: false
+receiverQueueSize: 10
+# sink specs config
+sinkSpecsTopic: persistent://public/default/topic-02
+sinkSchemaType: ""
+# resource config
+cpu: 0
+ram: 0
+disk: 0
+# retryDetails config
+maxMessageRetries: 0
+deadLetterTopic: ""
diff --git a/pulsar-function-go/examples/contextFunc.go
b/pulsar-function-go/examples/contextFunc.go
new file mode 100644
index 0000000..7a057fc
--- /dev/null
+++ b/pulsar-function-go/examples/contextFunc.go
@@ -0,0 +1,38 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/apache/pulsar/pulsar-function-go/pf"
+)
+
+func contextFunc(ctx context.Context) {
+ if fc, ok := pf.FromContext(ctx); ok {
+ fmt.Printf("function ID is:%s, ", fc.GetFuncID())
+ fmt.Printf("function version is:%s\n", fc.GetFuncVersion())
+ }
+}
+
+func main() {
+ pf.Start(contextFunc)
+}
diff --git a/pulsar-function-go/examples/hello.go
b/pulsar-function-go/examples/hello.go
new file mode 100644
index 0000000..ad27dec
--- /dev/null
+++ b/pulsar-function-go/examples/hello.go
@@ -0,0 +1,34 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+ "fmt"
+
+ "github.com/apache/pulsar/pulsar-function-go/pf"
+)
+
+func hello() {
+ fmt.Println("hello pulsar function")
+}
+
+func main() {
+ pf.Start(hello)
+}
diff --git a/pulsar-function-go/examples/inputFunc.go
b/pulsar-function-go/examples/inputFunc.go
new file mode 100644
index 0000000..0d270de
--- /dev/null
+++ b/pulsar-function-go/examples/inputFunc.go
@@ -0,0 +1,36 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/apache/pulsar/pulsar-function-go/pf"
+)
+
+func HandleRequest(ctx context.Context, in []byte) error{
+ fmt.Println(string(in) + "!")
+ return nil
+}
+
+func main() {
+ pf.Start(HandleRequest)
+}
diff --git a/pulsar-function-go/examples/outputFunc.go
b/pulsar-function-go/examples/outputFunc.go
new file mode 100644
index 0000000..603e278
--- /dev/null
+++ b/pulsar-function-go/examples/outputFunc.go
@@ -0,0 +1,35 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+ "context"
+
+ "github.com/apache/pulsar/pulsar-function-go/pf"
+)
+
+func HandleResponse(ctx context.Context, in []byte) ([]byte, error) {
+ res := append(in, 110)
+ return res, nil
+}
+
+func main() {
+ pf.Start(HandleResponse)
+}
diff --git a/pulsar-function-go/examples/test/consumer.go
b/pulsar-function-go/examples/test/consumer.go
new file mode 100644
index 0000000..ba5e5b5
--- /dev/null
+++ b/pulsar-function-go/examples/test/consumer.go
@@ -0,0 +1,61 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "log"
+
+ "github.com/apache/pulsar/pulsar-client-go/pulsar"
+)
+
+func main() {
+ client, err := pulsar.NewClient(pulsar.ClientOptions{URL:
"pulsar://localhost:6650"})
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ defer client.Close()
+
+ consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: "topic-02",
+ SubscriptionName: "my-subscription",
+ Type: pulsar.Shared,
+ })
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ defer consumer.Close()
+
+ for {
+ msg, err := consumer.Receive(context.Background())
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ fmt.Printf("Received message msgId: %s -- content: '%s'\n",
+ msg.ID(), string(msg.Payload()))
+
+ consumer.Ack(msg)
+ }
+}
+
diff --git a/pulsar-function-go/examples/test/producer.go
b/pulsar-function-go/examples/test/producer.go
new file mode 100644
index 0000000..475f83d
--- /dev/null
+++ b/pulsar-function-go/examples/test/producer.go
@@ -0,0 +1,57 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "log"
+
+ "github.com/apache/pulsar/pulsar-client-go/pulsar"
+)
+
+func main() {
+ client, err := pulsar.NewClient(pulsar.ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+
+ if err != nil {
+ log.Fatal(err)
+ return
+ }
+
+ defer client.Close()
+
+ producer, err := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: "topic-01",
+ })
+
+ defer producer.Close()
+
+ ctx := context.Background()
+
+ for i := 0; i < 10; i++ {
+ if err := producer.Send(ctx, pulsar.ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ }); err != nil {
+ log.Fatal(err)
+ }
+ }
+}
diff --git a/pulsar-function-go/go.mod b/pulsar-function-go/go.mod
new file mode 100644
index 0000000..0762d53
--- /dev/null
+++ b/pulsar-function-go/go.mod
@@ -0,0 +1,11 @@
+module github.com/apache/pulsar/pulsar-function-go
+
+require (
+ github.com/apache/pulsar/pulsar-client-go
v0.0.0-20190312044336-ff4db8db12be
+ github.com/davecgh/go-spew v1.1.1
+ github.com/golang/protobuf v1.3.0
+ github.com/sirupsen/logrus v1.4.0
+ github.com/stretchr/testify v1.3.0
+ gopkg.in/natefinch/lumberjack.v2 v2.0.0
+ gopkg.in/yaml.v2 v2.2.2
+)
diff --git a/pulsar-function-go/go.sum b/pulsar-function-go/go.sum
new file mode 100644
index 0000000..4a17410
--- /dev/null
+++ b/pulsar-function-go/go.sum
@@ -0,0 +1,36 @@
+github.com/BurntSushi/toml v0.3.1
h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
+github.com/BurntSushi/toml v0.3.1/go.mod
h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/apache/pulsar/pulsar-client-go v0.0.0-20190312044336-ff4db8db12be
h1:BpPXJTLeqlWQPlMZxFP+/Sm1Zva7NJB2gWRfWlN8xi8=
+github.com/apache/pulsar/pulsar-client-go
v0.0.0-20190312044336-ff4db8db12be/go.mod
h1:Dt5jPpS2v4WlPya7e9jXrqGN+6BVzVyaBw+bixjLFFU=
+github.com/davecgh/go-spew v1.1.0/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/golang/protobuf v1.3.0
h1:kbxbvI4Un1LUWKxufD+BiE6AEExYYgkQLQmLFqA1LFk=
+github.com/golang/protobuf v1.3.0/go.mod
h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/pmezard/go-difflib v1.0.0
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/sirupsen/logrus v1.3.0/go.mod
h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.0
h1:yKenngtzGh+cUSSh6GWbxW2abRqhYUSR/t/6+2QqNvE=
+github.com/sirupsen/logrus v1.4.0/go.mod
h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/stretchr/objx v0.1.0/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.1.1/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.2.2/go.mod
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0
h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793
h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd
h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33
h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod
h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405
h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0
h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod
h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
+gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/pulsar-function-go/pb/Function.pb.go
b/pulsar-function-go/pb/Function.pb.go
new file mode 100644
index 0000000..9766ea6
--- /dev/null
+++ b/pulsar-function-go/pb/Function.pb.go
@@ -0,0 +1,1025 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: Function.proto
+
+package pb
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type ProcessingGuarantees int32
+
+const (
+ ProcessingGuarantees_ATLEAST_ONCE ProcessingGuarantees = 0
+ ProcessingGuarantees_ATMOST_ONCE ProcessingGuarantees = 1
+ ProcessingGuarantees_EFFECTIVELY_ONCE ProcessingGuarantees = 2
+)
+
+var ProcessingGuarantees_name = map[int32]string{
+ 0: "ATLEAST_ONCE",
+ 1: "ATMOST_ONCE",
+ 2: "EFFECTIVELY_ONCE",
+}
+var ProcessingGuarantees_value = map[string]int32{
+ "ATLEAST_ONCE": 0,
+ "ATMOST_ONCE": 1,
+ "EFFECTIVELY_ONCE": 2,
+}
+
+func (x ProcessingGuarantees) String() string {
+ return proto.EnumName(ProcessingGuarantees_name, int32(x))
+}
+func (ProcessingGuarantees) EnumDescriptor() ([]byte, []int) {
+ return fileDescriptor_Function_33c6e1841d2624f0, []int{0}
+}
+
+type SubscriptionType int32
+
+const (
+ SubscriptionType_SHARED SubscriptionType = 0
+ SubscriptionType_FAILOVER SubscriptionType = 1
+)
+
+var SubscriptionType_name = map[int32]string{
+ 0: "SHARED",
+ 1: "FAILOVER",
+}
+var SubscriptionType_value = map[string]int32{
+ "SHARED": 0,
+ "FAILOVER": 1,
+}
+
+func (x SubscriptionType) String() string {
+ return proto.EnumName(SubscriptionType_name, int32(x))
+}
+func (SubscriptionType) EnumDescriptor() ([]byte, []int) {
+ return fileDescriptor_Function_33c6e1841d2624f0, []int{1}
+}
+
+type FunctionState int32
+
+const (
+ FunctionState_RUNNING FunctionState = 0
+ FunctionState_STOPPED FunctionState = 1
+)
+
+var FunctionState_name = map[int32]string{
+ 0: "RUNNING",
+ 1: "STOPPED",
+}
+var FunctionState_value = map[string]int32{
+ "RUNNING": 0,
+ "STOPPED": 1,
+}
+
+func (x FunctionState) String() string {
+ return proto.EnumName(FunctionState_name, int32(x))
+}
+func (FunctionState) EnumDescriptor() ([]byte, []int) {
+ return fileDescriptor_Function_33c6e1841d2624f0, []int{2}
+}
+
+type FunctionDetails_Runtime int32
+
+const (
+ FunctionDetails_JAVA FunctionDetails_Runtime = 0
+ FunctionDetails_PYTHON FunctionDetails_Runtime = 1
+ FunctionDetails_GO FunctionDetails_Runtime = 3
+)
+
+var FunctionDetails_Runtime_name = map[int32]string{
+ 0: "JAVA",
+ 1: "PYTHON",
+ 3: "GO",
+}
+var FunctionDetails_Runtime_value = map[string]int32{
+ "JAVA": 0,
+ "PYTHON": 1,
+ "GO": 3,
+}
+
+func (x FunctionDetails_Runtime) String() string {
+ return proto.EnumName(FunctionDetails_Runtime_name, int32(x))
+}
+func (FunctionDetails_Runtime) EnumDescriptor() ([]byte, []int) {
+ return fileDescriptor_Function_33c6e1841d2624f0, []int{2, 0}
+}
+
+type Resources struct {
+ Cpu float64 `protobuf:"fixed64,1,opt,name=cpu,proto3"
json:"cpu,omitempty"`
+ Ram int64 `protobuf:"varint,2,opt,name=ram,proto3"
json:"ram,omitempty"`
+ Disk int64 `protobuf:"varint,3,opt,name=disk,proto3"
json:"disk,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Resources) Reset() { *m = Resources{} }
+func (m *Resources) String() string { return proto.CompactTextString(m) }
+func (*Resources) ProtoMessage() {}
+func (*Resources) Descriptor() ([]byte, []int) {
+ return fileDescriptor_Function_33c6e1841d2624f0, []int{0}
+}
+func (m *Resources) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_Resources.Unmarshal(m, b)
+}
+func (m *Resources) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_Resources.Marshal(b, m, deterministic)
+}
+func (dst *Resources) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Resources.Merge(dst, src)
+}
+func (m *Resources) XXX_Size() int {
+ return xxx_messageInfo_Resources.Size(m)
+}
+func (m *Resources) XXX_DiscardUnknown() {
+ xxx_messageInfo_Resources.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Resources proto.InternalMessageInfo
+
+func (m *Resources) GetCpu() float64 {
+ if m != nil {
+ return m.Cpu
+ }
+ return 0
+}
+
+func (m *Resources) GetRam() int64 {
+ if m != nil {
+ return m.Ram
+ }
+ return 0
+}
+
+func (m *Resources) GetDisk() int64 {
+ if m != nil {
+ return m.Disk
+ }
+ return 0
+}
+
+type RetryDetails struct {
+ MaxMessageRetries int32
`protobuf:"varint,1,opt,name=maxMessageRetries,proto3"
json:"maxMessageRetries,omitempty"`
+ DeadLetterTopic string
`protobuf:"bytes,2,opt,name=deadLetterTopic,proto3"
json:"deadLetterTopic,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *RetryDetails) Reset() { *m = RetryDetails{} }
+func (m *RetryDetails) String() string { return proto.CompactTextString(m) }
+func (*RetryDetails) ProtoMessage() {}
+func (*RetryDetails) Descriptor() ([]byte, []int) {
+ return fileDescriptor_Function_33c6e1841d2624f0, []int{1}
+}
+func (m *RetryDetails) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_RetryDetails.Unmarshal(m, b)
+}
+func (m *RetryDetails) XXX_Marshal(b []byte, deterministic bool) ([]byte,
error) {
+ return xxx_messageInfo_RetryDetails.Marshal(b, m, deterministic)
+}
+func (dst *RetryDetails) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_RetryDetails.Merge(dst, src)
+}
+func (m *RetryDetails) XXX_Size() int {
+ return xxx_messageInfo_RetryDetails.Size(m)
+}
+func (m *RetryDetails) XXX_DiscardUnknown() {
+ xxx_messageInfo_RetryDetails.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_RetryDetails proto.InternalMessageInfo
+
+func (m *RetryDetails) GetMaxMessageRetries() int32 {
+ if m != nil {
+ return m.MaxMessageRetries
+ }
+ return 0
+}
+
+func (m *RetryDetails) GetDeadLetterTopic() string {
+ if m != nil {
+ return m.DeadLetterTopic
+ }
+ return ""
+}
+
+type FunctionDetails struct {
+ Tenant string
`protobuf:"bytes,1,opt,name=tenant,proto3" json:"tenant,omitempty"`
+ Namespace string
`protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"`
+ Name string
`protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"`
+ ClassName string
`protobuf:"bytes,4,opt,name=className,proto3" json:"className,omitempty"`
+ LogTopic string
`protobuf:"bytes,5,opt,name=logTopic,proto3" json:"logTopic,omitempty"`
+ ProcessingGuarantees ProcessingGuarantees
`protobuf:"varint,6,opt,name=processingGuarantees,proto3,enum=proto.ProcessingGuarantees"
json:"processingGuarantees,omitempty"`
+ UserConfig string
`protobuf:"bytes,7,opt,name=userConfig,proto3" json:"userConfig,omitempty"`
+ SecretsMap string
`protobuf:"bytes,16,opt,name=secretsMap,proto3" json:"secretsMap,omitempty"`
+ Runtime FunctionDetails_Runtime
`protobuf:"varint,8,opt,name=runtime,proto3,enum=proto.FunctionDetails_Runtime"
json:"runtime,omitempty"`
+ AutoAck bool
`protobuf:"varint,9,opt,name=autoAck,proto3" json:"autoAck,omitempty"`
+ Parallelism int32
`protobuf:"varint,10,opt,name=parallelism,proto3" json:"parallelism,omitempty"`
+ Source *SourceSpec
`protobuf:"bytes,11,opt,name=source,proto3" json:"source,omitempty"`
+ Sink *SinkSpec
`protobuf:"bytes,12,opt,name=sink,proto3" json:"sink,omitempty"`
+ Resources *Resources
`protobuf:"bytes,13,opt,name=resources,proto3" json:"resources,omitempty"`
+ PackageUrl string
`protobuf:"bytes,14,opt,name=packageUrl,proto3" json:"packageUrl,omitempty"`
+ RetryDetails *RetryDetails
`protobuf:"bytes,15,opt,name=retryDetails,proto3" json:"retryDetails,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *FunctionDetails) Reset() { *m = FunctionDetails{} }
+func (m *FunctionDetails) String() string { return proto.CompactTextString(m) }
+func (*FunctionDetails) ProtoMessage() {}
+func (*FunctionDetails) Descriptor() ([]byte, []int) {
+ return fileDescriptor_Function_33c6e1841d2624f0, []int{2}
+}
+func (m *FunctionDetails) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_FunctionDetails.Unmarshal(m, b)
+}
+func (m *FunctionDetails) XXX_Marshal(b []byte, deterministic bool) ([]byte,
error) {
+ return xxx_messageInfo_FunctionDetails.Marshal(b, m, deterministic)
+}
+func (dst *FunctionDetails) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_FunctionDetails.Merge(dst, src)
+}
+func (m *FunctionDetails) XXX_Size() int {
+ return xxx_messageInfo_FunctionDetails.Size(m)
+}
+func (m *FunctionDetails) XXX_DiscardUnknown() {
+ xxx_messageInfo_FunctionDetails.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_FunctionDetails proto.InternalMessageInfo
+
+func (m *FunctionDetails) GetTenant() string {
+ if m != nil {
+ return m.Tenant
+ }
+ return ""
+}
+
+func (m *FunctionDetails) GetNamespace() string {
+ if m != nil {
+ return m.Namespace
+ }
+ return ""
+}
+
+func (m *FunctionDetails) GetName() string {
+ if m != nil {
+ return m.Name
+ }
+ return ""
+}
+
+func (m *FunctionDetails) GetClassName() string {
+ if m != nil {
+ return m.ClassName
+ }
+ return ""
+}
+
+func (m *FunctionDetails) GetLogTopic() string {
+ if m != nil {
+ return m.LogTopic
+ }
+ return ""
+}
+
+func (m *FunctionDetails) GetProcessingGuarantees() ProcessingGuarantees {
+ if m != nil {
+ return m.ProcessingGuarantees
+ }
+ return ProcessingGuarantees_ATLEAST_ONCE
+}
+
+func (m *FunctionDetails) GetUserConfig() string {
+ if m != nil {
+ return m.UserConfig
+ }
+ return ""
+}
+
+func (m *FunctionDetails) GetSecretsMap() string {
+ if m != nil {
+ return m.SecretsMap
+ }
+ return ""
+}
+
+func (m *FunctionDetails) GetRuntime() FunctionDetails_Runtime {
+ if m != nil {
+ return m.Runtime
+ }
+ return FunctionDetails_JAVA
+}
+
+func (m *FunctionDetails) GetAutoAck() bool {
+ if m != nil {
+ return m.AutoAck
+ }
+ return false
+}
+
+func (m *FunctionDetails) GetParallelism() int32 {
+ if m != nil {
+ return m.Parallelism
+ }
+ return 0
+}
+
+func (m *FunctionDetails) GetSource() *SourceSpec {
+ if m != nil {
+ return m.Source
+ }
+ return nil
+}
+
+func (m *FunctionDetails) GetSink() *SinkSpec {
+ if m != nil {
+ return m.Sink
+ }
+ return nil
+}
+
+func (m *FunctionDetails) GetResources() *Resources {
+ if m != nil {
+ return m.Resources
+ }
+ return nil
+}
+
+func (m *FunctionDetails) GetPackageUrl() string {
+ if m != nil {
+ return m.PackageUrl
+ }
+ return ""
+}
+
+func (m *FunctionDetails) GetRetryDetails() *RetryDetails {
+ if m != nil {
+ return m.RetryDetails
+ }
+ return nil
+}
+
+type ConsumerSpec struct {
+ SchemaType string
`protobuf:"bytes,1,opt,name=schemaType,proto3" json:"schemaType,omitempty"`
+ SerdeClassName string
`protobuf:"bytes,2,opt,name=serdeClassName,proto3"
json:"serdeClassName,omitempty"`
+ IsRegexPattern bool
`protobuf:"varint,3,opt,name=isRegexPattern,proto3"
json:"isRegexPattern,omitempty"`
+ ReceiverQueueSize *ConsumerSpec_ReceiverQueueSize
`protobuf:"bytes,4,opt,name=receiverQueueSize,proto3"
json:"receiverQueueSize,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *ConsumerSpec) Reset() { *m = ConsumerSpec{} }
+func (m *ConsumerSpec) String() string { return proto.CompactTextString(m) }
+func (*ConsumerSpec) ProtoMessage() {}
+func (*ConsumerSpec) Descriptor() ([]byte, []int) {
+ return fileDescriptor_Function_33c6e1841d2624f0, []int{3}
+}
+func (m *ConsumerSpec) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_ConsumerSpec.Unmarshal(m, b)
+}
+func (m *ConsumerSpec) XXX_Marshal(b []byte, deterministic bool) ([]byte,
error) {
+ return xxx_messageInfo_ConsumerSpec.Marshal(b, m, deterministic)
+}
+func (dst *ConsumerSpec) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_ConsumerSpec.Merge(dst, src)
+}
+func (m *ConsumerSpec) XXX_Size() int {
+ return xxx_messageInfo_ConsumerSpec.Size(m)
+}
+func (m *ConsumerSpec) XXX_DiscardUnknown() {
+ xxx_messageInfo_ConsumerSpec.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ConsumerSpec proto.InternalMessageInfo
+
+func (m *ConsumerSpec) GetSchemaType() string {
+ if m != nil {
+ return m.SchemaType
+ }
+ return ""
+}
+
+func (m *ConsumerSpec) GetSerdeClassName() string {
+ if m != nil {
+ return m.SerdeClassName
+ }
+ return ""
+}
+
+func (m *ConsumerSpec) GetIsRegexPattern() bool {
+ if m != nil {
+ return m.IsRegexPattern
+ }
+ return false
+}
+
+func (m *ConsumerSpec) GetReceiverQueueSize() *ConsumerSpec_ReceiverQueueSize {
+ if m != nil {
+ return m.ReceiverQueueSize
+ }
+ return nil
+}
+
+type ConsumerSpec_ReceiverQueueSize struct {
+ Value int32
`protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *ConsumerSpec_ReceiverQueueSize) Reset() { *m =
ConsumerSpec_ReceiverQueueSize{} }
+func (m *ConsumerSpec_ReceiverQueueSize) String() string { return
proto.CompactTextString(m) }
+func (*ConsumerSpec_ReceiverQueueSize) ProtoMessage() {}
+func (*ConsumerSpec_ReceiverQueueSize) Descriptor() ([]byte, []int) {
+ return fileDescriptor_Function_33c6e1841d2624f0, []int{3, 0}
+}
+func (m *ConsumerSpec_ReceiverQueueSize) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_ConsumerSpec_ReceiverQueueSize.Unmarshal(m, b)
+}
+func (m *ConsumerSpec_ReceiverQueueSize) XXX_Marshal(b []byte, deterministic
bool) ([]byte, error) {
+ return xxx_messageInfo_ConsumerSpec_ReceiverQueueSize.Marshal(b, m,
deterministic)
+}
+func (dst *ConsumerSpec_ReceiverQueueSize) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_ConsumerSpec_ReceiverQueueSize.Merge(dst, src)
+}
+func (m *ConsumerSpec_ReceiverQueueSize) XXX_Size() int {
+ return xxx_messageInfo_ConsumerSpec_ReceiverQueueSize.Size(m)
+}
+func (m *ConsumerSpec_ReceiverQueueSize) XXX_DiscardUnknown() {
+ xxx_messageInfo_ConsumerSpec_ReceiverQueueSize.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ConsumerSpec_ReceiverQueueSize proto.InternalMessageInfo
+
+func (m *ConsumerSpec_ReceiverQueueSize) GetValue() int32 {
+ if m != nil {
+ return m.Value
+ }
+ return 0
+}
+
+type SourceSpec struct {
+ ClassName string `protobuf:"bytes,1,opt,name=className,proto3"
json:"className,omitempty"`
+ // map in json format
+ Configs string `protobuf:"bytes,2,opt,name=configs,proto3"
json:"configs,omitempty"`
+ TypeClassName string `protobuf:"bytes,5,opt,name=typeClassName,proto3"
json:"typeClassName,omitempty"`
+ // configs used only when source feeds into functions
+ SubscriptionType SubscriptionType
`protobuf:"varint,3,opt,name=subscriptionType,proto3,enum=proto.SubscriptionType"
json:"subscriptionType,omitempty"`
+ // @deprecated -- use topicsToSchema
+ TopicsToSerDeClassName map[string]string
`protobuf:"bytes,4,rep,name=topicsToSerDeClassName,proto3"
json:"topicsToSerDeClassName,omitempty"
protobuf_key:"bytes,1,opt,name=key,proto3"
protobuf_val:"bytes,2,opt,name=value,proto3"` // Deprecated: Do not use.
+ // *
+ //
+ InputSpecs map[string]*ConsumerSpec
`protobuf:"bytes,10,rep,name=inputSpecs,proto3" json:"inputSpecs,omitempty"
protobuf_key:"bytes,1,opt,name=key,proto3"
protobuf_val:"bytes,2,opt,name=value,proto3"`
+ TimeoutMs uint64
`protobuf:"varint,6,opt,name=timeoutMs,proto3" json:"timeoutMs,omitempty"`
+ TopicsPattern string
`protobuf:"bytes,7,opt,name=topicsPattern,proto3"
json:"topicsPattern,omitempty"` // Deprecated: Do not use.
+ // If specified, this will refer to an archive that is
+ // already present in the server
+ Builtin string
`protobuf:"bytes,8,opt,name=builtin,proto3" json:"builtin,omitempty"`
+ SubscriptionName string
`protobuf:"bytes,9,opt,name=subscriptionName,proto3"
json:"subscriptionName,omitempty"`
+ CleanupSubscription bool
`protobuf:"varint,11,opt,name=cleanupSubscription,proto3"
json:"cleanupSubscription,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *SourceSpec) Reset() { *m = SourceSpec{} }
+func (m *SourceSpec) String() string { return proto.CompactTextString(m) }
+func (*SourceSpec) ProtoMessage() {}
+func (*SourceSpec) Descriptor() ([]byte, []int) {
+ return fileDescriptor_Function_33c6e1841d2624f0, []int{4}
+}
+func (m *SourceSpec) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_SourceSpec.Unmarshal(m, b)
+}
+func (m *SourceSpec) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
{
+ return xxx_messageInfo_SourceSpec.Marshal(b, m, deterministic)
+}
+func (dst *SourceSpec) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_SourceSpec.Merge(dst, src)
+}
+func (m *SourceSpec) XXX_Size() int {
+ return xxx_messageInfo_SourceSpec.Size(m)
+}
+func (m *SourceSpec) XXX_DiscardUnknown() {
+ xxx_messageInfo_SourceSpec.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_SourceSpec proto.InternalMessageInfo
+
+func (m *SourceSpec) GetClassName() string {
+ if m != nil {
+ return m.ClassName
+ }
+ return ""
+}
+
+func (m *SourceSpec) GetConfigs() string {
+ if m != nil {
+ return m.Configs
+ }
+ return ""
+}
+
+func (m *SourceSpec) GetTypeClassName() string {
+ if m != nil {
+ return m.TypeClassName
+ }
+ return ""
+}
+
+func (m *SourceSpec) GetSubscriptionType() SubscriptionType {
+ if m != nil {
+ return m.SubscriptionType
+ }
+ return SubscriptionType_SHARED
+}
+
+// Deprecated: Do not use.
+func (m *SourceSpec) GetTopicsToSerDeClassName() map[string]string {
+ if m != nil {
+ return m.TopicsToSerDeClassName
+ }
+ return nil
+}
+
+func (m *SourceSpec) GetInputSpecs() map[string]*ConsumerSpec {
+ if m != nil {
+ return m.InputSpecs
+ }
+ return nil
+}
+
+func (m *SourceSpec) GetTimeoutMs() uint64 {
+ if m != nil {
+ return m.TimeoutMs
+ }
+ return 0
+}
+
+// Deprecated: Do not use.
+func (m *SourceSpec) GetTopicsPattern() string {
+ if m != nil {
+ return m.TopicsPattern
+ }
+ return ""
+}
+
+func (m *SourceSpec) GetBuiltin() string {
+ if m != nil {
+ return m.Builtin
+ }
+ return ""
+}
+
+func (m *SourceSpec) GetSubscriptionName() string {
+ if m != nil {
+ return m.SubscriptionName
+ }
+ return ""
+}
+
+func (m *SourceSpec) GetCleanupSubscription() bool {
+ if m != nil {
+ return m.CleanupSubscription
+ }
+ return false
+}
+
+type SinkSpec struct {
+ ClassName string `protobuf:"bytes,1,opt,name=className,proto3"
json:"className,omitempty"`
+ // map in json format
+ Configs string `protobuf:"bytes,2,opt,name=configs,proto3"
json:"configs,omitempty"`
+ TypeClassName string `protobuf:"bytes,5,opt,name=typeClassName,proto3"
json:"typeClassName,omitempty"`
+ // configs used only when functions output to sink
+ Topic string `protobuf:"bytes,3,opt,name=topic,proto3"
json:"topic,omitempty"`
+ SerDeClassName string
`protobuf:"bytes,4,opt,name=serDeClassName,proto3"
json:"serDeClassName,omitempty"`
+ // If specified, this will refer to an archive that is
+ // already present in the server
+ Builtin string `protobuf:"bytes,6,opt,name=builtin,proto3"
json:"builtin,omitempty"`
+ // *
+ // Builtin schema type or custom schema class name
+ SchemaType string
`protobuf:"bytes,7,opt,name=schemaType,proto3" json:"schemaType,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *SinkSpec) Reset() { *m = SinkSpec{} }
+func (m *SinkSpec) String() string { return proto.CompactTextString(m) }
+func (*SinkSpec) ProtoMessage() {}
+func (*SinkSpec) Descriptor() ([]byte, []int) {
+ return fileDescriptor_Function_33c6e1841d2624f0, []int{5}
+}
+func (m *SinkSpec) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_SinkSpec.Unmarshal(m, b)
+}
+func (m *SinkSpec) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_SinkSpec.Marshal(b, m, deterministic)
+}
+func (dst *SinkSpec) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_SinkSpec.Merge(dst, src)
+}
+func (m *SinkSpec) XXX_Size() int {
+ return xxx_messageInfo_SinkSpec.Size(m)
+}
+func (m *SinkSpec) XXX_DiscardUnknown() {
+ xxx_messageInfo_SinkSpec.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_SinkSpec proto.InternalMessageInfo
+
+func (m *SinkSpec) GetClassName() string {
+ if m != nil {
+ return m.ClassName
+ }
+ return ""
+}
+
+func (m *SinkSpec) GetConfigs() string {
+ if m != nil {
+ return m.Configs
+ }
+ return ""
+}
+
+func (m *SinkSpec) GetTypeClassName() string {
+ if m != nil {
+ return m.TypeClassName
+ }
+ return ""
+}
+
+func (m *SinkSpec) GetTopic() string {
+ if m != nil {
+ return m.Topic
+ }
+ return ""
+}
+
+func (m *SinkSpec) GetSerDeClassName() string {
+ if m != nil {
+ return m.SerDeClassName
+ }
+ return ""
+}
+
+func (m *SinkSpec) GetBuiltin() string {
+ if m != nil {
+ return m.Builtin
+ }
+ return ""
+}
+
+func (m *SinkSpec) GetSchemaType() string {
+ if m != nil {
+ return m.SchemaType
+ }
+ return ""
+}
+
+type PackageLocationMetaData struct {
+ PackagePath string
`protobuf:"bytes,1,opt,name=packagePath,proto3" json:"packagePath,omitempty"`
+ OriginalFileName string
`protobuf:"bytes,2,opt,name=originalFileName,proto3"
json:"originalFileName,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *PackageLocationMetaData) Reset() { *m =
PackageLocationMetaData{} }
+func (m *PackageLocationMetaData) String() string { return
proto.CompactTextString(m) }
+func (*PackageLocationMetaData) ProtoMessage() {}
+func (*PackageLocationMetaData) Descriptor() ([]byte, []int) {
+ return fileDescriptor_Function_33c6e1841d2624f0, []int{6}
+}
+func (m *PackageLocationMetaData) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_PackageLocationMetaData.Unmarshal(m, b)
+}
+func (m *PackageLocationMetaData) XXX_Marshal(b []byte, deterministic bool)
([]byte, error) {
+ return xxx_messageInfo_PackageLocationMetaData.Marshal(b, m,
deterministic)
+}
+func (dst *PackageLocationMetaData) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_PackageLocationMetaData.Merge(dst, src)
+}
+func (m *PackageLocationMetaData) XXX_Size() int {
+ return xxx_messageInfo_PackageLocationMetaData.Size(m)
+}
+func (m *PackageLocationMetaData) XXX_DiscardUnknown() {
+ xxx_messageInfo_PackageLocationMetaData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_PackageLocationMetaData proto.InternalMessageInfo
+
+func (m *PackageLocationMetaData) GetPackagePath() string {
+ if m != nil {
+ return m.PackagePath
+ }
+ return ""
+}
+
+func (m *PackageLocationMetaData) GetOriginalFileName() string {
+ if m != nil {
+ return m.OriginalFileName
+ }
+ return ""
+}
+
+type FunctionMetaData struct {
+ FunctionDetails *FunctionDetails
`protobuf:"bytes,1,opt,name=functionDetails,proto3"
json:"functionDetails,omitempty"`
+ PackageLocation *PackageLocationMetaData
`protobuf:"bytes,2,opt,name=packageLocation,proto3"
json:"packageLocation,omitempty"`
+ Version uint64
`protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"`
+ CreateTime uint64
`protobuf:"varint,4,opt,name=createTime,proto3" json:"createTime,omitempty"`
+ InstanceStates map[int32]FunctionState
`protobuf:"bytes,5,rep,name=instanceStates,proto3"
json:"instanceStates,omitempty" protobuf_key:"varint,1,opt,name=key,proto3"
protobuf_val:"varint,2,opt,name=value,proto3,enum=proto.FunctionState"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *FunctionMetaData) Reset() { *m = FunctionMetaData{} }
+func (m *FunctionMetaData) String() string { return proto.CompactTextString(m)
}
+func (*FunctionMetaData) ProtoMessage() {}
+func (*FunctionMetaData) Descriptor() ([]byte, []int) {
+ return fileDescriptor_Function_33c6e1841d2624f0, []int{7}
+}
+func (m *FunctionMetaData) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_FunctionMetaData.Unmarshal(m, b)
+}
+func (m *FunctionMetaData) XXX_Marshal(b []byte, deterministic bool) ([]byte,
error) {
+ return xxx_messageInfo_FunctionMetaData.Marshal(b, m, deterministic)
+}
+func (dst *FunctionMetaData) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_FunctionMetaData.Merge(dst, src)
+}
+func (m *FunctionMetaData) XXX_Size() int {
+ return xxx_messageInfo_FunctionMetaData.Size(m)
+}
+func (m *FunctionMetaData) XXX_DiscardUnknown() {
+ xxx_messageInfo_FunctionMetaData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_FunctionMetaData proto.InternalMessageInfo
+
+func (m *FunctionMetaData) GetFunctionDetails() *FunctionDetails {
+ if m != nil {
+ return m.FunctionDetails
+ }
+ return nil
+}
+
+func (m *FunctionMetaData) GetPackageLocation() *PackageLocationMetaData {
+ if m != nil {
+ return m.PackageLocation
+ }
+ return nil
+}
+
+func (m *FunctionMetaData) GetVersion() uint64 {
+ if m != nil {
+ return m.Version
+ }
+ return 0
+}
+
+func (m *FunctionMetaData) GetCreateTime() uint64 {
+ if m != nil {
+ return m.CreateTime
+ }
+ return 0
+}
+
+func (m *FunctionMetaData) GetInstanceStates() map[int32]FunctionState {
+ if m != nil {
+ return m.InstanceStates
+ }
+ return nil
+}
+
+type Instance struct {
+ FunctionMetaData *FunctionMetaData
`protobuf:"bytes,1,opt,name=functionMetaData,proto3"
json:"functionMetaData,omitempty"`
+ InstanceId int32
`protobuf:"varint,2,opt,name=instanceId,proto3" json:"instanceId,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Instance) Reset() { *m = Instance{} }
+func (m *Instance) String() string { return proto.CompactTextString(m) }
+func (*Instance) ProtoMessage() {}
+func (*Instance) Descriptor() ([]byte, []int) {
+ return fileDescriptor_Function_33c6e1841d2624f0, []int{8}
+}
+func (m *Instance) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_Instance.Unmarshal(m, b)
+}
+func (m *Instance) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_Instance.Marshal(b, m, deterministic)
+}
+func (dst *Instance) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Instance.Merge(dst, src)
+}
+func (m *Instance) XXX_Size() int {
+ return xxx_messageInfo_Instance.Size(m)
+}
+func (m *Instance) XXX_DiscardUnknown() {
+ xxx_messageInfo_Instance.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Instance proto.InternalMessageInfo
+
+func (m *Instance) GetFunctionMetaData() *FunctionMetaData {
+ if m != nil {
+ return m.FunctionMetaData
+ }
+ return nil
+}
+
+func (m *Instance) GetInstanceId() int32 {
+ if m != nil {
+ return m.InstanceId
+ }
+ return 0
+}
+
+type Assignment struct {
+ Instance *Instance
`protobuf:"bytes,1,opt,name=instance,proto3" json:"instance,omitempty"`
+ WorkerId string
`protobuf:"bytes,2,opt,name=workerId,proto3" json:"workerId,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Assignment) Reset() { *m = Assignment{} }
+func (m *Assignment) String() string { return proto.CompactTextString(m) }
+func (*Assignment) ProtoMessage() {}
+func (*Assignment) Descriptor() ([]byte, []int) {
+ return fileDescriptor_Function_33c6e1841d2624f0, []int{9}
+}
+func (m *Assignment) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_Assignment.Unmarshal(m, b)
+}
+func (m *Assignment) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
{
+ return xxx_messageInfo_Assignment.Marshal(b, m, deterministic)
+}
+func (dst *Assignment) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Assignment.Merge(dst, src)
+}
+func (m *Assignment) XXX_Size() int {
+ return xxx_messageInfo_Assignment.Size(m)
+}
+func (m *Assignment) XXX_DiscardUnknown() {
+ xxx_messageInfo_Assignment.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Assignment proto.InternalMessageInfo
+
+func (m *Assignment) GetInstance() *Instance {
+ if m != nil {
+ return m.Instance
+ }
+ return nil
+}
+
+func (m *Assignment) GetWorkerId() string {
+ if m != nil {
+ return m.WorkerId
+ }
+ return ""
+}
+
+func init() {
+ proto.RegisterType((*Resources)(nil), "proto.Resources")
+ proto.RegisterType((*RetryDetails)(nil), "proto.RetryDetails")
+ proto.RegisterType((*FunctionDetails)(nil), "proto.FunctionDetails")
+ proto.RegisterType((*ConsumerSpec)(nil), "proto.ConsumerSpec")
+ proto.RegisterType((*ConsumerSpec_ReceiverQueueSize)(nil),
"proto.ConsumerSpec.ReceiverQueueSize")
+ proto.RegisterType((*SourceSpec)(nil), "proto.SourceSpec")
+ proto.RegisterMapType((map[string]*ConsumerSpec)(nil),
"proto.SourceSpec.InputSpecsEntry")
+ proto.RegisterMapType((map[string]string)(nil),
"proto.SourceSpec.TopicsToSerDeClassNameEntry")
+ proto.RegisterType((*SinkSpec)(nil), "proto.SinkSpec")
+ proto.RegisterType((*PackageLocationMetaData)(nil),
"proto.PackageLocationMetaData")
+ proto.RegisterType((*FunctionMetaData)(nil), "proto.FunctionMetaData")
+ proto.RegisterMapType((map[int32]FunctionState)(nil),
"proto.FunctionMetaData.InstanceStatesEntry")
+ proto.RegisterType((*Instance)(nil), "proto.Instance")
+ proto.RegisterType((*Assignment)(nil), "proto.Assignment")
+ proto.RegisterEnum("proto.ProcessingGuarantees",
ProcessingGuarantees_name, ProcessingGuarantees_value)
+ proto.RegisterEnum("proto.SubscriptionType", SubscriptionType_name,
SubscriptionType_value)
+ proto.RegisterEnum("proto.FunctionState", FunctionState_name,
FunctionState_value)
+ proto.RegisterEnum("proto.FunctionDetails_Runtime",
FunctionDetails_Runtime_name, FunctionDetails_Runtime_value)
+}
+
+func init() { proto.RegisterFile("Function.proto",
fileDescriptor_Function_33c6e1841d2624f0) }
+
+var fileDescriptor_Function_33c6e1841d2624f0 = []byte{
+ // 1220 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56,
0xdb, 0x6e, 0xdb, 0x46,
+ 0x13, 0x36, 0xad, 0xf3, 0xc8, 0xb6, 0xe8, 0x8d, 0x91, 0x10, 0xce, 0x8f,
0x40, 0xd1, 0xdf, 0x83,
+ 0xec, 0x24, 0x42, 0xe1, 0x5e, 0x34, 0xe8, 0x55, 0x15, 0x59, 0x4e, 0x54,
0xd8, 0x96, 0xba, 0x52,
+ 0x52, 0xe4, 0xaa, 0xd8, 0xd0, 0x63, 0x65, 0x21, 0x8a, 0x24, 0x76, 0x97,
0x69, 0xdc, 0x07, 0xe8,
+ 0x63, 0xf4, 0x49, 0xfa, 0x30, 0x7d, 0x92, 0xa2, 0xd8, 0x25, 0x29, 0x91,
0x94, 0xdc, 0xbb, 0x5e,
+ 0x89, 0xf3, 0xcd, 0x71, 0x67, 0xbf, 0x99, 0x15, 0x1c, 0x5c, 0x44, 0xbe,
0xab, 0x78, 0xe0, 0xf7,
+ 0x42, 0x11, 0xa8, 0x80, 0x54, 0xcc, 0x4f, 0x67, 0x00, 0x0d, 0x8a, 0x32,
0x88, 0x84, 0x8b, 0x92,
+ 0xd8, 0x50, 0x72, 0xc3, 0xc8, 0xb1, 0xda, 0x56, 0xd7, 0xa2, 0xfa, 0x53,
0x23, 0x82, 0x2d, 0x9d,
+ 0xdd, 0xb6, 0xd5, 0x2d, 0x51, 0xfd, 0x49, 0x08, 0x94, 0x6f, 0xb8, 0x5c,
0x38, 0x25, 0x03, 0x99,
+ 0xef, 0xce, 0x2d, 0xec, 0x51, 0x54, 0xe2, 0xee, 0x1c, 0x15, 0xe3, 0x9e,
0x24, 0xcf, 0xe1, 0x70,
+ 0xc9, 0x3e, 0x5f, 0xa1, 0x94, 0x6c, 0x8e, 0x5a, 0xc3, 0x51, 0x9a, 0xa8,
0x15, 0xba, 0xa9, 0x20,
+ 0x5d, 0x68, 0xdd, 0x20, 0xbb, 0xb9, 0x44, 0xa5, 0x50, 0xcc, 0x82, 0x90,
0xbb, 0x26, 0x5f, 0x83,
+ 0x16, 0xe1, 0xce, 0x1f, 0x15, 0x68, 0xa5, 0xc7, 0x48, 0x73, 0x3d, 0x84,
0xaa, 0x42, 0x9f, 0xf9,
+ 0xca, 0x24, 0x68, 0xd0, 0x44, 0x22, 0xff, 0x83, 0x86, 0xcf, 0x96, 0x28,
0x43, 0xe6, 0x62, 0x12,
+ 0x6f, 0x0d, 0xe8, 0x53, 0x68, 0xc1, 0x9c, 0xa2, 0x41, 0xcd, 0xb7, 0xf6,
0x70, 0x3d, 0x26, 0xe5,
+ 0xb5, 0x56, 0x94, 0x63, 0x8f, 0x15, 0x40, 0x8e, 0xa1, 0xee, 0x05, 0xf3,
0xb8, 0xbc, 0x8a, 0x51,
+ 0xae, 0x64, 0x32, 0x86, 0xa3, 0x50, 0x04, 0x2e, 0x4a, 0xc9, 0xfd, 0xf9,
0xeb, 0x88, 0x09, 0xe6,
+ 0x2b, 0x44, 0xe9, 0x54, 0xdb, 0x56, 0xf7, 0xe0, 0xec, 0x71, 0xdc, 0xf1,
0xde, 0x64, 0x8b, 0x09,
+ 0xdd, 0xea, 0x48, 0x9e, 0x00, 0x44, 0x12, 0xc5, 0x20, 0xf0, 0x6f, 0xf9,
0xdc, 0xa9, 0x99, 0x74,
+ 0x19, 0x44, 0xeb, 0x25, 0xba, 0x02, 0x95, 0xbc, 0x62, 0xa1, 0x63, 0xc7,
0xfa, 0x35, 0x42, 0x5e,
+ 0x42, 0x4d, 0x44, 0xbe, 0xe2, 0x4b, 0x74, 0xea, 0xa6, 0x86, 0x27, 0x49,
0x0d, 0x85, 0xee, 0xf5,
+ 0x68, 0x6c, 0x45, 0x53, 0x73, 0xe2, 0x40, 0x8d, 0x45, 0x2a, 0xe8, 0xbb,
0x0b, 0xa7, 0xd1, 0xb6,
+ 0xba, 0x75, 0x9a, 0x8a, 0xa4, 0x0d, 0xcd, 0x90, 0x09, 0xe6, 0x79, 0xe8,
0x71, 0xb9, 0x74, 0xc0,
+ 0x5c, 0x67, 0x16, 0x22, 0x27, 0x50, 0x8d, 0x99, 0xe4, 0x34, 0xdb, 0x56,
0xb7, 0x79, 0x76, 0x98,
+ 0x24, 0x9d, 0x1a, 0x70, 0x1a, 0xa2, 0x4b, 0x13, 0x03, 0xf2, 0x7f, 0x28,
0x4b, 0xee, 0x2f, 0x9c,
+ 0x3d, 0x63, 0xd8, 0x4a, 0x0d, 0xb9, 0xbf, 0x30, 0x66, 0x46, 0x49, 0x7a,
0xd0, 0x10, 0x29, 0x37,
+ 0x9d, 0x7d, 0x63, 0x69, 0x27, 0x96, 0x2b, 0xce, 0xd2, 0xb5, 0x89, 0xee,
0x4a, 0xc8, 0xdc, 0x05,
+ 0x9b, 0xe3, 0x5b, 0xe1, 0x39, 0x07, 0x71, 0x57, 0xd6, 0x08, 0xf9, 0x0e,
0xf6, 0x44, 0x86, 0xa6,
+ 0x4e, 0xcb, 0x84, 0x7c, 0xb0, 0x0a, 0xb9, 0x56, 0xd1, 0x9c, 0x61, 0xe7,
0x6b, 0xa8, 0x25, 0x8d,
+ 0x22, 0x75, 0x28, 0xff, 0xd8, 0x7f, 0xd7, 0xb7, 0x77, 0x08, 0x40, 0x75,
0xf2, 0x7e, 0xf6, 0x66,
+ 0x7c, 0x6d, 0x5b, 0xa4, 0x0a, 0xbb, 0xaf, 0xc7, 0x76, 0xa9, 0xf3, 0xb7,
0x05, 0x7b, 0x83, 0xc0,
+ 0x97, 0xd1, 0x12, 0x85, 0x3e, 0x88, 0xb9, 0x28, 0xf7, 0x23, 0x2e, 0xd9,
0xec, 0x2e, 0xc4, 0x84,
+ 0xa1, 0x19, 0x84, 0x7c, 0x05, 0x07, 0x12, 0xc5, 0x0d, 0x0e, 0x56, 0xc4,
0x8b, 0xa9, 0x5a, 0x40,
+ 0xb5, 0x1d, 0x97, 0x14, 0xe7, 0xf8, 0x79, 0xc2, 0xf4, 0x3c, 0xf8, 0x86,
0xb9, 0x75, 0x5a, 0x40,
+ 0xc9, 0x14, 0x0e, 0x05, 0xba, 0xc8, 0x3f, 0xa1, 0xf8, 0x29, 0xc2, 0x08,
0xa7, 0xfc, 0xb7, 0x98,
+ 0xcb, 0xcd, 0xb3, 0x2f, 0x93, 0x73, 0x66, 0xeb, 0xeb, 0xd1, 0xa2, 0x31,
0xdd, 0xf4, 0x3f, 0x3e,
+ 0x81, 0xc3, 0x0d, 0x3b, 0x72, 0x04, 0x95, 0x4f, 0xcc, 0x8b, 0x30, 0x99,
0xeb, 0x58, 0xe8, 0xfc,
+ 0x59, 0x01, 0x58, 0x5f, 0x77, 0x7e, 0xa4, 0xac, 0xe2, 0x48, 0x39, 0x50,
0x73, 0x0d, 0x9f, 0x65,
+ 0x72, 0xea, 0x54, 0x24, 0x5f, 0xc0, 0xbe, 0xba, 0x0b, 0x33, 0x5d, 0x89,
0x27, 0x2e, 0x0f, 0x92,
+ 0x01, 0xd8, 0x32, 0xfa, 0x20, 0x5d, 0xc1, 0x43, 0xcd, 0x69, 0xd3, 0xe2,
0x92, 0xa1, 0xfb, 0xa3,
+ 0x94, 0x50, 0x05, 0x35, 0xdd, 0x70, 0x20, 0x1c, 0x1e, 0x2a, 0x3d, 0xc4,
0x72, 0x16, 0x4c, 0x51,
+ 0x9c, 0x67, 0x72, 0x96, 0xdb, 0xa5, 0x6e, 0xf3, 0xec, 0xc5, 0x06, 0x89,
0x7b, 0xb3, 0xad, 0xf6,
+ 0x43, 0x5f, 0x89, 0xbb, 0x57, 0xbb, 0x8e, 0x45, 0xef, 0x09, 0x48, 0xfa,
0x00, 0xdc, 0x0f, 0x23,
+ 0xa5, 0x83, 0x48, 0x07, 0x4c, 0xf8, 0xa7, 0x9b, 0xe1, 0x47, 0x2b, 0x1b,
0x13, 0x92, 0x66, 0x9c,
+ 0x74, 0x43, 0x35, 0x0d, 0x83, 0x48, 0x5d, 0xc5, 0xeb, 0xa5, 0x4c, 0xd7,
0x00, 0xe9, 0xc2, 0x7e,
+ 0x9c, 0x3a, 0x25, 0x89, 0xd9, 0x1c, 0xa6, 0xa6, 0xbc, 0x42, 0xb7, 0xfe,
0x43, 0xc4, 0x3d, 0xc5,
+ 0x7d, 0xb3, 0x20, 0x1a, 0x34, 0x15, 0xc9, 0x69, 0xbe, 0xa9, 0xa6, 0x13,
0x0d, 0x63, 0xb2, 0x81,
+ 0x93, 0x6f, 0xe0, 0x81, 0xeb, 0x21, 0xf3, 0xa3, 0x30, 0xdb, 0x68, 0x33,
0xfd, 0x75, 0xba, 0x4d,
+ 0x75, 0x3c, 0x82, 0xc7, 0xff, 0xd2, 0x3d, 0xfd, 0xdc, 0x2c, 0xf0, 0x2e,
0x61, 0x8a, 0xfe, 0x5c,
+ 0xd3, 0x2c, 0x66, 0x48, 0x2c, 0x7c, 0xbf, 0xfb, 0xd2, 0x3a, 0xa6, 0xd0,
0x2a, 0x74, 0x6a, 0x8b,
+ 0xfb, 0x49, 0xd6, 0x7d, 0x3d, 0xeb, 0xd9, 0x19, 0xc8, 0xc4, 0xec, 0xfc,
0x65, 0x41, 0x3d, 0x5d,
+ 0x42, 0xff, 0x31, 0x79, 0x8f, 0xa0, 0x62, 0xae, 0x24, 0x79, 0x82, 0x62,
0x21, 0xd9, 0x07, 0x79,
+ 0x16, 0xa6, 0xfb, 0x20, 0x4b, 0xa5, 0xcc, 0xfd, 0x55, 0xf3, 0xf7, 0x97,
0xdf, 0x38, 0xb5, 0xe2,
+ 0xc6, 0xe9, 0xcc, 0xe1, 0xd1, 0x24, 0x5e, 0x89, 0x97, 0x81, 0xcb, 0xf4,
0xa5, 0x5c, 0xa1, 0x62,
+ 0xe7, 0x4c, 0xb1, 0x78, 0xc3, 0x1b, 0xd5, 0x84, 0xa9, 0x8f, 0xc9, 0x91,
0xb3, 0x90, 0x26, 0x47,
+ 0x20, 0xf8, 0x9c, 0xfb, 0xcc, 0xbb, 0xe0, 0x1e, 0x66, 0x16, 0xd6, 0x06,
0xde, 0xf9, 0xbd, 0x04,
+ 0x76, 0xfa, 0xdc, 0xac, 0x52, 0xfc, 0x00, 0xad, 0xdb, 0xfc, 0x13, 0x64,
0xd2, 0x34, 0xcf, 0x1e,
+ 0x6e, 0x7f, 0xa0, 0x68, 0xd1, 0x9c, 0xbc, 0x81, 0x56, 0x98, 0xaf, 0x3f,
0xb9, 0xdb, 0xf4, 0x89,
+ 0xbb, 0xe7, 0x74, 0xb4, 0xe8, 0xa6, 0x7b, 0xf8, 0x09, 0x85, 0xd4, 0x11,
0x4a, 0x66, 0x92, 0x52,
+ 0x51, 0xf7, 0xd0, 0x15, 0xc8, 0x14, 0xce, 0x78, 0x72, 0x03, 0x65, 0x9a,
0x41, 0xc8, 0x14, 0x0e,
+ 0xb8, 0x2f, 0x15, 0xf3, 0x5d, 0x9c, 0x2a, 0xa6, 0x50, 0x3a, 0x15, 0x33,
0xcc, 0xcf, 0x0a, 0x87,
+ 0x48, 0x73, 0xf7, 0x46, 0x39, 0xeb, 0x78, 0xac, 0x0b, 0x21, 0x8e, 0x7f,
0x86, 0x07, 0x5b, 0xcc,
+ 0xb2, 0x9c, 0xae, 0xc4, 0x9c, 0x3e, 0xcd, 0x72, 0xfa, 0xe0, 0xec, 0xa8,
0x90, 0xd4, 0x38, 0x67,
+ 0x49, 0x1d, 0x40, 0x3d, 0x0d, 0xac, 0x57, 0xe6, 0x6d, 0xa1, 0xb8, 0xe4,
0x02, 0x1e, 0xdd, 0x53,
+ 0x3b, 0xdd, 0x70, 0xd0, 0xed, 0x49, 0x6b, 0x1f, 0xdd, 0x98, 0x2a, 0x2a,
0x34, 0x83, 0x74, 0xde,
+ 0x02, 0xf4, 0xa5, 0xe4, 0x73, 0x7f, 0x89, 0xbe, 0x22, 0xcf, 0xa0, 0x9e,
0xea, 0x92, 0x54, 0xe9,
+ 0x73, 0x9f, 0x56, 0x45, 0x57, 0x06, 0xfa, 0x5f, 0xd6, 0xaf, 0x81, 0x58,
0xa0, 0x48, 0x02, 0x37,
+ 0xe8, 0x4a, 0x3e, 0x1d, 0xc3, 0xd1, 0xb6, 0xbf, 0x50, 0xc4, 0x86, 0xbd,
0xfe, 0xec, 0x72, 0xd8,
+ 0x9f, 0xce, 0x7e, 0x19, 0x5f, 0x0f, 0x86, 0xf6, 0x0e, 0x69, 0x41, 0xb3,
0x3f, 0xbb, 0x1a, 0xa7,
+ 0x80, 0x45, 0x8e, 0xc0, 0x1e, 0x5e, 0x5c, 0x0c, 0x07, 0xb3, 0xd1, 0xbb,
0xe1, 0xe5, 0xfb, 0x18,
+ 0xdd, 0x3d, 0x7d, 0x0e, 0x76, 0xf1, 0x81, 0xd0, 0xaf, 0xfa, 0xf4, 0x4d,
0x9f, 0x0e, 0xcf, 0xed,
+ 0x1d, 0xb2, 0x07, 0xf5, 0x8b, 0xfe, 0xe8, 0x72, 0xfc, 0x6e, 0x48, 0x6d,
0xeb, 0xf4, 0x04, 0xf6,
+ 0x73, 0x2d, 0x26, 0x4d, 0xa8, 0xd1, 0xb7, 0xd7, 0xd7, 0xa3, 0xeb, 0xd7,
0xf6, 0x8e, 0x16, 0xa6,
+ 0xb3, 0xf1, 0x64, 0x32, 0x3c, 0xb7, 0xad, 0x57, 0x2f, 0xe0, 0x69, 0x20,
0xe6, 0x3d, 0x16, 0x32,
+ 0xf7, 0x23, 0xf6, 0xc2, 0xc8, 0x93, 0x4c, 0xf4, 0xd2, 0x36, 0xca, 0xf8,
0xf4, 0xaf, 0xea, 0x69,
+ 0xb4, 0x0f, 0x55, 0x03, 0x7c, 0xfb, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff,
0xfe, 0xa8, 0xa5, 0x5f,
+ 0xa3, 0x0b, 0x00, 0x00,
+}
diff --git a/pulsar-function-go/pb/InstanceCommunication.pb.go
b/pulsar-function-go/pb/InstanceCommunication.pb.go
new file mode 100644
index 0000000..a0c36e0
--- /dev/null
+++ b/pulsar-function-go/pb/InstanceCommunication.pb.go
@@ -0,0 +1,648 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: InstanceCommunication.proto
+
+package pb
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+import _ "github.com/golang/protobuf/ptypes/empty"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type FunctionStatus struct {
+ Running bool `protobuf:"varint,1,opt,name=running,proto3"
json:"running,omitempty"`
+ FailureException string
`protobuf:"bytes,2,opt,name=failureException,proto3"
json:"failureException,omitempty"`
+ NumRestarts int64
`protobuf:"varint,3,opt,name=numRestarts,proto3" json:"numRestarts,omitempty"`
+ // int64 numProcessed = 4;
+ NumReceived int64
`protobuf:"varint,17,opt,name=numReceived,proto3" json:"numReceived,omitempty"`
+ NumSuccessfullyProcessed int64
`protobuf:"varint,5,opt,name=numSuccessfullyProcessed,proto3"
json:"numSuccessfullyProcessed,omitempty"`
+ NumUserExceptions int64
`protobuf:"varint,6,opt,name=numUserExceptions,proto3"
json:"numUserExceptions,omitempty"`
+ LatestUserExceptions []*FunctionStatus_ExceptionInformation
`protobuf:"bytes,7,rep,name=latestUserExceptions,proto3"
json:"latestUserExceptions,omitempty"`
+ NumSystemExceptions int64
`protobuf:"varint,8,opt,name=numSystemExceptions,proto3"
json:"numSystemExceptions,omitempty"`
+ LatestSystemExceptions []*FunctionStatus_ExceptionInformation
`protobuf:"bytes,9,rep,name=latestSystemExceptions,proto3"
json:"latestSystemExceptions,omitempty"`
+ NumSourceExceptions int64
`protobuf:"varint,18,opt,name=numSourceExceptions,proto3"
json:"numSourceExceptions,omitempty"`
+ LatestSourceExceptions []*FunctionStatus_ExceptionInformation
`protobuf:"bytes,19,rep,name=latestSourceExceptions,proto3"
json:"latestSourceExceptions,omitempty"`
+ NumSinkExceptions int64
`protobuf:"varint,20,opt,name=numSinkExceptions,proto3"
json:"numSinkExceptions,omitempty"`
+ LatestSinkExceptions []*FunctionStatus_ExceptionInformation
`protobuf:"bytes,21,rep,name=latestSinkExceptions,proto3"
json:"latestSinkExceptions,omitempty"`
+ // map from topic name to number of deserialization exceptions
+ // map<string, int64> deserializationExceptions = 10;
+ // number of serialization exceptions on the output
+ // int64 serializationExceptions = 11;
+ // average latency
+ AverageLatency float64
`protobuf:"fixed64,12,opt,name=averageLatency,proto3"
json:"averageLatency,omitempty"`
+ // When was the last time the function was invoked.
+ // expressed in ms since epoch
+ LastInvocationTime int64
`protobuf:"varint,13,opt,name=lastInvocationTime,proto3"
json:"lastInvocationTime,omitempty"`
+ InstanceId string
`protobuf:"bytes,14,opt,name=instanceId,proto3" json:"instanceId,omitempty"`
+ // MetricsData metrics = 15 [deprecated=true];
+ // owner of function-instance
+ WorkerId string
`protobuf:"bytes,16,opt,name=workerId,proto3" json:"workerId,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *FunctionStatus) Reset() { *m = FunctionStatus{} }
+func (m *FunctionStatus) String() string { return proto.CompactTextString(m) }
+func (*FunctionStatus) ProtoMessage() {}
+func (*FunctionStatus) Descriptor() ([]byte, []int) {
+ return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{0}
+}
+func (m *FunctionStatus) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_FunctionStatus.Unmarshal(m, b)
+}
+func (m *FunctionStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte,
error) {
+ return xxx_messageInfo_FunctionStatus.Marshal(b, m, deterministic)
+}
+func (dst *FunctionStatus) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_FunctionStatus.Merge(dst, src)
+}
+func (m *FunctionStatus) XXX_Size() int {
+ return xxx_messageInfo_FunctionStatus.Size(m)
+}
+func (m *FunctionStatus) XXX_DiscardUnknown() {
+ xxx_messageInfo_FunctionStatus.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_FunctionStatus proto.InternalMessageInfo
+
+func (m *FunctionStatus) GetRunning() bool {
+ if m != nil {
+ return m.Running
+ }
+ return false
+}
+
+func (m *FunctionStatus) GetFailureException() string {
+ if m != nil {
+ return m.FailureException
+ }
+ return ""
+}
+
+func (m *FunctionStatus) GetNumRestarts() int64 {
+ if m != nil {
+ return m.NumRestarts
+ }
+ return 0
+}
+
+func (m *FunctionStatus) GetNumReceived() int64 {
+ if m != nil {
+ return m.NumReceived
+ }
+ return 0
+}
+
+func (m *FunctionStatus) GetNumSuccessfullyProcessed() int64 {
+ if m != nil {
+ return m.NumSuccessfullyProcessed
+ }
+ return 0
+}
+
+func (m *FunctionStatus) GetNumUserExceptions() int64 {
+ if m != nil {
+ return m.NumUserExceptions
+ }
+ return 0
+}
+
+func (m *FunctionStatus) GetLatestUserExceptions()
[]*FunctionStatus_ExceptionInformation {
+ if m != nil {
+ return m.LatestUserExceptions
+ }
+ return nil
+}
+
+func (m *FunctionStatus) GetNumSystemExceptions() int64 {
+ if m != nil {
+ return m.NumSystemExceptions
+ }
+ return 0
+}
+
+func (m *FunctionStatus) GetLatestSystemExceptions()
[]*FunctionStatus_ExceptionInformation {
+ if m != nil {
+ return m.LatestSystemExceptions
+ }
+ return nil
+}
+
+func (m *FunctionStatus) GetNumSourceExceptions() int64 {
+ if m != nil {
+ return m.NumSourceExceptions
+ }
+ return 0
+}
+
+func (m *FunctionStatus) GetLatestSourceExceptions()
[]*FunctionStatus_ExceptionInformation {
+ if m != nil {
+ return m.LatestSourceExceptions
+ }
+ return nil
+}
+
+func (m *FunctionStatus) GetNumSinkExceptions() int64 {
+ if m != nil {
+ return m.NumSinkExceptions
+ }
+ return 0
+}
+
+func (m *FunctionStatus) GetLatestSinkExceptions()
[]*FunctionStatus_ExceptionInformation {
+ if m != nil {
+ return m.LatestSinkExceptions
+ }
+ return nil
+}
+
+func (m *FunctionStatus) GetAverageLatency() float64 {
+ if m != nil {
+ return m.AverageLatency
+ }
+ return 0
+}
+
+func (m *FunctionStatus) GetLastInvocationTime() int64 {
+ if m != nil {
+ return m.LastInvocationTime
+ }
+ return 0
+}
+
+func (m *FunctionStatus) GetInstanceId() string {
+ if m != nil {
+ return m.InstanceId
+ }
+ return ""
+}
+
+func (m *FunctionStatus) GetWorkerId() string {
+ if m != nil {
+ return m.WorkerId
+ }
+ return ""
+}
+
+type FunctionStatus_ExceptionInformation struct {
+ ExceptionString string
`protobuf:"bytes,1,opt,name=exceptionString,proto3"
json:"exceptionString,omitempty"`
+ MsSinceEpoch int64
`protobuf:"varint,2,opt,name=msSinceEpoch,proto3" json:"msSinceEpoch,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *FunctionStatus_ExceptionInformation) Reset() { *m =
FunctionStatus_ExceptionInformation{} }
+func (m *FunctionStatus_ExceptionInformation) String() string { return
proto.CompactTextString(m) }
+func (*FunctionStatus_ExceptionInformation) ProtoMessage() {}
+func (*FunctionStatus_ExceptionInformation) Descriptor() ([]byte, []int) {
+ return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{0,
0}
+}
+func (m *FunctionStatus_ExceptionInformation) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_FunctionStatus_ExceptionInformation.Unmarshal(m,
b)
+}
+func (m *FunctionStatus_ExceptionInformation) XXX_Marshal(b []byte,
deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_FunctionStatus_ExceptionInformation.Marshal(b,
m, deterministic)
+}
+func (dst *FunctionStatus_ExceptionInformation) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_FunctionStatus_ExceptionInformation.Merge(dst, src)
+}
+func (m *FunctionStatus_ExceptionInformation) XXX_Size() int {
+ return xxx_messageInfo_FunctionStatus_ExceptionInformation.Size(m)
+}
+func (m *FunctionStatus_ExceptionInformation) XXX_DiscardUnknown() {
+ xxx_messageInfo_FunctionStatus_ExceptionInformation.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_FunctionStatus_ExceptionInformation
proto.InternalMessageInfo
+
+func (m *FunctionStatus_ExceptionInformation) GetExceptionString() string {
+ if m != nil {
+ return m.ExceptionString
+ }
+ return ""
+}
+
+func (m *FunctionStatus_ExceptionInformation) GetMsSinceEpoch() int64 {
+ if m != nil {
+ return m.MsSinceEpoch
+ }
+ return 0
+}
+
+// Deprecated
+type FunctionStatusList struct {
+ Error string
`protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"`
+ FunctionStatusList []*FunctionStatus
`protobuf:"bytes,1,rep,name=functionStatusList,proto3"
json:"functionStatusList,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *FunctionStatusList) Reset() { *m = FunctionStatusList{} }
+func (m *FunctionStatusList) String() string { return
proto.CompactTextString(m) }
+func (*FunctionStatusList) ProtoMessage() {}
+func (*FunctionStatusList) Descriptor() ([]byte, []int) {
+ return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{1}
+}
+func (m *FunctionStatusList) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_FunctionStatusList.Unmarshal(m, b)
+}
+func (m *FunctionStatusList) XXX_Marshal(b []byte, deterministic bool)
([]byte, error) {
+ return xxx_messageInfo_FunctionStatusList.Marshal(b, m, deterministic)
+}
+func (dst *FunctionStatusList) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_FunctionStatusList.Merge(dst, src)
+}
+func (m *FunctionStatusList) XXX_Size() int {
+ return xxx_messageInfo_FunctionStatusList.Size(m)
+}
+func (m *FunctionStatusList) XXX_DiscardUnknown() {
+ xxx_messageInfo_FunctionStatusList.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_FunctionStatusList proto.InternalMessageInfo
+
+func (m *FunctionStatusList) GetError() string {
+ if m != nil {
+ return m.Error
+ }
+ return ""
+}
+
+func (m *FunctionStatusList) GetFunctionStatusList() []*FunctionStatus {
+ if m != nil {
+ return m.FunctionStatusList
+ }
+ return nil
+}
+
+type MetricsData struct {
+ // Total number of records function received from source
+ ReceivedTotal int64
`protobuf:"varint,2,opt,name=receivedTotal,proto3"
json:"receivedTotal,omitempty"`
+ ReceivedTotal_1Min int64
`protobuf:"varint,10,opt,name=receivedTotal_1min,json=receivedTotal1min,proto3"
json:"receivedTotal_1min,omitempty"`
+ // Total number of records successfully processed by user function
+ ProcessedSuccessfullyTotal int64
`protobuf:"varint,4,opt,name=processedSuccessfullyTotal,proto3"
json:"processedSuccessfullyTotal,omitempty"`
+ ProcessedSuccessfullyTotal_1Min int64
`protobuf:"varint,12,opt,name=processedSuccessfullyTotal_1min,json=processedSuccessfullyTotal1min,proto3"
json:"processedSuccessfullyTotal_1min,omitempty"`
+ // Total number of system exceptions thrown
+ SystemExceptionsTotal int64
`protobuf:"varint,5,opt,name=systemExceptionsTotal,proto3"
json:"systemExceptionsTotal,omitempty"`
+ SystemExceptionsTotal_1Min int64
`protobuf:"varint,13,opt,name=systemExceptionsTotal_1min,json=systemExceptionsTotal1min,proto3"
json:"systemExceptionsTotal_1min,omitempty"`
+ // Total number of user exceptions thrown
+ UserExceptionsTotal int64
`protobuf:"varint,6,opt,name=userExceptionsTotal,proto3"
json:"userExceptionsTotal,omitempty"`
+ UserExceptionsTotal_1Min int64
`protobuf:"varint,14,opt,name=userExceptionsTotal_1min,json=userExceptionsTotal1min,proto3"
json:"userExceptionsTotal_1min,omitempty"`
+ // Average process latency for function
+ AvgProcessLatency float64
`protobuf:"fixed64,7,opt,name=avgProcessLatency,proto3"
json:"avgProcessLatency,omitempty"`
+ AvgProcessLatency_1Min float64
`protobuf:"fixed64,15,opt,name=avgProcessLatency_1min,json=avgProcessLatency1min,proto3"
json:"avgProcessLatency_1min,omitempty"`
+ // Timestamp of when the function was last invoked
+ LastInvocation int64
`protobuf:"varint,8,opt,name=lastInvocation,proto3"
json:"lastInvocation,omitempty"`
+ // User defined metrics
+ UserMetrics map[string]float64
`protobuf:"bytes,9,rep,name=userMetrics,proto3" json:"userMetrics,omitempty"
protobuf_key:"bytes,1,opt,name=key,proto3"
protobuf_val:"fixed64,2,opt,name=value,proto3"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *MetricsData) Reset() { *m = MetricsData{} }
+func (m *MetricsData) String() string { return proto.CompactTextString(m) }
+func (*MetricsData) ProtoMessage() {}
+func (*MetricsData) Descriptor() ([]byte, []int) {
+ return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{2}
+}
+func (m *MetricsData) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_MetricsData.Unmarshal(m, b)
+}
+func (m *MetricsData) XXX_Marshal(b []byte, deterministic bool) ([]byte,
error) {
+ return xxx_messageInfo_MetricsData.Marshal(b, m, deterministic)
+}
+func (dst *MetricsData) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_MetricsData.Merge(dst, src)
+}
+func (m *MetricsData) XXX_Size() int {
+ return xxx_messageInfo_MetricsData.Size(m)
+}
+func (m *MetricsData) XXX_DiscardUnknown() {
+ xxx_messageInfo_MetricsData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_MetricsData proto.InternalMessageInfo
+
+func (m *MetricsData) GetReceivedTotal() int64 {
+ if m != nil {
+ return m.ReceivedTotal
+ }
+ return 0
+}
+
+func (m *MetricsData) GetReceivedTotal_1Min() int64 {
+ if m != nil {
+ return m.ReceivedTotal_1Min
+ }
+ return 0
+}
+
+func (m *MetricsData) GetProcessedSuccessfullyTotal() int64 {
+ if m != nil {
+ return m.ProcessedSuccessfullyTotal
+ }
+ return 0
+}
+
+func (m *MetricsData) GetProcessedSuccessfullyTotal_1Min() int64 {
+ if m != nil {
+ return m.ProcessedSuccessfullyTotal_1Min
+ }
+ return 0
+}
+
+func (m *MetricsData) GetSystemExceptionsTotal() int64 {
+ if m != nil {
+ return m.SystemExceptionsTotal
+ }
+ return 0
+}
+
+func (m *MetricsData) GetSystemExceptionsTotal_1Min() int64 {
+ if m != nil {
+ return m.SystemExceptionsTotal_1Min
+ }
+ return 0
+}
+
+func (m *MetricsData) GetUserExceptionsTotal() int64 {
+ if m != nil {
+ return m.UserExceptionsTotal
+ }
+ return 0
+}
+
+func (m *MetricsData) GetUserExceptionsTotal_1Min() int64 {
+ if m != nil {
+ return m.UserExceptionsTotal_1Min
+ }
+ return 0
+}
+
+func (m *MetricsData) GetAvgProcessLatency() float64 {
+ if m != nil {
+ return m.AvgProcessLatency
+ }
+ return 0
+}
+
+func (m *MetricsData) GetAvgProcessLatency_1Min() float64 {
+ if m != nil {
+ return m.AvgProcessLatency_1Min
+ }
+ return 0
+}
+
+func (m *MetricsData) GetLastInvocation() int64 {
+ if m != nil {
+ return m.LastInvocation
+ }
+ return 0
+}
+
+func (m *MetricsData) GetUserMetrics() map[string]float64 {
+ if m != nil {
+ return m.UserMetrics
+ }
+ return nil
+}
+
+type HealthCheckResult struct {
+ Success bool
`protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *HealthCheckResult) Reset() { *m = HealthCheckResult{} }
+func (m *HealthCheckResult) String() string { return
proto.CompactTextString(m) }
+func (*HealthCheckResult) ProtoMessage() {}
+func (*HealthCheckResult) Descriptor() ([]byte, []int) {
+ return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{3}
+}
+func (m *HealthCheckResult) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_HealthCheckResult.Unmarshal(m, b)
+}
+func (m *HealthCheckResult) XXX_Marshal(b []byte, deterministic bool) ([]byte,
error) {
+ return xxx_messageInfo_HealthCheckResult.Marshal(b, m, deterministic)
+}
+func (dst *HealthCheckResult) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_HealthCheckResult.Merge(dst, src)
+}
+func (m *HealthCheckResult) XXX_Size() int {
+ return xxx_messageInfo_HealthCheckResult.Size(m)
+}
+func (m *HealthCheckResult) XXX_DiscardUnknown() {
+ xxx_messageInfo_HealthCheckResult.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_HealthCheckResult proto.InternalMessageInfo
+
+func (m *HealthCheckResult) GetSuccess() bool {
+ if m != nil {
+ return m.Success
+ }
+ return false
+}
+
+type Metrics struct {
+ Metrics []*Metrics_InstanceMetrics
`protobuf:"bytes,1,rep,name=metrics,proto3" json:"metrics,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Metrics) Reset() { *m = Metrics{} }
+func (m *Metrics) String() string { return proto.CompactTextString(m) }
+func (*Metrics) ProtoMessage() {}
+func (*Metrics) Descriptor() ([]byte, []int) {
+ return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{4}
+}
+func (m *Metrics) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_Metrics.Unmarshal(m, b)
+}
+func (m *Metrics) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_Metrics.Marshal(b, m, deterministic)
+}
+func (dst *Metrics) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Metrics.Merge(dst, src)
+}
+func (m *Metrics) XXX_Size() int {
+ return xxx_messageInfo_Metrics.Size(m)
+}
+func (m *Metrics) XXX_DiscardUnknown() {
+ xxx_messageInfo_Metrics.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Metrics proto.InternalMessageInfo
+
+func (m *Metrics) GetMetrics() []*Metrics_InstanceMetrics {
+ if m != nil {
+ return m.Metrics
+ }
+ return nil
+}
+
+type Metrics_InstanceMetrics struct {
+ Name string
`protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+ InstanceId int32
`protobuf:"varint,2,opt,name=instanceId,proto3" json:"instanceId,omitempty"`
+ MetricsData *MetricsData
`protobuf:"bytes,3,opt,name=metricsData,proto3" json:"metricsData,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Metrics_InstanceMetrics) Reset() { *m =
Metrics_InstanceMetrics{} }
+func (m *Metrics_InstanceMetrics) String() string { return
proto.CompactTextString(m) }
+func (*Metrics_InstanceMetrics) ProtoMessage() {}
+func (*Metrics_InstanceMetrics) Descriptor() ([]byte, []int) {
+ return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{4,
0}
+}
+func (m *Metrics_InstanceMetrics) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_Metrics_InstanceMetrics.Unmarshal(m, b)
+}
+func (m *Metrics_InstanceMetrics) XXX_Marshal(b []byte, deterministic bool)
([]byte, error) {
+ return xxx_messageInfo_Metrics_InstanceMetrics.Marshal(b, m,
deterministic)
+}
+func (dst *Metrics_InstanceMetrics) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Metrics_InstanceMetrics.Merge(dst, src)
+}
+func (m *Metrics_InstanceMetrics) XXX_Size() int {
+ return xxx_messageInfo_Metrics_InstanceMetrics.Size(m)
+}
+func (m *Metrics_InstanceMetrics) XXX_DiscardUnknown() {
+ xxx_messageInfo_Metrics_InstanceMetrics.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Metrics_InstanceMetrics proto.InternalMessageInfo
+
+func (m *Metrics_InstanceMetrics) GetName() string {
+ if m != nil {
+ return m.Name
+ }
+ return ""
+}
+
+func (m *Metrics_InstanceMetrics) GetInstanceId() int32 {
+ if m != nil {
+ return m.InstanceId
+ }
+ return 0
+}
+
+func (m *Metrics_InstanceMetrics) GetMetricsData() *MetricsData {
+ if m != nil {
+ return m.MetricsData
+ }
+ return nil
+}
+
+func init() {
+ proto.RegisterType((*FunctionStatus)(nil), "proto.FunctionStatus")
+ proto.RegisterType((*FunctionStatus_ExceptionInformation)(nil),
"proto.FunctionStatus.ExceptionInformation")
+ proto.RegisterType((*FunctionStatusList)(nil),
"proto.FunctionStatusList")
+ proto.RegisterType((*MetricsData)(nil), "proto.MetricsData")
+ proto.RegisterMapType((map[string]float64)(nil),
"proto.MetricsData.UserMetricsEntry")
+ proto.RegisterType((*HealthCheckResult)(nil), "proto.HealthCheckResult")
+ proto.RegisterType((*Metrics)(nil), "proto.Metrics")
+ proto.RegisterType((*Metrics_InstanceMetrics)(nil),
"proto.Metrics.InstanceMetrics")
+}
+
+func init() {
+ proto.RegisterFile("InstanceCommunication.proto",
fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3)
+}
+
+var fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3 = []byte{
+ // 917 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56,
0xef, 0x6e, 0x1b, 0x45,
+ 0x10, 0xcf, 0xd5, 0x75, 0x9d, 0x8c, 0x53, 0x27, 0x9e, 0xc4, 0xe1, 0x70,
0xa5, 0x60, 0x0e, 0x54,
+ 0x59, 0x55, 0x7b, 0x2d, 0xa1, 0x48, 0x25, 0x12, 0x15, 0x4d, 0x6a, 0x82,
0xa5, 0x22, 0xa1, 0x73,
+ 0xfb, 0x15, 0xb4, 0x39, 0xaf, 0x9d, 0x53, 0xee, 0x76, 0xcd, 0xee, 0x9e,
0xc1, 0xe2, 0x45, 0x78,
+ 0x1c, 0x1e, 0x84, 0x2f, 0xbc, 0x09, 0xba, 0xdd, 0x3b, 0xfb, 0xfe, 0x19,
0x41, 0x3e, 0xf9, 0x76,
+ 0x7e, 0xf3, 0x9b, 0xf9, 0x79, 0x77, 0x66, 0x76, 0xe1, 0xd1, 0x98, 0x49,
0x45, 0x98, 0x4f, 0x2f,
+ 0x79, 0x14, 0xc5, 0x2c, 0xf0, 0x89, 0x0a, 0x38, 0x73, 0x17, 0x82, 0x2b,
0x8e, 0x4d, 0xfd, 0xd3,
+ 0x7f, 0x34, 0xe7, 0x7c, 0x1e, 0xd2, 0xe7, 0x7a, 0x75, 0x1d, 0xcf, 0x9e,
0xd3, 0x68, 0xa1, 0x56,
+ 0xc6, 0xc7, 0xf9, 0x63, 0x17, 0x3a, 0xdf, 0xc5, 0xcc, 0x4f, 0x68, 0x13,
0x45, 0x54, 0x2c, 0xd1,
+ 0x86, 0x96, 0x88, 0x19, 0x0b, 0xd8, 0xdc, 0xb6, 0x06, 0xd6, 0x70, 0xd7,
0xcb, 0x96, 0xf8, 0x04,
+ 0x0e, 0x67, 0x24, 0x08, 0x63, 0x41, 0x47, 0xbf, 0xf9, 0x74, 0x91, 0x70,
0xec, 0x7b, 0x03, 0x6b,
+ 0xb8, 0xe7, 0x55, 0xec, 0x38, 0x80, 0x36, 0x8b, 0x23, 0x8f, 0x4a, 0x45,
0x84, 0x92, 0x76, 0x63,
+ 0x60, 0x0d, 0x1b, 0x5e, 0xde, 0xb4, 0xf6, 0xf0, 0x69, 0xb0, 0xa4, 0x53,
0xbb, 0x9b, 0xf3, 0x30,
+ 0x26, 0x3c, 0x07, 0x9b, 0xc5, 0xd1, 0x24, 0xf6, 0x7d, 0x2a, 0xe5, 0x2c,
0x0e, 0xc3, 0xd5, 0x8f,
+ 0x82, 0x27, 0xdf, 0x74, 0x6a, 0x37, 0xb5, 0xfb, 0x56, 0x1c, 0x9f, 0x42,
0x97, 0xc5, 0xd1, 0x07,
+ 0x49, 0xc5, 0x5a, 0x93, 0xb4, 0x1f, 0x68, 0x52, 0x15, 0xc0, 0x9f, 0xe0,
0x38, 0x24, 0x8a, 0x4a,
+ 0x55, 0x22, 0xb4, 0x06, 0x8d, 0x61, 0xfb, 0xec, 0x89, 0xd9, 0x2c, 0xb7,
0xb8, 0x51, 0xee, 0xda,
+ 0x6f, 0xcc, 0x66, 0x5c, 0x44, 0x7a, 0xeb, 0xbd, 0xda, 0x38, 0xf8, 0x02,
0x8e, 0x12, 0xa5, 0x2b,
+ 0xa9, 0x68, 0x94, 0x0b, 0xbf, 0xab, 0xf5, 0xd4, 0x41, 0x78, 0x0d, 0x27,
0x26, 0x52, 0x85, 0xb4,
+ 0xf7, 0xbf, 0x35, 0x6d, 0x89, 0x94, 0xa9, 0xe2, 0xb1, 0xf0, 0x69, 0x2e,
0x01, 0x6e, 0x54, 0x95,
+ 0xa0, 0x9c, 0xaa, 0x32, 0xe9, 0xe8, 0xce, 0xaa, 0xca, 0x39, 0xcc, 0xc9,
0x4d, 0x02, 0x76, 0x9b,
+ 0x0b, 0x7f, 0xbc, 0x3e, 0xb9, 0x22, 0xb0, 0x39, 0xb9, 0x12, 0xa1, 0x77,
0xd7, 0x93, 0x2b, 0xc5,
+ 0x7f, 0x0c, 0x1d, 0xb2, 0xa4, 0x82, 0xcc, 0xe9, 0x3b, 0xa2, 0x28, 0xf3,
0x57, 0xf6, 0xfe, 0xc0,
+ 0x1a, 0x5a, 0x5e, 0xc9, 0x8a, 0x2e, 0x60, 0x48, 0xa4, 0x1a, 0xb3, 0x25,
0x37, 0x4d, 0xf8, 0x3e,
+ 0x88, 0xa8, 0xfd, 0x50, 0xcb, 0xae, 0x41, 0xf0, 0x14, 0x20, 0x48, 0x7b,
0x77, 0x3c, 0xb5, 0x3b,
+ 0xba, 0x8b, 0x72, 0x16, 0xec, 0xc3, 0xee, 0xaf, 0x5c, 0xdc, 0x52, 0x31,
0x9e, 0xda, 0x87, 0x1a,
+ 0x5d, 0xaf, 0xfb, 0x53, 0x38, 0xae, 0xfb, 0x07, 0x38, 0x84, 0x03, 0x9a,
0xd9, 0x27, 0x4a, 0x64,
+ 0x1d, 0xbc, 0xe7, 0x95, 0xcd, 0xe8, 0xc0, 0x7e, 0x24, 0x27, 0x01, 0xf3,
0xe9, 0x68, 0xc1, 0xfd,
+ 0x1b, 0xdd, 0xc5, 0x0d, 0xaf, 0x60, 0x73, 0x7e, 0x01, 0x2c, 0x6e, 0xdb,
0xbb, 0x40, 0x2a, 0x3c,
+ 0x86, 0x26, 0x15, 0x82, 0x8b, 0xb4, 0xf1, 0xcd, 0x02, 0x47, 0x80, 0xb3,
0x8a, 0xaf, 0x6d, 0xe9,
+ 0x33, 0xe8, 0xd5, 0x9e, 0x81, 0x57, 0x43, 0x70, 0xfe, 0x6e, 0x42, 0xfb,
0x07, 0xaa, 0x44, 0xe0,
+ 0xcb, 0xb7, 0x44, 0x11, 0xfc, 0x1c, 0x1e, 0x8a, 0x74, 0x18, 0xbc, 0xe7,
0x8a, 0x84, 0xa9, 0xce,
+ 0xa2, 0x11, 0x9f, 0x01, 0x16, 0x0c, 0x3f, 0x7f, 0x11, 0x05, 0xcc, 0x06,
0x53, 0x31, 0x05, 0x24,
+ 0x01, 0xf0, 0x35, 0xf4, 0x17, 0xd9, 0x98, 0xc8, 0xcf, 0x0e, 0x93, 0xe1,
0xbe, 0xa6, 0xfd, 0x8b,
+ 0x07, 0x5e, 0xc1, 0x27, 0xdb, 0x51, 0x93, 0x7b, 0x5f, 0x07, 0x39, 0xdd,
0xee, 0xa6, 0x85, 0xbc,
+ 0x84, 0x9e, 0x2c, 0xb5, 0xa4, 0xd1, 0x60, 0x66, 0x5b, 0x3d, 0x88, 0xdf,
0x40, 0xbf, 0x16, 0x30,
+ 0x99, 0x4d, 0xc1, 0x7d, 0x5c, 0xeb, 0xa1, 0x93, 0xbe, 0x80, 0xa3, 0xb8,
0x30, 0x9b, 0x4c, 0x4a,
+ 0x33, 0x19, 0xeb, 0x20, 0xfc, 0x1a, 0xec, 0x1a, 0xb3, 0x49, 0xd7, 0xd1,
0xb4, 0x8f, 0x6a, 0x70,
+ 0x9d, 0xec, 0x29, 0x74, 0xc9, 0x72, 0x9e, 0x0e, 0xe5, 0xac, 0x7f, 0x5a,
0xba, 0x7f, 0xaa, 0x00,
+ 0x7e, 0x05, 0x27, 0x15, 0xa3, 0x49, 0x73, 0xa0, 0x29, 0xbd, 0x0a, 0xaa,
0x93, 0x3c, 0x86, 0x4e,
+ 0xb1, 0xbf, 0xd2, 0xb1, 0x5a, 0xb2, 0xe2, 0x08, 0xda, 0x89, 0xce, 0xb4,
0xbe, 0xd2, 0x31, 0xfa,
+ 0x59, 0x5a, 0x9c, 0xb9, 0xaa, 0x73, 0x3f, 0x6c, 0xbc, 0x46, 0x4c, 0x89,
0x95, 0x97, 0xe7, 0xf5,
+ 0x5f, 0xc3, 0x61, 0xd9, 0x01, 0x0f, 0xa1, 0x71, 0x4b, 0x57, 0x69, 0xb3,
0x25, 0x9f, 0x49, 0x9b,
+ 0x2c, 0x49, 0x18, 0x53, 0x5d, 0xb1, 0x96, 0x67, 0x16, 0xe7, 0xf7, 0x5e,
0x59, 0xce, 0x33, 0xe8,
+ 0x7e, 0x4f, 0x49, 0xa8, 0x6e, 0x2e, 0x6f, 0xa8, 0x7f, 0xeb, 0x51, 0x19,
0x87, 0x2a, 0xb9, 0x73,
+ 0xa5, 0xa9, 0x91, 0xec, 0xce, 0x4d, 0x97, 0xce, 0x9f, 0x16, 0xb4, 0xd2,
0x5c, 0xf8, 0x0a, 0x5a,
+ 0x51, 0xaa, 0xde, 0xb4, 0xd6, 0x69, 0x51, 0xbd, 0x9b, 0xbd, 0x06, 0xd2,
0xb5, 0x97, 0xb9, 0xf7,
+ 0x7f, 0x87, 0x83, 0x12, 0x86, 0x08, 0xf7, 0x19, 0x89, 0x68, 0x2a, 0x5a,
0x7f, 0x97, 0x86, 0x52,
+ 0x22, 0xbd, 0x59, 0x18, 0x4a, 0x2f, 0xa1, 0x1d, 0x6d, 0x36, 0x4a, 0x5f,
0xea, 0xed, 0x33, 0xac,
+ 0x6e, 0xa1, 0x97, 0x77, 0x3b, 0xfb, 0xeb, 0xde, 0x26, 0xfb, 0x25, 0x67,
0x4a, 0xf0, 0x10, 0xdf,
+ 0x42, 0xf7, 0x8a, 0xaa, 0xd2, 0xcb, 0xe3, 0xc4, 0x35, 0x4f, 0x15, 0x37,
0x7b, 0xaa, 0xb8, 0xa3,
+ 0xe4, 0xa9, 0xd2, 0xaf, 0x9f, 0x20, 0xce, 0x0e, 0x5e, 0x00, 0x5e, 0x51,
0xf5, 0x86, 0x4d, 0x3d,
+ 0x2a, 0xa9, 0xca, 0xfe, 0xd9, 0xb6, 0x30, 0x35, 0x42, 0x9d, 0x1d, 0xfc,
0x16, 0xf6, 0xff, 0x13,
+ 0x7b, 0x8b, 0xdd, 0xd9, 0xc1, 0x73, 0x80, 0xab, 0xbb, 0x66, 0x7f, 0x03,
0xed, 0x5c, 0x35, 0x6c,
+ 0x25, 0xdb, 0x29, 0xb9, 0x52, 0x39, 0xce, 0xce, 0xc5, 0x39, 0x7c, 0xca,
0xc5, 0xdc, 0x25, 0x0b,
+ 0xe2, 0xdf, 0x50, 0x77, 0x11, 0x87, 0x92, 0x08, 0x37, 0x1b, 0xae, 0xd2,
0x10, 0x2f, 0x7a, 0xb5,
+ 0x0f, 0xc5, 0xeb, 0x07, 0x1a, 0xfd, 0xf2, 0x9f, 0x00, 0x00, 0x00, 0xff,
0xff, 0x52, 0xef, 0x2d,
+ 0x97, 0x48, 0x0a, 0x00, 0x00,
+}
diff --git a/pulsar-function-go/pb/Request.pb.go
b/pulsar-function-go/pb/Request.pb.go
new file mode 100644
index 0000000..733d0b0
--- /dev/null
+++ b/pulsar-function-go/pb/Request.pb.go
@@ -0,0 +1,153 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: Request.proto
+
+package pb
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type ServiceRequest_ServiceRequestType int32
+
+const (
+ ServiceRequest_UPDATE ServiceRequest_ServiceRequestType = 0
+ ServiceRequest_DELETE ServiceRequest_ServiceRequestType = 1
+ ServiceRequest_INITIALIZE ServiceRequest_ServiceRequestType = 2
+)
+
+var ServiceRequest_ServiceRequestType_name = map[int32]string{
+ 0: "UPDATE",
+ 1: "DELETE",
+ 2: "INITIALIZE",
+}
+var ServiceRequest_ServiceRequestType_value = map[string]int32{
+ "UPDATE": 0,
+ "DELETE": 1,
+ "INITIALIZE": 2,
+}
+
+func (x ServiceRequest_ServiceRequestType) String() string {
+ return proto.EnumName(ServiceRequest_ServiceRequestType_name, int32(x))
+}
+func (ServiceRequest_ServiceRequestType) EnumDescriptor() ([]byte, []int) {
+ return fileDescriptor_Request_db06a8eacb0614b9, []int{0, 0}
+}
+
+type ServiceRequest struct {
+ ServiceRequestType ServiceRequest_ServiceRequestType
`protobuf:"varint,1,opt,name=serviceRequestType,proto3,enum=proto.ServiceRequest_ServiceRequestType"
json:"serviceRequestType,omitempty"`
+ RequestId string
`protobuf:"bytes,2,opt,name=requestId,proto3" json:"requestId,omitempty"`
+ FunctionMetaData *FunctionMetaData
`protobuf:"bytes,3,opt,name=functionMetaData,proto3"
json:"functionMetaData,omitempty"`
+ WorkerId string
`protobuf:"bytes,4,opt,name=workerId,proto3" json:"workerId,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *ServiceRequest) Reset() { *m = ServiceRequest{} }
+func (m *ServiceRequest) String() string { return proto.CompactTextString(m) }
+func (*ServiceRequest) ProtoMessage() {}
+func (*ServiceRequest) Descriptor() ([]byte, []int) {
+ return fileDescriptor_Request_db06a8eacb0614b9, []int{0}
+}
+func (m *ServiceRequest) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_ServiceRequest.Unmarshal(m, b)
+}
+func (m *ServiceRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte,
error) {
+ return xxx_messageInfo_ServiceRequest.Marshal(b, m, deterministic)
+}
+func (dst *ServiceRequest) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_ServiceRequest.Merge(dst, src)
+}
+func (m *ServiceRequest) XXX_Size() int {
+ return xxx_messageInfo_ServiceRequest.Size(m)
+}
+func (m *ServiceRequest) XXX_DiscardUnknown() {
+ xxx_messageInfo_ServiceRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ServiceRequest proto.InternalMessageInfo
+
+func (m *ServiceRequest) GetServiceRequestType()
ServiceRequest_ServiceRequestType {
+ if m != nil {
+ return m.ServiceRequestType
+ }
+ return ServiceRequest_UPDATE
+}
+
+func (m *ServiceRequest) GetRequestId() string {
+ if m != nil {
+ return m.RequestId
+ }
+ return ""
+}
+
+func (m *ServiceRequest) GetFunctionMetaData() *FunctionMetaData {
+ if m != nil {
+ return m.FunctionMetaData
+ }
+ return nil
+}
+
+func (m *ServiceRequest) GetWorkerId() string {
+ if m != nil {
+ return m.WorkerId
+ }
+ return ""
+}
+
+func init() {
+ proto.RegisterType((*ServiceRequest)(nil), "proto.ServiceRequest")
+ proto.RegisterEnum("proto.ServiceRequest_ServiceRequestType",
ServiceRequest_ServiceRequestType_name, ServiceRequest_ServiceRequestType_value)
+}
+
+func init() { proto.RegisterFile("Request.proto",
fileDescriptor_Request_db06a8eacb0614b9) }
+
+var fileDescriptor_Request_db06a8eacb0614b9 = []byte{
+ // 247 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2,
0x0d, 0x4a, 0x2d, 0x2c,
+ 0x4d, 0x2d, 0x2e, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05,
0x53, 0x52, 0x7c, 0x6e,
+ 0xa5, 0x79, 0xc9, 0x25, 0x99, 0xf9, 0x79, 0x10, 0x61, 0xa5, 0xe5, 0x4c,
0x5c, 0x7c, 0xc1, 0xa9,
+ 0x45, 0x65, 0x99, 0xc9, 0xa9, 0x50, 0xf5, 0x42, 0x11, 0x5c, 0x42, 0xc5,
0x28, 0x22, 0x21, 0x95,
+ 0x05, 0xa9, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x7c, 0x46, 0x1a, 0x10, 0x6d,
0x7a, 0xa8, 0x5a, 0xd0,
+ 0xb8, 0x20, 0xf5, 0x41, 0x58, 0xcc, 0x10, 0x92, 0xe1, 0xe2, 0x2c, 0x82,
0x70, 0x3d, 0x53, 0x24,
+ 0x98, 0x14, 0x18, 0x35, 0x38, 0x83, 0x10, 0x02, 0x42, 0xce, 0x5c, 0x02,
0x69, 0x50, 0xc7, 0xf9,
+ 0xa6, 0x96, 0x24, 0xba, 0x24, 0x96, 0x24, 0x4a, 0x30, 0x2b, 0x30, 0x6a,
0x70, 0x1b, 0x89, 0x43,
+ 0x6d, 0x75, 0x43, 0x93, 0x0e, 0xc2, 0xd0, 0x20, 0x24, 0xc5, 0xc5, 0x51,
0x9e, 0x5f, 0x94, 0x9d,
+ 0x5a, 0xe4, 0x99, 0x22, 0xc1, 0x02, 0xb6, 0x01, 0xce, 0x57, 0xb2, 0xe1,
0x12, 0xc2, 0x74, 0xa8,
+ 0x10, 0x17, 0x17, 0x5b, 0x68, 0x80, 0x8b, 0x63, 0x88, 0xab, 0x00, 0x03,
0x88, 0xed, 0xe2, 0xea,
+ 0xe3, 0x1a, 0xe2, 0x2a, 0xc0, 0x28, 0xc4, 0xc7, 0xc5, 0xe5, 0xe9, 0xe7,
0x19, 0xe2, 0xe9, 0xe8,
+ 0xe3, 0x19, 0xe5, 0x2a, 0xc0, 0xe4, 0xa4, 0xc3, 0xa5, 0x98, 0x5f, 0x94,
0xae, 0x97, 0x58, 0x90,
+ 0x98, 0x9c, 0x91, 0xaa, 0x57, 0x50, 0x9a, 0x53, 0x9c, 0x58, 0xa4, 0x07,
0xb3, 0xbf, 0x18, 0xe2,
+ 0x42, 0x27, 0x76, 0xa8, 0xc9, 0x49, 0x6c, 0x60, 0xbe, 0x31, 0x20, 0x00,
0x00, 0xff, 0xff, 0xfa,
+ 0x6c, 0xdd, 0x44, 0x86, 0x01, 0x00, 0x00,
+}
diff --git a/pulsar-function-go/pb/doc.go b/pulsar-function-go/pb/doc.go
new file mode 100644
index 0000000..aafd43a
--- /dev/null
+++ b/pulsar-function-go/pb/doc.go
@@ -0,0 +1,34 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+// Package api provides the protocol buffer messages that Pulsar
+// uses for the client/broker wire protocol.
+// See "Pulsar binary protocol specification" for more information.
+// https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/
+//
+// The protocol definition files are part of the main Pulsar source,
+// located within the Pulsar repository at:
+//
https://github.com/apache/incubator-pulsar/tree/master/pulsar-common/src/main/proto
+//
+// The generated Go code was created from the source Pulsar files at git:
+// tag: v1.18-2614-g548c726b8
+// revision: 548c726b8e7f0e163b1132c9ada6ba83d6bec572
+//
+// Files generated by the protoc-gen-go program should not be modified.
+package pb
diff --git a/pulsar-function-go/pb/generate.sh
b/pulsar-function-go/pb/generate.sh
new file mode 100755
index 0000000..ab20949
--- /dev/null
+++ b/pulsar-function-go/pb/generate.sh
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Bash script to automate the generation of the api
+# package.
+#
+# It uses the .proto files included in Pulsar's source
+# to compile the Go code capable of encoding/decoding the
+# wire format used by Pulsar brokers.
+#
+# Requirements:
+# * protoc and protoc-gen-go are installed. See:
https://github.com/golang/protobuf
+# * The Pulsar project is checked out somewhere on the file system
+# in order to source the .proto files
+echo "generate pulsar go function protobuf code..."
+
+set -euo pipefail
+
+pkg="api"
+
+defaultPulsarSrc="${HOME}/github.com/apache/pulsar"
+
+help="usage: ${0} <path to Pulsar repo (default \"${defaultPulsarSrc}\")>"
+
+pulsarSrc="${1-${defaultPulsarSrc}}"
+if [ ! -d "${pulsarSrc}" ]; then
+ echo "error: Pulsar source is not a directory: ${pulsarSrc}"
+ echo "${help}"
+ exit 1
+fi
+protoDefinitions="${pulsarSrc}/pulsar-functions/proto/src/main/proto"
+if [ ! -d "${protoDefinitions}" ]; then
+ echo "error: Proto definitions directory not found: ${protoDefinitions}"
+ echo "${help}"
+ exit 1
+fi
+protoFiles="${protoDefinitions}/*.proto"
+
+protoc \
+ --go_out=import_path=${pkg}:. \
+ --proto_path="${protoDefinitions}" ${protoFiles}
+
+pulsarGitRev=$(git -C ${pulsarSrc} rev-parse HEAD)
+pulsarGitTag=$(git -C ${pulsarSrc} describe --tags HEAD)
+
+# Generate godoc describing this package and the
+# git sha it was created from
+cat <<EOF > doc.go
+// Package ${pkg} provides the protocol buffer messages that Pulsar
+// uses for the client/broker wire protocol.
+// See "Pulsar binary protocol specification" for more information.
+// https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/
+//
+// The protocol definition files are part of the main Pulsar source,
+// located within the Pulsar repository at:
+//
https://github.com/apache/pulsar/tree/master/pulsar-functions/proto/src/main/proto
+//
+// The generated Go code was created from the source Pulsar files at git:
+// tag: ${pulsarGitTag}
+// revision: ${pulsarGitRev}
+//
+// Files generated by the protoc-gen-go program should not be modified.
+package ${pkg}
+EOF
diff --git a/pulsar-function-go/pf/context.go b/pulsar-function-go/pf/context.go
new file mode 100644
index 0000000..4818326
--- /dev/null
+++ b/pulsar-function-go/pf/context.go
@@ -0,0 +1,98 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+ "context"
+)
+
+type FunctionContext struct {
+ instanceConf *instanceConf
+ userConfigs map[string]interface{}
+ inputTopics []string
+}
+
+func NewFuncContext() *FunctionContext {
+ fc := &FunctionContext{
+ instanceConf: newInstanceConf(),
+ userConfigs: make(map[string]interface{}),
+ }
+ return fc
+}
+
+func (c *FunctionContext) GetInstanceID() int {
+ return c.instanceConf.instanceID
+}
+
+func (c *FunctionContext) GetInputTopics() []string {
+ return c.inputTopics
+}
+
+func (c *FunctionContext) GetOutputTopic() string {
+ return c.instanceConf.funcDetails.GetSink().Topic
+}
+
+func (c *FunctionContext) GetFuncTenant() string {
+ return c.instanceConf.funcDetails.Tenant
+}
+
+func (c *FunctionContext) GetFuncName() string {
+ return c.instanceConf.funcDetails.Name
+}
+
+func (c *FunctionContext) GetFuncNamespace() string {
+ return c.instanceConf.funcDetails.Namespace
+}
+
+func (c *FunctionContext) GetFuncID() string {
+ return c.instanceConf.funcID
+}
+
+func (c *FunctionContext) GetFuncVersion() string {
+ return c.instanceConf.funcVersion
+}
+
+func (c *FunctionContext) GetUserConfValue(key string) interface{} {
+ return c.userConfigs[key]
+}
+
+func (c *FunctionContext) GetUserConfMap() map[string]interface{} {
+ return c.userConfigs
+}
+
+// An unexported type to be used as the key for types in this package.
+// This prevents collisions with keys defined in other packages.
+type key struct{}
+
+// contextKey is the key for user.User values in Contexts. It is
+// unexported; clients use user.NewContext and user.FromContext
+// instead of using this key directly.
+var contextKey = &key{}
+
+// NewContext returns a new Context that carries value u.
+func NewContext(parent context.Context, fc *FunctionContext) context.Context {
+ return context.WithValue(parent, contextKey, fc)
+}
+
+// FromContext returns the User value stored in ctx, if any.
+func FromContext(ctx context.Context) (*FunctionContext, bool) {
+ fc, ok := ctx.Value(contextKey).(*FunctionContext)
+ return fc, ok
+}
diff --git a/pulsar-function-go/pf/context_test.go
b/pulsar-function-go/pf/context_test.go
new file mode 100644
index 0000000..4ef4ac5
--- /dev/null
+++ b/pulsar-function-go/pf/context_test.go
@@ -0,0 +1,45 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+ "context"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestContext(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ fc := NewFuncContext()
+ ctx = NewContext(ctx, fc)
+
+ ctx = context.WithValue(ctx, "pulsar", "function")
+
+ if resfc, ok := FromContext(ctx); ok {
+ assert.Equal(t, []string{"topic-1", "topic-2"},
resfc.GetInputTopics())
+ assert.Equal(t, "1.0.0", resfc.GetFuncVersion())
+ assert.Equal(t, "pulsar-function", resfc.GetFuncID())
+ assert.Equal(t, "go-function", resfc.GetFuncName())
+ assert.Equal(t, "topic-3", resfc.GetOutputTopic())
+ }
+ assert.Equal(t, "function", ctx.Value("pulsar"))
+}
diff --git a/pulsar-function-go/pf/function.go
b/pulsar-function-go/pf/function.go
new file mode 100644
index 0000000..0be349d
--- /dev/null
+++ b/pulsar-function-go/pf/function.go
@@ -0,0 +1,172 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+//
+// This file borrows some of the implementations from {@link
https://github.com/aws/aws-lambda-go/blob/master/lambda/handler.go}
+// - errorHandler
+// - validateArguments
+// - validateReturns
+// - NewFunction
+// - Process
+//
+
+package pf
+
+import (
+ "context"
+ "fmt"
+ "reflect"
+
+ "github.com/apache/pulsar/pulsar-function-go/log"
+)
+
+type function interface {
+ process(ctx context.Context, input []byte) ([]byte, error)
+}
+
+type pulsarFunction func(ctx context.Context, input []byte) ([]byte, error)
+
+func (function pulsarFunction) process(ctx context.Context, input []byte)
([]byte, error) {
+ output, err := function(ctx, input)
+ if err != nil {
+ log.Errorf("process function error:[%s]\n", err.Error())
+ return nil, err
+ }
+
+ return output, nil
+}
+
+func errorHandler(e error) pulsarFunction {
+ return func(ctx context.Context, input []byte) ([]byte, error) {
+ return nil, e
+ }
+}
+
+func validateArguments(handler reflect.Type) (bool, error) {
+ handlerTakesContext := false
+ if handler.NumIn() > 2 {
+ return false, fmt.Errorf("functions may not take more than two
arguments, but function takes %d", handler.NumIn())
+ } else if handler.NumIn() > 0 {
+ contextType := reflect.TypeOf((*context.Context)(nil)).Elem()
+ argumentType := handler.In(0)
+ handlerTakesContext = argumentType.Implements(contextType)
+ if handler.NumIn() > 1 && !handlerTakesContext {
+ return false, fmt.Errorf("function takes two arguments,
but the first is not Context. got %s", argumentType.Kind())
+ }
+ }
+
+ return handlerTakesContext, nil
+}
+
+func validateReturns(handler reflect.Type) error {
+ errorType := reflect.TypeOf((*error)(nil)).Elem()
+ if handler.NumOut() > 2 {
+ return fmt.Errorf("function may not return more than two
values")
+ } else if handler.NumOut() > 1 {
+ if !handler.Out(1).Implements(errorType) {
+ return fmt.Errorf("function returns two values, but the
second does not implement error")
+ }
+ } else if handler.NumOut() == 1 {
+ if !handler.Out(0).Implements(errorType) {
+ return fmt.Errorf("function returns a single value, but
it does not implement error")
+ }
+ }
+ return nil
+}
+
+func newFunction(inputFunc interface{}) function {
+ if inputFunc == nil {
+ return errorHandler(fmt.Errorf("function is nil"))
+ }
+ handler := reflect.ValueOf(inputFunc)
+ handlerType := reflect.TypeOf(inputFunc)
+ if handlerType.Kind() != reflect.Func {
+ return errorHandler(fmt.Errorf("function kind %s is not %s",
handlerType.Kind(), reflect.Func))
+ }
+
+ takesContext, err := validateArguments(handlerType)
+ if err != nil {
+ return errorHandler(err)
+ }
+
+ if err := validateReturns(handlerType); err != nil {
+ return errorHandler(err)
+ }
+
+ return pulsarFunction(func(ctx context.Context, input []byte) ([]byte,
error) {
+ // construct arguments
+ var args []reflect.Value
+ if takesContext {
+ args = append(args, reflect.ValueOf(ctx))
+ }
+
+ if (handlerType.NumIn() == 1 && !takesContext) ||
handlerType.NumIn() == 2 {
+ args = append(args, reflect.ValueOf(input))
+ }
+ response := handler.Call(args)
+
+ // convert return values into ([]byte, error)
+ var err error
+ if len(response) > 0 {
+ if errVal, ok :=
response[len(response)-1].Interface().(error); ok {
+ err = errVal
+ }
+ }
+
+ var val []byte
+ if len(response) > 1 {
+ val = response[0].Bytes()
+ }
+
+ return val, err
+ })
+}
+
+// Rules:
+//
+// * handler must be a function
+// * handler may take between 0 and two arguments.
+// * if there are two arguments, the first argument must satisfy the
"context.Context" interface.
+// * handler may return between 0 and two arguments.
+// * if there are two return values, the second argument must be an error.
+// * if there is one return value it must be an error.
+//
+// Valid function signatures:
+//
+// func ()
+// func () error
+// func (input) error
+// func () (output, error)
+// func (input) (output, error)
+// func (context.Context) error
+// func (context.Context, input) error
+// func (context.Context) (output, error)
+// func (context.Context, input) (output, error)
+//
+// Where "input" and "output" are types compatible with the "encoding/json"
standard library.
+// See https://golang.org/pkg/encoding/json/#Unmarshal for how deserialization
behaves
+func Start(funcName interface{}) {
+ function := newFunction(funcName)
+ goInstance := newGoInstance()
+ err := goInstance.startFunction(function)
+ if err != nil {
+ log.Fatal(err)
+ panic("start function failed, please check.")
+ }
+}
diff --git a/pulsar-function-go/pf/function_test.go
b/pulsar-function-go/pf/function_test.go
new file mode 100644
index 0000000..30c45bb
--- /dev/null
+++ b/pulsar-function-go/pf/function_test.go
@@ -0,0 +1,182 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+//
+// This file borrows some of the implementations from {@link
https://github.com/aws/aws-lambda-go/blob/master/lambda/function_test.go}
+// - TestInvalidFunctions
+//
+
+package pf
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestInvalidFunctions(t *testing.T) {
+
+ testCases := []struct {
+ name string
+ function interface{}
+ expected error
+ }{
+ {
+ name: "nil function",
+ expected: errors.New("function is nil"),
+ function: nil,
+ },
+ {
+ name: "function is not a function",
+ expected: errors.New("function kind struct is not
func"),
+ function: struct{}{},
+ },
+ {
+ name: "function declares too many arguments",
+ expected: errors.New("functions may not take more than
two arguments, but function takes 3"),
+ function: func(n context.Context, x string, y string)
error {
+ return nil
+ },
+ },
+ {
+ name: "two argument function does not context as
first argument",
+ expected: errors.New("function takes two arguments, but
the first is not Context. got string"),
+ function: func(a string, x context.Context) error {
+ return nil
+ },
+ },
+ {
+ name: "function returns too many values",
+ expected: errors.New("function may not return more than
two values"),
+ function: func() (error, error, error) {
+ return nil, nil, nil
+ },
+ },
+ {
+ name: "function returning two values does not
declare error as the second return value",
+ expected: errors.New("function returns two values, but
the second does not implement error"),
+ function: func() (error, string) {
+ return nil, "hello"
+ },
+ },
+ {
+ name: "function returning a single value does not
implement error",
+ expected: errors.New("function returns a single value,
but it does not implement error"),
+ function: func() string {
+ return "hello"
+ },
+ },
+ {
+ name: "no return value should not result in error",
+ expected: nil,
+ function: func() {
+ },
+ },
+ }
+ for i, testCase := range testCases {
+ t.Run(fmt.Sprintf("testCase[%d] %s", i, testCase.name), func(t
*testing.T) {
+ pulsarFunction := newFunction(testCase.function)
+ _, err := pulsarFunction.process(context.TODO(),
make([]byte, 0))
+ assert.Equal(t, testCase.expected, err)
+ })
+ }
+}
+
+type expected struct {
+ val []byte
+ err error
+}
+
+var (
+ input = []byte{102, 117, 110, 99, 116, 105, 111, 110}
+)
+
+func TestInvokes(t *testing.T) {
+ testCases := []struct {
+ name string
+ input []byte
+ expected expected
+ function interface{}
+ }{
+ {
+ input: input,
+ expected: expected{input, nil},
+ function: func(in []byte) ([]byte, error) {
+ return input, nil
+ },
+ },
+ {
+ input: input,
+ expected: expected{input, nil},
+ function: func(in []byte) ([]byte, error) {
+ return input, nil
+ },
+ },
+ {
+ input: input,
+ expected: expected{input, nil},
+ function: func(ctx context.Context, in []byte) ([]byte,
error) {
+ return input, nil
+ },
+ },
+ {
+ input: input,
+ expected: expected{nil, errors.New("bad stuff")},
+ function: func() error {
+ return errors.New("bad stuff")
+ },
+ },
+ {
+ input: input,
+ expected: expected{nil, errors.New("bad stuff")},
+ function: func() ([]byte, error) {
+ return nil, errors.New("bad stuff")
+ },
+ },
+ {
+ input: input,
+ expected: expected{nil, errors.New("bad stuff")},
+ function: func(e []byte) ([]byte, error) {
+ return nil, errors.New("bad stuff")
+ },
+ },
+ {
+ input: input,
+ expected: expected{nil, errors.New("bad stuff")},
+ function: func(ctx context.Context, e []byte) ([]byte,
error) {
+ return nil, errors.New("bad stuff")
+ },
+ },
+ }
+ for i, testCase := range testCases {
+ t.Run(fmt.Sprintf("testCase[%d] %s", i, testCase.name), func(t
*testing.T) {
+ pulsarFunction := newFunction(testCase.function)
+ response, err := pulsarFunction.process(context.TODO(),
testCase.input)
+ if testCase.expected.err != nil {
+ assert.Equal(t, testCase.expected.err, err)
+ } else {
+ assert.NoError(t, err)
+ assert.Equal(t, testCase.expected.val, response)
+ }
+ })
+ }
+}
diff --git a/pulsar-function-go/pf/instance.go
b/pulsar-function-go/pf/instance.go
new file mode 100644
index 0000000..9c4e670
--- /dev/null
+++ b/pulsar-function-go/pf/instance.go
@@ -0,0 +1,275 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+ "context"
+ "math"
+ "time"
+
+ "github.com/apache/pulsar/pulsar-client-go/pulsar"
+ "github.com/apache/pulsar/pulsar-function-go/log"
+ "github.com/apache/pulsar/pulsar-function-go/pb"
+)
+
+type goInstance struct {
+ function function
+ context *FunctionContext
+ producer pulsar.Producer
+ consumers map[string]pulsar.Consumer
+ client pulsar.Client
+}
+
+// newGoInstance init goInstance and init function context
+func newGoInstance() *goInstance {
+ goInstance := &goInstance{
+ context: NewFuncContext(),
+ consumers: make(map[string]pulsar.Consumer),
+ }
+ return goInstance
+}
+
+func (gi *goInstance) startFunction(function function) error {
+ gi.function = function
+ err := gi.setupClient()
+ if err != nil {
+ log.Errorf("setup client failed, error is:%v", err)
+ return err
+ }
+ err = gi.setupProducer()
+ if err != nil {
+ log.Errorf("setup producer failed, error is:%v", err)
+ return err
+ }
+ channel, err := gi.setupConsumer()
+ if err != nil {
+ log.Errorf("setup consumer failed, error is:%v", err)
+ return err
+ }
+
+CLOSE:
+ for {
+ select {
+ case cm := <-channel:
+ msgInput := cm.Message
+ atMostOnce :=
gi.context.instanceConf.funcDetails.ProcessingGuarantees ==
pb.ProcessingGuarantees_ATMOST_ONCE
+ atLeastOnce :=
gi.context.instanceConf.funcDetails.ProcessingGuarantees ==
pb.ProcessingGuarantees_ATLEAST_ONCE
+ autoAck := gi.context.instanceConf.funcDetails.AutoAck
+ if autoAck && atMostOnce {
+ gi.ackInputMessage(msgInput)
+ }
+ output, err := gi.handlerMsg(msgInput)
+ if err != nil {
+ log.Errorf("handler message error:%v", err)
+ if autoAck && atLeastOnce {
+ gi.nackInputMessage(msgInput)
+ }
+ return err
+ }
+ gi.processResult(msgInput, output)
+
+ case <-time.After(getIdleTimeout(time.Millisecond *
gi.context.instanceConf.killAfterIdleMs)):
+ close(channel)
+ break CLOSE
+ }
+ }
+
+ gi.close()
+ return nil
+}
+
+func (gi *goInstance) setupClient() error {
+ client, err := pulsar.NewClient(pulsar.ClientOptions{
+ URL: gi.context.instanceConf.pulsarServiceURL,
+ })
+ if err != nil {
+ log.Errorf("create client error:%v", err)
+ return err
+ }
+ gi.client = client
+ return nil
+}
+
+func (gi *goInstance) setupProducer() (err error) {
+ if gi.context.instanceConf.funcDetails.Sink.Topic != "" &&
len(gi.context.instanceConf.funcDetails.Sink.Topic) > 0 {
+ log.Debugf("Setting up producer for topic %s",
gi.context.instanceConf.funcDetails.Sink.Topic)
+ properties := getProperties(getDefaultSubscriptionName(
+ gi.context.instanceConf.funcDetails.Tenant,
+ gi.context.instanceConf.funcDetails.Namespace,
+ gi.context.instanceConf.funcDetails.Name),
gi.context.instanceConf.instanceID)
+ gi.producer, err =
gi.client.CreateProducer(pulsar.ProducerOptions{
+ Topic:
gi.context.instanceConf.funcDetails.Sink.Topic,
+ Properties: properties,
+ CompressionType: pulsar.LZ4,
+ BlockIfQueueFull: true,
+ Batching: true,
+ BatchingMaxPublishDelay: time.Millisecond * 10,
+ // set send timeout to be infinity to prevent potential
deadlock with consumer
+ // that might happen when consumer is blocked due to
unacked messages
+ SendTimeout: 0,
+ })
+ if err != nil {
+ log.Errorf("create producer error:%s", err.Error())
+ return err
+ }
+ }
+ return nil
+}
+
+func (gi *goInstance) setupConsumer() (chan pulsar.ConsumerMessage, error) {
+ subscriptionType := pulsar.Shared
+ if int32(gi.context.instanceConf.funcDetails.Source.SubscriptionType)
== pb.SubscriptionType_value["FAILOVER"] {
+ subscriptionType = pulsar.Failover
+ }
+
+ funcDetails := gi.context.instanceConf.funcDetails
+ subscriptionName := funcDetails.Tenant + "/" + funcDetails.Namespace +
"/" + funcDetails.Name
+
+ properties := getProperties(getDefaultSubscriptionName(
+ funcDetails.Tenant,
+ funcDetails.Namespace,
+ funcDetails.Name), gi.context.instanceConf.instanceID)
+
+ channel := make(chan pulsar.ConsumerMessage)
+
+ var (
+ consumer pulsar.Consumer
+ err error
+ )
+
+ for topic, consumerConf := range funcDetails.Source.InputSpecs {
+ log.Debugf("Setting up consumer for topic: %s with subscription
name: %s", topic, subscriptionName)
+ if consumerConf.ReceiverQueueSize != nil {
+ if consumerConf.IsRegexPattern {
+ consumer, err =
gi.client.Subscribe(pulsar.ConsumerOptions{
+ TopicsPattern: topic,
+ ReceiverQueueSize:
int(consumerConf.ReceiverQueueSize.Value),
+ SubscriptionName: subscriptionName,
+ Properties: properties,
+ Type: subscriptionType,
+ MessageChannel: channel,
+ })
+ } else {
+ consumer, err =
gi.client.Subscribe(pulsar.ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: subscriptionName,
+ Properties: properties,
+ Type: subscriptionType,
+ ReceiverQueueSize:
int(consumerConf.ReceiverQueueSize.Value),
+ MessageChannel: channel,
+ })
+ }
+ } else {
+ if consumerConf.IsRegexPattern {
+ consumer, err =
gi.client.Subscribe(pulsar.ConsumerOptions{
+ TopicsPattern: topic,
+ SubscriptionName: subscriptionName,
+ Properties: properties,
+ Type: subscriptionType,
+ MessageChannel: channel,
+ })
+ } else {
+ consumer, err =
gi.client.Subscribe(pulsar.ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: subscriptionName,
+ Properties: properties,
+ Type: subscriptionType,
+ MessageChannel: channel,
+ })
+
+ }
+ }
+
+ if err != nil {
+ log.Errorf("create consumer error:%s", err.Error())
+ return nil, err
+ }
+ gi.consumers[topic] = consumer
+ gi.context.inputTopics = append(gi.context.inputTopics, topic)
+ }
+ return channel, nil
+}
+
+func (gi *goInstance) handlerMsg(input pulsar.Message) (output []byte, err
error) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ ctx = NewContext(ctx, gi.context)
+ msgInput := input.Payload()
+ return gi.function.process(ctx, msgInput)
+}
+
+func (gi *goInstance) processResult(msgInput pulsar.Message, output []byte) {
+ atLeastOnce := gi.context.instanceConf.funcDetails.ProcessingGuarantees
== pb.ProcessingGuarantees_ATLEAST_ONCE
+ atMostOnce := gi.context.instanceConf.funcDetails.ProcessingGuarantees
== pb.ProcessingGuarantees_ATMOST_ONCE
+ autoAck := gi.context.instanceConf.funcDetails.AutoAck
+
+ if output != nil && gi.context.instanceConf.funcDetails.Sink.Topic !=
"" {
+ asyncMsg := pulsar.ProducerMessage{
+ Payload: output,
+ }
+ // Attempt to send the message asynchronously and handle the
response
+ gi.producer.SendAsync(context.Background(), asyncMsg,
func(message pulsar.ProducerMessage, e error) {
+ if e != nil {
+ if autoAck && atLeastOnce {
+ gi.nackInputMessage(msgInput)
+ }
+ log.Fatal(e)
+ } else if autoAck && !atMostOnce {
+ gi.ackInputMessage(msgInput)
+ }
+ })
+ } else {
+ if autoAck && atLeastOnce {
+ gi.ackInputMessage(msgInput)
+ }
+ }
+}
+
+// ackInputMessage doesn't produce any result or the user doesn't want the
result.
+func (gi *goInstance) ackInputMessage(inputMessage pulsar.Message) {
+ gi.consumers[inputMessage.Topic()].Ack(inputMessage)
+}
+
+func (gi *goInstance) nackInputMessage(inputMessage pulsar.Message) {
+ //todo: in the current version of pulsar-client-go, we do not support
this operation
+ //gi.consumers[inputMessage.Topic()].Nack(inputMessage)
+}
+
+func getIdleTimeout(timeoutMilliSecond time.Duration) time.Duration {
+ if timeoutMilliSecond < 0 {
+ return time.Duration(math.MaxInt64)
+ }
+ return timeoutMilliSecond
+}
+
+func (gi *goInstance) close() {
+ log.Info("closing go instance...")
+ if gi.producer != nil {
+ gi.producer.Close()
+ }
+ if gi.consumers != nil {
+ for _, consumer := range gi.consumers {
+ consumer.Close()
+ }
+ }
+ if gi.client != nil {
+ gi.client.Close()
+ }
+}
diff --git a/pulsar-function-go/pf/instanceConf.go
b/pulsar-function-go/pf/instanceConf.go
new file mode 100644
index 0000000..8690ef0
--- /dev/null
+++ b/pulsar-function-go/pf/instanceConf.go
@@ -0,0 +1,104 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/apache/pulsar/pulsar-function-go/conf"
+ "github.com/apache/pulsar/pulsar-function-go/pb"
+)
+
+// This is the config passed to the Golang Instance. Contains all the
information
+// passed to run functions
+type instanceConf struct {
+ instanceID int
+ funcID string
+ funcVersion string
+ funcDetails pb.FunctionDetails
+ maxBufTuples int
+ port int
+ clusterName string
+ pulsarServiceURL string
+ killAfterIdleMs time.Duration
+}
+
+func newInstanceConf() *instanceConf {
+ config := &conf.Conf{}
+ cfg := config.GetConf()
+ if cfg == nil {
+ panic("config file is nil.")
+ }
+ instanceConf := &instanceConf{
+ instanceID: cfg.InstanceID,
+ funcID: cfg.FuncID,
+ funcVersion: cfg.FuncVersion,
+ maxBufTuples: cfg.MaxBufTuples,
+ port: cfg.Port,
+ clusterName: cfg.ClusterName,
+ pulsarServiceURL: cfg.PulsarServiceURL,
+ killAfterIdleMs: cfg.KillAfterIdleMs,
+ funcDetails: pb.FunctionDetails{
+ Tenant: cfg.Tenant,
+ Namespace: cfg.NameSpace,
+ Name: cfg.Name,
+ LogTopic: cfg.LogTopic,
+ ProcessingGuarantees:
pb.ProcessingGuarantees(cfg.ProcessingGuarantees),
+ SecretsMap: cfg.SecretsMap,
+ Runtime:
pb.FunctionDetails_Runtime(cfg.Runtime),
+ AutoAck: cfg.AutoACK,
+ Parallelism: cfg.Parallelism,
+ Source: &pb.SourceSpec{
+ SubscriptionType:
pb.SubscriptionType(cfg.SubscriptionType),
+ InputSpecs: map[string]*pb.ConsumerSpec{
+ cfg.SourceSpecTopic: {
+ SchemaType:
cfg.SourceSchemaType,
+ IsRegexPattern:
cfg.IsRegexPatternSubscription,
+ ReceiverQueueSize:
&pb.ConsumerSpec_ReceiverQueueSize{
+ Value:
cfg.ReceiverQueueSize,
+ },
+ },
+ },
+ TimeoutMs: cfg.TimeoutMs,
+ SubscriptionName: cfg.SubscriptionName,
+ CleanupSubscription: cfg.CleanupSubscription,
+ },
+ Sink: &pb.SinkSpec{
+ Topic: cfg.SinkSpecTopic,
+ SchemaType: cfg.SinkSchemaType,
+ },
+ Resources: &pb.Resources{
+ Cpu: cfg.Cpu,
+ Ram: cfg.Ram,
+ Disk: cfg.Disk,
+ },
+ RetryDetails: &pb.RetryDetails{
+ MaxMessageRetries: cfg.MaxMessageRetries,
+ DeadLetterTopic: cfg.DeadLetterTopic,
+ },
+ },
+ }
+ return instanceConf
+}
+
+func (ic *instanceConf) getInstanceName() string {
+ return "" + fmt.Sprintf("%d", ic.instanceID)
+}
diff --git a/pulsar-function-go/pf/instanceConf_test.go
b/pulsar-function-go/pf/instanceConf_test.go
new file mode 100644
index 0000000..7b71b11
--- /dev/null
+++ b/pulsar-function-go/pf/instanceConf_test.go
@@ -0,0 +1,32 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestInstanceConf_GetInstanceName(t *testing.T) {
+ instanceConf := newInstanceConf()
+ str := instanceConf.getInstanceName()
+ assert.Equal(t, "101", str)
+}
diff --git a/pulsar-function-go/pf/util.go b/pulsar-function-go/pf/util.go
new file mode 100644
index 0000000..586fc86
--- /dev/null
+++ b/pulsar-function-go/pf/util.go
@@ -0,0 +1,41 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+ "fmt"
+)
+
+func getProperties(fullyQualifiedName string, instanceId int)
map[string]string {
+ propertiesMap := make(map[string]string)
+ propertiesMap["application"] = "pulsar-function"
+ propertiesMap["id"] = fullyQualifiedName
+ propertiesMap["instance_id"] = fmt.Sprintf("%d", instanceId)
+
+ return propertiesMap
+}
+
+func getDefaultSubscriptionName(tenant, namespace, name string) string {
+ return fmt.Sprintf("%s/%s/%s", tenant, namespace, name)
+}
+
+func getFullyQualifiedInstanceId(tenant, namespace, name string, instanceID
int) string {
+ return fmt.Sprintf("%s/%s/%s:%d", tenant, namespace, name, instanceID)
+}
diff --git a/pulsar-function-go/pf/util_test.go
b/pulsar-function-go/pf/util_test.go
new file mode 100644
index 0000000..1c620a5
--- /dev/null
+++ b/pulsar-function-go/pf/util_test.go
@@ -0,0 +1,52 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+var (
+ tenant = "pulsar"
+ namespace = "function"
+ name = "go"
+ instanceID = 100
+)
+
+func TestUtils(t *testing.T) {
+ fqfn := tenant + "/" + namespace + "/" + name
+
+ propertiesMap := make(map[string]string)
+ propertiesMap["application"] = "pulsar-function"
+ propertiesMap["id"] = "pulsar/function/go"
+ propertiesMap["instance_id"] = fmt.Sprintf("%d", instanceID)
+
+ expectedFQFN := getDefaultSubscriptionName(tenant, namespace, name)
+ assert.Equal(t, expectedFQFN, fqfn)
+
+ actualtMap := getProperties(fqfn, 100)
+ assert.Equal(t, propertiesMap, actualtMap)
+
+ expectedRes := getFullyQualifiedInstanceId(tenant, namespace, name,
instanceID)
+ assert.Equal(t, expectedRes, "pulsar/function/go:100")
+}
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto
b/pulsar-functions/proto/src/main/proto/Function.proto
index 4e7dfc1..c435c09 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -49,6 +49,7 @@ message FunctionDetails {
enum Runtime {
JAVA = 0;
PYTHON = 1;
+ GO = 3;
}
string tenant = 1;
string namespace = 2;
@@ -70,9 +71,9 @@ message FunctionDetails {
}
message ConsumerSpec {
- string schemaType = 1;
+ string schemaType = 1;
string serdeClassName = 2;
- bool isRegexPattern = 3;
+ bool isRegexPattern = 3;
message ReceiverQueueSize {
int32 value = 1;
}
@@ -88,12 +89,12 @@ message SourceSpec {
// configs used only when source feeds into functions
SubscriptionType subscriptionType = 3;
- // @deprecated -- use topicsToSchema
- map<string,string> topicsToSerDeClassName = 4 [deprecated = true];
+ // @deprecated -- use topicsToSchema
+ map<string, string> topicsToSerDeClassName = 4 [deprecated = true];
- /**
- *
- */
+ /**
+ *
+ */
map<string, ConsumerSpec> inputSpecs = 10;
uint64 timeoutMs = 6;
@@ -121,9 +122,9 @@ message SinkSpec {
* already present in the server */
string builtin = 6;
- /**
- * Builtin schema type or custom schema class name
- */
+ /**
+ * Builtin schema type or custom schema class name
+ */
string schemaType = 7;
}