This is an automated email from the ASF dual-hosted git repository.
jing9 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7743d40 HDFS-15549. Use Hardlink to move replica between DISK and ARCHIVE storage if on same filesystem mount (#2583). Contributed by Leon Gao.
7743d40 is described below
commit 7743d40ac5b6fba73204feba22d2256d4e9d70f0
Author: LeonGao <[email protected]>
AuthorDate: Fri Jan 15 16:28:11 2021 -0800
HDFS-15549. Use Hardlink to move replica between DISK and ARCHIVE storage if on same filesystem mount (#2583). Contributed by Leon Gao.
---
.../main/java/org/apache/hadoop/fs/HardLink.java | 4 +-
.../java/org/apache/hadoop/fs/StorageType.java | 5 +
.../hdfs/server/datanode/DirectoryScanner.java | 3 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 136 ++++++++-
.../datanode/fsdataset/impl/FsVolumeImpl.java | 23 +-
.../datanode/fsdataset/impl/FsVolumeList.java | 29 +-
.../datanode/fsdataset/impl/TestFsDatasetImpl.java | 317 ++++++++++++++++++++-
.../apache/hadoop/hdfs/server/mover/TestMover.java | 6 +
8 files changed, 502 insertions(+), 21 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java
index 30f793d..887ae0c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java
@@ -153,11 +153,11 @@ public class HardLink {
*/
/**
- * Creates a hardlink
+ * Creates a hardlink.
* @param file - existing source file
* @param linkName - desired target link file
*/
- public static void createHardLink(File file, File linkName)
+ public static void createHardLink(File file, File linkName)
throws IOException {
if (file == null) {
throw new IOException(
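
For orientation, a minimal sketch of how HardLink.createHardLink is used; the paths below are hypothetical and both must live on the same filesystem mount for the link to succeed:

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.fs.HardLink;

public class HardLinkSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical replica paths under one mount (/data/mount0); a hardlink
    // adds a second directory entry for the same inode, so no bytes are copied.
    File source = new File("/data/mount0/disk/blk_1073741825");
    File target = new File("/data/mount0/archive/blk_1073741825");
    HardLink.createHardLink(source, target);
  }
}
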
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
index e11c129..b17864a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
@@ -92,6 +92,11 @@ public enum StorageType {
return StorageType.valueOf(StringUtils.toUpperCase(s));
}
+ public static boolean allowSameDiskTiering(StorageType storageType) {
+ return storageType == StorageType.DISK
+ || storageType == StorageType.ARCHIVE;
+ }
+
private static List<StorageType> getNonTransientTypes() {
List<StorageType> nonTransientTypes = new ArrayList<>();
for (StorageType t : VALUES) {
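
A small sketch of the new helper in use, assuming the patched StorageType above; only DISK and ARCHIVE are expected to report true:

import org.apache.hadoop.fs.StorageType;

public class TieringEligibility {
  public static void main(String[] args) {
    for (StorageType t : StorageType.values()) {
      // Prints true only for DISK and ARCHIVE; other types are not
      // eligible for same-disk tiering.
      System.out.println(t + " allowSameDiskTiering="
          + StorageType.allowSameDiskTiering(t));
    }
  }
}
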
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index d835108..66cfa01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -322,7 +322,8 @@ public class DirectoryScanner implements Runnable {
* Start the scanner. The scanner will run every
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds.
*/
- void start() {
+ @VisibleForTesting
+ public void start() {
shouldRun.set(true);
long firstScanTime = ThreadLocalRandom.current().nextLong(scanPeriodMsecs);
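
With start() public, a test can drive the scanner directly; a sketch of such a helper, mirroring the usage added to TestFsDatasetImpl later in this patch (class and method names here are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;

class ScannerTestHelper {
  // Runs one reconciliation pass against a DataNode's dataset.
  static void runOneScan(DataNode dataNode, Configuration conf) {
    DirectoryScanner scanner =
        new DirectoryScanner(dataNode.getFSDataset(), conf);
    scanner.start();   // now public, so tests can schedule the periodic scan
    scanner.run();     // force an immediate pass
    scanner.shutdown();
  }
}
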
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index f5bfd92..c3dbf48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -48,6 +48,7 @@ import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
+import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -994,6 +995,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
smallBufferSize, conf);
}
+ /**
+ * Link the block and meta files for the given block to the given destination.
+ * @return the new meta and block files.
+ * @throws IOException
+ */
+ static File[] hardLinkBlockFiles(long blockId, long genStamp,
+ ReplicaInfo srcReplica, File destRoot) throws IOException {
+ final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
+ // blockName is same as the filename for the block
+ final File dstFile = new File(destDir, srcReplica.getBlockName());
+ final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
+ return hardLinkBlockFiles(srcReplica, dstMeta, dstFile);
+ }
+
static File[] copyBlockFiles(ReplicaInfo srcReplica, File dstMeta,
File dstFile, boolean calculateChecksum,
int smallBufferSize, final Configuration conf)
@@ -1026,6 +1041,34 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return new File[] {dstMeta, dstFile};
}
+ static File[] hardLinkBlockFiles(ReplicaInfo srcReplica, File dstMeta,
+ File dstFile)
+ throws IOException {
+ // Create the parent folder if it does not exist.
+ srcReplica.getFileIoProvider()
+ .mkdirs(srcReplica.getVolume(), dstFile.getParentFile());
+ try {
+ HardLink.createHardLink(
+ new File(srcReplica.getBlockURI()), dstFile);
+ } catch (IOException e) {
+ throw new IOException("Failed to hardLink "
+ + srcReplica + " block file to "
+ + dstFile, e);
+ }
+ try {
+ HardLink.createHardLink(
+ new File(srcReplica.getMetadataURI()), dstMeta);
+ } catch (IOException e) {
+ throw new IOException("Failed to hardLink "
+ + srcReplica + " metadata to "
+ + dstMeta, e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Linked " + srcReplica.getBlockURI() + " to " + dstFile);
+ }
+ return new File[]{dstMeta, dstFile};
+ }
+
/**
* Move block files from one storage to another storage.
* @return Returns the Old replicaInfo
@@ -1058,12 +1101,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
FsVolumeReference volumeRef = null;
+ boolean shouldConsiderSameMountVolume =
+ shouldConsiderSameMountVolume(replicaInfo.getVolume(),
+ targetStorageType, targetStorageId);
+ boolean useVolumeOnSameMount = false;
+
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
- volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
- block.getNumBytes());
+ if (shouldConsiderSameMountVolume) {
+ volumeRef = volumes.getVolumeByMount(targetStorageType,
+ ((FsVolumeImpl) replicaInfo.getVolume()).getMount(),
+ block.getNumBytes());
+ if (volumeRef != null) {
+ useVolumeOnSameMount = true;
+ }
+ }
+ if (!useVolumeOnSameMount) {
+ volumeRef = volumes.getNextVolume(
+ targetStorageType,
+ targetStorageId,
+ block.getNumBytes()
+ );
+ }
}
try {
- moveBlock(block, replicaInfo, volumeRef);
+ moveBlock(block, replicaInfo, volumeRef, useVolumeOnSameMount);
} finally {
if (volumeRef != null) {
volumeRef.close();
@@ -1075,19 +1136,53 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
/**
+ * When configuring DISK/ARCHIVE on same volume,
+ * check if we should find the counterpart on the same disk mount.
+ */
+ @VisibleForTesting
+ boolean shouldConsiderSameMountVolume(FsVolumeSpi fsVolume,
+ StorageType targetStorageType, String targetStorageID) {
+ if (targetStorageID != null && !targetStorageID.isEmpty()) {
+ return false;
+ }
+ if (!(fsVolume instanceof FsVolumeImpl)
+ || ((FsVolumeImpl) fsVolume).getMount().isEmpty()) {
+ return false;
+ }
+ StorageType sourceStorageType = fsVolume.getStorageType();
+ // Source/dest storage types are different
+ if (sourceStorageType == targetStorageType) {
+ return false;
+ }
+ // Source/dest storage types are either DISK or ARCHIVE.
+ return StorageType.allowSameDiskTiering(sourceStorageType)
+ && StorageType.allowSameDiskTiering(targetStorageType);
+ }
+
+ /**
* Moves a block from a given volume to another.
*
* @param block - Extended Block
* @param replicaInfo - ReplicaInfo
* @param volumeRef - Volume Ref - Closed by caller.
+ * @param moveBlockToLocalMount - Whether we use shortcut
+ * to move block on same mount.
* @return newReplicaInfo
* @throws IOException
*/
@VisibleForTesting
ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
- FsVolumeReference volumeRef) throws IOException {
- ReplicaInfo newReplicaInfo = copyReplicaToVolume(block, replicaInfo,
- volumeRef);
+ FsVolumeReference volumeRef, boolean moveBlockToLocalMount)
+ throws IOException {
+ ReplicaInfo newReplicaInfo;
+ if (moveBlockToLocalMount) {
+ newReplicaInfo = moveReplicaToVolumeOnSameMount(block, replicaInfo,
+ volumeRef);
+ } else {
+ newReplicaInfo = copyReplicaToVolume(block, replicaInfo,
+ volumeRef);
+ }
+
finalizeNewReplica(newReplicaInfo, block);
removeOldReplica(replicaInfo, newReplicaInfo, block.getBlockPoolId());
return newReplicaInfo;
@@ -1129,6 +1224,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
/**
+ * Shortcut to use hardlink to move blocks on same mount.
+ * This is useful when moving blocks between storage types on same disk mount.
+ * Two cases need to be considered carefully:
+ * 1) Datanode restart in the middle should not cause data loss.
+ * We use hardlink to avoid this.
+ * 2) Finalized blocks can be reopened to append.
+ * This is already handled by dataset lock and gen stamp.
+ * See HDFS-12942
+ *
+ * @param block - Extended Block
+ * @param replicaInfo - ReplicaInfo
+ * @param volumeRef - Volume Ref - Closed by caller.
+ * @return newReplicaInfo new replica object created in specified volume.
+ * @throws IOException
+ */
+ @VisibleForTesting
+ ReplicaInfo moveReplicaToVolumeOnSameMount(ExtendedBlock block,
+ ReplicaInfo replicaInfo,
+ FsVolumeReference volumeRef) throws IOException {
+ FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
+ // Move files to temp dir first
+ ReplicaInfo newReplicaInfo = targetVolume.hardLinkBlockToTmpLocation(block,
+ replicaInfo);
+ return newReplicaInfo;
+ }
+
+ /**
* Finalizes newReplica by calling finalizeReplica internally.
*
* @param newReplicaInfo - ReplicaInfo
@@ -1177,7 +1299,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
try {
- moveBlock(block, replicaInfo, volumeRef);
+ moveBlock(block, replicaInfo, volumeRef, false);
} finally {
if (volumeRef != null) {
volumeRef.close();
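
Condensed, the volume-selection logic added above prefers a volume on the same disk mount and remembers whether the hardlink shortcut applies; a non-standalone recap using the field names from FsDatasetImpl:

// Condensed recap of the hunk above, not standalone code.
FsVolumeReference volumeRef = null;
boolean useVolumeOnSameMount = false;
if (shouldConsiderSameMountVolume(replicaInfo.getVolume(),
    targetStorageType, targetStorageId)) {
  volumeRef = volumes.getVolumeByMount(targetStorageType,
      ((FsVolumeImpl) replicaInfo.getVolume()).getMount(), block.getNumBytes());
  useVolumeOnSameMount = (volumeRef != null);
}
if (!useVolumeOnSameMount) {
  volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
      block.getNumBytes());
}
// true -> hardlink on the same mount; false -> copy across volumes
moveBlock(block, replicaInfo, volumeRef, useVolumeOnSameMount);
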
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index ccb76b1..07e14fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -484,9 +484,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
// should share the same amount of reserved capacity.
// When calculating actual non dfs used,
// exclude DFS used capacity by another volume.
- if (enableSameDiskTiering &&
- (storageType == StorageType.DISK
- || storageType == StorageType.ARCHIVE)) {
+ if (enableSameDiskTiering
+ && StorageType.allowSameDiskTiering(storageType)) {
StorageType counterpartStorageType = storageType == StorageType.DISK
? StorageType.ARCHIVE : StorageType.DISK;
FsVolumeReference counterpartRef = dataset
@@ -1529,6 +1528,24 @@ public class FsVolumeImpl implements FsVolumeSpi {
return newReplicaInfo;
}
+ public ReplicaInfo hardLinkBlockToTmpLocation(ExtendedBlock block,
+ ReplicaInfo replicaInfo) throws IOException {
+
+ File[] blockFiles = FsDatasetImpl.hardLinkBlockFiles(block.getBlockId(),
+ block.getGenerationStamp(), replicaInfo,
+ getTmpDir(block.getBlockPoolId()));
+
+ ReplicaInfo newReplicaInfo = new ReplicaBuilder(ReplicaState.TEMPORARY)
+ .setBlockId(replicaInfo.getBlockId())
+ .setGenerationStamp(replicaInfo.getGenerationStamp())
+ .setFsVolume(this)
+ .setDirectoryToUse(blockFiles[0].getParentFile())
+ .setBytesToReserve(0)
+ .build();
+ newReplicaInfo.setNumBytes(blockFiles[1].length());
+ return newReplicaInfo;
+ }
+
public File[] copyBlockToLazyPersistLocation(String bpId, long blockId,
long genStamp,
ReplicaInfo replicaInfo,
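
Taken together with FsDatasetImpl.moveBlock above, the same-mount path first hardlinks into the target volume's tmp directory and only then finalizes, so an interrupted move never loses the original replica; a condensed sketch (object references as in the patch):

// 1) Hardlink block + meta into the target volume's tmp dir (TEMPORARY replica).
ReplicaInfo tmpReplica = targetVolume.hardLinkBlockToTmpLocation(block, replicaInfo);
// 2) Finalize: the link is moved into the finalized directory and registered.
fsDataset.finalizeNewReplica(tmpReplica, block);
// 3) The old replica is then removed (removeOldReplica). If the DataNode
//    restarts before step 2, only the link left in tmp is cleaned up on startup.
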
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 2d6593d..38cf399 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -111,6 +111,30 @@ class FsVolumeList {
}
}
+ /**
+ * Get volume by disk mount to place a block.
+ * This is useful for same disk tiering.
+ *
+ * @param storageType The desired {@link StorageType}
+ * @param mount Disk mount of the volume
+ * @param blockSize Free space needed on the volume
+ * @return a volume reference on the given mount, or null if no qualifying volume exists
+ * @throws IOException
+ */
+ FsVolumeReference getVolumeByMount(StorageType storageType,
+ String mount, long blockSize) throws IOException {
+ if (!enableSameDiskTiering) {
+ return null;
+ }
+ FsVolumeReference volume = mountVolumeMap
+ .getVolumeRefByMountAndStorageType(mount, storageType);
+ // Check if volume has enough capacity
+ if (volume != null && volume.getVolume().getAvailable() > blockSize) {
+ return volume;
+ }
+ return null;
+ }
+
/**
* Get next volume.
*
@@ -354,9 +378,8 @@ class FsVolumeList {
* Check if same disk tiering is applied to the volume.
*/
private boolean isSameDiskTieringApplied(FsVolumeImpl target) {
- return enableSameDiskTiering &&
- (target.getStorageType() == StorageType.DISK
- || target.getStorageType() == StorageType.ARCHIVE);
+ return enableSameDiskTiering
+ && StorageType.allowSameDiskTiering(target.getStorageType());
}
/**
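
The mount-scoped lookup returns null whenever the shortcut cannot be used (feature disabled, no counterpart volume on the mount, or not enough free space), and the caller then falls back to the usual round-robin choice; a condensed sketch:

FsVolumeReference ref = volumes.getVolumeByMount(
    StorageType.ARCHIVE, sourceVolume.getMount(), block.getNumBytes());
if (ref == null) {
  // No usable ARCHIVE volume on this mount: use the regular volume chooser.
  ref = volumes.getNextVolume(StorageType.ARCHIVE, null, block.getNumBytes());
}
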
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 33a6c4f..80437ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -17,9 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.function.Supplier;
+
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import java.io.OutputStream;
@@ -68,6 +71,7 @@ import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.FakeTimer;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
@@ -1070,24 +1074,43 @@ public class TestFsDatasetImpl {
}
}
+ /**
+ * When moving blocks using hardlink or copy
+ * and an append happens in the middle,
+ * the block movement should fail and the hardlink should be removed.
+ */
@Test(timeout = 30000)
public void testMoveBlockFailure() {
+ // Test copy
+ testMoveBlockFailure(conf);
+ // Test hardlink
+ conf.setBoolean(DFSConfigKeys
+ .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+ conf.setDouble(DFSConfigKeys
+ .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
+ testMoveBlockFailure(conf);
+ }
+
+ private void testMoveBlockFailure(Configuration config) {
MiniDFSCluster cluster = null;
try {
+
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1)
- .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
+ .storageTypes(
+ new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
.storagesPerDatanode(2)
.build();
FileSystem fs = cluster.getFileSystem();
DataNode dataNode = cluster.getDataNodes().get(0);
Path filePath = new Path("testData");
- DFSTestUtil.createFile(fs, filePath, 100, (short) 1, 0);
- ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
+ long fileLen = 100;
+ ExtendedBlock block = createTestFile(fs, fileLen, filePath);
FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
- ReplicaInfo newReplicaInfo = createNewReplicaObj(block, fsDataSetImpl);
+ ReplicaInfo newReplicaInfo =
+ createNewReplicaObjWithLink(block, fsDataSetImpl);
// Append to file to update its GS
FSDataOutputStream out = fs.append(filePath, (short) 1);
@@ -1095,6 +1118,7 @@ public class TestFsDatasetImpl {
out.hflush();
// Call finalizeNewReplica
+ assertTrue(newReplicaInfo.blockDataExists());
LOG.info("GenerationStamp of old replica: {}",
block.getGenerationStamp());
LOG.info("GenerationStamp of new replica: {}", fsDataSetImpl
@@ -1103,6 +1127,9 @@ public class TestFsDatasetImpl {
LambdaTestUtils.intercept(IOException.class, "Generation Stamp "
+ "should be monotonically increased.",
() -> fsDataSetImpl.finalizeNewReplica(newReplicaInfo, block));
+ assertFalse(newReplicaInfo.blockDataExists());
+
+ validateFileLen(fs, fileLen, filePath);
} catch (Exception ex) {
LOG.info("Exception in testMoveBlockFailure ", ex);
fail("Exception while testing testMoveBlockFailure ");
@@ -1144,6 +1171,253 @@ public class TestFsDatasetImpl {
}
/**
+ * Make sure a datanode restart cleans up un-finalized hardlinks
+ * when the block has not been finalized yet.
+ */
+ @Test(timeout = 30000)
+ public void testDnRestartWithHardLinkInTmp() {
+ MiniDFSCluster cluster = null;
+ try {
+ conf.setBoolean(DFSConfigKeys
+ .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+ conf.setDouble(DFSConfigKeys
+ .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1)
+ .storageTypes(
+ new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
+ .storagesPerDatanode(2)
+ .build();
+ FileSystem fs = cluster.getFileSystem();
+ DataNode dataNode = cluster.getDataNodes().get(0);
+
+ Path filePath = new Path("testData");
+ long fileLen = 100;
+
+ ExtendedBlock block = createTestFile(fs, fileLen, filePath);
+
+ FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
+
+ ReplicaInfo oldReplicaInfo = fsDataSetImpl.getReplicaInfo(block);
+ ReplicaInfo newReplicaInfo =
+ createNewReplicaObjWithLink(block, fsDataSetImpl);
+
+ // Link exists
+ assertTrue(Files.exists(Paths.get(newReplicaInfo.getBlockURI())));
+
+ cluster.restartDataNode(0);
+ cluster.waitDatanodeFullyStarted(cluster.getDataNodes().get(0), 60000);
+ cluster.triggerBlockReports();
+
+ // Un-finalized replica data (the hardlink) is deleted, as it was in the tmp dir
+ assertFalse(Files.exists(Paths.get(newReplicaInfo.getBlockURI())));
+
+ // Old block is there.
+ assertTrue(Files.exists(Paths.get(oldReplicaInfo.getBlockURI())));
+
+ validateFileLen(fs, fileLen, filePath);
+
+ } catch (Exception ex) {
+ LOG.info("Exception in testDnRestartWithHardLinkInTmp ", ex);
+ fail("Exception while testing testDnRestartWithHardLinkInTmp ");
+ } finally {
+ if (cluster.isClusterUp()) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * If new block is finalized and DN restarted,
+ * DiskScanner should clean up the hardlink correctly.
+ */
+ @Test(timeout = 30000)
+ public void testDnRestartWithHardLink() {
+ MiniDFSCluster cluster = null;
+ try {
+ conf.setBoolean(DFSConfigKeys
+ .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+ conf.setDouble(DFSConfigKeys
+ .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1)
+ .storageTypes(
+ new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
+ .storagesPerDatanode(2)
+ .build();
+ FileSystem fs = cluster.getFileSystem();
+ DataNode dataNode = cluster.getDataNodes().get(0);
+
+ Path filePath = new Path("testData");
+ long fileLen = 100;
+
+ ExtendedBlock block = createTestFile(fs, fileLen, filePath);
+
+ FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
+
+ final ReplicaInfo oldReplicaInfo = fsDataSetImpl.getReplicaInfo(block);
+
+ fsDataSetImpl.finalizeNewReplica(
+ createNewReplicaObjWithLink(block, fsDataSetImpl), block);
+
+ ReplicaInfo newReplicaInfo = fsDataSetImpl.getReplicaInfo(block);
+
+ cluster.restartDataNode(0);
+ cluster.waitDatanodeFullyStarted(cluster.getDataNodes().get(0), 60000);
+ cluster.triggerBlockReports();
+
+ assertTrue(Files.exists(Paths.get(newReplicaInfo.getBlockURI())));
+ assertTrue(Files.exists(Paths.get(oldReplicaInfo.getBlockURI())));
+
+ DirectoryScanner scanner = new DirectoryScanner(
+ cluster.getDataNodes().get(0).getFSDataset(), conf);
+ scanner.start();
+ scanner.run();
+
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override public Boolean get() {
+ return !Files.exists(Paths.get(oldReplicaInfo.getBlockURI()));
+ }
+ }, 100, 10000);
+ assertTrue(Files.exists(Paths.get(newReplicaInfo.getBlockURI())));
+
+ validateFileLen(fs, fileLen, filePath);
+
+ } catch (Exception ex) {
+ LOG.info("Exception in testDnRestartWithHardLink ", ex);
+ fail("Exception while testing testDnRestartWithHardLink ");
+ } finally {
+ if (cluster.isClusterUp()) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ @Test(timeout = 30000)
+ public void testMoveBlockSuccessWithSameMountMove() {
+ MiniDFSCluster cluster = null;
+ try {
+ conf.setBoolean(DFSConfigKeys
+ .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+ conf.setDouble(DFSConfigKeys
+ .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1)
+ .storageTypes(
+ new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
+ .storagesPerDatanode(2)
+ .build();
+ FileSystem fs = cluster.getFileSystem();
+ DataNode dataNode = cluster.getDataNodes().get(0);
+ Path filePath = new Path("testData");
+ long fileLen = 100;
+
+ ExtendedBlock block = createTestFile(fs, fileLen, filePath);
+
+ FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
+ assertEquals(StorageType.DISK,
+ fsDataSetImpl.getReplicaInfo(block).getVolume().getStorageType());
+
+ FsDatasetImpl fsDataSetImplSpy =
+ spy((FsDatasetImpl) dataNode.getFSDataset());
+ fsDataSetImplSpy.moveBlockAcrossStorage(
+ block, StorageType.ARCHIVE, null);
+
+ // Make sure it is done thru hardlink
+ verify(fsDataSetImplSpy).moveBlock(any(), any(), any(), eq(true));
+
+ assertEquals(StorageType.ARCHIVE,
+ fsDataSetImpl.getReplicaInfo(block).getVolume().getStorageType());
+ validateFileLen(fs, fileLen, filePath);
+
+ } catch (Exception ex) {
+ LOG.info("Exception in testMoveBlockSuccessWithSameMountMove ", ex);
+ fail("testMoveBlockSuccessWithSameMountMove operation should succeed");
+ } finally {
+ if (cluster.isClusterUp()) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ // Move should fail if the volume on same mount has no space.
+ @Test(timeout = 30000)
+ public void testMoveBlockWithSameMountMoveWithoutSpace() {
+ MiniDFSCluster cluster = null;
+ try {
+ conf.setBoolean(DFSConfigKeys
+ .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+ conf.setDouble(DFSConfigKeys
+ .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.0);
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1)
+ .storageTypes(
+ new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
+ .storagesPerDatanode(2)
+ .build();
+ FileSystem fs = cluster.getFileSystem();
+ DataNode dataNode = cluster.getDataNodes().get(0);
+ Path filePath = new Path("testData");
+ long fileLen = 100;
+
+ ExtendedBlock block = createTestFile(fs, fileLen, filePath);
+
+ FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
+ assertEquals(StorageType.DISK,
+ fsDataSetImpl.getReplicaInfo(block).getVolume().getStorageType());
+
+ FsDatasetImpl fsDataSetImplSpy =
+ spy((FsDatasetImpl) dataNode.getFSDataset());
+ fsDataSetImplSpy.moveBlockAcrossStorage(
+ block, StorageType.ARCHIVE, null);
+
+ fail("testMoveBlockWithSameMountMoveWithoutSpace operation" +
+ " should failed");
+ } catch (Exception ex) {
+ assertTrue(ex instanceof DiskChecker.DiskOutOfSpaceException);
+ } finally {
+ if (cluster.isClusterUp()) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ // More tests on shouldConsiderSameMountVolume.
+ @Test(timeout = 10000)
+ public void testShouldConsiderSameMountVolume() throws IOException {
+ FsVolumeImpl volume = new FsVolumeImplBuilder()
+ .setConf(conf)
+ .setDataset(dataset)
+ .setStorageID("storage-id")
+ .setStorageDirectory(
+ new StorageDirectory(StorageLocation.parse(BASE_DIR)))
+ .build();
+ assertFalse(dataset.shouldConsiderSameMountVolume(volume,
+ StorageType.ARCHIVE, null));
+
+ conf.setBoolean(DFSConfigKeys
+ .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+ conf.setDouble(DFSConfigKeys
+ .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
+ 0.5);
+ volume = new FsVolumeImplBuilder()
+ .setConf(conf)
+ .setDataset(dataset)
+ .setStorageID("storage-id")
+ .setStorageDirectory(
+ new StorageDirectory(StorageLocation.parse(BASE_DIR)))
+ .build();
+ assertTrue(dataset.shouldConsiderSameMountVolume(volume,
+ StorageType.ARCHIVE, null));
+ assertTrue(dataset.shouldConsiderSameMountVolume(volume,
+ StorageType.ARCHIVE, ""));
+ assertFalse(dataset.shouldConsiderSameMountVolume(volume,
+ StorageType.DISK, null));
+ assertFalse(dataset.shouldConsiderSameMountVolume(volume,
+ StorageType.ARCHIVE, "target"));
+ }
+
+ /**
* Create a new temporary replica of replicaInfo object in another volume.
*
* @param block - Extended Block
@@ -1159,6 +1433,38 @@ public class TestFsDatasetImpl {
}
/**
+ * Create a new temporary replica of replicaInfo object in another volume.
+ *
+ * @param block - Extended Block
+ * @param fsDataSetImpl - FsDatasetImpl reference
+ * @throws IOException
+ */
+ private ReplicaInfo createNewReplicaObjWithLink(ExtendedBlock block,
+ FsDatasetImpl fsDataSetImpl) throws IOException {
+ ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
+ FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl);
+ return fsDataSetImpl.moveReplicaToVolumeOnSameMount(block, replicaInfo,
+ destVolume.obtainReference());
+ }
+
+ private ExtendedBlock createTestFile(FileSystem fs,
+ long fileLen, Path filePath) throws IOException {
+ DFSTestUtil.createFile(fs, filePath, fileLen, (short) 1, 0);
+ return DFSTestUtil.getFirstBlock(fs, filePath);
+ }
+
+ private void validateFileLen(FileSystem fs,
+ long fileLen, Path filePath) throws IOException {
+ // Read data file to make sure it is good.
+ InputStream in = fs.open(filePath);
+ int bytesCount = 0;
+ while (in.read() != -1) {
+ bytesCount++;
+ }
+ assertTrue(fileLen <= bytesCount);
+ }
+
+ /**
* Finds a new destination volume for block.
*
* @param block - Extended Block
@@ -1225,7 +1531,8 @@ public class TestFsDatasetImpl {
ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl);
assertNotNull("Destination volume should not be null.", destVolume);
- fsDataSetImpl.moveBlock(block, replicaInfo, destVolume.obtainReference());
+ fsDataSetImpl.moveBlock(block, replicaInfo,
+ destVolume.obtainReference(), false);
// Trigger block report to update block info in NN
cluster.triggerBlockReports();
blkReader.read(buf, 512, 512);
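
A test could additionally confirm that the move reused the same inode rather than copying by inspecting the block file's link count; this is a hypothetical helper, not part of the patch, and works only on POSIX filesystems:

import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

class LinkCountCheck {
  // Number of hard links to the given block file (POSIX filesystems only).
  static int linkCount(URI blockUri) throws IOException {
    Path p = Paths.get(blockUri);
    return (Integer) Files.getAttribute(p, "unix:nlink");
  }
}
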
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 5393b90..481c7cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -446,6 +446,12 @@ public class TestMover {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
testWithinSameNode(conf);
+ // Test movement with hardlink, when same disk tiering is enabled.
+ conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
+ conf.setDouble(DFSConfigKeys
+ .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
+ testWithinSameNode(conf);
+ conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, false);
}
private void checkMovePaths(List<Path> actual, Path... expected) {
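
For reference, the hardlink path is driven purely by configuration, as the test above shows; a minimal sketch of enabling it (0.5 is just an example capacity split between DISK and ARCHIVE):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

class EnableSameDiskTiering {
  static Configuration withSameDiskTiering() {
    Configuration conf = new Configuration();
    // Allow DISK and ARCHIVE volumes to share one disk mount.
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
    // Reserve half of the mount's capacity for ARCHIVE (example value).
    conf.setDouble(
        DFSConfigKeys.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
    return conf;
  }
}
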