Repository: hadoop
Updated Branches:
  refs/heads/HDFS-12996 6c9af6c88 -> ffee1a026
HDFS-13277. Improve move to Replica trash to limit trash sub-dir size. Contributed by Bharat Viswanadham.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ffee1a02
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ffee1a02
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ffee1a02

Branch: refs/heads/HDFS-12996
Commit: ffee1a026caf1423926014a2c5b6cbf86abdedcd
Parents: 6c9af6c
Author: Hanisha Koneru <[email protected]>
Authored: Mon Mar 26 11:08:11 2018 -0700
Committer: Hanisha Koneru <[email protected]>
Committed: Mon Mar 26 11:08:11 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |  3 +
 .../impl/FsDatasetAsyncDiskService.java       | 99 +++++++++++++++++++-
 .../src/main/resources/hdfs-default.xml       | 10 ++
 .../datanode/TestDatanodeReplicaTrash.java    | 72 ++++++++++++++
 4 files changed, 182 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffee1a02/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index a7ee2bf..254fcb6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1082,6 +1082,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.datanode.enable.replica.trash";
   public static final boolean DFS_DATANODE_ENABLE_REPLICA_TRASH_DEFAULT =
       false;
+  public static final String DFS_DATANODE_REPLICA_TRASH_SUBDIR_MAX_BLOCKS =
+      "dfs.datanode.replica.trash.subdir.max.blocks";
+
   // Slow io warning log threshold settings for dfsclient and datanode.
   public static final String DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY =
       "dfs.datanode.slow.io.warning.threshold.ms";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffee1a02/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 52f8265..68bd283 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -99,6 +99,12 @@ class FsDatasetAsyncDiskService {
   private int numDeletedBlocks = 0;
   private final Configuration conf;
   private final boolean replicaTrashEnabled;
+  private int replicaTrashSubDirMaxBlocks;
+
+  //Holds information about current replica trash directory
+  private Map<String, ReplicaTrashCurDirInfo> replicaTrashCurDirInfoMap = new
+      HashMap<>();
+
 
   /**
    * Create a AsyncDiskServices with a set of volumes (specified by their
@@ -116,6 +122,12 @@ class FsDatasetAsyncDiskService {
     this.replicaTrashEnabled = conf.getBoolean(DFSConfigKeys
         .DFS_DATANODE_ENABLE_REPLICA_TRASH_KEY, DFSConfigKeys
         .DFS_DATANODE_ENABLE_REPLICA_TRASH_DEFAULT);
+    int blockInvalidate = conf.getInt(DFSConfigKeys
+        .DFS_BLOCK_INVALIDATE_LIMIT_KEY, DFSConfigKeys
+        .DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
+    this.replicaTrashSubDirMaxBlocks = conf.getInt(DFSConfigKeys
+        .DFS_DATANODE_REPLICA_TRASH_SUBDIR_MAX_BLOCKS,
+        blockInvalidate);
   }
 
   private void addExecutorForVolume(final FsVolumeImpl volume) {
@@ -319,8 +331,37 @@ class FsDatasetAsyncDiskService {
       replicaTrashBaseDir = matcher.replaceFirst("$1$2" + DataStorage
           .STORAGE_DIR_REPLICA_TRASH);
 
-      File replicaTrashDir = new File(replicaTrashBaseDir + File
-          .separator + dateFormat.format(date));
+      String key = volume.getStorageID() + "-" + block.getBlockPoolId();
+      String subDir = dateFormat.format(date);
+      ReplicaTrashCurDirInfo current = replicaTrashCurDirInfoMap.get(key);
+      File replicaTrashDir;
+      synchronized (replicaTrashCurDirInfoMap) {
+        if (current == null) {
+          // first time moving blocks in this volume
+          ReplicaTrashCurDirInfo replicaTrashInfo = new
+              ReplicaTrashCurDirInfo();
+          replicaTrashInfo.setCurReplicaTrashSubDir(subDir);
+          replicaTrashInfo.setIteration(0);
+          replicaTrashInfo.setBlocks(0);
+          replicaTrashCurDirInfoMap.put(key, replicaTrashInfo);
+          current = replicaTrashInfo;
+        } else if (current.getCurReplicaTrashSubDir().equals(subDir)) {
+          if (current.getBlocks() == replicaTrashSubDirMaxBlocks) {
+            //reached max entries in a sub directory
+            current.incrementIteration();
+            current.setBlocks(0);
+          }
+        } else {
+          //date change, reset to current date
+          current.setCurReplicaTrashSubDir(subDir);
+          current.setIteration(0);
+          current.setBlocks(0);
+        }
+        current.incrementBlocks();
+      }
+      replicaTrashDir = new File(replicaTrashBaseDir + File
+          .separator + current.getCurReplicaTrashSubDir() + "_" + current
+          .getIteration());
 
       try {
         volume.getFileIoProvider().mkdirsWithExistsCheck(
@@ -414,6 +455,60 @@
       IOUtils.cleanup(null, volumeRef);
     }
   }
+
+  /**
+   * Information about current replica trash directory details.
+   */
+  static class ReplicaTrashCurDirInfo {
+    private String curReplicaTrashSubDir;
+    private int blocks;
+    private int iteration;
+
+    /**
+     * @return current replica trash sub directory
+     */
+    public String getCurReplicaTrashSubDir() {
+      return curReplicaTrashSubDir;
+    }
+
+    /** Sets current replica trash sub directory. */
+    public void setCurReplicaTrashSubDir(String currentDir) {
+      this.curReplicaTrashSubDir = currentDir;
+    }
+
+    /**
+     * @return Num of Blocks
+     */
+    public int getBlocks() {
+      return blocks;
+    }
+
+    /** Sets number of blocks in the current replica trash sub directory. */
+    public void setBlocks(int val) {
+      this.blocks = val;
+    }
+
+    /**
+     * @return current iteration
+     */
+    public int getIteration() {
+      return iteration;
+    }
+
+    /** Sets current iteration. */
+    public void setIteration(int val) {
+      this.iteration = val;
+    }
+
+    /** Increment number of blocks. */
+    public void incrementBlocks() {
+      ++blocks;
+    }
+    /** Increment current iteration. */
+    public void incrementIteration() {
+      ++iteration;
+    }
+  }
 
   private synchronized void updateDeletedBlockId(ExtendedBlock block) {
     Set<Long> blockIds = deletedBlockIds.get(block.getBlockPoolId());
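
To make the sub-directory scheme in the hunk above easier to follow outside the diff context: each storage/block-pool pair keeps a current date-named sub-directory plus an iteration counter, and the counter advances once the configured number of blocks has been moved into it. Below is a minimal, standalone sketch of that bookkeeping; the class and method names (SubDirTracker, nextSubDir) are hypothetical, and the "yyyyMMdd" pattern is only an assumption, since the actual dateFormat used by FsDatasetAsyncDiskService is defined outside this hunk.

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;

    /** Standalone sketch of the replica-trash sub-dir rotation (hypothetical names). */
    public class SubDirTracker {

      /** Mirrors ReplicaTrashCurDirInfo: date-based name, block count, iteration. */
      private static class DirState {
        String subDir;   // e.g. "20180326" (date pattern assumed here)
        int blocks;      // blocks already moved into the current sub-directory
        int iteration;   // suffix appended once the block limit is reached
      }

      private final Map<String, DirState> states = new HashMap<>();
      private final int maxBlocksPerSubDir;
      private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");

      SubDirTracker(int maxBlocksPerSubDir) {
        this.maxBlocksPerSubDir = maxBlocksPerSubDir;
      }

      /**
       * Returns the sub-directory name ("<date>_<iteration>") the next block
       * should be moved into, for the given storage + block-pool key.
       */
      synchronized String nextSubDir(String key) {
        String today = dateFormat.format(new Date());
        DirState state = states.computeIfAbsent(key, k -> new DirState());
        if (state.subDir == null || !state.subDir.equals(today)) {
          // First block for this key, or the date rolled over: start fresh.
          state.subDir = today;
          state.iteration = 0;
          state.blocks = 0;
        } else if (state.blocks == maxBlocksPerSubDir) {
          // Current sub-directory is full: move on to the next iteration.
          state.iteration++;
          state.blocks = 0;
        }
        state.blocks++;
        return state.subDir + "_" + state.iteration;
      }

      public static void main(String[] args) {
        SubDirTracker tracker = new SubDirTracker(2);
        // With a limit of 2, the third block for the same key lands in "<date>_1".
        for (int i = 0; i < 5; i++) {
          System.out.println(tracker.nextSubDir("storage1-BP-1"));
        }
      }
    }

Running the main method with a limit of 2 prints the same sub-directory name twice before rolling over to the "_1" suffix, which is the behaviour the patch implements for replica-trash moves.
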
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffee1a02/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 541d24d..b2b81fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3810,6 +3810,16 @@
 </property>
 
 <property>
+  <name>dfs.datanode.replica.trash.subdir.max.blocks</name>
+  <value>${dfs.block.invalidate.limit}</value>
+  <description>
+    Maximum number of blocks per sub-directory in replica-trash. This limits
+    the number of blocks under each sub-directory in replica-trash. If this
+    property is not set, it defaults to the value of dfs.block.invalidate.limit.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.balance.max.concurrent.moves</name>
   <value>50</value>
   <description>
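
A usage note on the property above: unless it is set explicitly, the sub-directory limit tracks dfs.block.invalidate.limit, mirroring the fallback added to FsDatasetAsyncDiskService. The sketch below shows that resolution on a plain HdfsConfiguration; it assumes a build from this HDFS-12996 branch (where the new key exists), and the override value of 500 is just an example.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class ReplicaTrashLimitExample {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();

        // If the sub-dir limit is unset, fall back to dfs.block.invalidate.limit,
        // mirroring the wiring added to FsDatasetAsyncDiskService above.
        int blockInvalidateLimit = conf.getInt(
            DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY,
            DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
        int subDirMaxBlocks = conf.getInt(
            DFSConfigKeys.DFS_DATANODE_REPLICA_TRASH_SUBDIR_MAX_BLOCKS,
            blockInvalidateLimit);
        System.out.println("Effective replica-trash sub-dir limit: " + subDirMaxBlocks);

        // Explicit override; 500 is an arbitrary example value.
        conf.setInt(DFSConfigKeys.DFS_DATANODE_REPLICA_TRASH_SUBDIR_MAX_BLOCKS, 500);
        System.out.println("Overridden limit: " + conf.getInt(
            DFSConfigKeys.DFS_DATANODE_REPLICA_TRASH_SUBDIR_MAX_BLOCKS,
            blockInvalidateLimit));
      }
    }
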
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffee1a02/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeReplicaTrash.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeReplicaTrash.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeReplicaTrash.java
index 74b091c..6d34539 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeReplicaTrash.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeReplicaTrash.java
@@ -109,6 +109,78 @@ public class TestDatanodeReplicaTrash {
   }
 
   @Test
+  public void testDeleteWithTrashSubDirLimit() throws Exception {
+
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ENABLE_REPLICA_TRASH_KEY,
+        true);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 2);
+    conf.setInt(DFSConfigKeys
+        .DFS_DATANODE_REPLICA_TRASH_SUBDIR_MAX_BLOCKS, 2);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).storagesPerDatanode(1).build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      final ClientProtocol client = cluster.getNameNode().getRpcServer();
+      final Path f = new Path(FILE_NAME);
+      int len = 10240;
+      DFSTestUtil.createFile(dfs, f, len, (short) 1, RANDOM.nextLong());
+
+      LocatedBlocks blockLocations = client.getBlockLocations(f.toString(),
+          0, 1024);
+      String bpId = blockLocations.getLocatedBlocks().get(0).getBlock()
+          .getBlockPoolId();
+
+      Collection<String> locations = conf.getTrimmedStringCollection(
+          DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+
+      String loc;
+      File replicaTrashDir = null;
+
+      // Here the number of storages per datanode is set to 1
+      Assert.assertEquals(1, locations.size());
+      for (String location : locations) {
+        loc = location.replace("[DISK]file:", "");
+        replicaTrashDir = new File(loc + File.separator + Storage
+            .STORAGE_DIR_CURRENT + File.separator + bpId + File
+            .separator + DataStorage.STORAGE_DIR_REPLICA_TRASH);
+      }
+
+      // Before the delete, the replica-trash dir should be empty
+      Assert.assertTrue(replicaTrashDir.list().length == 0);
+
+      dfs.delete(f, true);
+      LOG.info("File is being deleted");
+
+
+      List<DataNode> datanodes = cluster.getDataNodes();
+      for (DataNode datanode : datanodes) {
+        DataNodeTestUtils.triggerHeartbeat(datanode);
+      }
+
+      final File replicaTrash = replicaTrashDir;
+      // After the delete, the replica-trash dir should not be empty.
+      // The number of sub-directories in replica-trash should be 10, as the
+      // number of blocks for a 10KB file is 20 (10240/512) and the limit is 2.
+      LambdaTestUtils.await(30000, 1000,
+          () -> {
+            File[] subDirs = replicaTrash.listFiles(
+                file -> file.isDirectory());
+            if (subDirs != null) {
+              return subDirs.length == 10;
+            } else {
+              return false;
+            }
+          });
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+
+  @Test
   public void testDeleteWithReplicaTrashDisable() throws Exception {
 
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
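
The expected sub-directory count in the new test is simple arithmetic: a 10240-byte file at the test's block size (512 bytes, per the comment in the test) produces 20 block replicas, and with the per-sub-directory limit set to 2 they should land in 10 sub-directories. A tiny sanity check of that calculation, with the values hard-coded to mirror the test:

    public class ExpectedSubDirCount {
      public static void main(String[] args) {
        int fileLen = 10240;          // bytes written by the test
        int blockSize = 512;          // DEFAULT_BLOCK_SIZE assumed from the test comment
        int maxBlocksPerSubDir = 2;   // dfs.datanode.replica.trash.subdir.max.blocks

        int blocks = fileLen / blockSize;                                      // 20 replicas
        int subDirs = (blocks + maxBlocksPerSubDir - 1) / maxBlocksPerSubDir;  // ceiling division
        System.out.println(blocks + " blocks -> " + subDirs + " sub-directories"); // 20 -> 10
      }
    }
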
