scwhittle commented on code in PR #33755:
URL: https://github.com/apache/beam/pull/33755#discussion_r1928917523


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -124,28 +120,30 @@ private static String elapsedString(Instant start, 
Instant end) {
    */
   synchronized ActivateWorkResult activateWorkForKey(ExecutableWork 
executableWork) {
     ShardedKey shardedKey = executableWork.work().getShardedKey();
-    Deque<ExecutableWork> workQueue = activeWork.getOrDefault(shardedKey, new 
ArrayDeque<>());
+    long shardingKey = shardedKey.shardingKey();
+    LinkedHashMap<WorkId, ExecutableWork> workQueue =
+        activeWork.computeIfAbsent(shardingKey, (unused) -> new 
LinkedHashMap<>());
     // This key does not have any work queued up on it. Create one, insert 
Work, and mark the work
     // to be executed.
-    if (!activeWork.containsKey(shardedKey) || workQueue.isEmpty()) {
-      workQueue.addLast(executableWork);
-      activeWork.put(shardedKey, workQueue);
+    if (workQueue.isEmpty()) {
+      workQueue.put(executableWork.id(), executableWork);
       incrementActiveWorkBudget(executableWork.work());
       return ActivateWorkResult.EXECUTE;
     }
 
     // Check to see if we have this work token queued.
-    Iterator<ExecutableWork> workIterator = workQueue.iterator();
+    Iterator<Entry<WorkId, ExecutableWork>> workIterator = 
workQueue.entrySet().iterator();
     while (workIterator.hasNext()) {
-      ExecutableWork queuedWork = workIterator.next();
+      ExecutableWork queuedWork = workIterator.next().getValue();
       if (queuedWork.id().equals(executableWork.id())) {
         return ActivateWorkResult.DUPLICATE;
       }
-      if (queuedWork.id().cacheToken() == executableWork.id().cacheToken()) {
+      if (queuedWork.id().cacheToken() == executableWork.id().cacheToken()
+          && 
queuedWork.work().getShardedKey().equals(executableWork.work().getShardedKey()))
 {
         if (executableWork.id().workToken() > queuedWork.id().workToken()) {
           // Check to see if the queuedWork is active. We only want to remove 
it if it is NOT
           // currently active.
-          if (!queuedWork.equals(workQueue.peek())) {
+          if 
(!queuedWork.equals(Preconditions.checkNotNull(firstEntry(workQueue)).getValue()))
 {

Review Comment:
   I think it is up to Work to decide whether equivalent instances should 
be equal. Since Work doesn't want that, it doesn't override equals, so 
reference equality is used.
   
   ExecutableWork is an AutoValue, though, so it has a generated value-based 
equals, which it seems like it shouldn't have. Maybe we should stop making it 
an AutoValue instead of changing the comparison here?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -167,54 +165,28 @@ synchronized ActivateWorkResult 
activateWorkForKey(ExecutableWork executableWork
    *
    * @param failedWork a map from sharding_key to tokens for the corresponding 
work.
    */
-  synchronized void failWorkForKey(Multimap<Long, WorkId> failedWork) {
-    // Note we can't construct a ShardedKey and look it up in activeWork 
directly since
-    // HeartbeatResponse doesn't include the user key.
-    for (Entry<ShardedKey, Deque<ExecutableWork>> entry : 
activeWork.entrySet()) {
-      Collection<WorkId> failedWorkIds = 
failedWork.get(entry.getKey().shardingKey());
-      for (WorkId failedWorkId : failedWorkIds) {
-        for (ExecutableWork queuedWork : entry.getValue()) {
-          WorkItem workItem = queuedWork.work().getWorkItem();
-          if (workItem.getWorkToken() == failedWorkId.workToken()
-              && workItem.getCacheToken() == failedWorkId.cacheToken()) {
-            LOG.debug(
-                "Failing work "
-                    + computationStateCache.getComputation()
-                    + " "
-                    + entry.getKey().shardingKey()
-                    + " "
-                    + failedWorkId.workToken()
-                    + " "
-                    + failedWorkId.cacheToken()
-                    + ". The work will be retried and is not lost.");
-            queuedWork.work().setFailed();
-            break;
-          }
-        }
+  synchronized void failWorkForKey(ImmutableList<WorkIdWithShardingKey> 
failedWork) {
+    for (WorkIdWithShardingKey failedId : failedWork) {
+      LinkedHashMap<WorkId, ExecutableWork> workQueue = 
activeWork.get(failedId.shardingKey());
+      if (workQueue == null) {
+        // Work could complete/fail before heartbeat response arrives
+        continue;
+      }
+      ExecutableWork executableWork = workQueue.get(failedId.workId());
+      if (executableWork == null) {
+        continue;
       }
+      executableWork.work().setFailed();
+      LOG.debug(
+          "Failing work {} {} The work will be retried and is not lost.",

Review Comment:
   nit: the period before "The work will be retried and is not lost." was dropped from the log message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to