[
https://issues.apache.org/jira/browse/BEAM-11097?focusedWorklogId=655712&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-655712
]
ASF GitHub Bot logged work on BEAM-11097:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 27/Sep/21 19:02
Start Date: 27/Sep/21 19:02
Worklog Time Spent: 10m
Work Description: lostluck commented on a change in pull request #15594:
URL: https://github.com/apache/beam/pull/15594#discussion_r716947697
##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
offset := len(param) - len(side)
var ret []ReusableInput
+ var cache *statecache.SideInputCache
+ if reader != nil {
+ cache = reader.GetSideInputCache()
+ } else {
+ cache = &statecache.SideInputCache{}
+ cache.Init(1)
Review comment:
Here's where a factory function (or similar) would be useful. Alternatively, we
could use a simpler, one-element cache that satisfies the same interface as the
full cache.
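The Go sketch below shows what the one-element alternative might look like. It assumes a hypothetical `sideInputCache` interface. `QueryCache(transformID, sideInputID)` mirrors the call in the diff. `SetCache` and the simplified `ReusableInput` stand-in are assumptions, not the real statecache or exec API. The fallback remembers only the last side input stored, which may be enough for the path where no reader supplies a shared cache.

```go
package main

import "fmt"

// ReusableInput is a simplified, hypothetical stand-in for exec.ReusableInput.
type ReusableInput interface {
	Value() string
}

// sideInputCache is a hypothetical interface that both the cross-bundle cache
// and the fallback would satisfy. QueryCache mirrors the call in the diff;
// SetCache is an assumed counterpart.
type sideInputCache interface {
	QueryCache(transformID, sideInputID string) ReusableInput
	SetCache(transformID, sideInputID string, input ReusableInput)
}

// singleEntryCache holds at most one side input: the most recently stored one.
type singleEntryCache struct {
	transformID, sideInputID string
	input                    ReusableInput
}

// Compile-time check that the fallback satisfies the shared interface.
var _ sideInputCache = (*singleEntryCache)(nil)

// QueryCache returns the stored input only if both IDs match; otherwise nil (a miss).
func (c *singleEntryCache) QueryCache(transformID, sideInputID string) ReusableInput {
	if c.input != nil && c.transformID == transformID && c.sideInputID == sideInputID {
		return c.input
	}
	return nil
}

// SetCache overwrites whatever was stored before.
func (c *singleEntryCache) SetCache(transformID, sideInputID string, input ReusableInput) {
	c.transformID, c.sideInputID, c.input = transformID, sideInputID, input
}

// constInput is a trivial ReusableInput used only for the demo.
type constInput string

func (s constInput) Value() string { return string(s) }

func main() {
	var cache sideInputCache = &singleEntryCache{}
	cache.SetCache("t1", "side0", constInput("a"))
	fmt.Println(cache.QueryCache("t1", "side0") != nil) // true: same IDs
	cache.SetCache("t1", "side1", constInput("b"))
	fmt.Println(cache.QueryCache("t1", "side0") != nil) // false: entry was replaced
}
```

Because callers see only the interface, `makeSideInputs` would not need to know whether it received the full cache or this fallback.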
##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -24,6 +24,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"
Review comment:
We don't want the exec package to depend on parts defined in harness, at
least not explicitly. It's bad coupling, and it makes the SDK tree harder to
follow.
This means we need either a factory function (or similar) that is passed in to
get or initialize the cache, or a change so that the reader never provides a nil
cache. In particular, the exec package can define an interface describing how it
interacts with the cache, and then there's no explicit coupling at all. This is
good Go design: interfaces should be defined by the caller/user of the
interface, not the other way around.
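The Go sketch below illustrates the consumer-defined interface idea. All of its names are hypothetical: `SideCache`, `noopCache`, `sideCacheFor`, and a `GetSideInputCache` that returns the interface rather than `*statecache.SideInputCache`. Go interfaces are satisfied implicitly, so a harness-side cache with matching methods plugs in without exec importing harness. The helper also never hands callers a nil cache.

```go
package main

import "fmt"

// ReStream is a simplified, hypothetical stand-in for exec.ReStream.
type ReStream interface{ Name() string }

// SideCache is declared by the consumer (exec, in this sketch) and lists only
// the methods exec calls. Any cache with these methods satisfies it implicitly.
type SideCache interface {
	QueryCache(transformID, sideInputID string) ReStream
	SetCache(transformID, sideInputID string, input ReStream)
}

// StateReader is a trimmed-down, hypothetical stand-in exposing the cache.
type StateReader interface {
	GetSideInputCache() SideCache
}

// noopCache never stores anything, so every lookup is a miss.
type noopCache struct{}

func (noopCache) QueryCache(string, string) ReStream { return nil }
func (noopCache) SetCache(string, string, ReStream) {}

// sideCacheFor always returns a usable cache, so callers need no nil checks.
func sideCacheFor(reader StateReader) SideCache {
	if reader != nil {
		if c := reader.GetSideInputCache(); c != nil {
			return c
		}
	}
	return noopCache{}
}

// mapCache plays the role of a harness-side cache (hypothetical).
type mapCache map[[2]string]ReStream

func (m mapCache) QueryCache(t, s string) ReStream  { return m[[2]string{t, s}] }
func (m mapCache) SetCache(t, s string, r ReStream) { m[[2]string{t, s}] = r }

// fakeReader is a demo StateReader that hands out whatever cache it holds.
type fakeReader struct{ cache SideCache }

func (r fakeReader) GetSideInputCache() SideCache { return r.cache }

// named is a trivial ReStream for the demo.
type named string

func (n named) Name() string { return string(n) }

func main() {
	withCache := sideCacheFor(fakeReader{cache: mapCache{}})
	withCache.SetCache("t1", "side0", named("stream"))
	fmt.Println(withCache.QueryCache("t1", "side0") != nil) // true

	fallback := sideCacheFor(nil) // no reader: falls back to a no-op cache
	fallback.SetCache("t1", "side0", named("stream"))
	fmt.Println(fallback.QueryCache("t1", "side0") != nil) // false
}
```

If the reader is changed so that it never returns a nil cache, `sideCacheFor` shrinks to a single method call.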
##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
offset := len(param) - len(side)
var ret []ReusableInput
+ var cache *statecache.SideInputCache
+ if reader != nil {
+ cache = reader.GetSideInputCache()
+ } else {
+ cache = &statecache.SideInputCache{}
+ cache.Init(1)
+ }
for i := 0; i < len(streams); i++ {
+ sid, sideInputID := side[i].GetIDs()
+ var transformID string
+ if sideInputID == "" {
+ transformID = ""
Review comment:
It would be worth commenting on why this branching is necessary. Do we
know what causes this case?
##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
offset := len(param) - len(side)
var ret []ReusableInput
+ var cache *statecache.SideInputCache
+ if reader != nil {
Review comment:
When is the reader going to be nil? Is it something to worry about?
##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -277,11 +278,33 @@ func makeSideInputs(ctx context.Context, w typex.Window, side []SideInputAdapter
offset := len(param) - len(side)
var ret []ReusableInput
+ var cache *statecache.SideInputCache
+ if reader != nil {
+ cache = reader.GetSideInputCache()
+ } else {
+ cache = &statecache.SideInputCache{}
+ cache.Init(1)
+ }
for i := 0; i < len(streams); i++ {
+ sid, sideInputID := side[i].GetIDs()
+ var transformID string
+ if sideInputID == "" {
+ transformID = ""
+ } else {
+ transformID = sid.PtransformID
+ }
+ c := cache.QueryCache(transformID, sideInputID)
+ // Cache hit
+ if c != nil {
+ ret = append(ret, c)
+ continue
+ }
Review comment:
We can inline this call into the if statement:
```suggestion
if c := cache.QueryCache(transformID, sideInputID); c != nil {
// Cache hit
ret = append(ret, c)
continue
}
```
##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -33,6 +33,7 @@ const iterableSideInputKey = ""
// encapsulates StreamID and coding as needed.
type SideInputAdapter interface {
NewIterable(ctx context.Context, reader StateReader, w typex.Window) (ReStream, error)
+ GetIDs() (StreamID, string)
Review comment:
TBH it still feels like the Adapter is the right place for the cache to be
looked up, since it's dealing with actual elements and similar, and it produces
the ReStream instances. It's already getting a StateReader that it can pull the
cache from, too (depending on what happens to the cache, of course; it might be
best if it's simply hidden within the reader entirely...).
Specifically, any time you need to add methods to access data elsewhere, ask
whether that really has to be the case. What does the solution look like when
you're not able/allowed to do that?
##########
File path: sdks/go/pkg/beam/core/runtime/exec/sideinput.go
##########
@@ -76,6 +77,11 @@ func (s *sideInputAdapter) NewIterable(ctx context.Context, reader StateReader,
}, nil
}
+// GetIDs returns the StreamID and Side Input ID for the adapter. Used primarily for sidei nput caching.
Review comment:
typo
```suggestion
// GetIDs returns the StreamID and Side Input ID for the adapter. Used primarily for side input caching.
```
##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go
##########
@@ -125,7 +125,7 @@ func makeRequest(transformID, sideInputID string, t token) fnpb.ProcessBundleReq
wrap.SideInput = &side
tok.Type = &wrap
tok.Token = []byte(t)
- return tok
+ return &tok
Review comment:
Style-wise, we should probably inline all of this, as it's easier on the
reader to see that the compiler can make all the allocations at once.
```
	return &fnpb.ProcessBundleRequest_CacheToken{
		Token: []byte(t),
		Type: &fnpb.ProcessBundleRequest_CacheToken_SideInput_{
			SideInput: &fnpb.ProcessBundleRequest_CacheToken_SideInput{
				TransformId: transformID,
				SideInputId: sideInputID,
			},
		},
	}
}
```
The main reason to separate things out when constructing protos, rather than
inlining, is error handling. This function has nothing that can fail, so it can
all be inlined.
--
This is an automated message from the Apache Git Service.
Issue Time Tracking
-------------------
Worklog Id: (was: 655712)
Time Spent: 8.5h (was: 8h 20m)
> [Go SDK] Windowed Side Input Caching (Cross Bundle)
> ---------------------------------------------------
>
> Key: BEAM-11097
> URL: https://issues.apache.org/jira/browse/BEAM-11097
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Jack McCluskey
> Priority: P3
> Labels: starter
> Time Spent: 8.5h
> Remaining Estimate: 0h
>
> This is implementing https://issues.apache.org/jira/browse/BEAM-5428 for the
> Go SDK.
> Side inputs are valid for a given window. That means they can be correctly
> cached and re-used in multiple bundles if the data has already been loaded
> into a given worker.
> Side input data for a given bundle is specified and keyed by the window and a
> side input request token. The Go SDK presently doesn't use this information
> for an SDK-side worker cache of data.
> Care needs to be taken to allow the data to be garbage collected if it hasn't
> been used in the last minute or so. In practice, Global Window side inputs
> will not be evicted while still in use, but infrequently accessed data will
> possibly be evicted more regularly. A limit should be set to avoid running
> out of memory.
> As presently implemented, each ProcessElement call (or StartBundle or
> FinishBundle) will only query for side input data on user request. However,
> results of user requests are not cached between elements or bundles, which
> means there's additional lookup and serialization time from the runner side.
> It may be possible to re-use allocated side input elements, but even a
> solution that re-decodes data on every call would be an improvement if the
> data isn't re-streamed from the runner.
> If https://issues.apache.org/jira/browse/BEAM-3293 has been implemented, then
> per-key+window caching should additionally be considered for more granular
> caching of data.
> This optimization has an outsized impact on streaming performance, as side
> input data can change per bundle, and streaming bundles are typically small
> and quickly processed.
> This work would be implemented in the exec package:
>
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sideinput.go]
>
> and probably the harness package
>
> [https://github.com/apache/beam/blob/c185adb7652034b5d044aae65a1a972d9ceb4377/sdks/go/pkg/beam/core/runtime/harness/statemgr.go#L50]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)