HDFS-11164: Mover should avoid unnecessary retries if the block is pinned. Contributed by Rakesh R
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e24a923d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e24a923d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e24a923d

Branch: refs/heads/YARN-5085
Commit: e24a923db50879f7dbe5d2afac0e6757089fb07d
Parents: 9947aeb
Author: Uma Maheswara Rao G <[email protected]>
Authored: Tue Dec 13 17:09:58 2016 -0800
Committer: Uma Maheswara Rao G <[email protected]>
Committed: Tue Dec 13 17:09:58 2016 -0800

----------------------------------------------------------------------
 .../datatransfer/BlockPinningException.java     |  33 ++++
 .../datatransfer/DataTransferProtoUtil.java     |  17 +-
 .../src/main/proto/datatransfer.proto           |   1 +
 .../hadoop/hdfs/server/balancer/Dispatcher.java |  62 ++++++-
 .../hdfs/server/datanode/DataXceiver.java       |   8 +-
 .../apache/hadoop/hdfs/server/mover/Mover.java  |  26 ++-
 .../hdfs/server/datanode/DataNodeTestUtils.java |  28 ++++
 .../server/datanode/TestBlockReplacement.java   |  70 +++++++-
 .../hadoop/hdfs/server/mover/TestMover.java     | 163 ++++++++++++++++++-
 9 files changed, 395 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockPinningException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockPinningException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockPinningException.java
new file mode 100644
index 0000000..c2f12f9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockPinningException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.IOException;
+
+/**
+ * Indicates a failure due to block pinning.
+ */
+public class BlockPinningException extends IOException {
+
+  // Required by {@link java.io.Serializable}.
+  private static final long serialVersionUID = 1L;
+
+  public BlockPinningException(String errMsg) {
+    super(errMsg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
index 6801149..287928c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
@@ -24,11 +24,11 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -107,6 +107,11 @@ public abstract class DataTransferProtoUtil {
   public static void checkBlockOpStatus(
       BlockOpResponseProto response,
       String logInfo) throws IOException {
+    checkBlockOpStatus(response, logInfo, false);
+  }
+
+  public static void checkBlockOpStatus(BlockOpResponseProto response,
+      String logInfo, boolean checkBlockPinningErr) throws IOException {
     if (response.getStatus() != Status.SUCCESS) {
       if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
         throw new InvalidBlockTokenException(
@@ -114,6 +119,14 @@ public abstract class DataTransferProtoUtil {
           + ", status message " + response.getMessage()
           + ", " + logInfo
         );
+      } else if (checkBlockPinningErr
+          && response.getStatus() == Status.ERROR_BLOCK_PINNED) {
+        throw new BlockPinningException(
+          "Got error"
+          + ", status=" + response.getStatus().name()
+          + ", status message " + response.getMessage()
+          + ", " + logInfo
+        );
       } else {
         throw new IOException(
           "Got error"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index 290b158..889361a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -243,6 +243,7 @@ enum Status {
   OOB_RESERVED2 = 10;         // Reserved
   OOB_RESERVED3 = 11;         // Reserved
   IN_PROGRESS = 12;
+  ERROR_BLOCK_PINNED = 13;
 }
 
 enum ShortCircuitFdResponse {
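
Note on compatibility: the new three-argument overload is opt-in. Only callers that pass checkBlockPinningErr=true (the balancer/mover dispatcher and the replace-block path in this patch) see ERROR_BLOCK_PINNED surfaced as a BlockPinningException; existing two-argument callers continue to get a plain IOException. A minimal caller sketch (illustrative only, not part of this patch):

    try {
      DataTransferProtoUtil.checkBlockOpStatus(response, logInfo, true);
    } catch (BlockPinningException e) {
      // The replica is pinned on the source datanode. Record it and stop
      // retrying this source instead of treating it as a transient error.
    }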

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index eb3ed87..0e62da2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -36,6 +36,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -55,6 +56,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -224,6 +226,10 @@ public class Dispatcher {
       this.target = target;
     }
 
+    public DatanodeInfo getSource() {
+      return source.getDatanodeInfo();
+    }
+
     @Override
     public String toString() {
       final Block b = reportedBlock != null ? reportedBlock.getBlock() : null;
@@ -367,6 +373,15 @@ public class Dispatcher {
       } catch (IOException e) {
         LOG.warn("Failed to move " + this, e);
         target.getDDatanode().setHasFailure();
+        // Check whether the failure is due to a block pinning error.
+        if (e instanceof BlockPinningException) {
+          // A pinned block can't be moved. Add this block to the failure
+          // list; in the next iteration, the mover will exclude it from
+          // pending moves.
+          target.getDDatanode().addBlockPinningFailures(this);
+          return;
+        }
+
         // Proxy or target may have some issues, delay before using these nodes
         // further in order to avoid a potential storm of "threads quota
         // exceeded" warnings when the dispatcher gets out of sync with work
@@ -419,7 +434,7 @@ public class Dispatcher {
         }
       }
       String logInfo = "reportedBlock move is failed";
-      DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
+      DataTransferProtoUtil.checkBlockOpStatus(response, logInfo, true);
     }
 
     /** reset the object */
@@ -600,6 +615,7 @@ public class Dispatcher {
     /** blocks being moved but not confirmed yet */
     private final List<PendingMove> pendings;
     private volatile boolean hasFailure = false;
+    private Map<Long, Set<DatanodeInfo>> blockPinningFailures = new HashMap<>();
     private volatile boolean hasSuccess = false;
     private ExecutorService moveExecutor;
@@ -685,6 +701,22 @@ public class Dispatcher {
       this.hasFailure = true;
     }
 
+    void addBlockPinningFailures(PendingMove pendingBlock) {
+      synchronized (blockPinningFailures) {
+        long blockId = pendingBlock.reportedBlock.getBlock().getBlockId();
+        Set<DatanodeInfo> pinnedLocations = blockPinningFailures.get(blockId);
+        if (pinnedLocations == null) {
+          pinnedLocations = new HashSet<>();
+          blockPinningFailures.put(blockId, pinnedLocations);
+        }
+        pinnedLocations.add(pendingBlock.getSource());
+      }
+    }
+
+    Map<Long, Set<DatanodeInfo>> getBlockPinningFailureList() {
+      return blockPinningFailures;
+    }
+
     void setHasSuccess() {
       this.hasSuccess = true;
     }
@@ -1155,6 +1187,34 @@ public class Dispatcher {
   }
 
   /**
+   * Check whether any of the block movements failed due to block pinning
+   * errors. If so, add the failed blockId and its respective source node
+   * location to the excluded list.
+   */
+  public static void checkForBlockPinningFailures(
+      Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks,
+      Iterable<? extends StorageGroup> targets) {
+    for (StorageGroup t : targets) {
+      Map<Long, Set<DatanodeInfo>> blockPinningFailureList = t.getDDatanode()
+          .getBlockPinningFailureList();
+      Set<Entry<Long, Set<DatanodeInfo>>> entrySet = blockPinningFailureList
+          .entrySet();
+      for (Entry<Long, Set<DatanodeInfo>> entry : entrySet) {
+        Long blockId = entry.getKey();
+        Set<DatanodeInfo> locs = excludedPinnedBlocks.get(blockId);
+        if (locs == null) {
+          // blockId doesn't exist in the excluded list.
+          locs = entry.getValue();
+          excludedPinnedBlocks.put(blockId, locs);
+        } else {
+          // blockId already exists in the excluded list; add the pinned node.
+          locs.addAll(entry.getValue());
+        }
+      }
+    }
+  }
+
+  /**
    * @return true if some moves are success.
    */
  public static boolean checkForSuccess(
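
For readers tracing the merge in checkForBlockPinningFailures: the null-check-then-put is a plain multimap insert, accumulating for each pinned blockId the union of source datanodes seen across iterations. On Java 8 the same merge could be written equivalently as (sketch, not the committed code):

    excludedPinnedBlocks.computeIfAbsent(blockId, k -> new HashSet<>())
        .addAll(entry.getValue());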

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index fee16b3..a35a5b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
@@ -1022,7 +1023,7 @@ class DataXceiver extends Receiver implements Runnable {
       String msg = "Not able to copy block " + block.getBlockId() + " " +
           "to " + peer.getRemoteAddressString() + " because it's pinned ";
       LOG.info(msg);
-      sendResponse(ERROR, msg);
+      sendResponse(Status.ERROR_BLOCK_PINNED, msg);
       return;
     }
@@ -1156,7 +1157,7 @@ class DataXceiver extends Receiver implements Runnable {
 
       String logInfo = "copy block " + block + " from "
           + proxySock.getRemoteSocketAddress();
-      DataTransferProtoUtil.checkBlockOpStatus(copyResponse, logInfo);
+      DataTransferProtoUtil.checkBlockOpStatus(copyResponse, logInfo, true);
 
       // get checksum info about the block we're copying
       ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
@@ -1183,6 +1184,9 @@ class DataXceiver extends Receiver implements Runnable {
       }
     } catch (IOException ioe) {
       opStatus = ERROR;
+      if (ioe instanceof BlockPinningException) {
+        opStatus = Status.ERROR_BLOCK_PINNED;
+      }
       errMsg = "opReplaceBlock " + block + " received exception " + ioe;
       LOG.info(errMsg);
       if (!IoeDuringCopyBlockOperation) {
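
Taken together with the Dispatcher change above, the pinning error now propagates end to end: the proxy datanode answers copyBlock with ERROR_BLOCK_PINNED instead of a generic ERROR; the target datanode's opReplaceBlock converts that status into a BlockPinningException via checkBlockOpStatus(..., true) and reports ERROR_BLOCK_PINNED back to its caller; and the dispatcher in turn maps the status to a BlockPinningException and records the pinned source rather than scheduling a retry.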

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 4ab55d3..bc75f0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.SecurityUtil;
@@ -117,10 +116,12 @@ public class Mover {
   private final List<Path> targetPaths;
   private final int retryMaxAttempts;
   private final AtomicInteger retryCount;
+  private final Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks;
 
   private final BlockStoragePolicy[] blockStoragePolicies;
 
-  Mover(NameNodeConnector nnc, Configuration conf, AtomicInteger retryCount) {
+  Mover(NameNodeConnector nnc, Configuration conf, AtomicInteger retryCount,
+      Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks) {
     final long movedWinWidth = conf.getLong(
         DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY,
         DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT);
@@ -144,6 +145,7 @@ public class Mover {
     this.targetPaths = nnc.getTargetPaths();
     this.blockStoragePolicies = new BlockStoragePolicy[1 <<
         BlockStoragePolicySuite.ID_BIT_LENGTH];
+    this.excludedPinnedBlocks = excludedPinnedBlocks;
   }
 
   void init() throws IOException {
@@ -292,6 +294,8 @@ public class Mover {
       // wait for pending move to finish and retry the failed migration
       boolean hasFailed = Dispatcher.waitForMoveCompletion(storages.targets
          .values());
+      Dispatcher.checkForBlockPinningFailures(excludedPinnedBlocks,
+          storages.targets.values());
       boolean hasSuccess = Dispatcher.checkForSuccess(storages.targets
          .values());
       if (hasFailed && !hasSuccess) {
@@ -461,6 +465,19 @@ public class Mover {
         return true;
       }
 
+      // Check whether the given block is pinned in the source datanode. A
+      // pinned block can't be moved to a different datanode, so we can
+      // skip scheduling moves for these blocks.
+      long blockId = db.getBlock().getBlockId();
+      if (excludedPinnedBlocks.containsKey(blockId)) {
+        Set<DatanodeInfo> locs = excludedPinnedBlocks.get(blockId);
+        for (DatanodeInfo dn : locs) {
+          if (source.getDatanodeInfo().equals(dn)) {
+            return false;
+          }
+        }
+      }
+
       if (dispatcher.getCluster().isNodeGroupAware()) {
         if (chooseTarget(db, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
           return true;
@@ -614,6 +631,8 @@ public class Mover {
           DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT,
           TimeUnit.SECONDS) * 1000;
       AtomicInteger retryCount = new AtomicInteger(0);
+      // TODO: Need to limit the size of the pinned blocks to limit memory usage
+      Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks = new HashMap<>();
       LOG.info("namenodes = " + namenodes);
 
       checkKeytabAndInit(conf);
@@ -628,7 +647,8 @@ public class Mover {
       Iterator<NameNodeConnector> iter = connectors.iterator();
       while (iter.hasNext()) {
         NameNodeConnector nnc = iter.next();
-        final Mover m = new Mover(nnc, conf, retryCount);
+        final Mover m = new Mover(nnc, conf, retryCount,
+            excludedPinnedBlocks);
         final ExitStatus r = m.run();
 
         if (r == ExitStatus.SUCCESS) {
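
The observable effect on the Mover CLI: when every remaining candidate block is excluded as pinned, a run now exits with NO_MOVE_BLOCK instead of burning through DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY retries. For example (hypothetical path, exercised the same way in the tests below):

    int rc = ToolRunner.run(conf, new Mover.Cli(),
        new String[] {"-p", "/pinned-dir"});
    // rc == ExitStatus.NO_MOVE_BLOCK.getExitCode() when all blocks under
    // /pinned-dir are pinned on their source datanodes.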

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
index e2755f9..3501ed3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
@@ -25,10 +25,17 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
 
 /**
  * Utility class for accessing package-private DataNode information during tests.
@@ -175,4 +182,25 @@ public class DataNodeTestUtils {
       dn.getDirectoryScanner().reconcile();
     }
   }
+
+  /**
+   * This method is used to mock the data node block pinning API.
+   *
+   * @param dn datanode
+   * @param pinned true if the block is pinned, false otherwise
+   * @throws IOException
+   */
+  public static void mockDatanodeBlkPinning(final DataNode dn,
+      final boolean pinned) throws IOException {
+    final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
+    dn.data = Mockito.spy(data);
+
+    doAnswer(new Answer<Object>() {
+      public Object answer(InvocationOnMock invocation) throws IOException {
+        // Bypass the argument to FsDatasetImpl#getPinning to show that
+        // the block is pinned.
+        return pinned;
+      }
+    }).when(dn.data).getPinning(any(ExtendedBlock.class));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
index 597dc46..f811bd8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
@@ -208,6 +208,67 @@ public class TestBlockReplacement {
     }
   }
 
+  /**
+   * Test to verify that copying a pinned block to a different destination
+   * datanode fails with status code Status.ERROR_BLOCK_PINNED.
+   *
+   */
+  @Test(timeout = 90000)
+  public void testBlockReplacementWithPinnedBlocks() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+
+    // Create a cluster of three datanodes, each with DISK and ARCHIVE
+    // storage types.
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .storageTypes(
+            new StorageType[] {StorageType.DISK, StorageType.ARCHIVE})
+        .build();
+
+    try {
+      cluster.waitActive();
+
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      String fileName = "/testBlockReplacementWithPinnedBlocks/file";
+      final Path file = new Path(fileName);
+      DFSTestUtil.createFile(dfs, file, 1024, (short) 1, 1024);
+
+      LocatedBlock lb = dfs.getClient().getLocatedBlocks(fileName, 0).get(0);
+      DatanodeInfo[] oldNodes = lb.getLocations();
+      assertEquals("Wrong block locations", 1, oldNodes.length);
+      DatanodeInfo source = oldNodes[0];
+      ExtendedBlock b = lb.getBlock();
+
+      DatanodeInfo[] datanodes = dfs.getDataNodeStats();
+      DatanodeInfo destin = null;
+      for (DatanodeInfo datanodeInfo : datanodes) {
+        // choose a different destination node
+        if (!oldNodes[0].equals(datanodeInfo)) {
+          destin = datanodeInfo;
+          break;
+        }
+      }
+
+      assertNotNull("Failed to choose destination datanode!", destin);
+
+      assertFalse("Source and destination datanodes should be different",
+          source.equals(destin));
+
+      // Mock FsDatasetSpi#getPinning to show that the block is pinned.
+      for (int i = 0; i < cluster.getDataNodes().size(); i++) {
+        DataNode dn = cluster.getDataNodes().get(i);
+        LOG.info("Simulate block pinning in datanode " + dn);
+        DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
+      }
+
+      // Block movement to a different datanode should fail as the block is
+      // pinned.
+      assertTrue("Status code mismatches!", replaceBlock(b, source, source,
+          destin, StorageType.ARCHIVE, Status.ERROR_BLOCK_PINNED));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testBlockMoveAcrossStorageInSameNode() throws Exception {
     final Configuration conf = new HdfsConfiguration();
@@ -236,7 +297,7 @@ public class TestBlockReplacement {
     // move block to ARCHIVE by using same DataNodeInfo for source, proxy and
     // destination so that movement happens within datanode
     assertTrue(replaceBlock(block, source, source, source,
-        StorageType.ARCHIVE));
+        StorageType.ARCHIVE, Status.SUCCESS));
 
     // wait till namenode notified
     Thread.sleep(3000);
@@ -311,7 +372,7 @@ public class TestBlockReplacement {
       ExtendedBlock block, DatanodeInfo source, DatanodeInfo sourceProxy,
       DatanodeInfo destination) throws IOException {
     return replaceBlock(block, source, sourceProxy, destination,
-        StorageType.DEFAULT);
+        StorageType.DEFAULT, Status.SUCCESS);
   }
 
   /*
@@ -322,7 +383,8 @@ public class TestBlockReplacement {
       DatanodeInfo source,
       DatanodeInfo sourceProxy,
      DatanodeInfo destination,
-      StorageType targetStorageType) throws IOException, SocketException {
+      StorageType targetStorageType,
+      Status opStatus) throws IOException, SocketException {
     Socket sock = new Socket();
     try {
       sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
@@ -342,7 +404,7 @@ public class TestBlockReplacement {
      while (proto.getStatus() == Status.IN_PROGRESS) {
        proto = BlockOpResponseProto.parseDelimitedFrom(reply);
      }
-      return proto.getStatus() == Status.SUCCESS;
+      return proto.getStatus() == opStatus;
    } finally {
      sock.close();
    }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 20a6959..d565548 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -37,11 +37,13 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSF
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -64,6 +66,7 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -72,6 +75,8 @@ import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
 import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
@@ -121,7 +126,7 @@ public class TestMover {
     final List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors(
         nnMap, Mover.class.getSimpleName(), Mover.MOVER_ID_PATH, conf,
         NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
-    return new Mover(nncs.get(0), conf, new AtomicInteger(0));
+    return new Mover(nncs.get(0), conf, new AtomicInteger(0), new HashMap<>());
   }
 
   @Test
@@ -705,4 +710,160 @@ public class TestMover {
       UserGroupInformation.setConfiguration(new Configuration());
     }
   }
+
+  /**
+   * Test to verify that the mover can't move pinned blocks.
+   */
+  @Test(timeout = 90000)
+  public void testMoverWithPinnedBlocks() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+
+    // Set a big retry max attempts value so that the test case times out if
+    // block pinning errors are not handled properly during block movement.
+    conf.setInt(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, 10000);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3)
+        .build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final String file = "/testMoverWithPinnedBlocks/file";
+      Path dir = new Path("/testMoverWithPinnedBlocks");
+      dfs.mkdirs(dir);
+
+      // write to DISK
+      dfs.setStoragePolicy(dir, "HOT");
+      final FSDataOutputStream out = dfs.create(new Path(file));
+      byte[] fileData = StripedFileTestUtil
+          .generateBytes(DEFAULT_BLOCK_SIZE * 3);
+      out.write(fileData);
+      out.close();
+
+      // verify before movement
+      LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+      StorageType[] storageTypes = lb.getStorageTypes();
+      for (StorageType storageType : storageTypes) {
+        Assert.assertTrue(StorageType.DISK == storageType);
+      }
+
+      // Add one SSD-based datanode to the cluster.
+      StorageType[][] newtypes = new StorageType[][] {{StorageType.SSD}};
+      startAdditionalDNs(conf, 1, newtypes, cluster);
+
+      // Mock FsDatasetSpi#getPinning to show that the block is pinned.
+      for (int i = 0; i < cluster.getDataNodes().size(); i++) {
+        DataNode dn = cluster.getDataNodes().get(i);
+        LOG.info("Simulate block pinning in datanode {}", dn);
+        DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
+      }
+
+      // move file blocks to ONE_SSD policy
+      dfs.setStoragePolicy(dir, "ONE_SSD");
+      int rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] {"-p", dir.toString()});
+
+      int exitcode = ExitStatus.NO_MOVE_BLOCK.getExitCode();
+      Assert.assertEquals("Movement should fail", exitcode, rc);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test to verify that the mover works well with pinned blocks as well as
+   * failed blocks, and keeps retrying only the failed blocks.
+   */
+  @Test(timeout = 90000)
+  public void testMoverFailedRetryWithPinnedBlocks() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.set(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, "2");
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2)
+        .storageTypes(
+            new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
+                {StorageType.DISK, StorageType.ARCHIVE}}).build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final String parenDir = "/parent";
+      dfs.mkdirs(new Path(parenDir));
+      final String file1 = "/parent/testMoverFailedRetryWithPinnedBlocks1";
+      // write to DISK
+      final FSDataOutputStream out = dfs.create(new Path(file1), (short) 2);
+      byte[] fileData = StripedFileTestUtil
+          .generateBytes(DEFAULT_BLOCK_SIZE * 2);
+      out.write(fileData);
+      out.close();
+
+      // Add pinned blocks.
+      createFileWithFavoredDatanodes(conf, cluster, dfs);
+
+      // Delete the block file so the block move will fail with
+      // FileNotFoundException.
+      LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file1, 0);
+      Assert.assertEquals("Wrong block count", 2,
+          locatedBlocks.locatedBlockCount());
+      LocatedBlock lb = locatedBlocks.get(0);
+      cluster.corruptBlockOnDataNodesByDeletingBlockFile(lb.getBlock());
+
+      // move to ARCHIVE
+      dfs.setStoragePolicy(new Path(parenDir), "COLD");
+      int rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] {"-p", parenDir.toString()});
+      Assert.assertEquals("Movement should fail after some retries",
+          ExitStatus.NO_MOVE_PROGRESS.getExitCode(), rc);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private void createFileWithFavoredDatanodes(final Configuration conf,
+      final MiniDFSCluster cluster, final DistributedFileSystem dfs)
+      throws IOException {
+    // Add two DISK-based datanodes to the cluster.
+    // Also, ensure that blocks are pinned in these new datanodes.
+    StorageType[][] newtypes =
+        new StorageType[][] {{StorageType.DISK}, {StorageType.DISK}};
+    startAdditionalDNs(conf, 2, newtypes, cluster);
+    ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+    InetSocketAddress[] favoredNodes = new InetSocketAddress[2];
+    int j = 0;
+    for (int i = dataNodes.size() - 1; i >= 2; i--) {
+      favoredNodes[j++] = dataNodes.get(i).getXferAddress();
+    }
+    final String file = "/parent/testMoverFailedRetryWithPinnedBlocks2";
+    final FSDataOutputStream out = dfs.create(new Path(file),
+        FsPermission.getDefault(), true, DEFAULT_BLOCK_SIZE, (short) 2,
+        DEFAULT_BLOCK_SIZE, null, favoredNodes);
+    byte[] fileData = StripedFileTestUtil.generateBytes(DEFAULT_BLOCK_SIZE * 2);
+    out.write(fileData);
+    out.close();
+
+    // Mock FsDatasetSpi#getPinning to show that the block is pinned.
+    LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file, 0);
+    Assert.assertEquals("Wrong block count", 2,
+        locatedBlocks.locatedBlockCount());
+    LocatedBlock lb = locatedBlocks.get(0);
+    DatanodeInfo datanodeInfo = lb.getLocations()[0];
+    for (DataNode dn : cluster.getDataNodes()) {
+      if (dn.getDatanodeId().getDatanodeUuid()
+          .equals(datanodeInfo.getDatanodeUuid())) {
+        LOG.info("Simulate block pinning in datanode {}", datanodeInfo);
+        DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
+        break;
+      }
+    }
+  }
+
+  private void startAdditionalDNs(final Configuration conf,
+      int newNodesRequired, StorageType[][] newTypes,
+      final MiniDFSCluster cluster) throws IOException {
+
+    cluster.startDataNodes(conf, newNodesRequired, newTypes, true, null, null,
+        null, null, null, false, false, false, null);
+    cluster.triggerHeartbeats();
+  }
 }
