jrmccluskey commented on a change in pull request #15717:
URL: https://github.com/apache/beam/pull/15717#discussion_r728298792



##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go
##########
@@ -44,18 +52,19 @@ type token string
 type SideInputCache struct {
        capacity    int
        mu          sync.Mutex
-       cache       map[token]exec.ReStream
+       cache       map[cacheKey]exec.ReStream
        idsToTokens map[string]token
        validTokens map[token]int8 // Maps tokens to active bundle counts
        metrics     CacheMetrics
 }
 
 // CacheMetrics stores metrics for the cache across a pipeline run.
 type CacheMetrics struct {
-       Hits           int64
-       Misses         int64
-       Evictions      int64
-       InUseEvictions int64
+       Hits           *metrics.Counter
+       Misses         *metrics.Counter
+       Evictions      *metrics.Counter
+       InUseEvictions *metrics.Counter
+       ReStreamErrors *metrics.Counter

Review comment:
       Fixed

##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go
##########
@@ -145,19 +150,24 @@ func (c *SideInputCache) 
makeAndValidateToken(transformID, sideInputID string) (
        return tok, c.isValid(tok)
 }
 
+func (c *SideInputCache) makeCacheKey(tok token, w, key []byte) cacheKey {
+       return cacheKey{tok: tok, win: string(w), key: string(key)}
+}
+
 // QueryCache takes a transform ID and side input ID and checking if a 
corresponding side
 // input has been cached. A query having a bad token (e.g. one that doesn't 
make a known
 // token or one that makes a known but currently invalid token) is treated the 
same as a
 // cache miss.
-func (c *SideInputCache) QueryCache(transformID, sideInputID string) 
exec.ReStream {
+func (c *SideInputCache) QueryCache(ctx context.Context, transformID, 
sideInputID string, win, key []byte) exec.ReStream {
        c.mu.Lock()
        defer c.mu.Unlock()
        tok, ok := c.makeAndValidateToken(transformID, sideInputID)
        if !ok {
                return nil
        }
+       keyString := c.makeCacheKey(tok, win, key)

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to