scwhittle commented on code in PR #29082:
URL: https://github.com/apache/beam/pull/29082#discussion_r1377484669
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -108,10 +130,26 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
}
// Ensure we don't already have this work token queued.
- for (Work queuedWork : workQueue) {
- if (queuedWork.getWorkItem().getWorkToken() == work.getWorkItem().getWorkToken()) {
+ Iterator<Work> workQueueIterator = workQueue.iterator();
+ while (workQueueIterator.hasNext()) {
+ Work queuedWork = workQueueIterator.next();
+
+ // Work tokens and cache tokens are equal.
+ if (queuedWork.id().equals(work.id())) {
+ if (work.newerThan(queuedWork)) {
Review Comment:
Yes, the listed cases are correct (maybe return STALE in the ignore case?).
> If it is equal cache tokens, but different work tokens what does that mean? Same worker with different work?
It means that the user worker may be observing a stale retry of work
previously sent by the windmill worker (if the new work token is less than or
equal to the existing one), or it might mean that the existing item was a retry
of a work item that has since committed and the newly arriving work item for
the key is the now-active item.
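To make the two cases above concrete, here is a minimal Java sketch that compares a newly arriving work item against a queued one sharing its cache token. `WorkId`, `Relation` and `classify` are hypothetical names for illustration, not the types in this PR; the sketch assumes work tokens can be ordered as plain `long` values and uses a record, so it needs Java 16+.

```java
/** Hypothetical sketch: relation between two work items that share a cache token. */
public final class SameCacheTokenCheck {

  /** Hypothetical (cache token, work token) pair; not Beam's actual WorkId type. */
  public record WorkId(long cacheToken, long workToken) {}

  public enum Relation {
    /** Same cache token and same work token: the identical work item. */
    SAME_ID,
    /** Lower work token: likely a stale retry of work previously sent. */
    STALE_RETRY,
    /** Higher work token: the queued item was likely a retry of since-committed work. */
    SUPERSEDES_QUEUED
  }

  public static Relation classify(WorkId queued, WorkId incoming) {
    if (queued.cacheToken() != incoming.cacheToken()) {
      // With different cache tokens the ordering is unknown; this helper does not decide it.
      throw new IllegalArgumentException("cache tokens differ");
    }
    if (queued.workToken() == incoming.workToken()) {
      return Relation.SAME_ID;
    }
    return incoming.workToken() < queued.workToken()
        ? Relation.STALE_RETRY
        : Relation.SUPERSEDES_QUEUED;
  }

  public static void main(String[] args) {
    WorkId queued = new WorkId(7L, 100L);
    System.out.println(classify(queued, new WorkId(7L, 90L)));  // STALE_RETRY
    System.out.println(classify(queued, new WorkId(7L, 120L))); // SUPERSEDES_QUEUED
  }
}
```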
In general, if we get different cache tokens, we don't know how the two work
items relate to each other. We could guess based on the observed ordering, but
since the requests could be received out of order if multiple windmill workers
sent work, it seems safer to queue in that case.
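A minimal sketch of how a per-key queue might act on this reasoning, assuming hypothetical `WorkId` and `Result` types rather than Beam's `Work` and `ActivateWorkResult`: a queued item with a different cache token is always left in place and the new item is queued, while for a matching cache token a lower work token is reported as stale and a higher one replaces the queued item. It iterates with an explicit `Iterator` (as the diff above does) so it can call `remove()`; it does not model which queued item is currently executing or the `newerThan` retry check, so treat it as illustrative only.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

/** Hypothetical per-key queue; illustrative only, not Beam's ActiveWorkState. */
public final class KeyWorkQueueSketch {

  /** Hypothetical (cache token, work token) pair. */
  public record WorkId(long cacheToken, long workToken) {}

  public enum Result { QUEUED, DUPLICATE, STALE }

  private final Deque<WorkId> queue = new ArrayDeque<>();

  public Result activate(WorkId incoming) {
    Iterator<WorkId> it = queue.iterator();
    while (it.hasNext()) {
      WorkId queued = it.next();
      if (queued.cacheToken() != incoming.cacheToken()) {
        // Different cache tokens: relative order is unknown (several windmill
        // workers may have sent work), so keep the queued item and keep scanning.
        continue;
      }
      if (queued.workToken() == incoming.workToken()) {
        return Result.DUPLICATE;
      }
      if (incoming.workToken() < queued.workToken()) {
        // Likely a stale retry of work that was already sent.
        return Result.STALE;
      }
      // Higher work token with the same cache token: the queued item was likely
      // a retry of work that has since committed, so drop it.
      it.remove();
    }
    queue.addLast(incoming);
    return Result.QUEUED;
  }

  public int size() {
    return queue.size();
  }

  public static void main(String[] args) {
    KeyWorkQueueSketch q = new KeyWorkQueueSketch();
    System.out.println(q.activate(new WorkId(1L, 10L))); // QUEUED
    System.out.println(q.activate(new WorkId(2L, 5L)));  // QUEUED (different cache token)
    System.out.println(q.activate(new WorkId(1L, 8L)));  // STALE
    System.out.println(q.activate(new WorkId(1L, 12L))); // QUEUED, replaces (1, 10)
    System.out.println(q.size());                        // 2
  }
}
```

Queuing on a cache-token mismatch trades some redundant processing for never discarding work whose ordering cannot be established.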
There is separate ongoing work to use heartbeats to identify work items that
are no longer valid and stop processing them.