austince commented on a change in pull request #250: URL: https://github.com/apache/flink-statefun/pull/250#discussion_r686965949
########## File path: docs/content/docs/sdk/golang.md ########## @@ -0,0 +1,362 @@ +--- +title: Golang +weight: 4 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Golang SDK + +Stateful functions are the building blocks of applications; they are atomic units of isolation, distribution, and persistence. +As objects, they encapsulate the state of a single entity (e.g., a specific user, device, or session) and encode its behavior. +Stateful functions can interact with each other, and external systems, through message passing. + +To get started, add the Golang SDK as a dependency to your application. + +{{< selectable >}} +``` +require github.com/apache/flink-statefun/statefun-sdk-go +``` +{{< /selectable >}} + +## Defining A Stateful Function + +A stateful function is any class that implements the `StatefulFunction` interface. +In the following example, a `StatefulFunction` maintains a count for every user +of an application, emitting a customized greeting. + +```go +import ( + "fmt" + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" +) + +type Greeter struct { + SeenCount statefun.ValueSpec +} + +func (g *Greeter) Invoke(ctx statefun.Context, message statefun.Message) error { + if !message.Is(statefun.StringType) { + return fmt.Errorf("unexpected message type %s", message.ValueTypeName()) + } + + var name string + _ = message.As(statefun.StringType, &name) + + storage := ctx.Storage() + + var count int32 + storage.Get(g.SeenCount, &count) + + count += 1 + + storage.Set(g.SeenCount, count) + + ctx.Send(statefun.MessageBuilder{ + Target: statefun.Address{ + FunctionType: statefun.TypeNameFrom("com.example.fns/inbox"), + Id: name, + }, + Value: fmt.Sprintf("Hello %s for the %dth time!", name, count), + }) + + return nil +} +``` + +This code declares a greeter function that will be [registered](#serving-functions) under the logical type name `com.example.fns/greeter`. Type names must take the form `<namesapce>/<name>`. +It contains a single `ValueSpec`, which is implicitly scoped to the current address and stores an int32. + +Alternatively, a stateful function can be defined as a function pointer. + +```go +func greeter(ctx statefun.Context, message: statefun.Message) error { Review comment: ```suggestion func greeter(ctx statefun.Context, message statefun.Message) error { ``` ########## File path: statefun-sdk-go/v3/pkg/statefun/context.go ########## @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statefun + +import ( + "context" + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol" + "sync" + "time" +) + +// A Context contains information about the current function invocation, such as the invoked +// function instance's and caller's Address. It is also used for side effects as a result of +// the invocation such as send messages to other functions or egresses, and provides access to +// AddressScopedStorage scoped to the current Address. This type is also a context.Context +// and can be used to ensure any spawned go routines do not outlive the current function +// invocation. +type Context interface { + context.Context + + // Self is the current invoked function instance's Address. + Self() Address + + // Caller is the caller function instance's Address, if applicable. This is nil + // if the message was sent to this function via an ingress. + Caller() *Address + + // Send forwards out a MessageBuilder to another function. + Send(message MessageBuilder) + + // SendAfter forwards out a MessageBuilder to another function, after a specified time.Duration delay. + SendAfter(delay time.Duration, message MessageBuilder) + + // SendAfterWithCancellationToken forwards out a MessageBuilder to another function, + // after a specified time.Duration delay. The message is tagged with a non-empty, + //unique token to attach to this message, to be used for message cancellation + SendAfterWithCancellationToken(delay time.Duration, token string, message MessageBuilder) + + // CancelDelayedMessage cancels a delayed message (a message that was send via SendAfterWithCancellationToken). + // NOTE: this is a best-effort operation, since the message might have been already delivered. + // If the message was delivered, this is a no-op operation. + CancelDelayedMessage(token string) + + // SendEgress forwards out an EgressBuilder to an egress. + SendEgress(egress EgressBuilder) + + // Storage returns the AddressScopedStorage, providing access to stored values scoped to the + // current invoked function instance's Address (which is obtainable using Self()). + Storage() AddressScopedStorage +} + +type statefunContext struct { + sync.Mutex + context.Context + self Address + caller *Address + storage *storage + response *protocol.FromFunction_InvocationResponse +} + +func (s *statefunContext) Storage() AddressScopedStorage { + return s.storage +} + +func (s *statefunContext) Self() Address { + return s.self +} + +func (s *statefunContext) Caller() *Address { + return s.caller +} + +func (s *statefunContext) Send(message MessageBuilder) { + msg, err := message.ToMessage() + + if err != nil { + panic(err) + } + + invocation := &protocol.FromFunction_Invocation{ + Target: msg.target, + Argument: msg.typedValue, + } + + s.Lock() + s.response.OutgoingMessages = append(s.response.OutgoingMessages, invocation) + s.Unlock() +} + +func (s *statefunContext) SendAfter(delay time.Duration, message MessageBuilder) { + msg, err := message.ToMessage() + + if err != nil { + panic(err) + } + + invocation := &protocol.FromFunction_DelayedInvocation{ + Target: msg.target, + Argument: msg.typedValue, + DelayInMs: delay.Milliseconds(), + } + + s.Lock() + s.response.DelayedInvocations = append(s.response.DelayedInvocations, invocation) + s.Unlock() +} + +func (s *statefunContext) SendAfterWithCancellationToken(delay time.Duration, token string, message MessageBuilder) { + if len(token) == 0 { + panic("cancellation token cannot be empty") + } Review comment: `token` is settable by the user, right? Could they handle this error case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
