This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch 24.0.1
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/24.0.1 by this push:
new fddc261bdf RTR: Dedupe items in getKnownTasks. (#13273) (#13278)
fddc261bdf is described below
commit fddc261bdf54611cf34844d4ec3b33b4d2d2ebf7
Author: Kashif Faraz <[email protected]>
AuthorDate: Sun Oct 30 17:32:26 2022 +0530
RTR: Dedupe items in getKnownTasks. (#13273) (#13278)
Fixes a problem where the tasks API in OverlordResource would complain
about duplicate keys in the map it's building.
Co-authored-by: Gian Merlino <[email protected]>
---
.../druid/indexing/overlord/RemoteTaskRunner.java | 22 +++++++++++++++++++---
1 file changed, 19 insertions(+), 3 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 42ff3c2467..c3c00e2f0e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -31,7 +31,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
@@ -100,6 +99,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -464,8 +464,24 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
@Override
public Collection<RemoteTaskRunnerWorkItem> getKnownTasks()
{
- // Racey, since there is a period of time during assignment when a task is neither pending nor running
- return ImmutableList.copyOf(Iterables.concat(pendingTasks.values(), runningTasks.values(), completeTasks.values()));
+ // Use a map to dedupe tasks, since they may transition from one state to another while this method is iterating
+ // through the various collections.
+ final Map<String, RemoteTaskRunnerWorkItem> items = new LinkedHashMap<>();
+
+ // Racey, since there is a period of time during assignment when a task is neither pending nor running.
+ for (RemoteTaskRunnerWorkItem item : pendingTasks.values()) {
+ items.put(item.getTaskId(), item);
+ }
+
+ for (RemoteTaskRunnerWorkItem item : runningTasks.values()) {
+ items.put(item.getTaskId(), item);
+ }
+
+ for (RemoteTaskRunnerWorkItem item : completeTasks.values()) {
+ items.put(item.getTaskId(), item);
+ }
+
+ return ImmutableList.copyOf(items.values());
}
@Nullable
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]