scwhittle commented on a change in pull request #13862:
URL: https://github.com/apache/beam/pull/13862#discussion_r581820541
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateCache.java
##########
@@ -116,222 +129,198 @@ private ForComputation(String computation) {
/** Invalidate all cache entries for this computation and {@code
processingKey}. */
public void invalidate(ByteString processingKey, long shardingKey) {
- synchronized (this) {
- WindmillComputationKey key =
- WindmillComputationKey.create(computation, processingKey,
shardingKey);
- for (StateId id : keyIndex.removeAll(key)) {
- stateCache.invalidate(id);
- }
- }
+ WindmillComputationKey key =
+ WindmillComputationKey.create(computation, processingKey,
shardingKey);
+ // By removing the ForKey object, all state for the key is orphaned in
the cache and will
+ // be removed by normal cache cleanup.
+ keyIndex.remove(key);
}
- /** Returns a per-computation, per-key view of the state cache. */
- public ForKey forKey(
+ /** Returns a per-computation, per-key, per-state-family view of the state
cache. */
+ public ForKeyAndFamily forKey(
WindmillComputationKey computationKey,
String stateFamily,
long cacheToken,
long workToken) {
- return new ForKey(computationKey, stateFamily, cacheToken, workToken);
+ ForKey forKey = keyIndex.get(computationKey);
Review comment:
done (there was one regarding this up by WindmillStateCache but I agree
calling out here makes sense too)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]