[ 
https://issues.apache.org/jira/browse/BEAM-11097?focusedWorklogId=655712&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-655712
 ]

ASF GitHub Bot logged work on BEAM-11097:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Sep/21 19:02
            Start Date: 27/Sep/21 19:02
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on a change in pull request #15594:
URL: https://github.com/apache/beam/pull/15594#discussion_r716947697



##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
        offset := len(param) - len(side)
 
        var ret []ReusableInput
+       var cache *statecache.SideInputCache
+       if reader != nil {
+               cache = reader.GetSideInputCache()
+       } else {
+               cache = &statecache.SideInputCache{}
+               cache.Init(1)

Review comment:
       Here's where the factory function or similar would be useful. However, 
an alternative is a simpler/1 element cache that adheres to the same interface 
as the other cache.
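
A minimal sketch of the one-element alternative, assuming a small shared interface. The names `sideCache`, `singleEntryCache`, and `SetCache` are hypothetical and are not taken from the PR; only `QueryCache(transformID, sideInputID)` appears in the diff.

```go
package main

import "fmt"

// sideCache is a hypothetical interface listing the operations a caller needs.
// Only QueryCache appears in the diff; SetCache is assumed for illustration.
type sideCache interface {
	QueryCache(transformID, sideInputID string) interface{}
	SetCache(transformID, sideInputID string, v interface{})
}

// singleEntryCache remembers only the most recently stored side input.
type singleEntryCache struct {
	transformID, sideInputID string
	value                    interface{}
}

// Compile-time check that the small cache satisfies the shared interface.
var _ sideCache = (*singleEntryCache)(nil)

// QueryCache returns the stored value on an exact ID match, otherwise nil.
func (c *singleEntryCache) QueryCache(transformID, sideInputID string) interface{} {
	if c.value != nil && c.transformID == transformID && c.sideInputID == sideInputID {
		return c.value
	}
	return nil
}

// SetCache overwrites the single slot with the new entry.
func (c *singleEntryCache) SetCache(transformID, sideInputID string, v interface{}) {
	c.transformID, c.sideInputID, c.value = transformID, sideInputID, v
}

func main() {
	var cache sideCache = &singleEntryCache{}
	cache.SetCache("t1", "i0", "first")
	cache.SetCache("t2", "i0", "second") // replaces the only slot
	fmt.Println(cache.QueryCache("t1", "i0") == nil)
	fmt.Println(cache.QueryCache("t2", "i0"))
}
```

Because callers only see the interface, a one-slot cache like this could stand in wherever the full cache is unavailable, without a nil check at the call site.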

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -24,6 +24,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"

Review comment:
       We don't want the exec package to depend on parts defined in harness, at 
least explicitly. It's bad coupling, and makes the SDK tree harder to follow.
   
   This means we need to have a factory function or similar that 
gets/initializes the cache that is passed in to create the cache, or change it 
so the reader never provides a nil version of the cache. In particular, we can 
have the exec package define an interface of how it interacts with the cache, 
and then there's no explicit coupling at all. This is good Go design, as 
interfaces should be defined by the caller/user of the interface, not the other 
way around.
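
As a rough illustration of the caller-defined interface idea, here is a single-file sketch. Everything in it is a stand-in: `Value`, `SideCache`, `mapCache`, `SetCache`, and `lookupOrLoad` are hypothetical names, and the comments mark which package each piece would notionally live in.

```go
package main

import "fmt"

// Value is a stand-in for whatever the cache stores (for example a ReStream
// or ReusableInput in the real exec package).
type Value interface{}

// SideCache is a hypothetical interface that the exec package would declare
// for itself. It lists only the operations exec needs, so exec never has to
// import the harness package.
type SideCache interface {
	QueryCache(transformID, sideInputID string) Value
	SetCache(transformID, sideInputID string, v Value)
}

// mapCache is a toy implementation that would live on the harness side.
// It satisfies SideCache implicitly; it does not need to know about exec.
type mapCache struct {
	entries map[string]Value
}

func newMapCache() *mapCache {
	return &mapCache{entries: map[string]Value{}}
}

func cacheKey(transformID, sideInputID string) string {
	return transformID + "\x00" + sideInputID
}

func (c *mapCache) QueryCache(transformID, sideInputID string) Value {
	return c.entries[cacheKey(transformID, sideInputID)]
}

func (c *mapCache) SetCache(transformID, sideInputID string, v Value) {
	c.entries[cacheKey(transformID, sideInputID)] = v
}

// lookupOrLoad is exec-side code: it depends only on the SideCache interface.
func lookupOrLoad(cache SideCache, transformID, sideInputID string, load func() Value) Value {
	if v := cache.QueryCache(transformID, sideInputID); v != nil {
		return v // cache hit
	}
	v := load()
	cache.SetCache(transformID, sideInputID, v)
	return v
}

func main() {
	cache := newMapCache()
	loads := 0
	load := func() Value { loads++; return "side input data" }
	lookupOrLoad(cache, "t1", "i0", load)
	lookupOrLoad(cache, "t1", "i0", load) // served from the cache
	fmt.Println("loads:", loads)
}
```

The second lookup is served from the cache, so `load` runs once. The point of the shape is that the concrete cache type can change, or be swapped for a test fake, without touching the exec-side code.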

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
        offset := len(param) - len(side)
 
        var ret []ReusableInput
+       var cache *statecache.SideInputCache
+       if reader != nil {
+               cache = reader.GetSideInputCache()
+       } else {
+               cache = &statecache.SideInputCache{}
+               cache.Init(1)
+       }
        for i := 0; i < len(streams); i++ {
+               sid, sideInputID := side[i].GetIDs()
+               var transformID string
+               if sideInputID == "" {
+                       transformID = ""

Review comment:
       It would be worth commenting on why this branching is necessary. Do we 
know what causes this case?

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
        offset := len(param) - len(side)
 
        var ret []ReusableInput
+       var cache *statecache.SideInputCache
+       if reader != nil {

Review comment:
       When is the reader going to be nil? Is it something to worry about?

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
        offset := len(param) - len(side)
 
        var ret []ReusableInput
+       var cache *statecache.SideInputCache
+       if reader != nil {
+               cache = reader.GetSideInputCache()
+       } else {
+               cache = &statecache.SideInputCache{}
+               cache.Init(1)
+       }
        for i := 0; i < len(streams); i++ {
+               sid, sideInputID := side[i].GetIDs()
+               var transformID string
+               if sideInputID == "" {
+                       transformID = ""
+               } else {
+                       transformID = sid.PtransformID
+               }
+               c := cache.QueryCache(transformID, sideInputID)
+               // Cache hit
+               if c != nil {
+                       ret = append(ret, c)
+                       continue
+               }

Review comment:
       We can inline this call into the if statement:
   
   ```suggestion
                if c := cache.QueryCache(transformID, sideInputID); c != nil {
                        // Cache hit
                        ret = append(ret, c)
                        continue
                }
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -33,6 +33,7 @@ const iterableSideInputKey = ""
 // encapsulates StreamID and coding as needed.
 type SideInputAdapter interface {
        NewIterable(ctx context.Context, reader StateReader, w typex.Window) (ReStream, error)
+       GetIDs() (StreamID, string)

Review comment:
       TBH it still feels like the Adapter is the right place for the cache 
to be looked up, since it's dealing with actual elements and similar, and 
produces the ReStream instances. It's already getting a StateReader that it can 
pull the cache from too (depends on what happens to the cache of course, it 
might be best if it's simply hidden within the reader entirely...).
   
   Specifically, any time you need to add methods to access data elsewhere, the 
question is whether that has to be the case. What does the solution look like 
when you're not able/allowed to do that?
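
One possible shape for keeping the lookup inside the adapter, sketched with simplified stand-ins. `GetSideInputCache` is the method named in the diff; `ReadSideInput`, `SetCache`, `sideCache`, `mapCache`, and `fakeReader` are hypothetical, and the real `NewIterable` also takes a context and window and returns an error.

```go
package main

import "fmt"

// ReStream is a stand-in for exec.ReStream; here it is just decoded values.
type ReStream []string

// sideCache is a hypothetical interface owned by the calling side.
type sideCache interface {
	QueryCache(transformID, sideInputID string) ReStream
	SetCache(transformID, sideInputID string, r ReStream)
}

// StateReader is a simplified stand-in. ReadSideInput is a hypothetical
// placeholder for fetching the data from the runner.
type StateReader interface {
	GetSideInputCache() sideCache
	ReadSideInput(transformID, sideInputID string) ReStream
}

// sideInputAdapter keeps its IDs private; no GetIDs accessor is needed.
type sideInputAdapter struct {
	transformID, sideInputID string
}

// NewIterable checks the cache before reading from the runner.
func (a *sideInputAdapter) NewIterable(reader StateReader) ReStream {
	cache := reader.GetSideInputCache()
	if r := cache.QueryCache(a.transformID, a.sideInputID); r != nil {
		return r // cache hit
	}
	r := reader.ReadSideInput(a.transformID, a.sideInputID)
	cache.SetCache(a.transformID, a.sideInputID, r)
	return r
}

// mapCache and fakeReader are test doubles used only by this sketch.
type mapCache map[string]ReStream

func (m mapCache) QueryCache(t, s string) ReStream  { return m[t+"/"+s] }
func (m mapCache) SetCache(t, s string, r ReStream) { m[t+"/"+s] = r }

type fakeReader struct {
	cache mapCache
	reads int // counts trips to the (pretend) runner
}

func (f *fakeReader) GetSideInputCache() sideCache { return f.cache }

func (f *fakeReader) ReadSideInput(t, s string) ReStream {
	f.reads++
	return ReStream{"a", "b"}
}

func main() {
	reader := &fakeReader{cache: mapCache{}}
	adapter := &sideInputAdapter{transformID: "t1", sideInputID: "i0"}
	adapter.NewIterable(reader)
	adapter.NewIterable(reader) // second call hits the cache
	fmt.Println("runner reads:", reader.reads)
}
```

With this arrangement the caller of `NewIterable` does not need to know the adapter's IDs, and the same idea extends to hiding the cache entirely behind the reader.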

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -76,6 +77,11 @@ func (s *sideInputAdapter) NewIterable(ctx context.Context, reader StateReader,
        }, nil
 }
 
+// GetIDs returns the StreamID and Side Input ID for the adapter. Used primarily for sidei nput caching.

Review comment:
       typo
   ```suggestion
   // GetIDs returns the StreamID and Side Input ID for the adapter. Used primarily for side input caching.
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go
##########
@@ -125,7 +125,7 @@ func makeRequest(transformID, sideInputID string, t token) fnpb.ProcessBundleReq
        wrap.SideInput = &side
        tok.Type = &wrap
        tok.Token = []byte(t)
-       return tok
+       return &tok

Review comment:
       Style-wise, we should probably inline all of this, as it's easier on the 
reader to see that the compiler can make all the allocations at once.
   ```
        return &fnpb.ProcessBundleRequest_CacheToken{
                Token: []byte(t),
                Type:  &fnpb.ProcessBundleRequest_CacheToken_SideInput_{
                        SideInput: &fnpb.ProcessBundleRequest_CacheToken_SideInput{
                                TransformId: transformID,
                                SideInputId: sideInputID,
                        },
                },
        }
   }
   ```
   
   The main reason to separate things out when constructing protos, rather than 
inlining, is error handling. This function doesn't have anything that can 
error, so it can all be inlined.
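
To illustrate the error-handling distinction, here is a small sketch using stand-in structs rather than the generated fnpb types. `sideInputRef`, `cacheToken`, `newToken`, and `newTokenFromHex` are all hypothetical, and the hex-encoded token is an invented scenario chosen only because it gives the construction a step that can fail.

```go
package main

import (
	"encoding/hex"
	"fmt"
)

// Stand-in types for illustration only; these are NOT the generated fnpb types.
type sideInputRef struct {
	TransformID, SideInputID string
}

type cacheToken struct {
	Token     []byte
	SideInput *sideInputRef
}

// newToken has no step that can fail, so a single inlined literal is enough.
func newToken(transformID, sideInputID string, raw []byte) *cacheToken {
	return &cacheToken{
		Token:     raw,
		SideInput: &sideInputRef{TransformID: transformID, SideInputID: sideInputID},
	}
}

// newTokenFromHex has to decode its input first. Decoding can fail, so the
// construction is split into steps and the error is handled in between.
func newTokenFromHex(transformID, sideInputID, hexToken string) (*cacheToken, error) {
	raw, err := hex.DecodeString(hexToken)
	if err != nil {
		return nil, fmt.Errorf("decoding cache token %q: %w", hexToken, err)
	}
	return newToken(transformID, sideInputID, raw), nil
}

func main() {
	tok, err := newTokenFromHex("t1", "i0", "cafe")
	fmt.Println(len(tok.Token), err)

	_, err = newTokenFromHex("t1", "i0", "not hex")
	fmt.Println(err != nil)
}
```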




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 655712)
    Time Spent: 8.5h  (was: 8h 20m)

> [Go SDK] Windowed Side Input Caching (Cross Bundle)
> ---------------------------------------------------
>
>                 Key: BEAM-11097
>                 URL: https://issues.apache.org/jira/browse/BEAM-11097
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Jack McCluskey
>            Priority: P3
>              Labels: starter
>          Time Spent: 8.5h
>  Remaining Estimate: 0h
>
> This is implementing https://issues.apache.org/jira/browse/BEAM-5428 for the 
> Go SDK.
> Side inputs are valid for a given window. That means they can be correctly 
> cached and re-used in multiple bundles if the data has already been loaded 
> into a given worker.
> Side input data for a given bundle is specified and keyed by the window and 
> a side input request token. The Go SDK presently doesn't use this information 
> for an SDK-side worker cache of data.
> Care needs to be taken to allow the data to be garbage collected if it hasn't 
> been used in the last minute or so. In practice, Global Window side inputs 
> will not be evicted while still in use, but infrequently accessed data will 
> possibly be evicted more regularly. A limit should be set to avoid running 
> out of memory.
> As presently implemented, each ProcessElement call (or StartBundle or 
> FinishBundle) will only query for side input data on user request. However, 
> results of user requests are not cached between elements or bundles, which 
> means there's additional lookup and serialization time from the runner side.
> It may be possible to re-use allocated side input elements, but even a 
> solution that re-decodes data on every call would be an improvement if the 
> data isn't re-streamed from the runner.
> If https://issues.apache.org/jira/browse/BEAM-3293 has been implemented, then 
> per-key+window caching should additionally be considered for more granular 
> caching of data.
> This optimization has an outsized impact on streaming performance as side 
> input data can change per bundle, and streaming bundles are typically small, 
> and quickly processed.
> This work would be implemented in the exec package: 
>  
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sideinput.go]
>  
>  and probably the harness package
>  
> [https://github.com/apache/beam/blob/c185adb7652034b5d044aae65a1a972d9ceb4377/sdks/go/pkg/beam/core/runtime/harness/statemgr.go#L50]
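
The eviction behavior the issue describes (drop entries that have not been used for about a minute, and cap the total number of entries) could be sketched roughly as follows. This is a toy under stated assumptions, not the Beam implementation: `idleCache`, its methods, and the `demo` helper are hypothetical, the cache is not goroutine-safe, and the clock is passed in explicitly so the behavior is easy to follow.

```go
package main

import (
	"fmt"
	"time"
)

type entry struct {
	value    string
	lastUsed time.Time
}

// idleCache is a toy cache with an idle timeout and an entry limit.
// It is NOT safe for concurrent use.
type idleCache struct {
	maxEntries int
	maxIdle    time.Duration
	entries    map[string]*entry
}

func newIdleCache(maxEntries int, maxIdle time.Duration) *idleCache {
	return &idleCache{maxEntries: maxEntries, maxIdle: maxIdle, entries: map[string]*entry{}}
}

// Get returns the cached value and refreshes its last-use time.
// An entry that has been idle longer than maxIdle is dropped and reported as a miss.
func (c *idleCache) Get(key string, now time.Time) (string, bool) {
	e, ok := c.entries[key]
	if !ok {
		return "", false
	}
	if now.Sub(e.lastUsed) > c.maxIdle {
		delete(c.entries, key)
		return "", false
	}
	e.lastUsed = now
	return e.value, true
}

// Put stores a value, evicting the least recently used entry when a new key
// would push the cache past its limit.
func (c *idleCache) Put(key, value string, now time.Time) {
	if _, exists := c.entries[key]; !exists && len(c.entries) >= c.maxEntries {
		c.evictOldest()
	}
	c.entries[key] = &entry{value: value, lastUsed: now}
}

// evictOldest removes the entry with the earliest last-use time, if any.
func (c *idleCache) evictOldest() {
	oldestKey, found := "", false
	var oldest time.Time
	for k, e := range c.entries {
		if !found || e.lastUsed.Before(oldest) {
			oldestKey, oldest, found = k, e.lastUsed, true
		}
	}
	if found {
		delete(c.entries, oldestKey)
	}
}

// demo walks through a recent hit, an idle miss, and a size-limit eviction.
func demo() (recentHit, idleHit, overLimitHit bool) {
	start := time.Unix(0, 0)
	c := newIdleCache(2, time.Minute)
	c.Put("window-a", "data a", start)
	_, recentHit = c.Get("window-a", start.Add(30*time.Second)) // used within a minute
	_, idleHit = c.Get("window-a", start.Add(2*time.Minute))    // idle for 90s, dropped
	c.Put("window-b", "data b", start.Add(2*time.Minute))
	c.Put("window-c", "data c", start.Add(3*time.Minute))
	c.Put("window-d", "data d", start.Add(4*time.Minute)) // limit is 2, evicts window-b
	_, overLimitHit = c.Get("window-b", start.Add(4*time.Minute))
	return
}

func main() {
	fmt.Println(demo()) // prints "true false false"
}
```

A frequently read entry, such as a Global Window side input, keeps refreshing its last-use time and so stays cached, while rarely touched entries age out or lose their slot first.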



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
