This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f229d3b  [issue #3767] support go function for pulsar (#3854)
f229d3b is described below

commit f229d3b34c6d6d26ee3ddf32bdc0e5f568d92bfd
Author: 冉小龙 <[email protected]>
AuthorDate: Tue Apr 9 16:09:46 2019 +0800

    [issue #3767] support go function for pulsar (#3854)
    
    Master Issue: #3767
    
    ### Motivation
    
    At present, go function only supports the simplest function. Input and 
output only allow []byte. For details, refer to: 
[PIP32](https://github.com/apache/pulsar/wiki/PIP-32%3A-Go-Function-API%2C-Instance-and-LocalRun)
---
 pulsar-function-go/README.md                       |    0
 pulsar-function-go/conf/conf.go                    |  109 +++
 pulsar-function-go/conf/conf.yaml                  |   57 ++
 pulsar-function-go/examples/contextFunc.go         |   38 +
 pulsar-function-go/examples/hello.go               |   34 +
 pulsar-function-go/examples/inputFunc.go           |   36 +
 pulsar-function-go/examples/outputFunc.go          |   35 +
 pulsar-function-go/examples/test/consumer.go       |   61 ++
 pulsar-function-go/examples/test/producer.go       |   57 ++
 pulsar-function-go/go.mod                          |   11 +
 pulsar-function-go/go.sum                          |   36 +
 pulsar-function-go/pb/Function.pb.go               | 1025 ++++++++++++++++++++
 pulsar-function-go/pb/InstanceCommunication.pb.go  |  648 +++++++++++++
 pulsar-function-go/pb/Request.pb.go                |  153 +++
 pulsar-function-go/pb/doc.go                       |   34 +
 pulsar-function-go/pb/generate.sh                  |   81 ++
 pulsar-function-go/pf/context.go                   |   98 ++
 pulsar-function-go/pf/context_test.go              |   45 +
 pulsar-function-go/pf/function.go                  |  172 ++++
 pulsar-function-go/pf/function_test.go             |  182 ++++
 pulsar-function-go/pf/instance.go                  |  275 ++++++
 pulsar-function-go/pf/instanceConf.go              |  104 ++
 pulsar-function-go/pf/instanceConf_test.go         |   32 +
 pulsar-function-go/pf/util.go                      |   41 +
 pulsar-function-go/pf/util_test.go                 |   52 +
 .../proto/src/main/proto/Function.proto            |   21 +-
 26 files changed, 3427 insertions(+), 10 deletions(-)

diff --git a/pulsar-function-go/README.md b/pulsar-function-go/README.md
new file mode 100644
index 0000000..e69de29
diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
new file mode 100644
index 0000000..d784855
--- /dev/null
+++ b/pulsar-function-go/conf/conf.go
@@ -0,0 +1,109 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package conf
+
+import (
+       "flag"
+       "io/ioutil"
+       "os"
+       "os/user"
+       "time"
+
+       "github.com/apache/pulsar/pulsar-function-go/log"
+       "gopkg.in/yaml.v2"
+)
+
+const ConfigPath = "github.com/apache/pulsar/pulsar-function-go/conf/conf.yaml"
+
+type Conf struct {
+       PulsarServiceURL string        `yaml:"pulsarServiceURL"`
+       InstanceID       int           `yaml:"instanceID"`
+       FuncID           string        `yaml:"funcID"`
+       FuncVersion      string        `yaml:"funcVersion"`
+       MaxBufTuples     int           `yaml:"maxBufTuples"`
+       Port             int           `yaml:"port"`
+       ClusterName      string        `yaml:"clusterName"`
+       KillAfterIdleMs  time.Duration `yaml:"killAfterIdleMs"`
+       // function details config
+       Tenant               string `yaml:"tenant"`
+       NameSpace            string `yaml:"nameSpace"`
+       Name                 string `yaml:"name"`
+       LogTopic             string `yaml:"logTopic"`
+       ProcessingGuarantees int32  `yaml:"processingGuarantees"`
+       SecretsMap           string `yaml:"secretsMap"`
+       Runtime              int32  `yaml:"runtime"`
+       AutoACK              bool   `yaml:"autoAck"`
+       Parallelism          int32  `yaml:"parallelism"`
+       //source config
+       SubscriptionType    int32  `yaml:"subscriptionType"`
+       TimeoutMs           uint64 `yaml:"timeoutMs"`
+       SubscriptionName    string `yaml:"subscriptionName"`
+       CleanupSubscription bool   `yaml:"cleanupSubscription"`
+       //source input specs
+       SourceSpecTopic            string `yaml:"sourceSpecsTopic"`
+       SourceSchemaType           string `yaml:"sourceSchemaType"`
+       IsRegexPatternSubscription bool   `yaml:"isRegexPatternSubscription"`
+       ReceiverQueueSize          int32  `yaml:"receiverQueueSize"`
+       //sink spec config
+       SinkSpecTopic  string `yaml:"sinkSpecsTopic"`
+       SinkSchemaType string `yaml:"sinkSchemaType"`
+       //resources config
+       Cpu  float64 `yaml:"cpu"`
+       Ram  int64   `yaml:"ram"`
+       Disk int64   `yaml:"disk"`
+       //retryDetails config
+       MaxMessageRetries int32  `yaml:"maxMessageRetries"`
+       DeadLetterTopic   string `yaml:"deadLetterTopic"`
+}
+
+var opts string
+
+func (c *Conf) GetConf() *Conf {
+       flag.Parse()
+
+       yamlFile, err := ioutil.ReadFile(opts)
+       if err != nil {
+               log.Errorf("not found conf file, err:%s", err.Error())
+               return nil
+       }
+       err = yaml.Unmarshal(yamlFile, c)
+       if err != nil {
+               log.Errorf("unmarshal yaml file error:%s", err.Error())
+               return nil
+       }
+       return c
+}
+
+func init() {
+       var homeDir string
+       usr, err := user.Current()
+       if err == nil {
+               homeDir = usr.HomeDir
+       }
+
+       // Fall back to standard HOME environment variable that works
+       // for most POSIX OSes if the directory from the Go standard
+       // lib failed.
+       if err != nil || homeDir == "" {
+               homeDir = os.Getenv("HOME")
+       }
+       defaultPath := homeDir + "/" + ConfigPath
+       flag.StringVar(&opts, "instance-conf", defaultPath, "config conf.yml 
filepath")
+}
diff --git a/pulsar-function-go/conf/conf.yaml 
b/pulsar-function-go/conf/conf.yaml
new file mode 100644
index 0000000..f786cca
--- /dev/null
+++ b/pulsar-function-go/conf/conf.yaml
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+pulsarServiceURL: "pulsar://localhost:6650"
+instanceID: 101
+funcID: "pulsar-function"
+funcVersion: "1.0.0"
+maxBufTuples: 10
+port: 8091
+clusterName: "pulsar-function-go"
+killAfterIdleMs: 50000
+# function details config
+tenant: ""
+nameSpace: ""
+name: "go-function"
+logTopic: ""
+processingGuarantees: 0
+secretsMap: ""
+runtime: 0
+autoAck: true
+parallelism: 0
+# source config
+subscriptionType: 0
+timeoutMs: 0
+subscriptionName: ""
+cleanupSubscription: false
+# source input specs
+sourceSpecsTopic: persistent://public/default/topic-01
+sourceSchemaType: ""
+isRegexPatternSubscription: false
+receiverQueueSize: 10
+# sink specs config
+sinkSpecsTopic: persistent://public/default/topic-02
+sinkSchemaType: ""
+# resource config
+cpu: 0
+ram: 0
+disk: 0
+# retryDetails config
+maxMessageRetries: 0
+deadLetterTopic: ""
diff --git a/pulsar-function-go/examples/contextFunc.go 
b/pulsar-function-go/examples/contextFunc.go
new file mode 100644
index 0000000..7a057fc
--- /dev/null
+++ b/pulsar-function-go/examples/contextFunc.go
@@ -0,0 +1,38 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+       "context"
+       "fmt"
+
+       "github.com/apache/pulsar/pulsar-function-go/pf"
+)
+
+func contextFunc(ctx context.Context) {
+       if fc, ok := pf.FromContext(ctx); ok {
+               fmt.Printf("function ID is:%s, ", fc.GetFuncID())
+               fmt.Printf("function version is:%s\n", fc.GetFuncVersion())
+       }
+}
+
+func main() {
+       pf.Start(contextFunc)
+}
diff --git a/pulsar-function-go/examples/hello.go 
b/pulsar-function-go/examples/hello.go
new file mode 100644
index 0000000..ad27dec
--- /dev/null
+++ b/pulsar-function-go/examples/hello.go
@@ -0,0 +1,34 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+       "fmt"
+
+       "github.com/apache/pulsar/pulsar-function-go/pf"
+)
+
+func hello() {
+       fmt.Println("hello pulsar function")
+}
+
+func main() {
+       pf.Start(hello)
+}
diff --git a/pulsar-function-go/examples/inputFunc.go 
b/pulsar-function-go/examples/inputFunc.go
new file mode 100644
index 0000000..0d270de
--- /dev/null
+++ b/pulsar-function-go/examples/inputFunc.go
@@ -0,0 +1,36 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+       "context"
+       "fmt"
+
+       "github.com/apache/pulsar/pulsar-function-go/pf"
+)
+
+func HandleRequest(ctx context.Context, in []byte) error{
+       fmt.Println(string(in) + "!")
+       return nil
+}
+
+func main() {
+       pf.Start(HandleRequest)
+}
diff --git a/pulsar-function-go/examples/outputFunc.go 
b/pulsar-function-go/examples/outputFunc.go
new file mode 100644
index 0000000..603e278
--- /dev/null
+++ b/pulsar-function-go/examples/outputFunc.go
@@ -0,0 +1,35 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+       "context"
+
+       "github.com/apache/pulsar/pulsar-function-go/pf"
+)
+
+func HandleResponse(ctx context.Context, in []byte) ([]byte, error) {
+       res := append(in, 110)
+       return res, nil
+}
+
+func main() {
+       pf.Start(HandleResponse)
+}
diff --git a/pulsar-function-go/examples/test/consumer.go 
b/pulsar-function-go/examples/test/consumer.go
new file mode 100644
index 0000000..ba5e5b5
--- /dev/null
+++ b/pulsar-function-go/examples/test/consumer.go
@@ -0,0 +1,61 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "log"
+
+       "github.com/apache/pulsar/pulsar-client-go/pulsar"
+)
+
+func main() {
+       client, err := pulsar.NewClient(pulsar.ClientOptions{URL: 
"pulsar://localhost:6650"})
+       if err != nil {
+               log.Fatal(err)
+       }
+
+       defer client.Close()
+
+       consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+               Topic:            "topic-02",
+               SubscriptionName: "my-subscription",
+               Type:             pulsar.Shared,
+       })
+       if err != nil {
+               log.Fatal(err)
+       }
+
+       defer consumer.Close()
+
+       for {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)
+               }
+
+               fmt.Printf("Received message  msgId: %s -- content: '%s'\n",
+                       msg.ID(), string(msg.Payload()))
+
+               consumer.Ack(msg)
+       }
+}
+
diff --git a/pulsar-function-go/examples/test/producer.go 
b/pulsar-function-go/examples/test/producer.go
new file mode 100644
index 0000000..475f83d
--- /dev/null
+++ b/pulsar-function-go/examples/test/producer.go
@@ -0,0 +1,57 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "log"
+
+       "github.com/apache/pulsar/pulsar-client-go/pulsar"
+)
+
+func main() {
+       client, err := pulsar.NewClient(pulsar.ClientOptions{
+               URL: "pulsar://localhost:6650",
+       })
+
+       if err != nil {
+               log.Fatal(err)
+               return
+       }
+
+       defer client.Close()
+
+       producer, err := client.CreateProducer(pulsar.ProducerOptions{
+               Topic: "topic-01",
+       })
+
+       defer producer.Close()
+
+       ctx := context.Background()
+
+       for i := 0; i < 10; i++ {
+               if err := producer.Send(ctx, pulsar.ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }); err != nil {
+                       log.Fatal(err)
+               }
+       }
+}
diff --git a/pulsar-function-go/go.mod b/pulsar-function-go/go.mod
new file mode 100644
index 0000000..0762d53
--- /dev/null
+++ b/pulsar-function-go/go.mod
@@ -0,0 +1,11 @@
+module github.com/apache/pulsar/pulsar-function-go
+
+require (
+       github.com/apache/pulsar/pulsar-client-go 
v0.0.0-20190312044336-ff4db8db12be
+       github.com/davecgh/go-spew v1.1.1
+       github.com/golang/protobuf v1.3.0
+       github.com/sirupsen/logrus v1.4.0
+       github.com/stretchr/testify v1.3.0
+       gopkg.in/natefinch/lumberjack.v2 v2.0.0
+       gopkg.in/yaml.v2 v2.2.2
+)
diff --git a/pulsar-function-go/go.sum b/pulsar-function-go/go.sum
new file mode 100644
index 0000000..4a17410
--- /dev/null
+++ b/pulsar-function-go/go.sum
@@ -0,0 +1,36 @@
+github.com/BurntSushi/toml v0.3.1 
h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
+github.com/BurntSushi/toml v0.3.1/go.mod 
h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/apache/pulsar/pulsar-client-go v0.0.0-20190312044336-ff4db8db12be 
h1:BpPXJTLeqlWQPlMZxFP+/Sm1Zva7NJB2gWRfWlN8xi8=
+github.com/apache/pulsar/pulsar-client-go 
v0.0.0-20190312044336-ff4db8db12be/go.mod 
h1:Dt5jPpS2v4WlPya7e9jXrqGN+6BVzVyaBw+bixjLFFU=
+github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/golang/protobuf v1.3.0 
h1:kbxbvI4Un1LUWKxufD+BiE6AEExYYgkQLQmLFqA1LFk=
+github.com/golang/protobuf v1.3.0/go.mod 
h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1 
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod 
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/sirupsen/logrus v1.3.0/go.mod 
h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.0 
h1:yKenngtzGh+cUSSh6GWbxW2abRqhYUSR/t/6+2QqNvE=
+github.com/sirupsen/logrus v1.4.0/go.mod 
h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.1.1/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.2.2/go.mod 
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0 
h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod 
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 
h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod 
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd 
h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f 
h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 
h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod 
h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 
h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0 
h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod 
h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
+gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/pulsar-function-go/pb/Function.pb.go 
b/pulsar-function-go/pb/Function.pb.go
new file mode 100644
index 0000000..9766ea6
--- /dev/null
+++ b/pulsar-function-go/pb/Function.pb.go
@@ -0,0 +1,1025 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: Function.proto
+
+package pb
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type ProcessingGuarantees int32
+
+const (
+       ProcessingGuarantees_ATLEAST_ONCE     ProcessingGuarantees = 0
+       ProcessingGuarantees_ATMOST_ONCE      ProcessingGuarantees = 1
+       ProcessingGuarantees_EFFECTIVELY_ONCE ProcessingGuarantees = 2
+)
+
+var ProcessingGuarantees_name = map[int32]string{
+       0: "ATLEAST_ONCE",
+       1: "ATMOST_ONCE",
+       2: "EFFECTIVELY_ONCE",
+}
+var ProcessingGuarantees_value = map[string]int32{
+       "ATLEAST_ONCE":     0,
+       "ATMOST_ONCE":      1,
+       "EFFECTIVELY_ONCE": 2,
+}
+
+func (x ProcessingGuarantees) String() string {
+       return proto.EnumName(ProcessingGuarantees_name, int32(x))
+}
+func (ProcessingGuarantees) EnumDescriptor() ([]byte, []int) {
+       return fileDescriptor_Function_33c6e1841d2624f0, []int{0}
+}
+
+type SubscriptionType int32
+
+const (
+       SubscriptionType_SHARED   SubscriptionType = 0
+       SubscriptionType_FAILOVER SubscriptionType = 1
+)
+
+var SubscriptionType_name = map[int32]string{
+       0: "SHARED",
+       1: "FAILOVER",
+}
+var SubscriptionType_value = map[string]int32{
+       "SHARED":   0,
+       "FAILOVER": 1,
+}
+
+func (x SubscriptionType) String() string {
+       return proto.EnumName(SubscriptionType_name, int32(x))
+}
+func (SubscriptionType) EnumDescriptor() ([]byte, []int) {
+       return fileDescriptor_Function_33c6e1841d2624f0, []int{1}
+}
+
+type FunctionState int32
+
+const (
+       FunctionState_RUNNING FunctionState = 0
+       FunctionState_STOPPED FunctionState = 1
+)
+
+var FunctionState_name = map[int32]string{
+       0: "RUNNING",
+       1: "STOPPED",
+}
+var FunctionState_value = map[string]int32{
+       "RUNNING": 0,
+       "STOPPED": 1,
+}
+
+func (x FunctionState) String() string {
+       return proto.EnumName(FunctionState_name, int32(x))
+}
+func (FunctionState) EnumDescriptor() ([]byte, []int) {
+       return fileDescriptor_Function_33c6e1841d2624f0, []int{2}
+}
+
+type FunctionDetails_Runtime int32
+
+const (
+       FunctionDetails_JAVA   FunctionDetails_Runtime = 0
+       FunctionDetails_PYTHON FunctionDetails_Runtime = 1
+       FunctionDetails_GO     FunctionDetails_Runtime = 3
+)
+
+var FunctionDetails_Runtime_name = map[int32]string{
+       0: "JAVA",
+       1: "PYTHON",
+       3: "GO",
+}
+var FunctionDetails_Runtime_value = map[string]int32{
+       "JAVA":   0,
+       "PYTHON": 1,
+       "GO":     3,
+}
+
+func (x FunctionDetails_Runtime) String() string {
+       return proto.EnumName(FunctionDetails_Runtime_name, int32(x))
+}
+func (FunctionDetails_Runtime) EnumDescriptor() ([]byte, []int) {
+       return fileDescriptor_Function_33c6e1841d2624f0, []int{2, 0}
+}
+
+type Resources struct {
+       Cpu                  float64  `protobuf:"fixed64,1,opt,name=cpu,proto3" 
json:"cpu,omitempty"`
+       Ram                  int64    `protobuf:"varint,2,opt,name=ram,proto3" 
json:"ram,omitempty"`
+       Disk                 int64    `protobuf:"varint,3,opt,name=disk,proto3" 
json:"disk,omitempty"`
+       XXX_NoUnkeyedLiteral struct{} `json:"-"`
+       XXX_unrecognized     []byte   `json:"-"`
+       XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *Resources) Reset()         { *m = Resources{} }
+func (m *Resources) String() string { return proto.CompactTextString(m) }
+func (*Resources) ProtoMessage()    {}
+func (*Resources) Descriptor() ([]byte, []int) {
+       return fileDescriptor_Function_33c6e1841d2624f0, []int{0}
+}
+func (m *Resources) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_Resources.Unmarshal(m, b)
+}
+func (m *Resources) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+       return xxx_messageInfo_Resources.Marshal(b, m, deterministic)
+}
+func (dst *Resources) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_Resources.Merge(dst, src)
+}
+func (m *Resources) XXX_Size() int {
+       return xxx_messageInfo_Resources.Size(m)
+}
+func (m *Resources) XXX_DiscardUnknown() {
+       xxx_messageInfo_Resources.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Resources proto.InternalMessageInfo
+
+func (m *Resources) GetCpu() float64 {
+       if m != nil {
+               return m.Cpu
+       }
+       return 0
+}
+
+func (m *Resources) GetRam() int64 {
+       if m != nil {
+               return m.Ram
+       }
+       return 0
+}
+
+func (m *Resources) GetDisk() int64 {
+       if m != nil {
+               return m.Disk
+       }
+       return 0
+}
+
+type RetryDetails struct {
+       MaxMessageRetries    int32    
`protobuf:"varint,1,opt,name=maxMessageRetries,proto3" 
json:"maxMessageRetries,omitempty"`
+       DeadLetterTopic      string   
`protobuf:"bytes,2,opt,name=deadLetterTopic,proto3" 
json:"deadLetterTopic,omitempty"`
+       XXX_NoUnkeyedLiteral struct{} `json:"-"`
+       XXX_unrecognized     []byte   `json:"-"`
+       XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *RetryDetails) Reset()         { *m = RetryDetails{} }
+func (m *RetryDetails) String() string { return proto.CompactTextString(m) }
+func (*RetryDetails) ProtoMessage()    {}
+func (*RetryDetails) Descriptor() ([]byte, []int) {
+       return fileDescriptor_Function_33c6e1841d2624f0, []int{1}
+}
+func (m *RetryDetails) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_RetryDetails.Unmarshal(m, b)
+}
+func (m *RetryDetails) XXX_Marshal(b []byte, deterministic bool) ([]byte, 
error) {
+       return xxx_messageInfo_RetryDetails.Marshal(b, m, deterministic)
+}
+func (dst *RetryDetails) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_RetryDetails.Merge(dst, src)
+}
+func (m *RetryDetails) XXX_Size() int {
+       return xxx_messageInfo_RetryDetails.Size(m)
+}
+func (m *RetryDetails) XXX_DiscardUnknown() {
+       xxx_messageInfo_RetryDetails.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_RetryDetails proto.InternalMessageInfo
+
+func (m *RetryDetails) GetMaxMessageRetries() int32 {
+       if m != nil {
+               return m.MaxMessageRetries
+       }
+       return 0
+}
+
+func (m *RetryDetails) GetDeadLetterTopic() string {
+       if m != nil {
+               return m.DeadLetterTopic
+       }
+       return ""
+}
+
+type FunctionDetails struct {
+       Tenant               string                  
`protobuf:"bytes,1,opt,name=tenant,proto3" json:"tenant,omitempty"`
+       Namespace            string                  
`protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"`
+       Name                 string                  
`protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"`
+       ClassName            string                  
`protobuf:"bytes,4,opt,name=className,proto3" json:"className,omitempty"`
+       LogTopic             string                  
`protobuf:"bytes,5,opt,name=logTopic,proto3" json:"logTopic,omitempty"`
+       ProcessingGuarantees ProcessingGuarantees    
`protobuf:"varint,6,opt,name=processingGuarantees,proto3,enum=proto.ProcessingGuarantees"
 json:"processingGuarantees,omitempty"`
+       UserConfig           string                  
`protobuf:"bytes,7,opt,name=userConfig,proto3" json:"userConfig,omitempty"`
+       SecretsMap           string                  
`protobuf:"bytes,16,opt,name=secretsMap,proto3" json:"secretsMap,omitempty"`
+       Runtime              FunctionDetails_Runtime 
`protobuf:"varint,8,opt,name=runtime,proto3,enum=proto.FunctionDetails_Runtime" 
json:"runtime,omitempty"`
+       AutoAck              bool                    
`protobuf:"varint,9,opt,name=autoAck,proto3" json:"autoAck,omitempty"`
+       Parallelism          int32                   
`protobuf:"varint,10,opt,name=parallelism,proto3" json:"parallelism,omitempty"`
+       Source               *SourceSpec             
`protobuf:"bytes,11,opt,name=source,proto3" json:"source,omitempty"`
+       Sink                 *SinkSpec               
`protobuf:"bytes,12,opt,name=sink,proto3" json:"sink,omitempty"`
+       Resources            *Resources              
`protobuf:"bytes,13,opt,name=resources,proto3" json:"resources,omitempty"`
+       PackageUrl           string                  
`protobuf:"bytes,14,opt,name=packageUrl,proto3" json:"packageUrl,omitempty"`
+       RetryDetails         *RetryDetails           
`protobuf:"bytes,15,opt,name=retryDetails,proto3" json:"retryDetails,omitempty"`
+       XXX_NoUnkeyedLiteral struct{}                `json:"-"`
+       XXX_unrecognized     []byte                  `json:"-"`
+       XXX_sizecache        int32                   `json:"-"`
+}
+
+func (m *FunctionDetails) Reset()         { *m = FunctionDetails{} }
+func (m *FunctionDetails) String() string { return proto.CompactTextString(m) }
+func (*FunctionDetails) ProtoMessage()    {}
+func (*FunctionDetails) Descriptor() ([]byte, []int) {
+       return fileDescriptor_Function_33c6e1841d2624f0, []int{2}
+}
+func (m *FunctionDetails) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_FunctionDetails.Unmarshal(m, b)
+}
+func (m *FunctionDetails) XXX_Marshal(b []byte, deterministic bool) ([]byte, 
error) {
+       return xxx_messageInfo_FunctionDetails.Marshal(b, m, deterministic)
+}
+func (dst *FunctionDetails) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_FunctionDetails.Merge(dst, src)
+}
+func (m *FunctionDetails) XXX_Size() int {
+       return xxx_messageInfo_FunctionDetails.Size(m)
+}
+func (m *FunctionDetails) XXX_DiscardUnknown() {
+       xxx_messageInfo_FunctionDetails.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_FunctionDetails proto.InternalMessageInfo
+
+func (m *FunctionDetails) GetTenant() string {
+       if m != nil {
+               return m.Tenant
+       }
+       return ""
+}
+
+func (m *FunctionDetails) GetNamespace() string {
+       if m != nil {
+               return m.Namespace
+       }
+       return ""
+}
+
+func (m *FunctionDetails) GetName() string {
+       if m != nil {
+               return m.Name
+       }
+       return ""
+}
+
+func (m *FunctionDetails) GetClassName() string {
+       if m != nil {
+               return m.ClassName
+       }
+       return ""
+}
+
+func (m *FunctionDetails) GetLogTopic() string {
+       if m != nil {
+               return m.LogTopic
+       }
+       return ""
+}
+
+func (m *FunctionDetails) GetProcessingGuarantees() ProcessingGuarantees {
+       if m != nil {
+               return m.ProcessingGuarantees
+       }
+       return ProcessingGuarantees_ATLEAST_ONCE
+}
+
+func (m *FunctionDetails) GetUserConfig() string {
+       if m != nil {
+               return m.UserConfig
+       }
+       return ""
+}
+
+func (m *FunctionDetails) GetSecretsMap() string {
+       if m != nil {
+               return m.SecretsMap
+       }
+       return ""
+}
+
+func (m *FunctionDetails) GetRuntime() FunctionDetails_Runtime {
+       if m != nil {
+               return m.Runtime
+       }
+       return FunctionDetails_JAVA
+}
+
+func (m *FunctionDetails) GetAutoAck() bool {
+       if m != nil {
+               return m.AutoAck
+       }
+       return false
+}
+
+func (m *FunctionDetails) GetParallelism() int32 {
+       if m != nil {
+               return m.Parallelism
+       }
+       return 0
+}
+
+func (m *FunctionDetails) GetSource() *SourceSpec {
+       if m != nil {
+               return m.Source
+       }
+       return nil
+}
+
+func (m *FunctionDetails) GetSink() *SinkSpec {
+       if m != nil {
+               return m.Sink
+       }
+       return nil
+}
+
+func (m *FunctionDetails) GetResources() *Resources {
+       if m != nil {
+               return m.Resources
+       }
+       return nil
+}
+
+func (m *FunctionDetails) GetPackageUrl() string {
+       if m != nil {
+               return m.PackageUrl
+       }
+       return ""
+}
+
+func (m *FunctionDetails) GetRetryDetails() *RetryDetails {
+       if m != nil {
+               return m.RetryDetails
+       }
+       return nil
+}
+
+type ConsumerSpec struct {
+       SchemaType           string                          
`protobuf:"bytes,1,opt,name=schemaType,proto3" json:"schemaType,omitempty"`
+       SerdeClassName       string                          
`protobuf:"bytes,2,opt,name=serdeClassName,proto3" 
json:"serdeClassName,omitempty"`
+       IsRegexPattern       bool                            
`protobuf:"varint,3,opt,name=isRegexPattern,proto3" 
json:"isRegexPattern,omitempty"`
+       ReceiverQueueSize    *ConsumerSpec_ReceiverQueueSize 
`protobuf:"bytes,4,opt,name=receiverQueueSize,proto3" 
json:"receiverQueueSize,omitempty"`
+       XXX_NoUnkeyedLiteral struct{}                        `json:"-"`
+       XXX_unrecognized     []byte                          `json:"-"`
+       XXX_sizecache        int32                           `json:"-"`
+}
+
+func (m *ConsumerSpec) Reset()         { *m = ConsumerSpec{} }
+func (m *ConsumerSpec) String() string { return proto.CompactTextString(m) }
+func (*ConsumerSpec) ProtoMessage()    {}
+func (*ConsumerSpec) Descriptor() ([]byte, []int) {
+       return fileDescriptor_Function_33c6e1841d2624f0, []int{3}
+}
+func (m *ConsumerSpec) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_ConsumerSpec.Unmarshal(m, b)
+}
+func (m *ConsumerSpec) XXX_Marshal(b []byte, deterministic bool) ([]byte, 
error) {
+       return xxx_messageInfo_ConsumerSpec.Marshal(b, m, deterministic)
+}
+func (dst *ConsumerSpec) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_ConsumerSpec.Merge(dst, src)
+}
+func (m *ConsumerSpec) XXX_Size() int {
+       return xxx_messageInfo_ConsumerSpec.Size(m)
+}
+func (m *ConsumerSpec) XXX_DiscardUnknown() {
+       xxx_messageInfo_ConsumerSpec.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ConsumerSpec proto.InternalMessageInfo
+
+func (m *ConsumerSpec) GetSchemaType() string {
+       if m != nil {
+               return m.SchemaType
+       }
+       return ""
+}
+
+func (m *ConsumerSpec) GetSerdeClassName() string {
+       if m != nil {
+               return m.SerdeClassName
+       }
+       return ""
+}
+
+func (m *ConsumerSpec) GetIsRegexPattern() bool {
+       if m != nil {
+               return m.IsRegexPattern
+       }
+       return false
+}
+
+func (m *ConsumerSpec) GetReceiverQueueSize() *ConsumerSpec_ReceiverQueueSize {
+       if m != nil {
+               return m.ReceiverQueueSize
+       }
+       return nil
+}
+
+type ConsumerSpec_ReceiverQueueSize struct {
+       Value                int32    
`protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"`
+       XXX_NoUnkeyedLiteral struct{} `json:"-"`
+       XXX_unrecognized     []byte   `json:"-"`
+       XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *ConsumerSpec_ReceiverQueueSize) Reset()         { *m = 
ConsumerSpec_ReceiverQueueSize{} }
+func (m *ConsumerSpec_ReceiverQueueSize) String() string { return 
proto.CompactTextString(m) }
+func (*ConsumerSpec_ReceiverQueueSize) ProtoMessage()    {}
+func (*ConsumerSpec_ReceiverQueueSize) Descriptor() ([]byte, []int) {
+       return fileDescriptor_Function_33c6e1841d2624f0, []int{3, 0}
+}
+func (m *ConsumerSpec_ReceiverQueueSize) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_ConsumerSpec_ReceiverQueueSize.Unmarshal(m, b)
+}
+func (m *ConsumerSpec_ReceiverQueueSize) XXX_Marshal(b []byte, deterministic 
bool) ([]byte, error) {
+       return xxx_messageInfo_ConsumerSpec_ReceiverQueueSize.Marshal(b, m, 
deterministic)
+}
+func (dst *ConsumerSpec_ReceiverQueueSize) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_ConsumerSpec_ReceiverQueueSize.Merge(dst, src)
+}
+func (m *ConsumerSpec_ReceiverQueueSize) XXX_Size() int {
+       return xxx_messageInfo_ConsumerSpec_ReceiverQueueSize.Size(m)
+}
+func (m *ConsumerSpec_ReceiverQueueSize) XXX_DiscardUnknown() {
+       xxx_messageInfo_ConsumerSpec_ReceiverQueueSize.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ConsumerSpec_ReceiverQueueSize proto.InternalMessageInfo
+
+func (m *ConsumerSpec_ReceiverQueueSize) GetValue() int32 {
+       if m != nil {
+               return m.Value
+       }
+       return 0
+}
+
+type SourceSpec struct {
+       ClassName string `protobuf:"bytes,1,opt,name=className,proto3" 
json:"className,omitempty"`
+       // map in json format
+       Configs       string `protobuf:"bytes,2,opt,name=configs,proto3" 
json:"configs,omitempty"`
+       TypeClassName string `protobuf:"bytes,5,opt,name=typeClassName,proto3" 
json:"typeClassName,omitempty"`
+       // configs used only when source feeds into functions
+       SubscriptionType SubscriptionType 
`protobuf:"varint,3,opt,name=subscriptionType,proto3,enum=proto.SubscriptionType"
 json:"subscriptionType,omitempty"`
+       // @deprecated -- use topicsToSchema
+       TopicsToSerDeClassName map[string]string 
`protobuf:"bytes,4,rep,name=topicsToSerDeClassName,proto3" 
json:"topicsToSerDeClassName,omitempty" 
protobuf_key:"bytes,1,opt,name=key,proto3" 
protobuf_val:"bytes,2,opt,name=value,proto3"` // Deprecated: Do not use.
+       // *
+       //
+       InputSpecs    map[string]*ConsumerSpec 
`protobuf:"bytes,10,rep,name=inputSpecs,proto3" json:"inputSpecs,omitempty" 
protobuf_key:"bytes,1,opt,name=key,proto3" 
protobuf_val:"bytes,2,opt,name=value,proto3"`
+       TimeoutMs     uint64                   
`protobuf:"varint,6,opt,name=timeoutMs,proto3" json:"timeoutMs,omitempty"`
+       TopicsPattern string                   
`protobuf:"bytes,7,opt,name=topicsPattern,proto3" 
json:"topicsPattern,omitempty"` // Deprecated: Do not use.
+       // If specified, this will refer to an archive that is
+       // already present in the server
+       Builtin              string   
`protobuf:"bytes,8,opt,name=builtin,proto3" json:"builtin,omitempty"`
+       SubscriptionName     string   
`protobuf:"bytes,9,opt,name=subscriptionName,proto3" 
json:"subscriptionName,omitempty"`
+       CleanupSubscription  bool     
`protobuf:"varint,11,opt,name=cleanupSubscription,proto3" 
json:"cleanupSubscription,omitempty"`
+       XXX_NoUnkeyedLiteral struct{} `json:"-"`
+       XXX_unrecognized     []byte   `json:"-"`
+       XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *SourceSpec) Reset()         { *m = SourceSpec{} }
+func (m *SourceSpec) String() string { return proto.CompactTextString(m) }
+func (*SourceSpec) ProtoMessage()    {}
+func (*SourceSpec) Descriptor() ([]byte, []int) {
+       return fileDescriptor_Function_33c6e1841d2624f0, []int{4}
+}
+func (m *SourceSpec) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_SourceSpec.Unmarshal(m, b)
+}
+func (m *SourceSpec) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) 
{
+       return xxx_messageInfo_SourceSpec.Marshal(b, m, deterministic)
+}
+func (dst *SourceSpec) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_SourceSpec.Merge(dst, src)
+}
+func (m *SourceSpec) XXX_Size() int {
+       return xxx_messageInfo_SourceSpec.Size(m)
+}
+func (m *SourceSpec) XXX_DiscardUnknown() {
+       xxx_messageInfo_SourceSpec.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_SourceSpec proto.InternalMessageInfo
+
+func (m *SourceSpec) GetClassName() string {
+       if m != nil {
+               return m.ClassName
+       }
+       return ""
+}
+
+func (m *SourceSpec) GetConfigs() string {
+       if m != nil {
+               return m.Configs
+       }
+       return ""
+}
+
+func (m *SourceSpec) GetTypeClassName() string {
+       if m != nil {
+               return m.TypeClassName
+       }
+       return ""
+}
+
+func (m *SourceSpec) GetSubscriptionType() SubscriptionType {
+       if m != nil {
+               return m.SubscriptionType
+       }
+       return SubscriptionType_SHARED
+}
+
+// Deprecated: Do not use.
+func (m *SourceSpec) GetTopicsToSerDeClassName() map[string]string {
+       if m != nil {
+               return m.TopicsToSerDeClassName
+       }
+       return nil
+}
+
+func (m *SourceSpec) GetInputSpecs() map[string]*ConsumerSpec {
+       if m != nil {
+               return m.InputSpecs
+       }
+       return nil
+}
+
+func (m *SourceSpec) GetTimeoutMs() uint64 {
+       if m != nil {
+               return m.TimeoutMs
+       }
+       return 0
+}
+
+// Deprecated: Do not use.
+func (m *SourceSpec) GetTopicsPattern() string {
+       if m != nil {
+               return m.TopicsPattern
+       }
+       return ""
+}
+
+func (m *SourceSpec) GetBuiltin() string {
+       if m != nil {
+               return m.Builtin
+       }
+       return ""
+}
+
+func (m *SourceSpec) GetSubscriptionName() string {
+       if m != nil {
+               return m.SubscriptionName
+       }
+       return ""
+}
+
+func (m *SourceSpec) GetCleanupSubscription() bool {
+       if m != nil {
+               return m.CleanupSubscription
+       }
+       return false
+}
+
+type SinkSpec struct {
+       ClassName string `protobuf:"bytes,1,opt,name=className,proto3" 
json:"className,omitempty"`
+       // map in json format
+       Configs       string `protobuf:"bytes,2,opt,name=configs,proto3" 
json:"configs,omitempty"`
+       TypeClassName string `protobuf:"bytes,5,opt,name=typeClassName,proto3" 
json:"typeClassName,omitempty"`
+       // configs used only when functions output to sink
+       Topic          string `protobuf:"bytes,3,opt,name=topic,proto3" 
json:"topic,omitempty"`
+       SerDeClassName string 
`protobuf:"bytes,4,opt,name=serDeClassName,proto3" 
json:"serDeClassName,omitempty"`
+       // If specified, this will refer to an archive that is
+       // already present in the server
+       Builtin string `protobuf:"bytes,6,opt,name=builtin,proto3" 
json:"builtin,omitempty"`
+       // *
+       // Builtin schema type or custom schema class name
+       SchemaType           string   
`protobuf:"bytes,7,opt,name=schemaType,proto3" json:"schemaType,omitempty"`
+       XXX_NoUnkeyedLiteral struct{} `json:"-"`
+       XXX_unrecognized     []byte   `json:"-"`
+       XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *SinkSpec) Reset()         { *m = SinkSpec{} }
+func (m *SinkSpec) String() string { return proto.CompactTextString(m) }
+func (*SinkSpec) ProtoMessage()    {}
+func (*SinkSpec) Descriptor() ([]byte, []int) {
+       return fileDescriptor_Function_33c6e1841d2624f0, []int{5}
+}
+func (m *SinkSpec) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_SinkSpec.Unmarshal(m, b)
+}
+func (m *SinkSpec) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+       return xxx_messageInfo_SinkSpec.Marshal(b, m, deterministic)
+}
+func (dst *SinkSpec) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_SinkSpec.Merge(dst, src)
+}
+func (m *SinkSpec) XXX_Size() int {
+       return xxx_messageInfo_SinkSpec.Size(m)
+}
+func (m *SinkSpec) XXX_DiscardUnknown() {
+       xxx_messageInfo_SinkSpec.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_SinkSpec proto.InternalMessageInfo
+
+func (m *SinkSpec) GetClassName() string {
+       if m != nil {
+               return m.ClassName
+       }
+       return ""
+}
+
+func (m *SinkSpec) GetConfigs() string {
+       if m != nil {
+               return m.Configs
+       }
+       return ""
+}
+
+func (m *SinkSpec) GetTypeClassName() string {
+       if m != nil {
+               return m.TypeClassName
+       }
+       return ""
+}
+
+func (m *SinkSpec) GetTopic() string {
+       if m != nil {
+               return m.Topic
+       }
+       return ""
+}
+
+func (m *SinkSpec) GetSerDeClassName() string {
+       if m != nil {
+               return m.SerDeClassName
+       }
+       return ""
+}
+
+func (m *SinkSpec) GetBuiltin() string {
+       if m != nil {
+               return m.Builtin
+       }
+       return ""
+}
+
+func (m *SinkSpec) GetSchemaType() string {
+       if m != nil {
+               return m.SchemaType
+       }
+       return ""
+}
+
+type PackageLocationMetaData struct {
+       PackagePath          string   
`protobuf:"bytes,1,opt,name=packagePath,proto3" json:"packagePath,omitempty"`
+       OriginalFileName     string   
`protobuf:"bytes,2,opt,name=originalFileName,proto3" 
json:"originalFileName,omitempty"`
+       XXX_NoUnkeyedLiteral struct{} `json:"-"`
+       XXX_unrecognized     []byte   `json:"-"`
+       XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *PackageLocationMetaData) Reset()         { *m = 
PackageLocationMetaData{} }
+func (m *PackageLocationMetaData) String() string { return 
proto.CompactTextString(m) }
+func (*PackageLocationMetaData) ProtoMessage()    {}
+func (*PackageLocationMetaData) Descriptor() ([]byte, []int) {
+       return fileDescriptor_Function_33c6e1841d2624f0, []int{6}
+}
+func (m *PackageLocationMetaData) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_PackageLocationMetaData.Unmarshal(m, b)
+}
+func (m *PackageLocationMetaData) XXX_Marshal(b []byte, deterministic bool) 
([]byte, error) {
+       return xxx_messageInfo_PackageLocationMetaData.Marshal(b, m, 
deterministic)
+}
+func (dst *PackageLocationMetaData) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_PackageLocationMetaData.Merge(dst, src)
+}
+func (m *PackageLocationMetaData) XXX_Size() int {
+       return xxx_messageInfo_PackageLocationMetaData.Size(m)
+}
+func (m *PackageLocationMetaData) XXX_DiscardUnknown() {
+       xxx_messageInfo_PackageLocationMetaData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_PackageLocationMetaData proto.InternalMessageInfo
+
+func (m *PackageLocationMetaData) GetPackagePath() string {
+       if m != nil {
+               return m.PackagePath
+       }
+       return ""
+}
+
+func (m *PackageLocationMetaData) GetOriginalFileName() string {
+       if m != nil {
+               return m.OriginalFileName
+       }
+       return ""
+}
+
+type FunctionMetaData struct {
+       FunctionDetails      *FunctionDetails         
`protobuf:"bytes,1,opt,name=functionDetails,proto3" 
json:"functionDetails,omitempty"`
+       PackageLocation      *PackageLocationMetaData 
`protobuf:"bytes,2,opt,name=packageLocation,proto3" 
json:"packageLocation,omitempty"`
+       Version              uint64                   
`protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"`
+       CreateTime           uint64                   
`protobuf:"varint,4,opt,name=createTime,proto3" json:"createTime,omitempty"`
+       InstanceStates       map[int32]FunctionState  
`protobuf:"bytes,5,rep,name=instanceStates,proto3" 
json:"instanceStates,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" 
protobuf_val:"varint,2,opt,name=value,proto3,enum=proto.FunctionState"`
+       XXX_NoUnkeyedLiteral struct{}                 `json:"-"`
+       XXX_unrecognized     []byte                   `json:"-"`
+       XXX_sizecache        int32                    `json:"-"`
+}
+
+func (m *FunctionMetaData) Reset()         { *m = FunctionMetaData{} }
+func (m *FunctionMetaData) String() string { return proto.CompactTextString(m) 
}
+func (*FunctionMetaData) ProtoMessage()    {}
+func (*FunctionMetaData) Descriptor() ([]byte, []int) {
+       return fileDescriptor_Function_33c6e1841d2624f0, []int{7}
+}
+func (m *FunctionMetaData) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_FunctionMetaData.Unmarshal(m, b)
+}
+func (m *FunctionMetaData) XXX_Marshal(b []byte, deterministic bool) ([]byte, 
error) {
+       return xxx_messageInfo_FunctionMetaData.Marshal(b, m, deterministic)
+}
+func (dst *FunctionMetaData) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_FunctionMetaData.Merge(dst, src)
+}
+func (m *FunctionMetaData) XXX_Size() int {
+       return xxx_messageInfo_FunctionMetaData.Size(m)
+}
+func (m *FunctionMetaData) XXX_DiscardUnknown() {
+       xxx_messageInfo_FunctionMetaData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_FunctionMetaData proto.InternalMessageInfo
+
+func (m *FunctionMetaData) GetFunctionDetails() *FunctionDetails {
+       if m != nil {
+               return m.FunctionDetails
+       }
+       return nil
+}
+
+func (m *FunctionMetaData) GetPackageLocation() *PackageLocationMetaData {
+       if m != nil {
+               return m.PackageLocation
+       }
+       return nil
+}
+
+func (m *FunctionMetaData) GetVersion() uint64 {
+       if m != nil {
+               return m.Version
+       }
+       return 0
+}
+
+func (m *FunctionMetaData) GetCreateTime() uint64 {
+       if m != nil {
+               return m.CreateTime
+       }
+       return 0
+}
+
+func (m *FunctionMetaData) GetInstanceStates() map[int32]FunctionState {
+       if m != nil {
+               return m.InstanceStates
+       }
+       return nil
+}
+
+type Instance struct {
+       FunctionMetaData     *FunctionMetaData 
`protobuf:"bytes,1,opt,name=functionMetaData,proto3" 
json:"functionMetaData,omitempty"`
+       InstanceId           int32             
`protobuf:"varint,2,opt,name=instanceId,proto3" json:"instanceId,omitempty"`
+       XXX_NoUnkeyedLiteral struct{}          `json:"-"`
+       XXX_unrecognized     []byte            `json:"-"`
+       XXX_sizecache        int32             `json:"-"`
+}
+
+func (m *Instance) Reset()         { *m = Instance{} }
+func (m *Instance) String() string { return proto.CompactTextString(m) }
+func (*Instance) ProtoMessage()    {}
+func (*Instance) Descriptor() ([]byte, []int) {
+       return fileDescriptor_Function_33c6e1841d2624f0, []int{8}
+}
+func (m *Instance) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_Instance.Unmarshal(m, b)
+}
+func (m *Instance) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+       return xxx_messageInfo_Instance.Marshal(b, m, deterministic)
+}
+func (dst *Instance) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_Instance.Merge(dst, src)
+}
+func (m *Instance) XXX_Size() int {
+       return xxx_messageInfo_Instance.Size(m)
+}
+func (m *Instance) XXX_DiscardUnknown() {
+       xxx_messageInfo_Instance.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Instance proto.InternalMessageInfo
+
+func (m *Instance) GetFunctionMetaData() *FunctionMetaData {
+       if m != nil {
+               return m.FunctionMetaData
+       }
+       return nil
+}
+
+func (m *Instance) GetInstanceId() int32 {
+       if m != nil {
+               return m.InstanceId
+       }
+       return 0
+}
+
+type Assignment struct {
+       Instance             *Instance 
`protobuf:"bytes,1,opt,name=instance,proto3" json:"instance,omitempty"`
+       WorkerId             string    
`protobuf:"bytes,2,opt,name=workerId,proto3" json:"workerId,omitempty"`
+       XXX_NoUnkeyedLiteral struct{}  `json:"-"`
+       XXX_unrecognized     []byte    `json:"-"`
+       XXX_sizecache        int32     `json:"-"`
+}
+
+func (m *Assignment) Reset()         { *m = Assignment{} }
+func (m *Assignment) String() string { return proto.CompactTextString(m) }
+func (*Assignment) ProtoMessage()    {}
+func (*Assignment) Descriptor() ([]byte, []int) {
+       return fileDescriptor_Function_33c6e1841d2624f0, []int{9}
+}
+func (m *Assignment) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_Assignment.Unmarshal(m, b)
+}
+func (m *Assignment) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) 
{
+       return xxx_messageInfo_Assignment.Marshal(b, m, deterministic)
+}
+func (dst *Assignment) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_Assignment.Merge(dst, src)
+}
+func (m *Assignment) XXX_Size() int {
+       return xxx_messageInfo_Assignment.Size(m)
+}
+func (m *Assignment) XXX_DiscardUnknown() {
+       xxx_messageInfo_Assignment.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Assignment proto.InternalMessageInfo
+
+func (m *Assignment) GetInstance() *Instance {
+       if m != nil {
+               return m.Instance
+       }
+       return nil
+}
+
+func (m *Assignment) GetWorkerId() string {
+       if m != nil {
+               return m.WorkerId
+       }
+       return ""
+}
+
+func init() {
+       proto.RegisterType((*Resources)(nil), "proto.Resources")
+       proto.RegisterType((*RetryDetails)(nil), "proto.RetryDetails")
+       proto.RegisterType((*FunctionDetails)(nil), "proto.FunctionDetails")
+       proto.RegisterType((*ConsumerSpec)(nil), "proto.ConsumerSpec")
+       proto.RegisterType((*ConsumerSpec_ReceiverQueueSize)(nil), 
"proto.ConsumerSpec.ReceiverQueueSize")
+       proto.RegisterType((*SourceSpec)(nil), "proto.SourceSpec")
+       proto.RegisterMapType((map[string]*ConsumerSpec)(nil), 
"proto.SourceSpec.InputSpecsEntry")
+       proto.RegisterMapType((map[string]string)(nil), 
"proto.SourceSpec.TopicsToSerDeClassNameEntry")
+       proto.RegisterType((*SinkSpec)(nil), "proto.SinkSpec")
+       proto.RegisterType((*PackageLocationMetaData)(nil), 
"proto.PackageLocationMetaData")
+       proto.RegisterType((*FunctionMetaData)(nil), "proto.FunctionMetaData")
+       proto.RegisterMapType((map[int32]FunctionState)(nil), 
"proto.FunctionMetaData.InstanceStatesEntry")
+       proto.RegisterType((*Instance)(nil), "proto.Instance")
+       proto.RegisterType((*Assignment)(nil), "proto.Assignment")
+       proto.RegisterEnum("proto.ProcessingGuarantees", 
ProcessingGuarantees_name, ProcessingGuarantees_value)
+       proto.RegisterEnum("proto.SubscriptionType", SubscriptionType_name, 
SubscriptionType_value)
+       proto.RegisterEnum("proto.FunctionState", FunctionState_name, 
FunctionState_value)
+       proto.RegisterEnum("proto.FunctionDetails_Runtime", 
FunctionDetails_Runtime_name, FunctionDetails_Runtime_value)
+}
+
+func init() { proto.RegisterFile("Function.proto", 
fileDescriptor_Function_33c6e1841d2624f0) }
+
+var fileDescriptor_Function_33c6e1841d2624f0 = []byte{
+       // 1220 bytes of a gzipped FileDescriptorProto
+       0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 
0xdb, 0x6e, 0xdb, 0x46,
+       0x13, 0x36, 0xad, 0xf3, 0xc8, 0xb6, 0xe8, 0x8d, 0x91, 0x10, 0xce, 0x8f, 
0x40, 0xd1, 0xdf, 0x83,
+       0xec, 0x24, 0x42, 0xe1, 0x5e, 0x34, 0xe8, 0x55, 0x15, 0x59, 0x4e, 0x54, 
0xd8, 0x96, 0xba, 0x52,
+       0x52, 0xe4, 0xaa, 0xd8, 0xd0, 0x63, 0x65, 0x21, 0x8a, 0x24, 0x76, 0x97, 
0x69, 0xdc, 0x07, 0xe8,
+       0x63, 0xf4, 0x49, 0xfa, 0x30, 0x7d, 0x92, 0xa2, 0xd8, 0x25, 0x29, 0x91, 
0x94, 0xdc, 0xbb, 0x5e,
+       0x89, 0xf3, 0xcd, 0x71, 0x67, 0xbf, 0x99, 0x15, 0x1c, 0x5c, 0x44, 0xbe, 
0xab, 0x78, 0xe0, 0xf7,
+       0x42, 0x11, 0xa8, 0x80, 0x54, 0xcc, 0x4f, 0x67, 0x00, 0x0d, 0x8a, 0x32, 
0x88, 0x84, 0x8b, 0x92,
+       0xd8, 0x50, 0x72, 0xc3, 0xc8, 0xb1, 0xda, 0x56, 0xd7, 0xa2, 0xfa, 0x53, 
0x23, 0x82, 0x2d, 0x9d,
+       0xdd, 0xb6, 0xd5, 0x2d, 0x51, 0xfd, 0x49, 0x08, 0x94, 0x6f, 0xb8, 0x5c, 
0x38, 0x25, 0x03, 0x99,
+       0xef, 0xce, 0x2d, 0xec, 0x51, 0x54, 0xe2, 0xee, 0x1c, 0x15, 0xe3, 0x9e, 
0x24, 0xcf, 0xe1, 0x70,
+       0xc9, 0x3e, 0x5f, 0xa1, 0x94, 0x6c, 0x8e, 0x5a, 0xc3, 0x51, 0x9a, 0xa8, 
0x15, 0xba, 0xa9, 0x20,
+       0x5d, 0x68, 0xdd, 0x20, 0xbb, 0xb9, 0x44, 0xa5, 0x50, 0xcc, 0x82, 0x90, 
0xbb, 0x26, 0x5f, 0x83,
+       0x16, 0xe1, 0xce, 0x1f, 0x15, 0x68, 0xa5, 0xc7, 0x48, 0x73, 0x3d, 0x84, 
0xaa, 0x42, 0x9f, 0xf9,
+       0xca, 0x24, 0x68, 0xd0, 0x44, 0x22, 0xff, 0x83, 0x86, 0xcf, 0x96, 0x28, 
0x43, 0xe6, 0x62, 0x12,
+       0x6f, 0x0d, 0xe8, 0x53, 0x68, 0xc1, 0x9c, 0xa2, 0x41, 0xcd, 0xb7, 0xf6, 
0x70, 0x3d, 0x26, 0xe5,
+       0xb5, 0x56, 0x94, 0x63, 0x8f, 0x15, 0x40, 0x8e, 0xa1, 0xee, 0x05, 0xf3, 
0xb8, 0xbc, 0x8a, 0x51,
+       0xae, 0x64, 0x32, 0x86, 0xa3, 0x50, 0x04, 0x2e, 0x4a, 0xc9, 0xfd, 0xf9, 
0xeb, 0x88, 0x09, 0xe6,
+       0x2b, 0x44, 0xe9, 0x54, 0xdb, 0x56, 0xf7, 0xe0, 0xec, 0x71, 0xdc, 0xf1, 
0xde, 0x64, 0x8b, 0x09,
+       0xdd, 0xea, 0x48, 0x9e, 0x00, 0x44, 0x12, 0xc5, 0x20, 0xf0, 0x6f, 0xf9, 
0xdc, 0xa9, 0x99, 0x74,
+       0x19, 0x44, 0xeb, 0x25, 0xba, 0x02, 0x95, 0xbc, 0x62, 0xa1, 0x63, 0xc7, 
0xfa, 0x35, 0x42, 0x5e,
+       0x42, 0x4d, 0x44, 0xbe, 0xe2, 0x4b, 0x74, 0xea, 0xa6, 0x86, 0x27, 0x49, 
0x0d, 0x85, 0xee, 0xf5,
+       0x68, 0x6c, 0x45, 0x53, 0x73, 0xe2, 0x40, 0x8d, 0x45, 0x2a, 0xe8, 0xbb, 
0x0b, 0xa7, 0xd1, 0xb6,
+       0xba, 0x75, 0x9a, 0x8a, 0xa4, 0x0d, 0xcd, 0x90, 0x09, 0xe6, 0x79, 0xe8, 
0x71, 0xb9, 0x74, 0xc0,
+       0x5c, 0x67, 0x16, 0x22, 0x27, 0x50, 0x8d, 0x99, 0xe4, 0x34, 0xdb, 0x56, 
0xb7, 0x79, 0x76, 0x98,
+       0x24, 0x9d, 0x1a, 0x70, 0x1a, 0xa2, 0x4b, 0x13, 0x03, 0xf2, 0x7f, 0x28, 
0x4b, 0xee, 0x2f, 0x9c,
+       0x3d, 0x63, 0xd8, 0x4a, 0x0d, 0xb9, 0xbf, 0x30, 0x66, 0x46, 0x49, 0x7a, 
0xd0, 0x10, 0x29, 0x37,
+       0x9d, 0x7d, 0x63, 0x69, 0x27, 0x96, 0x2b, 0xce, 0xd2, 0xb5, 0x89, 0xee, 
0x4a, 0xc8, 0xdc, 0x05,
+       0x9b, 0xe3, 0x5b, 0xe1, 0x39, 0x07, 0x71, 0x57, 0xd6, 0x08, 0xf9, 0x0e, 
0xf6, 0x44, 0x86, 0xa6,
+       0x4e, 0xcb, 0x84, 0x7c, 0xb0, 0x0a, 0xb9, 0x56, 0xd1, 0x9c, 0x61, 0xe7, 
0x6b, 0xa8, 0x25, 0x8d,
+       0x22, 0x75, 0x28, 0xff, 0xd8, 0x7f, 0xd7, 0xb7, 0x77, 0x08, 0x40, 0x75, 
0xf2, 0x7e, 0xf6, 0x66,
+       0x7c, 0x6d, 0x5b, 0xa4, 0x0a, 0xbb, 0xaf, 0xc7, 0x76, 0xa9, 0xf3, 0xb7, 
0x05, 0x7b, 0x83, 0xc0,
+       0x97, 0xd1, 0x12, 0x85, 0x3e, 0x88, 0xb9, 0x28, 0xf7, 0x23, 0x2e, 0xd9, 
0xec, 0x2e, 0xc4, 0x84,
+       0xa1, 0x19, 0x84, 0x7c, 0x05, 0x07, 0x12, 0xc5, 0x0d, 0x0e, 0x56, 0xc4, 
0x8b, 0xa9, 0x5a, 0x40,
+       0xb5, 0x1d, 0x97, 0x14, 0xe7, 0xf8, 0x79, 0xc2, 0xf4, 0x3c, 0xf8, 0x86, 
0xb9, 0x75, 0x5a, 0x40,
+       0xc9, 0x14, 0x0e, 0x05, 0xba, 0xc8, 0x3f, 0xa1, 0xf8, 0x29, 0xc2, 0x08, 
0xa7, 0xfc, 0xb7, 0x98,
+       0xcb, 0xcd, 0xb3, 0x2f, 0x93, 0x73, 0x66, 0xeb, 0xeb, 0xd1, 0xa2, 0x31, 
0xdd, 0xf4, 0x3f, 0x3e,
+       0x81, 0xc3, 0x0d, 0x3b, 0x72, 0x04, 0x95, 0x4f, 0xcc, 0x8b, 0x30, 0x99, 
0xeb, 0x58, 0xe8, 0xfc,
+       0x59, 0x01, 0x58, 0x5f, 0x77, 0x7e, 0xa4, 0xac, 0xe2, 0x48, 0x39, 0x50, 
0x73, 0x0d, 0x9f, 0x65,
+       0x72, 0xea, 0x54, 0x24, 0x5f, 0xc0, 0xbe, 0xba, 0x0b, 0x33, 0x5d, 0x89, 
0x27, 0x2e, 0x0f, 0x92,
+       0x01, 0xd8, 0x32, 0xfa, 0x20, 0x5d, 0xc1, 0x43, 0xcd, 0x69, 0xd3, 0xe2, 
0x92, 0xa1, 0xfb, 0xa3,
+       0x94, 0x50, 0x05, 0x35, 0xdd, 0x70, 0x20, 0x1c, 0x1e, 0x2a, 0x3d, 0xc4, 
0x72, 0x16, 0x4c, 0x51,
+       0x9c, 0x67, 0x72, 0x96, 0xdb, 0xa5, 0x6e, 0xf3, 0xec, 0xc5, 0x06, 0x89, 
0x7b, 0xb3, 0xad, 0xf6,
+       0x43, 0x5f, 0x89, 0xbb, 0x57, 0xbb, 0x8e, 0x45, 0xef, 0x09, 0x48, 0xfa, 
0x00, 0xdc, 0x0f, 0x23,
+       0xa5, 0x83, 0x48, 0x07, 0x4c, 0xf8, 0xa7, 0x9b, 0xe1, 0x47, 0x2b, 0x1b, 
0x13, 0x92, 0x66, 0x9c,
+       0x74, 0x43, 0x35, 0x0d, 0x83, 0x48, 0x5d, 0xc5, 0xeb, 0xa5, 0x4c, 0xd7, 
0x00, 0xe9, 0xc2, 0x7e,
+       0x9c, 0x3a, 0x25, 0x89, 0xd9, 0x1c, 0xa6, 0xa6, 0xbc, 0x42, 0xb7, 0xfe, 
0x43, 0xc4, 0x3d, 0xc5,
+       0x7d, 0xb3, 0x20, 0x1a, 0x34, 0x15, 0xc9, 0x69, 0xbe, 0xa9, 0xa6, 0x13, 
0x0d, 0x63, 0xb2, 0x81,
+       0x93, 0x6f, 0xe0, 0x81, 0xeb, 0x21, 0xf3, 0xa3, 0x30, 0xdb, 0x68, 0x33, 
0xfd, 0x75, 0xba, 0x4d,
+       0x75, 0x3c, 0x82, 0xc7, 0xff, 0xd2, 0x3d, 0xfd, 0xdc, 0x2c, 0xf0, 0x2e, 
0x61, 0x8a, 0xfe, 0x5c,
+       0xd3, 0x2c, 0x66, 0x48, 0x2c, 0x7c, 0xbf, 0xfb, 0xd2, 0x3a, 0xa6, 0xd0, 
0x2a, 0x74, 0x6a, 0x8b,
+       0xfb, 0x49, 0xd6, 0x7d, 0x3d, 0xeb, 0xd9, 0x19, 0xc8, 0xc4, 0xec, 0xfc, 
0x65, 0x41, 0x3d, 0x5d,
+       0x42, 0xff, 0x31, 0x79, 0x8f, 0xa0, 0x62, 0xae, 0x24, 0x79, 0x82, 0x62, 
0x21, 0xd9, 0x07, 0x79,
+       0x16, 0xa6, 0xfb, 0x20, 0x4b, 0xa5, 0xcc, 0xfd, 0x55, 0xf3, 0xf7, 0x97, 
0xdf, 0x38, 0xb5, 0xe2,
+       0xc6, 0xe9, 0xcc, 0xe1, 0xd1, 0x24, 0x5e, 0x89, 0x97, 0x81, 0xcb, 0xf4, 
0xa5, 0x5c, 0xa1, 0x62,
+       0xe7, 0x4c, 0xb1, 0x78, 0xc3, 0x1b, 0xd5, 0x84, 0xa9, 0x8f, 0xc9, 0x91, 
0xb3, 0x90, 0x26, 0x47,
+       0x20, 0xf8, 0x9c, 0xfb, 0xcc, 0xbb, 0xe0, 0x1e, 0x66, 0x16, 0xd6, 0x06, 
0xde, 0xf9, 0xbd, 0x04,
+       0x76, 0xfa, 0xdc, 0xac, 0x52, 0xfc, 0x00, 0xad, 0xdb, 0xfc, 0x13, 0x64, 
0xd2, 0x34, 0xcf, 0x1e,
+       0x6e, 0x7f, 0xa0, 0x68, 0xd1, 0x9c, 0xbc, 0x81, 0x56, 0x98, 0xaf, 0x3f, 
0xb9, 0xdb, 0xf4, 0x89,
+       0xbb, 0xe7, 0x74, 0xb4, 0xe8, 0xa6, 0x7b, 0xf8, 0x09, 0x85, 0xd4, 0x11, 
0x4a, 0x66, 0x92, 0x52,
+       0x51, 0xf7, 0xd0, 0x15, 0xc8, 0x14, 0xce, 0x78, 0x72, 0x03, 0x65, 0x9a, 
0x41, 0xc8, 0x14, 0x0e,
+       0xb8, 0x2f, 0x15, 0xf3, 0x5d, 0x9c, 0x2a, 0xa6, 0x50, 0x3a, 0x15, 0x33, 
0xcc, 0xcf, 0x0a, 0x87,
+       0x48, 0x73, 0xf7, 0x46, 0x39, 0xeb, 0x78, 0xac, 0x0b, 0x21, 0x8e, 0x7f, 
0x86, 0x07, 0x5b, 0xcc,
+       0xb2, 0x9c, 0xae, 0xc4, 0x9c, 0x3e, 0xcd, 0x72, 0xfa, 0xe0, 0xec, 0xa8, 
0x90, 0xd4, 0x38, 0x67,
+       0x49, 0x1d, 0x40, 0x3d, 0x0d, 0xac, 0x57, 0xe6, 0x6d, 0xa1, 0xb8, 0xe4, 
0x02, 0x1e, 0xdd, 0x53,
+       0x3b, 0xdd, 0x70, 0xd0, 0xed, 0x49, 0x6b, 0x1f, 0xdd, 0x98, 0x2a, 0x2a, 
0x34, 0x83, 0x74, 0xde,
+       0x02, 0xf4, 0xa5, 0xe4, 0x73, 0x7f, 0x89, 0xbe, 0x22, 0xcf, 0xa0, 0x9e, 
0xea, 0x92, 0x54, 0xe9,
+       0x73, 0x9f, 0x56, 0x45, 0x57, 0x06, 0xfa, 0x5f, 0xd6, 0xaf, 0x81, 0x58, 
0xa0, 0x48, 0x02, 0x37,
+       0xe8, 0x4a, 0x3e, 0x1d, 0xc3, 0xd1, 0xb6, 0xbf, 0x50, 0xc4, 0x86, 0xbd, 
0xfe, 0xec, 0x72, 0xd8,
+       0x9f, 0xce, 0x7e, 0x19, 0x5f, 0x0f, 0x86, 0xf6, 0x0e, 0x69, 0x41, 0xb3, 
0x3f, 0xbb, 0x1a, 0xa7,
+       0x80, 0x45, 0x8e, 0xc0, 0x1e, 0x5e, 0x5c, 0x0c, 0x07, 0xb3, 0xd1, 0xbb, 
0xe1, 0xe5, 0xfb, 0x18,
+       0xdd, 0x3d, 0x7d, 0x0e, 0x76, 0xf1, 0x81, 0xd0, 0xaf, 0xfa, 0xf4, 0x4d, 
0x9f, 0x0e, 0xcf, 0xed,
+       0x1d, 0xb2, 0x07, 0xf5, 0x8b, 0xfe, 0xe8, 0x72, 0xfc, 0x6e, 0x48, 0x6d, 
0xeb, 0xf4, 0x04, 0xf6,
+       0x73, 0x2d, 0x26, 0x4d, 0xa8, 0xd1, 0xb7, 0xd7, 0xd7, 0xa3, 0xeb, 0xd7, 
0xf6, 0x8e, 0x16, 0xa6,
+       0xb3, 0xf1, 0x64, 0x32, 0x3c, 0xb7, 0xad, 0x57, 0x2f, 0xe0, 0x69, 0x20, 
0xe6, 0x3d, 0x16, 0x32,
+       0xf7, 0x23, 0xf6, 0xc2, 0xc8, 0x93, 0x4c, 0xf4, 0xd2, 0x36, 0xca, 0xf8, 
0xf4, 0xaf, 0xea, 0x69,
+       0xb4, 0x0f, 0x55, 0x03, 0x7c, 0xfb, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 
0xfe, 0xa8, 0xa5, 0x5f,
+       0xa3, 0x0b, 0x00, 0x00,
+}
diff --git a/pulsar-function-go/pb/InstanceCommunication.pb.go 
b/pulsar-function-go/pb/InstanceCommunication.pb.go
new file mode 100644
index 0000000..a0c36e0
--- /dev/null
+++ b/pulsar-function-go/pb/InstanceCommunication.pb.go
@@ -0,0 +1,648 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: InstanceCommunication.proto
+
+package pb
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+import _ "github.com/golang/protobuf/ptypes/empty"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type FunctionStatus struct {
+       Running          bool   `protobuf:"varint,1,opt,name=running,proto3" 
json:"running,omitempty"`
+       FailureException string 
`protobuf:"bytes,2,opt,name=failureException,proto3" 
json:"failureException,omitempty"`
+       NumRestarts      int64  
`protobuf:"varint,3,opt,name=numRestarts,proto3" json:"numRestarts,omitempty"`
+       // int64 numProcessed = 4;
+       NumReceived              int64                                  
`protobuf:"varint,17,opt,name=numReceived,proto3" json:"numReceived,omitempty"`
+       NumSuccessfullyProcessed int64                                  
`protobuf:"varint,5,opt,name=numSuccessfullyProcessed,proto3" 
json:"numSuccessfullyProcessed,omitempty"`
+       NumUserExceptions        int64                                  
`protobuf:"varint,6,opt,name=numUserExceptions,proto3" 
json:"numUserExceptions,omitempty"`
+       LatestUserExceptions     []*FunctionStatus_ExceptionInformation 
`protobuf:"bytes,7,rep,name=latestUserExceptions,proto3" 
json:"latestUserExceptions,omitempty"`
+       NumSystemExceptions      int64                                  
`protobuf:"varint,8,opt,name=numSystemExceptions,proto3" 
json:"numSystemExceptions,omitempty"`
+       LatestSystemExceptions   []*FunctionStatus_ExceptionInformation 
`protobuf:"bytes,9,rep,name=latestSystemExceptions,proto3" 
json:"latestSystemExceptions,omitempty"`
+       NumSourceExceptions      int64                                  
`protobuf:"varint,18,opt,name=numSourceExceptions,proto3" 
json:"numSourceExceptions,omitempty"`
+       LatestSourceExceptions   []*FunctionStatus_ExceptionInformation 
`protobuf:"bytes,19,rep,name=latestSourceExceptions,proto3" 
json:"latestSourceExceptions,omitempty"`
+       NumSinkExceptions        int64                                  
`protobuf:"varint,20,opt,name=numSinkExceptions,proto3" 
json:"numSinkExceptions,omitempty"`
+       LatestSinkExceptions     []*FunctionStatus_ExceptionInformation 
`protobuf:"bytes,21,rep,name=latestSinkExceptions,proto3" 
json:"latestSinkExceptions,omitempty"`
+       // map from topic name to number of deserialization exceptions
+       //    map<string, int64> deserializationExceptions = 10;
+       // number of serialization exceptions on the output
+       //    int64 serializationExceptions = 11;
+       // average latency
+       AverageLatency float64 
`protobuf:"fixed64,12,opt,name=averageLatency,proto3" 
json:"averageLatency,omitempty"`
+       // When was the last time the function was invoked.
+       // expressed in ms since epoch
+       LastInvocationTime int64  
`protobuf:"varint,13,opt,name=lastInvocationTime,proto3" 
json:"lastInvocationTime,omitempty"`
+       InstanceId         string 
`protobuf:"bytes,14,opt,name=instanceId,proto3" json:"instanceId,omitempty"`
+       //    MetricsData metrics = 15 [deprecated=true];
+       // owner of function-instance
+       WorkerId             string   
`protobuf:"bytes,16,opt,name=workerId,proto3" json:"workerId,omitempty"`
+       XXX_NoUnkeyedLiteral struct{} `json:"-"`
+       XXX_unrecognized     []byte   `json:"-"`
+       XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *FunctionStatus) Reset()         { *m = FunctionStatus{} }
+func (m *FunctionStatus) String() string { return proto.CompactTextString(m) }
+func (*FunctionStatus) ProtoMessage()    {}
+func (*FunctionStatus) Descriptor() ([]byte, []int) {
+       return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{0}
+}
+func (m *FunctionStatus) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_FunctionStatus.Unmarshal(m, b)
+}
+func (m *FunctionStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, 
error) {
+       return xxx_messageInfo_FunctionStatus.Marshal(b, m, deterministic)
+}
+func (dst *FunctionStatus) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_FunctionStatus.Merge(dst, src)
+}
+func (m *FunctionStatus) XXX_Size() int {
+       return xxx_messageInfo_FunctionStatus.Size(m)
+}
+func (m *FunctionStatus) XXX_DiscardUnknown() {
+       xxx_messageInfo_FunctionStatus.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_FunctionStatus proto.InternalMessageInfo
+
+func (m *FunctionStatus) GetRunning() bool {
+       if m != nil {
+               return m.Running
+       }
+       return false
+}
+
+func (m *FunctionStatus) GetFailureException() string {
+       if m != nil {
+               return m.FailureException
+       }
+       return ""
+}
+
+func (m *FunctionStatus) GetNumRestarts() int64 {
+       if m != nil {
+               return m.NumRestarts
+       }
+       return 0
+}
+
+func (m *FunctionStatus) GetNumReceived() int64 {
+       if m != nil {
+               return m.NumReceived
+       }
+       return 0
+}
+
+func (m *FunctionStatus) GetNumSuccessfullyProcessed() int64 {
+       if m != nil {
+               return m.NumSuccessfullyProcessed
+       }
+       return 0
+}
+
+func (m *FunctionStatus) GetNumUserExceptions() int64 {
+       if m != nil {
+               return m.NumUserExceptions
+       }
+       return 0
+}
+
+func (m *FunctionStatus) GetLatestUserExceptions() 
[]*FunctionStatus_ExceptionInformation {
+       if m != nil {
+               return m.LatestUserExceptions
+       }
+       return nil
+}
+
+func (m *FunctionStatus) GetNumSystemExceptions() int64 {
+       if m != nil {
+               return m.NumSystemExceptions
+       }
+       return 0
+}
+
+func (m *FunctionStatus) GetLatestSystemExceptions() 
[]*FunctionStatus_ExceptionInformation {
+       if m != nil {
+               return m.LatestSystemExceptions
+       }
+       return nil
+}
+
+func (m *FunctionStatus) GetNumSourceExceptions() int64 {
+       if m != nil {
+               return m.NumSourceExceptions
+       }
+       return 0
+}
+
+func (m *FunctionStatus) GetLatestSourceExceptions() 
[]*FunctionStatus_ExceptionInformation {
+       if m != nil {
+               return m.LatestSourceExceptions
+       }
+       return nil
+}
+
+func (m *FunctionStatus) GetNumSinkExceptions() int64 {
+       if m != nil {
+               return m.NumSinkExceptions
+       }
+       return 0
+}
+
+func (m *FunctionStatus) GetLatestSinkExceptions() 
[]*FunctionStatus_ExceptionInformation {
+       if m != nil {
+               return m.LatestSinkExceptions
+       }
+       return nil
+}
+
+func (m *FunctionStatus) GetAverageLatency() float64 {
+       if m != nil {
+               return m.AverageLatency
+       }
+       return 0
+}
+
+func (m *FunctionStatus) GetLastInvocationTime() int64 {
+       if m != nil {
+               return m.LastInvocationTime
+       }
+       return 0
+}
+
+func (m *FunctionStatus) GetInstanceId() string {
+       if m != nil {
+               return m.InstanceId
+       }
+       return ""
+}
+
+func (m *FunctionStatus) GetWorkerId() string {
+       if m != nil {
+               return m.WorkerId
+       }
+       return ""
+}
+
+type FunctionStatus_ExceptionInformation struct {
+       ExceptionString      string   
`protobuf:"bytes,1,opt,name=exceptionString,proto3" 
json:"exceptionString,omitempty"`
+       MsSinceEpoch         int64    
`protobuf:"varint,2,opt,name=msSinceEpoch,proto3" json:"msSinceEpoch,omitempty"`
+       XXX_NoUnkeyedLiteral struct{} `json:"-"`
+       XXX_unrecognized     []byte   `json:"-"`
+       XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *FunctionStatus_ExceptionInformation) Reset()         { *m = 
FunctionStatus_ExceptionInformation{} }
+func (m *FunctionStatus_ExceptionInformation) String() string { return 
proto.CompactTextString(m) }
+func (*FunctionStatus_ExceptionInformation) ProtoMessage()    {}
+func (*FunctionStatus_ExceptionInformation) Descriptor() ([]byte, []int) {
+       return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{0, 
0}
+}
+func (m *FunctionStatus_ExceptionInformation) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_FunctionStatus_ExceptionInformation.Unmarshal(m, 
b)
+}
+func (m *FunctionStatus_ExceptionInformation) XXX_Marshal(b []byte, 
deterministic bool) ([]byte, error) {
+       return xxx_messageInfo_FunctionStatus_ExceptionInformation.Marshal(b, 
m, deterministic)
+}
+func (dst *FunctionStatus_ExceptionInformation) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_FunctionStatus_ExceptionInformation.Merge(dst, src)
+}
+func (m *FunctionStatus_ExceptionInformation) XXX_Size() int {
+       return xxx_messageInfo_FunctionStatus_ExceptionInformation.Size(m)
+}
+func (m *FunctionStatus_ExceptionInformation) XXX_DiscardUnknown() {
+       xxx_messageInfo_FunctionStatus_ExceptionInformation.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_FunctionStatus_ExceptionInformation 
proto.InternalMessageInfo
+
+func (m *FunctionStatus_ExceptionInformation) GetExceptionString() string {
+       if m != nil {
+               return m.ExceptionString
+       }
+       return ""
+}
+
+func (m *FunctionStatus_ExceptionInformation) GetMsSinceEpoch() int64 {
+       if m != nil {
+               return m.MsSinceEpoch
+       }
+       return 0
+}
+
+// Deprecated
+type FunctionStatusList struct {
+       Error                string            
`protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"`
+       FunctionStatusList   []*FunctionStatus 
`protobuf:"bytes,1,rep,name=functionStatusList,proto3" 
json:"functionStatusList,omitempty"`
+       XXX_NoUnkeyedLiteral struct{}          `json:"-"`
+       XXX_unrecognized     []byte            `json:"-"`
+       XXX_sizecache        int32             `json:"-"`
+}
+
+func (m *FunctionStatusList) Reset()         { *m = FunctionStatusList{} }
+func (m *FunctionStatusList) String() string { return 
proto.CompactTextString(m) }
+func (*FunctionStatusList) ProtoMessage()    {}
+func (*FunctionStatusList) Descriptor() ([]byte, []int) {
+       return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{1}
+}
+func (m *FunctionStatusList) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_FunctionStatusList.Unmarshal(m, b)
+}
+func (m *FunctionStatusList) XXX_Marshal(b []byte, deterministic bool) 
([]byte, error) {
+       return xxx_messageInfo_FunctionStatusList.Marshal(b, m, deterministic)
+}
+func (dst *FunctionStatusList) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_FunctionStatusList.Merge(dst, src)
+}
+func (m *FunctionStatusList) XXX_Size() int {
+       return xxx_messageInfo_FunctionStatusList.Size(m)
+}
+func (m *FunctionStatusList) XXX_DiscardUnknown() {
+       xxx_messageInfo_FunctionStatusList.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_FunctionStatusList proto.InternalMessageInfo
+
+func (m *FunctionStatusList) GetError() string {
+       if m != nil {
+               return m.Error
+       }
+       return ""
+}
+
+func (m *FunctionStatusList) GetFunctionStatusList() []*FunctionStatus {
+       if m != nil {
+               return m.FunctionStatusList
+       }
+       return nil
+}
+
+type MetricsData struct {
+       // Total number of records function received from source
+       ReceivedTotal      int64 
`protobuf:"varint,2,opt,name=receivedTotal,proto3" 
json:"receivedTotal,omitempty"`
+       ReceivedTotal_1Min int64 
`protobuf:"varint,10,opt,name=receivedTotal_1min,json=receivedTotal1min,proto3" 
json:"receivedTotal_1min,omitempty"`
+       // Total number of records successfully processed by user function
+       ProcessedSuccessfullyTotal      int64 
`protobuf:"varint,4,opt,name=processedSuccessfullyTotal,proto3" 
json:"processedSuccessfullyTotal,omitempty"`
+       ProcessedSuccessfullyTotal_1Min int64 
`protobuf:"varint,12,opt,name=processedSuccessfullyTotal_1min,json=processedSuccessfullyTotal1min,proto3"
 json:"processedSuccessfullyTotal_1min,omitempty"`
+       // Total number of system exceptions thrown
+       SystemExceptionsTotal      int64 
`protobuf:"varint,5,opt,name=systemExceptionsTotal,proto3" 
json:"systemExceptionsTotal,omitempty"`
+       SystemExceptionsTotal_1Min int64 
`protobuf:"varint,13,opt,name=systemExceptionsTotal_1min,json=systemExceptionsTotal1min,proto3"
 json:"systemExceptionsTotal_1min,omitempty"`
+       // Total number of user exceptions thrown
+       UserExceptionsTotal      int64 
`protobuf:"varint,6,opt,name=userExceptionsTotal,proto3" 
json:"userExceptionsTotal,omitempty"`
+       UserExceptionsTotal_1Min int64 
`protobuf:"varint,14,opt,name=userExceptionsTotal_1min,json=userExceptionsTotal1min,proto3"
 json:"userExceptionsTotal_1min,omitempty"`
+       // Average process latency for function
+       AvgProcessLatency      float64 
`protobuf:"fixed64,7,opt,name=avgProcessLatency,proto3" 
json:"avgProcessLatency,omitempty"`
+       AvgProcessLatency_1Min float64 
`protobuf:"fixed64,15,opt,name=avgProcessLatency_1min,json=avgProcessLatency1min,proto3"
 json:"avgProcessLatency_1min,omitempty"`
+       // Timestamp of when the function was last invoked
+       LastInvocation int64 
`protobuf:"varint,8,opt,name=lastInvocation,proto3" 
json:"lastInvocation,omitempty"`
+       // User defined metrics
+       UserMetrics          map[string]float64 
`protobuf:"bytes,9,rep,name=userMetrics,proto3" json:"userMetrics,omitempty" 
protobuf_key:"bytes,1,opt,name=key,proto3" 
protobuf_val:"fixed64,2,opt,name=value,proto3"`
+       XXX_NoUnkeyedLiteral struct{}           `json:"-"`
+       XXX_unrecognized     []byte             `json:"-"`
+       XXX_sizecache        int32              `json:"-"`
+}
+
+func (m *MetricsData) Reset()         { *m = MetricsData{} }
+func (m *MetricsData) String() string { return proto.CompactTextString(m) }
+func (*MetricsData) ProtoMessage()    {}
+func (*MetricsData) Descriptor() ([]byte, []int) {
+       return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{2}
+}
+func (m *MetricsData) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_MetricsData.Unmarshal(m, b)
+}
+func (m *MetricsData) XXX_Marshal(b []byte, deterministic bool) ([]byte, 
error) {
+       return xxx_messageInfo_MetricsData.Marshal(b, m, deterministic)
+}
+func (dst *MetricsData) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_MetricsData.Merge(dst, src)
+}
+func (m *MetricsData) XXX_Size() int {
+       return xxx_messageInfo_MetricsData.Size(m)
+}
+func (m *MetricsData) XXX_DiscardUnknown() {
+       xxx_messageInfo_MetricsData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_MetricsData proto.InternalMessageInfo
+
+func (m *MetricsData) GetReceivedTotal() int64 {
+       if m != nil {
+               return m.ReceivedTotal
+       }
+       return 0
+}
+
+func (m *MetricsData) GetReceivedTotal_1Min() int64 {
+       if m != nil {
+               return m.ReceivedTotal_1Min
+       }
+       return 0
+}
+
+func (m *MetricsData) GetProcessedSuccessfullyTotal() int64 {
+       if m != nil {
+               return m.ProcessedSuccessfullyTotal
+       }
+       return 0
+}
+
+func (m *MetricsData) GetProcessedSuccessfullyTotal_1Min() int64 {
+       if m != nil {
+               return m.ProcessedSuccessfullyTotal_1Min
+       }
+       return 0
+}
+
+func (m *MetricsData) GetSystemExceptionsTotal() int64 {
+       if m != nil {
+               return m.SystemExceptionsTotal
+       }
+       return 0
+}
+
+func (m *MetricsData) GetSystemExceptionsTotal_1Min() int64 {
+       if m != nil {
+               return m.SystemExceptionsTotal_1Min
+       }
+       return 0
+}
+
+func (m *MetricsData) GetUserExceptionsTotal() int64 {
+       if m != nil {
+               return m.UserExceptionsTotal
+       }
+       return 0
+}
+
+func (m *MetricsData) GetUserExceptionsTotal_1Min() int64 {
+       if m != nil {
+               return m.UserExceptionsTotal_1Min
+       }
+       return 0
+}
+
+func (m *MetricsData) GetAvgProcessLatency() float64 {
+       if m != nil {
+               return m.AvgProcessLatency
+       }
+       return 0
+}
+
+func (m *MetricsData) GetAvgProcessLatency_1Min() float64 {
+       if m != nil {
+               return m.AvgProcessLatency_1Min
+       }
+       return 0
+}
+
+func (m *MetricsData) GetLastInvocation() int64 {
+       if m != nil {
+               return m.LastInvocation
+       }
+       return 0
+}
+
+func (m *MetricsData) GetUserMetrics() map[string]float64 {
+       if m != nil {
+               return m.UserMetrics
+       }
+       return nil
+}
+
+type HealthCheckResult struct {
+       Success              bool     
`protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"`
+       XXX_NoUnkeyedLiteral struct{} `json:"-"`
+       XXX_unrecognized     []byte   `json:"-"`
+       XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *HealthCheckResult) Reset()         { *m = HealthCheckResult{} }
+func (m *HealthCheckResult) String() string { return 
proto.CompactTextString(m) }
+func (*HealthCheckResult) ProtoMessage()    {}
+func (*HealthCheckResult) Descriptor() ([]byte, []int) {
+       return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{3}
+}
+func (m *HealthCheckResult) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_HealthCheckResult.Unmarshal(m, b)
+}
+func (m *HealthCheckResult) XXX_Marshal(b []byte, deterministic bool) ([]byte, 
error) {
+       return xxx_messageInfo_HealthCheckResult.Marshal(b, m, deterministic)
+}
+func (dst *HealthCheckResult) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_HealthCheckResult.Merge(dst, src)
+}
+func (m *HealthCheckResult) XXX_Size() int {
+       return xxx_messageInfo_HealthCheckResult.Size(m)
+}
+func (m *HealthCheckResult) XXX_DiscardUnknown() {
+       xxx_messageInfo_HealthCheckResult.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_HealthCheckResult proto.InternalMessageInfo
+
+func (m *HealthCheckResult) GetSuccess() bool {
+       if m != nil {
+               return m.Success
+       }
+       return false
+}
+
+type Metrics struct {
+       Metrics              []*Metrics_InstanceMetrics 
`protobuf:"bytes,1,rep,name=metrics,proto3" json:"metrics,omitempty"`
+       XXX_NoUnkeyedLiteral struct{}                   `json:"-"`
+       XXX_unrecognized     []byte                     `json:"-"`
+       XXX_sizecache        int32                      `json:"-"`
+}
+
+func (m *Metrics) Reset()         { *m = Metrics{} }
+func (m *Metrics) String() string { return proto.CompactTextString(m) }
+func (*Metrics) ProtoMessage()    {}
+func (*Metrics) Descriptor() ([]byte, []int) {
+       return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{4}
+}
+func (m *Metrics) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_Metrics.Unmarshal(m, b)
+}
+func (m *Metrics) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+       return xxx_messageInfo_Metrics.Marshal(b, m, deterministic)
+}
+func (dst *Metrics) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_Metrics.Merge(dst, src)
+}
+func (m *Metrics) XXX_Size() int {
+       return xxx_messageInfo_Metrics.Size(m)
+}
+func (m *Metrics) XXX_DiscardUnknown() {
+       xxx_messageInfo_Metrics.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Metrics proto.InternalMessageInfo
+
+func (m *Metrics) GetMetrics() []*Metrics_InstanceMetrics {
+       if m != nil {
+               return m.Metrics
+       }
+       return nil
+}
+
+type Metrics_InstanceMetrics struct {
+       Name                 string       
`protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+       InstanceId           int32        
`protobuf:"varint,2,opt,name=instanceId,proto3" json:"instanceId,omitempty"`
+       MetricsData          *MetricsData 
`protobuf:"bytes,3,opt,name=metricsData,proto3" json:"metricsData,omitempty"`
+       XXX_NoUnkeyedLiteral struct{}     `json:"-"`
+       XXX_unrecognized     []byte       `json:"-"`
+       XXX_sizecache        int32        `json:"-"`
+}
+
+func (m *Metrics_InstanceMetrics) Reset()         { *m = 
Metrics_InstanceMetrics{} }
+func (m *Metrics_InstanceMetrics) String() string { return 
proto.CompactTextString(m) }
+func (*Metrics_InstanceMetrics) ProtoMessage()    {}
+func (*Metrics_InstanceMetrics) Descriptor() ([]byte, []int) {
+       return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{4, 
0}
+}
+func (m *Metrics_InstanceMetrics) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_Metrics_InstanceMetrics.Unmarshal(m, b)
+}
+func (m *Metrics_InstanceMetrics) XXX_Marshal(b []byte, deterministic bool) 
([]byte, error) {
+       return xxx_messageInfo_Metrics_InstanceMetrics.Marshal(b, m, 
deterministic)
+}
+func (dst *Metrics_InstanceMetrics) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_Metrics_InstanceMetrics.Merge(dst, src)
+}
+func (m *Metrics_InstanceMetrics) XXX_Size() int {
+       return xxx_messageInfo_Metrics_InstanceMetrics.Size(m)
+}
+func (m *Metrics_InstanceMetrics) XXX_DiscardUnknown() {
+       xxx_messageInfo_Metrics_InstanceMetrics.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Metrics_InstanceMetrics proto.InternalMessageInfo
+
+func (m *Metrics_InstanceMetrics) GetName() string {
+       if m != nil {
+               return m.Name
+       }
+       return ""
+}
+
+func (m *Metrics_InstanceMetrics) GetInstanceId() int32 {
+       if m != nil {
+               return m.InstanceId
+       }
+       return 0
+}
+
+func (m *Metrics_InstanceMetrics) GetMetricsData() *MetricsData {
+       if m != nil {
+               return m.MetricsData
+       }
+       return nil
+}
+
+func init() {
+       proto.RegisterType((*FunctionStatus)(nil), "proto.FunctionStatus")
+       proto.RegisterType((*FunctionStatus_ExceptionInformation)(nil), 
"proto.FunctionStatus.ExceptionInformation")
+       proto.RegisterType((*FunctionStatusList)(nil), 
"proto.FunctionStatusList")
+       proto.RegisterType((*MetricsData)(nil), "proto.MetricsData")
+       proto.RegisterMapType((map[string]float64)(nil), 
"proto.MetricsData.UserMetricsEntry")
+       proto.RegisterType((*HealthCheckResult)(nil), "proto.HealthCheckResult")
+       proto.RegisterType((*Metrics)(nil), "proto.Metrics")
+       proto.RegisterType((*Metrics_InstanceMetrics)(nil), 
"proto.Metrics.InstanceMetrics")
+}
+
+func init() {
+       proto.RegisterFile("InstanceCommunication.proto", 
fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3)
+}
+
+var fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3 = []byte{
+       // 917 bytes of a gzipped FileDescriptorProto
+       0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 
0xef, 0x6e, 0x1b, 0x45,
+       0x10, 0xcf, 0xd5, 0x75, 0x9d, 0x8c, 0x53, 0x27, 0x9e, 0xc4, 0xe1, 0x70, 
0xa5, 0x60, 0x0e, 0x54,
+       0x59, 0x55, 0x7b, 0x2d, 0xa1, 0x48, 0x25, 0x12, 0x15, 0x4d, 0x6a, 0x82, 
0xa5, 0x22, 0xa1, 0x73,
+       0xfb, 0x15, 0xb4, 0x39, 0xaf, 0x9d, 0x53, 0xee, 0x76, 0xcd, 0xee, 0x9e, 
0xc1, 0xe2, 0x45, 0x78,
+       0x1c, 0x1e, 0x84, 0x2f, 0xbc, 0x09, 0xba, 0xdd, 0x3b, 0xfb, 0xfe, 0x19, 
0x41, 0x3e, 0xf9, 0x76,
+       0x7e, 0xf3, 0x9b, 0xf9, 0x79, 0x77, 0x66, 0x76, 0xe1, 0xd1, 0x98, 0x49, 
0x45, 0x98, 0x4f, 0x2f,
+       0x79, 0x14, 0xc5, 0x2c, 0xf0, 0x89, 0x0a, 0x38, 0x73, 0x17, 0x82, 0x2b, 
0x8e, 0x4d, 0xfd, 0xd3,
+       0x7f, 0x34, 0xe7, 0x7c, 0x1e, 0xd2, 0xe7, 0x7a, 0x75, 0x1d, 0xcf, 0x9e, 
0xd3, 0x68, 0xa1, 0x56,
+       0xc6, 0xc7, 0xf9, 0x63, 0x17, 0x3a, 0xdf, 0xc5, 0xcc, 0x4f, 0x68, 0x13, 
0x45, 0x54, 0x2c, 0xd1,
+       0x86, 0x96, 0x88, 0x19, 0x0b, 0xd8, 0xdc, 0xb6, 0x06, 0xd6, 0x70, 0xd7, 
0xcb, 0x96, 0xf8, 0x04,
+       0x0e, 0x67, 0x24, 0x08, 0x63, 0x41, 0x47, 0xbf, 0xf9, 0x74, 0x91, 0x70, 
0xec, 0x7b, 0x03, 0x6b,
+       0xb8, 0xe7, 0x55, 0xec, 0x38, 0x80, 0x36, 0x8b, 0x23, 0x8f, 0x4a, 0x45, 
0x84, 0x92, 0x76, 0x63,
+       0x60, 0x0d, 0x1b, 0x5e, 0xde, 0xb4, 0xf6, 0xf0, 0x69, 0xb0, 0xa4, 0x53, 
0xbb, 0x9b, 0xf3, 0x30,
+       0x26, 0x3c, 0x07, 0x9b, 0xc5, 0xd1, 0x24, 0xf6, 0x7d, 0x2a, 0xe5, 0x2c, 
0x0e, 0xc3, 0xd5, 0x8f,
+       0x82, 0x27, 0xdf, 0x74, 0x6a, 0x37, 0xb5, 0xfb, 0x56, 0x1c, 0x9f, 0x42, 
0x97, 0xc5, 0xd1, 0x07,
+       0x49, 0xc5, 0x5a, 0x93, 0xb4, 0x1f, 0x68, 0x52, 0x15, 0xc0, 0x9f, 0xe0, 
0x38, 0x24, 0x8a, 0x4a,
+       0x55, 0x22, 0xb4, 0x06, 0x8d, 0x61, 0xfb, 0xec, 0x89, 0xd9, 0x2c, 0xb7, 
0xb8, 0x51, 0xee, 0xda,
+       0x6f, 0xcc, 0x66, 0x5c, 0x44, 0x7a, 0xeb, 0xbd, 0xda, 0x38, 0xf8, 0x02, 
0x8e, 0x12, 0xa5, 0x2b,
+       0xa9, 0x68, 0x94, 0x0b, 0xbf, 0xab, 0xf5, 0xd4, 0x41, 0x78, 0x0d, 0x27, 
0x26, 0x52, 0x85, 0xb4,
+       0xf7, 0xbf, 0x35, 0x6d, 0x89, 0x94, 0xa9, 0xe2, 0xb1, 0xf0, 0x69, 0x2e, 
0x01, 0x6e, 0x54, 0x95,
+       0xa0, 0x9c, 0xaa, 0x32, 0xe9, 0xe8, 0xce, 0xaa, 0xca, 0x39, 0xcc, 0xc9, 
0x4d, 0x02, 0x76, 0x9b,
+       0x0b, 0x7f, 0xbc, 0x3e, 0xb9, 0x22, 0xb0, 0x39, 0xb9, 0x12, 0xa1, 0x77, 
0xd7, 0x93, 0x2b, 0xc5,
+       0x7f, 0x0c, 0x1d, 0xb2, 0xa4, 0x82, 0xcc, 0xe9, 0x3b, 0xa2, 0x28, 0xf3, 
0x57, 0xf6, 0xfe, 0xc0,
+       0x1a, 0x5a, 0x5e, 0xc9, 0x8a, 0x2e, 0x60, 0x48, 0xa4, 0x1a, 0xb3, 0x25, 
0x37, 0x4d, 0xf8, 0x3e,
+       0x88, 0xa8, 0xfd, 0x50, 0xcb, 0xae, 0x41, 0xf0, 0x14, 0x20, 0x48, 0x7b, 
0x77, 0x3c, 0xb5, 0x3b,
+       0xba, 0x8b, 0x72, 0x16, 0xec, 0xc3, 0xee, 0xaf, 0x5c, 0xdc, 0x52, 0x31, 
0x9e, 0xda, 0x87, 0x1a,
+       0x5d, 0xaf, 0xfb, 0x53, 0x38, 0xae, 0xfb, 0x07, 0x38, 0x84, 0x03, 0x9a, 
0xd9, 0x27, 0x4a, 0x64,
+       0x1d, 0xbc, 0xe7, 0x95, 0xcd, 0xe8, 0xc0, 0x7e, 0x24, 0x27, 0x01, 0xf3, 
0xe9, 0x68, 0xc1, 0xfd,
+       0x1b, 0xdd, 0xc5, 0x0d, 0xaf, 0x60, 0x73, 0x7e, 0x01, 0x2c, 0x6e, 0xdb, 
0xbb, 0x40, 0x2a, 0x3c,
+       0x86, 0x26, 0x15, 0x82, 0x8b, 0xb4, 0xf1, 0xcd, 0x02, 0x47, 0x80, 0xb3, 
0x8a, 0xaf, 0x6d, 0xe9,
+       0x33, 0xe8, 0xd5, 0x9e, 0x81, 0x57, 0x43, 0x70, 0xfe, 0x6e, 0x42, 0xfb, 
0x07, 0xaa, 0x44, 0xe0,
+       0xcb, 0xb7, 0x44, 0x11, 0xfc, 0x1c, 0x1e, 0x8a, 0x74, 0x18, 0xbc, 0xe7, 
0x8a, 0x84, 0xa9, 0xce,
+       0xa2, 0x11, 0x9f, 0x01, 0x16, 0x0c, 0x3f, 0x7f, 0x11, 0x05, 0xcc, 0x06, 
0x53, 0x31, 0x05, 0x24,
+       0x01, 0xf0, 0x35, 0xf4, 0x17, 0xd9, 0x98, 0xc8, 0xcf, 0x0e, 0x93, 0xe1, 
0xbe, 0xa6, 0xfd, 0x8b,
+       0x07, 0x5e, 0xc1, 0x27, 0xdb, 0x51, 0x93, 0x7b, 0x5f, 0x07, 0x39, 0xdd, 
0xee, 0xa6, 0x85, 0xbc,
+       0x84, 0x9e, 0x2c, 0xb5, 0xa4, 0xd1, 0x60, 0x66, 0x5b, 0x3d, 0x88, 0xdf, 
0x40, 0xbf, 0x16, 0x30,
+       0x99, 0x4d, 0xc1, 0x7d, 0x5c, 0xeb, 0xa1, 0x93, 0xbe, 0x80, 0xa3, 0xb8, 
0x30, 0x9b, 0x4c, 0x4a,
+       0x33, 0x19, 0xeb, 0x20, 0xfc, 0x1a, 0xec, 0x1a, 0xb3, 0x49, 0xd7, 0xd1, 
0xb4, 0x8f, 0x6a, 0x70,
+       0x9d, 0xec, 0x29, 0x74, 0xc9, 0x72, 0x9e, 0x0e, 0xe5, 0xac, 0x7f, 0x5a, 
0xba, 0x7f, 0xaa, 0x00,
+       0x7e, 0x05, 0x27, 0x15, 0xa3, 0x49, 0x73, 0xa0, 0x29, 0xbd, 0x0a, 0xaa, 
0x93, 0x3c, 0x86, 0x4e,
+       0xb1, 0xbf, 0xd2, 0xb1, 0x5a, 0xb2, 0xe2, 0x08, 0xda, 0x89, 0xce, 0xb4, 
0xbe, 0xd2, 0x31, 0xfa,
+       0x59, 0x5a, 0x9c, 0xb9, 0xaa, 0x73, 0x3f, 0x6c, 0xbc, 0x46, 0x4c, 0x89, 
0x95, 0x97, 0xe7, 0xf5,
+       0x5f, 0xc3, 0x61, 0xd9, 0x01, 0x0f, 0xa1, 0x71, 0x4b, 0x57, 0x69, 0xb3, 
0x25, 0x9f, 0x49, 0x9b,
+       0x2c, 0x49, 0x18, 0x53, 0x5d, 0xb1, 0x96, 0x67, 0x16, 0xe7, 0xf7, 0x5e, 
0x59, 0xce, 0x33, 0xe8,
+       0x7e, 0x4f, 0x49, 0xa8, 0x6e, 0x2e, 0x6f, 0xa8, 0x7f, 0xeb, 0x51, 0x19, 
0x87, 0x2a, 0xb9, 0x73,
+       0xa5, 0xa9, 0x91, 0xec, 0xce, 0x4d, 0x97, 0xce, 0x9f, 0x16, 0xb4, 0xd2, 
0x5c, 0xf8, 0x0a, 0x5a,
+       0x51, 0xaa, 0xde, 0xb4, 0xd6, 0x69, 0x51, 0xbd, 0x9b, 0xbd, 0x06, 0xd2, 
0xb5, 0x97, 0xb9, 0xf7,
+       0x7f, 0x87, 0x83, 0x12, 0x86, 0x08, 0xf7, 0x19, 0x89, 0x68, 0x2a, 0x5a, 
0x7f, 0x97, 0x86, 0x52,
+       0x22, 0xbd, 0x59, 0x18, 0x4a, 0x2f, 0xa1, 0x1d, 0x6d, 0x36, 0x4a, 0x5f, 
0xea, 0xed, 0x33, 0xac,
+       0x6e, 0xa1, 0x97, 0x77, 0x3b, 0xfb, 0xeb, 0xde, 0x26, 0xfb, 0x25, 0x67, 
0x4a, 0xf0, 0x10, 0xdf,
+       0x42, 0xf7, 0x8a, 0xaa, 0xd2, 0xcb, 0xe3, 0xc4, 0x35, 0x4f, 0x15, 0x37, 
0x7b, 0xaa, 0xb8, 0xa3,
+       0xe4, 0xa9, 0xd2, 0xaf, 0x9f, 0x20, 0xce, 0x0e, 0x5e, 0x00, 0x5e, 0x51, 
0xf5, 0x86, 0x4d, 0x3d,
+       0x2a, 0xa9, 0xca, 0xfe, 0xd9, 0xb6, 0x30, 0x35, 0x42, 0x9d, 0x1d, 0xfc, 
0x16, 0xf6, 0xff, 0x13,
+       0x7b, 0x8b, 0xdd, 0xd9, 0xc1, 0x73, 0x80, 0xab, 0xbb, 0x66, 0x7f, 0x03, 
0xed, 0x5c, 0x35, 0x6c,
+       0x25, 0xdb, 0x29, 0xb9, 0x52, 0x39, 0xce, 0xce, 0xc5, 0x39, 0x7c, 0xca, 
0xc5, 0xdc, 0x25, 0x0b,
+       0xe2, 0xdf, 0x50, 0x77, 0x11, 0x87, 0x92, 0x08, 0x37, 0x1b, 0xae, 0xd2, 
0x10, 0x2f, 0x7a, 0xb5,
+       0x0f, 0xc5, 0xeb, 0x07, 0x1a, 0xfd, 0xf2, 0x9f, 0x00, 0x00, 0x00, 0xff, 
0xff, 0x52, 0xef, 0x2d,
+       0x97, 0x48, 0x0a, 0x00, 0x00,
+}
diff --git a/pulsar-function-go/pb/Request.pb.go 
b/pulsar-function-go/pb/Request.pb.go
new file mode 100644
index 0000000..733d0b0
--- /dev/null
+++ b/pulsar-function-go/pb/Request.pb.go
@@ -0,0 +1,153 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: Request.proto
+
+package pb
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type ServiceRequest_ServiceRequestType int32
+
+const (
+       ServiceRequest_UPDATE     ServiceRequest_ServiceRequestType = 0
+       ServiceRequest_DELETE     ServiceRequest_ServiceRequestType = 1
+       ServiceRequest_INITIALIZE ServiceRequest_ServiceRequestType = 2
+)
+
+var ServiceRequest_ServiceRequestType_name = map[int32]string{
+       0: "UPDATE",
+       1: "DELETE",
+       2: "INITIALIZE",
+}
+var ServiceRequest_ServiceRequestType_value = map[string]int32{
+       "UPDATE":     0,
+       "DELETE":     1,
+       "INITIALIZE": 2,
+}
+
+func (x ServiceRequest_ServiceRequestType) String() string {
+       return proto.EnumName(ServiceRequest_ServiceRequestType_name, int32(x))
+}
+func (ServiceRequest_ServiceRequestType) EnumDescriptor() ([]byte, []int) {
+       return fileDescriptor_Request_db06a8eacb0614b9, []int{0, 0}
+}
+
+type ServiceRequest struct {
+       ServiceRequestType   ServiceRequest_ServiceRequestType 
`protobuf:"varint,1,opt,name=serviceRequestType,proto3,enum=proto.ServiceRequest_ServiceRequestType"
 json:"serviceRequestType,omitempty"`
+       RequestId            string                            
`protobuf:"bytes,2,opt,name=requestId,proto3" json:"requestId,omitempty"`
+       FunctionMetaData     *FunctionMetaData                 
`protobuf:"bytes,3,opt,name=functionMetaData,proto3" 
json:"functionMetaData,omitempty"`
+       WorkerId             string                            
`protobuf:"bytes,4,opt,name=workerId,proto3" json:"workerId,omitempty"`
+       XXX_NoUnkeyedLiteral struct{}                          `json:"-"`
+       XXX_unrecognized     []byte                            `json:"-"`
+       XXX_sizecache        int32                             `json:"-"`
+}
+
+func (m *ServiceRequest) Reset()         { *m = ServiceRequest{} }
+func (m *ServiceRequest) String() string { return proto.CompactTextString(m) }
+func (*ServiceRequest) ProtoMessage()    {}
+func (*ServiceRequest) Descriptor() ([]byte, []int) {
+       return fileDescriptor_Request_db06a8eacb0614b9, []int{0}
+}
+func (m *ServiceRequest) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_ServiceRequest.Unmarshal(m, b)
+}
+func (m *ServiceRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, 
error) {
+       return xxx_messageInfo_ServiceRequest.Marshal(b, m, deterministic)
+}
+func (dst *ServiceRequest) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_ServiceRequest.Merge(dst, src)
+}
+func (m *ServiceRequest) XXX_Size() int {
+       return xxx_messageInfo_ServiceRequest.Size(m)
+}
+func (m *ServiceRequest) XXX_DiscardUnknown() {
+       xxx_messageInfo_ServiceRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ServiceRequest proto.InternalMessageInfo
+
+func (m *ServiceRequest) GetServiceRequestType() 
ServiceRequest_ServiceRequestType {
+       if m != nil {
+               return m.ServiceRequestType
+       }
+       return ServiceRequest_UPDATE
+}
+
+func (m *ServiceRequest) GetRequestId() string {
+       if m != nil {
+               return m.RequestId
+       }
+       return ""
+}
+
+func (m *ServiceRequest) GetFunctionMetaData() *FunctionMetaData {
+       if m != nil {
+               return m.FunctionMetaData
+       }
+       return nil
+}
+
+func (m *ServiceRequest) GetWorkerId() string {
+       if m != nil {
+               return m.WorkerId
+       }
+       return ""
+}
+
+func init() {
+       proto.RegisterType((*ServiceRequest)(nil), "proto.ServiceRequest")
+       proto.RegisterEnum("proto.ServiceRequest_ServiceRequestType", 
ServiceRequest_ServiceRequestType_name, ServiceRequest_ServiceRequestType_value)
+}
+
+func init() { proto.RegisterFile("Request.proto", 
fileDescriptor_Request_db06a8eacb0614b9) }
+
+var fileDescriptor_Request_db06a8eacb0614b9 = []byte{
+       // 247 bytes of a gzipped FileDescriptorProto
+       0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 
0x0d, 0x4a, 0x2d, 0x2c,
+       0x4d, 0x2d, 0x2e, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 
0x53, 0x52, 0x7c, 0x6e,
+       0xa5, 0x79, 0xc9, 0x25, 0x99, 0xf9, 0x79, 0x10, 0x61, 0xa5, 0xe5, 0x4c, 
0x5c, 0x7c, 0xc1, 0xa9,
+       0x45, 0x65, 0x99, 0xc9, 0xa9, 0x50, 0xf5, 0x42, 0x11, 0x5c, 0x42, 0xc5, 
0x28, 0x22, 0x21, 0x95,
+       0x05, 0xa9, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x7c, 0x46, 0x1a, 0x10, 0x6d, 
0x7a, 0xa8, 0x5a, 0xd0,
+       0xb8, 0x20, 0xf5, 0x41, 0x58, 0xcc, 0x10, 0x92, 0xe1, 0xe2, 0x2c, 0x82, 
0x70, 0x3d, 0x53, 0x24,
+       0x98, 0x14, 0x18, 0x35, 0x38, 0x83, 0x10, 0x02, 0x42, 0xce, 0x5c, 0x02, 
0x69, 0x50, 0xc7, 0xf9,
+       0xa6, 0x96, 0x24, 0xba, 0x24, 0x96, 0x24, 0x4a, 0x30, 0x2b, 0x30, 0x6a, 
0x70, 0x1b, 0x89, 0x43,
+       0x6d, 0x75, 0x43, 0x93, 0x0e, 0xc2, 0xd0, 0x20, 0x24, 0xc5, 0xc5, 0x51, 
0x9e, 0x5f, 0x94, 0x9d,
+       0x5a, 0xe4, 0x99, 0x22, 0xc1, 0x02, 0xb6, 0x01, 0xce, 0x57, 0xb2, 0xe1, 
0x12, 0xc2, 0x74, 0xa8,
+       0x10, 0x17, 0x17, 0x5b, 0x68, 0x80, 0x8b, 0x63, 0x88, 0xab, 0x00, 0x03, 
0x88, 0xed, 0xe2, 0xea,
+       0xe3, 0x1a, 0xe2, 0x2a, 0xc0, 0x28, 0xc4, 0xc7, 0xc5, 0xe5, 0xe9, 0xe7, 
0x19, 0xe2, 0xe9, 0xe8,
+       0xe3, 0x19, 0xe5, 0x2a, 0xc0, 0xe4, 0xa4, 0xc3, 0xa5, 0x98, 0x5f, 0x94, 
0xae, 0x97, 0x58, 0x90,
+       0x98, 0x9c, 0x91, 0xaa, 0x57, 0x50, 0x9a, 0x53, 0x9c, 0x58, 0xa4, 0x07, 
0xb3, 0xbf, 0x18, 0xe2,
+       0x42, 0x27, 0x76, 0xa8, 0xc9, 0x49, 0x6c, 0x60, 0xbe, 0x31, 0x20, 0x00, 
0x00, 0xff, 0xff, 0xfa,
+       0x6c, 0xdd, 0x44, 0x86, 0x01, 0x00, 0x00,
+}
diff --git a/pulsar-function-go/pb/doc.go b/pulsar-function-go/pb/doc.go
new file mode 100644
index 0000000..aafd43a
--- /dev/null
+++ b/pulsar-function-go/pb/doc.go
@@ -0,0 +1,34 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+// Package api provides the protocol buffer messages that Pulsar
+// uses for the client/broker wire protocol.
+// See "Pulsar binary protocol specification" for more information.
+// https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/
+//
+// The protocol definition files are part of the main Pulsar source,
+// located within the Pulsar repository at:
+// 
https://github.com/apache/incubator-pulsar/tree/master/pulsar-common/src/main/proto
+//
+// The generated Go code was created from the source Pulsar files at git:
+//    tag:      v1.18-2614-g548c726b8
+//    revision: 548c726b8e7f0e163b1132c9ada6ba83d6bec572
+//
+// Files generated by the protoc-gen-go program should not be modified.
+package pb
diff --git a/pulsar-function-go/pb/generate.sh 
b/pulsar-function-go/pb/generate.sh
new file mode 100755
index 0000000..ab20949
--- /dev/null
+++ b/pulsar-function-go/pb/generate.sh
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Bash script to automate the generation of the api
+# package.
+#
+# It uses the .proto files included in Pulsar's source
+# to compile the Go code capable of encoding/decoding the
+# wire format used by Pulsar brokers.
+#
+# Requirements:
+#  * protoc and protoc-gen-go are installed. See: 
https://github.com/golang/protobuf
+#  * The Pulsar project is checked out somewhere on the file system
+#    in order to source the .proto files
+echo "generate pulsar go function protobuf code..."
+
+set -euo pipefail
+
+pkg="api"
+
+defaultPulsarSrc="${HOME}/github.com/apache/pulsar"
+
+help="usage: ${0} <path to Pulsar repo (default \"${defaultPulsarSrc}\")>"
+
+pulsarSrc="${1-${defaultPulsarSrc}}"
+if [ ! -d "${pulsarSrc}" ]; then
+       echo "error: Pulsar source is not a directory: ${pulsarSrc}"
+       echo "${help}"
+       exit 1
+fi
+protoDefinitions="${pulsarSrc}/pulsar-functions/proto/src/main/proto"
+if [ ! -d "${protoDefinitions}" ]; then
+       echo "error: Proto definitions directory not found: ${protoDefinitions}"
+       echo "${help}"
+       exit 1
+fi
+protoFiles="${protoDefinitions}/*.proto"
+
+protoc \
+       --go_out=import_path=${pkg}:. \
+       --proto_path="${protoDefinitions}" ${protoFiles}
+
+pulsarGitRev=$(git -C ${pulsarSrc} rev-parse HEAD)
+pulsarGitTag=$(git -C ${pulsarSrc} describe --tags HEAD)
+
+# Generate godoc describing this package and the
+# git sha it was created from
+cat <<EOF > doc.go
+// Package ${pkg} provides the protocol buffer messages that Pulsar
+// uses for the client/broker wire protocol.
+// See "Pulsar binary protocol specification" for more information.
+// https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/
+//
+// The protocol definition files are part of the main Pulsar source,
+// located within the Pulsar repository at:
+// 
https://github.com/apache/pulsar/tree/master/pulsar-functions/proto/src/main/proto
+//
+// The generated Go code was created from the source Pulsar files at git:
+//    tag:      ${pulsarGitTag}
+//    revision: ${pulsarGitRev}
+//
+// Files generated by the protoc-gen-go program should not be modified.
+package ${pkg}
+EOF
diff --git a/pulsar-function-go/pf/context.go b/pulsar-function-go/pf/context.go
new file mode 100644
index 0000000..4818326
--- /dev/null
+++ b/pulsar-function-go/pf/context.go
@@ -0,0 +1,98 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+       "context"
+)
+
+type FunctionContext struct {
+       instanceConf *instanceConf
+       userConfigs  map[string]interface{}
+       inputTopics  []string
+}
+
+func NewFuncContext() *FunctionContext {
+       fc := &FunctionContext{
+               instanceConf: newInstanceConf(),
+               userConfigs:  make(map[string]interface{}),
+       }
+       return fc
+}
+
+func (c *FunctionContext) GetInstanceID() int {
+       return c.instanceConf.instanceID
+}
+
+func (c *FunctionContext) GetInputTopics() []string {
+       return c.inputTopics
+}
+
+func (c *FunctionContext) GetOutputTopic() string {
+       return c.instanceConf.funcDetails.GetSink().Topic
+}
+
+func (c *FunctionContext) GetFuncTenant() string {
+       return c.instanceConf.funcDetails.Tenant
+}
+
+func (c *FunctionContext) GetFuncName() string {
+       return c.instanceConf.funcDetails.Name
+}
+
+func (c *FunctionContext) GetFuncNamespace() string {
+       return c.instanceConf.funcDetails.Namespace
+}
+
+func (c *FunctionContext) GetFuncID() string {
+       return c.instanceConf.funcID
+}
+
+func (c *FunctionContext) GetFuncVersion() string {
+       return c.instanceConf.funcVersion
+}
+
+func (c *FunctionContext) GetUserConfValue(key string) interface{} {
+       return c.userConfigs[key]
+}
+
+func (c *FunctionContext) GetUserConfMap() map[string]interface{} {
+       return c.userConfigs
+}
+
+// An unexported type to be used as the key for types in this package.
+// This prevents collisions with keys defined in other packages.
+type key struct{}
+
+// contextKey is the key for user.User values in Contexts. It is
+// unexported; clients use user.NewContext and user.FromContext
+// instead of using this key directly.
+var contextKey = &key{}
+
+// NewContext returns a new Context that carries value u.
+func NewContext(parent context.Context, fc *FunctionContext) context.Context {
+       return context.WithValue(parent, contextKey, fc)
+}
+
+// FromContext returns the User value stored in ctx, if any.
+func FromContext(ctx context.Context) (*FunctionContext, bool) {
+       fc, ok := ctx.Value(contextKey).(*FunctionContext)
+       return fc, ok
+}
diff --git a/pulsar-function-go/pf/context_test.go 
b/pulsar-function-go/pf/context_test.go
new file mode 100644
index 0000000..4ef4ac5
--- /dev/null
+++ b/pulsar-function-go/pf/context_test.go
@@ -0,0 +1,45 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+       "context"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestContext(t *testing.T) {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       fc := NewFuncContext()
+       ctx = NewContext(ctx, fc)
+
+       ctx = context.WithValue(ctx, "pulsar", "function")
+
+       if resfc, ok := FromContext(ctx); ok {
+               assert.Equal(t, []string{"topic-1", "topic-2"}, 
resfc.GetInputTopics())
+               assert.Equal(t, "1.0.0", resfc.GetFuncVersion())
+               assert.Equal(t, "pulsar-function", resfc.GetFuncID())
+               assert.Equal(t, "go-function", resfc.GetFuncName())
+               assert.Equal(t, "topic-3", resfc.GetOutputTopic())
+       }
+       assert.Equal(t, "function", ctx.Value("pulsar"))
+}
diff --git a/pulsar-function-go/pf/function.go 
b/pulsar-function-go/pf/function.go
new file mode 100644
index 0000000..0be349d
--- /dev/null
+++ b/pulsar-function-go/pf/function.go
@@ -0,0 +1,172 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+//
+// This file borrows some of the implementations from {@link 
https://github.com/aws/aws-lambda-go/blob/master/lambda/handler.go}
+//  - errorHandler
+//  - validateArguments
+//  - validateReturns
+//  - NewFunction
+//  - Process
+//
+
+package pf
+
+import (
+       "context"
+       "fmt"
+       "reflect"
+
+       "github.com/apache/pulsar/pulsar-function-go/log"
+)
+
+type function interface {
+       process(ctx context.Context, input []byte) ([]byte, error)
+}
+
+type pulsarFunction func(ctx context.Context, input []byte) ([]byte, error)
+
+func (function pulsarFunction) process(ctx context.Context, input []byte) 
([]byte, error) {
+       output, err := function(ctx, input)
+       if err != nil {
+               log.Errorf("process function error:[%s]\n", err.Error())
+               return nil, err
+       }
+
+       return output, nil
+}
+
+func errorHandler(e error) pulsarFunction {
+       return func(ctx context.Context, input []byte) ([]byte, error) {
+               return nil, e
+       }
+}
+
+func validateArguments(handler reflect.Type) (bool, error) {
+       handlerTakesContext := false
+       if handler.NumIn() > 2 {
+               return false, fmt.Errorf("functions may not take more than two 
arguments, but function takes %d", handler.NumIn())
+       } else if handler.NumIn() > 0 {
+               contextType := reflect.TypeOf((*context.Context)(nil)).Elem()
+               argumentType := handler.In(0)
+               handlerTakesContext = argumentType.Implements(contextType)
+               if handler.NumIn() > 1 && !handlerTakesContext {
+                       return false, fmt.Errorf("function takes two arguments, 
but the first is not Context. got %s", argumentType.Kind())
+               }
+       }
+
+       return handlerTakesContext, nil
+}
+
+func validateReturns(handler reflect.Type) error {
+       errorType := reflect.TypeOf((*error)(nil)).Elem()
+       if handler.NumOut() > 2 {
+               return fmt.Errorf("function may not return more than two 
values")
+       } else if handler.NumOut() > 1 {
+               if !handler.Out(1).Implements(errorType) {
+                       return fmt.Errorf("function returns two values, but the 
second does not implement error")
+               }
+       } else if handler.NumOut() == 1 {
+               if !handler.Out(0).Implements(errorType) {
+                       return fmt.Errorf("function returns a single value, but 
it does not implement error")
+               }
+       }
+       return nil
+}
+
+func newFunction(inputFunc interface{}) function {
+       if inputFunc == nil {
+               return errorHandler(fmt.Errorf("function is nil"))
+       }
+       handler := reflect.ValueOf(inputFunc)
+       handlerType := reflect.TypeOf(inputFunc)
+       if handlerType.Kind() != reflect.Func {
+               return errorHandler(fmt.Errorf("function kind %s is not %s", 
handlerType.Kind(), reflect.Func))
+       }
+
+       takesContext, err := validateArguments(handlerType)
+       if err != nil {
+               return errorHandler(err)
+       }
+
+       if err := validateReturns(handlerType); err != nil {
+               return errorHandler(err)
+       }
+
+       return pulsarFunction(func(ctx context.Context, input []byte) ([]byte, 
error) {
+               // construct arguments
+               var args []reflect.Value
+               if takesContext {
+                       args = append(args, reflect.ValueOf(ctx))
+               }
+
+               if (handlerType.NumIn() == 1 && !takesContext) || 
handlerType.NumIn() == 2 {
+                       args = append(args, reflect.ValueOf(input))
+               }
+               response := handler.Call(args)
+
+               // convert return values into ([]byte, error)
+               var err error
+               if len(response) > 0 {
+                       if errVal, ok := 
response[len(response)-1].Interface().(error); ok {
+                               err = errVal
+                       }
+               }
+
+               var val []byte
+               if len(response) > 1 {
+                       val = response[0].Bytes()
+               }
+
+               return val, err
+       })
+}
+
+// Rules:
+//
+//     * handler must be a function
+//     * handler may take between 0 and two arguments.
+//     * if there are two arguments, the first argument must satisfy the 
"context.Context" interface.
+//     * handler may return between 0 and two arguments.
+//     * if there are two return values, the second argument must be an error.
+//     * if there is one return value it must be an error.
+//
+// Valid function signatures:
+//
+//     func ()
+//     func () error
+//     func (input) error
+//     func () (output, error)
+//     func (input) (output, error)
+//     func (context.Context) error
+//     func (context.Context, input) error
+//     func (context.Context) (output, error)
+//     func (context.Context, input) (output, error)
+//
+// Where "input" and "output" are types compatible with the "encoding/json" 
standard library.
+// See https://golang.org/pkg/encoding/json/#Unmarshal for how deserialization 
behaves
+func Start(funcName interface{}) {
+       function := newFunction(funcName)
+       goInstance := newGoInstance()
+       err := goInstance.startFunction(function)
+       if err != nil {
+               log.Fatal(err)
+               panic("start function failed, please check.")
+       }
+}
diff --git a/pulsar-function-go/pf/function_test.go 
b/pulsar-function-go/pf/function_test.go
new file mode 100644
index 0000000..30c45bb
--- /dev/null
+++ b/pulsar-function-go/pf/function_test.go
@@ -0,0 +1,182 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+//
+// This file borrows some of the implementations from {@link 
https://github.com/aws/aws-lambda-go/blob/master/lambda/function_test.go}
+//  - TestInvalidFunctions
+//
+
+package pf
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestInvalidFunctions(t *testing.T) {
+
+       testCases := []struct {
+               name     string
+               function interface{}
+               expected error
+       }{
+               {
+                       name:     "nil function",
+                       expected: errors.New("function is nil"),
+                       function: nil,
+               },
+               {
+                       name:     "function is not a function",
+                       expected: errors.New("function kind struct is not 
func"),
+                       function: struct{}{},
+               },
+               {
+                       name:     "function declares too many arguments",
+                       expected: errors.New("functions may not take more than 
two arguments, but function takes 3"),
+                       function: func(n context.Context, x string, y string) 
error {
+                               return nil
+                       },
+               },
+               {
+                       name:     "two argument function does not context as 
first argument",
+                       expected: errors.New("function takes two arguments, but 
the first is not Context. got string"),
+                       function: func(a string, x context.Context) error {
+                               return nil
+                       },
+               },
+               {
+                       name:     "function returns too many values",
+                       expected: errors.New("function may not return more than 
two values"),
+                       function: func() (error, error, error) {
+                               return nil, nil, nil
+                       },
+               },
+               {
+                       name:     "function returning two values does not 
declare error as the second return value",
+                       expected: errors.New("function returns two values, but 
the second does not implement error"),
+                       function: func() (error, string) {
+                               return nil, "hello"
+                       },
+               },
+               {
+                       name:     "function returning a single value does not 
implement error",
+                       expected: errors.New("function returns a single value, 
but it does not implement error"),
+                       function: func() string {
+                               return "hello"
+                       },
+               },
+               {
+                       name:     "no return value should not result in error",
+                       expected: nil,
+                       function: func() {
+                       },
+               },
+       }
+       for i, testCase := range testCases {
+               t.Run(fmt.Sprintf("testCase[%d] %s", i, testCase.name), func(t 
*testing.T) {
+                       pulsarFunction := newFunction(testCase.function)
+                       _, err := pulsarFunction.process(context.TODO(), 
make([]byte, 0))
+                       assert.Equal(t, testCase.expected, err)
+               })
+       }
+}
+
+type expected struct {
+       val []byte
+       err error
+}
+
+var (
+       input       = []byte{102, 117, 110, 99, 116, 105, 111, 110}
+)
+
+func TestInvokes(t *testing.T) {
+       testCases := []struct {
+               name     string
+               input    []byte
+               expected expected
+               function interface{}
+       }{
+               {
+                       input:    input,
+                       expected: expected{input, nil},
+                       function: func(in []byte) ([]byte, error) {
+                               return input, nil
+                       },
+               },
+               {
+                       input:    input,
+                       expected: expected{input, nil},
+                       function: func(in []byte) ([]byte, error) {
+                               return input, nil
+                       },
+               },
+               {
+                       input:    input,
+                       expected: expected{input, nil},
+                       function: func(ctx context.Context, in []byte) ([]byte, 
error) {
+                               return input, nil
+                       },
+               },
+               {
+                       input:    input,
+                       expected: expected{nil, errors.New("bad stuff")},
+                       function: func() error {
+                               return errors.New("bad stuff")
+                       },
+               },
+               {
+                       input:    input,
+                       expected: expected{nil, errors.New("bad stuff")},
+                       function: func() ([]byte, error) {
+                               return nil, errors.New("bad stuff")
+                       },
+               },
+               {
+                       input:    input,
+                       expected: expected{nil, errors.New("bad stuff")},
+                       function: func(e []byte) ([]byte, error) {
+                               return nil, errors.New("bad stuff")
+                       },
+               },
+               {
+                       input:    input,
+                       expected: expected{nil, errors.New("bad stuff")},
+                       function: func(ctx context.Context, e []byte) ([]byte, 
error) {
+                               return nil, errors.New("bad stuff")
+                       },
+               },
+       }
+       for i, testCase := range testCases {
+               t.Run(fmt.Sprintf("testCase[%d] %s", i, testCase.name), func(t 
*testing.T) {
+                       pulsarFunction := newFunction(testCase.function)
+                       response, err := pulsarFunction.process(context.TODO(), 
testCase.input)
+                       if testCase.expected.err != nil {
+                               assert.Equal(t, testCase.expected.err, err)
+                       } else {
+                               assert.NoError(t, err)
+                               assert.Equal(t, testCase.expected.val, response)
+                       }
+               })
+       }
+}
diff --git a/pulsar-function-go/pf/instance.go 
b/pulsar-function-go/pf/instance.go
new file mode 100644
index 0000000..9c4e670
--- /dev/null
+++ b/pulsar-function-go/pf/instance.go
@@ -0,0 +1,275 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+       "context"
+       "math"
+       "time"
+
+       "github.com/apache/pulsar/pulsar-client-go/pulsar"
+       "github.com/apache/pulsar/pulsar-function-go/log"
+       "github.com/apache/pulsar/pulsar-function-go/pb"
+)
+
+type goInstance struct {
+       function  function
+       context   *FunctionContext
+       producer  pulsar.Producer
+       consumers map[string]pulsar.Consumer
+       client    pulsar.Client
+}
+
+// newGoInstance init goInstance and init function context
+func newGoInstance() *goInstance {
+       goInstance := &goInstance{
+               context:   NewFuncContext(),
+               consumers: make(map[string]pulsar.Consumer),
+       }
+       return goInstance
+}
+
+func (gi *goInstance) startFunction(function function) error {
+       gi.function = function
+       err := gi.setupClient()
+       if err != nil {
+               log.Errorf("setup client failed, error is:%v", err)
+               return err
+       }
+       err = gi.setupProducer()
+       if err != nil {
+               log.Errorf("setup producer failed, error is:%v", err)
+               return err
+       }
+       channel, err := gi.setupConsumer()
+       if err != nil {
+               log.Errorf("setup consumer failed, error is:%v", err)
+               return err
+       }
+
+CLOSE:
+       for {
+               select {
+               case cm := <-channel:
+                       msgInput := cm.Message
+                       atMostOnce := 
gi.context.instanceConf.funcDetails.ProcessingGuarantees == 
pb.ProcessingGuarantees_ATMOST_ONCE
+                       atLeastOnce := 
gi.context.instanceConf.funcDetails.ProcessingGuarantees == 
pb.ProcessingGuarantees_ATLEAST_ONCE
+                       autoAck := gi.context.instanceConf.funcDetails.AutoAck
+                       if autoAck && atMostOnce {
+                               gi.ackInputMessage(msgInput)
+                       }
+                       output, err := gi.handlerMsg(msgInput)
+                       if err != nil {
+                               log.Errorf("handler message error:%v", err)
+                               if autoAck && atLeastOnce {
+                                       gi.nackInputMessage(msgInput)
+                               }
+                               return err
+                       }
+                       gi.processResult(msgInput, output)
+
+               case <-time.After(getIdleTimeout(time.Millisecond * 
gi.context.instanceConf.killAfterIdleMs)):
+                       close(channel)
+                       break CLOSE
+               }
+       }
+
+       gi.close()
+       return nil
+}
+
+func (gi *goInstance) setupClient() error {
+       client, err := pulsar.NewClient(pulsar.ClientOptions{
+               URL: gi.context.instanceConf.pulsarServiceURL,
+       })
+       if err != nil {
+               log.Errorf("create client error:%v", err)
+               return err
+       }
+       gi.client = client
+       return nil
+}
+
+func (gi *goInstance) setupProducer() (err error) {
+       if gi.context.instanceConf.funcDetails.Sink.Topic != "" && 
len(gi.context.instanceConf.funcDetails.Sink.Topic) > 0 {
+               log.Debugf("Setting up producer for topic %s", 
gi.context.instanceConf.funcDetails.Sink.Topic)
+               properties := getProperties(getDefaultSubscriptionName(
+                       gi.context.instanceConf.funcDetails.Tenant,
+                       gi.context.instanceConf.funcDetails.Namespace,
+                       gi.context.instanceConf.funcDetails.Name), 
gi.context.instanceConf.instanceID)
+               gi.producer, err = 
gi.client.CreateProducer(pulsar.ProducerOptions{
+                       Topic:                   
gi.context.instanceConf.funcDetails.Sink.Topic,
+                       Properties:              properties,
+                       CompressionType:         pulsar.LZ4,
+                       BlockIfQueueFull:        true,
+                       Batching:                true,
+                       BatchingMaxPublishDelay: time.Millisecond * 10,
+                       // set send timeout to be infinity to prevent potential 
deadlock with consumer
+                       // that might happen when consumer is blocked due to 
unacked messages
+                       SendTimeout: 0,
+               })
+               if err != nil {
+                       log.Errorf("create producer error:%s", err.Error())
+                       return err
+               }
+       }
+       return nil
+}
+
+func (gi *goInstance) setupConsumer() (chan pulsar.ConsumerMessage, error) {
+       subscriptionType := pulsar.Shared
+       if int32(gi.context.instanceConf.funcDetails.Source.SubscriptionType) 
== pb.SubscriptionType_value["FAILOVER"] {
+               subscriptionType = pulsar.Failover
+       }
+
+       funcDetails := gi.context.instanceConf.funcDetails
+       subscriptionName := funcDetails.Tenant + "/" + funcDetails.Namespace + 
"/" + funcDetails.Name
+
+       properties := getProperties(getDefaultSubscriptionName(
+               funcDetails.Tenant,
+               funcDetails.Namespace,
+               funcDetails.Name), gi.context.instanceConf.instanceID)
+
+       channel := make(chan pulsar.ConsumerMessage)
+
+       var (
+               consumer pulsar.Consumer
+               err      error
+       )
+
+       for topic, consumerConf := range funcDetails.Source.InputSpecs {
+               log.Debugf("Setting up consumer for topic: %s with subscription 
name: %s", topic, subscriptionName)
+               if consumerConf.ReceiverQueueSize != nil {
+                       if consumerConf.IsRegexPattern {
+                               consumer, err = 
gi.client.Subscribe(pulsar.ConsumerOptions{
+                                       TopicsPattern:     topic,
+                                       ReceiverQueueSize: 
int(consumerConf.ReceiverQueueSize.Value),
+                                       SubscriptionName:  subscriptionName,
+                                       Properties:        properties,
+                                       Type:              subscriptionType,
+                                       MessageChannel:    channel,
+                               })
+                       } else {
+                               consumer, err = 
gi.client.Subscribe(pulsar.ConsumerOptions{
+                                       Topic:             topic,
+                                       SubscriptionName:  subscriptionName,
+                                       Properties:        properties,
+                                       Type:              subscriptionType,
+                                       ReceiverQueueSize: 
int(consumerConf.ReceiverQueueSize.Value),
+                                       MessageChannel:    channel,
+                               })
+                       }
+               } else {
+                       if consumerConf.IsRegexPattern {
+                               consumer, err = 
gi.client.Subscribe(pulsar.ConsumerOptions{
+                                       TopicsPattern:    topic,
+                                       SubscriptionName: subscriptionName,
+                                       Properties:       properties,
+                                       Type:             subscriptionType,
+                                       MessageChannel:   channel,
+                               })
+                       } else {
+                               consumer, err = 
gi.client.Subscribe(pulsar.ConsumerOptions{
+                                       Topic:            topic,
+                                       SubscriptionName: subscriptionName,
+                                       Properties:       properties,
+                                       Type:             subscriptionType,
+                                       MessageChannel:   channel,
+                               })
+
+                       }
+               }
+
+               if err != nil {
+                       log.Errorf("create consumer error:%s", err.Error())
+                       return nil, err
+               }
+               gi.consumers[topic] = consumer
+               gi.context.inputTopics = append(gi.context.inputTopics, topic)
+       }
+       return channel, nil
+}
+
+func (gi *goInstance) handlerMsg(input pulsar.Message) (output []byte, err 
error) {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       ctx = NewContext(ctx, gi.context)
+       msgInput := input.Payload()
+       return gi.function.process(ctx, msgInput)
+}
+
+func (gi *goInstance) processResult(msgInput pulsar.Message, output []byte) {
+       atLeastOnce := gi.context.instanceConf.funcDetails.ProcessingGuarantees 
== pb.ProcessingGuarantees_ATLEAST_ONCE
+       atMostOnce := gi.context.instanceConf.funcDetails.ProcessingGuarantees 
== pb.ProcessingGuarantees_ATMOST_ONCE
+       autoAck := gi.context.instanceConf.funcDetails.AutoAck
+
+       if output != nil && gi.context.instanceConf.funcDetails.Sink.Topic != 
"" {
+               asyncMsg := pulsar.ProducerMessage{
+                       Payload: output,
+               }
+               // Attempt to send the message asynchronously and handle the 
response
+               gi.producer.SendAsync(context.Background(), asyncMsg, 
func(message pulsar.ProducerMessage, e error) {
+                       if e != nil {
+                               if autoAck && atLeastOnce {
+                                       gi.nackInputMessage(msgInput)
+                               }
+                               log.Fatal(e)
+                       } else if autoAck && !atMostOnce {
+                               gi.ackInputMessage(msgInput)
+                       }
+               })
+       } else {
+               if autoAck && atLeastOnce {
+                       gi.ackInputMessage(msgInput)
+               }
+       }
+}
+
+// ackInputMessage doesn't produce any result or the user doesn't want the 
result.
+func (gi *goInstance) ackInputMessage(inputMessage pulsar.Message) {
+       gi.consumers[inputMessage.Topic()].Ack(inputMessage)
+}
+
+func (gi *goInstance) nackInputMessage(inputMessage pulsar.Message) {
+       //todo: in the current version of pulsar-client-go, we do not support 
this operation
+       //gi.consumers[inputMessage.Topic()].Nack(inputMessage)
+}
+
+func getIdleTimeout(timeoutMilliSecond time.Duration) time.Duration {
+       if timeoutMilliSecond < 0 {
+               return time.Duration(math.MaxInt64)
+       }
+       return timeoutMilliSecond
+}
+
+func (gi *goInstance) close() {
+       log.Info("closing go instance...")
+       if gi.producer != nil {
+               gi.producer.Close()
+       }
+       if gi.consumers != nil {
+               for _, consumer := range gi.consumers {
+                       consumer.Close()
+               }
+       }
+       if gi.client != nil {
+               gi.client.Close()
+       }
+}
diff --git a/pulsar-function-go/pf/instanceConf.go 
b/pulsar-function-go/pf/instanceConf.go
new file mode 100644
index 0000000..8690ef0
--- /dev/null
+++ b/pulsar-function-go/pf/instanceConf.go
@@ -0,0 +1,104 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+       "fmt"
+       "time"
+
+       "github.com/apache/pulsar/pulsar-function-go/conf"
+       "github.com/apache/pulsar/pulsar-function-go/pb"
+)
+
+// This is the config passed to the Golang Instance. Contains all the 
information
+// passed to run functions
+type instanceConf struct {
+       instanceID       int
+       funcID           string
+       funcVersion      string
+       funcDetails      pb.FunctionDetails
+       maxBufTuples     int
+       port             int
+       clusterName      string
+       pulsarServiceURL string
+       killAfterIdleMs  time.Duration
+}
+
+func newInstanceConf() *instanceConf {
+       config := &conf.Conf{}
+       cfg := config.GetConf()
+       if cfg == nil {
+               panic("config file is nil.")
+       }
+       instanceConf := &instanceConf{
+               instanceID:       cfg.InstanceID,
+               funcID:           cfg.FuncID,
+               funcVersion:      cfg.FuncVersion,
+               maxBufTuples:     cfg.MaxBufTuples,
+               port:             cfg.Port,
+               clusterName:      cfg.ClusterName,
+               pulsarServiceURL: cfg.PulsarServiceURL,
+               killAfterIdleMs:  cfg.KillAfterIdleMs,
+               funcDetails: pb.FunctionDetails{
+                       Tenant:               cfg.Tenant,
+                       Namespace:            cfg.NameSpace,
+                       Name:                 cfg.Name,
+                       LogTopic:             cfg.LogTopic,
+                       ProcessingGuarantees: 
pb.ProcessingGuarantees(cfg.ProcessingGuarantees),
+                       SecretsMap:           cfg.SecretsMap,
+                       Runtime:              
pb.FunctionDetails_Runtime(cfg.Runtime),
+                       AutoAck:              cfg.AutoACK,
+                       Parallelism:          cfg.Parallelism,
+                       Source: &pb.SourceSpec{
+                               SubscriptionType: 
pb.SubscriptionType(cfg.SubscriptionType),
+                               InputSpecs: map[string]*pb.ConsumerSpec{
+                                       cfg.SourceSpecTopic: {
+                                               SchemaType:     
cfg.SourceSchemaType,
+                                               IsRegexPattern: 
cfg.IsRegexPatternSubscription,
+                                               ReceiverQueueSize: 
&pb.ConsumerSpec_ReceiverQueueSize{
+                                                       Value: 
cfg.ReceiverQueueSize,
+                                               },
+                                       },
+                               },
+                               TimeoutMs:           cfg.TimeoutMs,
+                               SubscriptionName:    cfg.SubscriptionName,
+                               CleanupSubscription: cfg.CleanupSubscription,
+                       },
+                       Sink: &pb.SinkSpec{
+                               Topic:      cfg.SinkSpecTopic,
+                               SchemaType: cfg.SinkSchemaType,
+                       },
+                       Resources: &pb.Resources{
+                               Cpu:  cfg.Cpu,
+                               Ram:  cfg.Ram,
+                               Disk: cfg.Disk,
+                       },
+                       RetryDetails: &pb.RetryDetails{
+                               MaxMessageRetries: cfg.MaxMessageRetries,
+                               DeadLetterTopic:   cfg.DeadLetterTopic,
+                       },
+               },
+       }
+       return instanceConf
+}
+
+func (ic *instanceConf) getInstanceName() string {
+       return "" + fmt.Sprintf("%d", ic.instanceID)
+}
diff --git a/pulsar-function-go/pf/instanceConf_test.go 
b/pulsar-function-go/pf/instanceConf_test.go
new file mode 100644
index 0000000..7b71b11
--- /dev/null
+++ b/pulsar-function-go/pf/instanceConf_test.go
@@ -0,0 +1,32 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestInstanceConf_GetInstanceName(t *testing.T) {
+       instanceConf := newInstanceConf()
+       str := instanceConf.getInstanceName()
+       assert.Equal(t, "101", str)
+}
diff --git a/pulsar-function-go/pf/util.go b/pulsar-function-go/pf/util.go
new file mode 100644
index 0000000..586fc86
--- /dev/null
+++ b/pulsar-function-go/pf/util.go
@@ -0,0 +1,41 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+       "fmt"
+)
+
+func getProperties(fullyQualifiedName string, instanceId int) 
map[string]string {
+       propertiesMap := make(map[string]string)
+       propertiesMap["application"] = "pulsar-function"
+       propertiesMap["id"] = fullyQualifiedName
+       propertiesMap["instance_id"] = fmt.Sprintf("%d", instanceId)
+
+       return propertiesMap
+}
+
+func getDefaultSubscriptionName(tenant, namespace, name string) string {
+       return fmt.Sprintf("%s/%s/%s", tenant, namespace, name)
+}
+
+func getFullyQualifiedInstanceId(tenant, namespace, name string, instanceID 
int) string {
+       return fmt.Sprintf("%s/%s/%s:%d", tenant, namespace, name, instanceID)
+}
diff --git a/pulsar-function-go/pf/util_test.go 
b/pulsar-function-go/pf/util_test.go
new file mode 100644
index 0000000..1c620a5
--- /dev/null
+++ b/pulsar-function-go/pf/util_test.go
@@ -0,0 +1,52 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+       "fmt"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+var (
+       tenant     = "pulsar"
+       namespace  = "function"
+       name       = "go"
+       instanceID = 100
+)
+
+func TestUtils(t *testing.T) {
+       fqfn := tenant + "/" + namespace + "/" + name
+
+       propertiesMap := make(map[string]string)
+       propertiesMap["application"] = "pulsar-function"
+       propertiesMap["id"] = "pulsar/function/go"
+       propertiesMap["instance_id"] = fmt.Sprintf("%d", instanceID)
+
+       expectedFQFN := getDefaultSubscriptionName(tenant, namespace, name)
+       assert.Equal(t, expectedFQFN, fqfn)
+
+       actualtMap := getProperties(fqfn, 100)
+       assert.Equal(t, propertiesMap, actualtMap)
+
+       expectedRes := getFullyQualifiedInstanceId(tenant, namespace, name, 
instanceID)
+       assert.Equal(t, expectedRes, "pulsar/function/go:100")
+}
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index 4e7dfc1..c435c09 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -49,6 +49,7 @@ message FunctionDetails {
     enum Runtime {
         JAVA = 0;
         PYTHON = 1;
+        GO = 3;
     }
     string tenant = 1;
     string namespace = 2;
@@ -70,9 +71,9 @@ message FunctionDetails {
 }
 
 message ConsumerSpec {
-       string schemaType   = 1;
+    string schemaType = 1;
     string serdeClassName = 2;
-       bool isRegexPattern = 3;
+    bool isRegexPattern = 3;
     message ReceiverQueueSize {
         int32 value = 1;
     }
@@ -88,12 +89,12 @@ message SourceSpec {
     // configs used only when source feeds into functions
     SubscriptionType subscriptionType = 3;
 
-       // @deprecated -- use topicsToSchema
-    map<string,string> topicsToSerDeClassName = 4 [deprecated = true];
+    // @deprecated -- use topicsToSchema
+    map<string, string> topicsToSerDeClassName = 4 [deprecated = true];
 
-       /**
-        *
-        */
+    /**
+     *
+     */
     map<string, ConsumerSpec> inputSpecs = 10;
 
     uint64 timeoutMs = 6;
@@ -121,9 +122,9 @@ message SinkSpec {
      * already present in the server */
     string builtin = 6;
 
-       /**
-        * Builtin schema type or custom schema class name
-        */
+    /**
+     * Builtin schema type or custom schema class name
+     */
     string schemaType = 7;
 }
 

Reply via email to