This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git

commit ac4a9c2e350672ce07f43f56a8ef1a41605c4d81
Author: sjwiesman <[email protected]>
AuthorDate: Tue Aug 24 12:48:46 2021 -0500

    [FLINK-23951] Golang Greeter
---
 go/greeter/.dockerignore      |   3 ++
 go/greeter/Dockerfile         |  34 ++++++++++++
 go/greeter/README.md          |  55 +++++++++++++++++++
 go/greeter/arch.png           | Bin 0 -> 35343 bytes
 go/greeter/docker-compose.yml | 101 ++++++++++++++++++++++++++++++++++
 go/greeter/go.mod             |  21 ++++++++
 go/greeter/go.sum             |  20 +++++++
 go/greeter/greeter.go         | 123 ++++++++++++++++++++++++++++++++++++++++++
 go/greeter/input-example.json |   2 +
 go/greeter/module.yaml        |  40 ++++++++++++++
 10 files changed, 399 insertions(+)

diff --git a/go/greeter/.dockerignore b/go/greeter/.dockerignore
new file mode 100644
index 0000000..c1e502b
--- /dev/null
+++ b/go/greeter/.dockerignore
@@ -0,0 +1,3 @@
+.idea
+venv/
+checkpoint-dir/
diff --git a/go/greeter/Dockerfile b/go/greeter/Dockerfile
new file mode 100644
index 0000000..525c965
--- /dev/null
+++ b/go/greeter/Dockerfile
@@ -0,0 +1,34 @@
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM golang:1.16-alpine
+RUN apk add --no-cache git
+
+RUN mkdir -p /app
+WORKDIR /app
+
+COPY go.mod ./
+COPY go.sum ./
+RUN go mod download
+
+COPY *.go ./
+
+RUN go build -o /greeter
+
+EXPOSE 8000
+
+CMD [ "/greeter" ]
diff --git a/go/greeter/README.md b/go/greeter/README.md
new file mode 100644
index 0000000..44b18c6
--- /dev/null
+++ b/go/greeter/README.md
@@ -0,0 +1,55 @@
+# The Greeter Example
+
+This is a simple example of a stateful functions application implemented in 
`Go`.
+
+In this example, we imagine a service that computes personalized greetings. 
+Our service, consist out of the following components:
+
+* `kafka ingress` - This component forwards messages produced to the `names` 
kafka topic,
+to the `person` stateful function. Messages produced to this topic has the 
following 
+schema `{ "name" : "bob"}`.
+
+* `person` - This function is triggered by the ingress defined above.
+This function keeps track of the number of visits, and triggers the next 
functions:
+
+* `greeter` - This function, computes a personalized greeting, based on the 
name and the number
+of visits of that user. The output of that computation is forward to a Kafka 
egress defined below.
+ 
+* `kafka egress` - This wraps a Kafka producer that emits `utf-8` greetings to 
the `greetings` Kafka topic.
+
+
+![Flow](arch.png "Flow")
+
+## Running the example
+
+```
+docker-compose build
+docker-compose up
+```
+
+To observe the customized greeting, as they appear in the `greetings` Kafka 
topic, run in a separate terminal:
+
+```
+docker-compose exec kafka kafka-console-consumer \
+     --bootstrap-server kafka:9092 \
+     --isolation-level read_committed \
+     --from-beginning \
+     --topic greetings
+```
+
+Try adding few more input lines to [input-example.json](input-example.json), 
and restart
+the producer service.
+
+```
+docker-compose restart producer
+``` 
+
+Feeling curious? add the following print to the `person` function at 
[greeter.go](greeter.go):
+```fmt.Printf("Hello there %d!", ctx.Self().Id)```.
+
+Then, rebuild and restart only the `functions` service.
+
+```
+docker-compose build functions
+docker-compose up functions
+```
diff --git a/go/greeter/arch.png b/go/greeter/arch.png
new file mode 100644
index 0000000..1723613
Binary files /dev/null and b/go/greeter/arch.png differ
diff --git a/go/greeter/docker-compose.yml b/go/greeter/docker-compose.yml
new file mode 100644
index 0000000..9a07833
--- /dev/null
+++ b/go/greeter/docker-compose.yml
@@ -0,0 +1,101 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+version: "2.1"
+
+services:
+
+  ###############################################################
+  #    Functions service
+  ###############################################################
+
+  functions:
+    build:
+      context: ./
+    expose:
+      - "8000"
+
+  ###############################################################
+  #    StateFun runtime
+  ###############################################################
+
+  statefun-manager:
+    image: apache/flink-statefun:3.1.0
+    expose:
+      - "6123"
+    ports:
+      - "8081:8081"
+    environment:
+      ROLE: master
+      MASTER_HOST: statefun-manager
+    volumes:
+      - ./module.yaml:/opt/statefun/modules/greeter/module.yaml
+
+  statefun-worker:
+    image: apache/flink-statefun:3.1.0
+    expose:
+      - "6121"
+      - "6122"
+    depends_on:
+      - statefun-manager
+      - kafka
+    environment:
+      ROLE: worker
+      MASTER_HOST: statefun-manager
+    volumes:
+      - ./module.yaml:/opt/statefun/modules/greeter/module.yaml
+
+  ###############################################################
+  #    Kafka for ingress and egress
+  ###############################################################
+
+  zookeeper:
+    image: confluentinc/cp-zookeeper:5.4.3
+    environment:
+      ZOOKEEPER_CLIENT_PORT: "2181"
+    ports:
+      - "2181:2181"
+
+  kafka:
+    image: confluentinc/cp-kafka:5.4.3
+    ports:
+      - "9092:9092"
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+
+  ###############################################################
+  #    Simple Kafka JSON producer to simulate ingress events
+  ###############################################################
+
+  producer:
+    image: ververica/statefun-playground-producer:latest
+    depends_on:
+      - kafka
+      - statefun-worker
+    environment:
+      APP_PATH: /mnt/input-example.json
+      APP_KAFKA_HOST: kafka:9092
+      APP_KAFKA_TOPIC: names
+      APP_JSON_PATH: name
+      APP_DELAY_SECONDS: 1
+    volumes:
+      - ./input-example.json:/mnt/input-example.json
diff --git a/go/greeter/go.mod b/go/greeter/go.mod
new file mode 100644
index 0000000..f4d5657
--- /dev/null
+++ b/go/greeter/go.mod
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+module statefun.io/greeter
+
+go 1.16
+
+require github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0
diff --git a/go/greeter/go.sum b/go/greeter/go.sum
new file mode 100644
index 0000000..c35cbea
--- /dev/null
+++ b/go/greeter/go.sum
@@ -0,0 +1,20 @@
+github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0 
h1:uE56xfgn4c/ytXppcW/NQUNxtPM8NpvkbU/VFMuaXN4=
+github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0/go.mod 
h1:uHiPJsi71a161NMH/ISkkSPIXenkcG9A2m+uhT8UlJ4=
+github.com/davecgh/go-spew v1.1.0 
h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
+github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/golang/protobuf v1.5.0/go.mod 
h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
+github.com/google/go-cmp v0.5.5/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.7.0 
h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
+github.com/stretchr/testify v1.7.0/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 
h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod 
h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0 
h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
+google.golang.org/protobuf v1.26.0/go.mod 
h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c 
h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/go/greeter/greeter.go b/go/greeter/greeter.go
new file mode 100644
index 0000000..e5941bc
--- /dev/null
+++ b/go/greeter/greeter.go
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+       "fmt"
+       "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
+       "net/http"
+)
+
+type GreetRequest struct {
+       Name   string `json:"name"`
+       Visits int32  `json:"visits"`
+}
+
+var (
+       PersonTypeName      = statefun.TypeNameFrom("example/person")
+       GreeterTypeName     = statefun.TypeNameFrom("example/greeter")
+       KafkaEgressTypeName = statefun.TypeNameFrom("example/greets")
+       GreetRequestType    = 
statefun.MakeJsonType(statefun.TypeNameFrom("example/GreetRequest"))
+)
+
+type Person struct {
+       Visits statefun.ValueSpec
+}
+
+func (p *Person) Invoke(ctx statefun.Context, message statefun.Message) error {
+       // update the visit count.
+       var visits int32
+       ctx.Storage().Get(p.Visits, &visits)
+
+       visits += 1
+
+       fmt.Printf("seen %d", visits)
+       ctx.Storage().Set(p.Visits, visits)
+
+       // enrich the request with the number of visits.
+       var request GreetRequest
+       if err := message.As(GreetRequestType, &request); err != nil {
+               return fmt.Errorf("failed to deserialize greet reqeuest: %w", 
err)
+       }
+       request.Visits = visits
+
+       // next, we will forward a message to a special greeter function,
+       // that will compute a personalized greeting based on the number
+       // of visits that this person has been seen.
+       ctx.Send(statefun.MessageBuilder{
+               Target: statefun.Address{
+                       FunctionType: GreeterTypeName,
+                       Id:           request.Name,
+               },
+               Value:     request,
+               ValueType: GreetRequestType,
+       })
+
+       return nil
+}
+
+func Greeter(ctx statefun.Context, message statefun.Message) error {
+       var request GreetRequest
+       if err := message.As(GreetRequestType, &request); err != nil {
+               return fmt.Errorf("failed to deserialize greet reqeuest: %w", 
err)
+       }
+
+       greeting := computeGreeting(request.Name, request.Visits)
+
+       ctx.SendEgress(statefun.KafkaEgressBuilder{
+               Target: KafkaEgressTypeName,
+               Topic:  "greetings",
+               Key:    request.Name,
+               Value:  []byte(greeting),
+       })
+
+       return nil
+}
+
+func computeGreeting(name string, visits int32) string {
+       templates := []string{"", "Welcome %s", "Nice to see you again %s", 
"Third time is the charm %s"}
+       if visits < int32(len(templates)) {
+               return fmt.Sprintf(templates[visits], name)
+       }
+
+       return fmt.Sprintf("Nice to see you for the %d-th time %s!", visits, 
name)
+}
+
+func main() {
+
+       builder := statefun.StatefulFunctionsBuilder()
+
+       person := &Person{
+               Visits: statefun.ValueSpec{
+                       Name:      "visits",
+                       ValueType: statefun.Int32Type,
+               },
+       }
+       _ = builder.WithSpec(statefun.StatefulFunctionSpec{
+               FunctionType: PersonTypeName,
+               States:       []statefun.ValueSpec{person.Visits},
+               Function:     person,
+       })
+
+       _ = builder.WithSpec(statefun.StatefulFunctionSpec{
+               FunctionType: GreeterTypeName,
+               Function:     statefun.StatefulFunctionPointer(Greeter),
+       })
+
+       http.Handle("/statefun", builder.AsHandler())
+       _ = http.ListenAndServe(":8000", nil)
+}
diff --git a/go/greeter/input-example.json b/go/greeter/input-example.json
new file mode 100644
index 0000000..ad72aa8
--- /dev/null
+++ b/go/greeter/input-example.json
@@ -0,0 +1,2 @@
+{"name" : "Bob"}
+{"name" : "Joe"}
diff --git a/go/greeter/module.yaml b/go/greeter/module.yaml
new file mode 100644
index 0000000..19a6efe
--- /dev/null
+++ b/go/greeter/module.yaml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+kind: io.statefun.endpoints.v2/http
+spec:
+  functions: example/*
+  urlPathTemplate: http://functions:8000/statefun
+  maxNumBatchRequests: 10000
+---
+kind: io.statefun.kafka.v1/ingress
+spec:
+  id: example/names
+  address: kafka:9092
+  consumerGroupId: my-group-id
+  startupPosition:
+    type: earliest
+  topics:
+    - topic: names
+      valueType: example/GreetRequest
+      targets:
+        - example/person
+---
+kind: io.statefun.kafka.v1/egress
+spec:
+  id: example/greets
+  address: kafka:9092
+  deliverySemantic:
+    type: exactly-once
+    transactionTimeout: 15min

Reply via email to