This is an automated email from the ASF dual-hosted git repository.
arp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3b49d7a HDDS-989. Check Hdds Volumes for errors. Contributed by Arpit Agarwal.
3b49d7a is described below
commit 3b49d7aeae8819ce7c2c4f4fec057dd9e75dedf1
Author: Arpit Agarwal <[email protected]>
AuthorDate: Sun Jan 27 11:18:30 2019 -0800
HDDS-989. Check Hdds Volumes for errors. Contributed by Arpit Agarwal.
---
.../container/common/volume/AbstractFuture.java | 1291 ++++++++++++++++++++
.../ozone/container/common/volume/HddsVolume.java | 24 +-
.../container/common/volume/HddsVolumeChecker.java | 418 +++++++
.../common/volume/ThrottledAsyncChecker.java | 245 ++++
.../container/common/volume/TimeoutFuture.java | 161 +++
.../ozone/container/common/volume/VolumeSet.java | 116 +-
.../ozone/container/ozoneimpl/OzoneContainer.java | 1 +
.../common/volume/TestHddsVolumeChecker.java | 212 ++++
.../common/volume/TestVolumeSetDiskChecks.java | 185 +++
9 files changed, 2643 insertions(+), 10 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AbstractFuture.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AbstractFuture.java
new file mode 100644
index 0000000..438692c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AbstractFuture.java
@@ -0,0 +1,1291 @@
+/*
+ * Copyright (C) 2007 The Guava Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Some portions of this class have been modified to make it functional in this
+ * package.
+ */
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.GwtCompatible;
+import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+ .newUpdater;
+
+import javax.annotation.Nullable;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+import java.util.concurrent.locks.LockSupport;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An abstract implementation of {@link ListenableFuture}, intended for
+ * advanced users only. More common ways to create a {@code ListenableFuture}
+ * include instantiating a {@link SettableFuture}, submitting a task to a
+ * {@link ListeningExecutorService}, and deriving a {@code Future} from an
+ * existing one, typically using methods like
+ * {@link Futures#transform(ListenableFuture, com.google.common.base.Function)
+ * Futures.transform} and its overloaded versions.
+ * <p>
+ * <p>This class implements all methods in {@code ListenableFuture}.
+ * Subclasses should provide a way to set the result of the computation
+ * through the protected methods {@link #set(Object)},
+ * {@link #setFuture(ListenableFuture)} and {@link #setException(Throwable)}.
+ * Subclasses may also override {@link #interruptTask()}, which will be
+ * invoked automatically if a call to {@link #cancel(boolean) cancel(true)}
+ * succeeds in canceling the future. Subclasses should rarely override other
+ * methods.
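+ * <p>
+ * <p>As an illustrative sketch only (not part of this class), a minimal
+ * subclass might simply expose the protected setters:
+ * <pre> {@code
+ * class SettableResult<T> extends AbstractFuture<T> {
+ *   boolean finish(T value) { return set(value); }
+ *   boolean fail(Throwable t) { return setException(t); }
+ * }}</pre>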
+ */
+
+@GwtCompatible(emulated = true)
+public abstract class AbstractFuture<V> implements ListenableFuture<V> {
+ // NOTE: Whenever both tests are cheap and functional, it's faster to use &,
+ // | instead of &&, ||
+
+ private static final boolean GENERATE_CANCELLATION_CAUSES =
+ Boolean.parseBoolean(
+ System.getProperty("guava.concurrent.generate_cancellation_cause",
+ "false"));
+
+ /**
+ * A less abstract subclass of AbstractFuture. This can be used to optimize
+ * setFuture by ensuring that {@link #get} calls exactly the implementation
+ * of {@link AbstractFuture#get}.
+ */
+ abstract static class TrustedFuture<V> extends AbstractFuture<V> {
+ @Override
+ public final V get() throws InterruptedException, ExecutionException {
+ return super.get();
+ }
+
+ @Override
+ public final V get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return super.get(timeout, unit);
+ }
+
+ @Override
+ public final boolean isDone() {
+ return super.isDone();
+ }
+
+ @Override
+ public final boolean isCancelled() {
+ return super.isCancelled();
+ }
+
+ @Override
+ public final void addListener(Runnable listener, Executor executor) {
+ super.addListener(listener, executor);
+ }
+
+ @Override
+ public final boolean cancel(boolean mayInterruptIfRunning) {
+ return super.cancel(mayInterruptIfRunning);
+ }
+ }
+
+ // Logger to log exceptions caught when running listeners.
+ private static final Logger log = Logger
+ .getLogger(AbstractFuture.class.getName());
+
+ // A heuristic for timed gets. If the remaining timeout is less than this,
+ // spin instead of
+ // blocking. This value is what AbstractQueuedSynchronizer uses.
+ private static final long SPIN_THRESHOLD_NANOS = 1000L;
+
+ private static final AtomicHelper ATOMIC_HELPER;
+
+ static {
+ AtomicHelper helper;
+
+ try {
+ helper = new UnsafeAtomicHelper();
+ } catch (Throwable unsafeFailure) {
+ // catch absolutely everything and fall through to our 'SafeAtomicHelper'
+ // The access control checks that ARFU does mean the caller class has
+ // to be AbstractFuture instead of SafeAtomicHelper, so we annoyingly
+ // define these here
+ try {
+ helper =
+ new SafeAtomicHelper(
+ newUpdater(Waiter.class, Thread.class, "thread"),
+ newUpdater(Waiter.class, Waiter.class, "next"),
+ newUpdater(AbstractFuture.class, Waiter.class, "waiters"),
+ newUpdater(AbstractFuture.class, Listener.class, "listeners"),
+ newUpdater(AbstractFuture.class, Object.class, "value"));
+ } catch (Throwable atomicReferenceFieldUpdaterFailure) {
+ // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs
+ // that cause getDeclaredField to throw a NoSuchFieldException when
+ // the field is definitely there.
+ // For these users, fall back to a suboptimal implementation based on
+ // synchronized. This will be a definite performance hit to those users.
+ log.log(Level.SEVERE, "UnsafeAtomicHelper is broken!", unsafeFailure);
+ log.log(
+ Level.SEVERE, "SafeAtomicHelper is broken!",
+ atomicReferenceFieldUpdaterFailure);
+ helper = new SynchronizedHelper();
+ }
+ }
+ ATOMIC_HELPER = helper;
+
+ // Prevent rare disastrous classloading in first call to LockSupport.park.
+ // See: https://bugs.openjdk.java.net/browse/JDK-8074773
+ @SuppressWarnings("unused")
+ Class<?> ensureLoaded = LockSupport.class;
+ }
+
+ /**
+ * Waiter links form a Treiber stack, in the {@link #waiters} field.
+ */
+ private static final class Waiter {
+ static final Waiter TOMBSTONE = new Waiter(false /* ignored param */);
+
+ @Nullable volatile Thread thread;
+ @Nullable volatile Waiter next;
+
+ /**
+ * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this
+ * class is loaded before the ATOMIC_HELPER. Apparently this is possible
+ * on some android platforms.
+ */
+ Waiter(boolean unused) {
+ }
+
+ Waiter() {
+ // avoid volatile write, write is made visible by subsequent CAS on
+ // waiters field
+ ATOMIC_HELPER.putThread(this, Thread.currentThread());
+ }
+
+ // non-volatile write to the next field. Should be made visible by
+ // subsequent CAS on waiters field.
+ void setNext(Waiter next) {
+ ATOMIC_HELPER.putNext(this, next);
+ }
+
+ void unpark() {
+ // This is racy with removeWaiter. The consequence of the race is that
+ // we may spuriously call unpark even though the thread has already
+ // removed itself from the list. But even if we did use a CAS, that
+ // race would still exist (it would just be ever so slightly smaller).
+ Thread w = thread;
+ if (w != null) {
+ thread = null;
+ LockSupport.unpark(w);
+ }
+ }
+ }
+
+ /**
+ * Marks the given node as 'deleted' (null waiter) and then scans the list
+ * to unlink all deleted nodes. This is an O(n) operation in the common
+ * case (and O(n^2) in the worst), but we are saved by two things.
+ * <ul>
+ * <li>This is only called when a waiting thread times out or is
+ * interrupted, both of which should be rare.
+ * <li>The waiters list should be very short.
+ * </ul>
+ */
+ private void removeWaiter(Waiter node) {
+ node.thread = null; // mark as 'deleted'
+ restart:
+ while (true) {
+ Waiter pred = null;
+ Waiter curr = waiters;
+ if (curr == Waiter.TOMBSTONE) {
+ return; // give up if someone is calling complete
+ }
+ Waiter succ;
+ while (curr != null) {
+ succ = curr.next;
+ if (curr.thread != null) { // we aren't unlinking this node, update
+ // pred.
+ pred = curr;
+ } else if (pred != null) { // We are unlinking this node and it has a
+ // predecessor.
+ pred.next = succ;
+ if (pred.thread == null) { // We raced with another node that
+ // unlinked pred. Restart.
+ continue restart;
+ }
+ } else if (!ATOMIC_HELPER
+ .casWaiters(this, curr, succ)) { // We are unlinking head
+ continue restart; // We raced with an add or complete
+ }
+ curr = succ;
+ }
+ break;
+ }
+ }
+
+ /**
+ * Listeners also form a stack through the {@link #listeners} field.
+ */
+ private static final class Listener {
+ static final Listener TOMBSTONE = new Listener(null, null);
+ final Runnable task;
+ final Executor executor;
+
+ // writes to next are made visible by subsequent CAS's on the listeners
+ // field
+ @Nullable Listener next;
+
+ Listener(Runnable task, Executor executor) {
+ this.task = task;
+ this.executor = executor;
+ }
+ }
+
+ /**
+ * A special value to represent {@code null}.
+ */
+ private static final Object NULL = new Object();
+
+ /**
+ * A special value to represent failure, when {@link #setException} is
+ * called successfully.
+ */
+ private static final class Failure {
+ static final Failure FALLBACK_INSTANCE =
+ new Failure(
+ new Throwable(
+ "Failure occurred while trying to finish a future.") {
+ @Override
+ public synchronized Throwable fillInStackTrace() {
+ return this; // no stack trace
+ }
+ });
+ final Throwable exception;
+
+ Failure(Throwable exception) {
+ this.exception = checkNotNull(exception);
+ }
+ }
+
+ /**
+ * A special value to represent cancellation and the 'wasInterrupted' bit.
+ */
+ private static final class Cancellation {
+ final boolean wasInterrupted;
+ @Nullable final Throwable cause;
+
+ Cancellation(boolean wasInterrupted, @Nullable Throwable cause) {
+ this.wasInterrupted = wasInterrupted;
+ this.cause = cause;
+ }
+ }
+
+ /**
+ * A special value that encodes the 'setFuture' state.
+ */
+ private static final class SetFuture<V> implements Runnable {
+ final AbstractFuture<V> owner;
+ final ListenableFuture<? extends V> future;
+
+ SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future) {
+ this.owner = owner;
+ this.future = future;
+ }
+
+ @Override
+ public void run() {
+ if (owner.value != this) {
+ // nothing to do, we must have been cancelled, don't bother inspecting
+ // the future.
+ return;
+ }
+ Object valueToSet = getFutureValue(future);
+ if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) {
+ complete(owner);
+ }
+ }
+ }
+
+ /**
+ * This field encodes the current state of the future.
+ * <p>
+ * <p>The valid values are:
+ * <ul>
+ * <li>{@code null} initial state, nothing has happened.
+ * <li>{@link Cancellation} terminal state, {@code cancel} was called.
+ * <li>{@link Failure} terminal state, {@code setException} was called.
+ * <li>{@link SetFuture} intermediate state, {@code setFuture} was called.
+ * <li>{@link #NULL} terminal state, {@code set(null)} was called.
+ * <li>Any other non-null value, terminal state, {@code set} was called with
+ * a non-null argument.
+ * </ul>
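+ * <p>For example, a successful {@code set("x")} moves this field straight
+ * from {@code null} to {@code "x"}, while {@code setFuture(f)} first stores
+ * a {@link SetFuture} and later swaps in the unwrapped result of {@code f}.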
+ */
+ private volatile Object value;
+
+ /**
+ * All listeners.
+ */
+ private volatile Listener listeners;
+
+ /**
+ * All waiting threads.
+ */
+ private volatile Waiter waiters;
+
+ /**
+ * Constructor for use by subclasses.
+ */
+ protected AbstractFuture() {
+ }
+
+ // Gets and Timed Gets
+ //
+ // * Be responsive to interruption
+ // * Don't create Waiter nodes if you aren't going to park, this helps
+ // reduce contention on the waiters field.
+ // * Future completion is defined by when #value becomes non-null/non
+ // SetFuture
+ // * Future completion can be observed if the waiters field contains a
+ // TOMBSTONE
+
+ // Timed Get
+ // There are a few design constraints to consider
+ // * We want to be responsive to small timeouts; unpark() has non-trivial
+ // latency overheads (I have observed 12 micros on 64-bit Linux systems to
+ // wake up a parked thread). So if the timeout is small we shouldn't park().
+ // This needs to be traded off with the cpu overhead of spinning, so we use
+ // SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for
+ // similar purposes.
+ // * We want to behave reasonably for timeouts of 0
+ // * We are more responsive to completion than timeouts. This is because
+ // parkNanos depends on system scheduling and as such we could either miss
+ // our deadline, or unpark() could be delayed so that it looks like we
+ // timed out even though we didn't. For comparison FutureTask respects
+ // completion preferably and AQS is non-deterministic (depends on where in
+ // the queue the waiter is). If we wanted to be strict about it, we could
+ // store the unpark() time in the Waiter node and we could use that to make
+ // a decision about whether or not we timed out prior to being unparked.
+
+ /*
+ * Improve the documentation of when InterruptedException is thrown. Our
+ * behavior matches the JDK's, but the JDK's documentation is misleading.
+ */
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * <p>The default {@link AbstractFuture} implementation throws {@code
+ * InterruptedException} if the current thread is interrupted before or
+ * during the call, even if the value is already available.
+ *
+ * @throws InterruptedException if the current thread was interrupted
+ * before or during the call
+ * (optional but recommended).
+ * @throws CancellationException {@inheritDoc}
+ */
+ @Override
+ public V get(long timeout, TimeUnit unit)
+ throws InterruptedException, TimeoutException, ExecutionException {
+ // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into
+ // the while(true) loop at the bottom and throw a TimeoutException.
+ long remainingNanos = unit
+ .toNanos(timeout); // we rely on the implicit null check on unit.
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ Object localValue = value;
+ if (localValue != null & !(localValue instanceof SetFuture)) {
+ return getDoneValue(localValue);
+ }
+ // we delay calling nanoTime until we know we will need to either park or
+ // spin
+ final long endNanos =
+ remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0;
+ long_wait_loop:
+ if (remainingNanos >= SPIN_THRESHOLD_NANOS) {
+ Waiter oldHead = waiters;
+ if (oldHead != Waiter.TOMBSTONE) {
+ Waiter node = new Waiter();
+ do {
+ node.setNext(oldHead);
+ if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
+ while (true) {
+ LockSupport.parkNanos(this, remainingNanos);
+ // Check interruption first, if we woke up due to interruption
+ // we need to honor that.
+ if (Thread.interrupted()) {
+ removeWaiter(node);
+ throw new InterruptedException();
+ }
+
+ // Otherwise re-read and check doneness. If we loop then it must
+ // have been a spurious wakeup
+ localValue = value;
+ if (localValue != null & !(localValue instanceof SetFuture)) {
+ return getDoneValue(localValue);
+ }
+
+ // timed out?
+ remainingNanos = endNanos - System.nanoTime();
+ if (remainingNanos < SPIN_THRESHOLD_NANOS) {
+ // Remove the waiter, one way or another we are done parking
+ // this thread.
+ removeWaiter(node);
+ break long_wait_loop; // jump down to the busy wait loop
+ }
+ }
+ }
+ oldHead = waiters; // re-read and loop.
+ } while (oldHead != Waiter.TOMBSTONE);
+ }
+ // re-read value, if we get here then we must have observed a TOMBSTONE
+ // while trying to add a waiter.
+ return getDoneValue(value);
+ }
+ // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and
+ // there is no node on the waiters list
+ while (remainingNanos > 0) {
+ localValue = value;
+ if (localValue != null & !(localValue instanceof SetFuture)) {
+ return getDoneValue(localValue);
+ }
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ remainingNanos = endNanos - System.nanoTime();
+ }
+ throw new TimeoutException();
+ }
+
+ /*
+ * Improve the documentation of when InterruptedException is thrown. Our
+ * behavior matches the JDK's, but the JDK's documentation is misleading.
+ */
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * <p>The default {@link AbstractFuture} implementation throws {@code
+ * InterruptedException} if the current thread is interrupted before or
+ * during the call, even if the value is already available.
+ *
+ * @throws InterruptedException if the current thread was interrupted
+ * before or during the call
+ * (optional but recommended).
+ * @throws CancellationException {@inheritDoc}
+ */
+ @Override
+ public V get() throws InterruptedException, ExecutionException {
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ Object localValue = value;
+ if (localValue != null & !(localValue instanceof SetFuture)) {
+ return getDoneValue(localValue);
+ }
+ Waiter oldHead = waiters;
+ if (oldHead != Waiter.TOMBSTONE) {
+ Waiter node = new Waiter();
+ do {
+ node.setNext(oldHead);
+ if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
+ // we are on the stack, now wait for completion.
+ while (true) {
+ LockSupport.park(this);
+ // Check interruption first, if we woke up due to interruption we
+ // need to honor that.
+ if (Thread.interrupted()) {
+ removeWaiter(node);
+ throw new InterruptedException();
+ }
+ // Otherwise re-read and check doneness. If we loop then it must
+ // have been a spurious wakeup
+ localValue = value;
+ if (localValue != null & !(localValue instanceof SetFuture)) {
+ return getDoneValue(localValue);
+ }
+ }
+ }
+ oldHead = waiters; // re-read and loop.
+ } while (oldHead != Waiter.TOMBSTONE);
+ }
+ // re-read value, if we get here then we must have observed a TOMBSTONE
+ // while trying to add a waiter.
+ return getDoneValue(value);
+ }
+
+ /**
+ * Unboxes {@code obj}. Assumes that obj is neither {@code null} nor a
+ * {@link SetFuture}.
+ */
+ private V getDoneValue(Object obj) throws ExecutionException {
+ // While this seems like it might be too branch-y, simple benchmarking
+ // proves it to be unmeasurable (comparing done AbstractFutures with
+ // immediateFuture)
+ if (obj instanceof Cancellation) {
+ throw cancellationExceptionWithCause(
+ "Task was cancelled.", ((Cancellation) obj).cause);
+ } else if (obj instanceof Failure) {
+ throw new ExecutionException(((Failure) obj).exception);
+ } else if (obj == NULL) {
+ return null;
+ } else {
+ @SuppressWarnings("unchecked") // this is the only other option
+ V asV = (V) obj;
+ return asV;
+ }
+ }
+
+ @Override
+ public boolean isDone() {
+ final Object localValue = value;
+ return localValue != null & !(localValue instanceof SetFuture);
+ }
+
+ @Override
+ public boolean isCancelled() {
+ final Object localValue = value;
+ return localValue instanceof Cancellation;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * <p>If a cancellation attempt succeeds on a {@code Future} that had
+ * previously been {@linkplain #setFuture set asynchronously}, then the
+ * cancellation will also be propagated to the delegate {@code Future} that
+ * was supplied in the {@code setFuture} call.
+ */
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ Object localValue = value;
+ boolean rValue = false;
+ if (localValue == null | localValue instanceof SetFuture) {
+ // Try to delay allocating the exception. At this point we may still
+ // lose the CAS, but it is certainly less likely.
+ Throwable cause =
+ GENERATE_CANCELLATION_CAUSES
+ ? new CancellationException("Future.cancel() was called.")
+ : null;
+ Object valueToSet = new Cancellation(mayInterruptIfRunning, cause);
+ AbstractFuture<?> abstractFuture = this;
+ while (true) {
+ if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) {
+ rValue = true;
+ // We call interruptTask before calling complete(), which is
+ // consistent with FutureTask
+ if (mayInterruptIfRunning) {
+ abstractFuture.interruptTask();
+ }
+ complete(abstractFuture);
+ if (localValue instanceof SetFuture) {
+ // propagate cancellation to the future set in setfuture, this is
+ // racy, and we don't care if we are successful or not.
+ ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue)
+ .future;
+ if (futureToPropagateTo instanceof TrustedFuture) {
+ // If the future is a TrustedFuture then we specifically avoid
+ // calling cancel() this has 2 benefits
+ // 1. for long chains of futures strung together with setFuture
+ // we consume less stack
+ // 2. we avoid allocating Cancellation objects at every level of
+ // the cancellation chain
+ // We can only do this for TrustedFuture, because
+ // TrustedFuture.cancel is final and does nothing but delegate
+ // to this method.
+ AbstractFuture<?> trusted = (AbstractFuture<?>)
+ futureToPropagateTo;
+ localValue = trusted.value;
+ if (localValue == null | localValue instanceof SetFuture) {
+ abstractFuture = trusted;
+ continue; // loop back up and try to complete the new future
+ }
+ } else {
+ // not a TrustedFuture, call cancel directly.
+ futureToPropagateTo.cancel(mayInterruptIfRunning);
+ }
+ }
+ break;
+ }
+ // obj changed, reread
+ localValue = abstractFuture.value;
+ if (!(localValue instanceof SetFuture)) {
+ // obj cannot be null at this point, because value can only change
+ // from null to non-null. So if value changed (and it did since we
+ // lost the CAS), then it cannot be null and since it isn't a
+ // SetFuture, then the future must be done and we should exit the loop.
+ break;
+ }
+ }
+ }
+ return rValue;
+ }
+
+ /**
+ * Subclasses can override this method to implement interruption of the
+ * future's computation. The method is invoked automatically by a
+ * successful call to {@link #cancel(boolean) cancel(true)}.
+ * <p>
+ * <p>The default implementation does nothing.
+ *
+ * @since 10.0
+ */
+ protected void interruptTask() {
+ }
+
+ /**
+ * Returns true if this future was cancelled with {@code
+ * mayInterruptIfRunning} set to {@code true}.
+ *
+ * @since 14.0
+ */
+ protected final boolean wasInterrupted() {
+ final Object localValue = value;
+ return (localValue instanceof Cancellation) && ((Cancellation) localValue)
+ .wasInterrupted;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @since 10.0
+ */
+ @Override
+ public void addListener(Runnable listener, Executor executor) {
+ checkNotNull(listener, "Runnable was null.");
+ checkNotNull(executor, "Executor was null.");
+ Listener oldHead = listeners;
+ if (oldHead != Listener.TOMBSTONE) {
+ Listener newNode = new Listener(listener, executor);
+ do {
+ newNode.next = oldHead;
+ if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {
+ return;
+ }
+ oldHead = listeners; // re-read
+ } while (oldHead != Listener.TOMBSTONE);
+ }
+ // If we get here then the Listener TOMBSTONE was set, which means the
+ // future is done, call the listener.
+ executeListener(listener, executor);
+ }
+
+ /**
+ * Sets the result of this {@code Future} unless this {@code Future} has
+ * already been cancelled or set (including
+ * {@linkplain #setFuture set asynchronously}). When a call to this method
+ * returns, the {@code Future} is guaranteed to be
+ * {@linkplain #isDone done} <b>only if</b> the call was accepted (in which
+ * case it returns {@code true}). If it returns {@code false}, the {@code
+ * Future} may have previously been set asynchronously, in which case its
+ * result may not be known yet. That result, though not yet known, cannot
+ * be overridden by a call to a {@code set*} method, only by a call to
+ * {@link #cancel}.
+ *
+ * @param value the value to be used as the result
+ * @return true if the attempt was accepted, completing the {@code Future}
+ */
+ protected boolean set(@Nullable V value) {
+ Object valueToSet = value == null ? NULL : value;
+ if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
+ complete(this);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Sets the failed result of this {@code Future} unless this {@code Future}
+ * has already been cancelled or set (including
+ * {@linkplain #setFuture set asynchronously}). When a call to this method
+ * returns, the {@code Future} is guaranteed to be
+ * {@linkplain #isDone done} <b>only if</b> the call was accepted (in which
+ * case it returns {@code true}). If it returns {@code false}, the
+ * {@code Future} may have previously been set asynchronously, in which case
+ * its result may not be known yet. That result, though not yet known,
+ * cannot be overridden by a call to a {@code set*} method, only by a call
+ * to {@link #cancel}.
+ *
+ * @param throwable the exception to be used as the failed result
+ * @return true if the attempt was accepted, completing the {@code Future}
+ */
+ protected boolean setException(Throwable throwable) {
+ Object valueToSet = new Failure(checkNotNull(throwable));
+ if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
+ complete(this);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Sets the result of this {@code Future} to match the supplied input
+ * {@code Future} once the supplied {@code Future} is done, unless this
+ * {@code Future} has already been cancelled or set (including "set
+ * asynchronously," defined below).
+ * <p>
+ * <p>If the supplied future is {@linkplain #isDone done} when this method
+ * is called and the call is accepted, then this future is guaranteed to
+ * have been completed with the supplied future by the time this method
+ * returns. If the supplied future is not done and the call is accepted, then
+ * the future will be <i>set asynchronously</i>. Note that such a result,
+ * though not yet known, cannot be overridden by a call to a {@code set*}
+ * method, only by a call to {@link #cancel}.
+ * <p>
+ * <p>If the call {@code setFuture(delegate)} is accepted and this {@code
+ * Future} is later cancelled, cancellation will be propagated to {@code
+ * delegate}. Additionally, any call to {@code setFuture} after any
+ * cancellation will propagate cancellation to the supplied {@code Future}.
+ *
+ * @param future the future to delegate to
+ * @return true if the attempt was accepted, indicating that the {@code
+ * Future} was not previously cancelled or set.
+ * @since 19.0
+ */
+ @Beta
+ protected boolean setFuture(ListenableFuture<? extends V> future) {
+ checkNotNull(future);
+ Object localValue = value;
+ if (localValue == null) {
+ if (future.isDone()) {
+ Object value = getFutureValue(future);
+ if (ATOMIC_HELPER.casValue(this, null, value)) {
+ complete(this);
+ return true;
+ }
+ return false;
+ }
+ SetFuture valueToSet = new SetFuture<V>(this, future);
+ if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
+ // the listener is responsible for calling completeWithFuture,
+ // directExecutor is appropriate since all we are doing is unpacking
+ // a completed future which should be fast.
+ try {
+ future.addListener(valueToSet, directExecutor());
+ } catch (Throwable t) {
+ // addListener has thrown an exception! SetFuture.run can't throw
+ // any exceptions so this must have been caused by addListener
+ // itself. The most likely explanation is a misconfigured mock. Try
+ // to switch to Failure.
+ Failure failure;
+ try {
+ failure = new Failure(t);
+ } catch (Throwable oomMostLikely) {
+ failure = Failure.FALLBACK_INSTANCE;
+ }
+ // Note: The only way this CAS could fail is if cancel() has raced
+ // with us. That is ok.
+ boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure);
+ }
+ return true;
+ }
+ localValue = value; // we lost the cas, fall through and maybe cancel
+ }
+ // The future has already been set to something. If it is cancellation we
+ // should cancel the incoming future.
+ if (localValue instanceof Cancellation) {
+ // we don't care if it fails, this is best-effort.
+ future.cancel(((Cancellation) localValue).wasInterrupted);
+ }
+ return false;
+ }
+
+ /**
+ * Returns a value suitable for storing in the {@link #value} field, from
+ * the given future, which is assumed to be done.
+ * <p>
+ * <p>This is approximately the inverse of {@link #getDoneValue(Object)}
+ */
+ private static Object getFutureValue(ListenableFuture<?> future) {
+ Object valueToSet;
+ if (future instanceof TrustedFuture) {
+ // Break encapsulation for TrustedFuture instances since we know that
+ // subclasses cannot override .get() (since it is final) and therefore
+ // this is equivalent to calling .get() and unpacking the exceptions
+ // like we do below (just much faster because it is a single field read
+ // instead of a read, several branches and possibly creating exceptions).
+ return ((AbstractFuture<?>) future).value;
+ } else {
+ // Otherwise calculate valueToSet by calling .get()
+ try {
+ Object v = getDone(future);
+ valueToSet = v == null ? NULL : v;
+ } catch (ExecutionException exception) {
+ valueToSet = new Failure(exception.getCause());
+ } catch (CancellationException cancellation) {
+ valueToSet = new Cancellation(false, cancellation);
+ } catch (Throwable t) {
+ valueToSet = new Failure(t);
+ }
+ }
+ return valueToSet;
+ }
+
+ /**
+ * Unblocks all threads and runs all listeners.
+ */
+ private static void complete(AbstractFuture<?> future) {
+ Listener next = null;
+ outer:
+ while (true) {
+ future.releaseWaiters();
+ // We call this before the listeners in order to avoid needing to manage
+ // a separate stack data structure for them. afterDone() should be
+ // generally fast and only used for cleanup work... but in theory can
+ // also be recursive and create StackOverflowErrors
+ future.afterDone();
+ // push the current set of listeners onto next
+ next = future.clearListeners(next);
+ future = null;
+ while (next != null) {
+ Listener curr = next;
+ next = next.next;
+ Runnable task = curr.task;
+ if (task instanceof SetFuture) {
+ SetFuture<?> setFuture = (SetFuture<?>) task;
+ // We unwind setFuture specifically to avoid StackOverflowErrors in
+ // the case of long chains of SetFutures
+ // Handling this special case is important because there is no way
+ // to pass an executor to setFuture, so a user couldn't break the
+ // chain by doing this themselves. It is also potentially common
+ // if someone writes a recursive Futures.transformAsync transformer.
+ future = setFuture.owner;
+ if (future.value == setFuture) {
+ Object valueToSet = getFutureValue(setFuture.future);
+ if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) {
+ continue outer;
+ }
+ }
+ // otherwise the future we were trying to set is already done.
+ } else {
+ executeListener(task, curr.executor);
+ }
+ }
+ break;
+ }
+ }
+
+ public static <V> V getDone(Future<V> future) throws ExecutionException {
+ /*
+ * We throw IllegalStateException, since the call could succeed later.
+ * Perhaps we "should" throw IllegalArgumentException, since the call
+ * could succeed with a different argument. Those exceptions' docs
+ * suggest that either is acceptable. Google's Java Practices page
+ * recommends IllegalArgumentException here, in part to keep its
+ * recommendation simple: Static methods should throw
+ * IllegalStateException only when they use static state.
+ *
+ *
+ * Why do we deviate here? The answer: we want fluentFuture.getDone()
+ * to throw the same exception as Futures.getDone(fluentFuture).
+ */
+ Preconditions.checkState(future.isDone(),
+ "Future was expected to be done: %s", future);
+ return Uninterruptibles.getUninterruptibly(future);
+ }
+
+ /**
+ * Callback method that is called exactly once after the future is completed.
+ * <p>
+ * <p>If {@link #interruptTask} is also run during completion,
+ * {@link #afterDone} runs after it.
+ * <p>
+ * <p>The default implementation of this method in {@code AbstractFuture}
+ * does nothing. This is intended for very lightweight cleanup work, for
+ * example, timing statistics or clearing fields.
+ * If your task does anything heavier, consider just using a listener with
+ * an executor.
+ *
+ * @since 20.0
+ */
+ @Beta
+ protected void afterDone() {
+ }
+
+ /**
+ * If this future has been cancelled (and possibly interrupted), cancels
+ * (and possibly interrupts) the given future (if available).
+ * <p>
+ * <p>This method should be used only when this future is completed. It is
+ * designed to be called from {@code done}.
+ */
+ final void maybePropagateCancellation(@Nullable Future<?> related) {
+ if (related != null & isCancelled()) {
+ related.cancel(wasInterrupted());
+ }
+ }
+
+ /**
+ * Releases all threads in the {@link #waiters} list, and clears the list.
+ */
+ private void releaseWaiters() {
+ Waiter head;
+ do {
+ head = waiters;
+ } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE));
+ for (
+ Waiter currentWaiter = head;
+ currentWaiter != null;
+ currentWaiter = currentWaiter.next) {
+ currentWaiter.unpark();
+ }
+ }
+
+ /**
+ * Clears the {@link #listeners} list and prepends its contents to {@code
+ * onto}, least recently added first.
+ */
+ private Listener clearListeners(Listener onto) {
+ // We need to
+ // 1. atomically swap the listeners with TOMBSTONE, this is because
+ // addListener uses that to synchronize with us
+ // 2. reverse the linked list, because despite our rather clear contract,
+ // people depend on us executing listeners in the order they were added
+ // 3. push all the items onto 'onto' and return the new head of the stack
+ Listener head;
+ do {
+ head = listeners;
+ } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE));
+ Listener reversedList = onto;
+ while (head != null) {
+ Listener tmp = head;
+ head = head.next;
+ tmp.next = reversedList;
+ reversedList = tmp;
+ }
+ return reversedList;
+ }
+
+ /**
+ * Submits the given runnable to the given {@link Executor} catching and
+ * logging all {@linkplain RuntimeException runtime exceptions} thrown by
+ * the executor.
+ */
+ private static void executeListener(Runnable runnable, Executor executor) {
+ try {
+ executor.execute(runnable);
+ } catch (RuntimeException e) {
+ // Log it and keep going -- bad runnable and/or executor. Don't punish
+ // the other runnables if we're given a bad one. We only catch
+ // RuntimeException because we want Errors to propagate up.
+ log.log(
+ Level.SEVERE,
+ "RuntimeException while executing runnable " + runnable + " with " +
+ "executor " + executor,
+ e);
+ }
+ }
+
+ private abstract static class AtomicHelper {
+ /**
+ * Non volatile write of the thread to the {@link Waiter#thread} field.
+ */
+ abstract void putThread(Waiter waiter, Thread newValue);
+
+ /**
+ * Non volatile write of the waiter to the {@link Waiter#next} field.
+ */
+ abstract void putNext(Waiter waiter, Waiter newValue);
+
+ /**
+ * Performs a CAS operation on the {@link #waiters} field.
+ */
+ abstract boolean casWaiters(
+ AbstractFuture<?> future, Waiter expect,
+ Waiter update);
+
+ /**
+ * Performs a CAS operation on the {@link #listeners} field.
+ */
+ abstract boolean casListeners(
+ AbstractFuture<?> future, Listener expect,
+ Listener update);
+
+ /**
+ * Performs a CAS operation on the {@link #value} field.
+ */
+ abstract boolean casValue(
+ AbstractFuture<?> future, Object expect, Object update);
+ }
+
+ /**
+ * {@link AtomicHelper} based on {@link sun.misc.Unsafe}.
+ * <p>
+ * <p>Static initialization of this class will fail if the
+ * {@link sun.misc.Unsafe} object cannot be accessed.
+ */
+ private static final class UnsafeAtomicHelper extends AtomicHelper {
+ static final sun.misc.Unsafe UNSAFE;
+ static final long LISTENERS_OFFSET;
+ static final long WAITERS_OFFSET;
+ static final long VALUE_OFFSET;
+ static final long WAITER_THREAD_OFFSET;
+ static final long WAITER_NEXT_OFFSET;
+
+ static {
+ sun.misc.Unsafe unsafe = null;
+ try {
+ unsafe = sun.misc.Unsafe.getUnsafe();
+ } catch (SecurityException tryReflectionInstead) {
+ try {
+ unsafe =
+ AccessController.doPrivileged(
+ new PrivilegedExceptionAction<sun.misc.Unsafe>() {
+ @Override
+ public sun.misc.Unsafe run() throws Exception {
+ Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
+ for (java.lang.reflect.Field f : k.getDeclaredFields()) {
+ f.setAccessible(true);
+ Object x = f.get(null);
+ if (k.isInstance(x)) {
+ return k.cast(x);
+ }
+ }
+ throw new NoSuchFieldError("the Unsafe");
+ }
+ });
+ } catch (PrivilegedActionException e) {
+ throw new RuntimeException(
+ "Could not initialize intrinsics", e.getCause());
+ }
+ }
+ try {
+ Class<?> abstractFuture = AbstractFuture.class;
+ WAITERS_OFFSET = unsafe
+ .objectFieldOffset(abstractFuture.getDeclaredField("waiters"));
+ LISTENERS_OFFSET = unsafe
+ .objectFieldOffset(abstractFuture.getDeclaredField("listeners"));
+ VALUE_OFFSET = unsafe
+ .objectFieldOffset(abstractFuture.getDeclaredField("value"));
+ WAITER_THREAD_OFFSET = unsafe
+ .objectFieldOffset(Waiter.class.getDeclaredField("thread"));
+ WAITER_NEXT_OFFSET = unsafe
+ .objectFieldOffset(Waiter.class.getDeclaredField("next"));
+ UNSAFE = unsafe;
+ } catch (Exception e) {
+ throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void throwIfUnchecked(Throwable throwable) {
+ checkNotNull(throwable);
+ if (throwable instanceof RuntimeException) {
+ throw (RuntimeException) throwable;
+ }
+ if (throwable instanceof Error) {
+ throw (Error) throwable;
+ }
+ }
+
+ @Override
+ void putThread(Waiter waiter, Thread newValue) {
+ UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue);
+ }
+
+ @Override
+ void putNext(Waiter waiter, Waiter newValue) {
+ UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue);
+ }
+
+ /**
+ * Performs a CAS operation on the {@link #waiters} field.
+ */
+ @Override
+ boolean casWaiters(AbstractFuture<?> future, Waiter expect,
+ Waiter update) {
+ return UNSAFE
+ .compareAndSwapObject(future, WAITERS_OFFSET, expect, update);
+ }
+
+ /**
+ * Performs a CAS operation on the {@link #listeners} field.
+ */
+ @Override
+ boolean casListeners(
+ AbstractFuture<?> future, Listener expect, Listener update) {
+ return UNSAFE
+ .compareAndSwapObject(future, LISTENERS_OFFSET, expect, update);
+ }
+
+ /**
+ * Performs a CAS operation on the {@link #value} field.
+ */
+ @Override
+ boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
+ return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update);
+ }
+ }
+
+ /**
+ * {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}.
+ */
+ private static final class SafeAtomicHelper extends AtomicHelper {
+ final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater;
+ final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater;
+ final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater;
+ final AtomicReferenceFieldUpdater<AbstractFuture, Listener>
+ listenersUpdater;
+ final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater;
+
+ SafeAtomicHelper(
+ AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater,
+ AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater,
+ AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater,
+ AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater,
+ AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) {
+ this.waiterThreadUpdater = waiterThreadUpdater;
+ this.waiterNextUpdater = waiterNextUpdater;
+ this.waitersUpdater = waitersUpdater;
+ this.listenersUpdater = listenersUpdater;
+ this.valueUpdater = valueUpdater;
+ }
+
+ @Override
+ void putThread(Waiter waiter, Thread newValue) {
+ waiterThreadUpdater.lazySet(waiter, newValue);
+ }
+
+ @Override
+ void putNext(Waiter waiter, Waiter newValue) {
+ waiterNextUpdater.lazySet(waiter, newValue);
+ }
+
+ @Override
+ boolean casWaiters(AbstractFuture<?> future, Waiter expect,
+ Waiter update) {
+ return waitersUpdater.compareAndSet(future, expect, update);
+ }
+
+ @Override
+ boolean casListeners(
+ AbstractFuture<?> future, Listener expect, Listener update) {
+ return listenersUpdater.compareAndSet(future, expect, update);
+ }
+
+ @Override
+ boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
+ return valueUpdater.compareAndSet(future, expect, update);
+ }
+ }
+
+ /**
+ * {@link AtomicHelper} based on {@code synchronized} and volatile writes.
+ * <p>
+ * <p>This is an implementation of last resort for when certain basic VM
+ * features are broken (like AtomicReferenceFieldUpdater).
+ */
+ private static final class SynchronizedHelper extends AtomicHelper {
+ @Override
+ void putThread(Waiter waiter, Thread newValue) {
+ waiter.thread = newValue;
+ }
+
+ @Override
+ void putNext(Waiter waiter, Waiter newValue) {
+ waiter.next = newValue;
+ }
+
+ @Override
+ boolean casWaiters(AbstractFuture<?> future, Waiter expect,
+ Waiter update) {
+ synchronized (future) {
+ if (future.waiters == expect) {
+ future.waiters = update;
+ return true;
+ }
+ return false;
+ }
+ }
+
+ @Override
+ boolean casListeners(
+ AbstractFuture<?> future, Listener expect, Listener update) {
+ synchronized (future) {
+ if (future.listeners == expect) {
+ future.listeners = update;
+ return true;
+ }
+ return false;
+ }
+ }
+
+ @Override
+ boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
+ synchronized (future) {
+ if (future.value == expect) {
+ future.value = update;
+ return true;
+ }
+ return false;
+ }
+ }
+ }
+
+ private static CancellationException cancellationExceptionWithCause(
+ @Nullable String message, @Nullable Throwable cause) {
+ CancellationException exception = new CancellationException(message);
+ exception.initCause(cause);
+ return exception;
+ }
+
+ /**
+ * Returns an {@link Executor} that runs each task in the thread that invokes
+ * {@link Executor#execute execute}, as in {@link CallerRunsPolicy}.
+ * <p>
+ * <p>This instance is equivalent to: <pre> {@code
+ * final class DirectExecutor implements Executor {
+ * public void execute(Runnable r) {
+ * r.run();
+ * }
+ * }}</pre>
+ */
+ public static Executor directExecutor() {
+ return DirectExecutor.INSTANCE;
+ }
+
+ /**
+ * See {@link #directExecutor} for behavioral notes.
+ */
+ private enum DirectExecutor implements Executor {
+ INSTANCE;
+
+ @Override
+ public void execute(Runnable command) {
+ command.run();
+ }
+
+ @Override
+ public String toString() {
+ return "MoreExecutors.directExecutor()";
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
index d088826..4cf6c3d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -20,17 +20,23 @@ package org.apache.hadoop.ozone.container.common.volume;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.GetSpaceUsed;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
import org.apache.hadoop.ozone.container.common.DataNodeLayoutVersion;
import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.Time;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +64,10 @@ import java.util.UUID;
* During DN startup, if the VERSION file exists, we verify that the
* clusterID in the version file matches the clusterID from SCM.
*/
-public final class HddsVolume {
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HddsVolume
+ implements Checkable<Boolean, VolumeCheckResult> {
private static final Logger LOG = LoggerFactory.getLogger(HddsVolume.class);
@@ -77,6 +86,19 @@ public final class HddsVolume {
private int layoutVersion; // layout version of the storage data
/**
+ * Run a check on the current volume to determine if it is healthy.
+ * @param unused context for the check, ignored.
+ * @return result of checking the volume.
+ * @throws Exception if an exception was encountered while running
+ * the volume check.
+ */
+ @Override
+ public VolumeCheckResult check(@Nullable Boolean unused) throws Exception {
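+ // DiskChecker.checkDir creates the directory if necessary and throws
+ // DiskErrorException if it cannot be created, is not a directory, or
+ // lacks read/write/execute permission; any such failure marks the
+ // volume unhealthy.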
+ DiskChecker.checkDir(hddsRootDir);
+ return VolumeCheckResult.HEALTHY;
+ }
+
+ /**
* Builder for HddsVolume.
*/
public static class Builder {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeChecker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeChecker.java
new file mode 100644
index 0000000..6df81df
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeChecker.java
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.checker.AsyncChecker;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY;
+
+
+/**
+ * A class that encapsulates running disk checks against each HDDS volume and
+ * allows retrieving a list of failed volumes.
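+ * <p>
+ * <p>An illustrative usage sketch (not part of this patch; the volume
+ * collection is assumed to come from the caller's own VolumeSet):
+ * <pre> {@code
+ * HddsVolumeChecker checker = new HddsVolumeChecker(conf, new Timer());
+ * Set<HddsVolume> failed = checker.checkAllVolumes(volumes);
+ * }</pre>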
+ */
+public class HddsVolumeChecker {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(HddsVolumeChecker.class);
+
+ private AsyncChecker<Boolean, VolumeCheckResult> delegateChecker;
+
+ private final AtomicLong numVolumeChecks = new AtomicLong(0);
+ private final AtomicLong numAllVolumeChecks = new AtomicLong(0);
+ private final AtomicLong numSkippedChecks = new AtomicLong(0);
+
+ /**
+ * Max allowed time for a disk check in milliseconds. If the check
+ * doesn't complete within this time we declare the disk as dead.
+ */
+ private final long maxAllowedTimeForCheckMs;
+
+ /**
+ * Minimum time between two successive disk checks of a volume.
+ */
+ private final long minDiskCheckGapMs;
+
+ /**
+ * Timestamp of the last check of all volumes.
+ */
+ private long lastAllVolumesCheck;
+
+ private final Timer timer;
+
+ private final ExecutorService checkVolumeResultHandlerExecutorService;
+
+ /**
+ * @param conf Configuration object.
+ * @param timer {@link Timer} object used for throttling checks.
+ */
+ public HddsVolumeChecker(Configuration conf, Timer timer)
+ throws DiskErrorException {
+ maxAllowedTimeForCheckMs = conf.getTimeDuration(
+ DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
+ DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ if (maxAllowedTimeForCheckMs <= 0) {
+ throw new DiskErrorException("Invalid value configured for "
+ + DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - "
+ + maxAllowedTimeForCheckMs + " (should be > 0)");
+ }
+
+ this.timer = timer;
+
+ /**
+ * Maximum number of volume failures that can be tolerated without
+ * declaring a fatal error.
+ */
+ int maxVolumeFailuresTolerated = conf.getInt(
+ DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
+ DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
+
+ minDiskCheckGapMs = conf.getTimeDuration(
+ DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+ DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ if (minDiskCheckGapMs < 0) {
+ throw new DiskErrorException("Invalid value configured for "
+ + DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY + " - "
+ + minDiskCheckGapMs + " (should be >= 0)");
+ }
+
+ long diskCheckTimeout = conf.getTimeDuration(
+ DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
+ DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ if (diskCheckTimeout < 0) {
+ throw new DiskErrorException("Invalid value configured for "
+ + DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - "
+ + diskCheckTimeout + " (should be >= 0)");
+ }
+
+ lastAllVolumesCheck = timer.monotonicNow() - minDiskCheckGapMs;
+
+ if (maxVolumeFailuresTolerated < DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT) {
+ throw new DiskErrorException("Invalid value configured for "
+ + DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY + " - "
+ + maxVolumeFailuresTolerated + " "
+ + DataNode.MAX_VOLUME_FAILURES_TOLERATED_MSG);
+ }
+
+ delegateChecker = new ThrottledAsyncChecker<>(
+ timer, minDiskCheckGapMs, diskCheckTimeout,
+ Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder()
+ .setNameFormat("DataNode DiskChecker thread %d")
+ .setDaemon(true)
+ .build()));
+
+ checkVolumeResultHandlerExecutorService = Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder()
+ .setNameFormat("VolumeCheck ResultHandler thread %d")
+ .setDaemon(true)
+ .build());
+ }
+
+ /**
+ * Run checks against all HDDS volumes.
+ *
+ * This check may be performed at service startup and subsequently at
+ * regular intervals to detect and handle failed volumes.
+ *
+ * @param volumes - Set of volumes to be checked. This set must be immutable
+ *                for the duration of the check, or else the results will
+ *                be unexpected.
+ *
+ * @return set of failed volumes.
+ */
+ public Set<HddsVolume> checkAllVolumes(Collection<HddsVolume> volumes)
+ throws InterruptedException {
+ final long gap = timer.monotonicNow() - lastAllVolumesCheck;
+ if (gap < minDiskCheckGapMs) {
+ numSkippedChecks.incrementAndGet();
+ LOG.trace(
+ "Skipped checking all volumes, time since last check {} is less " +
+ "than the minimum gap between checks ({} ms).",
+ gap, minDiskCheckGapMs);
+ return Collections.emptySet();
+ }
+
+ lastAllVolumesCheck = timer.monotonicNow();
+ final Set<HddsVolume> healthyVolumes = new HashSet<>();
+ final Set<HddsVolume> failedVolumes = new HashSet<>();
+ final Set<HddsVolume> allVolumes = new HashSet<>();
+
+ final AtomicLong numVolumes = new AtomicLong(volumes.size());
+ final CountDownLatch latch = new CountDownLatch(1);
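+ // Each ResultHandler decrements numVolumes as it finishes; the handler
+ // for the last outstanding volume counts down the latch, releasing the
+ // await below.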
+
+ for (HddsVolume v : volumes) {
+ Optional<ListenableFuture<VolumeCheckResult>> olf =
+ delegateChecker.schedule(v, null);
+ LOG.info("Scheduled health check for volume {}", v);
+ if (olf.isPresent()) {
+ allVolumes.add(v);
+ Futures.addCallback(olf.get(),
+ new ResultHandler(v, healthyVolumes, failedVolumes,
+ numVolumes, (ignored1, ignored2) -> latch.countDown()));
+ } else {
+ if (numVolumes.decrementAndGet() == 0) {
+ latch.countDown();
+ }
+ }
+ }
+
+ // Wait until our timeout elapses, after which we give up on
+ // the remaining volumes.
+ if (!latch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) {
+ LOG.warn("checkAllVolumes timed out after {} ms" +
+ maxAllowedTimeForCheckMs);
+ }
+
+ numAllVolumeChecks.incrementAndGet();
+ synchronized (this) {
+ // All volumes that have not been detected as healthy should be
+ // considered failed. This is a superset of 'failedVolumes'.
+ //
+ // Make a copy under the mutex as Sets.difference() returns a view
+ // of a potentially changing set.
+ return new HashSet<>(Sets.difference(allVolumes, healthyVolumes));
+ }
+ }
+
+ /**
+ * A callback interface that is supplied the result of running an
+ * async disk check on multiple volumes.
+ */
+ public interface Callback {
+ /**
+ * @param healthyVolumes set of volumes that passed disk checks.
+ * @param failedVolumes set of volumes that failed disk checks.
+ */
+ void call(Set<HddsVolume> healthyVolumes,
+ Set<HddsVolume> failedVolumes);
+ }
+
+ /**
+ * Check a single volume asynchronously, returning a {@link ListenableFuture}
+ * that can be used to retrieve the final result.
+ *
+ * If the volume cannot be referenced then it is already closed and
+ * cannot be checked. No error is propagated to the callback.
+ *
+ * @param volume the volume that is to be checked.
+ * @param callback callback to be invoked when the volume check completes.
+ * @return true if the check was scheduled and the callback will be invoked.
+ * false otherwise.
+ */
+ public boolean checkVolume(final HddsVolume volume, Callback callback) {
+ if (volume == null) {
+ LOG.debug("Cannot schedule check on null volume");
+ return false;
+ }
+
+ Optional<ListenableFuture<VolumeCheckResult>> olf =
+ delegateChecker.schedule(volume, null);
+ if (olf.isPresent()) {
+ numVolumeChecks.incrementAndGet();
+ Futures.addCallback(olf.get(),
+ new ResultHandler(volume, new HashSet<>(), new HashSet<>(),
+ new AtomicLong(1), callback),
+ checkVolumeResultHandlerExecutorService
+ );
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * A callback to process the results of checking a volume.
+ */
+ private class ResultHandler
+ implements FutureCallback<VolumeCheckResult> {
+ private final HddsVolume volume;
+ private final Set<HddsVolume> failedVolumes;
+ private final Set<HddsVolume> healthyVolumes;
+ private final AtomicLong volumeCounter;
+
+ @Nullable
+ private final Callback callback;
+
+ /**
+ * @param volume the volume whose check result is being handled.
+ * @param healthyVolumes set of healthy volumes. If the disk check is
+ * successful, add the volume here.
+ * @param failedVolumes set of failed volumes. If the disk check fails,
+ * add the volume here.
+ * @param volumeCounter counter used to trigger callback invocation.
+ * @param callback invoked when the volumeCounter reaches 0.
+ */
+ ResultHandler(HddsVolume volume,
+ Set<HddsVolume> healthyVolumes,
+ Set<HddsVolume> failedVolumes,
+ AtomicLong volumeCounter,
+ @Nullable Callback callback) {
+ this.volume = volume;
+ this.healthyVolumes = healthyVolumes;
+ this.failedVolumes = failedVolumes;
+ this.volumeCounter = volumeCounter;
+ this.callback = callback;
+ }
+
+ @Override
+ public void onSuccess(@Nonnull VolumeCheckResult result) {
+ switch(result) {
+ case HEALTHY:
+ case DEGRADED:
+ LOG.debug("Volume {} is {}.", volume, result);
+ markHealthy();
+ break;
+ case FAILED:
+ LOG.warn("Volume {} detected as being unhealthy", volume);
+ markFailed();
+ break;
+ default:
+ LOG.error("Unexpected health check result {} for volume {}",
+ result, volume);
+ markHealthy();
+ break;
+ }
+ cleanup();
+ }
+
+ @Override
+ public void onFailure(@Nonnull Throwable t) {
+ Throwable exception = (t instanceof ExecutionException) ?
+ t.getCause() : t;
+ LOG.warn("Exception running disk checks against volume " +
+ volume, exception);
+ markFailed();
+ cleanup();
+ }
+
+ private void markHealthy() {
+ synchronized (HddsVolumeChecker.this) {
+ healthyVolumes.add(volume);
+ }
+ }
+
+ private void markFailed() {
+ synchronized (HddsVolumeChecker.this) {
+ failedVolumes.add(volume);
+ }
+ }
+
+ private void cleanup() {
+ invokeCallback();
+ }
+
+ private void invokeCallback() {
+ try {
+ final long remaining = volumeCounter.decrementAndGet();
+ if (callback != null && remaining == 0) {
+ callback.call(healthyVolumes, failedVolumes);
+ }
+ } catch(Exception e) {
+ // Propagating this exception is unlikely to be helpful.
+ LOG.warn("Unexpected exception", e);
+ }
+ }
+ }
+
+ /**
+ * Shutdown the checker and its associated ExecutorService.
+ *
+ * See {@link ExecutorService#awaitTermination} for the interpretation
+ * of the parameters.
+ */
+ void shutdownAndWait(int gracePeriod, TimeUnit timeUnit) {
+ try {
+ delegateChecker.shutdownAndWait(gracePeriod, timeUnit);
+ } catch (InterruptedException e) {
+ LOG.warn("{} interrupted during shutdown.",
this.getClass().getSimpleName());
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * This method is for testing only.
+ *
+ * @param testDelegate the checker to delegate volume checks to in tests.
+ */
+ @VisibleForTesting
+ void setDelegateChecker(
+ AsyncChecker<Boolean, VolumeCheckResult> testDelegate) {
+ delegateChecker = testDelegate;
+ }
+
+ /**
+ * Return the number of {@link #checkVolume} invocations.
+ */
+ public long getNumVolumeChecks() {
+ return numVolumeChecks.get();
+ }
+
+ /**
+ * Return the number of {@link #checkAllVolumes} invocations.
+ */
+ public long getNumAllVolumeChecks() {
+ return numAllVolumeChecks.get();
+ }
+
+ /**
+ * Return the number of checks skipped because the minimum gap since the
+ * last check had not elapsed.
+ */
+ public long getNumSkippedChecks() {
+ return numSkippedChecks.get();
+ }
+}
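
For context, a minimal usage sketch of the checker API above (not part of
the patch; it assumes a Configuration 'conf', a Collection<HddsVolume>
'volumes', an SLF4J 'LOG', and a caller in the same package, since
shutdownAndWait is package-private):

    HddsVolumeChecker checker = new HddsVolumeChecker(conf, new Timer());
    try {
      // Blocking parallel check; returns the subset of volumes that failed.
      Set<HddsVolume> failed = checker.checkAllVolumes(volumes);
      LOG.info("Failed volumes: {}", failed);

      // Non-blocking check of a single volume; the callback fires once
      // the check completes.
      for (HddsVolume v : volumes) {
        checker.checkVolume(v, (healthy, failedSet) ->
            LOG.info("healthy={} failed={}", healthy, failedSet));
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      checker.shutdownAndWait(1, TimeUnit.SECONDS);
    }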
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/ThrottledAsyncChecker.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/ThrottledAsyncChecker.java
new file mode 100644
index 0000000..3be24e4
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/ThrottledAsyncChecker.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.checker.AsyncChecker;
+import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
+import org.apache.hadoop.util.Timer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An implementation of {@link AsyncChecker} that skips checking recently
+ * checked objects. It will enforce at least minMsBetweenChecks
+ * milliseconds between two successive checks of any one object.
+ *
+ * It is assumed that the total number of Checkable objects in the system
+ * is small (not more than a few dozen), since the checker uses
+ * O(Checkables) storage and potentially O(Checkables) threads.
+ *
+ * minMsBetweenChecks should be configured reasonably
+ * by the caller to avoid spinning up too many threads frequently.
+ */
[email protected]
[email protected]
+public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(ThrottledAsyncChecker.class);
+
+ private final Timer timer;
+
+ /**
+ * The ExecutorService used to schedule asynchronous checks.
+ */
+ private final ListeningExecutorService executorService;
+ private final ScheduledExecutorService scheduledExecutorService;
+
+ /**
+ * The minimum gap in milliseconds between two successive checks
+ * of the same object. This is the throttle.
+ */
+ private final long minMsBetweenChecks;
+ private final long diskCheckTimeout;
+
+ /**
+ * Map of checks that are currently in progress. Protected by the object
+ * lock.
+ */
+ private final Map<Checkable, ListenableFuture<V>> checksInProgress;
+
+ /**
+ * Maps Checkable objects to a future that can be used to retrieve
+ * the results of the operation.
+ * Protected by the object lock.
+ */
+ private final Map<Checkable, ThrottledAsyncChecker.LastCheckResult<V>>
+ completedChecks;
+
+ public ThrottledAsyncChecker(final Timer timer,
+ final long minMsBetweenChecks,
+ final long diskCheckTimeout,
+ final ExecutorService executorService) {
+ this.timer = timer;
+ this.minMsBetweenChecks = minMsBetweenChecks;
+ this.diskCheckTimeout = diskCheckTimeout;
+ this.executorService = MoreExecutors.listeningDecorator(executorService);
+ this.checksInProgress = new HashMap<>();
+ this.completedChecks = new WeakHashMap<>();
+
+ if (this.diskCheckTimeout > 0) {
+ ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new
+ ScheduledThreadPoolExecutor(1);
+ this.scheduledExecutorService = MoreExecutors
+ .getExitingScheduledExecutorService(scheduledThreadPoolExecutor);
+ } else {
+ this.scheduledExecutorService = null;
+ }
+ }
+
+ /**
+ * See {@link AsyncChecker#schedule}
+ *
+ * If the object has been checked recently then the check will
+ * be skipped. Multiple concurrent checks for the same object
+ * will receive the same Future.
+ */
+ @Override
+ public synchronized Optional<ListenableFuture<V>> schedule(
+ Checkable<K, V> target, K context) {
+ if (checksInProgress.containsKey(target)) {
+ return Optional.absent();
+ }
+
+ if (completedChecks.containsKey(target)) {
+ final ThrottledAsyncChecker.LastCheckResult<V> result =
+ completedChecks.get(target);
+ final long msSinceLastCheck = timer.monotonicNow() - result.completedAt;
+ if (msSinceLastCheck < minMsBetweenChecks) {
+ LOG.debug("Skipped checking {}. Time since last check {}ms " +
+ "is less than the min gap {}ms.",
+ target, msSinceLastCheck, minMsBetweenChecks);
+ return Optional.absent();
+ }
+ }
+
+ LOG.info("Scheduling a check for {}", target);
+ final ListenableFuture<V> lfWithoutTimeout = executorService.submit(
+ () -> target.check(context));
+ final ListenableFuture<V> lf;
+
+ if (diskCheckTimeout > 0) {
+ lf = TimeoutFuture
+ .create(lfWithoutTimeout, diskCheckTimeout, TimeUnit.MILLISECONDS,
+ scheduledExecutorService);
+ } else {
+ lf = lfWithoutTimeout;
+ }
+
+ checksInProgress.put(target, lf);
+ addResultCachingCallback(target, lf);
+ return Optional.of(lf);
+ }
+
+ /**
+ * Register a callback to cache the result of a check.
+ * @param target the object that was checked.
+ * @param lf future tracking the in-progress check.
+ */
+ private void addResultCachingCallback(
+ Checkable<K, V> target, ListenableFuture<V> lf) {
+ Futures.addCallback(lf, new FutureCallback<V>() {
+ @Override
+ public void onSuccess(@Nullable V result) {
+ synchronized (ThrottledAsyncChecker.this) {
+ checksInProgress.remove(target);
+ completedChecks.put(target, new LastCheckResult<>(
+ result, timer.monotonicNow()));
+ }
+ }
+
+ @Override
+ public void onFailure(@Nonnull Throwable t) {
+ synchronized (ThrottledAsyncChecker.this) {
+ checksInProgress.remove(target);
+ completedChecks.put(target, new LastCheckResult<>(
+ t, timer.monotonicNow()));
+ }
+ }
+ });
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * The results of in-progress checks are not useful during shutdown,
+ * so we optimize for faster shutdown by interrupting all actively
+ * executing checks.
+ */
+ @Override
+ public void shutdownAndWait(long timeout, TimeUnit timeUnit)
+ throws InterruptedException {
+ if (scheduledExecutorService != null) {
+ scheduledExecutorService.shutdownNow();
+ scheduledExecutorService.awaitTermination(timeout, timeUnit);
+ }
+
+ executorService.shutdownNow();
+ executorService.awaitTermination(timeout, timeUnit);
+ }
+
+ /**
+ * Status of running a check. It can either be a result or an
+ * exception, depending on whether the check completed or threw.
+ */
+ private static final class LastCheckResult<V> {
+ /**
+ * Timestamp at which the check completed.
+ */
+ private final long completedAt;
+
+ /**
+ * Result of running the check if it completed. null if it threw.
+ */
+ @Nullable
+ private final V result;
+
+ /**
+ * Exception thrown by the check. null if it returned a result.
+ */
+ private final Throwable exception; // null on success.
+
+ /**
+ * Initialize with a result.
+ * @param result result returned by the check.
+ * @param completedAt timestamp at which the check completed.
+ */
+ private LastCheckResult(V result, long completedAt) {
+ this.result = result;
+ this.exception = null;
+ this.completedAt = completedAt;
+ }
+
+ /**
+ * Initialize with an exception.
+ * @param t exception thrown by the check.
+ * @param completedAt timestamp at which the check completed.
+ */
+ private LastCheckResult(Throwable t, long completedAt) {
+ this.result = null;
+ this.exception = t;
+ this.completedAt = completedAt;
+ }
+ }
+}
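
A sketch of the throttling contract above (illustrative only; 'volume'
stands in for any Checkable<Boolean, VolumeCheckResult>, e.g. an
HddsVolume, and Optional is Guava's com.google.common.base.Optional):

    ThrottledAsyncChecker<Boolean, VolumeCheckResult> checker =
        new ThrottledAsyncChecker<>(new Timer(),
            1000 /* minMsBetweenChecks */,
            0 /* diskCheckTimeout disabled */,
            Executors.newCachedThreadPool());

    // First call schedules the check and returns Optional.of(future).
    Optional<ListenableFuture<VolumeCheckResult>> first =
        checker.schedule(volume, null);

    // A second call while the check is in flight, or within
    // minMsBetweenChecks of its completion, returns Optional.absent().
    Optional<ListenableFuture<VolumeCheckResult>> second =
        checker.schedule(volume, null);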
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/TimeoutFuture.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/TimeoutFuture.java
new file mode 100644
index 0000000..a7a492a
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/TimeoutFuture.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright (C) 2007 The Guava Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Some portions of this class have been modified to make it functional in this
+ * package.
+ */
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Implementation of {@code Futures#withTimeout}.
+ * <p>
+ * Future that delegates to another but will finish early (via a
+ * {@link TimeoutException} wrapped in an
+ * {@link java.util.concurrent.ExecutionException}) if the specified
+ * duration expires. The delegate future is interrupted and cancelled if it
+ * times out.
+ */
+final class TimeoutFuture<V> extends AbstractFuture.TrustedFuture<V> {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ TimeoutFuture.class);
+
+ static <V> ListenableFuture<V> create(
+ ListenableFuture<V> delegate,
+ long time,
+ TimeUnit unit,
+ ScheduledExecutorService scheduledExecutor) {
+ TimeoutFuture<V> result = new TimeoutFuture<V>(delegate);
+ TimeoutFuture.Fire<V> fire = new TimeoutFuture.Fire<V>(result);
+ result.timer = scheduledExecutor.schedule(fire, time, unit);
+ delegate.addListener(fire, directExecutor());
+ return result;
+ }
+
+ /*
+ * Memory visibility of these fields. There are two cases to consider.
+ *
+ * 1. visibility of the writes to these fields to Fire.run:
+ *
+ * The initial write to delegateRef is made definitely visible via the
+ * semantics of addListener/SES.schedule. The later racy write in cancel()
+ * is not guaranteed to be observed, however that is fine since the
+ * correctness is based on the atomic state in our base class. The initial
+ * write to timer is never definitely visible to Fire.run since it is
+ * assigned after SES.schedule is called. Therefore Fire.run has to check
+ * for null. However, it should be visible if Fire.run is called by
+ * delegate.addListener since addListener is called after the assignment
+ * to timer, and importantly this is the main situation in which we need to
+ * be able to see the write.
+ *
+ * 2. visibility of the writes to an afterDone() call triggered by cancel():
+ *
+ * Since these fields are non-final that means that TimeoutFuture is not
+ * being 'safely published', thus a motivated caller may be able to expose
+ * the reference to another thread that would then call cancel() and be
+ * unable to cancel the delegate. There are a number of ways to solve this,
+ * none of which are very pretty, and it is currently believed to be a
+ * purely theoretical problem (since the other actions should supply
+ * sufficient write-barriers).
+ */
+
+ @Nullable private ListenableFuture<V> delegateRef;
+ @Nullable private Future<?> timer;
+
+ private TimeoutFuture(ListenableFuture<V> delegate) {
+ this.delegateRef = Preconditions.checkNotNull(delegate);
+ }
+
+ /**
+ * A runnable that is called when the delegate or the timer completes.
+ */
+ private static final class Fire<V> implements Runnable {
+ @Nullable
+ TimeoutFuture<V> timeoutFutureRef;
+
+ Fire(
+ TimeoutFuture<V> timeoutFuture) {
+ this.timeoutFutureRef = timeoutFuture;
+ }
+
+ @Override
+ public void run() {
+ // If either of these reads return null then we must be after a
+ // successful cancel or another call to this method.
+ TimeoutFuture<V> timeoutFuture = timeoutFutureRef;
+ if (timeoutFuture == null) {
+ return;
+ }
+ ListenableFuture<V> delegate = timeoutFuture.delegateRef;
+ if (delegate == null) {
+ return;
+ }
+
+ /*
+ * If we're about to complete the TimeoutFuture, we want to release our
+ * reference to it. Otherwise, we'll pin it (and its result) in memory
+ * until the timeout task is GCed. (The need to clear our reference to
+ * the TimeoutFuture is the reason we use a *static* nested class with
+ * a manual reference back to the "containing" class.)
+ *
+ * This has the nice-ish side effect of limiting reentrancy: run() calls
+ * timeoutFuture.setException() calls run(). That reentrancy would
+ * already be harmless, since timeoutFuture can be set (and delegate
+ * cancelled) only once. (And "set only once" is important for other
+ * reasons: run() can still be invoked concurrently in different threads,
+ * even with the above null checks.)
+ */
+ timeoutFutureRef = null;
+ if (delegate.isDone()) {
+ timeoutFuture.setFuture(delegate);
+ } else {
+ try {
+ timeoutFuture.setException(
+ new TimeoutException("Future timed out: " + delegate));
+ } finally {
+ delegate.cancel(true);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void afterDone() {
+ maybePropagateCancellation(delegateRef);
+
+ Future<?> localTimer = timer;
+ // Try to cancel the timer as an optimization.
+ // timer may be null if this call to run was by the timer task since there
+ // is no happens-before edge between the assignment to timer and an
+ // execution of the timer task.
+ if (localTimer != null) {
+ localTimer.cancel(false);
+ }
+
+ delegateRef = null;
+ timer = null;
+ }
+}
\ No newline at end of file
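
The wrapping pattern, as used by ThrottledAsyncChecker.schedule() above
(sketch only; 'delegate' and 'scheduler' are assumed to already exist):

    // Fail with TimeoutException and cancel/interrupt the delegate if it
    // does not complete within 10 seconds.
    ListenableFuture<VolumeCheckResult> bounded = TimeoutFuture.create(
        delegate, 10, TimeUnit.SECONDS, scheduler);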
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
index d30dd89..7addd63 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.volume;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
@@ -37,8 +38,10 @@ import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume.VolumeState;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,11 +51,18 @@ import java.util.Collection;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
- * VolumeSet to manage volumes in a DataNode.
+ * VolumeSet to manage HDDS volumes in a DataNode.
*/
public class VolumeSet {
@@ -79,6 +89,14 @@ public class VolumeSet {
private EnumMap<StorageType, List<HddsVolume>> volumeStateMap;
/**
+ * An executor for periodic disk checks.
+ */
+ final ScheduledExecutorService diskCheckerService;
+ final ScheduledFuture<?> periodicDiskChecker;
+
+ private static final long DISK_CHECK_INTERVAL_MINUTES = 15;
+
+ /**
* A Reentrant Read Write Lock to synchronize volume operations in VolumeSet.
* Any update to {@link VolumeSet#volumeMap},
* {@link VolumeSet#failedVolumeMap}, or {@link VolumeSet#volumeStateMap}
@@ -90,6 +108,7 @@ public class VolumeSet {
private String clusterID;
private Runnable shutdownHook;
+ private final HddsVolumeChecker volumeChecker;
public VolumeSet(String dnUuid, Configuration conf)
throws IOException {
@@ -102,11 +121,30 @@ public class VolumeSet {
this.clusterID = clusterID;
this.conf = conf;
this.volumeSetRWLock = new ReentrantReadWriteLock();
-
+ this.volumeChecker = getVolumeChecker(conf);
+ this.diskCheckerService = Executors.newScheduledThreadPool(
+ 1, r -> new Thread(r, "Periodic HDDS volume checker"));
+ this.periodicDiskChecker =
+ diskCheckerService.scheduleWithFixedDelay(() -> {
+ try {
+ checkAllVolumes();
+ } catch (IOException e) {
+ LOG.warn("Exception while checking disks", e);
+ }
+ }, DISK_CHECK_INTERVAL_MINUTES, DISK_CHECK_INTERVAL_MINUTES,
+ TimeUnit.MINUTES);
initializeVolumeSet();
}
- // Add DN volumes configured through ConfigKeys to volumeMap.
+ @VisibleForTesting
+ HddsVolumeChecker getVolumeChecker(Configuration conf)
+ throws DiskChecker.DiskErrorException {
+ return new HddsVolumeChecker(conf, new Timer());
+ }
+
+ /**
+ * Add DN volumes configured through ConfigKeys to volumeMap.
+ */
private void initializeVolumeSet() throws IOException {
volumeMap = new ConcurrentHashMap<>();
failedVolumeMap = new ConcurrentHashMap<>();
@@ -123,7 +161,7 @@ public class VolumeSet {
}
for (StorageType storageType : StorageType.values()) {
- volumeStateMap.put(storageType, new ArrayList<HddsVolume>());
+ volumeStateMap.put(storageType, new ArrayList<>());
}
for (String locationString : rawLocations) {
@@ -139,6 +177,12 @@ public class VolumeSet {
volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
LOG.info("Added Volume : {} to VolumeSet",
hddsVolume.getHddsRootDir().getPath());
+
+ if (!hddsVolume.getHddsRootDir().mkdirs() &&
+ !hddsVolume.getHddsRootDir().exists()) {
+ throw new IOException("Failed to create HDDS storage dir " +
+ hddsVolume.getHddsRootDir());
+ }
} catch (IOException e) {
HddsVolume volume = new HddsVolume.Builder(locationString)
.failedVolume(true).build();
@@ -147,8 +191,10 @@ public class VolumeSet {
}
}
+ checkAllVolumes();
+
if (volumeMap.size() == 0) {
- throw new DiskOutOfSpaceException("No storage location configured");
+ throw new DiskOutOfSpaceException("No storage locations configured");
}
// Ensure volume threads are stopped and scm df is saved during shutdown.
@@ -160,6 +206,52 @@ public class VolumeSet {
}
/**
+ * Run a synchronous parallel check of all HDDS volumes, removing
+ * failed volumes.
+ */
+ private void checkAllVolumes() throws IOException {
+ List<HddsVolume> allVolumes = getVolumesList();
+ Set<HddsVolume> failedVolumes;
+ try {
+ failedVolumes = volumeChecker.checkAllVolumes(allVolumes);
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while running disk check", e);
+ }
+
+ if (failedVolumes.size() > 0) {
+ LOG.warn("checkAllVolumes got {} failed volumes - {}",
+ failedVolumes.size(), failedVolumes);
+ handleVolumeFailures(failedVolumes);
+ } else {
+ LOG.debug("checkAllVolumes encountered no failures");
+ }
+ }
+
+ /**
+ * Handle one or more failed volumes.
+ * @param failedVolumes set of volumes detected as failed.
+ */
+ private void handleVolumeFailures(Set<HddsVolume> failedVolumes) {
+ for (HddsVolume v: failedVolumes) {
+ this.writeLock();
+ try {
+ // Immediately mark the volume as failed so it is unavailable
+ // for new containers.
+ volumeMap.remove(v.getHddsRootDir().getPath());
+ failedVolumeMap.putIfAbsent(v.getHddsRootDir().getPath(), v);
+ } finally {
+ this.writeUnlock();
+ }
+
+ // TODO:
+ // 1. Mark all closed containers on the volume as unhealthy.
+ // 2. Consider stopping IO on open containers and tearing down
+ // active pipelines.
+ // 3. Handle Ratis log disk failure.
+ }
+ }
+
+ /**
* If Version file exists and the {@link VolumeSet#clusterID} is not set yet,
* assign it the value from Version file. Otherwise, check that the given
* id matches with the id from version file.
@@ -225,12 +317,12 @@ public class VolumeSet {
// Add a volume to VolumeSet
- public boolean addVolume(String dataDir) {
+ boolean addVolume(String dataDir) {
return addVolume(dataDir, StorageType.DEFAULT);
}
// Add a volume to VolumeSet
- public boolean addVolume(String volumeRoot, StorageType storageType) {
+ private boolean addVolume(String volumeRoot, StorageType storageType) {
String hddsRoot = HddsVolumeUtil.getHddsRoot(volumeRoot);
boolean success;
@@ -330,16 +422,22 @@ public class VolumeSet {
}
/**
- * Shutdown's the volumeset, if saveVolumeSetUsed is false, call's
- * {@link VolumeSet#saveVolumeSetUsed}.
+ * Shutdown the volumeset.
*/
public void shutdown() {
saveVolumeSetUsed();
+ stopDiskChecker();
if (shutdownHook != null) {
ShutdownHookManager.get().removeShutdownHook(shutdownHook);
}
}
+ private void stopDiskChecker() {
+ periodicDiskChecker.cancel(true);
+ volumeChecker.shutdownAndWait(0, TimeUnit.SECONDS);
+ diskCheckerService.shutdownNow();
+ }
+
@VisibleForTesting
public List<HddsVolume> getVolumesList() {
return ImmutableList.copyOf(volumeMap.values());
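
The @VisibleForTesting getVolumeChecker hook above is the seam that lets
tests inject a fake checker; a sketch of the pattern follows (the new
tests below use exactly this shape; 'datanodeUuid', 'conf' and
'fakeChecker' are assumed, the last being a hypothetical test double):

    VolumeSet volumeSet = new VolumeSet(datanodeUuid, conf) {
      @Override
      HddsVolumeChecker getVolumeChecker(Configuration c)
          throws DiskChecker.DiskErrorException {
        return fakeChecker;  // hypothetical test double
      }
    };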
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 80ce13d..92d76ef 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -160,6 +160,7 @@ public class OzoneContainer {
writeChannel.stop();
readChannel.stop();
hddsDispatcher.shutdown();
+ volumeSet.shutdown();
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolumeChecker.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolumeChecker.java
new file mode 100644
index 0000000..f2a0c25
--- /dev/null
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolumeChecker.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.checker.AsyncChecker;
+import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.FakeTimer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult.*;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests for {@link HddsVolumeChecker}.
+ */
+@RunWith(Parameterized.class)
+public class TestHddsVolumeChecker {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ TestHddsVolumeChecker.class);
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Rule
+ public Timeout globalTimeout = new Timeout(30_000);
+
+ /**
+ * Run each test case for each possible value of {@link VolumeCheckResult},
+ * including null, which signals that the check should throw an exception.
+ * @return the test parameters.
+ */
+ @Parameters(name="{0}")
+ public static Collection<Object[]> data() {
+ List<Object[]> values = new ArrayList<>();
+ for (VolumeCheckResult result : VolumeCheckResult.values()) {
+ values.add(new Object[] {result});
+ }
+ values.add(new Object[] {null});
+ return values;
+ }
+
+ /**
+ * When null, the check call should throw an exception.
+ */
+ private final VolumeCheckResult expectedVolumeHealth;
+ private static final int NUM_VOLUMES = 2;
+
+
+ public TestHddsVolumeChecker(VolumeCheckResult expectedVolumeHealth) {
+ this.expectedVolumeHealth = expectedVolumeHealth;
+ }
+
+ /**
+ * Test {@link HddsVolumeChecker#checkVolume} propagates the
+ * check to the delegate checker.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCheckOneVolume() throws Exception {
+ LOG.info("Executing {}", testName.getMethodName());
+ final HddsVolume volume = makeVolumes(1, expectedVolumeHealth).get(0);
+ final HddsVolumeChecker checker =
+ new HddsVolumeChecker(new HdfsConfiguration(), new FakeTimer());
+ checker.setDelegateChecker(new DummyChecker());
+ final AtomicLong numCallbackInvocations = new AtomicLong(0);
+
+ // Request a check and ensure it triggered HddsVolume#check.
+ boolean result =
+ checker.checkVolume(volume, (healthyVolumes, failedVolumes) -> {
+ numCallbackInvocations.incrementAndGet();
+ if (expectedVolumeHealth != null &&
+ expectedVolumeHealth != FAILED) {
+ assertThat(healthyVolumes.size(), is(1));
+ assertThat(failedVolumes.size(), is(0));
+ } else {
+ assertThat(healthyVolumes.size(), is(0));
+ assertThat(failedVolumes.size(), is(1));
+ }
+ });
+
+ GenericTestUtils.waitFor(() -> numCallbackInvocations.get() > 0, 5, 10000);
+
+ // Ensure that the check was invoked at least once.
+ verify(volume, times(1)).check(anyObject());
+ if (result) {
+ assertThat(numCallbackInvocations.get(), is(1L));
+ }
+ }
+
+ /**
+ * Test {@link HddsVolumeChecker#checkAllVolumes} propagates
+ * checks for all volumes to the delegate checker.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCheckAllVolumes() throws Exception {
+ LOG.info("Executing {}", testName.getMethodName());
+
+ final List<HddsVolume> volumes = makeVolumes(
+ NUM_VOLUMES, expectedVolumeHealth);
+ final HddsVolumeChecker checker =
+ new HddsVolumeChecker(new HdfsConfiguration(), new FakeTimer());
+ checker.setDelegateChecker(new DummyChecker());
+
+ Set<HddsVolume> failedVolumes = checker.checkAllVolumes(volumes);
+ LOG.info("Got back {} failed volumes", failedVolumes.size());
+
+ if (expectedVolumeHealth == null || expectedVolumeHealth == FAILED) {
+ assertThat(failedVolumes.size(), is(NUM_VOLUMES));
+ } else {
+ assertTrue(failedVolumes.isEmpty());
+ }
+
+ // Ensure each volume's check() method was called exactly once.
+ for (HddsVolume volume : volumes) {
+ verify(volume, times(1)).check(anyObject());
+ }
+ }
+
+ /**
+ * A checker that wraps the result of {@link HddsVolume#check} in
+ * an immediate future.
+ */
+ static class DummyChecker
+ implements AsyncChecker<Boolean, VolumeCheckResult> {
+
+ @Override
+ public Optional<ListenableFuture<VolumeCheckResult>> schedule(
+ Checkable<Boolean, VolumeCheckResult> target,
+ Boolean context) {
+ try {
+ LOG.info("Returning success for volume check");
+ return Optional.of(
+ Futures.immediateFuture(target.check(context)));
+ } catch (Exception e) {
+ LOG.info("check routine threw exception " + e);
+ return Optional.of(Futures.immediateFailedFuture(e));
+ }
+ }
+
+ @Override
+ public void shutdownAndWait(long timeout, TimeUnit timeUnit)
+ throws InterruptedException {
+ // Nothing to cancel.
+ }
+ }
+
+ static List<HddsVolume> makeVolumes(
+ int numVolumes, VolumeCheckResult health) throws Exception {
+ final List<HddsVolume> volumes = new ArrayList<>(numVolumes);
+ for (int i = 0; i < numVolumes; ++i) {
+ final HddsVolume volume = mock(HddsVolume.class);
+
+ if (health != null) {
+ when(volume.check(any(Boolean.class))).thenReturn(health);
+ when(volume.check(isNull())).thenReturn(health);
+ } else {
+ final DiskErrorException de = new DiskErrorException("Fake Exception");
+ when(volume.check(any(Boolean.class))).thenThrow(de);
+ when(volume.check(isNull())).thenThrow(de);
+ }
+ volumes.add(volume);
+ }
+ return volumes;
+ }
+}
\ No newline at end of file
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
new file mode 100644
index 0000000..687a12d
--- /dev/null
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.checker.AsyncChecker;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.Timer;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Verify that {@link VolumeSet} correctly checks for failed disks
+ * during initialization.
+ */
+public class TestVolumeSetDiskChecks {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ TestVolumeSetDiskChecks.class);
+
+ @Rule
+ public Timeout globalTimeout = new Timeout(30_000);
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ Configuration conf = null;
+
+ /**
+ * Cleanup volume directories.
+ */
+ @After
+ public void cleanup() {
+ final Collection<String> dirs = conf.getTrimmedStringCollection(
+ DFS_DATANODE_DATA_DIR_KEY);
+
+ for (String d: dirs) {
+ FileUtils.deleteQuietly(new File(d));
+ }
+ }
+
+ /**
+ * Verify that VolumeSet creates volume root directories at startup.
+ * @throws IOException
+ */
+ @Test
+ public void testOzoneDirsAreCreated() throws IOException {
+ final int numVolumes = 2;
+
+ conf = getConfWithDataNodeDirs(numVolumes);
+ final VolumeSet volumeSet =
+ new VolumeSet(UUID.randomUUID().toString(), conf);
+
+ assertThat(volumeSet.getVolumesList().size(), is(numVolumes));
+ assertThat(volumeSet.getFailedVolumesList().size(), is(0));
+
+ // Verify that the Ozone dirs were created during initialization.
+ Collection<String> dirs = conf.getTrimmedStringCollection(
+ DFS_DATANODE_DATA_DIR_KEY);
+ for (String d : dirs) {
+ assertTrue(new File(d).isDirectory());
+ }
+ }
+
+ /**
+ * Verify that bad volumes are filtered at startup.
+ * @throws IOException
+ */
+ @Test
+ public void testBadDirectoryDetection() throws IOException {
+ final int numVolumes = 5;
+ final int numBadVolumes = 2;
+
+ conf = getConfWithDataNodeDirs(numVolumes);
+ final VolumeSet volumeSet = new VolumeSet(
+ UUID.randomUUID().toString(), conf) {
+ @Override
+ HddsVolumeChecker getVolumeChecker(Configuration conf)
+ throws DiskErrorException {
+ return new DummyChecker(conf, new Timer(), numBadVolumes);
+ }
+ };
+
+ assertThat(volumeSet.getFailedVolumesList().size(), is(numBadVolumes));
+ assertThat(volumeSet.getVolumesList().size(),
+ is(numVolumes - numBadVolumes));
+ }
+
+ /**
+ * Verify that initialization fails if all volumes are bad.
+ */
+ @Test
+ public void testAllVolumesAreBad() throws IOException {
+ final int numVolumes = 5;
+
+ conf = getConfWithDataNodeDirs(numVolumes);
+ thrown.expect(IOException.class);
+ final VolumeSet volumeSet = new VolumeSet(
+ UUID.randomUUID().toString(), conf) {
+ @Override
+ HddsVolumeChecker getVolumeChecker(Configuration conf)
+ throws DiskErrorException {
+ return new DummyChecker(conf, new Timer(), numVolumes);
+ }
+ };
+ }
+
+ /**
+ * Create a configuration with the specified number of Datanode
+ * storage directories.
+ * @param numDirs number of storage directories to configure.
+ * @return the new configuration.
+ */
+ private Configuration getConfWithDataNodeDirs(int numDirs) {
+ final Configuration conf = new OzoneConfiguration();
+ final List<String> dirs = new ArrayList<>();
+ for (int i = 0; i < numDirs; ++i) {
+ dirs.add(GenericTestUtils.getRandomizedTestDir().getPath());
+ }
+ conf.set(DFS_DATANODE_DATA_DIR_KEY, String.join(",", dirs));
+ return conf;
+ }
+
+ /**
+ * A no-op checker that fails the given number of volumes and succeeds
+ * the rest.
+ */
+ static class DummyChecker extends HddsVolumeChecker {
+ private final int numBadVolumes;
+
+ public DummyChecker(Configuration conf, Timer timer, int numBadVolumes)
+ throws DiskErrorException {
+ super(conf, timer);
+ this.numBadVolumes = numBadVolumes;
+ }
+
+ @Override
+ public Set<HddsVolume> checkAllVolumes(Collection<HddsVolume> volumes)
+ throws InterruptedException {
+ // Return the first 'numBadVolumes' as failed.
+ return ImmutableSet.copyOf(Iterables.limit(volumes, numBadVolumes));
+ }
+ }
+}
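
To exercise just the new tests locally, a Maven invocation along these
lines should work from the repository root (standard surefire flags; -am
builds the module's dependencies first, exact requirements may vary):

    mvn -am -pl hadoop-hdds/container-service test \
        -Dtest=TestHddsVolumeChecker,TestVolumeSetDiskChecks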
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]