[https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=299243&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299243]
ASF GitHub Bot logged work on BEAM-5428:
----------------------------------------
Author: ASF GitHub Bot
Created on: 22/Aug/19 07:46
Start Date: 22/Aug/19 07:46
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9374: [BEAM-5428] Implement Runner support for cache tokens
URL: https://github.com/apache/beam/pull/9374#discussion_r316536089
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -336,11 +356,29 @@ public void clear(K key, W window) {
BagState<V> bagState =
stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
bagState.clear();
+
+ return generateAndRegisterCacheKey();
} finally {
stateBackendLock.unlock();
}
}
+ @Override
+ public Iterable<ByteString> getCacheTokens() {
+ return cacheTokens.asMap().values();
+ }
+
+ @Override
+ public void clearCacheTokens() {
+ cacheTokens.invalidateAll();
+ }
+
+ private ByteString generateAndRegisterCacheKey() {
+ ByteString cacheToken =
+ ByteString.copyFrom(cacheTokenGenerator.getId(), Charsets.UTF_8);
+ cacheTokens.put(userStateId, cacheToken);
Review comment:
The same cache token can be handed out for all state cells across all
windows. On the SDK side, we need to scope the cache per window because
state needs to be retrieved at least once per window.
On this note, I will also remove the per-user-state cache token, because we
need only one cache token for the key range (i.e., the operator instance),
shared by all user state. The reason is that user state is invalidated only
when the key range is updated. Side inputs are different: each side input
needs its own cache token so that it can be invalidated individually when it
is updated.
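A minimal sketch of the scheme the review comment describes, written in plain Java rather than against Beam's actual classes. Everything here is an assumption for illustration: `CacheTokenRegistry`, `StateCache`, `CacheKey` and the stubbed `fetchFromRunner` are hypothetical names, tokens are `UUID` strings instead of the `ByteString` tokens in the diff, and windows are plain strings. The runner hands out one token for all user state of an operator instance and one token per side input. The SDK-side cache is keyed by token, window and state key, so state is fetched at least once per window and fetched again only after the token changes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

final class CacheTokenSketch {

  /** Hypothetical runner-side registry: one token for all user state, one per side input. */
  static final class CacheTokenRegistry {
    private String userStateToken = newToken();
    private final Map<String, String> sideInputTokens = new HashMap<>();

    private static String newToken() {
      return UUID.randomUUID().toString();
    }

    /** Same token for every user state cell and window of this operator instance. */
    String userStateToken() {
      return userStateToken;
    }

    /** Assumed trigger: the operator's key range changed (e.g. after rescaling). */
    void invalidateUserState() {
      userStateToken = newToken();
    }

    /** Each side input gets its own token, created lazily. */
    String sideInputToken(String sideInputId) {
      return sideInputTokens.computeIfAbsent(sideInputId, id -> newToken());
    }

    /** Invalidates one side input without touching any other token. */
    void invalidateSideInput(String sideInputId) {
      sideInputTokens.put(sideInputId, newToken());
    }
  }

  /** Hypothetical SDK-side cache key: scoped by token, window and state key. */
  static final class CacheKey {
    final String token;
    final String window;
    final String stateKey;

    CacheKey(String token, String window, String stateKey) {
      this.token = token;
      this.window = window;
      this.stateKey = stateKey;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof CacheKey)) {
        return false;
      }
      CacheKey other = (CacheKey) o;
      return token.equals(other.token)
          && window.equals(other.window)
          && stateKey.equals(other.stateKey);
    }

    @Override
    public int hashCode() {
      return Objects.hash(token, window, stateKey);
    }
  }

  /** Hypothetical SDK-side cache that survives across bundles (no eviction in this sketch). */
  static final class StateCache {
    private final Map<CacheKey, List<String>> entries = new HashMap<>();
    int loads = 0; // number of simulated state requests sent to the runner

    List<String> get(String token, String window, String stateKey) {
      return entries.computeIfAbsent(new CacheKey(token, window, stateKey), this::fetchFromRunner);
    }

    // Stub standing in for a real state request to the runner.
    private List<String> fetchFromRunner(CacheKey key) {
      loads++;
      return Collections.singletonList(key.stateKey + "@" + key.window);
    }
  }

  public static void main(String[] args) {
    CacheTokenRegistry registry = new CacheTokenRegistry();
    StateCache cache = new StateCache();
    String token = registry.userStateToken();

    cache.get(token, "w1", "counter"); // miss: first access in window w1
    cache.get(token, "w1", "counter"); // hit, e.g. in a later bundle
    cache.get(token, "w2", "counter"); // miss: same token, different window
    System.out.println(cache.loads); // prints 2

    registry.invalidateUserState(); // key range changed
    cache.get(registry.userStateToken(), "w1", "counter"); // miss: new token
    System.out.println(cache.loads); // prints 3
  }
}
```

In this sketch, invalidation needs nothing beyond handing out a new token: entries stored under the old token are simply never hit again. A real cache would also have to evict those stale entries, which is omitted here.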
Issue Time Tracking
-------------------
Worklog Id: (was: 299243)
Time Spent: 1h 50m (was: 1h 40m)
> Implement cross-bundle state caching.
> -------------------------------------
>
> Key: BEAM-5428
> URL: https://issues.apache.org/jira/browse/BEAM-5428
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-harness
> Reporter: Robert Bradshaw
> Assignee: Rakesh Kumar
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> Tech spec:
> [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m]
> Relevant document:
> [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit]
> Mailing list link:
> [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E]
--
This message was sent by Atlassian Jira
(v8.3.2#803003)