Repository: aurora Updated Branches: refs/heads/master e76862a39 -> accd46aef
Improve task history pruning by batch deleting tasks. Bugs closed: AURORA-1929 Reviewed at https://reviews.apache.org/r/59699/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/accd46ae Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/accd46ae Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/accd46ae Branch: refs/heads/master Commit: accd46aef4706819d45676b061835f92c5bbb2f2 Parents: e76862a Author: Kai Huang <[email protected]> Authored: Fri Jun 2 13:32:23 2017 -0700 Committer: David McLaughlin <[email protected]> Committed: Fri Jun 2 13:32:23 2017 -0700 ---------------------------------------------------------------------- .../scheduler/state/StateManagerImpl.java | 35 ++++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/accd46ae/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java index 7387821..6b780ec 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java @@ -16,9 +16,9 @@ package org.apache.aurora.scheduler.state; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; import javax.inject.Inject; @@ -31,7 +31,6 @@ import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; @@ -347,7 +346,10 @@ public class StateManagerImpl implements StateManager { "Operation expected task %s to be present.", taskId); - events.add(deleteTasks(taskStore, ImmutableSet.of(taskId))); + PubsubEvent.TasksDeleted event = createDeleteEvent(taskStore, ImmutableSet.of(taskId)); + taskStore.deleteTasks( + event.getTasks().stream().map(Tasks::id).collect(Collectors.toSet())); + events.add(event); break; default: @@ -369,23 +371,20 @@ public class StateManagerImpl implements StateManager { @Override public void deleteTasks(MutableStoreProvider storeProvider, final Set<String> taskIds) { - Map<String, IScheduledTask> tasks = Maps.uniqueIndex( - storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskIds)), - Tasks::id); + TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore(); + // Create a single event for all task deletions. + PubsubEvent.TasksDeleted event = createDeleteEvent(taskStore, taskIds); - for (Map.Entry<String, IScheduledTask> entry : tasks.entrySet()) { - updateTaskAndExternalState( - storeProvider.getUnsafeTaskStore(), - entry.getKey(), - Optional.of(entry.getValue()), - Optional.absent(), - Optional.absent()); - } + taskStore.deleteTasks(event.getTasks().stream().map(Tasks::id).collect(Collectors.toSet())); + + eventSink.post(event); } - private static PubsubEvent deleteTasks(TaskStore.Mutable taskStore, Set<String> taskIds) { - Iterable<IScheduledTask> tasks = taskStore.fetchTasks(Query.taskScoped(taskIds)); - taskStore.deleteTasks(taskIds); - return new PubsubEvent.TasksDeleted(ImmutableSet.copyOf(tasks)); + private static PubsubEvent.TasksDeleted createDeleteEvent( + TaskStore.Mutable taskStore, + Set<String> taskIds) { + + return new PubsubEvent.TasksDeleted( + ImmutableSet.copyOf(taskStore.fetchTasks(Query.taskScoped(taskIds)))); } }
