Repository: aurora Updated Branches: refs/heads/master 998993dd8 -> 6db13baf2
Implementing task reconciler. Bugs closed: AURORA-1047 Reviewed at https://reviews.apache.org/r/34440/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/6db13baf Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/6db13baf Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/6db13baf Branch: refs/heads/master Commit: 6db13baf284182a8d60d154a4146fad388f8ce5b Parents: 998993d Author: Maxim Khutornenko <[email protected]> Authored: Fri May 22 14:25:19 2015 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Fri May 22 14:25:19 2015 -0700 ---------------------------------------------------------------------- .../aurora/benchmark/fakes/FakeDriver.java | 7 + .../aurora/scheduler/async/AsyncModule.java | 42 ++++++ .../aurora/scheduler/async/TaskReconciler.java | 145 +++++++++++++++++++ .../apache/aurora/scheduler/mesos/Driver.java | 9 ++ .../scheduler/mesos/SchedulerDriverService.java | 19 ++- .../async/JobUpdateHistoryPrunerTest.java | 2 +- .../scheduler/async/TaskReconcilerTest.java | 110 ++++++++++++++ .../testing/FakeScheduledExecutor.java | 14 +- 8 files changed, 341 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/6db13baf/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java index 316ab1c..d1bb8f2 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java +++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java @@ -13,6 +13,8 @@ */ package org.apache.aurora.benchmark.fakes; +import java.util.Collection; + import com.google.common.util.concurrent.AbstractIdleService; import org.apache.aurora.scheduler.mesos.Driver; @@ -58,4 +60,9 @@ public class FakeDriver extends AbstractIdleService implements Driver { protected void shutDown() throws Exception { // no-op } + + @Override + public void reconcileTasks(Collection<Protos.TaskStatus> statuses) { + // no-op + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/6db13baf/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java index e9d47fd..5f24668 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java +++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java @@ -51,6 +51,7 @@ import org.apache.aurora.scheduler.async.OfferManager.OfferReturnDelay; import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl; import org.apache.aurora.scheduler.async.TaskGroups.TaskGroupsSettings; import org.apache.aurora.scheduler.async.TaskHistoryPruner.HistoryPrunnerSettings; +import org.apache.aurora.scheduler.async.TaskReconciler.TaskReconcilerSettings; import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl; import org.apache.aurora.scheduler.async.preemptor.BiCache; import org.apache.aurora.scheduler.async.preemptor.BiCache.BiCacheSettings; @@ -181,6 +182,32 @@ public class AsyncModule extends AbstractModule { @CmdLine(name = "gc_executor_path", help = "Path to the gc executor launch script.") private static final Arg<String> GC_EXECUTOR_PATH = Arg.create(null); + // TODO(maxim): Disabled by default until AURORA-715 is complete. + @CmdLine(name = "reconciliation_initial_delay", + help = "Initial amount of time to delay task reconciliation after scheduler start up.") + private static final Arg<Amount<Long, Time>> RECONCILIATION_INITIAL_DELAY = + Arg.create(Amount.of(Long.MAX_VALUE, Time.MINUTES)); + + @Positive + @CmdLine(name = "reconciliation_explicit_interval", + help = "Interval on which scheduler will ask Mesos for status updates of all non-terminal " + + "tasks known to scheduler.") + private static final Arg<Amount<Long, Time>> RECONCILIATION_EXPLICIT_INTERVAL = + Arg.create(Amount.of(60L, Time.MINUTES)); + + @Positive + @CmdLine(name = "reconciliation_implicit_interval", + help = "Interval on which scheduler will ask Mesos for status updates of all non-terminal " + + "tasks known to Mesos.") + private static final Arg<Amount<Long, Time>> RECONCILIATION_IMPLICIT_INTERVAL = + Arg.create(Amount.of(60L, Time.MINUTES)); + + @CmdLine(name = "reconciliation_schedule_spread", + help = "Difference between explicit and implicit reconciliation intervals intended to " + + "create a non-overlapping task reconciliation schedule.") + private static final Arg<Amount<Long, Time>> RECONCILIATION_SCHEDULE_SPREAD = + Arg.create(Amount.of(30L, Time.MINUTES)); + @Qualifier @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) private @interface AsyncExecutor { } @@ -312,6 +339,21 @@ public class AsyncModule extends AbstractModule { install(new PrivateModule() { @Override protected void configure() { + bind(TaskReconcilerSettings.class).toInstance(new TaskReconcilerSettings( + RECONCILIATION_INITIAL_DELAY.get(), + RECONCILIATION_EXPLICIT_INTERVAL.get(), + RECONCILIATION_IMPLICIT_INTERVAL.get(), + RECONCILIATION_SCHEDULE_SPREAD.get())); + bind(ScheduledExecutorService.class).toInstance(executor); + bind(TaskReconciler.class).in(Singleton.class); + expose(TaskReconciler.class); + } + }); + SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskReconciler.class); + + install(new PrivateModule() { + @Override + protected void configure() { bind(JobUpdateHistoryPruner.HistoryPrunerSettings.class).toInstance( new JobUpdateHistoryPruner.HistoryPrunerSettings( JOB_UPDATE_HISTORY_PRUNING_INTERVAL.get(), http://git-wip-us.apache.org/repos/asf/aurora/blob/6db13baf/src/main/java/org/apache/aurora/scheduler/async/TaskReconciler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskReconciler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskReconciler.java new file mode 100644 index 0000000..23f5f64 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/async/TaskReconciler.java @@ -0,0 +1,145 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.async; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; + +import javax.inject.Inject; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.AbstractIdleService; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.StatsProvider; + +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.mesos.Protos; + +import static java.util.Objects.requireNonNull; + +import static com.twitter.common.quantity.Time.MINUTES; + +/** + * A task reconciler that periodically triggers Mesos (implicit) and Aurora (explicit) task + * reconciliation to synchronize global task states. More on task reconciliation: + * http://mesos.apache.org/documentation/latest/reconciliation. + */ +public class TaskReconciler extends AbstractIdleService { + + @VisibleForTesting + static final String EXPLICIT_STAT_NAME = "reconciliation_explicit_runs"; + + @VisibleForTesting + static final String IMPLICIT_STAT_NAME = "reconciliation_implicit_runs"; + + private final TaskReconcilerSettings settings; + private final Storage storage; + private final Driver driver; + private final ScheduledExecutorService executor; + private final AtomicLong explicitRuns; + private final AtomicLong implicitRuns; + + static class TaskReconcilerSettings { + private final Amount<Long, Time> initialDelay; + private final Amount<Long, Time> explicitInterval; + private final Amount<Long, Time> implicitInterval; + private final Amount<Long, Time> scheduleSpread; + + @VisibleForTesting + TaskReconcilerSettings( + Amount<Long, Time> initialDelay, + Amount<Long, Time> explicitInterval, + Amount<Long, Time> implicitInterval, + Amount<Long, Time> scheduleSpread) { + + this.initialDelay = requireNonNull(initialDelay); + this.explicitInterval = requireNonNull(explicitInterval); + this.implicitInterval = requireNonNull(implicitInterval); + this.scheduleSpread = requireNonNull(scheduleSpread); + } + } + + @Inject + TaskReconciler( + TaskReconcilerSettings settings, + Storage storage, + Driver driver, + ScheduledExecutorService executor, + StatsProvider stats) { + + this.settings = requireNonNull(settings); + this.storage = requireNonNull(storage); + this.driver = requireNonNull(driver); + this.executor = requireNonNull(executor); + this.explicitRuns = stats.makeCounter(EXPLICIT_STAT_NAME); + this.implicitRuns = stats.makeCounter(IMPLICIT_STAT_NAME); + } + + @Override + protected void startUp() { + // Schedule explicit reconciliation. + executor.scheduleAtFixedRate( + new Runnable() { + @Override + public void run() { + ImmutableSet<Protos.TaskStatus> active = FluentIterable + .from(Storage.Util.fetchTasks(storage, Query.unscoped().active())) + .transform(TASK_TO_PROTO) + .toSet(); + + driver.reconcileTasks(active); + explicitRuns.incrementAndGet(); + } + }, + settings.initialDelay.as(MINUTES), + settings.explicitInterval.as(MINUTES), + MINUTES.getTimeUnit()); + + // Schedule implicit reconciliation. + executor.scheduleAtFixedRate( + new Runnable() { + @Override + public void run() { + driver.reconcileTasks(ImmutableSet.of()); + implicitRuns.incrementAndGet(); + } + }, + settings.initialDelay.as(MINUTES) + settings.scheduleSpread.as(MINUTES), + settings.implicitInterval.as(MINUTES), + MINUTES.getTimeUnit()); + } + + @Override + protected void shutDown() { + // Nothing to do - await VM shutdown. + } + + @VisibleForTesting + static final Function<IScheduledTask, Protos.TaskStatus> TASK_TO_PROTO = + t -> Protos.TaskStatus.newBuilder() + // TODO(maxim): State is required by protobuf but ignored by Mesos for reconciliation + // purposes. This is the artifact of the native API. The new HTTP Mesos API will be + // accepting task IDs instead. AURORA-1326 tracks solution on the scheduler side. + // Setting TASK_RUNNING as a safe dummy value here. + .setState(Protos.TaskState.TASK_RUNNING) + .setTaskId(Protos.TaskID.newBuilder().setValue(t.getAssignedTask().getTaskId()).build()) + .build(); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/6db13baf/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java b/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java index 975ea02..013c50c 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java @@ -13,6 +13,8 @@ */ package org.apache.aurora.scheduler.mesos; +import java.util.Collection; + import com.google.common.util.concurrent.Service; import org.apache.mesos.Protos.OfferID; @@ -67,4 +69,11 @@ public interface Driver extends Service { * Aborts the driver. */ void abort(); + + /** + * Requests task reconciliation. + * + * @param statuses Task statuses to reconcile. + */ + void reconcileTasks(Collection<TaskStatus> statuses); } http://git-wip-us.apache.org/repos/asf/aurora/blob/6db13baf/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java index 35cada6..5567fe0 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java @@ -13,6 +13,7 @@ */ package org.apache.aurora.scheduler.mesos; +import java.util.Collection; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; @@ -120,20 +121,20 @@ class SchedulerDriverService extends AbstractIdleService implements Driver { @Override public void launchTask(Protos.OfferID offerId, Protos.TaskInfo task) { - checkState(isRunning(), "Driver is not running."); + ensureRunning(); Futures.getUnchecked(driverFuture) .launchTasks(ImmutableList.of(offerId), ImmutableList.of(task)); } @Override public void declineOffer(Protos.OfferID offerId) { - checkState(isRunning(), "Driver is not running."); + ensureRunning(); Futures.getUnchecked(driverFuture).declineOffer(offerId); } @Override public void killTask(String taskId) { - checkState(isRunning(), "Driver is not running."); + ensureRunning(); Protos.Status status = Futures.getUnchecked(driverFuture).killTask( Protos.TaskID.newBuilder().setValue(taskId).build()); @@ -146,7 +147,17 @@ class SchedulerDriverService extends AbstractIdleService implements Driver { @Override public void acknowledgeStatusUpdate(Protos.TaskStatus status) { - checkState(isRunning(), "Driver is not running."); + ensureRunning(); Futures.getUnchecked(driverFuture).acknowledgeStatusUpdate(status); } + + @Override + public void reconcileTasks(Collection<Protos.TaskStatus> statuses) { + ensureRunning(); + Futures.getUnchecked(driverFuture).reconcileTasks(statuses); + } + + private void ensureRunning() { + checkState(isRunning(), "Driver is not running."); + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/6db13baf/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java index 02e8798..f73b2c6 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java @@ -64,7 +64,7 @@ public class JobUpdateHistoryPrunerTest extends EasyMockTest { 1)); pruner.startAsync().awaitRunning(); - executorClock.advance(Amount.of(2L, Time.MILLISECONDS)); + executorClock.advance(Amount.of(1L, Time.MILLISECONDS)); executorClock.advance(Amount.of(1L, Time.MILLISECONDS)); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/6db13baf/src/test/java/org/apache/aurora/scheduler/async/TaskReconcilerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskReconcilerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskReconcilerTest.java new file mode 100644 index 0000000..f56ffd2 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/async/TaskReconcilerTest.java @@ -0,0 +1,110 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.async; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.collect.ImmutableSet; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.StatsProvider; +import com.twitter.common.testing.easymock.EasyMockTest; + +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.scheduler.async.TaskReconciler.EXPLICIT_STAT_NAME; +import static org.apache.aurora.scheduler.async.TaskReconciler.IMPLICIT_STAT_NAME; +import static org.apache.aurora.scheduler.async.TaskReconciler.TASK_TO_PROTO; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; + +public class TaskReconcilerTest extends EasyMockTest { + private StorageTestUtil storageUtil; + private StatsProvider statsProvider; + private Driver driver; + private ScheduledExecutorService executorService; + private FakeScheduledExecutor clock; + + @Before + public void setUp() { + storageUtil = new StorageTestUtil(this); + statsProvider = createMock(StatsProvider.class); + driver = createMock(Driver.class); + executorService = createMock(ScheduledExecutorService.class); + clock = FakeScheduledExecutor.scheduleAtFixedRateExecutor(executorService, 2, 5); + } + + @Test + public void testExecution() { + AtomicLong explicitRuns = new AtomicLong(); + AtomicLong implicitRuns = new AtomicLong(); + expect(statsProvider.makeCounter(EXPLICIT_STAT_NAME)).andReturn(explicitRuns); + expect(statsProvider.makeCounter(IMPLICIT_STAT_NAME)).andReturn(implicitRuns); + + IScheduledTask task = TaskTestUtil.makeTask("id1", TaskTestUtil.JOB); + storageUtil.expectOperations(); + storageUtil.expectTaskFetch(Query.unscoped().active(), task).times(5); + + driver.reconcileTasks(ImmutableSet.of(TASK_TO_PROTO.apply(task))); + expectLastCall().times(5); + + driver.reconcileTasks(ImmutableSet.of()); + expectLastCall().times(2); + + control.replay(); + + Amount<Long, Time> initialDelay = Amount.of(10L, Time.MINUTES); + Amount<Long, Time> explicitSchedule = Amount.of(60L, Time.MINUTES); + Amount<Long, Time> implicitSchedule = Amount.of(180L, Time.MINUTES); + Amount<Long, Time> spread = Amount.of(30L, Time.MINUTES); + + TaskReconciler reconciler = new TaskReconciler( + new TaskReconciler.TaskReconcilerSettings( + initialDelay, + explicitSchedule, + implicitSchedule, + spread), + storageUtil.storage, + driver, + executorService, + statsProvider); + + reconciler.startAsync().awaitRunning(); + + clock.advance(initialDelay); + assertEquals(1L, explicitRuns.get()); + assertEquals(0L, implicitRuns.get()); + + clock.advance(spread); + assertEquals(1L, explicitRuns.get()); + assertEquals(1L, implicitRuns.get()); + + clock.advance(explicitSchedule); + assertEquals(2L, explicitRuns.get()); + assertEquals(1L, implicitRuns.get()); + + clock.advance(implicitSchedule); + assertEquals(5L, explicitRuns.get()); + assertEquals(2L, implicitRuns.get()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/6db13baf/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java index 2beea4f..916483b 100644 --- a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java +++ b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java @@ -91,13 +91,23 @@ public final class FakeScheduledExecutor extends FakeClock { ScheduledExecutorService mock, int maxInvocations) { + return scheduleAtFixedRateExecutor(mock, 1, maxInvocations); + } + + public static FakeScheduledExecutor scheduleAtFixedRateExecutor( + ScheduledExecutorService mock, + int maxSchedules, + int maxInvocations) { + FakeScheduledExecutor executor = new FakeScheduledExecutor(); mock.scheduleAtFixedRate( EasyMock.<Runnable>anyObject(), EasyMock.anyLong(), EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject()); - expectLastCall().andAnswer(answerScheduleAtFixedRate(executor, maxInvocations)).once(); + expectLastCall() + .andAnswer(answerScheduleAtFixedRate(executor, maxInvocations)) + .times(maxSchedules); return executor; } @@ -114,7 +124,7 @@ public final class FakeScheduledExecutor extends FakeClock { long initialDelay = (Long) args[1]; long period = (Long) args[2]; TimeUnit unit = (TimeUnit) args[3]; - for (int i = 1; i <= workCount; i++) { + for (int i = 0; i <= workCount; i++) { addDelayedWork(executor, toMillis(initialDelay, unit) + i * toMillis(period, unit), work); } return null;
