scwhittle commented on code in PR #29082:
URL: https://github.com/apache/beam/pull/29082#discussion_r1369922526


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -108,10 +130,26 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
     }
 
     // Ensure we don't already have this work token queued.
-    for (Work queuedWork : workQueue) {
-      if (queuedWork.getWorkItem().getWorkToken() == work.getWorkItem().getWorkToken()) {
+    Iterator<Work> workQueueIterator = workQueue.iterator();
+    while (workQueueIterator.hasNext()) {
+      Work queuedWork = workQueueIterator.next();
+
+      // Work tokens and cache tokens are equal.
+      if (queuedWork.id().equals(work.id())) {
+        if (work.newerThan(queuedWork)) {

Review Comment:
   If the cache token and the work token are both equal, queuedWork and the new 
work should be equivalent. Let's just return DUPLICATE in that case.
   
   What I meant by the newer-than check is the case where the cache tokens are 
equal but the work tokens differ. Since equal cache tokens imply that the work 
came from the same worker, and workers give out increasing work tokens, we can 
just keep the work item with the higher work token.
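   
   A minimal sketch of the comparison described above, not the actual ActiveWorkState code. The `Work` class, the `Result` enum (including the `STALE` value), and the `activate` method are hypothetical stand-ins, and the sketch ignores that the head of the real queue may be actively executing work:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

final class WorkDedupSketch {
  // Hypothetical result type; only DUPLICATE is named in the review comment.
  enum Result { QUEUED, DUPLICATE, STALE }

  // Hypothetical stand-in for a work item identified by (cacheToken, workToken).
  static final class Work {
    final long cacheToken;
    final long workToken;

    Work(long cacheToken, long workToken) {
      this.cacheToken = cacheToken;
      this.workToken = workToken;
    }
  }

  static Result activate(Deque<Work> workQueue, Work work) {
    Iterator<Work> it = workQueue.iterator();
    while (it.hasNext()) {
      Work queued = it.next();
      if (queued.cacheToken != work.cacheToken) {
        continue; // Different cache token: no ordering assumed in this sketch.
      }
      if (queued.workToken == work.workToken) {
        return Result.DUPLICATE; // Both tokens equal: the items are equivalent.
      }
      if (queued.workToken > work.workToken) {
        return Result.STALE; // Assumption: reject incoming work that is older.
      }
      it.remove(); // Queued item has the lower work token: drop it.
    }
    workQueue.addLast(work);
    return Result.QUEUED;
  }
}
```

   With a queue holding (cache=1, work=5), activating (1, 5) again is reported as a duplicate, (1, 3) is rejected as older, and (1, 7) replaces the queued item.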
   


