scwhittle commented on code in PR #29082:
URL: https://github.com/apache/beam/pull/29082#discussion_r1377484669


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -108,10 +130,26 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
     }
 
     // Ensure we don't already have this work token queued.
-    for (Work queuedWork : workQueue) {
-      if (queuedWork.getWorkItem().getWorkToken() == work.getWorkItem().getWorkToken()) {
+    Iterator<Work> workQueueIterator = workQueue.iterator();
+    while (workQueueIterator.hasNext()) {
+      Work queuedWork = workQueueIterator.next();
+
+      // Work tokens and cache tokens are equal.
+      if (queuedWork.id().equals(work.id())) {
+        if (work.newerThan(queuedWork)) {

Review Comment:
   Yes, the listed cases are correct (maybe return STALE in the ignore case?).
   
   > If it is equal cache tokens, but different work tokens what does that
   > mean? Same worker with different work?
   
   It means that the user worker may be observing a stale retry of work
   previously sent by the Windmill worker (if the new work token is less than or
   equal to the existing one), or it might mean that the existing item was a
   retry of a work item that has since committed and the newly arriving work
   item for the key is now the active item.
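   
   The equal-cache-token case can be sketched as a small decision function. This
   is a minimal, hypothetical illustration (Java 16+ for records). It assumes that
   a work item is identified by a `(cacheToken, workToken)` pair of `long`s and
   that a larger work token for the same cache token means newer work.
   `WorkId`, `Decision` and `decideSameCacheToken` are illustrative names, not
   part of `ActiveWorkState`. The sketch also does not decide what the queue
   should do with a superseded item.

```java
// Hypothetical sketch: compare an incoming work item against a queued item
// that has the SAME cache token. WorkId, Decision and decideSameCacheToken are
// illustrative names, not Beam APIs.
final class SameCacheTokenSketch {

  /** Assumed identity of a work item: (cacheToken, workToken). */
  record WorkId(long cacheToken, long workToken) {}

  enum Decision {
    /** Incoming item is a stale retry of work the Windmill worker already sent. */
    STALE,
    /** Queued item was a retry of since-committed work; incoming is now active. */
    INCOMING_SUPERSEDES
  }

  static Decision decideSameCacheToken(WorkId queued, WorkId incoming) {
    if (queued.cacheToken() != incoming.cacheToken()) {
      // With different cache tokens the relationship is unknown; not handled here.
      throw new IllegalArgumentException("cache tokens must match");
    }
    // A work token less than or equal to the queued one indicates a stale retry.
    return incoming.workToken() <= queued.workToken()
        ? Decision.STALE
        : Decision.INCOMING_SUPERSEDES;
  }

  public static void main(String[] args) {
    WorkId queued = new WorkId(7L, 100L);
    System.out.println(decideSameCacheToken(queued, new WorkId(7L, 99L)));  // STALE
    System.out.println(decideSameCacheToken(queued, new WorkId(7L, 101L))); // INCOMING_SUPERSEDES
  }
}
```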
   
   In general, if we get different cache tokens, we don't know how the newly
   arriving work relates to the existing work. We could guess based on the
   observed ordering, but since requests could be received out of order when
   multiple Windmill workers have sent work, it seems safer to queue in that
   case.
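   
   The queue-on-unknown-ordering policy can be sketched roughly as follows. It
   uses hypothetical `WorkId`/`Result` types and an `activate` method rather than
   the real `ActiveWorkState` API. An incoming item is rejected as stale only
   when a queued item shares its cache token and has an equal or higher work
   token. An item whose cache token differs from every queued item is queued no
   matter how its work token compares. The sketch deliberately ignores the
   distinction between the active head of the queue and the items waiting
   behind it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the queueing policy; WorkId, Result and activate are
// illustrative names, not the actual ActiveWorkState API.
final class CacheTokenQueueingSketch {

  /** Assumed identity of a work item: (cacheToken, workToken). */
  record WorkId(long cacheToken, long workToken) {}

  enum Result { QUEUED, STALE }

  static Result activate(Deque<WorkId> queue, WorkId incoming) {
    for (WorkId queued : queue) {
      if (queued.cacheToken() != incoming.cacheToken()) {
        // Different cache tokens: how the two items relate is unknown (arrival
        // order is unreliable with multiple Windmill workers), so don't guess.
        continue;
      }
      if (incoming.workToken() <= queued.workToken()) {
        // Same cache token, not newer: a stale retry of already-sent work.
        return Result.STALE;
      }
      // Same cache token, newer work token: not stale relative to this item.
    }
    queue.addLast(incoming);
    return Result.QUEUED;
  }

  public static void main(String[] args) {
    Deque<WorkId> queue = new ArrayDeque<>();
    System.out.println(activate(queue, new WorkId(1L, 10L))); // QUEUED
    System.out.println(activate(queue, new WorkId(2L, 5L)));  // QUEUED (different cache token)
    System.out.println(activate(queue, new WorkId(1L, 9L)));  // STALE
    System.out.println(queue.size());                         // 2
  }
}
```

   The design never compares work tokens across different cache tokens. Only
   items sharing a cache token are ordered against each other.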
   
   There is separate work ongoing to use heartbeats to identify work items that 
are no longer valid and stop processing them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.