Repository: aurora Updated Branches: refs/heads/master 4ab4b2b2c -> 98eb99aaa
Move task conversion during reconciliation into the delayed closure. This is a small change to relieve GC pressure while explicit reconciliation runs. It moves the IScheduledTask -> TaskStatus conversion into the batch processing closure so that any object allocation and collection overhead is delayed until the batch is actually processed. It has a noticable effect on GC for large amounts of RUNNING tasks. Reviewed at https://reviews.apache.org/r/56797/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/98eb99aa Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/98eb99aa Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/98eb99aa Branch: refs/heads/master Commit: 98eb99aaa72dfede9ec25b2310b608891bc6f7c0 Parents: 4ab4b2b Author: David McLaughlin <[email protected]> Authored: Wed Feb 22 08:41:01 2017 -0800 Committer: David McLaughlin <[email protected]> Committed: Wed Feb 22 08:41:01 2017 -0800 ---------------------------------------------------------------------- .../reconciliation/TaskReconciler.java | 24 +++++++++----------- 1 file changed, 11 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/98eb99aa/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java index ec7ccaf..23ac714 100644 --- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java +++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java @@ -16,16 +16,15 @@ package org.apache.aurora.scheduler.reconciliation; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.AbstractIdleService; import org.apache.aurora.common.quantity.Amount; @@ -156,17 +155,16 @@ public class TaskReconciler extends AbstractIdleService { } private void doExplicitReconcile(int batchSize) { - ImmutableList<TaskStatus> active = FluentIterable - .from(Storage.Util.fetchTasks( - storage, - Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES))) - .transform(TASK_TO_PROTO) - .toList(); - - List<List<TaskStatus>> batches = Lists.partition(active, batchSize); + Iterable<List<IScheduledTask>> activeBatches = Iterables.partition( + Storage.Util.fetchTasks(storage, Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES)), + batchSize); + long delay = 0; - for (List<TaskStatus> batch : batches) { - executor.schedule(() -> driver.reconcileTasks(batch), delay, SECONDS.getTimeUnit()); + for (List<IScheduledTask> batch : activeBatches) { + executor.schedule(() -> driver.reconcileTasks( + batch.stream().map(TASK_TO_PROTO::apply).collect(Collectors.toList())), + delay, + SECONDS.getTimeUnit()); delay += settings.explicitBatchDelaySeconds; } explicitRuns.incrementAndGet();
