HDFS-11119. Support for parallel checking of StorageLocations on DataNode startup.
This closes #155.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3d267177
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3d267177
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3d267177

Branch: refs/heads/YARN-3926
Commit: 3d267177776547ceb32c5b9ed04cd9ec05b3421a
Parents: 4484b48
Author: Arpit Agarwal <a...@apache.org>
Authored: Fri Nov 11 15:02:52 2016 -0800
Committer: Arpit Agarwal <a...@apache.org>
Committed: Fri Nov 11 15:02:52 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  10 +
 .../hdfs/server/datanode/StorageLocation.java   |  35 ++-
 .../checker/StorageLocationChecker.java         | 207 ++++++++++++++++++
 .../datanode/checker/VolumeCheckResult.java     |  43 ++++
 .../src/main/resources/hdfs-default.xml         |  23 ++
 .../checker/TestStorageLocationChecker.java     | 217 +++++++++++++++++++
 6 files changed, 531 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d267177/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 951ad68..6377184 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -526,6 +526,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads";
   public static final int DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1;
+  public static final String DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY =
+      "dfs.datanode.disk.check.min.gap";
+  public static final String DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT =
+      "15m";
+
+  public static final String DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY =
+      "dfs.datanode.disk.check.timeout";
+  public static final String DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT =
+      "10m";
+
   public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_KEY = "dfs.datanode.ec.reconstruction.stripedread.threads";
   public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_DEFAULT = 20;
   public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.ec.reconstruction.stripedread.buffer.size";
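Both new keys are duration-valued. As a minimal sketch (not part of this patch; the class name and the "30s" override are made-up examples), suffixed defaults such as "15m" and "10m" resolve through Configuration#getTimeDuration, which is the same call the patch uses below:

    // Illustrative only -- mirrors how the patch reads the new keys.
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class DiskCheckConfExample {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Hypothetical override; suffixes such as "30s", "10m" or "1h" are
        // parsed, and a bare number is interpreted as milliseconds.
        conf.set(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY, "30s");
        long timeoutMs = conf.getTimeDuration(
            DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
            DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
            TimeUnit.MILLISECONDS);
        System.out.println("disk check timeout = " + timeoutMs + " ms"); // 30000
      }
    }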
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d267177/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
index 1bd3782..b4d5794 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
@@ -31,12 +31,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.StringUtils;
@@ -48,8 +50,10 @@ import org.apache.hadoop.util.StringUtils;
  *
  */
 @InterfaceAudience.Private
-public class StorageLocation implements Comparable<StorageLocation>{
-  final StorageType storageType;
+public class StorageLocation
+    implements Checkable<StorageLocation.CheckContext, VolumeCheckResult>,
+    Comparable<StorageLocation> {
+  private final StorageType storageType;
   private final URI baseURI;
   /** Regular expression that describes a storage uri with a storage type.
    *  e.g. [Disk]/storages/storage1/
@@ -206,4 +210,27 @@ public class StorageLocation implements Comparable<StorageLocation>{
           ": " + e.getMessage());
     }
   }
+
+  @Override  // Checkable
+  public VolumeCheckResult check(CheckContext context) throws IOException {
+    DiskChecker.checkDir(
+        context.localFileSystem,
+        new Path(baseURI),
+        context.expectedPermission);
+    return VolumeCheckResult.HEALTHY;
+  }
+
+  /**
+   * Class to hold the parameters for running a {@link #check}.
+   */
+  public static final class CheckContext {
+    private final LocalFileSystem localFileSystem;
+    private final FsPermission expectedPermission;
+
+    public CheckContext(LocalFileSystem localFileSystem,
+                        FsPermission expectedPermission) {
+      this.localFileSystem = localFileSystem;
+      this.expectedPermission = expectedPermission;
+    }
+  }
 }
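With the Checkable hookup above, a single location can also be checked synchronously. A hedged sketch (not part of the patch; it assumes the pre-existing StorageLocation.parse() factory and the data-dir permission keys already present in DFSConfigKeys):

    // Sketch: run one synchronous disk check against an example storage dir.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
    import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;

    public class SingleLocationCheckExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        LocalFileSystem localFS = FileSystem.getLocal(conf);
        // Expected permission comes from dfs.datanode.data.dir.perm.
        FsPermission expected = new FsPermission(
            conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
                     DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
        // Example path; StorageLocation.parse() is assumed from existing code.
        StorageLocation location =
            StorageLocation.parse("[DISK]file:///data/1/dfs/dn");
        VolumeCheckResult result =
            location.check(new StorageLocation.CheckContext(localFS, expected));
        System.out.println(location + " -> " + result);
      }
    }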
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d267177/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
new file mode 100644
index 0000000..4209737
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation.CheckContext;
+import org.apache.hadoop.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A utility class that encapsulates checking storage locations during DataNode
+ * startup.
+ *
+ * Some of this code was extracted from the DataNode class.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class StorageLocationChecker {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      StorageLocationChecker.class);
+
+  private final AsyncChecker<CheckContext, VolumeCheckResult> delegateChecker;
+
+  private final Timer timer;
+
+  /**
+   * Max allowed time for a disk check in milliseconds. If the check
+   * doesn't complete within this time we declare the disk as dead.
+   */
+  private final long maxAllowedTimeForCheckMs;
+
+  /**
+   * Expected filesystem permissions on the storage directory.
+   */
+  private final FsPermission expectedPermission;
+
+  /**
+   * Maximum number of volume failures that can be tolerated without
+   * declaring a fatal error.
+   */
+  private final int maxVolumeFailuresTolerated;
+
+  public StorageLocationChecker(Configuration conf, Timer timer) {
+    maxAllowedTimeForCheckMs = conf.getTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    expectedPermission = new FsPermission(
+        conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
+            DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
+
+    maxVolumeFailuresTolerated = conf.getInt(
+        DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
+        DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
+
+    this.timer = timer;
+
+    delegateChecker = new ThrottledAsyncChecker<>(
+        timer,
+        conf.getTimeDuration(
+            DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+            DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT,
+            TimeUnit.MILLISECONDS),
+        Executors.newCachedThreadPool(
+            new ThreadFactoryBuilder()
+                .setNameFormat("StorageLocationChecker thread %d")
+                .setDaemon(true)
+                .build()));
+  }
+
+  /**
+   * Initiate a check of the supplied storage volumes and return
+   * the list of healthy volumes.
+   *
+   * @param conf HDFS configuration.
+   * @param dataDirs list of volumes to check.
+   * @return the list of volumes that passed the check. Never empty, since
+   *         an IOException is thrown when no healthy volume remains.
+   *
+   * @throws InterruptedException if the check was interrupted.
+   * @throws IOException if the number of failed volumes exceeds the
+   *           maximum allowed or if there are no good volumes.
+   */
+  public List<StorageLocation> check(
+      final Configuration conf,
+      final Collection<StorageLocation> dataDirs)
+      throws InterruptedException, IOException {
+
+    final ArrayList<StorageLocation> goodLocations = new ArrayList<>();
+    final Set<StorageLocation> failedLocations = new HashSet<>();
+    final Map<StorageLocation, ListenableFuture<VolumeCheckResult>> futures =
+        Maps.newHashMap();
+    final LocalFileSystem localFS = FileSystem.getLocal(conf);
+    final CheckContext context = new CheckContext(localFS, expectedPermission);
+
+    // Start parallel disk check operations on all StorageLocations.
+    for (StorageLocation location : dataDirs) {
+      futures.put(location,
+          delegateChecker.schedule(location, context));
+    }
+
+    final long checkStartTimeMs = timer.monotonicNow();
+
+    // Retrieve the results of the disk checks.
+    for (Map.Entry<StorageLocation,
+             ListenableFuture<VolumeCheckResult>> entry : futures.entrySet()) {
+
+      // Determine how much time we can allow for this check to complete.
+      // The cumulative wait time cannot exceed maxAllowedTimeForCheck.
+      final long waitSoFarMs = (timer.monotonicNow() - checkStartTimeMs);
+      final long timeLeftMs = Math.max(0,
+          maxAllowedTimeForCheckMs - waitSoFarMs);
+      final StorageLocation location = entry.getKey();
+
+      try {
+        final VolumeCheckResult result =
+            entry.getValue().get(timeLeftMs, TimeUnit.MILLISECONDS);
+        switch (result) {
+        case HEALTHY:
+          goodLocations.add(entry.getKey());
+          break;
+        case DEGRADED:
+          LOG.warn("StorageLocation {} appears to be degraded.", location);
+          break;
+        case FAILED:
+          LOG.warn("StorageLocation {} detected as failed.", location);
+          failedLocations.add(location);
+          break;
+        default:
+          LOG.error("Unexpected health check result {} for StorageLocation {}",
+              result, location);
+          goodLocations.add(entry.getKey());
+        }
+      } catch (ExecutionException|TimeoutException e) {
+        LOG.warn("Exception checking StorageLocation " + location,
+            e.getCause());
+        failedLocations.add(location);
+      }
+    }
+
+    if (failedLocations.size() > maxVolumeFailuresTolerated) {
+      throw new IOException(
+          "Too many failed volumes: " + failedLocations.size() + ". " +
+          "The configuration allows for a maximum of " +
+          maxVolumeFailuresTolerated + " failed volumes.");
+    }
+
+    if (goodLocations.size() == 0) {
+      throw new IOException("All directories in " +
+          DFS_DATANODE_DATA_DIR_KEY + " are invalid: " +
+          failedLocations);
+    }
+
+    return goodLocations;
+  }
+
+  public void shutdownAndWait(int gracePeriod, TimeUnit timeUnit) {
+    try {
+      delegateChecker.shutdownAndWait(gracePeriod, timeUnit);
+    } catch (InterruptedException e) {
+      LOG.warn("StorageLocationChecker interrupted during shutdown.");
+      Thread.currentThread().interrupt();
+    }
+  }
+}
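For context, a hedged sketch of how a caller such as the DataNode might drive this class at startup (the data directories, timer, and grace period below are illustrative; the real wiring lives in the DataNode, not in this patch):

    // Sketch only: construct the checker, filter the configured directories,
    // then shut the checker down once startup checks are complete.
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
    import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
    import org.apache.hadoop.util.Timer;

    public class StartupCheckExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        // Example directories; normally parsed from dfs.datanode.data.dir.
        Collection<StorageLocation> dataDirs = Arrays.asList(
            StorageLocation.parse("[DISK]file:///data/1/dfs/dn"),
            StorageLocation.parse("[DISK]file:///data/2/dfs/dn"));
        StorageLocationChecker checker =
            new StorageLocationChecker(conf, new Timer());
        try {
          // Throws IOException if failures exceed
          // dfs.datanode.failed.volumes.tolerated or no usable volume remains.
          List<StorageLocation> usable = checker.check(conf, dataDirs);
          System.out.println("Usable storage locations: " + usable);
        } finally {
          checker.shutdownAndWait(10, TimeUnit.SECONDS);
        }
      }
    }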
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d267177/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/VolumeCheckResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/VolumeCheckResult.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/VolumeCheckResult.java
new file mode 100644
index 0000000..65b14da
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/VolumeCheckResult.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Defines the outcomes of running a disk check operation against a
+ * volume.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public enum VolumeCheckResult {
+  HEALTHY(1),
+  DEGRADED(2),
+  FAILED(3);
+
+  private final int value;
+
+  VolumeCheckResult(int value) {
+    this.value = value;
+  }
+
+  int getValue() {
+    return value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d267177/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index b111193..be6d750 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4324,4 +4324,27 @@
     call queue</description>
 </property>
 
+<property>
+  <name>dfs.datanode.disk.check.min.gap</name>
+  <value>15m</value>
+  <description>
+    The minimum gap between two successive checks of the same DataNode
+    volume. This setting supports multiple time unit suffixes as described
+    in dfs.heartbeat.interval. If no suffix is specified then milliseconds
+    is assumed.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.disk.check.timeout</name>
+  <value>10m</value>
+  <description>
+    Maximum allowed time for a disk check to complete. If the check does
+    not complete within this time interval then the disk is declared as
+    failed. This setting supports multiple time unit suffixes as described
+    in dfs.heartbeat.interval. If no suffix is specified then milliseconds
+    is assumed.
+  </description>
+</property>
+
 </configuration>
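An illustrative hdfs-site.xml override for the two properties documented above (the 2m/5m values are examples only, not recommendations):

    <!-- Example hdfs-site.xml overrides; values are illustrative. -->
    <property>
      <name>dfs.datanode.disk.check.timeout</name>
      <value>2m</value>
    </property>
    <property>
      <name>dfs.datanode.disk.check.min.gap</name>
      <value>5m</value>
    </property>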
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d267177/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestStorageLocationChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestStorageLocationChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestStorageLocationChecker.java
new file mode 100644
index 0000000..bf885be
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestStorageLocationChecker.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.util.FakeTimer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY;
+import static org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult.*;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for the {@link StorageLocationChecker} class.
+ */
+public class TestStorageLocationChecker {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      TestStorageLocationChecker.class);
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /**
+   * Verify that all healthy locations are correctly handled and that the
+   * check routine is invoked as expected.
+   * @throws Exception
+   */
+  @Test(timeout=30000)
+  public void testAllLocationsHealthy() throws Exception {
+    final List<StorageLocation> locations =
+        makeMockLocations(HEALTHY, HEALTHY, HEALTHY);
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 0);
+    StorageLocationChecker checker =
+        new StorageLocationChecker(conf, new FakeTimer());
+    List<StorageLocation> filteredLocations = checker.check(conf, locations);
+
+    // All locations should be healthy.
+    assertThat(filteredLocations.size(), is(3));
+
+    // Ensure that the check method was invoked for each location.
+    for (StorageLocation location : locations) {
+      verify(location).check(any(StorageLocation.CheckContext.class));
+    }
+  }
+
+  /**
+   * Test handling when the number of failed locations is below the
+   * max volume failure threshold.
+   *
+   * @throws Exception
+   */
+  @Test(timeout=30000)
+  public void testFailedLocationsBelowThreshold() throws Exception {
+    final List<StorageLocation> locations =
+        makeMockLocations(HEALTHY, HEALTHY, FAILED); // 2 healthy, 1 failed.
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    StorageLocationChecker checker =
+        new StorageLocationChecker(conf, new FakeTimer());
+    List<StorageLocation> filteredLocations = checker.check(conf, locations);
+    assertThat(filteredLocations.size(), is(2));
+  }
+
+  /**
+   * Test handling when the number of failed locations is above the
+   * max volume failure threshold.
+   *
+   * @throws Exception
+   */
+  @Test(timeout=30000)
+  public void testFailedLocationsAboveThreshold() throws Exception {
+    final List<StorageLocation> locations =
+        makeMockLocations(HEALTHY, FAILED, FAILED); // 1 healthy, 2 failed.
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage("Too many failed volumes");
+    StorageLocationChecker checker =
+        new StorageLocationChecker(conf, new FakeTimer());
+    checker.check(conf, locations);
+  }
+
+  /**
+   * Test handling when all storage locations have failed.
+   *
+   * @throws Exception
+   */
+  @Test(timeout=30000)
+  public void testAllFailedLocations() throws Exception {
+    final List<StorageLocation> locations =
+        makeMockLocations(FAILED, FAILED, FAILED);
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 3);
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage("All directories in " + DFS_DATANODE_DATA_DIR_KEY +
+        " are invalid");
+    StorageLocationChecker checker =
+        new StorageLocationChecker(conf, new FakeTimer());
+    checker.check(conf, locations);
+  }
+
+  /**
+   * Verify that a {@link StorageLocation#check} timeout is correctly detected
+   * as a failure.
+   *
+   * This is hard to test without a {@link Thread#sleep} call.
+   *
+   * @throws Exception
+   */
+  @Test (timeout=300000)
+  public void testTimeoutInCheck() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setTimeDuration(DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
+        1, TimeUnit.SECONDS);
+    conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    final FakeTimer timer = new FakeTimer();
+
+    // Generate a list of storage locations the first of which sleeps
+    // for 2 seconds in its check() routine.
+    final List<StorageLocation> locations = makeSlowLocations(2000, 1);
+    StorageLocationChecker checker =
+        new StorageLocationChecker(conf, timer);
+
+    try {
+      // Check the two locations and ensure that only one of them
+      // was filtered out.
+      List<StorageLocation> filteredList = checker.check(conf, locations);
+      assertThat(filteredList.size(), is(1));
+    } finally {
+      checker.shutdownAndWait(10, TimeUnit.SECONDS);
+    }
+  }
+
+  /**
+   * Return a list of storage locations - one per argument - which return
+   * health check results corresponding to the supplied arguments.
+   */
+  private List<StorageLocation> makeMockLocations(VolumeCheckResult... args)
+      throws IOException {
+    final List<StorageLocation> locations = new ArrayList<>(args.length);
+    final AtomicInteger index = new AtomicInteger(0);
+
+    for (VolumeCheckResult result : args) {
+      final StorageLocation location = mock(StorageLocation.class);
+      when(location.toString()).thenReturn("/" + index.incrementAndGet());
+      when(location.check(any(StorageLocation.CheckContext.class)))
+          .thenReturn(result);
+      locations.add(location);
+    }
+
+    return locations;
+  }
+
+  /**
+   * Return a list of storage locations - one per argument - whose check()
+   * method takes at least the specified number of milliseconds to complete.
+   */
+  private List<StorageLocation> makeSlowLocations(long... args)
+      throws IOException {
+    final List<StorageLocation> locations = new ArrayList<>(args.length);
+    final AtomicInteger index = new AtomicInteger(0);
+
+    for (final long checkDelayMs: args) {
+      final StorageLocation location = mock(StorageLocation.class);
+      when(location.toString()).thenReturn("/" + index.incrementAndGet());
+      when(location.check(any(StorageLocation.CheckContext.class)))
+          .thenAnswer(new Answer<VolumeCheckResult>() {
+            @Override
+            public VolumeCheckResult answer(InvocationOnMock invocation)
+                throws Throwable {
+              Thread.sleep(checkDelayMs);
+              return VolumeCheckResult.HEALTHY;
+            }
+          });
+
+      locations.add(location);
+    }
+    return locations;
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org