scwhittle commented on a change in pull request #13862:
URL: https://github.com/apache/beam/pull/13862#discussion_r581820541



##########
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateCache.java
##########
@@ -116,222 +129,198 @@ private ForComputation(String computation) {
 
     /** Invalidate all cache entries for this computation and {@code 
processingKey}. */
     public void invalidate(ByteString processingKey, long shardingKey) {
-      synchronized (this) {
-        WindmillComputationKey key =
-            WindmillComputationKey.create(computation, processingKey, 
shardingKey);
-        for (StateId id : keyIndex.removeAll(key)) {
-          stateCache.invalidate(id);
-        }
-      }
+      WindmillComputationKey key =
+          WindmillComputationKey.create(computation, processingKey, 
shardingKey);
+      // By removing the ForKey object, all state for the key is orphaned in 
the cache and will
+      // be removed by normal cache cleanup.
+      keyIndex.remove(key);
     }
 
-    /** Returns a per-computation, per-key view of the state cache. */
-    public ForKey forKey(
+    /** Returns a per-computation, per-key, per-state-family view of the state 
cache. */
+    public ForKeyAndFamily forKey(
         WindmillComputationKey computationKey,
         String stateFamily,
         long cacheToken,
         long workToken) {
-      return new ForKey(computationKey, stateFamily, cacheToken, workToken);
+      ForKey forKey = keyIndex.get(computationKey);

Review comment:
       done (there was one regarding this up by WindmillStateCache but I agree 
calling out here  makes sense too)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to