jrmccluskey commented on a change in pull request #15717:
URL: https://github.com/apache/beam/pull/15717#discussion_r728298792
##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go
##########
@@ -44,18 +52,19 @@ type token string
type SideInputCache struct {
capacity int
mu sync.Mutex
- cache map[token]exec.ReStream
+ cache map[cacheKey]exec.ReStream
idsToTokens map[string]token
validTokens map[token]int8 // Maps tokens to active bundle counts
metrics CacheMetrics
}
// CacheMetrics stores metrics for the cache across a pipeline run.
type CacheMetrics struct {
- Hits int64
- Misses int64
- Evictions int64
- InUseEvictions int64
+ Hits *metrics.Counter
+ Misses *metrics.Counter
+ Evictions *metrics.Counter
+ InUseEvictions *metrics.Counter
+ ReStreamErrors *metrics.Counter
Review comment:
Fixed
##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go
##########
@@ -145,19 +150,24 @@ func (c *SideInputCache)
makeAndValidateToken(transformID, sideInputID string) (
return tok, c.isValid(tok)
}
+func (c *SideInputCache) makeCacheKey(tok token, w, key []byte) cacheKey {
+ return cacheKey{tok: tok, win: string(w), key: string(key)}
+}
+
// QueryCache takes a transform ID and side input ID and checks whether a corresponding side
// input has been cached. A query having a bad token (e.g. one that doesn't make a known
// token or one that makes a known but currently invalid token) is treated the same as a
// cache miss.
-func (c *SideInputCache) QueryCache(transformID, sideInputID string)
exec.ReStream {
+func (c *SideInputCache) QueryCache(ctx context.Context, transformID,
sideInputID string, win, key []byte) exec.ReStream {
c.mu.Lock()
defer c.mu.Unlock()
tok, ok := c.makeAndValidateToken(transformID, sideInputID)
if !ok {
return nil
}
+ keyString := c.makeCacheKey(tok, win, key)
Review comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]