[
https://issues.apache.org/jira/browse/BEAM-11707?focusedWorklogId=556847&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-556847
]
ASF GitHub Bot logged work on BEAM-11707:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 24/Feb/21 10:13
Start Date: 24/Feb/21 10:13
Worklog Time Spent: 10m
Work Description: scwhittle commented on a change in pull request #13862:
URL: https://github.com/apache/beam/pull/13862#discussion_r581829058
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateCache.java
##########
@@ -116,222 +129,198 @@ private ForComputation(String computation) {
/** Invalidate all cache entries for this computation and {@code
processingKey}. */
public void invalidate(ByteString processingKey, long shardingKey) {
- synchronized (this) {
- WindmillComputationKey key =
- WindmillComputationKey.create(computation, processingKey,
shardingKey);
- for (StateId id : keyIndex.removeAll(key)) {
- stateCache.invalidate(id);
- }
- }
+ WindmillComputationKey key =
+ WindmillComputationKey.create(computation, processingKey,
shardingKey);
+ // By removing the ForKey object, all state for the key is orphaned in
the cache and will
+ // be removed by normal cache cleanup.
+ keyIndex.remove(key);
}
- /** Returns a per-computation, per-key view of the state cache. */
- public ForKey forKey(
+ /** Returns a per-computation, per-key, per-state-family view of the state
cache. */
+ public ForKeyAndFamily forKey(
WindmillComputationKey computationKey,
String stateFamily,
long cacheToken,
long workToken) {
- return new ForKey(computationKey, stateFamily, cacheToken, workToken);
+ ForKey forKey = keyIndex.get(computationKey);
+ if (forKey == null || !forKey.updateTokens(cacheToken, workToken)) {
+ forKey = new ForKey(computationKey, cacheToken, workToken);
+ // We prefer this implementation to using compute because that is
implemented similarly for
+ // ConcurrentHashMap with the downside of it performing inserts for
unchanged existing
+ // values as well.
+ keyIndex.put(computationKey, forKey);
+ }
+ return new ForKeyAndFamily(forKey, stateFamily);
}
}
/** Per-computation, per-key view of the state cache. */
- public class ForKey {
-
+ // Note that we utilize the default equality and hashCode for this class
based upon the instance
+ // (instead of the fields) to optimize cache invalidation.
+ private static class ForKey {
private final WindmillComputationKey computationKey;
- private final String stateFamily;
// Cache token must be consistent for the key for the cache to be valid.
private final long cacheToken;
// The work token for processing must be greater than the last work token.
As work items are
// increasing for a key, a less-than or equal to work token indicates that
the current token is
- // for stale processing. We don't use the cache so that fetches performed
will fail with a no
- // longer valid work token.
- private final long workToken;
+ // for stale processing.
+ private long workToken;
- private ForKey(
- WindmillComputationKey computationKey,
- String stateFamily,
- long cacheToken,
- long workToken) {
+ private ForKey(WindmillComputationKey computationKey, long cacheToken,
long workToken) {
this.computationKey = computationKey;
- this.stateFamily = stateFamily;
this.cacheToken = cacheToken;
this.workToken = workToken;
}
- public <T extends State> T get(StateNamespace namespace, StateTag<T>
address) {
- return WindmillStateCache.this.get(
- computationKey, stateFamily, cacheToken, workToken, namespace,
address);
- }
-
- // Note that once a value has been put for a given workToken, get calls
with that same workToken
- // will fail. This is ok as we only put entries when we are building the
commit and will no
- // longer be performing gets for the same work token.
- public <T extends State> void put(
- StateNamespace namespace, StateTag<T> address, T value, long weight) {
- WindmillStateCache.this.put(
- computationKey, stateFamily, cacheToken, workToken, namespace,
address, value, weight);
+ private boolean updateTokens(long cacheToken, long workToken) {
+ if (this.cacheToken != cacheToken || workToken <= this.workToken) {
+ return false;
+ }
+ this.workToken = workToken;
+ return true;
}
}
- /** Returns a per-computation view of the state cache. */
- public ForComputation forComputation(String computation) {
- return new ForComputation(computation);
- }
+ /**
+ * Per-computation, per-key, per-family view of the state cache.
Modifications are cached locally
+ * and must be flushed to the cache by calling persist.
+ */
+ public class ForKeyAndFamily {
+ final ForKey forKey;
+ final String stateFamily;
+ private HashMap<StateId, StateCacheEntry> localCache;
- private <T extends State> T get(
- WindmillComputationKey computationKey,
- String stateFamily,
- long cacheToken,
- long workToken,
- StateNamespace namespace,
- StateTag<T> address) {
- StateId id = new StateId(computationKey, stateFamily, namespace);
- StateCacheEntry entry = stateCache.getIfPresent(id);
- if (entry == null) {
- return null;
+ private ForKeyAndFamily(ForKey forKey, String stateFamily) {
+ this.forKey = forKey;
+ this.stateFamily = stateFamily;
+ localCache = new HashMap<>();
}
- if (entry.getCacheToken() != cacheToken) {
- stateCache.invalidate(id);
- return null;
+
+ public String getStateFamily() {
+ return stateFamily;
}
- if (workToken <= entry.getLastWorkToken()) {
- // We don't used the cached item but we don't invalidate it.
- return null;
+
+ public <T extends State> @Nullable T get(StateNamespace namespace,
StateTag<T> address) {
+ StateId id = new StateId(forKey, stateFamily, namespace);
+ @SuppressWarnings("nullness") // Unsure how to annotate lambda return
allowing null.
+ StateCacheEntry entry = localCache.computeIfAbsent(id, key ->
stateCache.getIfPresent(key));
+ return entry == null ? null : entry.get(namespace, address);
}
- return entry.get(namespace, address);
- }
- private <T extends State> void put(
- WindmillComputationKey computationKey,
- String stateFamily,
- long cacheToken,
- long workToken,
- StateNamespace namespace,
- StateTag<T> address,
- T value,
- long weight) {
- StateId id = new StateId(computationKey, stateFamily, namespace);
- StateCacheEntry entry = stateCache.getIfPresent(id);
- if (entry == null) {
- synchronized (this) {
- keyIndex.put(id.getWindmillComputationKey(), id);
+ public <T extends State> void put(
+ StateNamespace namespace, StateTag<T> address, T value, long weight) {
+ StateId id = new StateId(forKey, stateFamily, namespace);
+ @Nullable StateCacheEntry entry = localCache.get(id);
+ if (entry == null) {
+ entry = stateCache.getIfPresent(id);
+ if (entry == null) {
+ entry = new StateCacheEntry();
+ }
+ boolean hadValue = localCache.putIfAbsent(id, entry) != null;
+ assert (!hadValue);
Review comment:
Done
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateCache.java
##########
@@ -116,222 +129,198 @@ private ForComputation(String computation) {
/** Invalidate all cache entries for this computation and {@code
processingKey}. */
public void invalidate(ByteString processingKey, long shardingKey) {
- synchronized (this) {
- WindmillComputationKey key =
- WindmillComputationKey.create(computation, processingKey,
shardingKey);
- for (StateId id : keyIndex.removeAll(key)) {
- stateCache.invalidate(id);
- }
- }
+ WindmillComputationKey key =
+ WindmillComputationKey.create(computation, processingKey,
shardingKey);
+ // By removing the ForKey object, all state for the key is orphaned in
the cache and will
+ // be removed by normal cache cleanup.
+ keyIndex.remove(key);
}
- /** Returns a per-computation, per-key view of the state cache. */
- public ForKey forKey(
+ /** Returns a per-computation, per-key, per-state-family view of the state
cache. */
+ public ForKeyAndFamily forKey(
WindmillComputationKey computationKey,
String stateFamily,
long cacheToken,
long workToken) {
- return new ForKey(computationKey, stateFamily, cacheToken, workToken);
+ ForKey forKey = keyIndex.get(computationKey);
+ if (forKey == null || !forKey.updateTokens(cacheToken, workToken)) {
+ forKey = new ForKey(computationKey, cacheToken, workToken);
+ // We prefer this implementation to using compute because that is
implemented similarly for
+ // ConcurrentHashMap with the downside of it performing inserts for
unchanged existing
+ // values as well.
+ keyIndex.put(computationKey, forKey);
+ }
+ return new ForKeyAndFamily(forKey, stateFamily);
}
}
/** Per-computation, per-key view of the state cache. */
- public class ForKey {
-
+ // Note that we utilize the default equality and hashCode for this class
based upon the instance
+ // (instead of the fields) to optimize cache invalidation.
+ private static class ForKey {
private final WindmillComputationKey computationKey;
- private final String stateFamily;
// Cache token must be consistent for the key for the cache to be valid.
private final long cacheToken;
// The work token for processing must be greater than the last work token.
As work items are
// increasing for a key, a less-than or equal to work token indicates that
the current token is
- // for stale processing. We don't use the cache so that fetches performed
will fail with a no
- // longer valid work token.
- private final long workToken;
+ // for stale processing.
+ private long workToken;
- private ForKey(
- WindmillComputationKey computationKey,
- String stateFamily,
- long cacheToken,
- long workToken) {
+ private ForKey(WindmillComputationKey computationKey, long cacheToken,
long workToken) {
this.computationKey = computationKey;
- this.stateFamily = stateFamily;
this.cacheToken = cacheToken;
this.workToken = workToken;
}
- public <T extends State> T get(StateNamespace namespace, StateTag<T>
address) {
- return WindmillStateCache.this.get(
- computationKey, stateFamily, cacheToken, workToken, namespace,
address);
- }
-
- // Note that once a value has been put for a given workToken, get calls
with that same workToken
- // will fail. This is ok as we only put entries when we are building the
commit and will no
- // longer be performing gets for the same work token.
- public <T extends State> void put(
- StateNamespace namespace, StateTag<T> address, T value, long weight) {
- WindmillStateCache.this.put(
- computationKey, stateFamily, cacheToken, workToken, namespace,
address, value, weight);
+ private boolean updateTokens(long cacheToken, long workToken) {
+ if (this.cacheToken != cacheToken || workToken <= this.workToken) {
+ return false;
+ }
+ this.workToken = workToken;
+ return true;
}
}
- /** Returns a per-computation view of the state cache. */
- public ForComputation forComputation(String computation) {
- return new ForComputation(computation);
- }
+ /**
+ * Per-computation, per-key, per-family view of the state cache.
Modifications are cached locally
+ * and must be flushed to the cache by calling persist.
+ */
+ public class ForKeyAndFamily {
+ final ForKey forKey;
+ final String stateFamily;
+ private HashMap<StateId, StateCacheEntry> localCache;
- private <T extends State> T get(
- WindmillComputationKey computationKey,
- String stateFamily,
- long cacheToken,
- long workToken,
- StateNamespace namespace,
- StateTag<T> address) {
- StateId id = new StateId(computationKey, stateFamily, namespace);
- StateCacheEntry entry = stateCache.getIfPresent(id);
- if (entry == null) {
- return null;
+ private ForKeyAndFamily(ForKey forKey, String stateFamily) {
+ this.forKey = forKey;
+ this.stateFamily = stateFamily;
+ localCache = new HashMap<>();
}
- if (entry.getCacheToken() != cacheToken) {
- stateCache.invalidate(id);
- return null;
+
+ public String getStateFamily() {
+ return stateFamily;
}
- if (workToken <= entry.getLastWorkToken()) {
- // We don't used the cached item but we don't invalidate it.
- return null;
+
+ public <T extends State> @Nullable T get(StateNamespace namespace,
StateTag<T> address) {
+ StateId id = new StateId(forKey, stateFamily, namespace);
+ @SuppressWarnings("nullness") // Unsure how to annotate lambda return
allowing null.
+ StateCacheEntry entry = localCache.computeIfAbsent(id, key ->
stateCache.getIfPresent(key));
+ return entry == null ? null : entry.get(namespace, address);
}
- return entry.get(namespace, address);
- }
- private <T extends State> void put(
- WindmillComputationKey computationKey,
- String stateFamily,
- long cacheToken,
- long workToken,
- StateNamespace namespace,
- StateTag<T> address,
- T value,
- long weight) {
- StateId id = new StateId(computationKey, stateFamily, namespace);
- StateCacheEntry entry = stateCache.getIfPresent(id);
- if (entry == null) {
- synchronized (this) {
- keyIndex.put(id.getWindmillComputationKey(), id);
+ public <T extends State> void put(
+ StateNamespace namespace, StateTag<T> address, T value, long weight) {
+ StateId id = new StateId(forKey, stateFamily, namespace);
+ @Nullable StateCacheEntry entry = localCache.get(id);
+ if (entry == null) {
+ entry = stateCache.getIfPresent(id);
+ if (entry == null) {
+ entry = new StateCacheEntry();
+ }
+ boolean hadValue = localCache.putIfAbsent(id, entry) != null;
+ assert (!hadValue);
}
+ entry.put(namespace, address, value, weight);
}
- if (entry == null || entry.getCacheToken() != cacheToken) {
- entry = new StateCacheEntry(cacheToken);
- this.displayedWeight += id.getWeight();
- this.displayedWeight += entry.getWeight();
+
+ public void persist() {
+ localCache.forEach((id, entry) -> stateCache.put(id, entry));
}
- entry.setLastWorkToken(workToken);
- this.displayedWeight += entry.put(namespace, address, value, weight);
- // Always add back to the cache to update the weight.
- stateCache.put(id, entry);
}
- /** Struct identifying a cache entry that contains all data for a key and
namespace. */
- private static class StateId implements Weighted {
+ /** Returns a per-computation view of the state cache. */
+ public ForComputation forComputation(String computation) {
+ return new ForComputation(computation);
+ }
- private final WindmillComputationKey computationKey;
+ /**
+ * Struct identifying a cache entry that contains all data for a ForKey
instance and namespace.
+ */
+ private static class StateId implements Weighted {
Review comment:
Just leaving this as is for now.
Since we always need the hash, we might as well calculate it up front and use
it for equals etc. too, instead of just memoizing it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 556847)
Time Spent: 1h 10m (was: 1h)
> Optimize WindmillStateCache CPU usage
> -------------------------------------
>
> Key: BEAM-11707
> URL: https://issues.apache.org/jira/browse/BEAM-11707
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Reporter: Sam Whittle
> Assignee: Sam Whittle
> Priority: P2
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> From profiling Nexmark Query11, which has many unique tags per key, I observed
> that WindmillStateCache accounted for 6% of CPU usage.
> The usage appears to be due to the invalidation set maintenance as well as
> many reads/inserts.
> The invalidation set is maintained so that if a key encounters an error
> during processing or the cache token changes, we can invalidate all the
> entries for that key. Currently this is done by removing all entries for the
> key from the cache. An alternative that appears much more CPU-efficient is to
> instead leave the entries in the cache but make them unreachable. This can
> be done by having a per-key object that uses object equality as part of the
> cache lookup. Then, to discard entries for the key, we start using a new
> per-key object. Cleanup of per-key objects can be done with a weak reference
> map.
> Another cost of the cache is that objects are grouped by window so that they
> are kept/evicted all at once. However, currently when reading items into the
> cache, we fetch the window set and then look up each tag in it. The window set
> could be cached for the key to avoid multiple cache lookups. Similarly, when
> putting objects we look up and insert each tag separately and then update the
> cache to update the weight for the per-window set. This could be done once
> after all updates for the window have been made.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)