m-trieu commented on code in PR #28537:
URL: https://github.com/apache/beam/pull/28537#discussion_r1337616377


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########
@@ -99,127 +82,47 @@ public ConcurrentLinkedQueue<ExecutionState> 
getExecutionStateQueue() {
    * Work} if there is no active {@link Work} for the {@link ShardedKey} 
already processing.
    */
   public boolean activateWork(ShardedKey shardedKey, Work work) {
-    synchronized (activeWork) {
-      Deque<Work> queue = activeWork.get(shardedKey);
-      if (queue != null) {
-        Preconditions.checkState(!queue.isEmpty());
-        // Ensure we don't already have this work token queued.
-        for (Work queuedWork : queue) {
-          if (queuedWork.getWorkItem().getWorkToken() == 
work.getWorkItem().getWorkToken()) {
-            return false;
-          }
-        }
-        // Queue the work for later processing.
-        queue.addLast(work);
+    switch (activeWorkState.activateWorkForKey(shardedKey, work)) {
+      case DUPLICATE:
+        return false;
+      case QUEUED:
         return true;
-      } else {
-        queue = new ArrayDeque<>();
-        queue.addLast(work);
-        activeWork.put(shardedKey, queue);
-        // Fall through to execute without the lock held.
-      }
+      case EXECUTE:
+        {
+          execute(work);
+          return true;
+        }
+        // This will never happen, the switch is exhaustive.

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to