scwhittle commented on a change in pull request #13862:
URL: https://github.com/apache/beam/pull/13862#discussion_r581859706



##########
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateCache.java
##########
@@ -116,222 +129,198 @@ private ForComputation(String computation) {
 
     /** Invalidate all cache entries for this computation and {@code 
processingKey}. */
     public void invalidate(ByteString processingKey, long shardingKey) {
-      synchronized (this) {
-        WindmillComputationKey key =
-            WindmillComputationKey.create(computation, processingKey, 
shardingKey);
-        for (StateId id : keyIndex.removeAll(key)) {
-          stateCache.invalidate(id);
-        }
-      }
+      WindmillComputationKey key =
+          WindmillComputationKey.create(computation, processingKey, 
shardingKey);
+      // By removing the ForKey object, all state for the key is orphaned in 
the cache and will
+      // be removed by normal cache cleanup.
+      keyIndex.remove(key);
     }
 
-    /** Returns a per-computation, per-key view of the state cache. */
-    public ForKey forKey(
+    /** Returns a per-computation, per-key, per-state-family view of the state 
cache. */
+    public ForKeyAndFamily forKey(
         WindmillComputationKey computationKey,
         String stateFamily,
         long cacheToken,
         long workToken) {
-      return new ForKey(computationKey, stateFamily, cacheToken, workToken);
+      ForKey forKey = keyIndex.get(computationKey);
+      if (forKey == null || !forKey.updateTokens(cacheToken, workToken)) {
+        forKey = new ForKey(computationKey, cacheToken, workToken);
+        // We prefer this implementation to using compute because that is 
implemented similarly for
+        // ConcurrentHashMap with the downside of it performing inserts for 
unchanged existing
+        // values as well.
+        keyIndex.put(computationKey, forKey);
+      }
+      return new ForKeyAndFamily(forKey, stateFamily);
     }
   }
 
   /** Per-computation, per-key view of the state cache. */
-  public class ForKey {
-
+  // Note that we utilize the default equality and hashCode for this class 
based upon the instance
+  // (instead of the fields) to optimize cache invalidation.
+  private static class ForKey {
     private final WindmillComputationKey computationKey;
-    private final String stateFamily;
     // Cache token must be consistent for the key for the cache to be valid.
     private final long cacheToken;
 
     // The work token for processing must be greater than the last work token. 
 As work items are
     // increasing for a key, a less-than or equal to work token indicates that 
the current token is
-    // for stale processing. We don't use the cache so that fetches performed 
will fail with a no
-    // longer valid work token.
-    private final long workToken;
+    // for stale processing.
+    private long workToken;
 
-    private ForKey(
-        WindmillComputationKey computationKey,
-        String stateFamily,
-        long cacheToken,
-        long workToken) {
+    private ForKey(WindmillComputationKey computationKey, long cacheToken, 
long workToken) {
       this.computationKey = computationKey;
-      this.stateFamily = stateFamily;
       this.cacheToken = cacheToken;
       this.workToken = workToken;
     }
 
-    public <T extends State> T get(StateNamespace namespace, StateTag<T> 
address) {
-      return WindmillStateCache.this.get(
-          computationKey, stateFamily, cacheToken, workToken, namespace, 
address);
-    }
-
-    // Note that once a value has been put for a given workToken, get calls 
with that same workToken
-    // will fail. This is ok as we only put entries when we are building the 
commit and will no
-    // longer be performing gets for the same work token.
-    public <T extends State> void put(
-        StateNamespace namespace, StateTag<T> address, T value, long weight) {
-      WindmillStateCache.this.put(
-          computationKey, stateFamily, cacheToken, workToken, namespace, 
address, value, weight);
+    private boolean updateTokens(long cacheToken, long workToken) {
+      if (this.cacheToken != cacheToken || workToken <= this.workToken) {
+        return false;
+      }
+      this.workToken = workToken;
+      return true;
     }
   }
 
-  /** Returns a per-computation view of the state cache. */
-  public ForComputation forComputation(String computation) {
-    return new ForComputation(computation);
-  }
+  /**
+   * Per-computation, per-key, per-family view of the state cache. 
Modifications are cached locally
+   * and must be flushed to the cache by calling persist.
+   */
+  public class ForKeyAndFamily {
+    final ForKey forKey;
+    final String stateFamily;
+    private HashMap<StateId, StateCacheEntry> localCache;
 
-  private <T extends State> T get(
-      WindmillComputationKey computationKey,
-      String stateFamily,
-      long cacheToken,
-      long workToken,
-      StateNamespace namespace,
-      StateTag<T> address) {
-    StateId id = new StateId(computationKey, stateFamily, namespace);
-    StateCacheEntry entry = stateCache.getIfPresent(id);
-    if (entry == null) {
-      return null;
+    private ForKeyAndFamily(ForKey forKey, String stateFamily) {
+      this.forKey = forKey;
+      this.stateFamily = stateFamily;
+      localCache = new HashMap<>();
     }
-    if (entry.getCacheToken() != cacheToken) {
-      stateCache.invalidate(id);
-      return null;
+
+    public String getStateFamily() {
+      return stateFamily;
     }
-    if (workToken <= entry.getLastWorkToken()) {
-      // We don't used the cached item but we don't invalidate it.
-      return null;
+
+    public <T extends State> @Nullable T get(StateNamespace namespace, 
StateTag<T> address) {
+      StateId id = new StateId(forKey, stateFamily, namespace);
+      @SuppressWarnings("nullness") // Unsure how to annotate lambda return 
allowing null.
+      StateCacheEntry entry = localCache.computeIfAbsent(id, key -> 
stateCache.getIfPresent(key));

Review comment:
       That doesn't seem to work: the nullness checker's signature for computeIfAbsent doesn't
accept a lambda that can return null, even though computeIfAbsent itself tolerates a null result.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to