lostluck commented on a change in pull request #15483: URL: https://github.com/apache/beam/pull/15483#discussion_r706341019
########## File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go ########## @@ -0,0 +1,184 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package statecache implements the state caching feature described by the +// Beam Fn API +// +// https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m +package statecache + +import ( + "sync" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" +) + +// SideInputCache stores a cache of reusable inputs for the purposes of +// eliminating redundant calls to the runner during execution of ParDos +// using side inputs. +// +// A SideInputCache should be initialized when the SDK harness is initialized, +// creating storage for side input caching. On each ProcessBundleRequest, +// the cache will process the list of tokens for cacheable side inputs and +// be queried when side inputs are requested in bundle execution. Once a +// new bundle request comes in the valid tokens will be updated and the cache +// will be re-used. 
In the event that the cache reaches capacity, a random, +// currently invalid cached object will be evicted. +type SideInputCache struct { + cache map[string]exec.ReusableInput + idsToTokens map[string]string + mu sync.Mutex + capacity int + metrics CacheMetrics +} + +type CacheMetrics struct { + Hits int64 + Misses int64 + Evictions int64 + InUseEvictionCalls int64 +} + +// Init makes the cache map and the map of IDs to cache tokens for the +// SideInputCache. Should only be called once. Review comment: Please add that it "returns an error for non-positive capacities". ########## File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go ########## @@ -0,0 +1,184 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +// Package statecache implements the state caching feature described by the +// Beam Fn API +// +// https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m +package statecache + +import ( + "sync" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" +) + +// SideInputCache stores a cache of reusable inputs for the purposes of +// eliminating redundant calls to the runner during execution of ParDos +// using side inputs. +// +// A SideInputCache should be initialized when the SDK harness is initialized, +// creating storage for side input caching. On each ProcessBundleRequest, +// the cache will process the list of tokens for cacheable side inputs and +// be queried when side inputs are requested in bundle execution. Once a +// new bundle request comes in the valid tokens will be updated and the cache +// will be re-used. In the event that the cache reaches capacity, a random, +// currently invalid cached object will be evicted. +type SideInputCache struct { + cache map[string]exec.ReusableInput + idsToTokens map[string]string Review comment: When it comes to primitive types like string or int, it's good to at least comment what they are, e.g. "Cache from tokens to bar", "Cache from PTransform IDs to tokens", etc. It might be good to make the typing explicit, e.g. define a `type token string` and use that for all token instances. It means the type system will prevent accidental misuse, and it will make the code and its intent a bit more readable. ########## File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go ########## @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package statecache implements the state caching feature described by the +// Beam Fn API Review comment: Skimming through that section of the doc, the recommendation is to "use a memory cost weighted LRU strategy with an upper bound as a fixed percentage of available memory", which, I agree, would be great, but also complicated (since we don't know the decoded size of the reusable input until after it's been decoded...). Random eviction with a simple cap will have to do for now. ########## File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go ########## @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +// Package statecache implements the state caching feature described by the +// Beam Fn API +package statecache + +import ( + "sync" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" +) + +// SideInputCache stores a cache of reusable inputs for the purposes of +// eliminating redundant calls to the runner during execution of ParDos +// using side inputs. +type SideInputCache struct { + cache map[string]*exec.ReusableInput + idsToTokens map[string]string + validTokens []string + mu sync.Mutex + capacity int +} + +// Init makes the cache map and the map of IDs to cache tokens for the +// SideInputCache. Should only be called once. +func (c *SideInputCache) Init(cap int) error { + if cap <= 0 { + return errors.Errorf("capacity must be a positive integer, got %v", cap) + } + c.mu.Lock() + defer c.mu.Unlock() + c.cache = make(map[string]*exec.ReusableInput, cap) + c.idsToTokens = make(map[string]string) + return nil +} + +// Completely clears the list of valid tokens. Should be called when +// starting to handle a new request. +func (c *SideInputCache) clearValidTokens() { + c.validTokens = nil +} + +// SetValidTokens clears the list of valid tokens then sets new ones, also updating the mapping of +// transform and side input IDs to cache tokens in the process. Should be called at the start of every +// new ProcessBundleRequest. If the runner does not support caching, the passed cache token values +// should be empty and all get/set requests will silently be no-ops. +func (c *SideInputCache) SetValidTokens(cacheTokens ...fnpb.ProcessBundleRequest_CacheToken) { + c.mu.Lock() + defer c.mu.Unlock() + c.clearValidTokens() Review comment: The returned cache ids are the valid cache tokens for that specific bundle, not across all active bundles in the worker. 
Wiping them out on every request means other bundles won't get cache behavior since `makeAndValidateToken` returns false in that case. I go into a possible solution in another comment. ########## File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go ########## @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package statecache implements the state caching feature described by the +// Beam Fn API +package statecache + +import ( + "sync" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" +) + +// SideInputCache stores a cache of reusable inputs for the purposes of +// eliminating redundant calls to the runner during execution of ParDos +// using side inputs. +type SideInputCache struct { + cache map[string]*exec.ReusableInput + idsToTokens map[string]string + validTokens []string + mu sync.Mutex + capacity int +} + +// Init makes the cache map and the map of IDs to cache tokens for the +// SideInputCache. Should only be called once. 
+func (c *SideInputCache) Init(cap int) error { + if cap <= 0 { + return errors.Errorf("capacity must be a positive integer, got %v", cap) + } + c.mu.Lock() + defer c.mu.Unlock() + c.cache = make(map[string]*exec.ReusableInput, cap) + c.idsToTokens = make(map[string]string) + return nil +} + +// Completely clears the list of valid tokens. Should be called when +// starting to handle a new request. +func (c *SideInputCache) clearValidTokens() { + c.validTokens = nil +} + +// SetValidTokens clears the list of valid tokens then sets new ones, also updating the mapping of +// transform and side input IDs to cache tokens in the process. Should be called at the start of every +// new ProcessBundleRequest. If the runner does not support caching, the passed cache token values +// should be empty and all get/set requests will silently be no-ops. +func (c *SideInputCache) SetValidTokens(cacheTokens ...fnpb.ProcessBundleRequest_CacheToken) { + c.mu.Lock() + defer c.mu.Unlock() + c.clearValidTokens() + for _, tok := range cacheTokens { + // User State caching is currently not supported, so these tokens are ignored + if tok.GetUserState() != nil { + continue + } else { + s := tok.GetSideInput() + transformID := s.GetTransformId() + sideInputID := s.GetSideInputId() + token := string(tok.GetToken()) + c.setValidToken(transformID, sideInputID, token) + } + } +} + +// setValidToken adds a new valid token for a request into the SideInputCache struct +// and maps the transform ID and side input ID pairing to the cache token. 
+func (c *SideInputCache) setValidToken(transformID, sideInputID, token string) { + idKey := transformID + sideInputID + c.idsToTokens[idKey] = token + c.validTokens = append(c.validTokens, token) +} + +func (c *SideInputCache) makeAndValidateToken(transformID, sideInputID string) (string, bool) { + idKey := transformID + sideInputID + // Check if it's a known token + tok, ok := c.idsToTokens[idKey] + if !ok { + return "", false + } + // Check if the known token is valid for this request + for _, t := range c.validTokens { + if t == tok { + return tok, true + } + } + return "", false +} + +// QueryCache takes a transform ID and side input ID and checking if a corresponding side +// input has been cached. A query having a bad token (e.g. one that doesn't make a known +// token or one that makes a known but currently invalid token) is treated the same as a Review comment: So this is the issue with the `makeAndValidateToken` call: as implemented, a newer bundle with different side inputs will invalidate cache calls for the side inputs of other existing bundles. As a result, validity can flip back and forth on boundary conditions. (A bundle needs set A, the next bundle needs set B, the third needs set A again, invalidating all the B cache requests...) Roughly, this implies we should keep the active set of tokens for the active bundles, say a map[token]int (or map[token]int8), which records how many bundles are making use of a given token. At the start of a bundle, we add its tokens to the active set (incrementing their counts), and when the bundle ends, we remove them (decrementing the counts), deleting a token once its count reaches 0. (This is important for iteration/eviction, and to keep the map's memory footprint small.) That way, the `isValid` call can check validity by seeing if `c.validTokens[token] > 0`, prioritizing deletion of tokens that are not valid, as you intend, rather than simply prioritizing "the most recent bundle".
########## File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go ########## @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package statecache implements the state caching feature described by the +// Beam Fn API +package statecache + +import ( + "sync" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" +) + +// SideInputCache stores a cache of reusable inputs for the purposes of +// eliminating redundant calls to the runner during execution of ParDos +// using side inputs. +type SideInputCache struct { + cache map[string]*exec.ReusableInput + idsToTokens map[string]string + validTokens []string + mu sync.Mutex + capacity int +} + +// Init makes the cache map and the map of IDs to cache tokens for the +// SideInputCache. Should only be called once. 
+func (c *SideInputCache) Init(cap int) error { + if cap <= 0 { + return errors.Errorf("capacity must be a positive integer, got %v", cap) + } + c.mu.Lock() + defer c.mu.Unlock() + c.cache = make(map[string]*exec.ReusableInput, cap) + c.idsToTokens = make(map[string]string) + return nil +} + +// Completely clears the list of valid tokens. Should be called when +// starting to handle a new request. +func (c *SideInputCache) clearValidTokens() { + c.validTokens = nil +} + +// SetValidTokens clears the list of valid tokens then sets new ones, also updating the mapping of +// transform and side input IDs to cache tokens in the process. Should be called at the start of every +// new ProcessBundleRequest. If the runner does not support caching, the passed cache token values +// should be empty and all get/set requests will silently be no-ops. +func (c *SideInputCache) SetValidTokens(cacheTokens ...fnpb.ProcessBundleRequest_CacheToken) { + c.mu.Lock() + defer c.mu.Unlock() + c.clearValidTokens() + for _, tok := range cacheTokens { + // User State caching is currently not supported, so these tokens are ignored + if tok.GetUserState() != nil { + continue + } else { + s := tok.GetSideInput() + transformID := s.GetTransformId() + sideInputID := s.GetSideInputId() + token := string(tok.GetToken()) + c.setValidToken(transformID, sideInputID, token) + } + } +} + +// setValidToken adds a new valid token for a request into the SideInputCache struct +// and maps the transform ID and side input ID pairing to the cache token. 
+func (c *SideInputCache) setValidToken(transformID, sideInputID, token string) { + idKey := transformID + sideInputID + c.idsToTokens[idKey] = token + c.validTokens = append(c.validTokens, token) +} + +func (c *SideInputCache) makeAndValidateToken(transformID, sideInputID string) (string, bool) { + idKey := transformID + sideInputID + // Check if it's a known token + tok, ok := c.idsToTokens[idKey] + if !ok { + return "", false + } + // Check if the known token is valid for this request + for _, t := range c.validTokens { + if t == tok { + return tok, true + } + } + return "", false +} + +// QueryCache takes a transform ID and side input ID and checking if a corresponding side +// input has been cached. A query having a bad token (e.g. one that doesn't make a known +// token or one that makes a known but currently invalid token) is treated the same as a +// cache miss. +func (c *SideInputCache) QueryCache(transformID, sideInputID string) *exec.ReusableInput { + c.mu.Lock() + defer c.mu.Unlock() + tok, ok := c.makeAndValidateToken(transformID, sideInputID) + if !ok { + return nil + } + // Check to see if cached + input, ok := c.cache[tok] + if !ok { + return nil + } + return input +} + +// SetCache allows a user to place a ReusableInput materialized from the reader into the SideInputCache +// with its corresponding transform ID and side input ID. If the IDs do not pair with a known, valid token +// then we silently do not cache the input, as this is an indication that the runner is treating that input +// as uncacheable. 
+func (c *SideInputCache) SetCache(transformID, sideInputID string, input *exec.ReusableInput) error { + c.mu.Lock() + defer c.mu.Unlock() + tok, ok := c.makeAndValidateToken(transformID, sideInputID) + if !ok { + return nil + } + if len(c.cache) > c.capacity { + err := c.evictElement() + if err != nil { + return errors.Errorf("Cache at or above capacity, got %v", err) + } + } + c.cache[tok] = input + return nil +} + +func (c *SideInputCache) isValid(token string) bool { + for _, t := range c.validTokens { + if t == token { + return true + } + } + return false +} + +// evictElement randomly evicts a ReusableInput that is not currently valid from the cache. +// It should only be called by a goroutine that obtained the lock in SetCache. +func (c *SideInputCache) evictElement() error { + deleted := false + // Select a key from the cache at random + for k := range c.cache { + // Do not evict an element if it's currently valid + if !c.isValid(k) { + delete(c.cache, k) + deleted = true + break + } + } + // Nothing is deleted if every side input is still valid, meaning that the cache size + // is likely too small. + if !deleted { Review comment: Metrics: Great! But also, as mentioned, we probably always want to delete something when an eviction is necessary, to avoid memory problems that will just end up crashing a worker if the cache is left to grow. It's totally OK to do so even if the currently active set of tokens says everything is being used; hence the value of the "InUseEvictionCalls" metric. We (and users) can see whether it's a problem, and we can adjust the default, or make it configurable so users can make that choice if needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
