This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git
commit ac4a9c2e350672ce07f43f56a8ef1a41605c4d81 Author: sjwiesman <[email protected]> AuthorDate: Tue Aug 24 12:48:46 2021 -0500 [FLINK-23951] Golang Greeter --- go/greeter/.dockerignore | 3 ++ go/greeter/Dockerfile | 34 ++++++++++++ go/greeter/README.md | 55 +++++++++++++++++++ go/greeter/arch.png | Bin 0 -> 35343 bytes go/greeter/docker-compose.yml | 101 ++++++++++++++++++++++++++++++++++ go/greeter/go.mod | 21 ++++++++ go/greeter/go.sum | 20 +++++++ go/greeter/greeter.go | 123 ++++++++++++++++++++++++++++++++++++++++++ go/greeter/input-example.json | 2 + go/greeter/module.yaml | 40 ++++++++++++++ 10 files changed, 399 insertions(+) diff --git a/go/greeter/.dockerignore b/go/greeter/.dockerignore new file mode 100644 index 0000000..c1e502b --- /dev/null +++ b/go/greeter/.dockerignore @@ -0,0 +1,3 @@ +.idea +venv/ +checkpoint-dir/ diff --git a/go/greeter/Dockerfile b/go/greeter/Dockerfile new file mode 100644 index 0000000..525c965 --- /dev/null +++ b/go/greeter/Dockerfile @@ -0,0 +1,34 @@ + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM golang:1.16-alpine +RUN apk add --no-cache git + +RUN mkdir -p /app +WORKDIR /app + +COPY go.mod ./ +COPY go.sum ./ +RUN go mod download + +COPY *.go ./ + +RUN go build -o /greeter + +EXPOSE 8000 + +CMD [ "/greeter" ] diff --git a/go/greeter/README.md b/go/greeter/README.md new file mode 100644 index 0000000..44b18c6 --- /dev/null +++ b/go/greeter/README.md @@ -0,0 +1,55 @@ +# The Greeter Example + +This is a simple example of a stateful functions application implemented in `Go`. + +In this example, we imagine a service that computes personalized greetings. +Our service, consist out of the following components: + +* `kafka ingress` - This component forwards messages produced to the `names` kafka topic, +to the `person` stateful function. Messages produced to this topic has the following +schema `{ "name" : "bob"}`. + +* `person` - This function is triggered by the ingress defined above. +This function keeps track of the number of visits, and triggers the next functions: + +* `greeter` - This function, computes a personalized greeting, based on the name and the number +of visits of that user. The output of that computation is forward to a Kafka egress defined below. + +* `kafka egress` - This wraps a Kafka producer that emits `utf-8` greetings to the `greetings` Kafka topic. + + + + +## Running the example + +``` +docker-compose build +docker-compose up +``` + +To observe the customized greeting, as they appear in the `greetings` Kafka topic, run in a separate terminal: + +``` +docker-compose exec kafka kafka-console-consumer \ + --bootstrap-server kafka:9092 \ + --isolation-level read_committed \ + --from-beginning \ + --topic greetings +``` + +Try adding few more input lines to [input-example.json](input-example.json), and restart +the producer service. + +``` +docker-compose restart producer +``` + +Feeling curious? add the following print to the `person` function at [greeter.go](greeter.go): +```fmt.Printf("Hello there %d!", ctx.Self().Id)```. + +Then, rebuild and restart only the `functions` service. + +``` +docker-compose build functions +docker-compose up functions +``` diff --git a/go/greeter/arch.png b/go/greeter/arch.png new file mode 100644 index 0000000..1723613 Binary files /dev/null and b/go/greeter/arch.png differ diff --git a/go/greeter/docker-compose.yml b/go/greeter/docker-compose.yml new file mode 100644 index 0000000..9a07833 --- /dev/null +++ b/go/greeter/docker-compose.yml @@ -0,0 +1,101 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +version: "2.1" + +services: + + ############################################################### + # Functions service + ############################################################### + + functions: + build: + context: ./ + expose: + - "8000" + + ############################################################### + # StateFun runtime + ############################################################### + + statefun-manager: + image: apache/flink-statefun:3.1.0 + expose: + - "6123" + ports: + - "8081:8081" + environment: + ROLE: master + MASTER_HOST: statefun-manager + volumes: + - ./module.yaml:/opt/statefun/modules/greeter/module.yaml + + statefun-worker: + image: apache/flink-statefun:3.1.0 + expose: + - "6121" + - "6122" + depends_on: + - statefun-manager + - kafka + environment: + ROLE: worker + MASTER_HOST: statefun-manager + volumes: + - ./module.yaml:/opt/statefun/modules/greeter/module.yaml + + ############################################################### + # Kafka for ingress and egress + ############################################################### + + zookeeper: + image: confluentinc/cp-zookeeper:5.4.3 + environment: + ZOOKEEPER_CLIENT_PORT: "2181" + ports: + - "2181:2181" + + kafka: + image: confluentinc/cp-kafka:5.4.3 + ports: + - "9092:9092" + depends_on: + - zookeeper + environment: + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + + ############################################################### + # Simple Kafka JSON producer to simulate ingress events + ############################################################### + + producer: + image: ververica/statefun-playground-producer:latest + depends_on: + - kafka + - statefun-worker + environment: + APP_PATH: /mnt/input-example.json + APP_KAFKA_HOST: kafka:9092 + APP_KAFKA_TOPIC: names + APP_JSON_PATH: name + APP_DELAY_SECONDS: 1 + volumes: + - ./input-example.json:/mnt/input-example.json diff --git a/go/greeter/go.mod b/go/greeter/go.mod new file mode 100644 index 0000000..f4d5657 --- /dev/null +++ b/go/greeter/go.mod @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +module statefun.io/greeter + +go 1.16 + +require github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0 diff --git a/go/greeter/go.sum b/go/greeter/go.sum new file mode 100644 index 0000000..c35cbea --- /dev/null +++ b/go/greeter/go.sum @@ -0,0 +1,20 @@ +github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0 h1:uE56xfgn4c/ytXppcW/NQUNxtPM8NpvkbU/VFMuaXN4= +github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0/go.mod h1:uHiPJsi71a161NMH/ISkkSPIXenkcG9A2m+uhT8UlJ4= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/go/greeter/greeter.go b/go/greeter/greeter.go new file mode 100644 index 0000000..e5941bc --- /dev/null +++ b/go/greeter/greeter.go @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" + "net/http" +) + +type GreetRequest struct { + Name string `json:"name"` + Visits int32 `json:"visits"` +} + +var ( + PersonTypeName = statefun.TypeNameFrom("example/person") + GreeterTypeName = statefun.TypeNameFrom("example/greeter") + KafkaEgressTypeName = statefun.TypeNameFrom("example/greets") + GreetRequestType = statefun.MakeJsonType(statefun.TypeNameFrom("example/GreetRequest")) +) + +type Person struct { + Visits statefun.ValueSpec +} + +func (p *Person) Invoke(ctx statefun.Context, message statefun.Message) error { + // update the visit count. + var visits int32 + ctx.Storage().Get(p.Visits, &visits) + + visits += 1 + + fmt.Printf("seen %d", visits) + ctx.Storage().Set(p.Visits, visits) + + // enrich the request with the number of visits. + var request GreetRequest + if err := message.As(GreetRequestType, &request); err != nil { + return fmt.Errorf("failed to deserialize greet reqeuest: %w", err) + } + request.Visits = visits + + // next, we will forward a message to a special greeter function, + // that will compute a personalized greeting based on the number + // of visits that this person has been seen. + ctx.Send(statefun.MessageBuilder{ + Target: statefun.Address{ + FunctionType: GreeterTypeName, + Id: request.Name, + }, + Value: request, + ValueType: GreetRequestType, + }) + + return nil +} + +func Greeter(ctx statefun.Context, message statefun.Message) error { + var request GreetRequest + if err := message.As(GreetRequestType, &request); err != nil { + return fmt.Errorf("failed to deserialize greet reqeuest: %w", err) + } + + greeting := computeGreeting(request.Name, request.Visits) + + ctx.SendEgress(statefun.KafkaEgressBuilder{ + Target: KafkaEgressTypeName, + Topic: "greetings", + Key: request.Name, + Value: []byte(greeting), + }) + + return nil +} + +func computeGreeting(name string, visits int32) string { + templates := []string{"", "Welcome %s", "Nice to see you again %s", "Third time is the charm %s"} + if visits < int32(len(templates)) { + return fmt.Sprintf(templates[visits], name) + } + + return fmt.Sprintf("Nice to see you for the %d-th time %s!", visits, name) +} + +func main() { + + builder := statefun.StatefulFunctionsBuilder() + + person := &Person{ + Visits: statefun.ValueSpec{ + Name: "visits", + ValueType: statefun.Int32Type, + }, + } + _ = builder.WithSpec(statefun.StatefulFunctionSpec{ + FunctionType: PersonTypeName, + States: []statefun.ValueSpec{person.Visits}, + Function: person, + }) + + _ = builder.WithSpec(statefun.StatefulFunctionSpec{ + FunctionType: GreeterTypeName, + Function: statefun.StatefulFunctionPointer(Greeter), + }) + + http.Handle("/statefun", builder.AsHandler()) + _ = http.ListenAndServe(":8000", nil) +} diff --git a/go/greeter/input-example.json b/go/greeter/input-example.json new file mode 100644 index 0000000..ad72aa8 --- /dev/null +++ b/go/greeter/input-example.json @@ -0,0 +1,2 @@ +{"name" : "Bob"} +{"name" : "Joe"} diff --git a/go/greeter/module.yaml b/go/greeter/module.yaml new file mode 100644 index 0000000..19a6efe --- /dev/null +++ b/go/greeter/module.yaml @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +kind: io.statefun.endpoints.v2/http +spec: + functions: example/* + urlPathTemplate: http://functions:8000/statefun + maxNumBatchRequests: 10000 +--- +kind: io.statefun.kafka.v1/ingress +spec: + id: example/names + address: kafka:9092 + consumerGroupId: my-group-id + startupPosition: + type: earliest + topics: + - topic: names + valueType: example/GreetRequest + targets: + - example/person +--- +kind: io.statefun.kafka.v1/egress +spec: + id: example/greets + address: kafka:9092 + deliverySemantic: + type: exactly-once + transactionTimeout: 15min
