HDFS-13025. [SPS]: Implement a mechanism to scan the files for external SPS. 
Contributed by Uma Maheswara Rao G.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3159b39c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3159b39c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3159b39c

Branch: refs/heads/HDFS-10285
Commit: 3159b39cf8ef704835325263154fb1a1cecc109d
Parents: 8d4f74e
Author: Rakesh Radhakrishnan <rake...@apache.org>
Authored: Tue Jan 23 20:09:26 2018 +0530
Committer: Uma Maheswara Rao Gangumalla <umamah...@apache.org>
Committed: Sun Aug 12 03:06:03 2018 -0700

----------------------------------------------------------------------
 .../sps/BlockStorageMovementNeeded.java         |  70 +++-
 .../hdfs/server/namenode/sps/Context.java       |   8 +
 .../IntraSPSNameNodeBlockMoveTaskHandler.java   |   2 +
 .../namenode/sps/IntraSPSNameNodeContext.java   |   7 +
 .../sps/IntraSPSNameNodeFileIdCollector.java    |   6 +-
 .../hdfs/server/namenode/sps/SPSService.java    |  10 +-
 .../namenode/sps/StoragePolicySatisfier.java    |   8 +-
 .../server/sps/ExternalSPSFileIDCollector.java  | 156 +++++++++
 .../hadoop/hdfs/server/sps/package-info.java    |  28 ++
 .../sps/TestStoragePolicySatisfier.java         | 323 ++++++++++---------
 .../sps/TestExternalStoragePolicySatisfier.java | 108 +++++++
 11 files changed, 556 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
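In short: the existing NameNode-internal collector (IntraSPSNameNodeFileIdCollector) walks INodes directly, while the new ExternalSPSFileIDCollector walks the same tree through a DistributedFileSystem client, using the Context#getFilePath, SPSService#markScanCompletedForPath and scanCompleted hooks added below. As a rough sketch only (not part of this patch), an external SPS process that already holds a Context and an SPSService could drive a scan as shown here; the class name is hypothetical and the batch size of 5 simply mirrors the new test:

  // Illustrative sketch, not from the patch: the Context and SPSService
  // instances are assumed to be provided by the external SPS process.
  import java.io.IOException;

  import org.apache.hadoop.hdfs.server.namenode.sps.Context;
  import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
  import org.apache.hadoop.hdfs.server.sps.ExternalSPSFileIDCollector;

  public class ExternalScanSketch {
    public static void scanPath(Context context, SPSService service,
        long spsPathInodeId) throws IOException {
      // Batch size 5 matches the value used in the new test; the collector
      // also throttles itself against the queue limit configured by
      // DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY.
      ExternalSPSFileIDCollector collector =
          new ExternalSPSFileIDCollector(context, service, 5);
      // Resolves the path via context.getFilePath(inodeId), lists children
      // through the DFS client, submits ItemInfo entries to the service and
      // finally calls service.markScanCompletedForPath(inodeId).
      collector.scanAndCollectFileIds(spsPathInodeId);
    }
  }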


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159b39c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
index 39a0051..b141502 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
@@ -97,23 +97,53 @@ public class BlockStorageMovementNeeded {
   }
 
   /**
-   * Add the itemInfo to tracking list for which storage movement
-   * expected if necessary.
+   * Add the itemInfo list to the tracking list for which storage movement is
+   * expected, if necessary.
+   *
    * @param startId
-   *            - start id
+   *          - start id
    * @param itemInfoList
-   *            - List of child in the directory
+   *          - List of child in the directory
+   * @param scanCompleted
+   *          - Indicates whether the start id directory has no more elements
+   *          to scan.
    */
   @VisibleForTesting
-  public synchronized void addAll(long startId,
-      List<ItemInfo> itemInfoList, boolean scanCompleted) {
+  public synchronized void addAll(long startId, List<ItemInfo> itemInfoList,
+      boolean scanCompleted) {
     storageMovementNeeded.addAll(itemInfoList);
+    updatePendingDirScanStats(startId, itemInfoList.size(), scanCompleted);
+  }
+
+  /**
+   * Add the itemInfo to the tracking list for which storage movement is
+   * expected, if necessary.
+   *
+   * @param itemInfo
+   *          - item info to be tracked
+   * @param scanCompleted
+   *          - Indicates whether the ItemInfo start id directory has no more
+   *          elements to scan.
+   */
+  @VisibleForTesting
+  public synchronized void add(ItemInfo itemInfo, boolean scanCompleted) {
+    storageMovementNeeded.add(itemInfo);
+    // This means the SPS start id is a file, so there is no need to update
+    // pending dir stats.
+    if (itemInfo.getStartId() == itemInfo.getFileId()) {
+      return;
+    }
+    updatePendingDirScanStats(itemInfo.getStartId(), 1, scanCompleted);
+  }
+
+  private void updatePendingDirScanStats(long startId, int numScannedFiles,
+      boolean scanCompleted) {
     DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
     if (pendingWork == null) {
       pendingWork = new DirPendingWorkInfo();
       pendingWorkForDirectory.put(startId, pendingWork);
     }
-    pendingWork.addPendingWorkCount(itemInfoList.size());
+    pendingWork.addPendingWorkCount(numScannedFiles);
     if (scanCompleted) {
       pendingWork.markScanCompleted();
     }
@@ -250,13 +280,15 @@ public class BlockStorageMovementNeeded {
 
     @Override
     public void run() {
-      LOG.info("Starting FileInodeIdCollector!.");
+      LOG.info("Starting SPSPathIdProcessor!.");
       long lastStatusCleanTime = 0;
+      Long startINodeId = null;
       while (ctxt.isRunning()) {
-        LOG.info("Running FileInodeIdCollector!.");
         try {
           if (!ctxt.isInSafeMode()) {
-            Long startINodeId = ctxt.getNextSPSPathId();
+            if (startINodeId == null) {
+              startINodeId = ctxt.getNextSPSPathId();
+            } // else same id will be retried
             if (startINodeId == null) {
               // Waiting for SPS path
               Thread.sleep(3000);
@@ -281,9 +313,18 @@ public class BlockStorageMovementNeeded {
               lastStatusCleanTime = Time.monotonicNow();
               cleanSpsStatus();
             }
+            startINodeId = null; // Current inode id successfully scanned.
           }
         } catch (Throwable t) {
-          LOG.warn("Exception while loading inodes to satisfy the policy", t);
+          String reClass = t.getClass().getName();
+          if (InterruptedException.class.getName().equals(reClass)) {
+            LOG.info("SPSPathIdProcessor thread is interrupted. Stopping..");
+            Thread.currentThread().interrupt();
+            break;
+          }
+          LOG.warn("Exception while scanning file inodes to satisfy the policy",
+              t);
+          // TODO: maybe we should retry the current inode id?
         }
       }
     }
@@ -426,4 +467,11 @@ public class BlockStorageMovementNeeded {
   public static long getStatusClearanceElapsedTimeMs() {
     return statusClearanceElapsedTimeMs;
   }
+
+  public void markScanCompletedForDir(Long inodeId) {
+    DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inodeId);
+    if (pendingWork != null) {
+      pendingWork.markScanCompleted();
+    }
+  }
 }

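For readers of the following files: the reporting contract these queue changes support is small. A minimal sketch, assuming an SPSService handle and placeholder inode ids (the class and method names here are hypothetical, not from the patch):

  import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo;
  import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;

  class CollectorReportingSketch {
    // Each discovered file is queued against its start (directory) inode id.
    static void reportFile(SPSService service, long dirInodeId,
        long fileInodeId) {
      // scanCompleted=false: the directory walk is still in progress. The new
      // ExternalSPSFileIDCollector always passes false here and instead
      // closes the directory explicitly, as below.
      service.addFileIdToProcess(new ItemInfo(dirInodeId, fileInodeId), false);
    }

    // Once the whole directory has been walked, mark the scan as complete so
    // the directory is no longer treated as still being scanned.
    static void finishDirectory(SPSService service, long dirInodeId) {
      service.markScanCompletedForPath(dirInodeId);
    }
  }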
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159b39c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
index b7053b9..f103dfe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
@@ -167,4 +167,12 @@ public interface Context {
    */
   void removeAllSPSPathIds();
 
+  /**
+   * Gets the file path for a given inode id.
+   *
+   * @param inodeId
+   *          - path inode id.
+   */
+  String getFilePath(Long inodeId);
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159b39c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
index 1da4af9..b27e8c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.sps;
 
 import java.io.IOException;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -29,6 +30,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockM
  * This class handles the internal SPS block movements. This will assign block
  * movement tasks to target datanode descriptors.
  */
+@InterfaceAudience.Private
 public class IntraSPSNameNodeBlockMoveTaskHandler
     implements BlockMoveTaskHandler {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159b39c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
index cef26ed..aed684a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SAT
 import java.io.IOException;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.UnresolvedLinkException;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
  * are expecting to change its storages and assigning the block storage
  * movements to satisfy the storage policy.
  */
+@InterfaceAudience.Private
 public class IntraSPSNameNodeContext implements Context {
   private static final Logger LOG = LoggerFactory
       .getLogger(IntraSPSNameNodeContext.class);
@@ -195,4 +197,9 @@ public class IntraSPSNameNodeContext implements Context {
   public void removeAllSPSPathIds() {
     blockManager.removeAllSPSPathIds();
   }
+
+  @Override
+  public String getFilePath(Long inodeId) {
+    return namesystem.getFilePath(inodeId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159b39c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
index c6834c1..f7cd754 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
  * A specific implementation for scanning the directory with Namenode internal
  * Inode structure and collects the file ids under the given directory ID.
  */
+@InterfaceAudience.Private
 public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
     implements FileIdCollector {
   private int maxQueueLimitToScan;
@@ -131,12 +133,12 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
       } else {
 
         readLock();
-        // NOTE: this lock will not be held until full directory scanning. It is
+        // NOTE: this lock will not be held for full directory scanning. It is
         // basically a sliced locking. Once it collects a batch size( at max the
         // size of maxQueueLimitToScan (default 1000)) file ids, then it will
         // unlock and submits the current batch to SPSService. Once
         // service.processingQueueSize() shows empty slots, then lock will be
-        // resumed and scan also will be resumed. This logic was re-used from
+        // re-acquired and scan will be resumed. This logic was re-used from
         // EDEK feature.
         try {
           traverseDir(startInode.asDirectory(), startINodeId,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159b39c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
index 6d85ea6..d74e391 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
@@ -80,7 +80,7 @@ public interface SPSService {
    *
    * @param itemInfo
    */
-  void addFileIdToProcess(ItemInfo itemInfo);
+  void addFileIdToProcess(ItemInfo itemInfo, boolean scanCompleted);
 
   /**
    * Adds all the Item information(file id etc) to processing queue.
@@ -104,4 +104,12 @@ public interface SPSService {
    * @return the configuration.
    */
   Configuration getConf();
+
+  /**
+   * Marks the scanning of the given directory as finished.
+   *
+   * @param inodeId
+   *          - directory inode id.
+   */
+  void markScanCompletedForPath(Long inodeId);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159b39c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 28c1372..aafdc65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -563,7 +563,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
                 chosenTarget.storageType, blockMovingInfos);
           }
           expected.remove(chosenTarget.storageType);
-          // TODO: We can increment scheduled block count for this node?
         }
       }
       // To avoid choosing this excludeNodes as targets later
@@ -924,7 +923,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
   }
 
   @Override
-  public void addFileIdToProcess(ItemInfo trackInfo) {
+  public void addFileIdToProcess(ItemInfo trackInfo, boolean scanCompleted) {
     storageMovementNeeded.add(trackInfo);
   }
 
@@ -948,4 +947,9 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
   public BlockStorageMovementNeeded getStorageMovementQueue() {
     return storageMovementNeeded;
   }
+
+  @Override
+  public void markScanCompletedForPath(Long inodeId) {
+    getStorageMovementQueue().markScanCompletedForDir(inodeId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159b39c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
new file mode 100644
index 0000000..597a7d3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
@@ -0,0 +1,156 @@
+package org.apache.hadoop.hdfs.server.sps;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.sps.Context;
+import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
+import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class scans the given paths recursively. If a path is a directory, it
+ * will scan all the files under it recursively. If the path is a regular
+ * file, it will simply submit that file for processing.
+ */
+@InterfaceAudience.Private
+public class ExternalSPSFileIDCollector implements FileIdCollector {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ExternalSPSFileIDCollector.class);
+  private Context cxt;
+  private DistributedFileSystem dfs;
+  private SPSService service;
+  private int maxQueueLimitToScan;
+
+  public ExternalSPSFileIDCollector(Context cxt, SPSService service,
+      int batchSize) {
+    this.cxt = cxt;
+    this.service = service;
+    this.maxQueueLimitToScan = service.getConf().getInt(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT);
+    try {
+      // TODO: probably we could get this dfs from external context? but this is
+      // too specific to external.
+      dfs = getFS(service.getConf());
+    } catch (IOException e) {
+      LOG.error("Unable to get the filesystem. Make sure the Namenode is "
+          + "running and the configured namenode address is correct.", e);
+    }
+  }
+
+  private DistributedFileSystem getFS(Configuration conf) throws IOException {
+    return (DistributedFileSystem) FileSystem
+        .get(FileSystem.getDefaultUri(conf), conf);
+  }
+
+  /**
+   * Recursively scan the given path and add the file info to SPS service for
+   * processing.
+   */
+  private void processPath(long startID, String fullPath) {
+    for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
+      final DirectoryListing children;
+      try {
+        children = dfs.getClient().listPaths(fullPath, lastReturnedName, false);
+      } catch (IOException e) {
+        LOG.warn("Failed to list directory " + fullPath
+            + ". Ignore the directory and continue.", e);
+        return;
+      }
+      if (children == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The scanning start dir/sub dir " + fullPath
+              + " does not have children.");
+        }
+        return;
+      }
+
+      for (HdfsFileStatus child : children.getPartialListing()) {
+        if (child.isFile()) {
+          service.addFileIdToProcess(new ItemInfo(startID, child.getFileId()),
+              false);
+          checkProcessingQueuesFree();
+        } else {
+          String fullPathStr = child.getFullName(fullPath);
+          if (child.isDirectory()) {
+            if (!fullPathStr.endsWith(Path.SEPARATOR)) {
+              fullPathStr = fullPathStr + Path.SEPARATOR;
+            }
+            processPath(startID, fullPathStr);
+          }
+        }
+      }
+
+      if (children.hasMore()) {
+        lastReturnedName = children.getLastName();
+      } else {
+        return;
+      }
+    }
+  }
+
+  private void checkProcessingQueuesFree() {
+    int remainingCapacity = remainingCapacity();
+    // wait for queue to be free
+    while (remainingCapacity <= 0) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Waiting for storageMovementNeeded queue to be free!");
+      }
+      try {
+        Thread.sleep(5000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      remainingCapacity = remainingCapacity();
+    }
+  }
+
+  /**
+   * Returns queue remaining capacity.
+   */
+  public int remainingCapacity() {
+    int size = service.processingQueueSize();
+    if (size >= maxQueueLimitToScan) {
+      return 0;
+    } else {
+      return (maxQueueLimitToScan - size);
+    }
+  }
+
+  @Override
+  public void scanAndCollectFileIds(Long inodeId) throws IOException {
+    if (dfs == null) {
+      dfs = getFS(service.getConf());
+    }
+    processPath(inodeId, cxt.getFilePath(inodeId));
+    service.markScanCompletedForPath(inodeId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159b39c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/package-info.java
new file mode 100644
index 0000000..f705df2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/package-info.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package provides a mechanism for satisfying the storage policy of a
+ * path.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.hdfs.server.sps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159b39c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
index 9354044..e0bf410 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,18 +94,41 @@ public class TestStoragePolicySatisfier {
   private static final String COLD = "COLD";
   private static final Logger LOG =
       LoggerFactory.getLogger(TestStoragePolicySatisfier.class);
-  private final Configuration config = new HdfsConfiguration();
+  private Configuration config = null;
   private StorageType[][] allDiskTypes =
       new StorageType[][]{{StorageType.DISK, StorageType.DISK},
           {StorageType.DISK, StorageType.DISK},
           {StorageType.DISK, StorageType.DISK}};
   private MiniDFSCluster hdfsCluster = null;
-  final private int numOfDatanodes = 3;
-  final private int storagesPerDatanode = 2;
-  final private long capacity = 2 * 256 * 1024 * 1024;
-  final private String file = "/testMoveWhenStoragePolicyNotSatisfying";
   private DistributedFileSystem dfs = null;
-  private static final int DEFAULT_BLOCK_SIZE = 1024;
+  public static final int NUM_OF_DATANODES = 3;
+  public static final int STORAGES_PER_DATANODE = 2;
+  public static final long CAPACITY = 2 * 256 * 1024 * 1024;
+  public static final String FILE = "/testMoveWhenStoragePolicyNotSatisfying";
+  public static final int DEFAULT_BLOCK_SIZE = 1024;
+
+  /**
+   * Sets hdfs cluster.
+   */
+  public void setCluster(MiniDFSCluster cluster) {
+    this.hdfsCluster = cluster;
+  }
+
+  /**
+   * @return conf.
+   */
+  public Configuration getConf() {
+    return this.config;
+  }
+
+  /**
+   * Gets distributed file system.
+   *
+   * @throws IOException
+   */
+  public void getFS() throws IOException {
+    this.dfs = hdfsCluster.getFileSystem();
+  }
 
   @After
   public void shutdownCluster() {
@@ -113,14 +137,19 @@ public class TestStoragePolicySatisfier {
     }
   }
 
-  private void createCluster() throws IOException {
+  public void createCluster() throws IOException {
     config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
         true);
-    hdfsCluster = startCluster(config, allDiskTypes, numOfDatanodes,
-        storagesPerDatanode, capacity);
-    dfs = hdfsCluster.getFileSystem();
-    writeContent(file);
+    hdfsCluster = startCluster(config, allDiskTypes, NUM_OF_DATANODES,
+        STORAGES_PER_DATANODE, CAPACITY);
+    getFS();
+    writeContent(FILE);
+  }
+
+  @Before
+  public void setUp() {
+    config = new HdfsConfiguration();
   }
 
   @Test(timeout = 300000)
@@ -137,19 +166,19 @@ public class TestStoragePolicySatisfier {
 
   private void doTestWhenStoragePolicySetToCOLD() throws Exception {
     // Change policy to COLD
-    dfs.setStoragePolicy(new Path(file), COLD);
+    dfs.setStoragePolicy(new Path(FILE), COLD);
 
     StorageType[][] newtypes =
         new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
             {StorageType.ARCHIVE, StorageType.ARCHIVE},
             {StorageType.ARCHIVE, StorageType.ARCHIVE}};
-    startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
-        storagesPerDatanode, capacity, hdfsCluster);
+    startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
+        STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
 
     hdfsCluster.triggerHeartbeats();
-    dfs.satisfyStoragePolicy(new Path(file));
+    dfs.satisfyStoragePolicy(new Path(FILE));
     // Wait till namenode notified about the block location details
-    DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 35000,
+    DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 3, 35000,
         dfs);
   }
 
@@ -159,7 +188,7 @@ public class TestStoragePolicySatisfier {
     try {
       createCluster();
       // Change policy to ALL_SSD
-      dfs.setStoragePolicy(new Path(file), "ALL_SSD");
+      dfs.setStoragePolicy(new Path(FILE), "ALL_SSD");
 
       StorageType[][] newtypes =
           new StorageType[][]{{StorageType.SSD, StorageType.DISK},
@@ -168,14 +197,13 @@ public class TestStoragePolicySatisfier {
 
       // Making sure SDD based nodes added to cluster. Adding SSD based
       // datanodes.
-      startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
-          storagesPerDatanode, capacity, hdfsCluster);
-      dfs.satisfyStoragePolicy(new Path(file));
+      startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+      dfs.satisfyStoragePolicy(new Path(FILE));
       hdfsCluster.triggerHeartbeats();
       // Wait till StorgePolicySatisfier Identified that block to move to SSD
       // areas
-      DFSTestUtil.waitExpectedStorageType(
-          file, StorageType.SSD, 3, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 3, 30000, dfs);
     } finally {
       shutdownCluster();
     }
@@ -187,23 +215,22 @@ public class TestStoragePolicySatisfier {
     try {
       createCluster();
       // Change policy to ONE_SSD
-      dfs.setStoragePolicy(new Path(file), ONE_SSD);
+      dfs.setStoragePolicy(new Path(FILE), ONE_SSD);
 
       StorageType[][] newtypes =
           new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
 
       // Making sure SDD based nodes added to cluster. Adding SSD based
       // datanodes.
-      startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
-          storagesPerDatanode, capacity, hdfsCluster);
-      dfs.satisfyStoragePolicy(new Path(file));
+      startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+      dfs.satisfyStoragePolicy(new Path(FILE));
       hdfsCluster.triggerHeartbeats();
       // Wait till StorgePolicySatisfier Identified that block to move to SSD
       // areas
-      DFSTestUtil.waitExpectedStorageType(
-          file, StorageType.SSD, 1, 30000, dfs);
-      DFSTestUtil.waitExpectedStorageType(
-          file, StorageType.DISK, 2, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
+          dfs);
     } finally {
       shutdownCluster();
     }
@@ -218,23 +245,22 @@ public class TestStoragePolicySatisfier {
     try {
       createCluster();
       // Change policy to ONE_SSD
-      dfs.setStoragePolicy(new Path(file), ONE_SSD);
+      dfs.setStoragePolicy(new Path(FILE), ONE_SSD);
 
       StorageType[][] newtypes =
           new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
 
       // Making sure SDD based nodes added to cluster. Adding SSD based
       // datanodes.
-      startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
-          storagesPerDatanode, capacity, hdfsCluster);
-      dfs.satisfyStoragePolicy(new Path(file));
+      startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+      dfs.satisfyStoragePolicy(new Path(FILE));
       hdfsCluster.triggerHeartbeats();
 
       // Wait till the block is moved to SSD areas
-      DFSTestUtil.waitExpectedStorageType(
-          file, StorageType.SSD, 1, 30000, dfs);
-      DFSTestUtil.waitExpectedStorageType(
-          file, StorageType.DISK, 2, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
+          dfs);
 
       waitForBlocksMovementAttemptReport(1, 30000);
     } finally {
@@ -251,7 +277,7 @@ public class TestStoragePolicySatisfier {
     try {
       createCluster();
       List<String> files = new ArrayList<>();
-      files.add(file);
+      files.add(FILE);
 
       // Creates 4 more files. Send all of them for satisfying the storage
       // policy together.
@@ -271,8 +297,8 @@ public class TestStoragePolicySatisfier {
 
       // Making sure SDD based nodes added to cluster. Adding SSD based
       // datanodes.
-      startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
-          storagesPerDatanode, capacity, hdfsCluster);
+      startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
       hdfsCluster.triggerHeartbeats();
 
       for (String fileName : files) {
@@ -300,21 +326,21 @@ public class TestStoragePolicySatisfier {
       HdfsAdmin hdfsAdmin =
           new HdfsAdmin(FileSystem.getDefaultUri(config), config);
       // Change policy to COLD
-      dfs.setStoragePolicy(new Path(file), COLD);
+      dfs.setStoragePolicy(new Path(FILE), COLD);
 
       StorageType[][] newtypes =
           new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
               {StorageType.DISK, StorageType.ARCHIVE},
               {StorageType.DISK, StorageType.ARCHIVE}};
-      startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
-          storagesPerDatanode, capacity, hdfsCluster);
+      startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
 
-      hdfsAdmin.satisfyStoragePolicy(new Path(file));
+      hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
 
       hdfsCluster.triggerHeartbeats();
       // Wait till namenode notified about the block location details
-      DFSTestUtil.waitExpectedStorageType(
-          file, StorageType.ARCHIVE, 3, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 3, 30000,
+          dfs);
     } finally {
       shutdownCluster();
     }
@@ -344,8 +370,8 @@ public class TestStoragePolicySatisfier {
 
       StorageType[][] newtypes =
           new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
-      startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
-          storagesPerDatanode, capacity, hdfsCluster);
+      startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
 
       hdfsAdmin.satisfyStoragePolicy(new Path(subDir));
 
@@ -384,11 +410,11 @@ public class TestStoragePolicySatisfier {
           new HdfsAdmin(FileSystem.getDefaultUri(config), config);
 
       try {
-        hdfsAdmin.satisfyStoragePolicy(new Path(file));
+        hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
         Assert.fail(String.format(
             "Should failed to satisfy storage policy "
                 + "for %s since %s is set to false.",
-            file, DFS_STORAGE_POLICY_ENABLED_KEY));
+            FILE, DFS_STORAGE_POLICY_ENABLED_KEY));
       } catch (IOException e) {
         Assert.assertTrue(e.getMessage().contains(String.format(
             "Failed to satisfy storage policy since %s is set to false.",
@@ -409,17 +435,17 @@ public class TestStoragePolicySatisfier {
       }
 
       try {
-        hdfsAdmin.satisfyStoragePolicy(new Path(file));
-        hdfsAdmin.satisfyStoragePolicy(new Path(file));
-        Assert.fail(String.format(
-            "Should failed to satisfy storage policy "
-            + "for %s ,since it has been "
-            + "added to satisfy movement queue.", file));
+        hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
+        hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
+        Assert.fail(String.format("Should failed to satisfy storage policy "
+            + "for %s ,since it has been " + "added to satisfy movement queue.",
+            FILE));
       } catch (IOException e) {
         GenericTestUtils.assertExceptionContains(
             String.format("Cannot request to call satisfy storage policy "
                 + "on path %s, as this file/dir was already called for "
-                + "satisfying storage policy.", file), e);
+                + "satisfying storage policy.", FILE),
+            e);
       }
     } finally {
       shutdownCluster();
@@ -446,23 +472,23 @@ public class TestStoragePolicySatisfier {
     try {
       createCluster();
       // Change policy to COLD
-      dfs.setStoragePolicy(new Path(file), COLD);
+      dfs.setStoragePolicy(new Path(FILE), COLD);
 
       StorageType[][] newtypes =
           new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}};
 
       // Adding ARCHIVE based datanodes.
-      startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
-          storagesPerDatanode, capacity, hdfsCluster);
+      startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
 
-      dfs.satisfyStoragePolicy(new Path(file));
+      dfs.satisfyStoragePolicy(new Path(FILE));
       hdfsCluster.triggerHeartbeats();
       // Wait till StorgePolicySatisfier identified that block to move to
       // ARCHIVE area.
-      DFSTestUtil.waitExpectedStorageType(
-          file, StorageType.ARCHIVE, 1, 30000, dfs);
-      DFSTestUtil.waitExpectedStorageType(
-          file, StorageType.DISK, 2, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 1, 30000,
+          dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
+          dfs);
 
       waitForBlocksMovementAttemptReport(1, 30000);
     } finally {
@@ -489,22 +515,22 @@ public class TestStoragePolicySatisfier {
     try {
       createCluster();
       // Change policy to COLD
-      dfs.setStoragePolicy(new Path(file), COLD);
+      dfs.setStoragePolicy(new Path(FILE), COLD);
 
       StorageType[][] newtypes =
           new StorageType[][]{{StorageType.DISK, StorageType.DISK}};
       // Adding DISK based datanodes
-      startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
-          storagesPerDatanode, capacity, hdfsCluster);
+      startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
 
-      dfs.satisfyStoragePolicy(new Path(file));
+      dfs.satisfyStoragePolicy(new Path(FILE));
       hdfsCluster.triggerHeartbeats();
 
       // No block movement will be scheduled as there is no target node
       // available with the required storage type.
       waitForAttemptedItems(1, 30000);
-      DFSTestUtil.waitExpectedStorageType(
-          file, StorageType.DISK, 3, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
+          dfs);
       // Since there is no target node the item will get timed out and then
       // re-attempted.
       waitForAttemptedItems(1, 30000);
@@ -628,8 +654,8 @@ public class TestStoragePolicySatisfier {
               {StorageType.ARCHIVE, StorageType.ARCHIVE},
               {StorageType.ARCHIVE, StorageType.ARCHIVE}};
       // Adding DISK based datanodes
-      startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
-          storagesPerDatanode, capacity, hdfsCluster);
+      startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
 
       dfs.satisfyStoragePolicy(new Path(file1));
       hdfsCluster.triggerHeartbeats();
@@ -682,21 +708,21 @@ public class TestStoragePolicySatisfier {
               {StorageType.DISK, StorageType.DISK},
               {StorageType.DISK, StorageType.ARCHIVE}};
       hdfsCluster = startCluster(config, allDiskTypes, numOfDns,
-          storagesPerDatanode, capacity);
+          STORAGES_PER_DATANODE, CAPACITY);
       dfs = hdfsCluster.getFileSystem();
-      writeContent(file, (short) 5);
+      writeContent(FILE, (short) 5);
 
       // Change policy to COLD
-      dfs.setStoragePolicy(new Path(file), COLD);
+      dfs.setStoragePolicy(new Path(FILE), COLD);
 
-      dfs.satisfyStoragePolicy(new Path(file));
+      dfs.satisfyStoragePolicy(new Path(FILE));
       hdfsCluster.triggerHeartbeats();
       // Wait till StorgePolicySatisfier identified that block to move to
       // ARCHIVE area.
-      DFSTestUtil.waitExpectedStorageType(
-          file, StorageType.ARCHIVE, 2, 30000, dfs);
-      DFSTestUtil.waitExpectedStorageType(
-          file, StorageType.DISK, 3, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 2, 30000,
+          dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
+          dfs);
 
       waitForBlocksMovementAttemptReport(1, 30000);
     } finally {
@@ -720,20 +746,19 @@ public class TestStoragePolicySatisfier {
     config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
         true);
     try {
-      hdfsCluster = startCluster(config, diskTypes, numOfDatanodes,
-          storagesPerDatanode, capacity);
+      hdfsCluster = startCluster(config, diskTypes, NUM_OF_DATANODES,
+          STORAGES_PER_DATANODE, CAPACITY);
       dfs = hdfsCluster.getFileSystem();
-      writeContent(file);
+      writeContent(FILE);
 
       // Change policy to ONE_SSD
-      dfs.setStoragePolicy(new Path(file), ONE_SSD);
+      dfs.setStoragePolicy(new Path(FILE), ONE_SSD);
 
-      dfs.satisfyStoragePolicy(new Path(file));
+      dfs.satisfyStoragePolicy(new Path(FILE));
       hdfsCluster.triggerHeartbeats();
-      DFSTestUtil.waitExpectedStorageType(
-          file, StorageType.SSD, 1, 30000, dfs);
-      DFSTestUtil.waitExpectedStorageType(
-          file, StorageType.DISK, 2, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
+          dfs);
 
     } finally {
       shutdownCluster();
@@ -760,19 +785,19 @@ public class TestStoragePolicySatisfier {
         true);
     try {
       hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
-          storagesPerDatanode, capacity);
+          STORAGES_PER_DATANODE, CAPACITY);
       dfs = hdfsCluster.getFileSystem();
-      writeContent(file);
+      writeContent(FILE);
 
       // Change policy to WARM
-      dfs.setStoragePolicy(new Path(file), "WARM");
-      dfs.satisfyStoragePolicy(new Path(file));
+      dfs.setStoragePolicy(new Path(FILE), "WARM");
+      dfs.satisfyStoragePolicy(new Path(FILE));
       hdfsCluster.triggerHeartbeats();
 
-      DFSTestUtil.waitExpectedStorageType(
-          file, StorageType.DISK, 1, 30000, dfs);
-      DFSTestUtil.waitExpectedStorageType(
-          file, StorageType.ARCHIVE, 2, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 1, 30000,
+          dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 2, 30000,
+          dfs);
     } finally {
       shutdownCluster();
     }
@@ -794,31 +819,31 @@ public class TestStoragePolicySatisfier {
       config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
           true);
       hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
-          storagesPerDatanode, capacity);
+          STORAGES_PER_DATANODE, CAPACITY);
       dfs = hdfsCluster.getFileSystem();
       // 1. Write two replica on disk
-      DFSTestUtil.createFile(dfs, new Path(file), DEFAULT_BLOCK_SIZE,
+      DFSTestUtil.createFile(dfs, new Path(FILE), DEFAULT_BLOCK_SIZE,
           (short) 2, 0);
       // 2. Change policy to COLD, so third replica will be written to ARCHIVE.
-      dfs.setStoragePolicy(new Path(file), "COLD");
+      dfs.setStoragePolicy(new Path(FILE), "COLD");
 
       // 3.Change replication factor to 3.
-      dfs.setReplication(new Path(file), (short) 3);
+      dfs.setReplication(new Path(FILE), (short) 3);
 
-      DFSTestUtil
-          .waitExpectedStorageType(file, StorageType.DISK, 2, 30000, dfs);
-      DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000,
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
+          dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 1, 30000,
           dfs);
 
       // 4. Change policy to HOT, so we can move the all block to DISK.
-      dfs.setStoragePolicy(new Path(file), "HOT");
+      dfs.setStoragePolicy(new Path(FILE), "HOT");
 
       // 4. Satisfy the policy.
-      dfs.satisfyStoragePolicy(new Path(file));
+      dfs.satisfyStoragePolicy(new Path(FILE));
 
       // 5. Block should move successfully .
-      DFSTestUtil
-          .waitExpectedStorageType(file, StorageType.DISK, 3, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
+          dfs);
     } finally {
       shutdownCluster();
     }
@@ -840,13 +865,13 @@ public class TestStoragePolicySatisfier {
         true);
     long dnCapacity = 1024 * DEFAULT_BLOCK_SIZE + (2 * DEFAULT_BLOCK_SIZE - 1);
     try {
-      hdfsCluster = startCluster(config, diskTypes, numOfDatanodes,
-          storagesPerDatanode, dnCapacity);
+      hdfsCluster = startCluster(config, diskTypes, NUM_OF_DATANODES,
+          STORAGES_PER_DATANODE, dnCapacity);
       dfs = hdfsCluster.getFileSystem();
-      writeContent(file);
+      writeContent(FILE);
 
       // Change policy to ONE_SSD
-      dfs.setStoragePolicy(new Path(file), ONE_SSD);
+      dfs.setStoragePolicy(new Path(FILE), ONE_SSD);
       Path filePath = new Path("/testChooseInSameDatanode");
       final FSDataOutputStream out =
           dfs.create(filePath, false, 100, (short) 1, 2 * DEFAULT_BLOCK_SIZE);
@@ -869,7 +894,7 @@ public class TestStoragePolicySatisfier {
       for (DataNode dataNode : dataNodes) {
         DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true);
       }
-      dfs.satisfyStoragePolicy(new Path(file));
+      dfs.satisfyStoragePolicy(new Path(FILE));
 
       // Wait for items to be processed
       waitForAttemptedItems(1, 30000);
@@ -887,9 +912,9 @@ public class TestStoragePolicySatisfier {
       }
       hdfsCluster.triggerHeartbeats();
 
-      DFSTestUtil.waitExpectedStorageType(file, StorageType.DISK, 3, 30000,
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
           dfs);
-      DFSTestUtil.waitExpectedStorageType(file, StorageType.SSD, 0, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 0, 30000, dfs);
     } finally {
       shutdownCluster();
     }
@@ -928,7 +953,7 @@ public class TestStoragePolicySatisfier {
         true);
     try {
       hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
-          storagesPerDatanode, capacity);
+          STORAGES_PER_DATANODE, CAPACITY);
       dfs = hdfsCluster.getFileSystem();
       dfs.enableErasureCodingPolicy(
           StripedFileTestUtil.getDefaultECPolicy().getName());
@@ -1029,8 +1054,7 @@ public class TestStoragePolicySatisfier {
           {StorageType.ARCHIVE, StorageType.DISK},
           {StorageType.ARCHIVE, StorageType.DISK},
           {StorageType.ARCHIVE, StorageType.DISK}};
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
-          .storageTypes(newtypes).build();
+      cluster = startCluster(conf, newtypes, 3, 2, CAPACITY);
       cluster.waitActive();
       DistributedFileSystem fs = cluster.getFileSystem();
       Path filePath = new Path("/zeroSizeFile");
@@ -1211,7 +1235,7 @@ public class TestStoragePolicySatisfier {
       config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
           true);
       hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
-          storagesPerDatanode, capacity);
+          STORAGES_PER_DATANODE, CAPACITY);
       dfs = hdfsCluster.getFileSystem();
       createDirectoryTree(dfs);
 
@@ -1245,7 +1269,7 @@ public class TestStoragePolicySatisfier {
       config.setInt(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
           5);
       hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
-          storagesPerDatanode, capacity);
+          STORAGES_PER_DATANODE, CAPACITY);
       dfs = hdfsCluster.getFileSystem();
       createDirectoryTree(dfs);
       List<String> files = getDFSListOfTree();
@@ -1284,7 +1308,7 @@ public class TestStoragePolicySatisfier {
     config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
     hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
-        storagesPerDatanode, capacity);
+        STORAGES_PER_DATANODE, CAPACITY);
     dfs = hdfsCluster.getFileSystem();
     createDirectoryTree(dfs);
 
@@ -1312,8 +1336,7 @@ public class TestStoragePolicySatisfier {
       }
     };
 
-    FileIdCollector fileIDCollector =
-        new IntraSPSNameNodeFileIdCollector(fsDir, sps);
+    FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt);
     sps.init(ctxt, fileIDCollector, null);
     sps.getStorageMovementQueue().activate();
 
@@ -1323,31 +1346,20 @@ public class TestStoragePolicySatisfier {
 
     //Wait for thread to reach U.
     Thread.sleep(1000);
-
     dfs.delete(new Path("/root/D/L"), true);
 
-    // Remove 10 element and make queue free, So other traversing will start.
-    for (int i = 0; i < 10; i++) {
-      String path = expectedTraverseOrder.remove(0);
-      long trackId = sps.getStorageMovementQueue().get().getFileId();
-      INode inode = fsDir.getInode(trackId);
-      assertTrue("Failed to traverse tree, expected " + path + " but got "
-          + inode.getFullPathName(), path.equals(inode.getFullPathName()));
-    }
-    //Wait to finish tree traverse
-    Thread.sleep(5000);
 
-    // Check other element traversed in order and R,S should not be added in
-    // queue which we already removed from expected list
-    for (String path : expectedTraverseOrder) {
-      long trackId = sps.getStorageMovementQueue().get().getFileId();
-      INode inode = fsDir.getInode(trackId);
-      assertTrue("Failed to traverse tree, expected " + path + " but got "
-          + inode.getFullPathName(), path.equals(inode.getFullPathName()));
-    }
+    assertTraversal(expectedTraverseOrder, fsDir, sps);
     dfs.delete(new Path("/root"), true);
   }
 
+  public FileIdCollector createFileIdCollector(StoragePolicySatisfier sps,
+      Context ctxt) {
+    FileIdCollector fileIDCollector = new IntraSPSNameNodeFileIdCollector(
+        hdfsCluster.getNamesystem().getFSDirectory(), sps);
+    return fileIDCollector;
+  }
+
   /**
    *  Test traverse when root parent got deleted.
    *  1. Delete L when traversing Q
@@ -1362,7 +1374,7 @@ public class TestStoragePolicySatisfier {
     config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
     hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
-        storagesPerDatanode, capacity);
+        STORAGES_PER_DATANODE, CAPACITY);
     dfs = hdfsCluster.getFileSystem();
     createDirectoryTree(dfs);
 
@@ -1378,7 +1390,6 @@ public class TestStoragePolicySatisfier {
 
     // Queue limit can control the traverse logic to wait for some free
     // entry in queue. After 10 files, traverse control will be on U.
-    // StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
     StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
     Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(),
         hdfsCluster.getNamesystem().getBlockManager(), sps) {
@@ -1392,9 +1403,7 @@ public class TestStoragePolicySatisfier {
         return true;
       }
     };
-
-    FileIdCollector fileIDCollector =
-        new IntraSPSNameNodeFileIdCollector(fsDir, sps);
+    FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt);
     sps.init(ctxt, fileIDCollector, null);
     sps.getStorageMovementQueue().activate();
 
@@ -1407,6 +1416,13 @@ public class TestStoragePolicySatisfier {
 
     dfs.delete(new Path("/root/D/L"), true);
 
+    assertTraversal(expectedTraverseOrder, fsDir, sps);
+    dfs.delete(new Path("/root"), true);
+  }
+
+  private void assertTraversal(List<String> expectedTraverseOrder,
+      FSDirectory fsDir, StoragePolicySatisfier sps)
+          throws InterruptedException {
     // Remove 10 element and make queue free, So other traversing will start.
     for (int i = 0; i < 10; i++) {
       String path = expectedTraverseOrder.remove(0);
@@ -1426,7 +1442,6 @@ public class TestStoragePolicySatisfier {
       assertTrue("Failed to traverse tree, expected " + path + " but got "
           + inode.getFullPathName(), path.equals(inode.getFullPathName()));
     }
-    dfs.delete(new Path("/root"), true);
   }
 
   /**
@@ -1473,8 +1488,8 @@ public class TestStoragePolicySatisfier {
       StorageType[][] newtypes =
           new StorageType[][]{{StorageType.DISK, StorageType.SSD},
               {StorageType.DISK, StorageType.SSD}};
-      startAdditionalDNs(config, 2, numOfDatanodes, newtypes,
-          storagesPerDatanode, capacity, hdfsCluster);
+      startAdditionalDNs(config, 2, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
 
       // increase replication factor to 4 for the first 10 files and thus
       // initiate replica tasks
@@ -1772,7 +1787,7 @@ public class TestStoragePolicySatisfier {
     }, 100, timeout);
   }
 
-  private void writeContent(final String fileName) throws IOException {
+  public void writeContent(final String fileName) throws IOException {
     writeContent(fileName, (short) 3);
   }
 
@@ -1805,7 +1820,7 @@ public class TestStoragePolicySatisfier {
     cluster.triggerHeartbeats();
   }
 
-  private MiniDFSCluster startCluster(final Configuration conf,
+  public MiniDFSCluster startCluster(final Configuration conf,
       StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
       long nodeCapacity) throws IOException {
     long[][] capacities = new long[numberOfDatanodes][storagesPerDn];

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159b39c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
new file mode 100644
index 0000000..3ced34e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.sps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.sps.Context;
+import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
+import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
+import org.junit.Ignore;
+
+/**
+ * Tests the external sps service plugins.
+ */
+public class TestExternalStoragePolicySatisfier
+    extends TestStoragePolicySatisfier {
+  private StorageType[][] allDiskTypes =
+      new StorageType[][]{{StorageType.DISK, StorageType.DISK},
+          {StorageType.DISK, StorageType.DISK},
+          {StorageType.DISK, StorageType.DISK}};
+
+  @Override
+  public void createCluster() throws IOException {
+    getConf().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    getConf().setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+        true);
+    setCluster(startCluster(getConf(), allDiskTypes, NUM_OF_DATANODES,
+        STORAGES_PER_DATANODE, CAPACITY));
+    getFS();
+    writeContent(FILE);
+  }
+
+  @Override
+  public MiniDFSCluster startCluster(final Configuration conf,
+      StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
+      long nodeCapacity) throws IOException {
+    long[][] capacities = new long[numberOfDatanodes][storagesPerDn];
+    for (int i = 0; i < numberOfDatanodes; i++) {
+      for (int j = 0; j < storagesPerDn; j++) {
+        capacities[i][j] = nodeCapacity;
+      }
+    }
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn)
+        .storageTypes(storageTypes).storageCapacities(capacities).build();
+    cluster.waitActive();
+    if (conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+        false)) {
+      SPSService spsService = cluster.getNameNode().getNamesystem()
+          .getBlockManager().getSPSService();
+      spsService.stopGracefully();
+
+      IntraSPSNameNodeContext context = new IntraSPSNameNodeContext(
+          cluster.getNameNode().getNamesystem(),
+          cluster.getNameNode().getNamesystem().getBlockManager(), cluster
+              .getNameNode().getNamesystem().getBlockManager().getSPSService());
+
+      spsService.init(context,
+          new ExternalSPSFileIDCollector(context,
+              cluster.getNameNode().getNamesystem().getBlockManager()
+                  .getSPSService(),
+              5),
+          new IntraSPSNameNodeBlockMoveTaskHandler(
+              cluster.getNameNode().getNamesystem().getBlockManager(),
+              cluster.getNameNode().getNamesystem()));
+      spsService.start(true);
+    }
+    return cluster;
+  }
+
+  @Override
+  public FileIdCollector createFileIdCollector(StoragePolicySatisfier sps,
+      Context ctxt) {
+    return new ExternalSPSFileIDCollector(ctxt, sps, 5);
+  }
+
+  /**
+   * This test need not run, as the external scan is not batch based right
+   * now.
+   */
+  @Ignore("ExternalFileIdCollector is not batch based right now."
+      + " So, ignoring it.")
+  public void testBatchProcessingForSPSDirectory() throws Exception {
+  }
+}

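For completeness, a client-side sketch of how a path ends up in the SPS queue that these collectors later scan; the class name, the path, the policy name, and the assumption that fs.defaultFS points at an SPS-enabled HDFS cluster are all illustrative, not part of the patch:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hdfs.DistributedFileSystem;

  public class TriggerSatisfySketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Assumes fs.defaultFS points at an HDFS cluster with SPS enabled.
      DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
      Path dir = new Path("/data/cold-reports"); // example path
      dfs.setStoragePolicy(dir, "COLD");         // example policy
      // Queues the path for SPS; the internal or external collector then
      // scans it and the satisfier schedules the block moves.
      dfs.satisfyStoragePolicy(dir);
    }
  }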
