HDFS-11131. TestThrottledAsyncChecker#testCancellation is flaky.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8c57aeb5 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8c57aeb5 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8c57aeb5 Branch: refs/heads/HDFS-7240 Commit: 8c57aeb5b4fcf9f688c0f00df684b9125f683250 Parents: 32bb36b Author: Arpit Agarwal <[email protected]> Authored: Wed Apr 5 16:01:54 2017 -0700 Committer: Arpit Agarwal <[email protected]> Committed: Wed Apr 5 17:54:30 2017 -0700 ---------------------------------------------------------------------- .../datanode/checker/ThrottledAsyncChecker.java | 25 ++-- .../checker/TestThrottledAsyncChecker.java | 118 ++++++------------- 2 files changed, 43 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c57aeb5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java index 7584d97..b71c015 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java @@ -187,28 +187,21 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> { /** * {@inheritDoc}. + * + * The results of in-progress checks are not useful during shutdown, + * so we optimize for faster shutdown by interrupting all actively + * executing checks. 
*/ @Override public void shutdownAndWait(long timeout, TimeUnit timeUnit) throws InterruptedException { - // Try orderly shutdown. - executorService.shutdown(); - - if (!executorService.awaitTermination(timeout, timeUnit)) { - // Interrupt executing tasks and wait again. - executorService.shutdownNow(); - executorService.awaitTermination(timeout, timeUnit); - } if (scheduledExecutorService != null) { - // Try orderly shutdown - scheduledExecutorService.shutdown(); - - if (!scheduledExecutorService.awaitTermination(timeout, timeUnit)) { - // Interrupt executing tasks and wait again. - scheduledExecutorService.shutdownNow(); - scheduledExecutorService.awaitTermination(timeout, timeUnit); - } + scheduledExecutorService.shutdownNow(); + scheduledExecutorService.awaitTermination(timeout, timeUnit); } + + executorService.shutdownNow(); + executorService.awaitTermination(timeout, timeUnit); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/8c57aeb5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java index 00b1af2..4ed6371 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java @@ -20,8 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode.checker; import com.google.common.base.Optional; import com.google.common.base.Supplier; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import 
com.google.common.util.concurrent.ListenableFuture; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.FakeTimer; @@ -29,12 +27,9 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import static org.junit.Assert.assertFalse; @@ -94,34 +89,8 @@ public class TestThrottledAsyncChecker { } @Test (timeout=60000) - public void testCancellation() throws Exception { - LatchedCheckable target = new LatchedCheckable(); - final FakeTimer timer = new FakeTimer(); - final LatchedCallback callback = new LatchedCallback(target); - ThrottledAsyncChecker<Boolean, Boolean> checker = - new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0, - getExecutorService()); - - Optional<ListenableFuture<Boolean>> olf = - checker.schedule(target, true); - if (olf.isPresent()) { - Futures.addCallback(olf.get(), callback); - } - - // Request immediate cancellation. - checker.shutdownAndWait(0, TimeUnit.MILLISECONDS); - try { - assertFalse(olf.get().get()); - fail("Failed to get expected InterruptedException"); - } catch (ExecutionException ee) { - assertTrue(ee.getCause() instanceof InterruptedException); - } - callback.failureLatch.await(); - } - - @Test (timeout=60000) public void testConcurrentChecks() throws Exception { - LatchedCheckable target = new LatchedCheckable(); + final StalledCheckable target = new StalledCheckable(); final FakeTimer timer = new FakeTimer(); ThrottledAsyncChecker<Boolean, Boolean> checker = new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0, @@ -136,25 +105,6 @@ public class TestThrottledAsyncChecker { // for the first caller. 
assertTrue(olf1.isPresent()); assertFalse(olf2.isPresent()); - - // Unblock the latch and wait for it to finish execution. - target.latch.countDown(); - olf1.get().get(); - - GenericTestUtils.waitFor(new Supplier<Boolean>() { - @Override - public Boolean get() { - // We should get an absent Optional. - // This can take a short while until the internal callback in - // ThrottledAsyncChecker is scheduled for execution. - // Also this should not trigger a new check operation as the timer - // was not advanced. If it does trigger a new check then the test - // will fail with a timeout. - final Optional<ListenableFuture<Boolean>> olf3 = - checker.schedule(target, true); - return !olf3.isPresent(); - } - }, 100, 10000); } /** @@ -191,6 +141,32 @@ public class TestThrottledAsyncChecker { } }, 100, 10000); } + + /** + * Ensure that an exception thrown by the check routine is + * propagated. + * + * @throws Exception + */ + @Test(timeout=60000) + public void testExceptionIsPropagated() throws Exception { + final ThrowingCheckable target = new ThrowingCheckable(); + final FakeTimer timer = new FakeTimer(); + ThrottledAsyncChecker<Boolean, Boolean> checker = + new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0, + getExecutorService()); + + final Optional<ListenableFuture<Boolean>> olf = + checker.schedule(target, true); + assertTrue(olf.isPresent()); + try { + olf.get().get(); + fail("Failed to get expected ExecutionException"); + } catch(ExecutionException ee) { + assertTrue(ee.getCause() instanceof DummyException); + } + } + /** * Ensure that the exception from a failed check is cached * and returned without re-running the check when the minimum @@ -245,6 +221,9 @@ public class TestThrottledAsyncChecker { } } + /** + * A Checkable that throws an exception when checked. + */ private static class ThrowingCheckable extends TestCheckableBase { @Override @@ -258,43 +237,14 @@ public class TestThrottledAsyncChecker { } /** - * A checkable that hangs until signaled. 
+ * A checkable that hangs forever when checked. */ - private static class LatchedCheckable + private static class StalledCheckable implements Checkable<Boolean, Boolean> { - private final CountDownLatch latch = new CountDownLatch(1); - @Override public Boolean check(Boolean ignored) throws InterruptedException { - LOG.info("LatchedCheckable {} waiting.", this); - latch.await(); - return true; // Unreachable. - } - } - - /** - * A {@link FutureCallback} that counts its invocations. - */ - private static final class LatchedCallback - implements FutureCallback<Boolean> { - private final CountDownLatch successLatch = new CountDownLatch(1); - private final CountDownLatch failureLatch = new CountDownLatch(1); - private final Checkable target; - - private LatchedCallback(Checkable target) { - this.target = target; - } - - @Override - public void onSuccess(@Nonnull Boolean result) { - LOG.info("onSuccess callback invoked for {}", target); - successLatch.countDown(); - } - - @Override - public void onFailure(@Nonnull Throwable t) { - LOG.info("onFailure callback invoked for {} with exception", target, t); - failureLatch.countDown(); + Thread.sleep(Long.MAX_VALUE); + return false; // Unreachable. } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
