Repository: aurora Updated Branches: refs/heads/master f1e09a9c7 -> 496397aa5
Batching writes - Part 1 (of 3): Introducing BatchWorker and task event batching. Reviewed at https://reviews.apache.org/r/51759/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/ebfeb3e6 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/ebfeb3e6 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/ebfeb3e6 Branch: refs/heads/master Commit: ebfeb3e602faa9281ff7ff50f42bd21885518953 Parents: f1e09a9 Author: Maxim Khutornenko <ma...@apache.org> Authored: Fri Sep 16 14:17:04 2016 -0700 Committer: Maxim Khutornenko <ma...@apache.org> Committed: Fri Sep 16 14:17:04 2016 -0700 ---------------------------------------------------------------------- .../apache/aurora/scheduler/BatchWorker.java | 254 +++++++++++++++++++ .../aurora/scheduler/SchedulerModule.java | 25 ++ .../scheduler/pruning/TaskHistoryPruner.java | 14 +- .../scheduler/scheduling/TaskThrottler.java | 28 +- .../scheduler/state/MaintenanceController.java | 14 +- .../updater/JobUpdateControllerImpl.java | 10 +- .../aurora/scheduler/BatchWorkerTest.java | 96 +++++++ .../pruning/TaskHistoryPrunerTest.java | 10 +- .../scheduler/scheduling/TaskThrottlerTest.java | 9 +- .../state/MaintenanceControllerImplTest.java | 5 + .../scheduler/testing/BatchWorkerUtil.java | 59 +++++ .../aurora/scheduler/updater/JobUpdaterIT.java | 7 +- 12 files changed, 505 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/BatchWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/BatchWorker.java b/src/main/java/org/apache/aurora/scheduler/BatchWorker.java new file mode 100644 index 0000000..e05d4b4 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/BatchWorker.java @@ -0,0 +1,254 @@ +/** + * Licensed under the Apache 
License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler; + +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.inject.Inject; + +import com.google.common.util.concurrent.AbstractExecutionThreadService; + +import org.apache.aurora.common.stats.SlidingStats; +import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.common.util.BackoffStrategy; +import org.apache.aurora.scheduler.base.AsyncUtil; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + +import static com.google.common.base.Preconditions.checkState; + +/** + * Generic helper that allows bundling multiple work items into a single {@link Storage} + * transaction aiming to reduce the write lock contention. + * + * @param <T> Expected result type. + */ +public class BatchWorker<T> extends AbstractExecutionThreadService { + /** + * Empty result placeholder. 
+ */ + public interface NoResult { } + + /** + * Convenience wrapper for a non-repeatable no value work {@link Result}. + */ + public static final NoResult NO_RESULT = new NoResult() { }; + + private static final Logger LOG = LoggerFactory.getLogger(BatchWorker.class); + private final Storage storage; + private final int maxBatchSize; + private final SlidingStats batchUnlocked; + private final SlidingStats batchLocked; + private final BlockingQueue<WorkItem<T>> workQueue = new LinkedBlockingQueue<>(); + private final ScheduledExecutorService scheduledExecutor; + private final AtomicInteger lastBatchSize = new AtomicInteger(0); + private final AtomicLong itemsProcessed; + private final AtomicLong batchesProcessed; + + /** + * Wraps result returned by the {@link RepeatableWork} item. + * + * @param <T> Expected result type. + */ + public static class Result<T> { + private final boolean isCompleted; + private final T value; + + /** + * Initializes a {@link Result} instance with {@code isCompleted} and {@code value}. + * <p> + * The {@code isCompleted} may be set to {@code False} for a {@link RepeatableWork} that has + * not finished yet. Otherwise, it must be set to {@code True}. + * + * @param isCompleted Flag indicating if the {@link RepeatableWork} has completed. + * @param value result value. + */ + public Result(boolean isCompleted, T value) { + this.isCompleted = isCompleted; + this.value = value; + } + } + + /** + * Encapsulates a potentially repeatable operation. + */ + public interface RepeatableWork<T> { + /** + * Abstracts a unit of repeatable (i.e.: "repeat until completed") work. + * <p> + * The work unit may be repeated as instructed by the {@link Result}. + * + * @param storeProvider {@link MutableStoreProvider} instance. + * @return {@link Result} + */ + Result<T> apply(MutableStoreProvider storeProvider); + } + + /** + * Encapsulates a non-repeatable operation. 
+ */ + public interface Work<T> extends RepeatableWork<T> { + @Override + default Result<T> apply(MutableStoreProvider storeProvider) { + T value = execute(storeProvider); + return new Result<>(true, value); + } + + /** + * Abstracts a unit of non-repeatable (i.e.: "run exactly once") work. + * + * @param storeProvider {@link MutableStoreProvider} instance. + * @return result value. + */ + T execute(MutableStoreProvider storeProvider); + } + + @Inject + protected BatchWorker(Storage storage, StatsProvider statsProvider, int maxBatchSize) { + this.storage = requireNonNull(storage); + this.maxBatchSize = maxBatchSize; + + scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor(serviceName() + "-%d", LOG); + statsProvider.makeGauge(serviceName() + "_queue_size", () -> workQueue.size()); + statsProvider.makeGauge( + serviceName() + "_last_processed_batch_size", + () -> lastBatchSize.intValue()); + batchUnlocked = new SlidingStats(serviceName() + "_batch_unlocked", "nanos"); + batchLocked = new SlidingStats(serviceName() + "_batch_locked", "nanos"); + itemsProcessed = statsProvider.makeCounter(serviceName() + "_items_processed"); + batchesProcessed = statsProvider.makeCounter(serviceName() + "_batches_processed"); + } + + /** + * Executes a non-repeatable {@link Work} and returns {@link CompletableFuture} to wait on. + * + * @param work A non-repeatable {@link Work} to execute. + * @return {@link CompletableFuture} to wait on. + */ + public CompletableFuture<T> execute(Work<T> work) { + CompletableFuture<T> result = new CompletableFuture<>(); + workQueue.add(new WorkItem<>( + work, + result, + Optional.empty(), + Optional.empty())); + + return result; + } + + /** + * Executes a {@link RepeatableWork} until it completes and returns {@link CompletableFuture} + * to wait on. + * + * @param backoffStrategy A {@link BackoffStrategy} instance to backoff subsequent runs. + * @param work A {@link RepeatableWork} to execute. 
+ */ + public CompletableFuture<T> executeWithReplay( + BackoffStrategy backoffStrategy, + RepeatableWork<T> work) { + + CompletableFuture<T> result = new CompletableFuture<>(); + workQueue.add(new WorkItem<>( + work, + result, + Optional.of(backoffStrategy), + Optional.of(0L))); + + return result; + } + + @Override + protected void run() throws Exception { + while (isRunning()) { + List<WorkItem<T>> batch = new LinkedList<>(); + batch.add(workQueue.take()); + workQueue.drainTo(batch, maxBatchSize - batch.size()); + processBatch(batch); + } + } + + private void processBatch(List<WorkItem<T>> batch) { + if (!batch.isEmpty()) { + long unlockedStart = System.nanoTime(); + storage.write((Storage.MutateWork.NoResult.Quiet) storeProvider -> { + long lockedStart = System.nanoTime(); + for (WorkItem<T> item : batch) { + try { + Result<T> itemResult = item.work.apply(storeProvider); + if (itemResult.isCompleted) { + item.result.complete(itemResult.value); + } else { + // Work not finished yet - re-queue for a followup later. + long backoffMsec = backoffFor(item); + scheduledExecutor.schedule( + () -> workQueue.add(new WorkItem<>( + item.work, + item.result, + item.backoffStrategy, + Optional.of(backoffMsec))), + backoffMsec, + TimeUnit.MILLISECONDS); + } + } catch (RuntimeException e) { + LOG.error("{}: Failed to process batch item. 
Error: {}", serviceName(), e); + item.result.completeExceptionally(e); + } + } + batchLocked.accumulate(System.nanoTime() - lockedStart); + }); + batchUnlocked.accumulate(System.nanoTime() - unlockedStart); + batchesProcessed.incrementAndGet(); + lastBatchSize.set(batch.size()); + itemsProcessed.addAndGet(batch.size()); + } + } + + private long backoffFor(WorkItem<T> item) { + checkState(item.backoffStrategy.isPresent()); + checkState(item.lastBackoffMsec.isPresent()); + return item.backoffStrategy.get().calculateBackoffMs(item.lastBackoffMsec.get()); + } + + private class WorkItem<V> { + private final RepeatableWork<V> work; + private final CompletableFuture<T> result; + private final Optional<BackoffStrategy> backoffStrategy; + private final Optional<Long> lastBackoffMsec; + + WorkItem( + RepeatableWork<V> work, + CompletableFuture<T> result, + Optional<BackoffStrategy> backoffStrategy, + Optional<Long> lastBackoffMsec) { + + this.work = work; + this.result = result; + this.backoffStrategy = backoffStrategy; + this.lastBackoffMsec = lastBackoffMsec; + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java index 4a7ef0b..2ec3967 100644 --- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java +++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java @@ -16,6 +16,8 @@ package org.apache.aurora.scheduler; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; + +import javax.inject.Inject; import javax.inject.Singleton; import com.google.inject.AbstractModule; @@ -27,10 +29,13 @@ import org.apache.aurora.common.args.CmdLine; import 
org.apache.aurora.common.args.constraints.Positive; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.scheduler.BatchWorker.NoResult; import org.apache.aurora.scheduler.SchedulerLifecycle.LeadingOptions; import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl; import org.apache.aurora.scheduler.base.AsyncUtil; import org.apache.aurora.scheduler.events.PubsubEventModule; +import org.apache.aurora.scheduler.storage.Storage; import org.apache.mesos.Protos; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +64,11 @@ public class SchedulerModule extends AbstractModule { help = "The maximum number of status updates that can be processed in a batch.") private static final Arg<Integer> MAX_STATUS_UPDATE_BATCH_SIZE = Arg.create(1000); + @Positive + @CmdLine(name = "max_task_event_batch_size", + help = "The maximum number of task state change events that can be processed in a batch.") + private static final Arg<Integer> MAX_TASK_EVENT_BATCH_SIZE = Arg.create(300); + @Override protected void configure() { bind(TaskIdGenerator.class).to(TaskIdGeneratorImpl.class); @@ -93,6 +103,21 @@ public class SchedulerModule extends AbstractModule { bind(TaskStatusHandler.class).to(TaskStatusHandlerImpl.class); bind(TaskStatusHandlerImpl.class).in(Singleton.class); addSchedulerActiveServiceBinding(binder()).to(TaskStatusHandlerImpl.class); + + bind(TaskEventBatchWorker.class).in(Singleton.class); + addSchedulerActiveServiceBinding(binder()).to(TaskEventBatchWorker.class); } + public static class TaskEventBatchWorker extends BatchWorker<NoResult> { + @Inject + TaskEventBatchWorker(Storage storage, StatsProvider statsProvider) { + + super(storage, statsProvider, MAX_TASK_EVENT_BATCH_SIZE.get()); + } + + @Override + protected String serviceName() { + return "TaskEventBatchWorker"; + } + } } 
http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java index f07746c..c672826 100644 --- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java +++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java @@ -29,13 +29,14 @@ import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.util.Clock; import org.apache.aurora.gen.apiConstants; +import org.apache.aurora.scheduler.BatchWorker; +import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.slf4j.Logger; @@ -62,6 +63,7 @@ public class TaskHistoryPruner implements EventSubscriber { private final HistoryPrunnerSettings settings; private final Storage storage; private final Lifecycle lifecycle; + private final TaskEventBatchWorker batchWorker; private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() { @Override @@ -94,7 +96,8 @@ public class TaskHistoryPruner implements EventSubscriber { Clock clock, HistoryPrunnerSettings settings, Storage storage, - Lifecycle lifecycle) { + Lifecycle lifecycle, + TaskEventBatchWorker batchWorker) { 
this.executor = requireNonNull(executor); this.stateManager = requireNonNull(stateManager); @@ -102,6 +105,7 @@ public class TaskHistoryPruner implements EventSubscriber { this.settings = requireNonNull(settings); this.storage = requireNonNull(storage); this.lifecycle = requireNonNull(lifecycle); + this.batchWorker = requireNonNull(batchWorker); } @VisibleForTesting @@ -131,8 +135,10 @@ public class TaskHistoryPruner implements EventSubscriber { private void deleteTasks(final Set<String> taskIds) { LOG.info("Pruning inactive tasks " + taskIds); - storage.write( - (NoResult.Quiet) storeProvider -> stateManager.deleteTasks(storeProvider, taskIds)); + batchWorker.execute(storeProvider -> { + stateManager.deleteTasks(storeProvider, taskIds); + return BatchWorker.NO_RESULT; + }); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java index bbd971a..867c9bd 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java @@ -22,13 +22,14 @@ import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.SlidingStats; import org.apache.aurora.common.util.Clock; +import org.apache.aurora.scheduler.BatchWorker; +import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import 
org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.storage.Storage; import static java.util.Objects.requireNonNull; @@ -46,8 +47,8 @@ class TaskThrottler implements EventSubscriber { private final RescheduleCalculator rescheduleCalculator; private final Clock clock; private final DelayExecutor executor; - private final Storage storage; private final StateManager stateManager; + private final TaskEventBatchWorker batchWorker; private final SlidingStats throttleStats = new SlidingStats("task_throttle", "ms"); @@ -56,14 +57,14 @@ class TaskThrottler implements EventSubscriber { RescheduleCalculator rescheduleCalculator, Clock clock, @AsyncExecutor DelayExecutor executor, - Storage storage, - StateManager stateManager) { + StateManager stateManager, + TaskEventBatchWorker batchWorker) { this.rescheduleCalculator = requireNonNull(rescheduleCalculator); this.clock = requireNonNull(clock); this.executor = requireNonNull(executor); - this.storage = requireNonNull(storage); this.stateManager = requireNonNull(stateManager); + this.batchWorker = requireNonNull(batchWorker); } @Subscribe @@ -73,13 +74,16 @@ class TaskThrottler implements EventSubscriber { + rescheduleCalculator.getFlappingPenaltyMs(stateChange.getTask()); long delayMs = Math.max(0, readyAtMs - clock.nowMillis()); throttleStats.accumulate(delayMs); - executor.execute( - () -> storage.write(storeProvider -> stateManager.changeState( - storeProvider, - stateChange.getTaskId(), - Optional.of(THROTTLED), - PENDING, - Optional.absent())), + executor.execute(() -> + batchWorker.execute(storeProvider -> { + stateManager.changeState( + storeProvider, + stateChange.getTaskId(), + Optional.of(THROTTLED), + PENDING, + Optional.absent()); + return BatchWorker.NO_RESULT; + }), Amount.of(delayMs, Time.MILLISECONDS)); } } 
http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java index 3c7cda0..574efc9 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java +++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java @@ -30,6 +30,8 @@ import com.google.common.eventbus.Subscribe; import org.apache.aurora.gen.HostStatus; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.scheduler.BatchWorker; +import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; @@ -37,7 +39,6 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IHostStatus; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -106,11 +107,17 @@ public interface MaintenanceController { private static final Logger LOG = LoggerFactory.getLogger(MaintenanceControllerImpl.class); private final Storage storage; private final StateManager stateManager; + private final TaskEventBatchWorker batchWorker; @Inject - public MaintenanceControllerImpl(Storage storage, StateManager stateManager) { + public MaintenanceControllerImpl( + 
Storage storage, + StateManager stateManager, + TaskEventBatchWorker batchWorker) { + this.storage = requireNonNull(storage); this.stateManager = requireNonNull(stateManager); + this.batchWorker = requireNonNull(batchWorker); } private Set<IHostStatus> watchDrainingTasks(MutableStoreProvider store, Set<String> hosts) { @@ -153,7 +160,7 @@ public interface MaintenanceController { public void taskChangedState(final TaskStateChange change) { if (Tasks.isTerminated(change.getNewState())) { final String host = change.getTask().getAssignedTask().getSlaveHost(); - storage.write((NoResult.Quiet) (MutableStoreProvider store) -> { + batchWorker.execute(store -> { // If the task _was_ associated with a draining host, and it was the last task on the // host. Optional<IHostAttributes> attributes = @@ -168,6 +175,7 @@ public interface MaintenanceController { LOG.info("Host {} is DRAINING with active tasks: {}", host, Tasks.ids(activeTasks)); } } + return BatchWorker.NO_RESULT; }); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java index ef6253e..25b3f37 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java @@ -44,6 +44,8 @@ import org.apache.aurora.gen.JobUpdateQuery; import org.apache.aurora.gen.JobUpdateStatus; import org.apache.aurora.gen.Lock; import org.apache.aurora.gen.LockKey; +import org.apache.aurora.scheduler.BatchWorker; +import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker; import org.apache.aurora.scheduler.base.InstanceKeys; import org.apache.aurora.scheduler.base.JobKeys; import 
org.apache.aurora.scheduler.base.Query; @@ -120,6 +122,7 @@ class JobUpdateControllerImpl implements JobUpdateController { private final Clock clock; private final PulseHandler pulseHandler; private final Lifecycle lifecycle; + private final TaskEventBatchWorker batchWorker; // Currently-active updaters. An active updater is one that is rolling forward or back. Paused // and completed updates are represented only in storage, not here. @@ -134,7 +137,8 @@ class JobUpdateControllerImpl implements JobUpdateController { ScheduledExecutorService executor, StateManager stateManager, Clock clock, - Lifecycle lifecycle) { + Lifecycle lifecycle, + TaskEventBatchWorker batchWorker) { this.updateFactory = requireNonNull(updateFactory); this.lockManager = requireNonNull(lockManager); @@ -143,6 +147,7 @@ class JobUpdateControllerImpl implements JobUpdateController { this.stateManager = requireNonNull(stateManager); this.clock = requireNonNull(clock); this.lifecycle = requireNonNull(lifecycle); + this.batchWorker = requireNonNull(batchWorker); this.pulseHandler = new PulseHandler(clock); } @@ -346,7 +351,7 @@ class JobUpdateControllerImpl implements JobUpdateController { } private void instanceChanged(final IInstanceKey instance, final Optional<IScheduledTask> state) { - storage.write((NoResult.Quiet) storeProvider -> { + batchWorker.execute(storeProvider -> { IJobKey job = instance.getJobKey(); UpdateFactory.Update update = updates.get(job); if (update != null) { @@ -366,6 +371,7 @@ class JobUpdateControllerImpl implements JobUpdateController { + JobKeys.canonicalString(job)); } } + return BatchWorker.NO_RESULT; }); } http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java b/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java new file mode 100644 index 
0000000..a86dc82 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java @@ -0,0 +1,96 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.common.util.BackoffStrategy; +import org.apache.aurora.scheduler.BatchWorker.Result; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; + +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertTrue; + +public class BatchWorkerTest extends EasyMockTest { + private static final String SERVICE_NAME = "TestWorker"; + private static final String BATCH_STAT = SERVICE_NAME + "_batches_processed"; + private FakeStatsProvider statsProvider; + private BatchWorker<Boolean> batchWorker; + + @Before + public void setUp() { + StorageTestUtil storageUtil = new StorageTestUtil(this); + storageUtil.expectOperations(); + statsProvider = new FakeStatsProvider(); + batchWorker = new BatchWorker<Boolean>(storageUtil.storage, statsProvider, 2) { + @Override + protected String serviceName() { + return SERVICE_NAME; + } 
+ }; + } + + @Test + public void testExecute() throws Exception { + control.replay(); + + CompletableFuture<Boolean> result1 = batchWorker.execute(store -> true); + CompletableFuture<Boolean> result2 = batchWorker.execute(store -> true); + CompletableFuture<Boolean> result3 = batchWorker.execute(store -> true); + batchWorker.startAsync().awaitRunning(); + + assertTrue(result1.get()); + assertTrue(result2.get()); + assertTrue(result3.get()); + } + + @Test(expected = ExecutionException.class) + public void testExecuteThrows() throws Exception { + control.replay(); + + CompletableFuture<Boolean> result = + batchWorker.execute(store -> { throw new IllegalArgumentException(); }); + batchWorker.startAsync().awaitRunning(); + + result.get(); + } + + @Test + public void testExecuteWithReplay() throws Exception { + BackoffStrategy backoff = createMock(BackoffStrategy.class); + final CountDownLatch complete = new CountDownLatch(1); + + expect(backoff.calculateBackoffMs(EasyMock.anyLong())).andReturn(0L).anyTimes(); + + control.replay(); + + batchWorker.startAsync().awaitRunning(); + batchWorker.executeWithReplay( + backoff, + store -> statsProvider.getValue(BATCH_STAT).longValue() > 1L + ? 
new Result<>(true, true) + : new Result<>(false, false)) + .thenAccept(result -> complete.countDown()); + + assertTrue(complete.await(10L, TimeUnit.SECONDS)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java index 99c27e8..8469596 100644 --- a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java @@ -27,6 +27,7 @@ import org.apache.aurora.common.testing.easymock.EasyMockTest; import org.apache.aurora.common.util.testing.FakeClock; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker; import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.TaskTestUtil; @@ -48,6 +49,7 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLED; import static org.apache.aurora.gen.ScheduleStatus.LOST; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; import static org.apache.aurora.gen.ScheduleStatus.STARTING; +import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expectLastCall; @@ -68,20 +70,24 @@ public class TaskHistoryPrunerTest extends EasyMockTest { private Command shutdownCommand; @Before - public void setUp() { + public void setUp() throws Exception { executor = createMock(DelayExecutor.class); clock = new FakeClock(); stateManager = createMock(StateManager.class); storageUtil = new StorageTestUtil(this); 
storageUtil.expectOperations(); shutdownCommand = createMock(Command.class); + TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class); + expectBatchExecute(batchWorker, storageUtil.storage, control).anyTimes(); + pruner = new TaskHistoryPruner( executor, stateManager, clock, new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY), storageUtil.storage, - new Lifecycle(shutdownCommand)); + new Lifecycle(shutdownCommand), + batchWorker); closer = Closer.create(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java index 7d104aa..433f791 100644 --- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java @@ -24,6 +24,7 @@ import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskEvent; +import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker; import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; @@ -39,6 +40,7 @@ import static org.apache.aurora.gen.ScheduleStatus.INIT; import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; import static org.apache.aurora.gen.ScheduleStatus.THROTTLED; +import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute; import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; @@ -60,12 +62,15 @@ 
public class TaskThrottlerTest extends EasyMockTest { storageUtil = new StorageTestUtil(this); storageUtil.expectOperations(); stateManager = createMock(StateManager.class); + TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class); + expectBatchExecute(batchWorker, storageUtil.storage, control).anyTimes(); + throttler = new TaskThrottler( rescheduleCalculator, clock, executor, - storageUtil.storage, - stateManager); + stateManager, + batchWorker); } @Test http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java index 94f5ca5..ae83dea 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java @@ -30,6 +30,7 @@ import org.apache.aurora.gen.HostStatus; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskTestUtil; @@ -53,6 +54,7 @@ import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED; import static org.apache.aurora.gen.ScheduleStatus.KILLED; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; import static org.apache.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl; +import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; @@ 
-71,6 +73,8 @@ public class MaintenanceControllerImplTest extends EasyMockTest { storageUtil = new StorageTestUtil(this); storageUtil.expectOperations(); stateManager = createMock(StateManager.class); + TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class); + expectBatchExecute(batchWorker, storageUtil.storage, control).anyTimes(); Injector injector = Guice.createInjector( new PubsubEventModule(), @@ -83,6 +87,7 @@ public class MaintenanceControllerImplTest extends EasyMockTest { bind(StatsProvider.class).toInstance(new FakeStatsProvider()); bind(Executor.class).annotatedWith(AsyncExecutor.class) .toInstance(MoreExecutors.directExecutor()); + bind(TaskEventBatchWorker.class).toInstance(batchWorker); } }); maintenance = injector.getInstance(MaintenanceController.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/testing/BatchWorkerUtil.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/testing/BatchWorkerUtil.java b/src/test/java/org/apache/aurora/scheduler/testing/BatchWorkerUtil.java new file mode 100644 index 0000000..46b2e36 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/testing/BatchWorkerUtil.java @@ -0,0 +1,59 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.testing; + +import java.util.concurrent.CompletableFuture; + +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.scheduler.BatchWorker; +import org.apache.aurora.scheduler.BatchWorker.Work; +import org.apache.aurora.scheduler.storage.Storage; +import org.easymock.Capture; +import org.easymock.IExpectationSetters; +import org.easymock.IMocksControl; + +import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture; +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.expect; + +/** Test helper that records EasyMock expectations for {@code BatchWorker.execute} so that the submitted work is handed to {@code Storage.write} when the mocked call is answered. */ public final class BatchWorkerUtil { + private BatchWorkerUtil() { + // Utility class. + } + + /** Records an expectation for {@code batchWorker.execute(...)} and returns its expectation setter so the caller can choose the cardinality. When the expectation is answered, the captured {@code Work} is wrapped in a {@code NoResult.Quiet} operation and passed to {@code storage.write(...)}; the answer then returns a mock {@code CompletableFuture} created from {@code control} whose {@code get()} yields {@code resultValue} rather than anything produced by the work itself. */ public static <T> IExpectationSetters<CompletableFuture<T>> expectBatchExecute( + BatchWorker<T> batchWorker, + Storage storage, + IMocksControl control, + T resultValue) throws Exception { + + final CompletableFuture<T> result = new EasyMockTest.Clazz<CompletableFuture<T>>() { } + .createMock(control); + expect(result.get()).andReturn(resultValue).anyTimes(); + + final Capture<Work<T>> capture = createCapture(); + return expect(batchWorker.execute(capture(capture))).andAnswer(() -> { + storage.write((Storage.MutateWork.NoResult.Quiet) store -> capture.getValue().apply(store)); + return result; + }); + } + + /** Overload of the four-argument {@code expectBatchExecute} that passes {@code null} as the value returned by the mock future's {@code get()}. */ public static <T> IExpectationSetters<CompletableFuture<T>> expectBatchExecute( + BatchWorker<T> batchWorker, + Storage storage, + IMocksControl control) throws Exception { + + return expectBatchExecute(batchWorker, storage, control, null); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java index f879827..ea0b89a 100644 --- 
a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java +++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java @@ -60,6 +60,7 @@ import org.apache.aurora.gen.Range; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker; import org.apache.aurora.scheduler.TaskIdGenerator; import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl; import org.apache.aurora.scheduler.base.JobKeys; @@ -125,6 +126,7 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLED; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; import static org.apache.aurora.gen.ScheduleStatus.STARTING; import static org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; +import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute; import static org.apache.aurora.scheduler.updater.UpdateFactory.UpdateFactoryImpl.expandInstanceIds; import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; @@ -164,7 +166,7 @@ public class JobUpdaterIT extends EasyMockTest { } @Before - public void setUp() { + public void setUp() throws Exception { // Avoid console spam due to stats registered multiple times. 
Stats.flush(); ScheduledExecutorService executor = createMock(ScheduledExecutorService.class); @@ -172,6 +174,7 @@ public class JobUpdaterIT extends EasyMockTest { driver = createMock(Driver.class); shutdownCommand = createMock(Command.class); eventBus = new EventBus(); + TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class); Injector injector = Guice.createInjector( new UpdaterModule(executor), @@ -195,6 +198,7 @@ public class JobUpdaterIT extends EasyMockTest { bind(LockManager.class).to(LockManagerImpl.class); bind(UUIDGenerator.class).to(UUIDGeneratorImpl.class); bind(Lifecycle.class).toInstance(new Lifecycle(shutdownCommand)); + bind(TaskEventBatchWorker.class).toInstance(batchWorker); } }); updater = injector.getInstance(JobUpdateController.class); @@ -204,6 +208,7 @@ public class JobUpdaterIT extends EasyMockTest { stateManager = injector.getInstance(StateManager.class); eventBus.register(injector.getInstance(JobUpdateEventSubscriber.class)); subscriber = injector.getInstance(JobUpdateEventSubscriber.class); + expectBatchExecute(batchWorker, storage, control).anyTimes(); } @After