http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java deleted file mode 100644 index ff277ba..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java +++ /dev/null @@ -1,174 +0,0 @@ -package org.apache.hadoop.hdfs.server.sps; -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.IOException; -import java.util.ArrayList; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.protocol.DirectoryListing; -import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.hdfs.server.namenode.sps.Context; -import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector; -import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo; -import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class is to scan the paths recursively. If file is directory, then it - * will scan for files recursively. If the file is non directory, then it will - * just submit the same file to process. - */ -@InterfaceAudience.Private -public class ExternalSPSFileIDCollector implements FileIdCollector { - public static final Logger LOG = - LoggerFactory.getLogger(ExternalSPSFileIDCollector.class); - private Context cxt; - private DistributedFileSystem dfs; - private SPSService service; - private int maxQueueLimitToScan; - - public ExternalSPSFileIDCollector(Context cxt, SPSService service) { - this.cxt = cxt; - this.service = service; - this.maxQueueLimitToScan = service.getConf().getInt( - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT); - try { - // TODO: probably we could get this dfs from external context? but this is - // too specific to external. - dfs = getFS(service.getConf()); - } catch (IOException e) { - LOG.error("Unable to get the filesystem. Make sure Namenode running and " - + "configured namenode address is correct.", e); - } - } - - private DistributedFileSystem getFS(Configuration conf) throws IOException { - return (DistributedFileSystem) FileSystem - .get(FileSystem.getDefaultUri(conf), conf); - } - - /** - * Recursively scan the given path and add the file info to SPS service for - * processing. - */ - private long processPath(long startID, String fullPath) { - long pendingWorkCount = 0; // to be satisfied file counter - for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) { - final DirectoryListing children; - try { - children = dfs.getClient().listPaths(fullPath, lastReturnedName, false); - } catch (IOException e) { - LOG.warn("Failed to list directory " + fullPath - + ". Ignore the directory and continue.", e); - return pendingWorkCount; - } - if (children == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("The scanning start dir/sub dir " + fullPath - + " does not have childrens."); - } - return pendingWorkCount; - } - - for (HdfsFileStatus child : children.getPartialListing()) { - if (child.isFile()) { - service.addFileIdToProcess(new ItemInfo(startID, child.getFileId()), - false); - checkProcessingQueuesFree(); - pendingWorkCount++; // increment to be satisfied file count - } else { - String fullPathStr = child.getFullName(fullPath); - if (child.isDirectory()) { - if (!fullPathStr.endsWith(Path.SEPARATOR)) { - fullPathStr = fullPathStr + Path.SEPARATOR; - } - pendingWorkCount += processPath(startID, fullPathStr); - } - } - } - - if (children.hasMore()) { - lastReturnedName = children.getLastName(); - } else { - return pendingWorkCount; - } - } - } - - private void checkProcessingQueuesFree() { - int remainingCapacity = remainingCapacity(); - // wait for queue to be free - while (remainingCapacity <= 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Waiting for storageMovementNeeded queue to be free!"); - } - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - remainingCapacity = remainingCapacity(); - } - } - - /** - * Returns queue remaining capacity. - */ - public int remainingCapacity() { - int size = service.processingQueueSize(); - int remainingSize = 0; - if (size < maxQueueLimitToScan) { - remainingSize = maxQueueLimitToScan - size; - } - if (LOG.isDebugEnabled()) { - LOG.debug("SPS processing Q -> maximum capacity:{}, current size:{}," - + " remaining size:{}", maxQueueLimitToScan, size, remainingSize); - } - return remainingSize; - } - - @Override - public void scanAndCollectFileIds(Long inodeId) throws IOException { - if (dfs == null) { - dfs = getFS(service.getConf()); - } - long pendingSatisfyItemsCount = processPath(inodeId, - cxt.getFilePath(inodeId)); - // Check whether the given path contains any item to be tracked - // or the no to be satisfied paths. In case of empty list, add the given - // inodeId to the 'pendingWorkForDirectory' with empty list so that later - // SPSPathIdProcessor#run function will remove the SPS hint considering that - // this path is already satisfied the storage policy. - if (pendingSatisfyItemsCount <= 0) { - LOG.debug("There is no pending items to satisfy the given path " - + "inodeId:{}", inodeId); - service.addAllFileIdsToProcess(inodeId, new ArrayList<>(), true); - } else { - service.markScanCompletedForPath(inodeId); - } - } - -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java new file mode 100644 index 0000000..9435475 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java @@ -0,0 +1,172 @@ +package org.apache.hadoop.hdfs.server.sps; +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.namenode.sps.FileCollector; +import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo; +import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is to scan the paths recursively. If file is directory, then it + * will scan for files recursively. If the file is non directory, then it will + * just submit the same file to process. This will use file string path + * representation. + */ +@InterfaceAudience.Private +public class ExternalSPSFilePathCollector implements FileCollector <String>{ + public static final Logger LOG = + LoggerFactory.getLogger(ExternalSPSFilePathCollector.class); + private DistributedFileSystem dfs; + private SPSService<String> service; + private int maxQueueLimitToScan; + + public ExternalSPSFilePathCollector(SPSService<String> service) { + this.service = service; + this.maxQueueLimitToScan = service.getConf().getInt( + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT); + try { + // TODO: probably we could get this dfs from external context? but this is + // too specific to external. + dfs = getFS(service.getConf()); + } catch (IOException e) { + LOG.error("Unable to get the filesystem. Make sure Namenode running and " + + "configured namenode address is correct.", e); + } + } + + private DistributedFileSystem getFS(Configuration conf) throws IOException { + return (DistributedFileSystem) FileSystem + .get(FileSystem.getDefaultUri(conf), conf); + } + + /** + * Recursively scan the given path and add the file info to SPS service for + * processing. + */ + private long processPath(String startID, String childPath) { + long pendingWorkCount = 0; // to be satisfied file counter + for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) { + final DirectoryListing children; + try { + children = dfs.getClient().listPaths(childPath, lastReturnedName, + false); + } catch (IOException e) { + LOG.warn("Failed to list directory " + childPath + + ". Ignore the directory and continue.", e); + return pendingWorkCount; + } + if (children == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("The scanning start dir/sub dir " + childPath + + " does not have childrens."); + } + return pendingWorkCount; + } + + for (HdfsFileStatus child : children.getPartialListing()) { + String childFullPath = child.getFullName(childPath); + if (child.isFile()) { + service.addFileToProcess( + new ItemInfo<String>(startID, childFullPath), false); + checkProcessingQueuesFree(); + pendingWorkCount++; // increment to be satisfied file count + } else { + if (child.isDirectory()) { + if (!childFullPath.endsWith(Path.SEPARATOR)) { + childFullPath = childFullPath + Path.SEPARATOR; + } + pendingWorkCount += processPath(startID, childFullPath); + } + } + } + + if (children.hasMore()) { + lastReturnedName = children.getLastName(); + } else { + return pendingWorkCount; + } + } + } + + private void checkProcessingQueuesFree() { + int remainingCapacity = remainingCapacity(); + // wait for queue to be free + while (remainingCapacity <= 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting for storageMovementNeeded queue to be free!"); + } + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + remainingCapacity = remainingCapacity(); + } + } + + /** + * Returns queue remaining capacity. + */ + public int remainingCapacity() { + int size = service.processingQueueSize(); + int remainingSize = 0; + if (size < maxQueueLimitToScan) { + remainingSize = maxQueueLimitToScan - size; + } + if (LOG.isDebugEnabled()) { + LOG.debug("SPS processing Q -> maximum capacity:{}, current size:{}," + + " remaining size:{}", maxQueueLimitToScan, size, remainingSize); + } + return remainingSize; + } + + @Override + public void scanAndCollectFiles(String path) throws IOException { + if (dfs == null) { + dfs = getFS(service.getConf()); + } + long pendingSatisfyItemsCount = processPath(path, path); + // Check whether the given path contains any item to be tracked + // or the no to be satisfied paths. In case of empty list, add the given + // inodeId to the 'pendingWorkForDirectory' with empty list so that later + // SPSPathIdProcessor#run function will remove the SPS hint considering that + // this path is already satisfied the storage policy. + if (pendingSatisfyItemsCount <= 0) { + LOG.debug("There is no pending items to satisfy the given path " + + "inodeId:{}", path); + service.addAllFilesToProcess(path, new ArrayList<>(), true); + } else { + service.markScanCompletedForPath(path); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java index 33448db..6fc35ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java @@ -68,7 +68,8 @@ public final class ExternalStoragePolicySatisfier { HdfsConfiguration spsConf = new HdfsConfiguration(); // login with SPS keytab secureLogin(spsConf); - StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf); + StoragePolicySatisfier<String> sps = new StoragePolicySatisfier<String>( + spsConf); nnc = getNameNodeConnector(spsConf); boolean spsRunning; @@ -86,8 +87,8 @@ public final class ExternalStoragePolicySatisfier { ExternalSPSBlockMoveTaskHandler externalHandler = new ExternalSPSBlockMoveTaskHandler(spsConf, nnc, sps); externalHandler.init(); - sps.init(context, new ExternalSPSFileIDCollector(context, sps), - externalHandler, blkMoveListener); + sps.init(context, new ExternalSPSFilePathCollector(sps), externalHandler, + blkMoveListener); sps.start(true, StoragePolicySatisfierMode.EXTERNAL); if (sps != null) { sps.join(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto index b0e900d..b137f2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto @@ -214,11 +214,11 @@ message GetFilePathResponseProto { required string srcPath = 1; } -message GetNextSPSPathIdRequestProto { +message GetNextSPSPathRequestProto { } -message GetNextSPSPathIdResponseProto { - optional uint64 fileId = 1; +message GetNextSPSPathResponseProto { + optional string spsPath = 1; } message CheckDNSpaceRequestProto { @@ -322,26 +322,15 @@ service NamenodeProtocolService { returns (IsRollingUpgradeResponseProto); /** - * Return the corresponding file path for give file id + * Return the sps path from namenode */ - rpc getFilePath(GetFilePathRequestProto) - returns (GetFilePathResponseProto); + rpc getNextSPSPath(GetNextSPSPathRequestProto) + returns (GetNextSPSPathResponseProto); /** - * Return the sps path id from namenode - */ - rpc getNextSPSPathId(GetNextSPSPathIdRequestProto) - returns (GetNextSPSPathIdResponseProto); - - /** - * Return the sps path id from namenode + * Verifies whether the given Datanode has the enough estimated size with + * given storage type for scheduling the block movement. */ rpc checkDNSpaceForScheduling(CheckDNSpaceRequestProto) returns (CheckDNSpaceResponseProto); - - /** - * check whether given file id has low redundancy blocks. - */ - rpc hasLowRedundancyBlocks(HasLowRedundancyBlocksRequestProto) - returns (HasLowRedundancyBlocksResponseProto); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java index 4097339..29af885 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java @@ -40,22 +40,21 @@ import org.mockito.Mockito; */ public class TestBlockStorageMovementAttemptedItems { - private BlockStorageMovementAttemptedItems bsmAttemptedItems = null; - private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles = null; + private BlockStorageMovementAttemptedItems<Long> bsmAttemptedItems; + private BlockStorageMovementNeeded<Long> unsatisfiedStorageMovementFiles; private final int selfRetryTimeout = 500; @Before public void setup() throws Exception { Configuration config = new HdfsConfiguration(); - Context ctxt = Mockito.mock(Context.class); - SPSService sps = Mockito.mock(StoragePolicySatisfier.class); - Mockito.when(sps.getConf()).thenReturn(config); + Context<Long> ctxt = Mockito.mock(IntraSPSNameNodeContext.class); + SPSService<Long> sps = new StoragePolicySatisfier<Long>(config); Mockito.when(ctxt.isRunning()).thenReturn(true); Mockito.when(ctxt.isInSafeMode()).thenReturn(false); Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true); unsatisfiedStorageMovementFiles = - new BlockStorageMovementNeeded(ctxt, null); - bsmAttemptedItems = new BlockStorageMovementAttemptedItems(sps, + new BlockStorageMovementNeeded<Long>(ctxt, null); + bsmAttemptedItems = new BlockStorageMovementAttemptedItems<Long>(sps, unsatisfiedStorageMovementFiles, null); } @@ -72,9 +71,9 @@ public class TestBlockStorageMovementAttemptedItems { long stopTime = monotonicNow() + (retryTimeout * 2); boolean isItemFound = false; while (monotonicNow() < (stopTime)) { - ItemInfo ele = null; + ItemInfo<Long> ele = null; while ((ele = unsatisfiedStorageMovementFiles.get()) != null) { - if (item == ele.getFileId()) { + if (item == ele.getFile()) { isItemFound = true; break; } @@ -97,7 +96,7 @@ public class TestBlockStorageMovementAttemptedItems { Long item = new Long(1234); List<Block> blocks = new ArrayList<Block>(); blocks.add(new Block(item)); - bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0)); + bsmAttemptedItems.add(new AttemptedItemInfo<Long>(0L, 0L, 0L, blocks, 0)); Block[] blockArray = new Block[blocks.size()]; blocks.toArray(blockArray); bsmAttemptedItems.notifyMovementTriedBlocks(blockArray); @@ -114,7 +113,7 @@ public class TestBlockStorageMovementAttemptedItems { Long item = new Long(1234); List<Block> blocks = new ArrayList<>(); blocks.add(new Block(item)); - bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0)); + bsmAttemptedItems.add(new AttemptedItemInfo<Long>(0L, 0L, 0L, blocks, 0)); assertEquals("Shouldn't receive result", 0, bsmAttemptedItems.getMovementFinishedBlocksCount()); assertEquals("Item doesn't exist in the attempted list", 1, @@ -135,7 +134,7 @@ public class TestBlockStorageMovementAttemptedItems { blocks.add(new Block(5678L)); Long trackID = 0L; bsmAttemptedItems - .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0)); + .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0)); Block[] blksMovementReport = new Block[1]; blksMovementReport[0] = new Block(item); bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport); @@ -160,7 +159,7 @@ public class TestBlockStorageMovementAttemptedItems { List<Block> blocks = new ArrayList<>(); blocks.add(new Block(item)); bsmAttemptedItems - .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0)); + .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0)); Block[] blksMovementReport = new Block[1]; blksMovementReport[0] = new Block(item); bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport); @@ -188,7 +187,7 @@ public class TestBlockStorageMovementAttemptedItems { List<Block> blocks = new ArrayList<>(); blocks.add(new Block(item)); bsmAttemptedItems - .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0)); + .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0)); Block[] blksMovementReport = new Block[1]; blksMovementReport[0] = new Block(item); bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java index 6f7fe89..2a3d0c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java @@ -105,7 +105,7 @@ public class TestStoragePolicySatisfier { public static final int NUM_OF_DATANODES = 3; public static final int STORAGES_PER_DATANODE = 2; public static final long CAPACITY = 2 * 256 * 1024 * 1024; - public static final String FILE = "/testMoveWhenStoragePolicyNotSatisfying"; + public static final String FILE = "/testMoveToSatisfyStoragePolicy"; public static final int DEFAULT_BLOCK_SIZE = 1024; /** @@ -1269,8 +1269,9 @@ public class TestStoragePolicySatisfier { //Queue limit can control the traverse logic to wait for some free //entry in queue. After 10 files, traverse control will be on U. - StoragePolicySatisfier sps = new StoragePolicySatisfier(config); - Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(), + StoragePolicySatisfier<Long> sps = new StoragePolicySatisfier<Long>(config); + Context<Long> ctxt = new IntraSPSNameNodeContext( + hdfsCluster.getNamesystem(), hdfsCluster.getNamesystem().getBlockManager(), sps) { @Override public boolean isInSafeMode() { @@ -1283,7 +1284,7 @@ public class TestStoragePolicySatisfier { } }; - FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt); + FileCollector<Long> fileIDCollector = createFileIdCollector(sps, ctxt); sps.init(ctxt, fileIDCollector, null, null); sps.getStorageMovementQueue().activate(); @@ -1300,9 +1301,9 @@ public class TestStoragePolicySatisfier { dfs.delete(new Path("/root"), true); } - public FileIdCollector createFileIdCollector(StoragePolicySatisfier sps, - Context ctxt) { - FileIdCollector fileIDCollector = new IntraSPSNameNodeFileIdCollector( + public FileCollector<Long> createFileIdCollector( + StoragePolicySatisfier<Long> sps, Context<Long> ctxt) { + FileCollector<Long> fileIDCollector = new IntraSPSNameNodeFileIdCollector( hdfsCluster.getNamesystem().getFSDirectory(), sps); return fileIDCollector; } @@ -1337,8 +1338,9 @@ public class TestStoragePolicySatisfier { // Queue limit can control the traverse logic to wait for some free // entry in queue. After 10 files, traverse control will be on U. - StoragePolicySatisfier sps = new StoragePolicySatisfier(config); - Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(), + StoragePolicySatisfier<Long> sps = new StoragePolicySatisfier<Long>(config); + Context<Long> ctxt = new IntraSPSNameNodeContext( + hdfsCluster.getNamesystem(), hdfsCluster.getNamesystem().getBlockManager(), sps) { @Override public boolean isInSafeMode() { @@ -1350,7 +1352,7 @@ public class TestStoragePolicySatisfier { return true; } }; - FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt); + FileCollector<Long> fileIDCollector = createFileIdCollector(sps, ctxt); sps.init(ctxt, fileIDCollector, null, null); sps.getStorageMovementQueue().activate(); @@ -1368,16 +1370,16 @@ public class TestStoragePolicySatisfier { } private void assertTraversal(List<String> expectedTraverseOrder, - FSDirectory fsDir, StoragePolicySatisfier sps) + FSDirectory fsDir, StoragePolicySatisfier<Long> sps) throws InterruptedException { // Remove 10 element and make queue free, So other traversing will start. for (int i = 0; i < 10; i++) { String path = expectedTraverseOrder.remove(0); - ItemInfo itemInfo = sps.getStorageMovementQueue().get(); + ItemInfo<Long> itemInfo = sps.getStorageMovementQueue().get(); if (itemInfo == null) { continue; } - long trackId = itemInfo.getFileId(); + Long trackId = itemInfo.getFile(); INode inode = fsDir.getInode(trackId); assertTrue("Failed to traverse tree, expected " + path + " but got " + inode.getFullPathName(), path.equals(inode.getFullPathName())); @@ -1388,11 +1390,11 @@ public class TestStoragePolicySatisfier { // Check other element traversed in order and E, M, U, R, S should not be // added in queue which we already removed from expected list for (String path : expectedTraverseOrder) { - ItemInfo itemInfo = sps.getStorageMovementQueue().get(); + ItemInfo<Long> itemInfo = sps.getStorageMovementQueue().get(); if (itemInfo == null) { continue; } - long trackId = itemInfo.getFileId(); + Long trackId = itemInfo.getFile(); INode inode = fsDir.getInode(trackId); assertTrue("Failed to traverse tree, expected " + path + " but got " + inode.getFullPathName(), path.equals(inode.getFullPathName())); @@ -1696,39 +1698,41 @@ public class TestStoragePolicySatisfier { return file1; } - private void waitForAttemptedItems(long expectedBlkMovAttemptedCount, + public void waitForAttemptedItems(long expectedBlkMovAttemptedCount, int timeout) throws TimeoutException, InterruptedException { BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager - .getSPSManager().getInternalSPSService(); + final StoragePolicySatisfier<Long> sps = + (StoragePolicySatisfier<Long>) blockManager.getSPSManager() + .getInternalSPSService(); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", expectedBlkMovAttemptedCount, - ((BlockStorageMovementAttemptedItems) (sps + ((BlockStorageMovementAttemptedItems<Long>) (sps .getAttemptedItemsMonitor())).getAttemptedItemsCount()); - return ((BlockStorageMovementAttemptedItems) (sps + return ((BlockStorageMovementAttemptedItems<Long>) (sps .getAttemptedItemsMonitor())) .getAttemptedItemsCount() == expectedBlkMovAttemptedCount; } }, 100, timeout); } - private void waitForBlocksMovementAttemptReport( + public void waitForBlocksMovementAttemptReport( long expectedMovementFinishedBlocksCount, int timeout) throws TimeoutException, InterruptedException { BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager + final StoragePolicySatisfier<Long> sps = + (StoragePolicySatisfier<Long>) blockManager .getSPSManager().getInternalSPSService(); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", expectedMovementFinishedBlocksCount, - ((BlockStorageMovementAttemptedItems) (sps + ((BlockStorageMovementAttemptedItems<Long>) (sps .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount()); - return ((BlockStorageMovementAttemptedItems) (sps + return ((BlockStorageMovementAttemptedItems<Long>) (sps .getAttemptedItemsMonitor())) .getMovementFinishedBlocksCount() >= expectedMovementFinishedBlocksCount; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java index ef12300..a39fb92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java @@ -495,16 +495,17 @@ public class TestStoragePolicySatisfierWithStripedFile { long expectedBlkMovAttemptedCount, int timeout) throws TimeoutException, InterruptedException { BlockManager blockManager = cluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager + final StoragePolicySatisfier<Long> sps = + (StoragePolicySatisfier<Long>) blockManager .getSPSManager().getInternalSPSService(); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", expectedBlkMovAttemptedCount, - ((BlockStorageMovementAttemptedItems) sps + ((BlockStorageMovementAttemptedItems<Long>) sps .getAttemptedItemsMonitor()).getAttemptedItemsCount()); - return ((BlockStorageMovementAttemptedItems) sps + return ((BlockStorageMovementAttemptedItems<Long>) sps .getAttemptedItemsMonitor()) .getAttemptedItemsCount() == expectedBlkMovAttemptedCount; } @@ -567,7 +568,8 @@ public class TestStoragePolicySatisfierWithStripedFile { long expectedMoveFinishedBlks, int timeout) throws TimeoutException, InterruptedException { BlockManager blockManager = cluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager + final StoragePolicySatisfier<Long> sps = + (StoragePolicySatisfier<Long>) blockManager .getSPSManager().getInternalSPSService(); Assert.assertNotNull("Failed to get SPS object reference!", sps); @@ -575,9 +577,10 @@ public class TestStoragePolicySatisfierWithStripedFile { @Override public Boolean get() { LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", - expectedMoveFinishedBlks, ((BlockStorageMovementAttemptedItems) sps + expectedMoveFinishedBlks, + ((BlockStorageMovementAttemptedItems<Long>) sps .getAttemptedItemsMonitor()).getMovementFinishedBlocksCount()); - return ((BlockStorageMovementAttemptedItems) sps + return ((BlockStorageMovementAttemptedItems<Long>) sps .getAttemptedItemsMonitor()) .getMovementFinishedBlocksCount() >= expectedMoveFinishedBlks; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java index 0546f39..28e172a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java @@ -43,23 +43,23 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Properties; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener; -import org.apache.hadoop.hdfs.server.namenode.sps.Context; -import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector; -import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; +import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementAttemptedItems; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier; import org.apache.hadoop.http.HttpConfig; @@ -74,6 +74,8 @@ import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; +import com.google.common.base.Supplier; + /** * Tests the external sps service plugins. */ @@ -88,6 +90,8 @@ public class TestExternalStoragePolicySatisfier private String principal; private MiniKdc kdc; private File baseDir; + private StoragePolicySatisfier<String> externalSps; + private ExternalSPSContext externalCtxt; @After public void destroy() throws Exception { @@ -98,6 +102,14 @@ public class TestExternalStoragePolicySatisfier } @Override + public void shutdownCluster() { + if (externalSps != null) { + externalSps.stopGracefully(); + } + super.shutdownCluster(); + } + + @Override public void setUp() { super.setUp(); @@ -131,60 +143,44 @@ public class TestExternalStoragePolicySatisfier nnc = getNameNodeConnector(getConf()); - BlockManager blkMgr = cluster.getNameNode().getNamesystem() - .getBlockManager(); - SPSService spsService = blkMgr.getSPSManager().getInternalSPSService(); - spsService.stopGracefully(); - - ExternalSPSContext context = new ExternalSPSContext(spsService, + externalSps = new StoragePolicySatisfier<String>(getConf()); + externalCtxt = new ExternalSPSContext(externalSps, getNameNodeConnector(conf)); ExternalBlockMovementListener blkMoveListener = new ExternalBlockMovementListener(); ExternalSPSBlockMoveTaskHandler externalHandler = new ExternalSPSBlockMoveTaskHandler(conf, nnc, - blkMgr.getSPSManager().getInternalSPSService()); + externalSps); externalHandler.init(); - spsService.init(context, - new ExternalSPSFileIDCollector(context, - blkMgr.getSPSManager().getInternalSPSService()), - externalHandler, blkMoveListener); - spsService.start(true, StoragePolicySatisfierMode.EXTERNAL); + externalSps.init(externalCtxt, + new ExternalSPSFilePathCollector(externalSps), externalHandler, + blkMoveListener); + externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL); return cluster; } public void restartNamenode() throws IOException{ - BlockManager blkMgr = getCluster().getNameNode().getNamesystem() - .getBlockManager(); - SPSService spsService = blkMgr.getSPSManager().getInternalSPSService(); - spsService.stopGracefully(); + if (externalSps != null) { + externalSps.stopGracefully(); + } getCluster().restartNameNodes(); getCluster().waitActive(); - blkMgr = getCluster().getNameNode().getNamesystem() - .getBlockManager(); - spsService = blkMgr.getSPSManager().getInternalSPSService(); - spsService.stopGracefully(); + externalSps = new StoragePolicySatisfier<>(getConf()); - ExternalSPSContext context = new ExternalSPSContext(spsService, + externalCtxt = new ExternalSPSContext(externalSps, getNameNodeConnector(getConf())); ExternalBlockMovementListener blkMoveListener = new ExternalBlockMovementListener(); ExternalSPSBlockMoveTaskHandler externalHandler = new ExternalSPSBlockMoveTaskHandler(getConf(), nnc, - blkMgr.getSPSManager().getInternalSPSService()); + externalSps); externalHandler.init(); - spsService.init(context, - new ExternalSPSFileIDCollector(context, - blkMgr.getSPSManager().getInternalSPSService()), - externalHandler, blkMoveListener); - spsService.start(true, StoragePolicySatisfierMode.EXTERNAL); - } - - @Override - public FileIdCollector createFileIdCollector(StoragePolicySatisfier sps, - Context ctxt) { - return new ExternalSPSFileIDCollector(ctxt, sps); + externalSps.init(externalCtxt, + new ExternalSPSFilePathCollector(externalSps), externalHandler, + blkMoveListener); + externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL); } private class ExternalBlockMovementListener implements BlockMovementListener { @@ -204,7 +200,7 @@ public class TestExternalStoragePolicySatisfier throws IOException { final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf); Assert.assertEquals(1, namenodes.size()); - final Path externalSPSPathId = new Path("/system/tmp.id"); + final Path externalSPSPathId = HdfsServerConstants.MOVER_ID_PATH; NameNodeConnector.checkOtherInstanceRunning(false); while (true) { try { @@ -222,6 +218,40 @@ public class TestExternalStoragePolicySatisfier } } + public void waitForAttemptedItems(long expectedBlkMovAttemptedCount, + int timeout) throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", + expectedBlkMovAttemptedCount, + ((BlockStorageMovementAttemptedItems<String>) (externalSps + .getAttemptedItemsMonitor())).getAttemptedItemsCount()); + return ((BlockStorageMovementAttemptedItems<String>) (externalSps + .getAttemptedItemsMonitor())) + .getAttemptedItemsCount() == expectedBlkMovAttemptedCount; + } + }, 100, timeout); + } + + public void waitForBlocksMovementAttemptReport( + long expectedMovementFinishedBlocksCount, int timeout) + throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", + expectedMovementFinishedBlocksCount, + ((BlockStorageMovementAttemptedItems<String>) (externalSps + .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount()); + return ((BlockStorageMovementAttemptedItems<String>) (externalSps + .getAttemptedItemsMonitor())) + .getMovementFinishedBlocksCount() + >= expectedMovementFinishedBlocksCount; + } + }, 100, timeout); + } + private void initSecureConf(Configuration conf) throws Exception { String username = "externalSPS"; baseDir = GenericTestUtils @@ -321,10 +351,6 @@ public class TestExternalStoragePolicySatisfier List<String> files = new ArrayList<>(); files.add(FILE); DistributedFileSystem fs = getFS(); - BlockManager blkMgr = getCluster().getNameNode().getNamesystem() - .getBlockManager(); - SPSService spsService = blkMgr.getSPSManager().getInternalSPSService(); - spsService.stopGracefully(); // stops SPS // Creates 4 more files. Send all of them for satisfying the storage // policy together. @@ -367,6 +393,28 @@ public class TestExternalStoragePolicySatisfier } /** + * Tests to verify that SPS should be able to start when the Mover ID file + * is not being hold by a Mover. This can be the case when Mover exits + * ungracefully without deleting the ID file from HDFS. + */ + @Test(timeout = 300000) + public void testWhenMoverExitsWithoutDeleteMoverIDFile() + throws IOException { + try { + createCluster(); + // Simulate the case by creating MOVER_ID file + DFSTestUtil.createFile(getCluster().getFileSystem(), + HdfsServerConstants.MOVER_ID_PATH, 0, (short) 1, 0); + restartNamenode(); + boolean running = externalCtxt.isRunning(); + Assert.assertTrue("SPS should be running as " + + "no Mover really running", running); + } finally { + shutdownCluster(); + } + } + + /** * This test need not run as external scan is not a batch based scanning right * now. */ @@ -389,4 +437,20 @@ public class TestExternalStoragePolicySatisfier @Ignore("Status is not supported for external SPS. So, ignoring it.") public void testMaxRetryForFailedBlock() throws Exception { } + + /** + * This test is specific to internal SPS. So, ignoring it. + */ + @Ignore("This test is specific to internal SPS. So, ignoring it.") + @Override + public void testTraverseWhenParentDeleted() throws Exception { + } + + /** + * This test is specific to internal SPS. So, ignoring it. + */ + @Ignore("This test is specific to internal SPS. So, ignoring it.") + @Override + public void testTraverseWhenRootParentDeleted() throws Exception { + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org