Repository: aurora
Updated Branches:
  refs/heads/master d3c5ca7cc -> 02ba97fbb
Shutting down scheduler on unhandled BatchWorker error. Bugs closed: AURORA-1779 Reviewed at https://reviews.apache.org/r/52141/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/02ba97fb Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/02ba97fb Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/02ba97fb Branch: refs/heads/master Commit: 02ba97fbb2ead51c9f788ca58ac878b3fd2cfd8e Parents: d3c5ca7 Author: Maxim Khutornenko <ma...@apache.org> Authored: Thu Sep 22 10:54:14 2016 -0700 Committer: Maxim Khutornenko <ma...@apache.org> Committed: Thu Sep 22 10:54:14 2016 -0700 ---------------------------------------------------------------------- .../apache/aurora/scheduler/BatchWorker.java | 51 +++++++++++--------- .../aurora/scheduler/BatchWorkerTest.java | 24 +++++++-- 2 files changed, 47 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/02ba97fb/src/main/java/org/apache/aurora/scheduler/BatchWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/BatchWorker.java b/src/main/java/org/apache/aurora/scheduler/BatchWorker.java index e05d4b4..c15a04c 100644 --- a/src/main/java/org/apache/aurora/scheduler/BatchWorker.java +++ b/src/main/java/org/apache/aurora/scheduler/BatchWorker.java @@ -128,7 +128,11 @@ public class BatchWorker<T> extends AbstractExecutionThreadService { } @Inject - protected BatchWorker(Storage storage, StatsProvider statsProvider, int maxBatchSize) { + protected BatchWorker( + Storage storage, + StatsProvider statsProvider, + int maxBatchSize) { + this.storage = requireNonNull(storage); this.maxBatchSize = maxBatchSize; @@ -185,9 +189,15 @@ public class BatchWorker<T> extends AbstractExecutionThreadService { protected void run() throws Exception { while (isRunning()) { List<WorkItem<T>>
batch = new LinkedList<>(); - batch.add(workQueue.take()); - workQueue.drainTo(batch, maxBatchSize - batch.size()); - processBatch(batch); + + // Make the loop responsive to shutdown under light load by using + // a short non-configurable timeout in poll(). + Optional<WorkItem<T>> head = Optional.ofNullable(workQueue.poll(3, TimeUnit.SECONDS)); + if (head.isPresent()) { + batch.add(head.get()); + workQueue.drainTo(batch, maxBatchSize - batch.size()); + processBatch(batch); + } } } @@ -197,25 +207,20 @@ public class BatchWorker<T> extends AbstractExecutionThreadService { storage.write((Storage.MutateWork.NoResult.Quiet) storeProvider -> { long lockedStart = System.nanoTime(); for (WorkItem<T> item : batch) { - try { - Result<T> itemResult = item.work.apply(storeProvider); - if (itemResult.isCompleted) { - item.result.complete(itemResult.value); - } else { - // Work not finished yet - re-queue for a followup later. - long backoffMsec = backoffFor(item); - scheduledExecutor.schedule( - () -> workQueue.add(new WorkItem<>( - item.work, - item.result, - item.backoffStrategy, - Optional.of(backoffMsec))), - backoffMsec, - TimeUnit.MILLISECONDS); - } - } catch (RuntimeException e) { - LOG.error("{}: Failed to process batch item. Error: {}", serviceName(), e); - item.result.completeExceptionally(e); + Result<T> itemResult = item.work.apply(storeProvider); + if (itemResult.isCompleted) { + item.result.complete(itemResult.value); + } else { + // Work not finished yet - re-queue for a followup later.
+ long backoffMsec = backoffFor(item); + scheduledExecutor.schedule( + () -> workQueue.add(new WorkItem<>( + item.work, + item.result, + item.backoffStrategy, + Optional.of(backoffMsec))), + backoffMsec, + TimeUnit.MILLISECONDS); } } batchLocked.accumulate(System.nanoTime() - lockedStart); http://git-wip-us.apache.org/repos/asf/aurora/blob/02ba97fb/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java b/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java index a86dc82..67b6642 100644 --- a/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java @@ -15,9 +15,11 @@ package org.apache.aurora.scheduler; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.Service; + import org.apache.aurora.common.testing.easymock.EasyMockTest; import org.apache.aurora.common.util.BackoffStrategy; import org.apache.aurora.scheduler.BatchWorker.Result; @@ -63,15 +65,27 @@ public class BatchWorkerTest extends EasyMockTest { assertTrue(result3.get()); } - @Test(expected = ExecutionException.class) + @Test public void testExecuteThrows() throws Exception { control.replay(); - CompletableFuture<Boolean> result = - batchWorker.execute(store -> { throw new IllegalArgumentException(); }); + // Make sure BatchWorker service fails on unhandled error during batch processing.
+ CountDownLatch shutdownLatch = new CountDownLatch(1); + batchWorker.addListener( + new Service.Listener() { + @Override + public void failed(Service.State from, Throwable failure) { + shutdownLatch.countDown(); + } + }, + MoreExecutors.newDirectExecutorService()); + batchWorker.startAsync().awaitRunning(); + batchWorker.execute(store -> { + throw new IllegalArgumentException(); + }); - result.get(); + assertTrue(shutdownLatch.await(10L, TimeUnit.SECONDS)); } @Test