jrmccluskey commented on a change in pull request #15594:
URL: https://github.com/apache/beam/pull/15594#discussion_r716978714
##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
offset := len(param) - len(side)
var ret []ReusableInput
+ var cache *statecache.SideInputCache
+ if reader != nil {
Review comment:
Multiple tests in the exec package don't provide a reader, so I spent
a bit of time eliminating nil references. I'll take some time to clean
this up.
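One way the nil handling could be tidied, as a minimal sketch rather than what this PR does: Go allows methods on a nil pointer receiver, so the cache type itself can treat nil as "always miss". `sideInputCache`, `query`, and `set` are hypothetical stand-ins here, not the statecache API.

```go
package main

import "fmt"

// sideInputCache is a hypothetical stand-in for a side input cache.
type sideInputCache struct {
	entries map[string][]string
}

// query returns the cached values for key. It is safe to call on a nil
// *sideInputCache, in which case it always reports a miss.
func (c *sideInputCache) query(key string) ([]string, bool) {
	if c == nil {
		return nil, false
	}
	v, ok := c.entries[key]
	return v, ok
}

// set stores values under key. On a nil cache it is a no-op.
func (c *sideInputCache) set(key string, values []string) {
	if c == nil {
		return
	}
	if c.entries == nil {
		c.entries = make(map[string][]string)
	}
	c.entries[key] = values
}

func main() {
	// What a test without a reader would end up holding.
	var missing *sideInputCache
	missing.set("k", []string{"a"})
	_, ok := missing.query("k")
	fmt.Println(ok) // false

	live := &sideInputCache{}
	live.set("k", []string{"a"})
	v, ok := live.query("k")
	fmt.Println(v, ok) // [a] true
}
```

With this shape the call site needs no `if reader != nil` branch and no throwaway cache for tests.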
##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -33,6 +33,7 @@ const iterableSideInputKey = ""
// encapsulates StreamID and coding as needed.
type SideInputAdapter interface {
NewIterable(ctx context.Context, reader StateReader, w typex.Window) (ReStream, error)
+ GetIDs() (StreamID, string)
Review comment:
That's a solution if we wanted to move the cache-checking logic here,
although it seems like it would still need a new function in the interface to
handle it (changing the NewIterable signature to return a ReStream *or* a
ReusableInput feels messy).
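To make the trade-off concrete, here is a rough sketch of the two interface shapes. All types are simplified, hypothetical stand-ins (`stream` for ReStream, `input` for ReusableInput), and `CachedInput` is an invented method name, not something from this PR.

```go
package main

import "fmt"

// stream and input are hypothetical stand-ins for ReStream and ReusableInput.
type stream struct{ values []string }
type input struct{ value string }

// Option A: one method with a union-style return. Every caller would have
// to type-switch on the result.
type unionAdapter interface {
	NewIterable(id string) (interface{}, error)
}

// Option B: NewIterable keeps its return type, and a second, hypothetical
// method covers the cache-backed path.
type splitAdapter interface {
	NewIterable(id string) (*stream, error)
	CachedInput(id string) (*input, bool)
}

// fakeAdapter is a test double that implements splitAdapter.
type fakeAdapter struct{ cached map[string]string }

func (f fakeAdapter) NewIterable(id string) (*stream, error) {
	return &stream{values: []string{"read:" + id}}, nil
}

func (f fakeAdapter) CachedInput(id string) (*input, bool) {
	v, ok := f.cached[id]
	if !ok {
		return nil, false
	}
	return &input{value: v}, true
}

// describe shows the call site for option B: no type switch is needed.
func describe(a splitAdapter, id string) string {
	if in, ok := a.CachedInput(id); ok {
		return "cached " + in.value
	}
	s, err := a.NewIterable(id)
	if err != nil {
		return "error"
	}
	return "stream " + s.values[0]
}

func main() {
	a := fakeAdapter{cached: map[string]string{"a": "x"}}
	fmt.Println(describe(a, "a")) // cached x
	fmt.Println(describe(a, "b")) // stream read:b
}
```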
##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go
##########
@@ -125,7 +125,7 @@ func makeRequest(transformID, sideInputID string, t token) fnpb.ProcessBundleReq
wrap.SideInput = &side
tok.Type = &wrap
tok.Token = []byte(t)
- return tok
+ return &tok
Review comment:
Done
##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -33,6 +33,7 @@ const iterableSideInputKey = ""
// encapsulates StreamID and coding as needed.
type SideInputAdapter interface {
NewIterable(ctx context.Context, reader StateReader, w typex.Window) (ReStream, error)
+ GetIDs() (StreamID, string)
Review comment:
Moving the cache accesses into the Adapter cleaned up a lot of the nil
checks. I dropped GetIDs in favor of a first pass at cache-aware functions.
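As an illustration of why an ID getter becomes unnecessary once the adapter owns the lookup, here is a minimal read-through sketch. The `cache` and `adapter` types, the key format, and `materialize` are all assumptions made for the example, not the code in this PR.

```go
package main

import "fmt"

// cache is a hypothetical stand-in for the side input cache.
type cache map[string][]string

// adapter is a hypothetical side input adapter. Its IDs stay private
// because only the adapter itself needs them to build the cache key.
type adapter struct {
	transformID string
	sideInputID string
	reads       int // counts how often the underlying state was read
}

func (a *adapter) key() string { return a.transformID + "/" + a.sideInputID }

// readState stands in for the real state read.
func (a *adapter) readState() []string {
	a.reads++
	return []string{"v1", "v2"}
}

// materialize returns the side input values, consulting c first. A nil
// cache is allowed: lookups on a nil map miss, and the write is skipped.
func (a *adapter) materialize(c cache) []string {
	if v, ok := c[a.key()]; ok {
		return v
	}
	v := a.readState()
	if c != nil {
		c[a.key()] = v
	}
	return v
}

func main() {
	a := &adapter{transformID: "t1", sideInputID: "s1"}
	c := cache{}
	a.materialize(c)
	a.materialize(c)
	fmt.Println(a.reads) // 1: the second call is served from the cache

	b := &adapter{transformID: "t1", sideInputID: "s1"}
	b.materialize(nil)
	b.materialize(nil)
	fmt.Println(b.reads) // 2: without a cache every call reads state
}
```

The caller passes whatever cache it has, including none, and never sees the IDs.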
##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -24,6 +24,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"
Review comment:
Removed reference as it was no longer necessary
##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
offset := len(param) - len(side)
var ret []ReusableInput
+ var cache *statecache.SideInputCache
+ if reader != nil {
Review comment:
Fixed
##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
offset := len(param) - len(side)
var ret []ReusableInput
+ var cache *statecache.SideInputCache
+ if reader != nil {
+ cache = reader.GetSideInputCache()
+ } else {
+ cache = &statecache.SideInputCache{}
+ cache.Init(1)
Review comment:
No longer necessary
##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
offset := len(param) - len(side)
var ret []ReusableInput
+ var cache *statecache.SideInputCache
+ if reader != nil {
+ cache = reader.GetSideInputCache()
+ } else {
+ cache = &statecache.SideInputCache{}
+ cache.Init(1)
+ }
for i := 0; i < len(streams); i++ {
+ sid, sideInputID := side[i].GetIDs()
+ var transformID string
+ if sideInputID == "" {
+ transformID = ""
Review comment:
It was caused by the other adapter implementations used in the direct
runner and in testing, which is no longer a problem.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.