This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch 24.0.1
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/24.0.1 by this push:
     new fddc261bdf RTR: Dedupe items in getKnownTasks. (#13273) (#13278)
fddc261bdf is described below

commit fddc261bdf54611cf34844d4ec3b33b4d2d2ebf7
Author: Kashif Faraz <[email protected]>
AuthorDate: Sun Oct 30 17:32:26 2022 +0530

    RTR: Dedupe items in getKnownTasks. (#13273) (#13278)
    
    Fixes a problem where the tasks API in OverlordResource would complain
    about duplicate keys in the map it's building.
    
    Co-authored-by: Gian Merlino <[email protected]>
---
 .../druid/indexing/overlord/RemoteTaskRunner.java  | 22 +++++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 42ff3c2467..c3c00e2f0e 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -31,7 +31,6 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.FutureCallback;
@@ -100,6 +99,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -464,8 +464,24 @@ public class RemoteTaskRunner implements WorkerTaskRunner, 
TaskLogStreamer
   @Override
   public Collection<RemoteTaskRunnerWorkItem> getKnownTasks()
   {
-    // Racey, since there is a period of time during assignment when a task is 
neither pending nor running
-    return ImmutableList.copyOf(Iterables.concat(pendingTasks.values(), 
runningTasks.values(), completeTasks.values()));
+    // Use a map to dedupe tasks, since they may transition from one state to 
another while this method is iterating
+    // through the various collections.
+    final Map<String, RemoteTaskRunnerWorkItem> items = new LinkedHashMap<>();
+
+    // Racey, since there is a period of time during assignment when a task is 
neither pending nor running.
+    for (RemoteTaskRunnerWorkItem item : pendingTasks.values()) {
+      items.put(item.getTaskId(), item);
+    }
+
+    for (RemoteTaskRunnerWorkItem item : runningTasks.values()) {
+      items.put(item.getTaskId(), item);
+    }
+
+    for (RemoteTaskRunnerWorkItem item : completeTasks.values()) {
+      items.put(item.getTaskId(), item);
+    }
+
+    return ImmutableList.copyOf(items.values());
   }
 
   @Nullable


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to