This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git

commit f09617426baf77e9d8354fdb779f7ccead524bf0
Author: sjwiesman <[email protected]>
AuthorDate: Tue Aug 24 12:54:58 2021 -0500

    [FLINK-23951] Golang Showcase
    
    This closes #12
---
 go/showcase/README.md                          |  64 ++++++++++++++
 go/showcase/docker-compose.yml                 |  97 ++++++++++++++++++++
 go/showcase/go.mod                             |  21 +++++
 go/showcase/go.sum                             |  20 +++++
 go/showcase/input-example.json                 |   2 +
 go/showcase/module.yaml                        |  41 +++++++++
 go/showcase/pkg/showcase/part1/types.go        |  84 ++++++++++++++++++
 go/showcase/pkg/showcase/part1/user_login.go   |  87 ++++++++++++++++++
 go/showcase/pkg/showcase/part2/messaging.go    | 108 +++++++++++++++++++++++
 go/showcase/pkg/showcase/part3/egress.go       |  67 ++++++++++++++
 go/showcase/pkg/showcase/part4/storage.go      |  94 ++++++++++++++++++++
 go/showcase/pkg/showcase/part5/asyncops.go     | 117 +++++++++++++++++++++++++
 go/showcase/pkg/showcase/part6/greetings_fn.go |  70 +++++++++++++++
 go/showcase/pkg/showcase/part6/profile.go      |  28 ++++++
 go/showcase/pkg/showcase/part6/serving.go      |  70 +++++++++++++++
 go/showcase/pkg/showcase/part6/user_fn.go      |  95 ++++++++++++++++++++
 16 files changed, 1065 insertions(+)

