Repository: hadoop
Updated Branches:
  refs/heads/HDFS-12996 8d4aa014d -> e37cde0a6
HDFS-13163. Move invalidated blocks to replica-trash with disk layout based on timestamp. Contributed by Bharat Viswanadham.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e37cde0a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e37cde0a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e37cde0a

Branch: refs/heads/HDFS-12996
Commit: e37cde0a6a755b428dbc30eae07f53914fbd9b45
Parents: 8d4aa01
Author: Arpit Agarwal <[email protected]>
Authored: Thu Mar 15 15:02:03 2018 -0700
Committer: Arpit Agarwal <[email protected]>
Committed: Thu Mar 15 15:02:03 2018 -0700

----------------------------------------------------------------------
 .../impl/FsDatasetAsyncDiskService.java         |  86 +++++++++-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |   3 +-
 .../datanode/TestDatanodeReplicaTrash.java      | 166 +++++++++++++++++++
 3 files changed, 250 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
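
In short: when the new DataNode option (DFSConfigKeys.DFS_DATANODE_ENABLE_REPLICA_TRASH_KEY, disabled by default via DFS_DATANODE_ENABLE_REPLICA_TRASH_DEFAULT) is turned on, invalidated block and meta files are renamed into a replica-trash directory under the block pool directory instead of being deleted outright, bucketed by the hour of deletion ("yyyy-MM-dd-HH"). A minimal sketch of the resulting on-disk layout, using a hypothetical volume path, block pool ID, block ID and generation stamp:

  <data dir>/current/BP-1234-127.0.0.1-1520000000000/replica-trash/2018-03-15-15/blk_1073741825
  <data dir>/current/BP-1234-127.0.0.1-1520000000000/replica-trash/2018-03-15-15/blk_1073741825_1001.meta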

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e37cde0a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 9174cb0..52f8265 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -20,6 +20,10 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -29,11 +33,16 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
@@ -42,6 +51,8 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIOException;
 
+import static org.apache.hadoop.hdfs.server.common.Storage.STORAGE_DIR_CURRENT;
+
 /**
  * This class is a container of multiple thread pools, each for a volume,
  * so that we can schedule async disk operations easily.
@@ -65,7 +76,17 @@ class FsDatasetAsyncDiskService {
   // ThreadPool maximum pool size
   private static final int MAXIMUM_THREADS_PER_VOLUME = 4;
   // ThreadPool keep-alive time for threads over core pool size
-  private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
+  private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
+
+  private static final String BLOCK_POOL_ID_PATTERN_BASE =
+      Pattern.quote(File.separator) +
+      "BP-\\d+-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}-\\d+" +
+      Pattern.quote(File.separator);
+
+  private static final Pattern BLOCK_POOL_CURRENT_PATH_PATTERN =
+      Pattern.compile("^(.*)(" + BLOCK_POOL_ID_PATTERN_BASE + ")(" +
+          STORAGE_DIR_CURRENT + ")(.*)$");
+
   private final DataNode datanode;
   private final FsDatasetImpl fsdatasetImpl;
@@ -76,6 +97,8 @@ class FsDatasetAsyncDiskService {
       = new HashMap<String, Set<Long>>();
   private static final int MAX_DELETED_BLOCKS = 64;
   private int numDeletedBlocks = 0;
+  private final Configuration conf;
+  private final boolean replicaTrashEnabled;
 
   /**
    * Create a AsyncDiskServices with a set of volumes (specified by their
@@ -84,10 +107,15 @@ class FsDatasetAsyncDiskService {
    * The AsyncDiskServices uses one ThreadPool per volume to do the async
    * disk operations.
    */
-  FsDatasetAsyncDiskService(DataNode datanode, FsDatasetImpl fsdatasetImpl) {
+  FsDatasetAsyncDiskService(DataNode datanode, FsDatasetImpl fsdatasetImpl,
+      Configuration configuration) {
     this.datanode = datanode;
     this.fsdatasetImpl = fsdatasetImpl;
     this.threadGroup = new ThreadGroup(getClass().getSimpleName());
+    this.conf = configuration;
+    this.replicaTrashEnabled = conf.getBoolean(DFSConfigKeys
+        .DFS_DATANODE_ENABLE_REPLICA_TRASH_KEY, DFSConfigKeys
+        .DFS_DATANODE_ENABLE_REPLICA_TRASH_DEFAULT);
   }
 
   private void addExecutorForVolume(final FsVolumeImpl volume) {
@@ -277,6 +305,49 @@ class FsDatasetAsyncDiskService {
           (replicaToDelete.deleteMetadata() || !replicaToDelete.metadataExists());
     }
 
+    private boolean moveFilesToReplicaTrash() {
+
+      DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-HH");
+      Date date = new Date();
+
+      URI blockURI = replicaToDelete.getBlockURI();
+
+      String replicaTrashBaseDir;
+      File blockFile = new File(blockURI);
+      Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile
+          .getParent());
+      replicaTrashBaseDir = matcher.replaceFirst("$1$2" + DataStorage
+          .STORAGE_DIR_REPLICA_TRASH);
+
+      File replicaTrashDir = new File(replicaTrashBaseDir + File
+          .separator + dateFormat.format(date));
+
+      try {
+        volume.getFileIoProvider().mkdirsWithExistsCheck(
+            volume, replicaTrashDir);
+      } catch (IOException e) {
+        return false;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Moving files " + replicaToDelete.getBlockURI() + " and " +
+            replicaToDelete.getMetadataURI() + " to replica-trash.");
+      }
+
+      final String blockName = replicaToDelete.getBlockName();
+      final long genstamp = replicaToDelete.getGenerationStamp();
+      File newBlockFile = new File(replicaTrashDir, blockName);
+      File newMetaFile = new File(replicaTrashDir,
+          DatanodeUtil.getMetaName(blockName, genstamp));
+      try {
+        return (replicaToDelete.renameData(newBlockFile.toURI()) &&
+            replicaToDelete.renameMeta(newMetaFile.toURI()));
+      } catch (IOException e) {
+        LOG.error("Error moving files to trash: " + replicaToDelete, e);
+      }
+      return false;
+    }
+
     private boolean moveFiles() {
       if (trashDirectory == null) {
         LOG.error("Trash dir for replica " + replicaToDelete + " is null");
@@ -316,11 +387,18 @@ class FsDatasetAsyncDiskService {
       final long metaLength =
           replicaToDelete.getMetadataLength();
       boolean result;
-      result = (trashDirectory == null) ? deleteFiles() : moveFiles();
+      if (trashDirectory != null) {
+        result = moveFiles();
+      } else if (replicaTrashEnabled) {
+        result = moveFilesToReplicaTrash();
+      } else {
+        result = deleteFiles();
+      }
 
       if (!result) {
         LOG.warn("Unexpected error trying to "
-            + (trashDirectory == null ? "delete" : "move")
+            + ((trashDirectory == null || !replicaTrashEnabled) ?
+            "delete" : "move")
             + " block " + block.getBlockPoolId() + " " + block.getLocalBlock()
             + " at file " + replicaToDelete.getBlockURI() + ". Ignored.");
       } else {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e37cde0a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index c141293..850a04f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -308,7 +308,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         VolumeChoosingPolicy.class), conf);
     volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
         blockChooserImpl);
-    asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
+    asyncDiskService = new FsDatasetAsyncDiskService(datanode, this,
+        new Configuration(conf));
     asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode, conf);
     deletingBlock = new HashMap<String, Set<Long>>();
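
To make the new path handling easier to follow, here is a minimal, self-contained sketch of the rewrite that moveFilesToReplicaTrash() performs. The class name, sample volume path and block pool ID below are hypothetical; the regex and hourly date format mirror the patch, with DataStorage.STORAGE_DIR_REPLICA_TRASH and STORAGE_DIR_CURRENT spelled out as the literals "replica-trash" and "current", and the sample path assuming a Unix-style File.separator:

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ReplicaTrashPathSketch {
  // Matches <prefix>/BP-<random>-<ip>-<timestamp>/current<rest>, as in the patch.
  private static final String BLOCK_POOL_ID_PATTERN_BASE =
      Pattern.quote(File.separator)
          + "BP-\\d+-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}-\\d+"
          + Pattern.quote(File.separator);

  private static final Pattern BLOCK_POOL_CURRENT_PATH_PATTERN =
      Pattern.compile("^(.*)(" + BLOCK_POOL_ID_PATTERN_BASE + ")(current)(.*)$");

  public static void main(String[] args) {
    // Hypothetical parent directory of a finalized block file on one volume.
    String blockDir = "/data/dn1/current/BP-1234-127.0.0.1-1520000000000"
        + "/current/finalized/subdir0/subdir0";

    // Group 1 keeps the volume prefix, group 2 keeps "/BP-.../"; the trailing
    // "current/..." part is replaced by the replica-trash directory name.
    Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockDir);
    String replicaTrashBaseDir = matcher.replaceFirst("$1$2" + "replica-trash");

    // Moved replicas are bucketed by the hour in which they were invalidated.
    String hourBucket = new SimpleDateFormat("yyyy-MM-dd-HH").format(new Date());
    File replicaTrashDir = new File(replicaTrashBaseDir, hourBucket);

    // Prints something like:
    // /data/dn1/current/BP-1234-127.0.0.1-1520000000000/replica-trash/2018-03-15-15
    System.out.println(replicaTrashDir);
  }
}

In the patch itself the block and meta files are then moved into that directory through ReplicaInfo.renameData() and renameMeta() rather than a plain File rename.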

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e37cde0a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeReplicaTrash.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeReplicaTrash.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeReplicaTrash.java
new file mode 100644
index 0000000..74b091c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeReplicaTrash.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Test Datanode Replica Trash with enable and disable.
+ */
+public class TestDatanodeReplicaTrash {
+  private final static Logger LOG = LoggerFactory.getLogger(
+      TestDatanodeReplicaTrash.class);
+  private final Configuration conf = new Configuration();
+  private static final Random RANDOM = new Random();
+  private static final String FILE_NAME = "/tmp.txt";
+  private static final int DEFAULT_BLOCK_SIZE = 512;
+
+  @Test
+  public void testDeleteWithReplicaTrashEnable() throws Exception {
+
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ENABLE_REPLICA_TRASH_KEY,
+        true);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 2);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).storagesPerDatanode(1).build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      final ClientProtocol client = cluster.getNameNode().getRpcServer();
+      final Path f = new Path(FILE_NAME);
+      int len = 1024;
+      DFSTestUtil.createFile(dfs, f, len, (short) 1, RANDOM.nextLong());
+
+      LocatedBlocks blockLocations = client.getBlockLocations(f.toString(),
+          0, 1024);
+      String bpId = blockLocations.getLocatedBlocks().get(0).getBlock()
+          .getBlockPoolId();
+
+      Collection<String> locations = conf.getTrimmedStringCollection(
+          DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+
+      String loc;
+      File replicaTrashDir = null;
+
+      for (String location : locations) {
+        loc = location.replace("[DISK]file:", "");
+        replicaTrashDir = new File(loc + File.separator + Storage
+            .STORAGE_DIR_CURRENT + File.separator + bpId + File
+            .separator + DataStorage.STORAGE_DIR_REPLICA_TRASH);
+      }
+
+      //Before Delete replica-trash dir should be empty
+      Assert.assertTrue(replicaTrashDir.list().length == 0);
+
+      dfs.delete(f, true);
+      LOG.info("File is being deleted");
+
+
+      List<DataNode> datanodes = cluster.getDataNodes();
+      for (DataNode datanode : datanodes) {
+        DataNodeTestUtils.triggerHeartbeat(datanode);
+      }
+
+      final File replicaTrash = replicaTrashDir;
+      //After delete, replica-trash dir should not be empty
+      LambdaTestUtils.await(30000, 1000,
+          () -> {
+            return replicaTrash.list().length > 0;
+          });
+    } finally {
+      cluster.shutdown();
+    }
+
+  }
+
+  @Test
+  public void testDeleteWithReplicaTrashDisable() throws Exception {
+
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ENABLE_REPLICA_TRASH_KEY,
+        false);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 2);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).storagesPerDatanode(1).build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      ClientProtocol client = cluster.getNameNode().getRpcServer();
+      DataNode dn = cluster.getDataNodes().get(0);
+      Path f = new Path(FILE_NAME);
+      int len = 100;
+      DFSTestUtil.createFile(dfs, f, len, (short) 1, RANDOM.nextLong());
+
+      LocatedBlocks blockLocations = client.getBlockLocations(f.toString(),
+          0, 100);
+      String bpId = blockLocations.getLocatedBlocks().get(0).getBlock()
+          .getBlockPoolId();
+
+      Collection<String> locations = conf.getTrimmedStringCollection(
+          DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+
+      String loc;
+      File replicaTrashDir = null;
+
+
+      for (String location : locations) {
+        loc = location.replace("[DISK]file:", "");
+        replicaTrashDir = new File(loc + File.separator + Storage
+            .STORAGE_DIR_CURRENT + File.separator + bpId + File
+            .separator + DataStorage.STORAGE_DIR_REPLICA_TRASH);
+      }
+
+      dfs.delete(f, true);
+      LOG.info("File is being deleted");
+
+      List<DataNode> datanodes = cluster.getDataNodes();
+      for (DataNode datanode : datanodes) {
+        DataNodeTestUtils.triggerHeartbeat(datanode);
+      }
+
+      //replica-trash folder should not be created, as replica trash is not
+      // enabled
+      Assert.assertTrue(!replicaTrashDir.exists());
+    } finally {
+      cluster.shutdown();
+    }
+
+  }
+
+}
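
As a closing note, a minimal sketch (not part of the patch) of turning the feature on programmatically, mirroring the test setup above. The class name is hypothetical, and the configuration string behind DFS_DATANODE_ENABLE_REPLICA_TRASH_KEY is defined elsewhere on the HDFS-12996 branch rather than in this diff:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class EnableReplicaTrashSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Off by default (DFS_DATANODE_ENABLE_REPLICA_TRASH_DEFAULT); enable it here.
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ENABLE_REPLICA_TRASH_KEY, true);

    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(1).build();
    try {
      cluster.waitActive();
      // From here on, invalidated replicas on the DataNode are renamed into
      // <block pool dir>/replica-trash/<yyyy-MM-dd-HH>/ instead of being deleted.
    } finally {
      cluster.shutdown();
    }
  }
}

Note the precedence in the updated run() method: when a rollback trash directory is in effect for the block pool (trashDirectory != null), replicas still go to that trash first; replica-trash is used only otherwise, and plain deletion remains the fallback when the flag is off.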
