jrmccluskey commented on a change in pull request #15594:
URL: https://github.com/apache/beam/pull/15594#discussion_r716978714



##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, 
side []SideInputAdapter
        offset := len(param) - len(side)
 
        var ret []ReusableInput
+       var cache *statecache.SideInputCache
+       if reader != nil {

Review comment:
       There are multiple tests in the exec package that don't provide a 
reader, so I spent a bit of time eliminating null references. I'll take some 
time to clean this up and make it cleaner. 

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -33,6 +33,7 @@ const iterableSideInputKey = ""
 // encapsulates StreamID and coding as needed.
 type SideInputAdapter interface {
        NewIterable(ctx context.Context, reader StateReader, w typex.Window) 
(ReStream, error)
+       GetIDs() (StreamID, string)

Review comment:
       That's a solution if we wanted to move the cache checking logic here, 
although it seems like it would still need a new function in the interface to 
handle it (changing the NewIterable signature to return a ReStream *or* a 
ReusableInput feels messy.) 

##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go
##########
@@ -125,7 +125,7 @@ func makeRequest(transformID, sideInputID string, t token) 
fnpb.ProcessBundleReq
        wrap.SideInput = &side
        tok.Type = &wrap
        tok.Token = []byte(t)
-       return tok
+       return &tok

Review comment:
       Done

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -33,6 +33,7 @@ const iterableSideInputKey = ""
 // encapsulates StreamID and coding as needed.
 type SideInputAdapter interface {
        NewIterable(ctx context.Context, reader StateReader, w typex.Window) 
(ReStream, error)
+       GetIDs() (StreamID, string)

Review comment:
       Changing to putting cache accesses in the Adapter cleaned up a lot of 
the null checks. Dropped GetIDs for a first run at cache-involved functions

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -24,6 +24,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"

Review comment:
       Removed reference as it was no longer necessary 

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, 
side []SideInputAdapter
        offset := len(param) - len(side)
 
        var ret []ReusableInput
+       var cache *statecache.SideInputCache
+       if reader != nil {

Review comment:
       Fixed

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, 
side []SideInputAdapter
        offset := len(param) - len(side)
 
        var ret []ReusableInput
+       var cache *statecache.SideInputCache
+       if reader != nil {
+               cache = reader.GetSideInputCache()
+       } else {
+               cache = &statecache.SideInputCache{}
+               cache.Init(1)

Review comment:
       No longer necessary

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, 
side []SideInputAdapter
        offset := len(param) - len(side)
 
        var ret []ReusableInput
+       var cache *statecache.SideInputCache
+       if reader != nil {
+               cache = reader.GetSideInputCache()
+       } else {
+               cache = &statecache.SideInputCache{}
+               cache.Init(1)
+       }
        for i := 0; i < len(streams); i++ {
+               sid, sideInputID := side[i].GetIDs()
+               var transformID string
+               if sideInputID == "" {
+                       transformID = ""

Review comment:
       It was caused by the other adapter implementations used in the direct 
runner/testing, which is no longer a problem




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to