Repository: aurora Updated Branches: refs/heads/master 5dccf92f4 -> e658a8a0b
Make async work queue gating thread-local. Bugs closed: AURORA-1459 Reviewed at https://reviews.apache.org/r/38336/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/e658a8a0 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/e658a8a0 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/e658a8a0 Branch: refs/heads/master Commit: e658a8a0b71e894a03ffc81dcae981db9d68bcb4 Parents: 5dccf92 Author: Bill Farner <[email protected]> Authored: Mon Sep 14 18:29:22 2015 -0700 Committer: Bill Farner <[email protected]> Committed: Mon Sep 14 18:29:22 2015 -0700 ---------------------------------------------------------------------- config/findbugs/excludeFilter.xml | 2 +- .../aurora/scheduler/async/AsyncModule.java | 14 +- .../scheduler/async/FlushableWorkQueue.java | 25 --- .../scheduler/async/GatedDelayExecutor.java | 71 -------- .../aurora/scheduler/async/GatedWorkQueue.java | 41 +++++ .../scheduler/async/GatingDelayExecutor.java | 98 +++++++++++ .../aurora/scheduler/storage/db/DbModule.java | 12 +- .../aurora/scheduler/storage/db/DbStorage.java | 82 ++++----- .../aurora/scheduler/app/SchedulerIT.java | 4 - .../scheduler/async/GatedDelayExecutorTest.java | 91 ---------- .../async/GatingDelayExecutorTest.java | 170 +++++++++++++++++++ .../pruning/TaskHistoryPrunerTest.java | 33 ++-- .../scheduler/storage/db/DbStorageTest.java | 54 +++--- 13 files changed, 411 insertions(+), 286 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/config/findbugs/excludeFilter.xml ---------------------------------------------------------------------- diff --git a/config/findbugs/excludeFilter.xml b/config/findbugs/excludeFilter.xml index 7c65302..fe3f4ca 100644 --- a/config/findbugs/excludeFilter.xml +++ b/config/findbugs/excludeFilter.xml @@ -114,7 +114,7 @@ limitations under the License. 
<Match> <!-- False positives on a check introduced in findbugs 3.0.1 --> <Or> - <Class name="org.apache.aurora.scheduler.storage.db.DbStorage" /> + <Class name="org.apache.aurora.scheduler.storage.db.DbStorage$3" /> <Class name="org.apache.aurora.scheduler.http.api.security.AuthorizeHeaderTokenTest" /> </Or> <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT" /> http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java index 217b9c0..eccb864 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java +++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java @@ -75,8 +75,8 @@ public class AsyncModule extends AbstractModule { bind(ScheduledThreadPoolExecutor.class).toInstance(afterTransaction); bind(ScheduledExecutorService.class).toInstance(afterTransaction); - bind(GatedDelayExecutor.class).in(Singleton.class); - expose(GatedDelayExecutor.class); + bind(GatingDelayExecutor.class).in(Singleton.class); + expose(GatingDelayExecutor.class); bind(RegisterGauges.class).in(Singleton.class); expose(RegisterGauges.class); @@ -84,9 +84,9 @@ public class AsyncModule extends AbstractModule { }); SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterGauges.class); - bind(Executor.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class); - bind(DelayExecutor.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class); - bind(FlushableWorkQueue.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class); + bind(Executor.class).annotatedWith(AsyncExecutor.class).to(GatingDelayExecutor.class); + bind(DelayExecutor.class).annotatedWith(AsyncExecutor.class).to(GatingDelayExecutor.class); + 
bind(GatedWorkQueue.class).annotatedWith(AsyncExecutor.class).to(GatingDelayExecutor.class); } static class RegisterGauges extends AbstractIdleService { @@ -101,13 +101,13 @@ public class AsyncModule extends AbstractModule { private final StatsProvider statsProvider; private final ScheduledThreadPoolExecutor executor; - private final GatedDelayExecutor delayExecutor; + private final GatingDelayExecutor delayExecutor; @Inject RegisterGauges( StatsProvider statsProvider, ScheduledThreadPoolExecutor executor, - GatedDelayExecutor delayExecutor) { + GatingDelayExecutor delayExecutor) { this.statsProvider = requireNonNull(statsProvider); this.executor = requireNonNull(executor); http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/async/FlushableWorkQueue.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/FlushableWorkQueue.java b/src/main/java/org/apache/aurora/scheduler/async/FlushableWorkQueue.java deleted file mode 100644 index 11a1c2a..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/FlushableWorkQueue.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -/** - * A work queue that only executes pending work when flushed. - */ -public interface FlushableWorkQueue { - - /** - * Makes pending work available for execution. 
- */ - void flush(); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java b/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java deleted file mode 100644 index 9d4cfcf..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.Queue; -import java.util.concurrent.ScheduledExecutorService; - -import javax.inject.Inject; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; - -import static java.util.Objects.requireNonNull; - -/** - * An executor that queues work until flushed. - */ -class GatedDelayExecutor implements DelayExecutor, FlushableWorkQueue { - - private final ScheduledExecutorService executor; - private final Queue<Runnable> queue = Lists.newLinkedList(); - - /** - * Creates a gated delay executor that will flush work to the provided {@code delegate}. - * - * @param delegate Delegate to execute work with when flushed. 
- */ - @Inject - GatedDelayExecutor(ScheduledExecutorService delegate) { - this.executor = requireNonNull(delegate); - } - - synchronized int getQueueSize() { - return queue.size(); - } - - private synchronized void enqueue(Runnable work) { - queue.add(work); - } - - @Override - public synchronized void flush() { - for (Runnable work : Iterables.consumingIterable(queue)) { - work.run(); - } - } - - @Override - public synchronized void execute(Runnable command) { - enqueue(() -> executor.execute(command)); - } - - @Override - public synchronized void execute(Runnable work, Amount<Long, Time> minDelay) { - enqueue(() -> executor.schedule(work, minDelay.getValue(), minDelay.getUnit().getTimeUnit())); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java b/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java new file mode 100644 index 0000000..7032271 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java @@ -0,0 +1,41 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.async; + +/** + * A work queue that can be gated, holding newly-submitted work until the gating operation ends. 
+ */ +public interface GatedWorkQueue { + + /** + * Closes the gate on the work queue for the duration of an operation. + * + * @param operation Operation to execute while keeping the gate closed. + * @param <T> Operation return type. + * @param <E> Operation exception type. + * @return The value returned by the {@code operation}. + * @throws E Exception thrown by the {@code operation}. + */ + <T, E extends Exception> T closeDuring(GatedOperation<T, E> operation) throws E; + + /** + * An operation during which newly-submitted work is queued rather than executed. + * + * @param <T> Operation return type. + * @param <E> Operation exception type. + */ + interface GatedOperation<T, E extends Exception> { + T doWithGateClosed() throws E; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java b/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java new file mode 100644 index 0000000..a7240ae --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java @@ -0,0 +1,98 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.async; + +import java.util.Queue; +import java.util.concurrent.ScheduledExecutorService; + +import javax.inject.Inject; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; + +import static java.util.Objects.requireNonNull; + +/** + * An executor that may be temporarily gated with {@link #closeDuring(GatedOperation)}. When the + * executor is gated, newly-submitted work will be enqueued and executed once the gate is opened as + * a result of {@link #closeDuring(GatedOperation)} returning. + */ +class GatingDelayExecutor implements DelayExecutor, GatedWorkQueue { + + private final ScheduledExecutorService gated; + private final Queue<Runnable> queue = Lists.newLinkedList(); + + /** + * Creates a gating delay executor that will gate work from the provided executor. + * + * @param gated Delegate to execute work with when ungated. 
+ */ + @Inject + GatingDelayExecutor(ScheduledExecutorService gated) { + this.gated = requireNonNull(gated); + } + + private final ThreadLocal<Boolean> isOpen = new ThreadLocal<Boolean>() { + @Override + protected Boolean initialValue() { + return true; + } + }; + + @Override + public <T, E extends Exception> T closeDuring(GatedOperation<T, E> operation) throws E { + boolean startedOpen = isOpen.get(); + isOpen.set(false); + + try { + return operation.doWithGateClosed(); + } finally { + if (startedOpen) { + isOpen.set(true); + flush(); + } + } + } + + synchronized int getQueueSize() { + return queue.size(); + } + + private synchronized void enqueue(Runnable work) { + if (isOpen.get()) { + work.run(); + } else { + queue.add(work); + } + } + + private synchronized void flush() { + for (Runnable work : Iterables.consumingIterable(queue)) { + work.run(); + } + } + + @Override + public synchronized void execute(Runnable command) { + enqueue(() -> gated.execute(command)); + } + + @Override + public synchronized void execute(Runnable work, Amount<Long, Time> minDelay) { + enqueue(() -> gated.schedule(work, minDelay.getValue(), minDelay.getUnit().getTimeUnit())); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java index 6da6193..e3efbdb 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java @@ -39,7 +39,7 @@ import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.scheduler.SchedulerServicesModule; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; -import 
org.apache.aurora.scheduler.async.FlushableWorkQueue; +import org.apache.aurora.scheduler.async.GatedWorkQueue; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.CronJobStore; import org.apache.aurora.scheduler.storage.JobUpdateStore; @@ -156,7 +156,15 @@ public final class DbModule extends PrivateModule { new AbstractModule() { @Override protected void configure() { - bind(FlushableWorkQueue.class).annotatedWith(AsyncExecutor.class).toInstance(() -> { }); + bind(GatedWorkQueue.class).annotatedWith(AsyncExecutor.class).toInstance( + new GatedWorkQueue() { + @Override + public <T, E extends Exception> T closeDuring( + GatedOperation<T, E> operation) throws E { + + return operation.doWithGateClosed(); + } + }); } }, new DbModule( http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java index 6036570..dd7e1d3 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java @@ -29,7 +29,8 @@ import org.apache.aurora.gen.JobUpdateStatus; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; -import org.apache.aurora.scheduler.async.FlushableWorkQueue; +import org.apache.aurora.scheduler.async.GatedWorkQueue; +import org.apache.aurora.scheduler.async.GatedWorkQueue.GatedOperation; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.CronJobStore; import org.apache.aurora.scheduler.storage.JobUpdateStore; @@ -61,13 +62,13 @@ class DbStorage extends AbstractIdleService implements Storage { 
private final SqlSessionFactory sessionFactory; private final MutableStoreProvider storeProvider; private final EnumValueMapper enumValueMapper; - private final FlushableWorkQueue postTransactionWork; + private final GatedWorkQueue gatedWorkQueue; @Inject DbStorage( SqlSessionFactory sessionFactory, EnumValueMapper enumValueMapper, - @AsyncExecutor FlushableWorkQueue postTransactionWork, + @AsyncExecutor GatedWorkQueue gatedWorkQueue, final CronJobStore.Mutable cronJobStore, final TaskStore.Mutable taskStore, final SchedulerStore.Mutable schedulerStore, @@ -78,7 +79,7 @@ class DbStorage extends AbstractIdleService implements Storage { this.sessionFactory = requireNonNull(sessionFactory); this.enumValueMapper = requireNonNull(enumValueMapper); - this.postTransactionWork = requireNonNull(postTransactionWork); + this.gatedWorkQueue = requireNonNull(gatedWorkQueue); requireNonNull(cronJobStore); requireNonNull(taskStore); requireNonNull(schedulerStore); @@ -140,13 +141,6 @@ class DbStorage extends AbstractIdleService implements Storage { } } - private final ThreadLocal<Boolean> inTransaction = new ThreadLocal<Boolean>() { - @Override - protected Boolean initialValue() { - return false; - } - }; - @Transactional <T, E extends Exception> T transactionedWrite(MutateWork<T, E> work) throws E { return work.apply(storeProvider); @@ -155,28 +149,22 @@ class DbStorage extends AbstractIdleService implements Storage { @Timed("db_storage_write_operation") @Override public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E { - // Only flush for the top-level write() call when calls are reentrant. - boolean shouldFlush = !inTransaction.get(); - if (shouldFlush) { - inTransaction.set(true); - } - - try { - return transactionedWrite(work); - } catch (PersistenceException e) { - throw new StorageException(e.getMessage(), e); - } finally { - // NOTE: Async work is intentionally executed regardless of whether the transaction succeeded. 
- // Doing otherwise runs the risk of cross-talk between transactions and losing async tasks - // due to failure of an unrelated transaction. This matches behavior prior to the - // introduction of DbStorage, but should be revisited. - // TODO(wfarner): Consider revisiting to execute async work only when the transaction is - // successful. - if (shouldFlush) { - postTransactionWork.flush(); - inTransaction.set(false); + // NOTE: Async work is intentionally executed regardless of whether the transaction succeeded. + // Doing otherwise runs the risk of cross-talk between transactions and losing async tasks + // due to failure of an unrelated transaction. This matches behavior prior to the + // introduction of DbStorage, but should be revisited. + // TODO(wfarner): Consider revisiting to execute async work only when the transaction is + // successful. + return gatedWorkQueue.closeDuring(new GatedOperation<T, E>() { + @Override + public T doWithGateClosed() throws E { + try { + return transactionedWrite(work); + } catch (PersistenceException e) { + throw new StorageException(e.getMessage(), e); + } } - } + }); } @VisibleForTesting @@ -191,20 +179,24 @@ class DbStorage extends AbstractIdleService implements Storage { public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work) throws StorageException, E { - // Disabling the undo log disables transaction rollback, but dramatically speeds up a bulk - // insert. - try (SqlSession session = sessionFactory.openSession(false)) { - try { - session.update(DISABLE_UNDO_LOG); - work.apply(storeProvider); - } catch (PersistenceException e) { - throw new StorageException(e.getMessage(), e); - } finally { - session.update(ENABLE_UNDO_LOG); + gatedWorkQueue.closeDuring(new GatedOperation<Void, E>() { + @Override + public Void doWithGateClosed() throws E { + // Disabling the undo log disables transaction rollback, but dramatically speeds up a bulk + // insert. 
+ try (SqlSession session = sessionFactory.openSession(false)) { + try { + session.update(DISABLE_UNDO_LOG); + work.apply(storeProvider); + } catch (PersistenceException e) { + throw new StorageException(e.getMessage(), e); + } finally { + session.update(ENABLE_UNDO_LOG); + } + } + return null; } - } finally { - postTransactionWork.flush(); - } + }); } @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java index 4941128..a44b9da 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java +++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java @@ -72,8 +72,6 @@ import org.apache.aurora.gen.storage.Transaction; import org.apache.aurora.gen.storage.storageConstants; import org.apache.aurora.scheduler.AppStartup; import org.apache.aurora.scheduler.ResourceSlot; -import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; -import org.apache.aurora.scheduler.async.FlushableWorkQueue; import org.apache.aurora.scheduler.log.Log; import org.apache.aurora.scheduler.log.Log.Entry; import org.apache.aurora.scheduler.log.Log.Position; @@ -378,8 +376,6 @@ public class SchedulerIT extends BaseZooKeeperTest { scheduler.getValue().registered(driver, FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(), MasterInfo.getDefaultInstance()); - // Registration is published on the event bus, which will be gated until a flush. 
- injector.getInstance(Key.get(FlushableWorkQueue.class, AsyncExecutor.class)).flush(); awaitSchedulerReady(); http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/test/java/org/apache/aurora/scheduler/async/GatedDelayExecutorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/GatedDelayExecutorTest.java b/src/test/java/org/apache/aurora/scheduler/async/GatedDelayExecutorTest.java deleted file mode 100644 index 2867633..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/GatedDelayExecutorTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.async; - -import java.util.concurrent.ScheduledExecutorService; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.testing.easymock.EasyMockTest; - -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.easymock.IExpectationSetters; -import org.junit.Before; -import org.junit.Test; - -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expectLastCall; - -public class GatedDelayExecutorTest extends EasyMockTest { - - private static final Amount<Long, Time> ONE_SECOND = Amount.of(1L, Time.SECONDS); - - private ScheduledExecutorService mockExecutor; - private Runnable runnable; - private GatedDelayExecutor gatedExecutor; - - @Before - public void setUp() { - mockExecutor = createMock(ScheduledExecutorService.class); - runnable = createMock(Runnable.class); - gatedExecutor = new GatedDelayExecutor(mockExecutor); - } - - @Test - public void testNoFlush() { - control.replay(); - - gatedExecutor.execute(runnable); - // flush() is not called, so no work is performed. 
- } - - private IExpectationSetters<?> invokeWorkWhenSubmitted() { - return expectLastCall().andAnswer(new IAnswer<Object>() { - @Override - public Object answer() { - ((Runnable) EasyMock.getCurrentArguments()[0]).run(); - return null; - } - }); - } - - @Test - public void testExecute() { - mockExecutor.execute(EasyMock.<Runnable>anyObject()); - invokeWorkWhenSubmitted(); - runnable.run(); - expectLastCall(); - - control.replay(); - - gatedExecutor.execute(runnable); - gatedExecutor.flush(); - } - - @Test - public void testExecuteAfterDelay() { - mockExecutor.schedule( - EasyMock.<Runnable>anyObject(), - eq(ONE_SECOND.getValue().longValue()), - eq(ONE_SECOND.getUnit().getTimeUnit())); - invokeWorkWhenSubmitted(); - runnable.run(); - - control.replay(); - - gatedExecutor.execute(runnable, ONE_SECOND); - gatedExecutor.flush(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/test/java/org/apache/aurora/scheduler/async/GatingDelayExecutorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/GatingDelayExecutorTest.java b/src/test/java/org/apache/aurora/scheduler/async/GatingDelayExecutorTest.java new file mode 100644 index 0000000..c62a1d5 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/async/GatingDelayExecutorTest.java @@ -0,0 +1,170 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.async; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; + +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.scheduler.async.GatedWorkQueue.GatedOperation; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.easymock.IExpectationSetters; +import org.junit.Before; +import org.junit.Test; + +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; + +public class GatingDelayExecutorTest extends EasyMockTest { + + private static final Amount<Long, Time> ONE_SECOND = Amount.of(1L, Time.SECONDS); + + private ScheduledExecutorService gatedExecutor; + private Runnable runnable; + private GatingDelayExecutor gatingExecutor; + + @Before + public void setUp() { + gatedExecutor = createMock(ScheduledExecutorService.class); + runnable = createMock(Runnable.class); + gatingExecutor = new GatingDelayExecutor(gatedExecutor); + } + + @Test + public void testGateOpen() { + gatedExecutor.execute(runnable); + + control.replay(); + + // The gate was not closed, so the work is executed immediately. 
+ gatingExecutor.execute(runnable); + } + + private IExpectationSetters<?> invokeWorkWhenSubmitted() { + return expectLastCall().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() { + ((Runnable) EasyMock.getCurrentArguments()[0]).run(); + return null; + } + }); + } + + @Test + public void testGateIsThreadSpecific() throws InterruptedException { + gatedExecutor.execute(runnable); + + control.replay(); + + CountDownLatch gateClosed = new CountDownLatch(1); + CountDownLatch unblock = new CountDownLatch(1); + Runnable closer = new Runnable() { + @Override + public void run() { + gatingExecutor.closeDuring(new GatedOperation<String, RuntimeException>() { + @Override + public String doWithGateClosed() { + gateClosed.countDown(); + try { + unblock.await(); + } catch (InterruptedException e) { + throw Throwables.propagate(e); + } + return "hi"; + } + }); + } + }; + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("GateTest") + .build() + .newThread(closer) + .start(); + + gateClosed.await(); + gatingExecutor.execute(runnable); + assertQueueSize(0); + unblock.countDown(); + } + + private void assertQueueSize(int size) { + assertEquals(size, gatingExecutor.getQueueSize()); + } + + @Test + public void testReentrantClose() { + gatedExecutor.execute(runnable); + expectLastCall().times(3); + + control.replay(); + + gatingExecutor.execute(runnable); + assertQueueSize(0); + + String result = gatingExecutor.closeDuring(new GatedOperation<String, RuntimeException>() { + @Override + public String doWithGateClosed() { + gatingExecutor.execute(runnable); + assertQueueSize(1); + + String result = gatingExecutor.closeDuring(new GatedOperation<String, RuntimeException>() { + @Override + public String doWithGateClosed() { + gatingExecutor.execute(runnable); + assertQueueSize(2); + return "hello"; + } + }); + assertEquals("hello", result); + + return "hi"; + } + }); + assertEquals("hi", result); + assertQueueSize(0); + } + + @Test + public void 
testExecute() { + gatedExecutor.execute(runnable); + invokeWorkWhenSubmitted(); + runnable.run(); + expectLastCall(); + + control.replay(); + + gatingExecutor.execute(runnable); + } + + @Test + public void testExecuteAfterDelay() { + gatedExecutor.schedule( + runnable, + ONE_SECOND.getValue().longValue(), + ONE_SECOND.getUnit().getTimeUnit()); + invokeWorkWhenSubmitted(); + runnable.run(); + + control.replay(); + + gatingExecutor.execute(runnable, ONE_SECOND); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java index 0c7da07..acd2cd1 100644 --- a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java @@ -13,14 +13,19 @@ */ package org.apache.aurora.scheduler.pruning; +import java.io.Closeable; +import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.io.Closer; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.AbstractModule; import com.google.inject.Guice; @@ -44,7 +49,6 @@ import org.apache.aurora.gen.TaskEvent; import org.apache.aurora.scheduler.async.AsyncModule; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; import org.apache.aurora.scheduler.async.DelayExecutor; 
-import org.apache.aurora.scheduler.async.FlushableWorkQueue; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunnerSettings; @@ -56,6 +60,7 @@ import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.IAnswer; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -84,6 +89,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest { private StateManager stateManager; private StorageTestUtil storageUtil; private TaskHistoryPruner pruner; + private Closer closer; @Before public void setUp() { @@ -98,6 +104,12 @@ public class TaskHistoryPrunerTest extends EasyMockTest { clock, new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY), storageUtil.storage); + closer = Closer.create(); + } + + @After + public void tearDownCloser() throws Exception { + closer.close(); } @Test @@ -242,6 +254,13 @@ public class TaskHistoryPrunerTest extends EasyMockTest { .setDaemon(true) .setNameFormat("testThreadSafeEvents-executor") .build()); + closer.register(new Closeable() { + @Override + public void close() throws IOException { + MoreExecutors.shutdownAndAwaitTermination(realExecutor, 1L, TimeUnit.SECONDS); + } + }); + Injector injector = Guice.createInjector( new AsyncModule(realExecutor), new AbstractModule() { @@ -251,24 +270,16 @@ public class TaskHistoryPrunerTest extends EasyMockTest { } }); executor = injector.getInstance(Key.get(DelayExecutor.class, AsyncExecutor.class)); - FlushableWorkQueue flusher = - injector.getInstance(Key.get(FlushableWorkQueue.class, AsyncExecutor.class)); pruner = buildPruner(executor); - Command onDeleted = new Command() { - @Override - public void execute() { - // The goal is to verify that the call does not deadlock. We do not care about the outcome. 
- changeState(makeTask("b", ASSIGNED), STARTING); - } - }; + // The goal is to verify that the call does not deadlock. We do not care about the outcome. + Command onDeleted = () -> changeState(makeTask("b", ASSIGNED), STARTING); CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID); control.replay(); // Change the task to a terminal state and wait for it to be pruned. changeState(makeTask(TASK_ID, RUNNING), KILLED); - flusher.flush(); taskDeleted.await(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java index 6dd5026..a0bd34b 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java @@ -13,11 +13,9 @@ */ package org.apache.aurora.scheduler.storage.db; -import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.aurora.common.testing.easymock.EasyMockTest; - -import org.apache.aurora.scheduler.async.FlushableWorkQueue; +import org.apache.aurora.scheduler.async.GatedWorkQueue; +import org.apache.aurora.scheduler.async.GatedWorkQueue.GatedOperation; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.CronJobStore; import org.apache.aurora.scheduler.storage.JobUpdateStore; @@ -34,20 +32,19 @@ import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import org.easymock.EasyMock; import org.easymock.IAnswer; +import org.easymock.IExpectationSetters; import org.junit.Before; import org.junit.Test; import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; -import 
static org.junit.Assert.assertFalse; public class DbStorageTest extends EasyMockTest { private SqlSessionFactory sessionFactory; private SqlSession session; private EnumValueMapper enumMapper; - private FlushableWorkQueue flusher; + private GatedWorkQueue gatedWorkQueue; private Work.Quiet<String> readWork; private MutateWork.NoResult.Quiet writeWork; @@ -58,14 +55,14 @@ public class DbStorageTest extends EasyMockTest { sessionFactory = createMock(SqlSessionFactory.class); session = createMock(SqlSession.class); enumMapper = createMock(EnumValueMapper.class); - flusher = createMock(FlushableWorkQueue.class); + gatedWorkQueue = createMock(GatedWorkQueue.class); readWork = createMock(new Clazz<Work.Quiet<String>>() { }); writeWork = createMock(new Clazz<MutateWork.NoResult.Quiet>() { }); storage = new DbStorage( sessionFactory, enumMapper, - flusher, + gatedWorkQueue, createMock(CronJobStore.Mutable.class), createMock(TaskStore.Mutable.class), createMock(SchedulerStore.Mutable.class), @@ -94,12 +91,23 @@ public class DbStorageTest extends EasyMockTest { assertEquals("hi", storage.read(readWork)); } + private IExpectationSetters<?> expectGateClosed() throws Exception { + return expect(gatedWorkQueue.closeDuring(EasyMock.anyObject())) + .andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + GatedOperation<?, ?> op = (GatedOperation<?, ?>) EasyMock.getCurrentArguments()[0]; + return op.doWithGateClosed(); + } + }); + } + @Test(expected = StorageException.class) - public void testBulkLoadFails() { + public void testBulkLoadFails() throws Exception { expect(sessionFactory.openSession(false)).andReturn(session); expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andThrow(new PersistenceException()); expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0); - flusher.flush(); + expectGateClosed(); control.replay(); @@ -107,13 +115,13 @@ public class DbStorageTest extends EasyMockTest { } @Test - public void testBulkLoad() { + 
public void testBulkLoad() throws Exception { expect(sessionFactory.openSession(false)).andReturn(session); expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andReturn(0); expect(writeWork.apply(EasyMock.anyObject())).andReturn(null); session.close(); expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0); - flusher.flush(); + expectGateClosed(); control.replay(); @@ -121,16 +129,8 @@ public class DbStorageTest extends EasyMockTest { } @Test - public void testFlushWithReentrantWrites() { - final AtomicBoolean flushed = new AtomicBoolean(false); - flusher.flush(); - expectLastCall().andAnswer(new IAnswer<Void>() { - @Override - public Void answer() { - flushed.set(true); - return null; - } - }); + public void testGateWithReentrantWrites() throws Exception { + expectGateClosed().times(2); control.replay(); @@ -138,9 +138,6 @@ public class DbStorageTest extends EasyMockTest { @Override public void execute(MutableStoreProvider storeProvider) { noopWrite(); - - // Should not have flushed yet. - assertFalse("flush() should not be called until outer write() completes.", flushed.get()); } }); } @@ -155,9 +152,8 @@ public class DbStorageTest extends EasyMockTest { } @Test - public void testFlushWithSeqentialWrites() { - flusher.flush(); - expectLastCall().times(2); + public void testFlushWithSeqentialWrites() throws Exception { + expectGateClosed().times(2); control.replay();
