HDFS-11114. Support for running async disk checks in DataNode.

This closes #153.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3fff1585
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3fff1585
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3fff1585

Branch: refs/heads/YARN-4752
Commit: 3fff1585875ad322ce6e8acb485275e6a4360823
Parents: 3dbad5d
Author: Arpit Agarwal <a...@apache.org>
Authored: Mon Nov 7 18:45:53 2016 -0800
Committer: Arpit Agarwal <a...@apache.org>
Committed: Mon Nov 7 18:45:53 2016 -0800

----------------------------------------------------------------------
 .../server/datanode/checker/AsyncChecker.java   |  63 +++++
 .../hdfs/server/datanode/checker/Checkable.java |  49 ++++
 .../datanode/checker/ThrottledAsyncChecker.java | 224 +++++++++++++++
 .../server/datanode/checker/package-info.java   |  26 ++
 .../checker/TestThrottledAsyncChecker.java      | 276 +++++++++++++++++++
 5 files changed, 638 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fff1585/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
new file mode 100644
index 0000000..1d534a3
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A class that can be used to schedule an asynchronous check on a given
+ * {@link Checkable}. If the check is successfully scheduled then a
+ * {@link ListenableFuture} is returned.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface AsyncChecker<K, V> {
+
+  /**
+   * Schedule an asynchronous check for the given object.
+   *
+   * @param target object to be checked.
+   *
+   * @param context the interpretation of the context depends on the
+   *                target.
+   *
+   * @return returns a {@link ListenableFuture} that can be used to
+   *         retrieve the result of the asynchronous check.
+   */
+  ListenableFuture<V> schedule(Checkable<K, V> target, K context);
+
+  /**
+   * Cancel all executing checks and wait for them to complete.
+   * First attempts a graceful cancellation, then cancels forcefully.
+   * Waits for the supplied timeout after both attempts.
+   *
+   * See {@link ExecutorService#awaitTermination} for a description of
+   * the parameters.
+   *
+   * @throws InterruptedException
+   */
+  void shutdownAndWait(long timeout, TimeUnit timeUnit)
+      throws InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fff1585/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java
new file mode 100644
index 0000000..833ebda
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+
+/**
+ * A Checkable is an object whose health can be probed by invoking its
+ * {@link #check} method.
+ *
+ * e.g. a {@link Checkable} instance may represent a single hardware
+ * resource.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface Checkable<K, V> {
+
+  /**
+   * Query the health of this object. This method may hang
+   * indefinitely depending on the status of the target resource.
+   *
+   * @param context for the probe operation. May be null depending
+   *                on the implementation.
+   *
+   * @return result of the check operation.
+   *
+   * @throws Exception encountered during the check operation. An
+   *                   exception indicates that the check failed.
+   */
+  V check(K context) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fff1585/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
new file mode 100644
index 0000000..d0ee3d2
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.Timer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An implementation of {@link AsyncChecker} that skips checking recently
+ * checked objects. It will enforce at least {@link minMsBetweenChecks}
+ * milliseconds between two successive checks of any one object.
+ *
+ * It is assumed that the total number of Checkable objects in the system
+ * is small, (not more than a few dozen) since the checker uses O(Checkables)
+ * storage and also potentially O(Checkables) threads.
+ *
+ * {@link minMsBetweenChecks} should be configured reasonably
+ * by the caller to avoid spinning up too many threads frequently.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ThrottledAsyncChecker.class);
+
+  private final Timer timer;
+
+  /**
+   * The ExecutorService used to schedule asynchronous checks.
+   */
+  private final ListeningExecutorService executorService;
+
+  /**
+   * The minimum gap in milliseconds between two successive checks
+   * of the same object. This is the throttle.
+   */
+  private final long minMsBetweenChecks;
+
+  /**
+   * Map of checks that are currently in progress. Protected by the object
+   * lock.
+   */
+  private final Map<Checkable, ListenableFuture<V>> checksInProgress;
+
+  /**
+   * Maps Checkable objects to a future that can be used to retrieve
+   * the results of the operation.
+   * Protected by the object lock.
+   */
+  private final Map<Checkable, LastCheckResult<V>> completedChecks;
+
+  ThrottledAsyncChecker(final Timer timer,
+                        final long minMsBetweenChecks,
+                        final ExecutorService executorService) {
+    this.timer = timer;
+    this.minMsBetweenChecks = minMsBetweenChecks;
+    this.executorService = MoreExecutors.listeningDecorator(executorService);
+    this.checksInProgress = new HashMap<>();
+    this.completedChecks = new WeakHashMap<>();
+  }
+
+  /**
+   * See {@link AsyncChecker#schedule}
+   *
+   * If the object has been checked recently then the check will
+   * be skipped. Multiple concurrent checks for the same object
+   * will receive the same Future.
+   */
+  @Override
+  public synchronized ListenableFuture<V> schedule(
+      final Checkable<K, V> target,
+      final K context) {
+    LOG.debug("Scheduling a check of {}", target);
+
+    if (checksInProgress.containsKey(target)) {
+      return checksInProgress.get(target);
+    }
+
+    if (completedChecks.containsKey(target)) {
+      final LastCheckResult<V> result = completedChecks.get(target);
+      final long msSinceLastCheck = timer.monotonicNow() - result.completedAt;
+      if (msSinceLastCheck < minMsBetweenChecks) {
+        LOG.debug("Skipped checking {}. Time since last check {}ms " +
+            "is less than the min gap {}ms.",
+            target, msSinceLastCheck, minMsBetweenChecks);
+        return result.result != null ?
+            Futures.immediateFuture(result.result) :
+            Futures.immediateFailedFuture(result.exception);
+      }
+    }
+
+    final ListenableFuture<V> lf = executorService.submit(
+        new Callable<V>() {
+          @Override
+          public V call() throws Exception {
+            return target.check(context);
+          }
+        });
+    checksInProgress.put(target, lf);
+    addResultCachingCallback(target, lf);
+    return lf;
+  }
+
+  /**
+   * Register a callback to cache the result of a check.
+   * @param target
+   * @param lf
+   */
+  private void addResultCachingCallback(
+      Checkable<K, V> target, ListenableFuture<V> lf) {
+    Futures.addCallback(lf, new FutureCallback<V>() {
+      @Override
+      public void onSuccess(@Nullable V result) {
+        synchronized (ThrottledAsyncChecker.this) {
+          checksInProgress.remove(target);
+          completedChecks.put(target, new LastCheckResult<>(
+              result, timer.monotonicNow()));
+        }
+      }
+
+      @Override
+      public void onFailure(@Nonnull Throwable t) {
+        synchronized (ThrottledAsyncChecker.this) {
+          checksInProgress.remove(target);
+          completedChecks.put(target, new LastCheckResult<>(
+              t, timer.monotonicNow()));
+        }
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}.
+   */
+  @Override
+  public void shutdownAndWait(long timeout, TimeUnit timeUnit)
+      throws InterruptedException {
+    // Try orderly shutdown.
+    executorService.shutdown();
+
+    if (!executorService.awaitTermination(timeout, timeUnit)) {
+      // Interrupt executing tasks and wait again.
+      executorService.shutdownNow();
+      executorService.awaitTermination(timeout, timeUnit);
+    }
+  }
+
+  /**
+   * Status of running a check. It can either be a result or an
+   * exception, depending on whether the check completed or threw.
+   */
+  private static final class LastCheckResult<V> {
+    /**
+     * Timestamp at which the check completed.
+     */
+    private final long completedAt;
+
+    /**
+     * Result of running the check if it completed. null if it threw.
+     */
+    @Nullable
+    private final V result;
+
+    /**
+     * Exception thrown by the check. null if it returned a result.
+     */
+    private final Throwable exception; // null on success.
+
+    /**
+     * Initialize with a result.
+     * @param result
+     */
+    private LastCheckResult(V result, long completedAt) {
+      this.result = result;
+      this.exception = null;
+      this.completedAt = completedAt;
+    }
+
+    /**
+     * Initialize with an exception.
+     * @param completedAt
+     * @param t
+     */
+    private LastCheckResult(Throwable t, long completedAt) {
+      this.result = null;
+      this.exception = t;
+      this.completedAt = completedAt;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fff1585/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java
new file mode 100644
index 0000000..52822e9
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Datanode support for running disk checks.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+@InterfaceStability.Evolving
+package org.apache.hadoop.hdfs.server.datanode.checker;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fff1585/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
new file mode 100644
index 0000000..70795ca
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.FakeTimer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.core.Is.isA;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Verify functionality of {@link ThrottledAsyncChecker}.
+ */
+public class TestThrottledAsyncChecker {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestThrottledAsyncChecker.class);
+  private static final long MIN_ERROR_CHECK_GAP = 1000;
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /**
+   * Test various scheduling combinations to ensure scheduling and
+   * throttling behave as expected.
+   */
+  @Test(timeout=60000)
+  public void testScheduler() throws Exception {
+    final NoOpCheckable target1 = new NoOpCheckable();
+    final NoOpCheckable target2 = new NoOpCheckable();
+    final FakeTimer timer = new FakeTimer();
+    ThrottledAsyncChecker<Boolean, Boolean> checker =
+        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
+                                    getExecutorService());
+
+    // check target1 and ensure we get back the expected result.
+    assertTrue(checker.schedule(target1, true).get());
+    assertThat(target1.numChecks.get(), is(1L));
+
+    // Check target1 again without advancing the timer. target1 should not
+    // be checked again and the cached result should be returned.
+    assertTrue(checker.schedule(target1, true).get());
+    assertThat(target1.numChecks.get(), is(1L));
+
+    // Schedule target2 scheduled without advancing the timer.
+    // target2 should be checked as it has never been checked before.
+    assertTrue(checker.schedule(target2, true).get());
+    assertThat(target2.numChecks.get(), is(1L));
+
+    // Advance the timer but just short of the min gap.
+    // Neither target1 nor target2 should be checked again.
+    timer.advance(MIN_ERROR_CHECK_GAP - 1);
+    assertTrue(checker.schedule(target1, true).get());
+    assertThat(target1.numChecks.get(), is(1L));
+    assertTrue(checker.schedule(target2, true).get());
+    assertThat(target2.numChecks.get(), is(1L));
+
+    // Advance the timer again.
+    // Both targets should be checked now.
+    timer.advance(MIN_ERROR_CHECK_GAP);
+    assertTrue(checker.schedule(target1, true).get());
+    assertThat(target1.numChecks.get(), is(2L));
+    assertTrue(checker.schedule(target2, true).get());
+    assertThat(target1.numChecks.get(), is(2L));
+  }
+
+  @Test (timeout=60000)
+  public void testCancellation() throws Exception {
+    LatchedCheckable target = new LatchedCheckable();
+    final FakeTimer timer = new FakeTimer();
+    final LatchedCallback callback = new LatchedCallback(target);
+    ThrottledAsyncChecker<Boolean, Boolean> checker =
+        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
+                                    getExecutorService());
+
+    ListenableFuture<Boolean> lf = checker.schedule(target, true);
+    Futures.addCallback(lf, callback);
+
+    // Request immediate cancellation.
+    checker.shutdownAndWait(0, TimeUnit.MILLISECONDS);
+    try {
+      assertFalse(lf.get());
+      fail("Failed to get expected InterruptedException");
+    } catch (ExecutionException ee) {
+      assertTrue(ee.getCause() instanceof InterruptedException);
+    }
+    callback.failureLatch.await();
+  }
+
+  @Test (timeout=60000)
+  public void testConcurrentChecks() throws Exception {
+    LatchedCheckable target = new LatchedCheckable();
+    final FakeTimer timer = new FakeTimer();
+    ThrottledAsyncChecker<Boolean, Boolean> checker =
+        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
+                                    getExecutorService());
+    final ListenableFuture<Boolean> lf1 = checker.schedule(target, true);
+    final ListenableFuture<Boolean> lf2 = checker.schedule(target, true);
+
+    // Ensure that concurrent requests return the same future object.
+    assertTrue(lf1 == lf2);
+
+    // Unblock the latch and wait for it to finish execution.
+    target.latch.countDown();
+    lf1.get();
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        // We should not get back the same future as before.
+        // This can take a short while until the internal callback in
+        // ThrottledAsyncChecker is scheduled for execution.
+        // Also this should not trigger a new check operation as the timer
+        // was not advanced. If it does trigger a new check then the test
+        // will fail with a timeout.
+        final ListenableFuture<Boolean> lf3 = checker.schedule(target, true);
+        return lf3 != lf2;
+      }
+    }, 100, 10000);
+  }
+
+  /**
+   * Ensure that the context is passed through to the Checkable#check
+   * method.
+   * @throws Exception
+   */
+  @Test(timeout=60000)
+  public void testContextIsPassed() throws Exception {
+    final NoOpCheckable target1 = new NoOpCheckable();
+    final FakeTimer timer = new FakeTimer();
+    ThrottledAsyncChecker<Boolean, Boolean> checker =
+        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
+            getExecutorService());
+
+    assertTrue(checker.schedule(target1, true).get());
+    assertThat(target1.numChecks.get(), is(1L));
+    timer.advance(MIN_ERROR_CHECK_GAP + 1);
+    assertFalse(checker.schedule(target1, false).get());
+    assertThat(target1.numChecks.get(), is(2L));
+  }
+
+  /**
+   * Ensure that the exeption from a failed check is cached
+   * and returned without re-running the check when the minimum
+   * gap has not elapsed.
+   *
+   * @throws Exception
+   */
+  @Test(timeout=60000)
+  public void testExceptionCaching() throws Exception {
+    final ThrowingCheckable target1 = new ThrowingCheckable();
+    final FakeTimer timer = new FakeTimer();
+    ThrottledAsyncChecker<Boolean, Boolean> checker =
+        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
+            getExecutorService());
+
+    thrown.expectCause(isA(DummyException.class));
+    checker.schedule(target1, true).get();
+    assertThat(target1.numChecks.get(), is(1L));
+
+    thrown.expectCause(isA(DummyException.class));
+    checker.schedule(target1, true).get();
+    assertThat(target1.numChecks.get(), is(2L));
+  }
+
+  /**
+   * A simple ExecutorService for testing.
+   */
+  private ExecutorService getExecutorService() {
+    return new ScheduledThreadPoolExecutor(1);
+  }
+
+  /**
+   * A Checkable that just returns its input.
+   */
+  private static class NoOpCheckable
+      implements Checkable<Boolean, Boolean> {
+    private final AtomicLong numChecks = new AtomicLong(0);
+    @Override
+    public Boolean check(Boolean context) {
+      numChecks.incrementAndGet();
+      return context;
+    }
+  }
+
+  private static class ThrowingCheckable
+      implements Checkable<Boolean, Boolean> {
+    private final AtomicLong numChecks = new AtomicLong(0);
+    @Override
+    public Boolean check(Boolean context) throws DummyException {
+      numChecks.incrementAndGet();
+      throw new DummyException();
+    }
+
+  }
+
+  private static class DummyException extends Exception {
+  }
+
+  /**
+   * A checkable that hangs until signaled.
+   */
+  private static class LatchedCheckable
+      implements Checkable<Boolean, Boolean> {
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    @Override
+    public Boolean check(Boolean ignored) throws InterruptedException {
+      LOG.info("LatchedCheckable {} waiting.", this);
+      latch.await();
+      return true;  // Unreachable.
+    }
+  }
+
+  /**
+   * A {@link FutureCallback} that counts its invocations.
+   */
+  private static final class LatchedCallback
+      implements FutureCallback<Boolean> {
+    private final CountDownLatch successLatch = new CountDownLatch(1);
+    private final CountDownLatch failureLatch = new CountDownLatch(1);
+    private final Checkable target;
+
+    private LatchedCallback(Checkable target) {
+      this.target = target;
+    }
+
+    @Override
+    public void onSuccess(@Nonnull Boolean result) {
+      LOG.info("onSuccess callback invoked for {}", target);
+      successLatch.countDown();
+    }
+
+    @Override
+    public void onFailure(@Nonnull Throwable t) {
+      LOG.info("onFailure callback invoked for {} with exception", target, t);
+      failureLatch.countDown();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to