diff --git a/go/showcase/README.md b/go/showcase/README.md
new file mode 100644
index 0000000..f50c23c
--- /dev/null
+++ b/go/showcase/README.md
@@ -0,0 +1,64 @@
+# StateFun GoLang SDK Showcase
+
+This project is intended for new StateFun users that would like to start 
implementing their StateFun application functions using GoLang.
+The tutorial is streamlined and split into a few parts which we recommend to 
go through a specific order, as lay out below.
+Each part is demonstrated with some code snippets plus comments to guide you 
through the SDK fundamentals.
+
+## Prerequisites
+
+- golang
+- docker-compose
+
+## Tutorial Sections
+
+###  Type System
+
+This function demonstrates StateFun's type system using the GoLang SDK.
+
+
+###  Messaging Primitives
+
+[This function](pkg/showcase/part1/types.go) demonstrates how to send messages 
to other functions.
+
+### Sending messages to egresses
+
+To let your StateFun application interact with the outside world, functions 
may write messages
+to egresses. [This function](pkg/showcase/part3/egress.go) demonstrates 
sending messages to an Apache Kafka or AWS Kinesis
+egress, which is currently our most commonly used egresses that are natively 
supported by
+StateFun.
+
+###  Function state storage
+
+Consistent state is at the core of stateful functions. [This 
function](pkg/showcase/part4/storage.go)
+demonstrates interacting with function state. 
+
+###  Asynchronous operations
+
+[This function](pkg/showcase/part5/asyncops.go) demonstrates performing 
asynchronous operations during a function invocation. It
+is a common scenario for functions to have external dependencies in order for 
it to complete its
+work, such as fetching enrichment information from remote databases.
+
+###  Serving
+
+[This function](pkg/showcase/part6/serving.go) builds a full stateful 
functions application
+and shows how they are exposed and deployed in the real world. Run this 
function locally 
+along with the stateful functions runtime!
+
+To actually start serving run from one terminal:
+```bash
+$ cd pkg/showcase/part6
+$ go build
+$ ./part6
+```
+
+And from another:
+```docker-compose up```
+
+# Next Steps
+
+The setup you executed in the last part of this tutorial is not how you'd 
normally deploy StateFun processes
+and functions. It's a rather simplified setup to allow you to explore the 
interaction between
+functions and the StateFun processes by setting debugger breakpoints in the 
function code in your IDE.
+
+We recommend now to take a look at a slightly more realistic setup, using 
Docker Compose, in the
+[Greeter Docker Compose Example](../greeter).
diff --git a/go/showcase/docker-compose.yml b/go/showcase/docker-compose.yml
new file mode 100644
index 0000000..f20cc53
--- /dev/null
+++ b/go/showcase/docker-compose.yml
@@ -0,0 +1,97 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+version: "2.1"
+
+services:
+
+  ###############################################################
+  #    StateFun runtime
+  ###############################################################
+
+  statefun-manager:
+    image: apache/flink-statefun:3.1.0
+    expose:
+      - "6123"
+    ports:
+      - "8081:8081"
+    environment:
+      ROLE: master
+      MASTER_HOST: statefun-manager
+    volumes:
+      - ./module.yaml:/opt/statefun/modules/greeter/module.yaml
+
+  statefun-worker:
+    image: apache/flink-statefun:3.1.0
+    expose:
+      - "6121"
+      - "6122"
+    environment:
+      ROLE: worker
+      MASTER_HOST: statefun-manager
+    volumes:
+      - ./module.yaml:/opt/statefun/modules/greeter/module.yaml
+
+  ###############################################################
+  #    Kafka for ingress and egress
+  ###############################################################
+
+  zookeeper:
+    image: confluentinc/cp-zookeeper:5.4.3
+    environment:
+      ZOOKEEPER_CLIENT_PORT: "2181"
+    ports:
+      - "2181:2181"
+
+  kafka:
+    image: confluentinc/cp-kafka:5.4.3
+    ports:
+      - "9092:9092"
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+
+  ###############################################################
+  # Forward a port 8000 from the host's machine
+  ###############################################################
+  
+  host-machine:
+    image: 
qoomon/docker-host@sha256:e0f021dd77c7c26d37b825ab2cbf73cd0a77ca993417da80a14192cb041937b0
+    cap_add: [ 'NET_ADMIN', 'NET_RAW' ]
+    mem_limit: 8M
+    restart: on-failure
+    environment:
+      PORTS: 8000
+
+  ###############################################################
+  #    Simple Kafka JSON producer to simulate ingress events
+  ###############################################################
+
+  producer:
+    image: ververica/statefun-playground-producer:latest
+    environment:
+      APP_PATH: /mnt/input-example.json
+      APP_KAFKA_HOST: kafka:9092
+      APP_KAFKA_TOPIC: logins
+      APP_JSON_PATH: user_id
+      APP_DELAY_SECONDS: 1
+    volumes:
+      - ./input-example.json:/mnt/input-example.json
diff --git a/go/showcase/go.mod b/go/showcase/go.mod
new file mode 100644
index 0000000..dc342a0
--- /dev/null
+++ b/go/showcase/go.mod
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+module statefun.io/showcase
+
+go 1.16
+
+require github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0
diff --git a/go/showcase/go.sum b/go/showcase/go.sum
new file mode 100644
index 0000000..c35cbea
--- /dev/null
+++ b/go/showcase/go.sum
@@ -0,0 +1,20 @@
+github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0 
h1:uE56xfgn4c/ytXppcW/NQUNxtPM8NpvkbU/VFMuaXN4=
+github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0/go.mod 
h1:uHiPJsi71a161NMH/ISkkSPIXenkcG9A2m+uhT8UlJ4=
+github.com/davecgh/go-spew v1.1.0 
h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
+github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/golang/protobuf v1.5.0/go.mod 
h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
+github.com/google/go-cmp v0.5.5/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.7.0 
h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
+github.com/stretchr/testify v1.7.0/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 
h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod 
h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0 
h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
+google.golang.org/protobuf v1.26.0/go.mod 
h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c 
h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/go/showcase/input-example.json b/go/showcase/input-example.json
new file mode 100644
index 0000000..6952457
--- /dev/null
+++ b/go/showcase/input-example.json
@@ -0,0 +1,2 @@
+{"user_id" :  "bid", "user_name":  "Bob", "login_type":  "web"}
+{"user_id" :  "jid", "user_name":  "Joe", "login_type":  "mobile"}
diff --git a/go/showcase/module.yaml b/go/showcase/module.yaml
new file mode 100644
index 0000000..10ac7fd
--- /dev/null
+++ b/go/showcase/module.yaml
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+kind: io.statefun.endpoints.v2/http
+spec:
+  functions: showcase.fns/*
+  urlPathTemplate: http://host-machine:8000/statefun
+  maxNumBatchRequests: 10000
+---
+kind: io.statefun.kafka.v1/ingress
+spec:
+  id: showcase.io/names
+  address: kafka:9092
+  consumerGroupId: my-group-id
+  startupPosition:
+    type: earliest
+  topics:
+    - topic: logins
+      valueType: showcase.types/userlogin
+      targets:
+        - showcase.fns/user
+---
+kind: io.statefun.kafka.v1/egress
+spec:
+  id: showcase.io/greets
+  address: kafka:9092
+  deliverySemantic:
+    type: exactly-once
+    transactionTimeout: 15min
diff --git a/go/showcase/pkg/showcase/part1/types.go 
b/go/showcase/pkg/showcase/part1/types.go
new file mode 100644
index 0000000..57d85da
--- /dev/null
+++ b/go/showcase/pkg/showcase/part1/types.go
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package part1
+
+import (
+       "fmt"
+       "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
+)
+
+// Types
+//  Showcase Part 1: Type System
+//  ============================
+//  This function demonstrates StateFun's type system using the GoLang SDK.
+//
+//  Core Type abstraction
+//  =====================
+//  The core abstraction used by StateFun's type system is the Type interface, 
which
+//  consists of a few things that StateFun uses to handle messages and state 
values:
+//
+//  A TypeName to identify the type.
+//  A TypeSerializer for serializing and deserializing instances of the type.
+//
+//  Cross-language primitive types
+//  ==============================
+//  StateFun's type system has cross-language support for common primitive 
types, such as boolean,
+//  integer, long, etc. These primitive types have built-in Types implemented 
for them
+//  already, with predefined typenames.
+//
+//  This is of course all transparent for the user, so you don't need to worry 
about it. Functions
+//  implemented in various languages (e.g. Java or Python) can message each 
other by directly sending
+//  supported primitive values as message arguments. Moreover, the type system 
is used for state
+//  values as well; so, you can expect that a function can safely read 
previous state after
+//  reimplementing it in a different language. We'll cover more on state 
storage access in later
+//  parts of the showcase series.
+//
+//  Common custom types (e.g. JSON or Protobuf)
+//  ===========================================
+//  The type system is also very easily extensible to support custom message 
types, such as JSON
+//  or Protobuf messages. This is just a matter of implementing your own Type 
with a custom
+//  typename and serializer.
+//
+//  StateFun makes this super easy by providing builder utilities to help you 
create a simple
+//  Type. Take a look at showcase_custom_types.py for few recommended ways to 
quickly create a StateFun Type
+//  for your JSON or Protobuf messages.
+func Types(_ statefun.Context, message statefun.Message) error {
+       // All values, including messages and storage values, are handled via 
StateFun's type system.
+       // StateFun ships built-in primitive types that handles 
de-/serialization of messages across
+       // functions:
+       if message.IsBool() {
+               fmt.Printf("I've got a message with a boolean %v", 
message.AsBool())
+       } else if message.IsInt32() {
+               fmt.Printf("I've got a message with an int32 %v", 
message.AsInt32())
+       } else if message.IsInt64() {
+               fmt.Printf("I've got a message with an int64 %v", 
message.AsInt64())
+       } else if message.IsFloat32() {
+               fmt.Printf("I've got a message with a float32 %v", 
message.AsFloat32())
+       } else if message.IsFloat64() {
+               fmt.Printf("I've got a message with a float64 %v", 
message.AsFloat64())
+       } else if message.IsString() {
+               fmt.Printf("I've got a message with a string %v", 
message.AsString())
+       } else if message.Is(UserLoginType) {
+               var login UserLogin
+               if err := message.As(UserLoginType, &login); err != nil {
+                       return fmt.Errorf("failed to deserialize user login: 
%w", err)
+               }
+               fmt.Printf("I've got a message with a login event %s", login)
+       }
+
+       return nil
+}
diff --git a/go/showcase/pkg/showcase/part1/user_login.go 
b/go/showcase/pkg/showcase/part1/user_login.go
new file mode 100644
index 0000000..f59e9a8
--- /dev/null
+++ b/go/showcase/pkg/showcase/part1/user_login.go
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package part1
+
+import (
+       "bytes"
+       "encoding/json"
+       "fmt"
+       "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
+       "strings"
+)
+
+// UserLoginType is a custom statefun type that marshals data using
+// standard Go JSON marshaling.
+var UserLoginType = 
statefun.MakeJsonType(statefun.TypeNameFrom("showcase.types/userlogin"))
+
+// UserLogin is a struct for user login events, which are passed around
+// in JSON form.
+type UserLogin struct {
+       UserId    string    `json:"user_id"`
+       UserName  string    `json:"user_name"`
+       LoginType LoginType `json:"login_type"`
+}
+
+func (u UserLogin) String() string {
+       b, _ := json.Marshal(&u)
+       return string(b)
+}
+
+type LoginType int
+
+const (
+       WEB LoginType = iota
+       MOBILE
+)
+
+var toString = map[LoginType]string{
+       WEB:    "WEB",
+       MOBILE: "MOBILE",
+}
+
+var toId = map[string]LoginType{
+       "WEB":    WEB,
+       "MOBILE": MOBILE,
+}
+
+func (l LoginType) String() string {
+       return toString[l]
+}
+
+func (l LoginType) MarshalJSON() ([]byte, error) {
+       buffer := bytes.NewBufferString(`"`)
+       buffer.WriteString(l.String())
+       buffer.WriteString(`"`)
+       return buffer.Bytes(), nil
+}
+
+func (l *LoginType) UnmarshalJSON(b []byte) error {
+       var s string
+       if err := json.Unmarshal(b, &s); err != nil {
+               return err
+       }
+
+       s = strings.ToUpper(s)
+
+       if login, ok := toId[s]; !ok {
+               return fmt.Errorf("unknown login type %s", s)
+       } else {
+               *l = login
+       }
+
+       return nil
+}
diff --git a/go/showcase/pkg/showcase/part2/messaging.go 
b/go/showcase/pkg/showcase/part2/messaging.go
new file mode 100644
index 0000000..4f002fe
--- /dev/null
+++ b/go/showcase/pkg/showcase/part2/messaging.go
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package part2
+
+import (
+       "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
+       "statefun.io/showcase/pkg/showcase/part1"
+       "time"
+)
+
+var TargetFnTypeName = statefun.TypeNameFrom("showcase.fns/some-other-fn")
+
+// Messaging
+//   Showcase Part 2: Messaging Primitives
+//
+//   ============================
+//
+//   This function demonstrates how to send messages to
+//   other functions.
+//
+//   ============================
+//
+//   Besides sending messages to other functions, functions may also send
+//   Messages to the outside world via egresses. This is showcased by the next
+//   part of the series, EgressFn.
+func Messaging(ctx statefun.Context, _ statefun.Message) error {
+       // You send messages to functions simply by specifying the target 
function's typename
+       // and the target instance id for that for function; StateFun handles 
the routing for you,
+       // without the need of any means for service discovery.
+       target := statefun.Address{
+               FunctionType: TargetFnTypeName,
+               Id:           "target-instance-id",
+       }
+
+       // you can directly send primitive type values as messages, ...
+       ctx.Send(statefun.MessageBuilder{
+               Target: target,
+               Value:  true,
+       })
+       ctx.Send(statefun.MessageBuilder{
+               Target: target,
+               Value:  int32(123),
+       })
+       ctx.Send(statefun.MessageBuilder{
+               Target: target,
+               Value:  int64(-19911108123046639),
+       })
+       ctx.Send(statefun.MessageBuilder{
+               Target: target,
+               Value:  float32(3.14159),
+       })
+       ctx.Send(statefun.MessageBuilder{
+               Target: target,
+               Value:  float64(3.14159e+11),
+       })
+       ctx.Send(statefun.MessageBuilder{
+               Target: target,
+               Value:  "hello world",
+       })
+
+       // ... or, in general, a value of any custom defined type.
+       ctx.Send(statefun.MessageBuilder{
+               Target: target,
+               Value: part1.UserLogin{
+                       UserId:    "id",
+                       UserName:  "john smith",
+                       LoginType: part1.MOBILE,
+               },
+               ValueType: part1.UserLoginType,
+       })
+
+       // You can send messsages to any function including yourself!
+       ctx.Send(statefun.MessageBuilder{
+               Target: ctx.Self(),
+               Value:  "hello me!",
+       })
+
+       // Additionally, you may ask StateFun to send out a message after a 
specified delay.
+       // A common usage pattern is to send delayed messages to yourself to 
model timer triggers.
+       ctx.SendAfter(time.Duration(10)*time.Minute, statefun.MessageBuilder{
+               Target: target,
+               Value: part1.UserLogin{
+                       UserId:    "id",
+                       UserName:  "john smith",
+                       LoginType: part1.MOBILE,
+               },
+               ValueType: part1.UserLoginType,
+       })
+
+       // None of the above sends are blocking operations.
+       // All side-effects, such as messaging other functions, are collected
+       // and happen after this method returns.
+       return nil
+}
diff --git a/go/showcase/pkg/showcase/part3/egress.go 
b/go/showcase/pkg/showcase/part3/egress.go
new file mode 100644
index 0000000..638e245
--- /dev/null
+++ b/go/showcase/pkg/showcase/part3/egress.go
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package part3
+
+import "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
+
+// EgressFn
+//   Showcase Part 3: Sending messages to egresses
+//
+//   ============================
+//
+//   To let your StateFun application interact with the outside world, 
functions may write messages
+//   to egresses. This function demonstrates sending messages to an Apache 
Kafka or AWS Kinesis
+//   egress, which is currently our most commonly used egresses that are 
natively supported by StateFun.
+//
+//   ============================
+//
+//   Next, we recommend learning about StateFun's state storage management and 
how to access stored
+//   values. Head over to StateStorageFn.
+func EgressFn(ctx statefun.Context, _ statefun.Message) error {
+       ctx.SendEgress(statefun.KafkaEgressBuilder{
+               Target: KafkaEgressTypeName,
+               Topic:  "my-kafka-topic",
+               Key:    "my-key",
+               Value:  []byte("hello world!"),
+       })
+
+       ctx.SendEgress(statefun.KinesisEgressBuilder{
+               Target:          KinesisEgressTypeName,
+               Stream:          "my-kinesis-stream",
+               Value:           "hello world again!",
+               PartitionKey:    "my-partition-key",
+               ExplicitHashKey: "my-explicit-hash-key",
+       })
+
+       return nil
+}
+
+// Egresses are associated with a unique TypeName. The typename is used to 
identify which
+// egress a message should be sent to.
+//
+// StateFun currently has native support for using Apache Kafka topics or AWS 
Kinesis streams
+// as egresses.
+//
+// Registering a Kafka or Kinesis egress under a given TypeName is beyond the 
scope of
+// what this specific part of the tutorial is attempting to demonstrate. For 
the time being,
+// you can simply assume that a Kafka egress has been registered under the 
typename
+// literal `golang.showcase.io/my-kafka-egress` and a Kinesis egress has been 
registered
+// under the typename literal `golang.showcase.io/my-kinesis-egress`.
+var (
+       KafkaEgressTypeName   = 
statefun.TypeNameFrom("showcase.io/my-kafka-egress")
+       KinesisEgressTypeName = 
statefun.TypeNameFrom("showcase.io/my-kinesis-egress")
+)
diff --git a/go/showcase/pkg/showcase/part4/storage.go 
b/go/showcase/pkg/showcase/part4/storage.go
new file mode 100644
index 0000000..22f6156
--- /dev/null
+++ b/go/showcase/pkg/showcase/part4/storage.go
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package part4
+
+import (
+       "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
+       "statefun.io/showcase/pkg/showcase/part1"
+       "time"
+)
+
+// StateStorageFn
+//  Showcase Part 3: Function State Storage
+//
+//  ============================
+//
+//  One of StateFun's most powerful features is its capability of managing 
function state in a
+//  consistent and fault-tolerant manner. This function demonstrates accessing 
and manipulating
+//  persisted function state that is managed by StateFun.
+//
+//  ============================
+//
+//  In some applications, functions may need to retrieve existing data from 
external services. Take a
+//  look at the next series in this showcase, AsyncOpsFn, on how to perform 
asynchronous operations in your functions.
+type StateStorageFn struct {
+       IntValue       statefun.ValueSpec
+       BoolValue      statefun.ValueSpec
+       UserLoginValue statefun.ValueSpec
+}
+
+func NewStateStorageFn() StateStorageFn {
+       return StateStorageFn{
+               IntValue: statefun.ValueSpec{
+                       Name:      "int_value",
+                       ValueType: statefun.Int32Type,
+               },
+               BoolValue: statefun.ValueSpec{
+                       Name:       "bool_value",
+                       ValueType:  statefun.BoolType,
+                       Expiration: statefun.ExpireAfterCall(time.Duration(5) * 
time.Hour),
+               },
+               UserLoginValue: statefun.ValueSpec{
+                       Name:      "user_login_value",
+                       ValueType: part1.UserLoginType,
+               },
+       }
+}
+
+func (s StateStorageFn) Invoke(ctx statefun.Context, msg statefun.Message) 
error {
+       // each function invocation gets access to storage that is scoped to 
the current function
+       // instance's address, i.e. (function typename, instance id). For 
example, if (UserFn, "Gordon")
+       // was invoked, the values you get access to belongs specifically to 
user Gordon.
+       storage := ctx.Storage()
+
+       // you can think of the storage having several "columns", with each 
column identified by an
+       // uniquely named ValueSpec. Using the ValueSpec instances, you can 
access the value of
+       // individual columns. Get returns a boolean indicating whether there 
is an existing
+       // value, so you can differentiate between missing and the zero value.
+       var storedInt int32
+       _ = storage.Get(s.IntValue, &storedInt)
+
+       var boolValue bool
+       exists := storage.Get(s.BoolValue, &boolValue)
+
+       var login part1.UserLogin
+       _ = storage.Get(s.UserLoginValue, &login)
+
+       // the ValueSpec instance is also used for manipulating the stored 
values, e.g. updating ...
+       storage.Set(s.IntValue, storedInt+1)
+
+       if !exists {
+               boolValue = true
+       }
+
+       storage.Set(s.BoolValue, boolValue)
+
+       // ... or clearing the value!
+       storage.Remove(s.UserLoginValue)
+
+       return nil
+}
diff --git a/go/showcase/pkg/showcase/part5/asyncops.go 
b/go/showcase/pkg/showcase/part5/asyncops.go
new file mode 100644
index 0000000..dbf201b
--- /dev/null
+++ b/go/showcase/pkg/showcase/part5/asyncops.go
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package part5
+
+import (
+       "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
+       "time"
+)
+
+// AsyncOpsFn
+//   Showcase Part 5: Asyncronous operations
+//
+//   ============================
+//
+//   This function demonstrates performing async operations
+//   during a function invocation. It is a common scenario for
+//   functions to have external dependencies in order for it to
+//   complete its work, such as fetching enrichment information from
+//   a remote database. statefun.Context is a valid context.Context and
+//   can be used to coordinate work across multiple channels.
+//
+//   ============================
+//
+//   After learning everything about implementing a stateful function, the 
last bit of this showcase
+//   series demonstrates how to expose the functions you implemented so that 
the StateFun runtime can
+//   reach them and dispatch message invocations. Take a look now at the 
showcase.go.
+type AsyncOpsFn struct {
+       UserAge     statefun.ValueSpec
+       FriendsList statefun.ValueSpec
+}
+
+func NewAsyncOpsFn() AsyncOpsFn {
+       return AsyncOpsFn{
+               UserAge: statefun.ValueSpec{
+                       Name:      "age",
+                       ValueType: statefun.Int32Type,
+               },
+               FriendsList: statefun.ValueSpec{
+                       Name:      "friends",
+                       ValueType: 
statefun.MakeJsonType(statefun.TypeNameFrom("statefun.types/friends")),
+               },
+       }
+}
+
+func (a AsyncOpsFn) Invoke(ctx statefun.Context, message statefun.Message) 
error {
+       username := ctx.Self().Id
+
+       ageChannel := getAgeFromRemoteDb(ctx, username)
+       friendsChannel := getFriendsListFromAnotherRemoteDatabase(ctx, username)
+
+       var age int32
+       var friends []string
+
+       finished := false
+
+       for !finished {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case age = <-ageChannel:
+                       if friends != nil {
+                               finished = true
+                       }
+               case friends = <-friendsChannel:
+                       if age != 0 {
+                               finished = true
+                       }
+               }
+       }
+
+       ctx.Storage().Set(a.UserAge, age)
+       ctx.Storage().Set(a.FriendsList, friends)
+
+       return nil
+}
+
+// getAgeFromRemoteDb is a mock async request to fetch a users
+// age information from a remote database.
+func getAgeFromRemoteDb(_ statefun.Context, _ string) <-chan int32 {
+       c := make(chan int32, 1)
+
+       go func() {
+               time.Sleep(2 * time.Second)
+               c <- 29
+               close(c)
+       }()
+
+       return c
+}
+
+// getFriendsListFromAnotherRemoteDatabase is  mock asynchronouse request to 
fetch a
+// user's friends list from a remote database
+func getFriendsListFromAnotherRemoteDatabase(_ statefun.Context, _ string) 
<-chan []string {
+       c := make(chan []string, 1)
+
+       go func() {
+               time.Sleep(3 * time.Second)
+               c <- []string{"Igal", "Marta", "Stephan", "Seth", "Konstantin"}
+               close(c)
+       }()
+
+       return c
+}
diff --git a/go/showcase/pkg/showcase/part6/greetings_fn.go 
b/go/showcase/pkg/showcase/part6/greetings_fn.go
new file mode 100644
index 0000000..afbf2b2
--- /dev/null
+++ b/go/showcase/pkg/showcase/part6/greetings_fn.go
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+       "fmt"
+       "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
+       "log"
+)
+
+var (
+       GreetingTypeName = statefun.TypeNameFrom("showcase.fns/greetings")
+       KafkaEgress      = statefun.TypeNameFrom("showcase.io/greets")
+       templates        = []string{"Welcome %s!", "Nice to see you again %s.", 
"Third time is a charm %s!"}
+)
+
+// greetings is a simple function that computes personalized
+// greeting messages based on a give UserProfile. Then, it sends
+// the greeting back to the user via an egress Kafka topic.
+//
+// For demonstration purposes, this function also prints to the
+// console the generated greeting messages.
+func greetings(ctx statefun.Context, message statefun.Message) error {
+       var profile UserProfile
+       _ = message.As(UserProfileType, &profile)
+
+       greeting := computeGreeting(profile)
+       log.Printf("GreeingsFn (instance id: %s):\t%s", ctx.Self().Id, greeting)
+
+       ctx.SendEgress(statefun.KafkaEgressBuilder{
+               Target: KafkaEgress,
+               Topic:  "greetings",
+               Key:    ctx.Self().Id,
+               Value:  greeting,
+       })
+
+       return nil
+}
+
+func computeGreeting(profile UserProfile) string {
+       count := profile.SeenCount
+
+       if count <= int32(len(templates)) {
+               return fmt.Sprintf(templates[count-1], profile.Name)
+       }
+
+       return fmt.Sprintf("Nice to see you for the %dth time, %s! It has been 
%d milliseconds since we last say you.",
+               count, profile.Name, profile.LastSeenDeltaMs)
+}
+
+func GreetingSpec() statefun.StatefulFunctionSpec {
+       return statefun.StatefulFunctionSpec{
+               FunctionType: GreetingTypeName,
+               Function:     statefun.StatefulFunctionPointer(greetings),
+       }
+}
diff --git a/go/showcase/pkg/showcase/part6/profile.go 
b/go/showcase/pkg/showcase/part6/profile.go
new file mode 100644
index 0000000..98fd96c
--- /dev/null
+++ b/go/showcase/pkg/showcase/part6/profile.go
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
+
+var UserProfileType = 
statefun.MakeJsonType(statefun.TypeNameFrom("golang.showcase.type/user_profile"))
+
+type UserProfile struct {
+       Name            string `json:"name"`
+       LoginLocation   string `json:"login_location"`
+       SeenCount       int32  `json:"seen_count"`
+       LastSeenDeltaMs int64  `json:"last_seen_delta_ms"`
+}
diff --git a/go/showcase/pkg/showcase/part6/serving.go 
b/go/showcase/pkg/showcase/part6/serving.go
new file mode 100644
index 0000000..9b22d69
--- /dev/null
+++ b/go/showcase/pkg/showcase/part6/serving.go
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+       "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
+       "log"
+       "net/http"
+)
+
+// This is where everything comes together. In this section, we'd like to guide
+// you through how to expose the functions you implemented via any HTTP 
webserver.
+// In particular, stateful functions implements the standard GoLang 
http.Handler
+// interface.
+//
+// This program is a standalone Go program that can be ran directly in the IDE,
+// and exposes some functions using the standard Go web server. In practice you
+// can choose any web server that can handle HTTP request and responses, even 
AWS Lambda.
+//
+// In addition to that, you will also run a "toy" set of StateFun runtime 
processes in
+// a separate program to see StateFun in action for the first time. The 
StateFun processes
+// are responsible  for routing and dispatching the messages from ingresses, 
functions,
+// and egresses, and also manages the function state storages to be consistent 
and fault-tolerant
+// This is all done transparently in the background!
+//
+// Next Steps
+//
+// The setup you executed in this tutorial is NOT how you would normally deploy
+// StateFun processes and functions. It's a rather simplified setup to allow 
you to
+// explore the interaction between functions and the StateFun processes by 
setting debugger
+// breakpoints in the function code.
+//
+// We recommend now to take a look at a slightly more realistic setup, using 
docker-compose
+// in the Greeter Example 
https://github.com/apache/flink-statefun-playground/java/greeter/
+func main() {
+
+       // register the functions that you want to serve...
+       builder := statefun.StatefulFunctionsBuilder()
+       if err := builder.WithSpec(UserFnSpec()); err != nil {
+               log.Fatalf("failed to register user function: %s", err)
+       }
+
+       if err := builder.WithSpec(GreetingSpec()); err != nil {
+               log.Fatalf("failed to register greeting function: %s", err)
+       }
+
+       // ... and build a request-reply handler for the registered functions, 
which understands how to
+       // decode invocation requests dispatched from StateFun / encode 
side-effects (e.g. state storage
+       // updates, or invoking other functions) as responses to be handled by 
StateFun.
+       handler := builder.AsHandler()
+
+       // Use the request-reply handler along with your favorite HTTP web 
server framework
+       // to serve the functions!
+       http.Handle("/", handler)
+       _ = http.ListenAndServe(":8000", nil)
+}
diff --git a/go/showcase/pkg/showcase/part6/user_fn.go 
b/go/showcase/pkg/showcase/part6/user_fn.go
new file mode 100644
index 0000000..aae128d
--- /dev/null
+++ b/go/showcase/pkg/showcase/part6/user_fn.go
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+       "fmt"
+       "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
+       "statefun.io/showcase/pkg/showcase/part1"
+       "time"
+)
+
+// UserFnTypeName Every registered function needs to be associated with a 
unique TypeName.
+// A function's typename is usd by other functions and ingresses to have 
messages
+// addressed to them.
+var UserFnTypeName = statefun.TypeNameFrom("showcase.fns/user")
+
+type UserFn struct {
+       Seen                statefun.ValueSpec
+       SeenTimestampMillis statefun.ValueSpec
+}
+
+// UserFnSpec specification of the functions. This can be used
+// to register the function - see main.
+func UserFnSpec() statefun.StatefulFunctionSpec {
+       function := UserFn{
+               Seen: statefun.ValueSpec{
+                       Name:      "seen_count",
+                       ValueType: statefun.Int32Type,
+               },
+               SeenTimestampMillis: statefun.ValueSpec{
+                       Name:      "seen_timestamp_millis",
+                       ValueType: statefun.Int64Type,
+               },
+       }
+
+       return statefun.StatefulFunctionSpec{
+               FunctionType: UserFnTypeName,
+               States:       []statefun.ValueSpec{function.Seen, 
function.SeenTimestampMillis},
+               Function:     function,
+       }
+}
+
+func (u UserFn) Invoke(ctx statefun.Context, message statefun.Message) error {
+       if !message.Is(part1.UserLoginType) {
+               return fmt.Errorf("unexpected message type %s", 
message.ValueTypeName())
+       }
+
+       var login part1.UserLogin
+       _ = message.As(part1.UserLoginType, &login)
+
+       var count int32
+       _ = ctx.Storage().Get(u.Seen, &count)
+       count++
+
+       now := time.Now().UnixNano() / int64(time.Millisecond)
+       var lastSeenTimestampMillis int64
+       if exists := ctx.Storage().Get(u.SeenTimestampMillis, 
&lastSeenTimestampMillis); !exists {
+               lastSeenTimestampMillis = now
+       }
+
+       ctx.Storage().Set(u.Seen, count)
+       ctx.Storage().Set(u.SeenTimestampMillis, lastSeenTimestampMillis)
+
+       profile := UserProfile{
+               Name:            login.UserName,
+               LoginLocation:   login.LoginType.String(),
+               SeenCount:       count,
+               LastSeenDeltaMs: now - lastSeenTimestampMillis,
+       }
+
+       ctx.Send(statefun.MessageBuilder{
+               Target: statefun.Address{
+                       FunctionType: GreetingTypeName,
+                       Id:           login.UserName,
+               },
+               Value:     profile,
+               ValueType: UserProfileType,
+       })
+
+       return nil
+}

Reply via email to