lostluck commented on a change in pull request #15594:
URL: https://github.com/apache/beam/pull/15594#discussion_r716947697



##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, 
side []SideInputAdapter
        offset := len(param) - len(side)
 
        var ret []ReusableInput
+       var cache *statecache.SideInputCache
+       if reader != nil {
+               cache = reader.GetSideInputCache()
+       } else {
+               cache = &statecache.SideInputCache{}
+               cache.Init(1)

Review comment:
       Here's where the factory function or similar would be useful. However, 
an alternative is a simpler/1 element cache that adheres to the same interface 
as the other cache.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -24,6 +24,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"

Review comment:
       We don't want the exec package to depend on parts defined in harness, at 
least explicitly. It's bad coupling, and makes the SDK tree harder to follow.
   
   This means we need to have a factory function or similar that 
gets/initializes the cache that is passed in to create the cache, or change it 
so the reader never provides a nil version of the cache. In particular, we can 
have the exec package define an interface of how it interacts with the cache, 
and then there's no explicit coupling at all. This is good Go design, as 
interfaces should be defined by the caller/user of the interface, not the other 
way around.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, 
side []SideInputAdapter
        offset := len(param) - len(side)
 
        var ret []ReusableInput
+       var cache *statecache.SideInputCache
+       if reader != nil {
+               cache = reader.GetSideInputCache()
+       } else {
+               cache = &statecache.SideInputCache{}
+               cache.Init(1)
+       }
        for i := 0; i < len(streams); i++ {
+               sid, sideInputID := side[i].GetIDs()
+               var transformID string
+               if sideInputID == "" {
+                       transformID = ""

Review comment:
       It would be worth commenting on why this branching is necessary. Do we 
know what causes this case?

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, 
side []SideInputAdapter
        offset := len(param) - len(side)
 
        var ret []ReusableInput
+       var cache *statecache.SideInputCache
+       if reader != nil {

Review comment:
       When is the reader going to be nil? Is it something to worry about?

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, 
side []SideInputAdapter
        offset := len(param) - len(side)
 
        var ret []ReusableInput
+       var cache *statecache.SideInputCache
+       if reader != nil {
+               cache = reader.GetSideInputCache()
+       } else {
+               cache = &statecache.SideInputCache{}
+               cache.Init(1)
+       }
        for i := 0; i < len(streams); i++ {
+               sid, sideInputID := side[i].GetIDs()
+               var transformID string
+               if sideInputID == "" {
+                       transformID = ""
+               } else {
+                       transformID = sid.PtransformID
+               }
+               c := cache.QueryCache(transformID, sideInputID)
+               // Cache hit
+               if c != nil {
+                       ret = append(ret, c)
+                       continue
+               }

Review comment:
       We can inline this call into the if;
   
   if 
   ```suggestion
                if c := cache.QueryCache(transformID, sideInputID); c != nil {
                        // Cache hit
                        ret = append(ret, c)
                        continue
                }
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -33,6 +33,7 @@ const iterableSideInputKey = ""
 // encapsulates StreamID and coding as needed.
 type SideInputAdapter interface {
        NewIterable(ctx context.Context, reader StateReader, w typex.Window) 
(ReStream, error)
+       GetIDs() (StreamID, string)

Review comment:
       TBH it still feels like the the Adapter is the right place for the cache 
to be looked up, since it's dealing with actual elements and similar, and 
produces the ReStream instances. It's already getting a StateReader that it can 
pull the cache from too (depends on what happens to the cache of course, it 
might be best if it's simply hidden within the reader entirely...).
   
   Specifially, any time you need to add methods to access data elsewhere, the 
question is whether that has to be the case? What does the solution look like 
when you're not able/allowed to do that?

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -76,6 +77,11 @@ func (s *sideInputAdapter) NewIterable(ctx context.Context, 
reader StateReader,
        }, nil
 }
 
+// GetIDs returns the StreamID and Side Input ID for the adapter. Used 
primarily for sidei nput caching.

Review comment:
       typo
   ```suggestion
   // GetIDs returns the StreamID and Side Input ID for the adapter. Used 
primarily for side input caching.
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go
##########
@@ -125,7 +125,7 @@ func makeRequest(transformID, sideInputID string, t token) 
fnpb.ProcessBundleReq
        wrap.SideInput = &side
        tok.Type = &wrap
        tok.Token = []byte(t)
-       return tok
+       return &tok

Review comment:
       Style-wise, we should probably inline all of this, as it's easier on the 
reader to see that the compiler can make all the allocations at once.
   ```
        return &fnpb.ProcessBundleRequest_CacheToken{
                Token: []byte(t),
                Type:  &fnpb.ProcessBundleRequest_CacheToken_SideInput_{
                        SideInput: 
&fnpb.ProcessBundleRequest_CacheToken_SideInput{
                                TransformId: transformID,
                                SideInputId: sideInputID,
                        },
                },
        }
   }
   ```
   
   The main reason to separate things out when constructing protos vs inlineing 
is error handling. This function doesn't have anything that can error, so all 
inlined it can go.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -76,6 +78,19 @@ func (s *sideInputAdapter) NewIterable(ctx context.Context, 
reader StateReader,
        }, nil
 }
 
+// QueryCache checks a reader's side input cache for an entry with a 
PtransformID and sideInputID
+// and returns the entry.
+func (s *sideInputAdapter) QueryCache(reader StateReader) ReusableInput {
+       input := reader.GetSideInputCache().QueryCache(s.sid.PtransformID, 
s.sideInputID)
+       return input

Review comment:
       Aside: simply return in this situation rather than assigning to a new 
variable, it's cleaner.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,13 +268,24 @@ func makeSideInputs(ctx context.Context, w typex.Window, 
side []SideInputAdapter
        offset := len(param) - len(side)
 
        var ret []ReusableInput
-       for i := 0; i < len(streams); i++ {
-               s, err := makeSideInput(in[i+1].Kind, 
fn.Param[param[i+offset]].T, streams[i])
+       for i, adapter := range side {
+               // Cache hit
+               if c := adapter.QueryCache(reader); c != nil {

Review comment:
       Why not put this logic into the NewIterable method instead? Is it 
important that the makeSideInputs code is aware that a cache is being used? 
Does this code *need* to know these details of how the adapter works?
   
   Is there any reason the adapter can't make use of the cache itself, and 
still produce ReStreams (and if still a good idea, cache ReusableInputs), 
avoiding this code from needing to change or know the details of it happening?
   
   Per the earlier failure, ReusableInputs aren't threadsafe, so we need to 
cache something that is, that we can use to build what we use later on.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to