Repository: aurora Updated Branches: refs/heads/master 8c900e585 -> 3cbff4117
Batching explicit task reconciliation calls Reviewed at https://reviews.apache.org/r/47373/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/3cbff411 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/3cbff411 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/3cbff411 Branch: refs/heads/master Commit: 3cbff4117ff7b95e6110a38881aada7708137573 Parents: 8c900e5 Author: Maxim Khutornenko <[email protected]> Authored: Mon May 16 12:22:59 2016 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Mon May 16 12:22:59 2016 -0700 ---------------------------------------------------------------------- .../reconciliation/ReconciliationModule.java | 15 +++++- .../reconciliation/TaskReconciler.java | 53 +++++++++++++------- .../reconciliation/TaskReconcilerTest.java | 40 +++++++++++---- .../testing/FakeScheduledExecutor.java | 5 ++ 4 files changed, 86 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/3cbff411/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java index cccee08..e076e80 100644 --- a/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java +++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java @@ -88,6 +88,17 @@ public class ReconciliationModule extends AbstractModule { private static final Arg<Amount<Long, Time>> RECONCILIATION_SCHEDULE_SPREAD = Arg.create(Amount.of(30L, Time.MINUTES)); + @Positive + @CmdLine(name = "reconciliation_explicit_batch_size", + help = "Number of tasks in a single batch request sent to Mesos for explicit reconciliation.") + private static final Arg<Integer> RECONCILIATION_BATCH_SIZE = Arg.create(1000); + + @Positive + @CmdLine(name = "reconciliation_explicit_batch_interval", + help = "Interval between explicit batch reconciliation requests.") + private static final Arg<Amount<Long, Time>> RECONCILIATION_BATCH_INTERVAL = + Arg.create(Amount.of(5L, Time.SECONDS)); + @Qualifier @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) @interface BackgroundWorker { } @@ -127,7 +138,9 @@ public class ReconciliationModule extends AbstractModule { RECONCILIATION_INITIAL_DELAY.get(), RECONCILIATION_EXPLICIT_INTERVAL.get(), RECONCILIATION_IMPLICIT_INTERVAL.get(), - RECONCILIATION_SCHEDULE_SPREAD.get())); + RECONCILIATION_SCHEDULE_SPREAD.get(), + RECONCILIATION_BATCH_INTERVAL.get(), + RECONCILIATION_BATCH_SIZE.get())); bind(ScheduledExecutorService.class).annotatedWith(BackgroundWorker.class) .toInstance(AsyncUtil.loggingScheduledExecutor(1, "TaskReconciler-%d", LOG)); bind(TaskReconciler.class).in(Singleton.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/3cbff411/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java index 57d2061..3275d72 100644 --- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java +++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java @@ -13,6 +13,7 @@ */ package org.apache.aurora.scheduler.reconciliation; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicLong; @@ -21,7 +22,9 @@ import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.AbstractIdleService; import org.apache.aurora.common.quantity.Amount; @@ -34,12 +37,14 @@ import org.apache.aurora.scheduler.reconciliation.ReconciliationModule.Backgroun import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.mesos.Protos; +import org.apache.mesos.Protos.TaskStatus; import static java.util.Objects.requireNonNull; import static com.google.common.base.Preconditions.checkArgument; import static org.apache.aurora.common.quantity.Time.MINUTES; +import static org.apache.aurora.common.quantity.Time.SECONDS; /** * A task reconciler that periodically triggers Mesos (implicit) and Aurora (explicit) task @@ -66,24 +71,35 @@ public class TaskReconciler extends AbstractIdleService { private final Amount<Long, Time> implicitInterval; private final long explicitDelayMinutes; private final long implicitDelayMinutes; + private final long explicitBatchDelaySeconds; + private final int explicitBatchSize; @VisibleForTesting TaskReconcilerSettings( Amount<Long, Time> initialDelay, Amount<Long, Time> explicitInterval, Amount<Long, Time> implicitInterval, - Amount<Long, Time> scheduleSpread) { + Amount<Long, Time> scheduleSpread, + Amount<Long, Time> explicitBatchInterval, + int explicitBatchSize) { this.explicitInterval = requireNonNull(explicitInterval); this.implicitInterval = requireNonNull(implicitInterval); explicitDelayMinutes = requireNonNull(initialDelay).as(MINUTES); implicitDelayMinutes = initialDelay.as(MINUTES) + scheduleSpread.as(MINUTES); + explicitBatchDelaySeconds = explicitBatchInterval.as(SECONDS); + this.explicitBatchSize = explicitBatchSize; + checkArgument( explicitDelayMinutes >= 0, "Invalid explicit reconciliation delay: " + explicitDelayMinutes); checkArgument( implicitDelayMinutes >= 0L, "Invalid implicit reconciliation delay: " + implicitDelayMinutes); + checkArgument( + explicitBatchDelaySeconds >= 0L, + "Invalid explicit batch reconciliation delay: " + explicitBatchDelaySeconds + ); } } @@ -108,20 +124,24 @@ public class TaskReconciler extends AbstractIdleService { // Schedule explicit reconciliation. executor.scheduleAtFixedRate( () -> { - ImmutableSet<Protos.TaskStatus> active = FluentIterable + ImmutableList<TaskStatus> active = FluentIterable .from(Storage.Util.fetchTasks( storage, Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES))) .transform(TASK_TO_PROTO) - .toSet(); - - driver.reconcileTasks(active); + .toList(); + + List<List<TaskStatus>> batches = Lists.partition(active, settings.explicitBatchSize); + long delay = 0; + for (List<TaskStatus> batch : batches) { + executor.schedule(() -> driver.reconcileTasks(batch), delay, SECONDS.getTimeUnit()); + delay += settings.explicitBatchDelaySeconds; + } explicitRuns.incrementAndGet(); }, settings.explicitDelayMinutes, settings.explicitInterval.as(MINUTES), MINUTES.getTimeUnit()); - // Schedule implicit reconciliation. executor.scheduleAtFixedRate( () -> { @@ -139,15 +159,14 @@ public class TaskReconciler extends AbstractIdleService { } @VisibleForTesting - static final Function<IScheduledTask, Protos.TaskStatus> TASK_TO_PROTO = - t -> Protos.TaskStatus.newBuilder() - // TODO(maxim): State is required by protobuf but ignored by Mesos for reconciliation - // purposes. This is the artifact of the native API. The new HTTP Mesos API will be - // accepting task IDs instead. AURORA-1326 tracks solution on the scheduler side. - // Setting TASK_RUNNING as a safe dummy value here. - .setState(Protos.TaskState.TASK_RUNNING) - .setSlaveId( - Protos.SlaveID.newBuilder().setValue(t.getAssignedTask().getSlaveId()).build()) - .setTaskId(Protos.TaskID.newBuilder().setValue(t.getAssignedTask().getTaskId()).build()) - .build(); + static final Function<IScheduledTask, TaskStatus> TASK_TO_PROTO = t -> TaskStatus.newBuilder() + // TODO(maxim): State is required by protobuf but ignored by Mesos for reconciliation + // purposes. This is the artifact of the native API. The new HTTP Mesos API will be + // accepting task IDs instead. AURORA-1326 tracks solution on the scheduler side. + // Setting TASK_RUNNING as a safe dummy value here. + .setState(Protos.TaskState.TASK_RUNNING) + .setSlaveId( + Protos.SlaveID.newBuilder().setValue(t.getAssignedTask().getSlaveId()).build()) + .setTaskId(Protos.TaskID.newBuilder().setValue(t.getAssignedTask().getTaskId()).build()) + .build(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/3cbff411/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java index 5b4b3ac..b9317dc 100644 --- a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java @@ -13,12 +13,13 @@ */ package org.apache.aurora.scheduler.reconciliation; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicLong; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; @@ -36,10 +37,13 @@ import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; +import org.apache.mesos.Protos; +import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; import static org.apache.aurora.common.quantity.Time.MINUTES; +import static org.apache.aurora.common.quantity.Time.SECONDS; import static org.apache.aurora.scheduler.reconciliation.TaskReconciler.EXPLICIT_STAT_NAME; import static org.apache.aurora.scheduler.reconciliation.TaskReconciler.IMPLICIT_STAT_NAME; import static org.apache.aurora.scheduler.reconciliation.TaskReconciler.TASK_TO_PROTO; @@ -53,11 +57,15 @@ public class TaskReconcilerTest extends EasyMockTest { private static final Amount<Long, Time> EXPLICIT_SCHEDULE = Amount.of(60L, MINUTES); private static final Amount<Long, Time> IMPLICT_SCHEDULE = Amount.of(180L, MINUTES); private static final Amount<Long, Time> SPREAD = Amount.of(30L, MINUTES); + private static final Amount<Long, Time> BATCH_DELAY = Amount.of(3L, SECONDS); + private static final int BATCH_SIZE = 1; private static final TaskReconcilerSettings SETTINGS = new TaskReconcilerSettings( INITIAL_DELAY, EXPLICIT_SCHEDULE, IMPLICT_SCHEDULE, - SPREAD); + SPREAD, + BATCH_DELAY, + BATCH_SIZE); private StorageTestUtil storageUtil; private StatsProvider statsProvider; @@ -83,15 +91,25 @@ public class TaskReconcilerTest extends EasyMockTest { FakeScheduledExecutor clock = FakeScheduledExecutor.scheduleAtFixedRateExecutor(executorService, 2, 5); - IScheduledTask task = makeTask("id1", TaskTestUtil.makeConfig(TaskTestUtil.JOB)); + IScheduledTask task1 = makeTask("id1", TaskTestUtil.makeConfig(TaskTestUtil.JOB)); + IScheduledTask task2 = makeTask("id2", TaskTestUtil.makeConfig(TaskTestUtil.JOB)); storageUtil.expectOperations(); - storageUtil.expectTaskFetch(Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES), task) - .times(5); + storageUtil.expectTaskFetch( + Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES), + task1, + task2).times(5); - driver.reconcileTasks(ImmutableSet.of(TASK_TO_PROTO.apply(task))); + List<List<Protos.TaskStatus>> batches = Lists.partition(ImmutableList.of( + TASK_TO_PROTO.apply(task1), + TASK_TO_PROTO.apply(task2)), BATCH_SIZE); + + driver.reconcileTasks(batches.get(0)); + expectLastCall().times(5); + + driver.reconcileTasks(batches.get(1)); expectLastCall().times(5); - driver.reconcileTasks(ImmutableSet.of()); + driver.reconcileTasks(EasyMock.anyObject()); expectLastCall().times(2); control.replay(); @@ -130,7 +148,9 @@ public class TaskReconcilerTest extends EasyMockTest { INITIAL_DELAY, EXPLICIT_SCHEDULE, IMPLICT_SCHEDULE, - Amount.of(Long.MAX_VALUE, MINUTES)); + Amount.of(Long.MAX_VALUE, MINUTES), + BATCH_DELAY, + BATCH_SIZE); } @Test(expected = IllegalArgumentException.class) @@ -141,7 +161,9 @@ public class TaskReconcilerTest extends EasyMockTest { Amount.of(Long.MAX_VALUE, MINUTES), EXPLICIT_SCHEDULE, IMPLICT_SCHEDULE, - SPREAD); + SPREAD, + BATCH_DELAY, + BATCH_SIZE); } private static IScheduledTask makeTask(String id, ITaskConfig config) { http://git-wip-us.apache.org/repos/asf/aurora/blob/3cbff411/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java index 9082a31..ce6e5a4 100644 --- a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java +++ b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java @@ -120,6 +120,11 @@ public final class FakeScheduledExecutor extends FakeClock { int maxInvocations) { FakeScheduledExecutor executor = new FakeScheduledExecutor(); + mock.schedule(EasyMock.<Runnable>anyObject(), EasyMock.anyLong(), EasyMock.anyObject()); + expectLastCall().andAnswer(() -> { + ((Runnable) EasyMock.getCurrentArguments()[0]).run(); + return null; + }).anyTimes(); mock.scheduleAtFixedRate( EasyMock.anyObject(), EasyMock.anyLong(),
