This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git
commit f09617426baf77e9d8354fdb779f7ccead524bf0 Author: sjwiesman <[email protected]> AuthorDate: Tue Aug 24 12:54:58 2021 -0500 [FLINK-23951] Golang Showcase This closes #12 --- go/showcase/README.md | 64 ++++++++++++++ go/showcase/docker-compose.yml | 97 ++++++++++++++++++++ go/showcase/go.mod | 21 +++++ go/showcase/go.sum | 20 +++++ go/showcase/input-example.json | 2 + go/showcase/module.yaml | 41 +++++++++ go/showcase/pkg/showcase/part1/types.go | 84 ++++++++++++++++++ go/showcase/pkg/showcase/part1/user_login.go | 87 ++++++++++++++++++ go/showcase/pkg/showcase/part2/messaging.go | 108 +++++++++++++++++++++++ go/showcase/pkg/showcase/part3/egress.go | 67 ++++++++++++++ go/showcase/pkg/showcase/part4/storage.go | 94 ++++++++++++++++++++ go/showcase/pkg/showcase/part5/asyncops.go | 117 +++++++++++++++++++++++++ go/showcase/pkg/showcase/part6/greetings_fn.go | 70 +++++++++++++++ go/showcase/pkg/showcase/part6/profile.go | 28 ++++++ go/showcase/pkg/showcase/part6/serving.go | 70 +++++++++++++++ go/showcase/pkg/showcase/part6/user_fn.go | 95 ++++++++++++++++++++ 16 files changed, 1065 insertions(+) diff --git a/go/showcase/README.md b/go/showcase/README.md new file mode 100644 index 0000000..f50c23c --- /dev/null +++ b/go/showcase/README.md @@ -0,0 +1,64 @@ +# StateFun GoLang SDK Showcase + +This project is intended for new StateFun users that would like to start implementing their StateFun application functions using GoLang. +The tutorial is streamlined and split into a few parts which we recommend to go through a specific order, as lay out below. +Each part is demonstrated with some code snippets plus comments to guide you through the SDK fundamentals. + +## Prerequisites + +- golang +- docker-compose + +## Tutorial Sections + +### Type System + +This function demonstrates StateFun's type system using the GoLang SDK. + + +### Messaging Primitives + +[This function](pkg/showcase/part1/types.go) demonstrates how to send messages to other functions. + +### Sending messages to egresses + +To let your StateFun application interact with the outside world, functions may write messages +to egresses. [This function](pkg/showcase/part3/egress.go) demonstrates sending messages to an Apache Kafka or AWS Kinesis +egress, which is currently our most commonly used egresses that are natively supported by +StateFun. + +### Function state storage + +Consistent state is at the core of stateful functions. [This function](pkg/showcase/part4/storage.go) +demonstrates interacting with function state. + +### Asynchronous operations + +[This function](pkg/showcase/part5/asyncops.go) demonstrates performing asynchronous operations during a function invocation. It +is a common scenario for functions to have external dependencies in order for it to complete its +work, such as fetching enrichment information from remote databases. + +### Serving + +[This function](pkg/showcase/part6/serving.go) builds a full stateful functions application +and shows how they are exposed and deployed in the real world. Run this function locally +along with the stateful functions runtime! + +To actually start serving run from one terminal: +```bash +$ cd pkg/showcase/part6 +$ go build +$ ./part6 +``` + +And from another: +```docker-compose up``` + +# Next Steps + +The setup you executed in the last part of this tutorial is not how you'd normally deploy StateFun processes +and functions. It's a rather simplified setup to allow you to explore the interaction between +functions and the StateFun processes by setting debugger breakpoints in the function code in your IDE. + +We recommend now to take a look at a slightly more realistic setup, using Docker Compose, in the +[Greeter Docker Compose Example](../greeter). diff --git a/go/showcase/docker-compose.yml b/go/showcase/docker-compose.yml new file mode 100644 index 0000000..f20cc53 --- /dev/null +++ b/go/showcase/docker-compose.yml @@ -0,0 +1,97 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +version: "2.1" + +services: + + ############################################################### + # StateFun runtime + ############################################################### + + statefun-manager: + image: apache/flink-statefun:3.1.0 + expose: + - "6123" + ports: + - "8081:8081" + environment: + ROLE: master + MASTER_HOST: statefun-manager + volumes: + - ./module.yaml:/opt/statefun/modules/greeter/module.yaml + + statefun-worker: + image: apache/flink-statefun:3.1.0 + expose: + - "6121" + - "6122" + environment: + ROLE: worker + MASTER_HOST: statefun-manager + volumes: + - ./module.yaml:/opt/statefun/modules/greeter/module.yaml + + ############################################################### + # Kafka for ingress and egress + ############################################################### + + zookeeper: + image: confluentinc/cp-zookeeper:5.4.3 + environment: + ZOOKEEPER_CLIENT_PORT: "2181" + ports: + - "2181:2181" + + kafka: + image: confluentinc/cp-kafka:5.4.3 + ports: + - "9092:9092" + depends_on: + - zookeeper + environment: + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + + ############################################################### + # Forward a port 8000 from the host's machine + ############################################################### + + host-machine: + image: qoomon/docker-host@sha256:e0f021dd77c7c26d37b825ab2cbf73cd0a77ca993417da80a14192cb041937b0 + cap_add: [ 'NET_ADMIN', 'NET_RAW' ] + mem_limit: 8M + restart: on-failure + environment: + PORTS: 8000 + + ############################################################### + # Simple Kafka JSON producer to simulate ingress events + ############################################################### + + producer: + image: ververica/statefun-playground-producer:latest + environment: + APP_PATH: /mnt/input-example.json + APP_KAFKA_HOST: kafka:9092 + APP_KAFKA_TOPIC: logins + APP_JSON_PATH: user_id + APP_DELAY_SECONDS: 1 + volumes: + - ./input-example.json:/mnt/input-example.json diff --git a/go/showcase/go.mod b/go/showcase/go.mod new file mode 100644 index 0000000..dc342a0 --- /dev/null +++ b/go/showcase/go.mod @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +module statefun.io/showcase + +go 1.16 + +require github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0 diff --git a/go/showcase/go.sum b/go/showcase/go.sum new file mode 100644 index 0000000..c35cbea --- /dev/null +++ b/go/showcase/go.sum @@ -0,0 +1,20 @@ +github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0 h1:uE56xfgn4c/ytXppcW/NQUNxtPM8NpvkbU/VFMuaXN4= +github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0/go.mod h1:uHiPJsi71a161NMH/ISkkSPIXenkcG9A2m+uhT8UlJ4= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/go/showcase/input-example.json b/go/showcase/input-example.json new file mode 100644 index 0000000..6952457 --- /dev/null +++ b/go/showcase/input-example.json @@ -0,0 +1,2 @@ +{"user_id" : "bid", "user_name": "Bob", "login_type": "web"} +{"user_id" : "jid", "user_name": "Joe", "login_type": "mobile"} diff --git a/go/showcase/module.yaml b/go/showcase/module.yaml new file mode 100644 index 0000000..10ac7fd --- /dev/null +++ b/go/showcase/module.yaml @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +kind: io.statefun.endpoints.v2/http +spec: + functions: showcase.fns/* + urlPathTemplate: http://host-machine:8000/statefun + maxNumBatchRequests: 10000 +--- +kind: io.statefun.kafka.v1/ingress +spec: + id: showcase.io/names + address: kafka:9092 + consumerGroupId: my-group-id + startupPosition: + type: earliest + topics: + - topic: logins + valueType: showcase.types/userlogin + targets: + - showcase.fns/user +--- +kind: io.statefun.kafka.v1/egress +spec: + id: showcase.io/greets + address: kafka:9092 + deliverySemantic: + type: exactly-once + transactionTimeout: 15min diff --git a/go/showcase/pkg/showcase/part1/types.go b/go/showcase/pkg/showcase/part1/types.go new file mode 100644 index 0000000..57d85da --- /dev/null +++ b/go/showcase/pkg/showcase/part1/types.go @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package part1 + +import ( + "fmt" + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" +) + +// Types +// Showcase Part 1: Type System +// ============================ +// This function demonstrates StateFun's type system using the GoLang SDK. +// +// Core Type abstraction +// ===================== +// The core abstraction used by StateFun's type system is the Type interface, which +// consists of a few things that StateFun uses to handle messages and state values: +// +// A TypeName to identify the type. +// A TypeSerializer for serializing and deserializing instances of the type. +// +// Cross-language primitive types +// ============================== +// StateFun's type system has cross-language support for common primitive types, such as boolean, +// integer, long, etc. These primitive types have built-in Types implemented for them +// already, with predefined typenames. +// +// This is of course all transparent for the user, so you don't need to worry about it. Functions +// implemented in various languages (e.g. Java or Python) can message each other by directly sending +// supported primitive values as message arguments. Moreover, the type system is used for state +// values as well; so, you can expect that a function can safely read previous state after +// reimplementing it in a different language. We'll cover more on state storage access in later +// parts of the showcase series. +// +// Common custom types (e.g. JSON or Protobuf) +// =========================================== +// The type system is also very easily extensible to support custom message types, such as JSON +// or Protobuf messages. This is just a matter of implementing your own Type with a custom +// typename and serializer. +// +// StateFun makes this super easy by providing builder utilities to help you create a simple +// Type. Take a look at showcase_custom_types.py for few recommended ways to quickly create a StateFun Type +// for your JSON or Protobuf messages. +func Types(_ statefun.Context, message statefun.Message) error { + // All values, including messages and storage values, are handled via StateFun's type system. + // StateFun ships built-in primitive types that handles de-/serialization of messages across + // functions: + if message.IsBool() { + fmt.Printf("I've got a message with a boolean %v", message.AsBool()) + } else if message.IsInt32() { + fmt.Printf("I've got a message with an int32 %v", message.AsInt32()) + } else if message.IsInt64() { + fmt.Printf("I've got a message with an int64 %v", message.AsInt64()) + } else if message.IsFloat32() { + fmt.Printf("I've got a message with a float32 %v", message.AsFloat32()) + } else if message.IsFloat64() { + fmt.Printf("I've got a message with a float64 %v", message.AsFloat64()) + } else if message.IsString() { + fmt.Printf("I've got a message with a string %v", message.AsString()) + } else if message.Is(UserLoginType) { + var login UserLogin + if err := message.As(UserLoginType, &login); err != nil { + return fmt.Errorf("failed to deserialize user login: %w", err) + } + fmt.Printf("I've got a message with a login event %s", login) + } + + return nil +} diff --git a/go/showcase/pkg/showcase/part1/user_login.go b/go/showcase/pkg/showcase/part1/user_login.go new file mode 100644 index 0000000..f59e9a8 --- /dev/null +++ b/go/showcase/pkg/showcase/part1/user_login.go @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package part1 + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" + "strings" +) + +// UserLoginType is a custom statefun type that marshals data using +// standard Go JSON marshaling. +var UserLoginType = statefun.MakeJsonType(statefun.TypeNameFrom("showcase.types/userlogin")) + +// UserLogin is a struct for user login events, which are passed around +// in JSON form. +type UserLogin struct { + UserId string `json:"user_id"` + UserName string `json:"user_name"` + LoginType LoginType `json:"login_type"` +} + +func (u UserLogin) String() string { + b, _ := json.Marshal(&u) + return string(b) +} + +type LoginType int + +const ( + WEB LoginType = iota + MOBILE +) + +var toString = map[LoginType]string{ + WEB: "WEB", + MOBILE: "MOBILE", +} + +var toId = map[string]LoginType{ + "WEB": WEB, + "MOBILE": MOBILE, +} + +func (l LoginType) String() string { + return toString[l] +} + +func (l LoginType) MarshalJSON() ([]byte, error) { + buffer := bytes.NewBufferString(`"`) + buffer.WriteString(l.String()) + buffer.WriteString(`"`) + return buffer.Bytes(), nil +} + +func (l *LoginType) UnmarshalJSON(b []byte) error { + var s string + if err := json.Unmarshal(b, &s); err != nil { + return err + } + + s = strings.ToUpper(s) + + if login, ok := toId[s]; !ok { + return fmt.Errorf("unknown login type %s", s) + } else { + *l = login + } + + return nil +} diff --git a/go/showcase/pkg/showcase/part2/messaging.go b/go/showcase/pkg/showcase/part2/messaging.go new file mode 100644 index 0000000..4f002fe --- /dev/null +++ b/go/showcase/pkg/showcase/part2/messaging.go @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package part2 + +import ( + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" + "statefun.io/showcase/pkg/showcase/part1" + "time" +) + +var TargetFnTypeName = statefun.TypeNameFrom("showcase.fns/some-other-fn") + +// Messaging +// Showcase Part 2: Messaging Primitives +// +// ============================ +// +// This function demonstrates how to send messages to +// other functions. +// +// ============================ +// +// Besides sending messages to other functions, functions may also send +// Messages to the outside world via egresses. This is showcased by the next +// part of the series, EgressFn. +func Messaging(ctx statefun.Context, _ statefun.Message) error { + // You send messages to functions simply by specifying the target function's typename + // and the target instance id for that for function; StateFun handles the routing for you, + // without the need of any means for service discovery. + target := statefun.Address{ + FunctionType: TargetFnTypeName, + Id: "target-instance-id", + } + + // you can directly send primitive type values as messages, ... + ctx.Send(statefun.MessageBuilder{ + Target: target, + Value: true, + }) + ctx.Send(statefun.MessageBuilder{ + Target: target, + Value: int32(123), + }) + ctx.Send(statefun.MessageBuilder{ + Target: target, + Value: int64(-19911108123046639), + }) + ctx.Send(statefun.MessageBuilder{ + Target: target, + Value: float32(3.14159), + }) + ctx.Send(statefun.MessageBuilder{ + Target: target, + Value: float64(3.14159e+11), + }) + ctx.Send(statefun.MessageBuilder{ + Target: target, + Value: "hello world", + }) + + // ... or, in general, a value of any custom defined type. + ctx.Send(statefun.MessageBuilder{ + Target: target, + Value: part1.UserLogin{ + UserId: "id", + UserName: "john smith", + LoginType: part1.MOBILE, + }, + ValueType: part1.UserLoginType, + }) + + // You can send messsages to any function including yourself! + ctx.Send(statefun.MessageBuilder{ + Target: ctx.Self(), + Value: "hello me!", + }) + + // Additionally, you may ask StateFun to send out a message after a specified delay. + // A common usage pattern is to send delayed messages to yourself to model timer triggers. + ctx.SendAfter(time.Duration(10)*time.Minute, statefun.MessageBuilder{ + Target: target, + Value: part1.UserLogin{ + UserId: "id", + UserName: "john smith", + LoginType: part1.MOBILE, + }, + ValueType: part1.UserLoginType, + }) + + // None of the above sends are blocking operations. + // All side-effects, such as messaging other functions, are collected + // and happen after this method returns. + return nil +} diff --git a/go/showcase/pkg/showcase/part3/egress.go b/go/showcase/pkg/showcase/part3/egress.go new file mode 100644 index 0000000..638e245 --- /dev/null +++ b/go/showcase/pkg/showcase/part3/egress.go @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package part3 + +import "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" + +// EgressFn +// Showcase Part 3: Sending messages to egresses +// +// ============================ +// +// To let your StateFun application interact with the outside world, functions may write messages +// to egresses. This function demonstrates sending messages to an Apache Kafka or AWS Kinesis +// egress, which is currently our most commonly used egresses that are natively supported by StateFun. +// +// ============================ +// +// Next, we recommend learning about StateFun's state storage management and how to access stored +// values. Head over to StateStorageFn. +func EgressFn(ctx statefun.Context, _ statefun.Message) error { + ctx.SendEgress(statefun.KafkaEgressBuilder{ + Target: KafkaEgressTypeName, + Topic: "my-kafka-topic", + Key: "my-key", + Value: []byte("hello world!"), + }) + + ctx.SendEgress(statefun.KinesisEgressBuilder{ + Target: KinesisEgressTypeName, + Stream: "my-kinesis-stream", + Value: "hello world again!", + PartitionKey: "my-partition-key", + ExplicitHashKey: "my-explicit-hash-key", + }) + + return nil +} + +// Egresses are associated with a unique TypeName. The typename is used to identify which +// egress a message should be sent to. +// +// StateFun currently has native support for using Apache Kafka topics or AWS Kinesis streams +// as egresses. +// +// Registering a Kafka or Kinesis egress under a given TypeName is beyond the scope of +// what this specific part of the tutorial is attempting to demonstrate. For the time being, +// you can simply assume that a Kafka egress has been registered under the typename +// literal `golang.showcase.io/my-kafka-egress` and a Kinesis egress has been registered +// under the typename literal `golang.showcase.io/my-kinesis-egress`. +var ( + KafkaEgressTypeName = statefun.TypeNameFrom("showcase.io/my-kafka-egress") + KinesisEgressTypeName = statefun.TypeNameFrom("showcase.io/my-kinesis-egress") +) diff --git a/go/showcase/pkg/showcase/part4/storage.go b/go/showcase/pkg/showcase/part4/storage.go new file mode 100644 index 0000000..22f6156 --- /dev/null +++ b/go/showcase/pkg/showcase/part4/storage.go @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package part4 + +import ( + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" + "statefun.io/showcase/pkg/showcase/part1" + "time" +) + +// StateStorageFn +// Showcase Part 3: Function State Storage +// +// ============================ +// +// One of StateFun's most powerful features is its capability of managing function state in a +// consistent and fault-tolerant manner. This function demonstrates accessing and manipulating +// persisted function state that is managed by StateFun. +// +// ============================ +// +// In some applications, functions may need to retrieve existing data from external services. Take a +// look at the next series in this showcase, AsyncOpsFn, on how to perform asynchronous operations in your functions. +type StateStorageFn struct { + IntValue statefun.ValueSpec + BoolValue statefun.ValueSpec + UserLoginValue statefun.ValueSpec +} + +func NewStateStorageFn() StateStorageFn { + return StateStorageFn{ + IntValue: statefun.ValueSpec{ + Name: "int_value", + ValueType: statefun.Int32Type, + }, + BoolValue: statefun.ValueSpec{ + Name: "bool_value", + ValueType: statefun.BoolType, + Expiration: statefun.ExpireAfterCall(time.Duration(5) * time.Hour), + }, + UserLoginValue: statefun.ValueSpec{ + Name: "user_login_value", + ValueType: part1.UserLoginType, + }, + } +} + +func (s StateStorageFn) Invoke(ctx statefun.Context, msg statefun.Message) error { + // each function invocation gets access to storage that is scoped to the current function + // instance's address, i.e. (function typename, instance id). For example, if (UserFn, "Gordon") + // was invoked, the values you get access to belongs specifically to user Gordon. + storage := ctx.Storage() + + // you can think of the storage having several "columns", with each column identified by an + // uniquely named ValueSpec. Using the ValueSpec instances, you can access the value of + // individual columns. Get returns a boolean indicating whether there is an existing + // value, so you can differentiate between missing and the zero value. + var storedInt int32 + _ = storage.Get(s.IntValue, &storedInt) + + var boolValue bool + exists := storage.Get(s.BoolValue, &boolValue) + + var login part1.UserLogin + _ = storage.Get(s.UserLoginValue, &login) + + // the ValueSpec instance is also used for manipulating the stored values, e.g. updating ... + storage.Set(s.IntValue, storedInt+1) + + if !exists { + boolValue = true + } + + storage.Set(s.BoolValue, boolValue) + + // ... or clearing the value! + storage.Remove(s.UserLoginValue) + + return nil +} diff --git a/go/showcase/pkg/showcase/part5/asyncops.go b/go/showcase/pkg/showcase/part5/asyncops.go new file mode 100644 index 0000000..dbf201b --- /dev/null +++ b/go/showcase/pkg/showcase/part5/asyncops.go @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package part5 + +import ( + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" + "time" +) + +// AsyncOpsFn +// Showcase Part 5: Asyncronous operations +// +// ============================ +// +// This function demonstrates performing async operations +// during a function invocation. It is a common scenario for +// functions to have external dependencies in order for it to +// complete its work, such as fetching enrichment information from +// a remote database. statefun.Context is a valid context.Context and +// can be used to coordinate work across multiple channels. +// +// ============================ +// +// After learning everything about implementing a stateful function, the last bit of this showcase +// series demonstrates how to expose the functions you implemented so that the StateFun runtime can +// reach them and dispatch message invocations. Take a look now at the showcase.go. +type AsyncOpsFn struct { + UserAge statefun.ValueSpec + FriendsList statefun.ValueSpec +} + +func NewAsyncOpsFn() AsyncOpsFn { + return AsyncOpsFn{ + UserAge: statefun.ValueSpec{ + Name: "age", + ValueType: statefun.Int32Type, + }, + FriendsList: statefun.ValueSpec{ + Name: "friends", + ValueType: statefun.MakeJsonType(statefun.TypeNameFrom("statefun.types/friends")), + }, + } +} + +func (a AsyncOpsFn) Invoke(ctx statefun.Context, message statefun.Message) error { + username := ctx.Self().Id + + ageChannel := getAgeFromRemoteDb(ctx, username) + friendsChannel := getFriendsListFromAnotherRemoteDatabase(ctx, username) + + var age int32 + var friends []string + + finished := false + + for !finished { + select { + case <-ctx.Done(): + return ctx.Err() + case age = <-ageChannel: + if friends != nil { + finished = true + } + case friends = <-friendsChannel: + if age != 0 { + finished = true + } + } + } + + ctx.Storage().Set(a.UserAge, age) + ctx.Storage().Set(a.FriendsList, friends) + + return nil +} + +// getAgeFromRemoteDb is a mock async request to fetch a users +// age information from a remote database. +func getAgeFromRemoteDb(_ statefun.Context, _ string) <-chan int32 { + c := make(chan int32, 1) + + go func() { + time.Sleep(2 * time.Second) + c <- 29 + close(c) + }() + + return c +} + +// getFriendsListFromAnotherRemoteDatabase is mock asynchronouse request to fetch a +// user's friends list from a remote database +func getFriendsListFromAnotherRemoteDatabase(_ statefun.Context, _ string) <-chan []string { + c := make(chan []string, 1) + + go func() { + time.Sleep(3 * time.Second) + c <- []string{"Igal", "Marta", "Stephan", "Seth", "Konstantin"} + close(c) + }() + + return c +} diff --git a/go/showcase/pkg/showcase/part6/greetings_fn.go b/go/showcase/pkg/showcase/part6/greetings_fn.go new file mode 100644 index 0000000..afbf2b2 --- /dev/null +++ b/go/showcase/pkg/showcase/part6/greetings_fn.go @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" + "log" +) + +var ( + GreetingTypeName = statefun.TypeNameFrom("showcase.fns/greetings") + KafkaEgress = statefun.TypeNameFrom("showcase.io/greets") + templates = []string{"Welcome %s!", "Nice to see you again %s.", "Third time is a charm %s!"} +) + +// greetings is a simple function that computes personalized +// greeting messages based on a give UserProfile. Then, it sends +// the greeting back to the user via an egress Kafka topic. +// +// For demonstration purposes, this function also prints to the +// console the generated greeting messages. +func greetings(ctx statefun.Context, message statefun.Message) error { + var profile UserProfile + _ = message.As(UserProfileType, &profile) + + greeting := computeGreeting(profile) + log.Printf("GreeingsFn (instance id: %s):\t%s", ctx.Self().Id, greeting) + + ctx.SendEgress(statefun.KafkaEgressBuilder{ + Target: KafkaEgress, + Topic: "greetings", + Key: ctx.Self().Id, + Value: greeting, + }) + + return nil +} + +func computeGreeting(profile UserProfile) string { + count := profile.SeenCount + + if count <= int32(len(templates)) { + return fmt.Sprintf(templates[count-1], profile.Name) + } + + return fmt.Sprintf("Nice to see you for the %dth time, %s! It has been %d milliseconds since we last say you.", + count, profile.Name, profile.LastSeenDeltaMs) +} + +func GreetingSpec() statefun.StatefulFunctionSpec { + return statefun.StatefulFunctionSpec{ + FunctionType: GreetingTypeName, + Function: statefun.StatefulFunctionPointer(greetings), + } +} diff --git a/go/showcase/pkg/showcase/part6/profile.go b/go/showcase/pkg/showcase/part6/profile.go new file mode 100644 index 0000000..98fd96c --- /dev/null +++ b/go/showcase/pkg/showcase/part6/profile.go @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" + +var UserProfileType = statefun.MakeJsonType(statefun.TypeNameFrom("golang.showcase.type/user_profile")) + +type UserProfile struct { + Name string `json:"name"` + LoginLocation string `json:"login_location"` + SeenCount int32 `json:"seen_count"` + LastSeenDeltaMs int64 `json:"last_seen_delta_ms"` +} diff --git a/go/showcase/pkg/showcase/part6/serving.go b/go/showcase/pkg/showcase/part6/serving.go new file mode 100644 index 0000000..9b22d69 --- /dev/null +++ b/go/showcase/pkg/showcase/part6/serving.go @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" + "log" + "net/http" +) + +// This is where everything comes together. In this section, we'd like to guide +// you through how to expose the functions you implemented via any HTTP webserver. +// In particular, stateful functions implements the standard GoLang http.Handler +// interface. +// +// This program is a standalone Go program that can be ran directly in the IDE, +// and exposes some functions using the standard Go web server. In practice you +// can choose any web server that can handle HTTP request and responses, even AWS Lambda. +// +// In addition to that, you will also run a "toy" set of StateFun runtime processes in +// a separate program to see StateFun in action for the first time. The StateFun processes +// are responsible for routing and dispatching the messages from ingresses, functions, +// and egresses, and also manages the function state storages to be consistent and fault-tolerant +// This is all done transparently in the background! +// +// Next Steps +// +// The setup you executed in this tutorial is NOT how you would normally deploy +// StateFun processes and functions. It's a rather simplified setup to allow you to +// explore the interaction between functions and the StateFun processes by setting debugger +// breakpoints in the function code. +// +// We recommend now to take a look at a slightly more realistic setup, using docker-compose +// in the Greeter Example https://github.com/apache/flink-statefun-playground/java/greeter/ +func main() { + + // register the functions that you want to serve... + builder := statefun.StatefulFunctionsBuilder() + if err := builder.WithSpec(UserFnSpec()); err != nil { + log.Fatalf("failed to register user function: %s", err) + } + + if err := builder.WithSpec(GreetingSpec()); err != nil { + log.Fatalf("failed to register greeting function: %s", err) + } + + // ... and build a request-reply handler for the registered functions, which understands how to + // decode invocation requests dispatched from StateFun / encode side-effects (e.g. state storage + // updates, or invoking other functions) as responses to be handled by StateFun. + handler := builder.AsHandler() + + // Use the request-reply handler along with your favorite HTTP web server framework + // to serve the functions! + http.Handle("/", handler) + _ = http.ListenAndServe(":8000", nil) +} diff --git a/go/showcase/pkg/showcase/part6/user_fn.go b/go/showcase/pkg/showcase/part6/user_fn.go new file mode 100644 index 0000000..aae128d --- /dev/null +++ b/go/showcase/pkg/showcase/part6/user_fn.go @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" + "statefun.io/showcase/pkg/showcase/part1" + "time" +) + +// UserFnTypeName Every registered function needs to be associated with a unique TypeName. +// A function's typename is usd by other functions and ingresses to have messages +// addressed to them. +var UserFnTypeName = statefun.TypeNameFrom("showcase.fns/user") + +type UserFn struct { + Seen statefun.ValueSpec + SeenTimestampMillis statefun.ValueSpec +} + +// UserFnSpec specification of the functions. This can be used +// to register the function - see main. +func UserFnSpec() statefun.StatefulFunctionSpec { + function := UserFn{ + Seen: statefun.ValueSpec{ + Name: "seen_count", + ValueType: statefun.Int32Type, + }, + SeenTimestampMillis: statefun.ValueSpec{ + Name: "seen_timestamp_millis", + ValueType: statefun.Int64Type, + }, + } + + return statefun.StatefulFunctionSpec{ + FunctionType: UserFnTypeName, + States: []statefun.ValueSpec{function.Seen, function.SeenTimestampMillis}, + Function: function, + } +} + +func (u UserFn) Invoke(ctx statefun.Context, message statefun.Message) error { + if !message.Is(part1.UserLoginType) { + return fmt.Errorf("unexpected message type %s", message.ValueTypeName()) + } + + var login part1.UserLogin + _ = message.As(part1.UserLoginType, &login) + + var count int32 + _ = ctx.Storage().Get(u.Seen, &count) + count++ + + now := time.Now().UnixNano() / int64(time.Millisecond) + var lastSeenTimestampMillis int64 + if exists := ctx.Storage().Get(u.SeenTimestampMillis, &lastSeenTimestampMillis); !exists { + lastSeenTimestampMillis = now + } + + ctx.Storage().Set(u.Seen, count) + ctx.Storage().Set(u.SeenTimestampMillis, lastSeenTimestampMillis) + + profile := UserProfile{ + Name: login.UserName, + LoginLocation: login.LoginType.String(), + SeenCount: count, + LastSeenDeltaMs: now - lastSeenTimestampMillis, + } + + ctx.Send(statefun.MessageBuilder{ + Target: statefun.Address{ + FunctionType: GreetingTypeName, + Id: login.UserName, + }, + Value: profile, + ValueType: UserProfileType, + }) + + return nil +}
