scwhittle commented on code in PR #33755:
URL: https://github.com/apache/beam/pull/33755#discussion_r1928917523
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -124,28 +120,30 @@ private static String elapsedString(Instant start,
Instant end) {
*/
synchronized ActivateWorkResult activateWorkForKey(ExecutableWork
executableWork) {
ShardedKey shardedKey = executableWork.work().getShardedKey();
- Deque<ExecutableWork> workQueue = activeWork.getOrDefault(shardedKey, new
ArrayDeque<>());
+ long shardingKey = shardedKey.shardingKey();
+ LinkedHashMap<WorkId, ExecutableWork> workQueue =
+ activeWork.computeIfAbsent(shardingKey, (unused) -> new
LinkedHashMap<>());
// This key does not have any work queued up on it. Create one, insert
Work, and mark the work
// to be executed.
- if (!activeWork.containsKey(shardedKey) || workQueue.isEmpty()) {
- workQueue.addLast(executableWork);
- activeWork.put(shardedKey, workQueue);
+ if (workQueue.isEmpty()) {
+ workQueue.put(executableWork.id(), executableWork);
incrementActiveWorkBudget(executableWork.work());
return ActivateWorkResult.EXECUTE;
}
// Check to see if we have this work token queued.
- Iterator<ExecutableWork> workIterator = workQueue.iterator();
+ Iterator<Entry<WorkId, ExecutableWork>> workIterator =
workQueue.entrySet().iterator();
while (workIterator.hasNext()) {
- ExecutableWork queuedWork = workIterator.next();
+ ExecutableWork queuedWork = workIterator.next().getValue();
if (queuedWork.id().equals(executableWork.id())) {
return ActivateWorkResult.DUPLICATE;
}
- if (queuedWork.id().cacheToken() == executableWork.id().cacheToken()) {
+ if (queuedWork.id().cacheToken() == executableWork.id().cacheToken()
+ &&
queuedWork.work().getShardedKey().equals(executableWork.work().getShardedKey()))
{
if (executableWork.id().workToken() > queuedWork.id().workToken()) {
// Check to see if the queuedWork is active. We only want to remove
it if it is NOT
// currently active.
- if (!queuedWork.equals(workQueue.peek())) {
+ if
(!queuedWork.equals(Preconditions.checkNotNull(firstEntry(workQueue)).getValue()))
{
Review Comment:
I think it is up to Work to decide whether equivalent instances should be equal.
Since Work doesn't want that, it doesn't override equals(), so comparisons use
reference equality.
ExecutableWork is an AutoValue, though, so it may have a generated equals(),
which it seems like it shouldn't. Maybe we should stop making ExecutableWork an
AutoValue instead of changing the comparison here?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -167,54 +165,28 @@ synchronized ActivateWorkResult
activateWorkForKey(ExecutableWork executableWork
*
* @param failedWork a map from sharding_key to tokens for the corresponding
work.
*/
- synchronized void failWorkForKey(Multimap<Long, WorkId> failedWork) {
- // Note we can't construct a ShardedKey and look it up in activeWork
directly since
- // HeartbeatResponse doesn't include the user key.
- for (Entry<ShardedKey, Deque<ExecutableWork>> entry :
activeWork.entrySet()) {
- Collection<WorkId> failedWorkIds =
failedWork.get(entry.getKey().shardingKey());
- for (WorkId failedWorkId : failedWorkIds) {
- for (ExecutableWork queuedWork : entry.getValue()) {
- WorkItem workItem = queuedWork.work().getWorkItem();
- if (workItem.getWorkToken() == failedWorkId.workToken()
- && workItem.getCacheToken() == failedWorkId.cacheToken()) {
- LOG.debug(
- "Failing work "
- + computationStateCache.getComputation()
- + " "
- + entry.getKey().shardingKey()
- + " "
- + failedWorkId.workToken()
- + " "
- + failedWorkId.cacheToken()
- + ". The work will be retried and is not lost.");
- queuedWork.work().setFailed();
- break;
- }
- }
+ synchronized void failWorkForKey(ImmutableList<WorkIdWithShardingKey>
failedWork) {
+ for (WorkIdWithShardingKey failedId : failedWork) {
+ LinkedHashMap<WorkId, ExecutableWork> workQueue =
activeWork.get(failedId.shardingKey());
+ if (workQueue == null) {
+ // Work could complete/fail before heartbeat response arrives
+ continue;
+ }
+ ExecutableWork executableWork = workQueue.get(failedId.workId());
+ if (executableWork == null) {
+ continue;
}
+ executableWork.work().setFailed();
+ LOG.debug(
+ "Failing work {} {} The work will be retried and is not lost.",
Review Comment:
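nit: the period before "The work will be retried and is not lost." was dropped (the original message had ". The work will be retried...").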
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]