This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git


The following commit(s) were added to refs/heads/master by this push:
     new 725202c  [FLINK-26340][statefun-golang-sdk] Add ability in Golang SDK 
to create new statefun.Context from existing one, but with a new underlying 
context.Context
725202c is described below

commit 725202cd69b78442d1287deec000ae2d52da4bda
Author: Galen Warren <[email protected]>
AuthorDate: Tue Feb 22 17:30:26 2022 -0500

    [FLINK-26340][statefun-golang-sdk] Add ability in Golang SDK to create new 
statefun.Context from existing one, but with a new underlying context.Context
    
    Change sync.Mutex in statefunContext to be *sync.Mutex, to make it 
copyable, and update construction of statefunContext accordingly
    Add DeriveContext function to the statefun package, which returns a copy 
of the given statefun.Context that wraps a new context.Context
    Add unit test
    
    This closes #303
---
 docs/content/docs/sdk/golang.md                 | 21 ++++++++++
 statefun-sdk-go/v3/pkg/statefun/context.go      | 19 ++++++++-
 statefun-sdk-go/v3/pkg/statefun/context_test.go | 51 +++++++++++++++++++++++--
 statefun-sdk-go/v3/pkg/statefun/handler.go      |  7 +++-
 4 files changed, 90 insertions(+), 8 deletions(-)

diff --git a/docs/content/docs/sdk/golang.md b/docs/content/docs/sdk/golang.md
index f2ef316..cfceb5f 100644
--- a/docs/content/docs/sdk/golang.md
+++ b/docs/content/docs/sdk/golang.md
@@ -323,6 +323,27 @@ func (g *Greeter) Invoke(ctx statefun.Context, message: 
statefun.Message) error
 {{< /tab >}}
 {{< /tabs >}}
 
+## Context
+
+The `Context` interface exposed by the Golang SDK -- which is used above to 
access storage, send messages to egresses, and invoke other stateful functions 
-- embeds the standard Golang `Context` interface from the `context` package. 
You can replace the wrapped `context.Context` using `DeriveContext`, for example:
+
+```
+import (
+    "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
+)
+
+func (g *Greeter) Invoke(ctx statefun.Context, message statefun.Message) 
error {
+
+    ctx = statefun.DeriveContext(ctx, context.WithValue(ctx, "key", "value"))
+
+    // do something with ctx, which now holds key=value
+
+    return nil
+}
+```
+
+`DeriveContext` accepts a stateful-function `Context` and a standard 
`context.Context`; it returns a new stateful-function `Context` that behaves 
the same as the original for all stateful-function operations, but wraps the 
supplied standard `context.Context` instead. It panics if the supplied 
stateful-function `Context` is not the SDK's own implementation.
+
 ## Serving Functions
 
 The Golang SDK ships with a ``RequestReplyHandler`` that is a standard http 
`Handler` and automatically dispatches function calls based on RESTful HTTP 
``POSTS``.
diff --git a/statefun-sdk-go/v3/pkg/statefun/context.go 
b/statefun-sdk-go/v3/pkg/statefun/context.go
index 677cfcd..7f99826 100644
--- a/statefun-sdk-go/v3/pkg/statefun/context.go
+++ b/statefun-sdk-go/v3/pkg/statefun/context.go
@@ -18,9 +18,11 @@ package statefun
 
 import (
        "context"
-       
"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+       "errors"
        "sync"
        "time"
+
+       
"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
 )
 
 // A Context contains information about the current function invocation, such 
as the invoked
@@ -64,7 +66,7 @@ type Context interface {
 }
 
 type statefunContext struct {
-       sync.Mutex
+       *sync.Mutex
        context.Context
        self     Address
        caller   *Address
@@ -160,3 +162,16 @@ func (s *statefunContext) SendEgress(egress EgressBuilder) 
{
        s.response.OutgoingEgresses = append(s.response.OutgoingEgresses, msg)
        s.Unlock()
 }
+
+// DeriveContext derives a new statefun.Context from an existing one, replacing
+// the wrapped context.Context.
+func DeriveContext(statefunCtx Context, ctx context.Context) Context {
+       switch value := (statefunCtx).(type) {
+       case *statefunContext:
+               newStatefunContext := *value
+               newStatefunContext.Context = ctx
+               return &newStatefunContext
+       default:
+               panic(errors.New("stateful function context supplied to 
DeriveContext is not recognized"))
+       }
+}
diff --git a/statefun-sdk-go/v3/pkg/statefun/context_test.go 
b/statefun-sdk-go/v3/pkg/statefun/context_test.go
index 18b3d1c..f12f1f4 100644
--- a/statefun-sdk-go/v3/pkg/statefun/context_test.go
+++ b/statefun-sdk-go/v3/pkg/statefun/context_test.go
@@ -17,11 +17,23 @@
 package statefun
 
 import (
+       "context"
+       "sync"
+       "testing"
+       "time"
+
        
"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
        "github.com/stretchr/testify/assert"
        "google.golang.org/protobuf/proto"
-       "testing"
-       "time"
+)
+
+type testContextKeyType string
+
+const (
+       testContextKey1   = testContextKeyType("key1")
+       testContextValue1 = "value1"
+       testContextKey2   = testContextKeyType("key2")
+       testContextValue2 = "value2"
 )
 
 func TestStatefunContext_Send(t *testing.T) {
@@ -159,10 +171,41 @@ func TestStatefunContext_SendEgress_Kinesis(t *testing.T) 
{
        assert.Equal(t, "key", kinesis.PartitionKey, "incorrect kinesis key")
 }
 
-// creates a context with the minimal state to
-// run tests.
+func TestStatefunContext_WithContext(t *testing.T) {
+
+       originalContext := createContext()
+
+       // create a new statefun context with a value added to context
+       newContext := DeriveContext(originalContext, 
context.WithValue(originalContext, testContextKey2, testContextValue2))
+
+       // Context interface properties should be the same
+       assert.Equal(t, originalContext.Self(), newContext.Self())
+       assert.Equal(t, originalContext.Caller(), newContext.Caller())
+       assert.Equal(t, originalContext.Storage(), newContext.Storage())
+
+       // validate a couple of internals, to ensure the derived context 
updates the same
+       // response as the original using the same mutex
+       assert.Equal(t, originalContext.Mutex, 
newContext.(*statefunContext).Mutex)
+       assert.Equal(t, originalContext.response, 
newContext.(*statefunContext).response)
+
+       // the testContextKey1 key/value should be in both the new context and 
the original,
+       // i.e. the new context inherited the kv pairs from the original
+       assert.Equal(t, testContextValue1, newContext.Value(testContextKey1))
+       assert.Equal(t, testContextValue1, 
originalContext.Value(testContextKey1))
+
+       // the testContextKey2 key/value should be in the new context but not 
the original
+       assert.Equal(t, testContextValue2, newContext.Value(testContextKey2))
+       assert.Nil(t, originalContext.Value(testContextKey2))
+}
+
+// creates a context with the minimal state to run tests.
 func createContext() *statefunContext {
        return &statefunContext{
+               Context:  context.WithValue(context.Background(), 
testContextKey1, testContextValue1),
+               Mutex:    new(sync.Mutex),
+               caller:   &Address{FunctionType: 
TypeNameFrom("namespace/function1"), Id: "1"},
+               self:     Address{FunctionType: 
TypeNameFrom("namespace/function2"), Id: "2"},
+               storage:  new(storage),
                response: &protocol.FromFunction_InvocationResponse{},
        }
 }
diff --git a/statefun-sdk-go/v3/pkg/statefun/handler.go 
b/statefun-sdk-go/v3/pkg/statefun/handler.go
index b86a609..3f9f4f6 100644
--- a/statefun-sdk-go/v3/pkg/statefun/handler.go
+++ b/statefun-sdk-go/v3/pkg/statefun/handler.go
@@ -20,10 +20,12 @@ import (
        "bytes"
        "context"
        "fmt"
-       
"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
-       "google.golang.org/protobuf/proto"
        "log"
        "net/http"
+       "sync"
+
+       
"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun/internal/protocol"
+       "google.golang.org/protobuf/proto"
 )
 
 // StatefulFunctions is a registry for multiple StatefulFunction's. A 
RequestReplyHandler
@@ -214,6 +216,7 @@ func (h *handler) invoke(ctx context.Context, toFunction 
*protocol.ToFunction) (
                        return nil, ctx.Err()
                default:
                        sContext := statefunContext{
+                               Mutex:    new(sync.Mutex),
                                self:     self,
                                storage:  storage,
                                response: response,

Reply via email to