HDFS-13110: [SPS]: Reduce the number of APIs in NamenodeProtocol used by external satisfier. Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8467ec24
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8467ec24
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8467ec24

Branch: refs/heads/HDFS-10285
Commit: 8467ec24fb74f30371d5a13e893fc56309ee9372
Parents: 4402f3f
Author: Rakesh Radhakrishnan <rake...@apache.org>
Authored: Fri Feb 16 17:01:38 2018 +0530
Committer: Uma Maheswara Rao Gangumalla <umamah...@apache.org>
Committed: Sun Aug 12 03:06:05 2018 -0700

----------------------------------------------------------------------
 .../NamenodeProtocolServerSideTranslatorPB.java |  46 +----
 .../NamenodeProtocolTranslatorPB.java           |  42 +----
 .../hdfs/server/namenode/FSTreeTraverser.java   |   2 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |  32 +---
 .../server/namenode/ReencryptionHandler.java    |   2 +-
 .../sps/BlockStorageMovementAttemptedItems.java |  42 +++--
 .../sps/BlockStorageMovementNeeded.java         | 119 +++++++------
 .../hdfs/server/namenode/sps/Context.java       |  55 +++---
 .../hdfs/server/namenode/sps/FileCollector.java |  48 +++++
 .../server/namenode/sps/FileIdCollector.java    |  43 -----
 .../namenode/sps/IntraSPSNameNodeContext.java   |  39 ++---
 .../sps/IntraSPSNameNodeFileIdCollector.java    |  23 +--
 .../hdfs/server/namenode/sps/ItemInfo.java      |  39 +++--
 .../hdfs/server/namenode/sps/SPSService.java    |  32 ++--
 .../namenode/sps/StoragePolicySatisfier.java    | 129 +++++++++-----
 .../sps/StoragePolicySatisfyManager.java        |   6 +-
 .../hdfs/server/protocol/NamenodeProtocol.java  |  24 +--
 .../sps/ExternalSPSBlockMoveTaskHandler.java    |   4 +-
 .../hdfs/server/sps/ExternalSPSContext.java     |  60 +++----
 .../server/sps/ExternalSPSFileIDCollector.java  | 174 -------------------
 .../sps/ExternalSPSFilePathCollector.java       | 172 ++++++++++++++++++
 .../sps/ExternalStoragePolicySatisfier.java     |   7 +-
 .../src/main/proto/NamenodeProtocol.proto       |  27 +--
 .../TestBlockStorageMovementAttemptedItems.java |  27 ++-
 .../sps/TestStoragePolicySatisfier.java         |  52 +++---
 ...stStoragePolicySatisfierWithStripedFile.java |  15 +-
 .../sps/TestExternalStoragePolicySatisfier.java | 148 +++++++++++-----
 27 files changed, 701 insertions(+), 708 deletions(-)
----------------------------------------------------------------------

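For orientation: before this change the external satisfier needed three
NamenodeProtocol round trips per tracked file (getNextSPSPathId(),
getFilePath() and hasLowRedundancyBlocks()); after it, a single
getNextSPSPath() call returns the path to work on. Below is a minimal,
hypothetical sketch of a polling loop against the slimmed-down protocol;
the ExternalSatisfierPoller class and its wiring are illustrative and not
part of this commit.

    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;

    public class ExternalSatisfierPoller {
      private final NamenodeProtocol namenode;

      public ExternalSatisfierPoller(NamenodeProtocol namenode) {
        this.namenode = namenode;
      }

      // Returns the next user-invoked satisfier path, or null when the
      // Namenode has nothing queued. The Namenode now resolves the queued
      // inode id to a path itself, so the caller never sees inode ids.
      public String pollNextPath() throws IOException {
        return namenode.getNextSPSPath();
      }
    }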

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
index 25eafdf..ed176cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
@@ -35,16 +35,12 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksReq
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto;
@@ -267,15 +263,15 @@ public class NamenodeProtocolServerSideTranslatorPB implements
   }
 
   @Override
-  public GetNextSPSPathIdResponseProto getNextSPSPathId(
-      RpcController controller, GetNextSPSPathIdRequestProto request)
+  public GetNextSPSPathResponseProto getNextSPSPath(
+      RpcController controller, GetNextSPSPathRequestProto request)
           throws ServiceException {
     try {
-      Long nextSPSPathId = impl.getNextSPSPathId();
-      if (nextSPSPathId == null) {
-        return GetNextSPSPathIdResponseProto.newBuilder().build();
+      String nextSPSPath = impl.getNextSPSPath();
+      if (nextSPSPath == null) {
+        return GetNextSPSPathResponseProto.newBuilder().build();
       }
-      return GetNextSPSPathIdResponseProto.newBuilder().setFileId(nextSPSPathId)
+      return GetNextSPSPathResponseProto.newBuilder().setSpsPath(nextSPSPath)
           .build();
     } catch (IOException e) {
       throw new ServiceException(e);
@@ -283,17 +279,6 @@ public class NamenodeProtocolServerSideTranslatorPB implements
   }
 
   @Override
-  public GetFilePathResponseProto getFilePath(RpcController controller,
-      GetFilePathRequestProto request) throws ServiceException {
-    try {
-      return GetFilePathResponseProto.newBuilder()
-          .setSrcPath(impl.getFilePath(request.getFileId())).build();
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-  }
-
-  @Override
   public CheckDNSpaceResponseProto checkDNSpaceForScheduling(
       RpcController controller, CheckDNSpaceRequestProto request)
           throws ServiceException {
@@ -309,19 +294,4 @@ public class NamenodeProtocolServerSideTranslatorPB implements
       throw new ServiceException(e);
     }
   }
-
-  @Override
-  public HasLowRedundancyBlocksResponseProto hasLowRedundancyBlocks(
-      RpcController controller, HasLowRedundancyBlocksRequestProto request)
-          throws ServiceException {
-    try {
-      return HasLowRedundancyBlocksResponseProto.newBuilder()
-          .setHasLowRedundancyBlocks(
-              impl.hasLowRedundancyBlocks(request.getInodeId()))
-          .build();
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
index 8bff499..d2e97a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
@@ -34,12 +34,10 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeys
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto;
@@ -271,24 +269,13 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
   }
 
   @Override
-  public Long getNextSPSPathId() throws IOException {
-    GetNextSPSPathIdRequestProto req =
-        GetNextSPSPathIdRequestProto.newBuilder().build();
+  public String getNextSPSPath() throws IOException {
+    GetNextSPSPathRequestProto req =
+        GetNextSPSPathRequestProto.newBuilder().build();
     try {
-      GetNextSPSPathIdResponseProto nextSPSPathId =
-          rpcProxy.getNextSPSPathId(NULL_CONTROLLER, req);
-      return nextSPSPathId.hasFileId() ? nextSPSPathId.getFileId() : null;
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-  }
-
-  @Override
-  public String getFilePath(Long inodeId) throws IOException {
-    GetFilePathRequestProto req =
-        GetFilePathRequestProto.newBuilder().setFileId(inodeId).build();
-    try {
-      return rpcProxy.getFilePath(NULL_CONTROLLER, req).getSrcPath();
+      GetNextSPSPathResponseProto nextSPSPath =
+          rpcProxy.getNextSPSPath(NULL_CONTROLLER, req);
+      return nextSPSPath.hasSpsPath() ? nextSPSPath.getSpsPath() : null;
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -308,17 +295,4 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
       throw ProtobufHelper.getRemoteException(e);
     }
   }
-
-  @Override
-  public boolean hasLowRedundancyBlocks(long inodeId) throws IOException {
-    HasLowRedundancyBlocksRequestProto req = HasLowRedundancyBlocksRequestProto
-        .newBuilder().setInodeId(inodeId).build();
-    try {
-      return rpcProxy.hasLowRedundancyBlocks(NULL_CONTROLLER, req)
-          .getHasLowRedundancyBlocks();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
index a7d633f..2acbda4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
@@ -310,7 +310,7 @@ public abstract class FSTreeTraverser {
    * @throws IOException
    * @throws InterruptedException
    */
-  protected abstract void submitCurrentBatch(long startId)
+  protected abstract void submitCurrentBatch(Long startId)
       throws IOException, InterruptedException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 97f38c7..6fe38d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -2561,20 +2561,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override
-  public String getFilePath(Long inodeId) throws IOException {
+  public String getNextSPSPath() throws IOException {
     checkNNStartup();
-    String operationName = "getFilePath";
-    namesystem.checkSuperuserPrivilege(operationName);
-    if (nn.isStandbyState()) {
-      throw new StandbyException("Not supported by Standby Namenode.");
-    }
-    return namesystem.getFilePath(inodeId);
-  }
-
-  @Override
-  public Long getNextSPSPathId() throws IOException {
-    checkNNStartup();
-    String operationName = "getNextSPSPathId";
+    String operationName = "getNextSPSPath";
     namesystem.checkSuperuserPrivilege(operationName);
     if (nn.isStandbyState()) {
       throw new StandbyException("Not supported by Standby Namenode.");
@@ -2588,7 +2577,11 @@ public class NameNodeRpcServer implements NamenodeProtocols {
           + " inside namenode, so external SPS is not allowed to fetch"
           + " the path Ids");
     }
-    return namesystem.getBlockManager().getSPSManager().getNextPathId();
+    Long pathId = namesystem.getBlockManager().getSPSManager().getNextPathId();
+    if (pathId == null) {
+      return null;
+    }
+    return namesystem.getFilePath(pathId);
   }
 
   @Override
@@ -2603,15 +2596,4 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     return namesystem.getBlockManager().getDatanodeManager()
         .verifyTargetDatanodeHasSpaceForScheduling(dn, type, estimatedSize);
   }
-
-  @Override
-  public boolean hasLowRedundancyBlocks(long inodeId) throws IOException {
-    checkNNStartup();
-    String operationName = "hasLowRedundancyBlocks";
-    namesystem.checkSuperuserPrivilege(operationName);
-    if (nn.isStandbyState()) {
-      throw new StandbyException("Not supported by Standby Namenode.");
-    }
-    return namesystem.getBlockManager().hasLowRedundancyBlocks(inodeId);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
index feacd74..c8c8d68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
@@ -702,7 +702,7 @@ public class ReencryptionHandler implements Runnable {
      * @throws InterruptedException
      */
     @Override
-    protected void submitCurrentBatch(final long zoneId) throws IOException,
+    protected void submitCurrentBatch(final Long zoneId) throws IOException,
         InterruptedException {
       if (currentBatch.isEmpty()) {
         return;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
index ea7a093..d2f0bb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
@@ -45,8 +45,13 @@ import com.google.common.annotations.VisibleForTesting;
  * entries from tracking. If there is no DN reports about movement attempt
 * finished for a longer time period, then such items will retries automatically
  * after timeout. The default timeout would be 5 minutes.
+ *
+ * @param <T>
+ *          the identifier of an inode or the full path name of an inode.
+ *          The internal SPS uses the file's inodeId for block movement;
+ *          the external SPS uses the file's string path representation.
  */
-public class BlockStorageMovementAttemptedItems{
+public class BlockStorageMovementAttemptedItems<T> {
   private static final Logger LOG =
       LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
 
@@ -54,7 +59,7 @@ public class BlockStorageMovementAttemptedItems{
    * A map holds the items which are already taken for blocks movements
    * processing and sent to DNs.
    */
-  private final List<AttemptedItemInfo> storageMovementAttemptedItems;
+  private final List<AttemptedItemInfo<T>> storageMovementAttemptedItems;
   private final List<Block> movementFinishedBlocks;
   private volatile boolean monitorRunning = true;
   private Daemon timerThread = null;
@@ -70,11 +75,11 @@ public class BlockStorageMovementAttemptedItems{
   // a request is timed out.
   //
   private long minCheckTimeout = 1 * 60 * 1000; // minimum value
-  private BlockStorageMovementNeeded blockStorageMovementNeeded;
-  private final SPSService service;
+  private BlockStorageMovementNeeded<T> blockStorageMovementNeeded;
+  private final SPSService<T> service;
 
-  public BlockStorageMovementAttemptedItems(SPSService service,
-      BlockStorageMovementNeeded unsatisfiedStorageMovementFiles,
+  public BlockStorageMovementAttemptedItems(SPSService<T> service,
+      BlockStorageMovementNeeded<T> unsatisfiedStorageMovementFiles,
       BlockMovementListener blockMovementListener) {
     this.service = service;
     long recheckTimeout = this.service.getConf().getLong(
@@ -100,7 +105,7 @@ public class BlockStorageMovementAttemptedItems{
    * @param itemInfo
    *          - tracking info
    */
-  public void add(AttemptedItemInfo itemInfo) {
+  public void add(AttemptedItemInfo<T> itemInfo) {
     synchronized (storageMovementAttemptedItems) {
       storageMovementAttemptedItems.add(itemInfo);
     }
@@ -190,25 +195,24 @@ public class BlockStorageMovementAttemptedItems{
   @VisibleForTesting
   void blocksStorageMovementUnReportedItemsCheck() {
     synchronized (storageMovementAttemptedItems) {
-      Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems
+      Iterator<AttemptedItemInfo<T>> iter = storageMovementAttemptedItems
           .iterator();
       long now = monotonicNow();
       while (iter.hasNext()) {
-        AttemptedItemInfo itemInfo = iter.next();
+        AttemptedItemInfo<T> itemInfo = iter.next();
         if (now > itemInfo.getLastAttemptedOrReportedTime()
             + selfRetryTimeout) {
-          Long blockCollectionID = itemInfo.getFileId();
+          T file = itemInfo.getFile();
           synchronized (movementFinishedBlocks) {
-            ItemInfo candidate = new ItemInfo(itemInfo.getStartId(),
-                blockCollectionID, itemInfo.getRetryCount() + 1);
+            ItemInfo<T> candidate = new ItemInfo<T>(itemInfo.getStartPath(),
+                file, itemInfo.getRetryCount() + 1);
             blockStorageMovementNeeded.add(candidate);
             iter.remove();
             LOG.info("TrackID: {} becomes timed out and moved to needed "
-                + "retries queue for next iteration.", blockCollectionID);
+                + "retries queue for next iteration.", file);
           }
         }
       }
-
     }
   }
 
@@ -219,17 +223,17 @@ public class BlockStorageMovementAttemptedItems{
       while (finishedBlksIter.hasNext()) {
         Block blk = finishedBlksIter.next();
         synchronized (storageMovementAttemptedItems) {
-          Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems
-              .iterator();
+          Iterator<AttemptedItemInfo<T>> iterator =
+              storageMovementAttemptedItems.iterator();
           while (iterator.hasNext()) {
-            AttemptedItemInfo attemptedItemInfo = iterator.next();
+            AttemptedItemInfo<T> attemptedItemInfo = iterator.next();
             attemptedItemInfo.getBlocks().remove(blk);
             if (attemptedItemInfo.getBlocks().isEmpty()) {
               // TODO: try add this at front of the Queue, so that this element
               // gets the chance first and can be cleaned from queue quickly as
               // all movements already done.
-              blockStorageMovementNeeded.add(new ItemInfo(attemptedItemInfo
-                  .getStartId(), attemptedItemInfo.getFileId(),
+              blockStorageMovementNeeded.add(new ItemInfo<T>(attemptedItemInfo
+                  .getStartPath(), attemptedItemInfo.getFile(),
                   attemptedItemInfo.getRetryCount() + 1));
               iterator.remove();
             }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
index c683a63..a194876 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
@@ -43,31 +43,36 @@ import com.google.common.annotations.VisibleForTesting;
  * schedule the block collection IDs for movement. It track the info of
  * scheduled items and remove the SPS xAttr from the file/Directory once
  * movement is success.
+ *
+ * @param <T>
+ *          the identifier of an inode or the full path name of an inode.
+ *          The internal SPS uses the file's inodeId for block movement;
+ *          the external SPS uses the file's string path representation.
  */
 @InterfaceAudience.Private
-public class BlockStorageMovementNeeded {
+public class BlockStorageMovementNeeded<T> {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(BlockStorageMovementNeeded.class);
 
-  private final Queue<ItemInfo> storageMovementNeeded =
-      new LinkedList<ItemInfo>();
+  private final Queue<ItemInfo<T>> storageMovementNeeded =
+      new LinkedList<ItemInfo<T>>();
 
   /**
-   * Map of startId and number of child's. Number of child's indicate the
+   * Map of startPath and number of child's. Number of child's indicate the
    * number of files pending to satisfy the policy.
    */
-  private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory =
-      new HashMap<Long, DirPendingWorkInfo>();
+  private final Map<T, DirPendingWorkInfo> pendingWorkForDirectory =
+      new HashMap<>();
 
-  private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus =
+  private final Map<T, StoragePolicySatisfyPathStatusInfo> spsStatus =
       new ConcurrentHashMap<>();
 
-  private final Context ctxt;
+  private final Context<T> ctxt;
 
   private Daemon pathIdCollector;
 
-  private FileIdCollector fileIDCollector;
+  private FileCollector<T> fileCollector;
 
   private SPSPathIdProcessor pathIDProcessor;
 
@@ -75,10 +80,10 @@ public class BlockStorageMovementNeeded {
   // NOT_AVAILABLE.
   private static long statusClearanceElapsedTimeMs = 300000;
 
-  public BlockStorageMovementNeeded(Context context,
-      FileIdCollector fileIDCollector) {
+  public BlockStorageMovementNeeded(Context<T> context,
+      FileCollector<T> fileCollector) {
     this.ctxt = context;
-    this.fileIDCollector = fileIDCollector;
+    this.fileCollector = fileCollector;
     pathIDProcessor = new SPSPathIdProcessor();
   }
 
@@ -89,8 +94,8 @@ public class BlockStorageMovementNeeded {
    * @param trackInfo
    *          - track info for satisfy the policy
    */
-  public synchronized void add(ItemInfo trackInfo) {
-    spsStatus.put(trackInfo.getStartId(),
+  public synchronized void add(ItemInfo<T> trackInfo) {
+    spsStatus.put(trackInfo.getFile(),
         new StoragePolicySatisfyPathStatusInfo(
             StoragePolicySatisfyPathStatus.IN_PROGRESS));
     storageMovementNeeded.add(trackInfo);
@@ -100,8 +105,8 @@ public class BlockStorageMovementNeeded {
    * Add the itemInfo list to tracking list for which storage movement expected
    * if necessary.
    *
-   * @param startId
-   *          - start id
+   * @param startPath
+   *          - start path
    * @param itemInfoList
    *          - List of child in the directory
    * @param scanCompleted
@@ -109,10 +114,10 @@ public class BlockStorageMovementNeeded {
    *          scan.
    */
   @VisibleForTesting
-  public synchronized void addAll(long startId, List<ItemInfo> itemInfoList,
+  public synchronized void addAll(T startPath, List<ItemInfo<T>> itemInfoList,
       boolean scanCompleted) {
     storageMovementNeeded.addAll(itemInfoList);
-    updatePendingDirScanStats(startId, itemInfoList.size(), scanCompleted);
+    updatePendingDirScanStats(startPath, itemInfoList.size(), scanCompleted);
   }
 
   /**
@@ -126,22 +131,22 @@ public class BlockStorageMovementNeeded {
    *          elements to scan.
    */
   @VisibleForTesting
-  public synchronized void add(ItemInfo itemInfo, boolean scanCompleted) {
+  public synchronized void add(ItemInfo<T> itemInfo, boolean scanCompleted) {
     storageMovementNeeded.add(itemInfo);
     // This represents sps start id is file, so no need to update pending dir
     // stats.
-    if (itemInfo.getStartId() == itemInfo.getFileId()) {
+    if (itemInfo.getStartPath() == itemInfo.getFile()) {
       return;
     }
-    updatePendingDirScanStats(itemInfo.getStartId(), 1, scanCompleted);
+    updatePendingDirScanStats(itemInfo.getFile(), 1, scanCompleted);
   }
 
-  private void updatePendingDirScanStats(long startId, int numScannedFiles,
+  private void updatePendingDirScanStats(T startPath, int numScannedFiles,
       boolean scanCompleted) {
-    DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
+    DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startPath);
     if (pendingWork == null) {
       pendingWork = new DirPendingWorkInfo();
-      pendingWorkForDirectory.put(startId, pendingWork);
+      pendingWorkForDirectory.put(startPath, pendingWork);
     }
     pendingWork.addPendingWorkCount(numScannedFiles);
     if (scanCompleted) {
@@ -150,12 +155,12 @@ public class BlockStorageMovementNeeded {
   }
 
   /**
-   * Gets the block collection id for which storage movements check necessary
+   * Gets the satisfier files for which block storage movements check necessary
    * and make the movement if required.
    *
-   * @return block collection ID
+   * @return satisfier files
    */
-  public synchronized ItemInfo get() {
+  public synchronized ItemInfo<T> get() {
     return storageMovementNeeded.poll();
   }
 
@@ -176,12 +181,12 @@ public class BlockStorageMovementNeeded {
    * Decrease the pending child count for directory once one file blocks moved
    * successfully. Remove the SPS xAttr if pending child count is zero.
    */
-  public synchronized void removeItemTrackInfo(ItemInfo trackInfo,
+  public synchronized void removeItemTrackInfo(ItemInfo<T> trackInfo,
       boolean isSuccess) throws IOException {
     if (trackInfo.isDir()) {
       // If track is part of some start inode then reduce the pending
       // directory work count.
-      long startId = trackInfo.getStartId();
+      T startId = trackInfo.getStartPath();
       if (!ctxt.isFileExist(startId)) {
         // directory deleted just remove it.
         this.pendingWorkForDirectory.remove(startId);
@@ -202,17 +207,17 @@ public class BlockStorageMovementNeeded {
     } else {
       // Remove xAttr if trackID doesn't exist in
       // storageMovementAttemptedItems or file policy satisfied.
-      ctxt.removeSPSHint(trackInfo.getFileId());
-      updateStatus(trackInfo.getStartId(), isSuccess);
+      ctxt.removeSPSHint(trackInfo.getFile());
+      updateStatus(trackInfo.getFile(), isSuccess);
     }
   }
 
-  public synchronized void clearQueue(long trackId) {
+  public synchronized void clearQueue(T trackId) {
     ctxt.removeSPSPathId(trackId);
-    Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
+    Iterator<ItemInfo<T>> iterator = storageMovementNeeded.iterator();
     while (iterator.hasNext()) {
-      ItemInfo next = iterator.next();
-      if (next.getStartId() == trackId) {
+      ItemInfo<T> next = iterator.next();
+      if (next.getFile() == trackId) {
         iterator.remove();
       }
     }
@@ -222,7 +227,7 @@ public class BlockStorageMovementNeeded {
   /**
    * Mark inode status as SUCCESS in map.
    */
-  private void updateStatus(long startId, boolean isSuccess){
+  private void updateStatus(T startId, boolean isSuccess){
     StoragePolicySatisfyPathStatusInfo spsStatusInfo =
         spsStatus.get(startId);
     if (spsStatusInfo == null) {
@@ -244,8 +249,8 @@ public class BlockStorageMovementNeeded {
    */
   public synchronized void clearQueuesWithNotification() {
     // Remove xAttr from directories
-    Long trackId;
-    while ((trackId = ctxt.getNextSPSPathId()) != null) {
+    T trackId;
+    while ((trackId = ctxt.getNextSPSPath()) != null) {
       try {
         // Remove xAttr for file
         ctxt.removeSPSHint(trackId);
@@ -256,17 +261,17 @@ public class BlockStorageMovementNeeded {
 
     // File's directly added to storageMovementNeeded, So try to remove
     // xAttr for file
-    ItemInfo itemInfo;
+    ItemInfo<T> itemInfo;
     while ((itemInfo = get()) != null) {
       try {
         // Remove xAttr for file
         if (!itemInfo.isDir()) {
-          ctxt.removeSPSHint(itemInfo.getFileId());
+          ctxt.removeSPSHint(itemInfo.getFile());
         }
       } catch (IOException ie) {
         LOG.warn(
             "Failed to remove SPS xattr for track id "
-                + itemInfo.getFileId(), ie);
+                + itemInfo.getFile(), ie);
       }
     }
     this.clearAll();
@@ -282,29 +287,29 @@ public class BlockStorageMovementNeeded {
     public void run() {
       LOG.info("Starting SPSPathIdProcessor!.");
       long lastStatusCleanTime = 0;
-      Long startINodeId = null;
+      T startINode = null;
       while (ctxt.isRunning()) {
         try {
           if (!ctxt.isInSafeMode()) {
-            if (startINodeId == null) {
-              startINodeId = ctxt.getNextSPSPathId();
+            if (startINode == null) {
+              startINode = ctxt.getNextSPSPath();
             } // else same id will be retried
-            if (startINodeId == null) {
+            if (startINode == null) {
               // Waiting for SPS path
               Thread.sleep(3000);
             } else {
-              spsStatus.put(startINodeId,
+              spsStatus.put(startINode,
                   new StoragePolicySatisfyPathStatusInfo(
                       StoragePolicySatisfyPathStatus.IN_PROGRESS));
-              fileIDCollector.scanAndCollectFileIds(startINodeId);
+              fileCollector.scanAndCollectFiles(startINode);
               // check if directory was empty and no child added to queue
               DirPendingWorkInfo dirPendingWorkInfo =
-                  pendingWorkForDirectory.get(startINodeId);
+                  pendingWorkForDirectory.get(startINode);
               if (dirPendingWorkInfo != null
                   && dirPendingWorkInfo.isDirWorkDone()) {
-                ctxt.removeSPSHint(startINodeId);
-                pendingWorkForDirectory.remove(startINodeId);
-                updateStatus(startINodeId, true);
+                ctxt.removeSPSHint(startINode);
+                pendingWorkForDirectory.remove(startINode);
+                updateStatus(startINode, true);
               }
             }
             //Clear the SPS status if status is in SUCCESS more than 5 min.
@@ -313,7 +318,7 @@ public class BlockStorageMovementNeeded {
               lastStatusCleanTime = Time.monotonicNow();
               cleanSPSStatus();
             }
-            startINodeId = null; // Current inode id successfully scanned.
+            startINode = null; // Current inode successfully scanned.
           }
         } catch (Throwable t) {
           String reClass = t.getClass().getName();
@@ -334,9 +339,9 @@ public class BlockStorageMovementNeeded {
     }
 
     private synchronized void cleanSPSStatus() {
-      for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it =
-          spsStatus.entrySet().iterator(); it.hasNext();) {
-        Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next();
+      for (Iterator<Entry<T, StoragePolicySatisfyPathStatusInfo>> it = spsStatus
+          .entrySet().iterator(); it.hasNext();) {
+        Entry<T, StoragePolicySatisfyPathStatusInfo> entry = it.next();
         if (entry.getValue().canRemove()) {
           it.remove();
         }
@@ -472,8 +477,8 @@ public class BlockStorageMovementNeeded {
     return statusClearanceElapsedTimeMs;
   }
 
-  public void markScanCompletedForDir(Long inodeId) {
-    DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inodeId);
+  public void markScanCompletedForDir(T inode) {
+    DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inode);
     if (pendingWork != null) {
       pendingWork.markScanCompleted();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
index ff4ad6b..84a969d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
@@ -33,11 +33,16 @@ import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.AccessControlException;
 
 /**
- * An interface for the communication between NameNode and SPS module.
+ * An interface for the communication between SPS and Namenode module.
+ *
+ * @param <T>
+ *          the identifier of an inode or the full path name of an inode.
+ *          The internal SPS uses the file's inodeId for block movement;
+ *          the external SPS uses the file's string path representation.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface Context {
+public interface Context<T> {
 
   /**
    * Returns true if the SPS is running, false otherwise.
@@ -72,13 +77,13 @@ public interface Context {
   NetworkTopology getNetworkTopology();
 
   /**
-   * Returns true if the give Inode exists in the Namespace.
+   * Returns true if the give file exists in the Namespace.
    *
-   * @param inodeId
-   *          - Inode ID
-   * @return true if Inode exists, false otherwise.
+   * @param filePath
+   *          - file info
+   * @return true if the given file exists, false otherwise.
    */
-  boolean isFileExist(long inodeId);
+  boolean isFileExist(T filePath);
 
   /**
    * Gets the storage policy details for the given policy ID.
@@ -97,11 +102,11 @@ public interface Context {
   /**
    * Remove the hint which was added to track SPS call.
    *
-   * @param inodeId
-   *          - Inode ID
+   * @param spsPath
+   *          - user invoked satisfier path
    * @throws IOException
    */
-  void removeSPSHint(long inodeId) throws IOException;
+  void removeSPSHint(T spsPath) throws IOException;
 
   /**
    * Gets the number of live datanodes in the cluster.
@@ -113,11 +118,11 @@ public interface Context {
   /**
    * Get the file info for a specific file.
    *
-   * @param inodeID
-   *          inode identifier
+   * @param file
+   *          file path
    * @return file status metadata information
    */
-  HdfsFileStatus getFileInfo(long inodeID) throws IOException;
+  HdfsFileStatus getFileInfo(T file) throws IOException;
 
   /**
    * Returns all the live datanodes and its storage details.
@@ -128,15 +133,6 @@ public interface Context {
       throws IOException;
 
   /**
-   * Returns true if the given inode file has low redundancy blocks.
-   *
-   * @param inodeID
-   *          inode identifier
-   * @return true if block collection has low redundancy blocks
-   */
-  boolean hasLowRedundancyBlocks(long inodeID);
-
-  /**
    * Checks whether the given datanode has sufficient space to occupy the given
    * blockSize data.
    *
@@ -153,26 +149,17 @@ public interface Context {
       long blockSize);
 
   /**
-   * @return next SPS path id to process.
+   * @return next SPS path info to process.
    */
-  Long getNextSPSPathId();
+  T getNextSPSPath();
 
   /**
    * Removes the SPS path id.
    */
-  void removeSPSPathId(long pathId);
+  void removeSPSPathId(T pathId);
 
   /**
    * Removes all SPS path ids.
    */
   void removeAllSPSPathIds();
-
-  /**
-   * Gets the file path for a given inode id.
-   *
-   * @param inodeId
-   *          - path inode id.
-   */
-  String getFilePath(Long inodeId);
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java
new file mode 100644
index 0000000..dceb5fa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * An interface for scanning a directory recursively and collecting the
+ * files under it.
+ *
+ * @param <T>
+ *          the identifier of an inode or the full path name of an inode.
+ *          The internal SPS uses the file's inodeId for block movement;
+ *          the external SPS uses the file's string path representation.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface FileCollector<T> {
+
+  /**
+   * Scans the given path recursively and adds the files found under it to
+   * the given BlockStorageMovementNeeded.
+   *
+   * @param filePath
+   *          - file path
+   */
+  void scanAndCollectFiles(T filePath)
+      throws IOException, InterruptedException;
+}
\ No newline at end of file

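To make the generic parameter concrete: below is a hedged, single-level
sketch of a FileCollector<String> for the external satisfier. The
SimplePathCollector class, the DistributedFileSystem handle and the
non-recursive listing are illustrative assumptions; the real external
collector added by this commit is ExternalSPSFilePathCollector.

    package org.apache.hadoop.hdfs.server.namenode.sps;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class SimplePathCollector implements FileCollector<String> {
      private final DistributedFileSystem dfs;
      private final BlockStorageMovementNeeded<String> movementNeeded;

      public SimplePathCollector(DistributedFileSystem dfs,
          BlockStorageMovementNeeded<String> movementNeeded) {
        this.dfs = dfs;
        this.movementNeeded = movementNeeded;
      }

      @Override
      public void scanAndCollectFiles(String startPath)
          throws IOException, InterruptedException {
        List<ItemInfo<String>> batch = new ArrayList<>();
        FileStatus status = dfs.getFileStatus(new Path(startPath));
        if (status.isFile()) {
          // SPS invoked directly on a file: start path and file coincide.
          batch.add(new ItemInfo<>(startPath, startPath));
        } else {
          // One level only for brevity; a real collector walks the tree
          // recursively and submits intermediate batches as it goes.
          for (FileStatus child : dfs.listStatus(new Path(startPath))) {
            if (child.isFile()) {
              batch.add(new ItemInfo<>(startPath,
                  child.getPath().toUri().getPath()));
            }
          }
        }
        // Single, final batch for this start path: scanCompleted = true.
        movementNeeded.addAll(startPath, batch, true);
      }
    }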
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java
deleted file mode 100644
index 7cf77f0..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.namenode.sps;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * An interface for scanning the directory recursively and collect file ids
- * under the given directory.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public interface FileIdCollector {
-
-  /**
-   * Scans the given inode directory and collects the file ids under that
-   * directory and adds to the given BlockStorageMovementNeeded.
-   *
-   * @param inodeID
-   *          - The directory ID
-   */
-  void scanAndCollectFileIds(Long inodeId)
-      throws IOException, InterruptedException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
index 495d1c4..f6b6d95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
@@ -47,17 +47,17 @@ import org.slf4j.LoggerFactory;
  * movements to satisfy the storage policy.
  */
 @InterfaceAudience.Private
-public class IntraSPSNameNodeContext implements Context {
+public class IntraSPSNameNodeContext implements Context<Long> {
   private static final Logger LOG = LoggerFactory
       .getLogger(IntraSPSNameNodeContext.class);
 
   private final Namesystem namesystem;
   private final BlockManager blockManager;
 
-  private SPSService service;
+  private SPSService<Long> service;
 
   public IntraSPSNameNodeContext(Namesystem namesystem,
-      BlockManager blockManager, SPSService service) {
+      BlockManager blockManager, SPSService<Long> service) {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
     this.service = service;
@@ -68,20 +68,18 @@ public class IntraSPSNameNodeContext implements Context {
     return blockManager.getDatanodeManager().getNumLiveDataNodes();
   }
 
+  /**
+   * @return object containing information regarding the file, or null if
+   *         the file is not found.
+   */
   @Override
-  public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
+  public HdfsFileStatus getFileInfo(Long inodeID) throws IOException {
     String filePath = namesystem.getFilePath(inodeID);
     if (StringUtils.isBlank(filePath)) {
       LOG.debug("File with inodeID:{} doesn't exists!", inodeID);
       return null;
     }
-    HdfsFileStatus fileInfo = null;
-    try {
-      fileInfo = namesystem.getFileInfo(filePath, true, true);
-    } catch (IOException e) {
-      LOG.debug("File path:{} doesn't exists!", filePath);
-    }
-    return fileInfo;
+    return namesystem.getFileInfo(filePath, true, true);
   }
 
   @Override
@@ -97,17 +95,12 @@ public class IntraSPSNameNodeContext implements Context {
   }
 
   @Override
-  public boolean hasLowRedundancyBlocks(long inodeId) {
-    return blockManager.hasLowRedundancyBlocks(inodeId);
-  }
-
-  @Override
-  public boolean isFileExist(long inodeId) {
+  public boolean isFileExist(Long inodeId) {
     return namesystem.getFSDirectory().getInode(inodeId) != null;
   }
 
   @Override
-  public void removeSPSHint(long inodeId) throws IOException {
+  public void removeSPSHint(Long inodeId) throws IOException {
     this.namesystem.removeXattr(inodeId, XATTR_SATISFY_STORAGE_POLICY);
   }
 
@@ -177,12 +170,12 @@ public class IntraSPSNameNodeContext implements Context {
   }
 
   @Override
-  public Long getNextSPSPathId() {
+  public Long getNextSPSPath() {
     return blockManager.getSPSManager().getNextPathId();
   }
 
   @Override
-  public void removeSPSPathId(long trackId) {
+  public void removeSPSPathId(Long trackId) {
     blockManager.getSPSManager().removePathId(trackId);
   }
 
@@ -190,10 +183,4 @@ public class IntraSPSNameNodeContext implements Context {
   public void removeAllSPSPathIds() {
     blockManager.getSPSManager().removeAllPathIds();
   }
-
-  @Override
-  public String getFilePath(Long inodeId) {
-    return namesystem.getFilePath(inodeId);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
index 7a44dd9..27d9e7d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
@@ -35,15 +35,16 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
  */
 @InterfaceAudience.Private
 public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
-    implements FileIdCollector {
+    implements FileCollector<Long> {
   private int maxQueueLimitToScan;
-  private final SPSService service;
+  private final SPSService <Long> service;
 
   private int remainingCapacity = 0;
 
-  private List<ItemInfo> currentBatch;
+  private List<ItemInfo<Long>> currentBatch;
 
-  public IntraSPSNameNodeFileIdCollector(FSDirectory dir, SPSService service) {
+  public IntraSPSNameNodeFileIdCollector(FSDirectory dir,
+      SPSService<Long> service) {
     super(dir);
     this.service = service;
     this.maxQueueLimitToScan = service.getConf().getInt(
@@ -63,7 +64,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
       return false;
     }
     if (inode.isFile() && inode.asFile().numBlocks() != 0) {
-      currentBatch.add(new ItemInfo(
+      currentBatch.add(new ItemInfo<Long>(
           ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
       remainingCapacity--;
     }
@@ -83,10 +84,10 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
   }
 
   @Override
-  protected void submitCurrentBatch(long startId)
+  protected void submitCurrentBatch(Long startId)
       throws IOException, InterruptedException {
     // Add current child's to queue
-    service.addAllFileIdsToProcess(startId,
+    service.addAllFilesToProcess(startId,
         currentBatch, false);
     currentBatch.clear();
   }
@@ -119,7 +120,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
   }
 
   @Override
-  public void scanAndCollectFileIds(final Long startINodeId)
+  public void scanAndCollectFiles(final Long startINodeId)
       throws IOException, InterruptedException {
     FSDirectory fsd = getFSDirectory();
     INode startInode = fsd.getInode(startINodeId);
@@ -129,9 +130,9 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
         throttle();
       }
       if (startInode.isFile()) {
-        currentBatch.add(new ItemInfo(startInode.getId(), startInode.getId()));
+        currentBatch
+            .add(new ItemInfo<Long>(startInode.getId(), startInode.getId()));
       } else {
-
         readLock();
         // NOTE: this lock will not be held for full directory scanning. It is
        // basically a sliced locking. Once it collects a batch size( at max the
@@ -148,7 +149,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
         }
       }
       // Mark startInode traverse is done, this is last-batch
-      service.addAllFileIdsToProcess(startInode.getId(), currentBatch, true);
+      service.addAllFilesToProcess(startInode.getId(), currentBatch, true);
       currentBatch.clear();
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
index 47c64cc..bd8ab92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
@@ -21,48 +21,51 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
- * ItemInfo is a file info object for which need to satisfy the policy.
+ * ItemInfo is a file info object for which the policy needs to be satisfied.
+ * The internal satisfier service uses the inode id, which is a Long; the
+ * external satisfier service uses the full string representation of the
+ * path.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class ItemInfo {
-  private long startId;
-  private long fileId;
+public class ItemInfo<T> {
+  private T startPath;
+  private T file;
   private int retryCount;
 
-  public ItemInfo(long startId, long fileId) {
-    this.startId = startId;
-    this.fileId = fileId;
+  public ItemInfo(T startPath, T file) {
+    this.startPath = startPath;
+    this.file = file;
     // set 0 when item is getting added first time in queue.
     this.retryCount = 0;
   }
 
-  public ItemInfo(final long startId, final long fileId, final int retryCount) {
-    this.startId = startId;
-    this.fileId = fileId;
+  public ItemInfo(final T startPath, final T file, final int retryCount) {
+    this.startPath = startPath;
+    this.file = file;
     this.retryCount = retryCount;
   }
 
   /**
-   * Return the start inode id of the current track Id. This indicates that SPS
-   * was invoked on this inode id.
+   * Returns the start path of the current file. This indicates that SPS
+   * was invoked on this path.
    */
-  public long getStartId() {
-    return startId;
+  public T getStartPath() {
+    return startPath;
   }
 
   /**
-   * Return the File inode Id for which needs to satisfy the policy.
+   * Returns the file for which the policy needs to be satisfied.
    */
-  public long getFileId() {
-    return fileId;
+  public T getFile() {
+    return file;
   }
 
   /**
    * Returns true if the tracking path is a directory, false otherwise.
    */
   public boolean isDir() {
-    return (startId != fileId);
+    return !startPath.equals(file);
   }
 
   /**

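A small runnable illustration (hypothetical values) of the two
instantiations: the internal satisfier parameterizes ItemInfo with Long
inode ids, the external one with full path strings. Note that isDir() now
compares the start and the file with equals() rather than ==, which is the
correct check for boxed Longs and Strings alike.

    import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo;

    public class ItemInfoExample {
      public static void main(String[] args) {
        // Directory tracks: the start differs from the file under it.
        ItemInfo<Long> internal = new ItemInfo<>(16386L, 16392L);
        ItemInfo<String> external =
            new ItemInfo<>("/user/data", "/user/data/part-0");
        System.out.println(internal.isDir());  // true
        System.out.println(external.isDir());  // true
        // SPS invoked directly on a file: start equals file.
        System.out.println(new ItemInfo<>("/f", "/f").isDir()); // false
      }
    }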
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
index da6e365..71d8fd1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
@@ -27,10 +27,15 @@ import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 
 /**
  * An interface for SPSService, which exposes life cycle and processing APIs.
+ *
+ * @param <T>
+ *          the identifier of an inode or the full path name of an inode.
+ *          The internal SPS uses the file's inodeId for block movement;
+ *          the external SPS uses the file's string path representation.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface SPSService {
+public interface SPSService<T> {
 
   /**
    * Initializes the helper services.
@@ -38,7 +43,7 @@ public interface SPSService {
    * @param ctxt
    *          - context is an helper service to provide communication channel
    *          between NN and SPS
-   * @param fileIDCollector
+   * @param fileCollector
    *          - a helper service for scanning the files under a given directory
    *          id
    * @param handler
@@ -46,7 +51,7 @@ public interface SPSService {
    * @param blkMovementListener
    *          - listener to know about block movement attempt completion
    */
-  void init(Context ctxt, FileIdCollector fileIDCollector,
+  void init(Context<T> ctxt, FileCollector<T> fileCollector,
       BlockMoveTaskHandler handler, BlockMovementListener blkMovementListener);
 
   /**
@@ -82,23 +87,24 @@ public interface SPSService {
   boolean isRunning();
 
   /**
-   * Adds the Item information(file id etc) to processing queue.
+   * Adds the Item information(file etc) to processing queue.
    *
    * @param itemInfo
+   *          file info object for which the policy needs to be satisfied
    */
-  void addFileIdToProcess(ItemInfo itemInfo, boolean scanCompleted);
+  void addFileToProcess(ItemInfo<T> itemInfo, boolean scanCompleted);
 
   /**
-   * Adds all the Item information(file id etc) to processing queue.
+   * Adds all the item information (file etc.) to the processing queue.
    *
-   * @param startId
-   *          - directory/file id, on which SPS was called.
+   * @param startPath
+   *          - directory/file, on which SPS was called.
    * @param itemInfoList
    *          - list of item infos
    * @param scanCompleted
    *          - whether the scanning of directory fully done with itemInfoList
    */
-  void addAllFileIdsToProcess(long startId, List<ItemInfo> itemInfoList,
+  void addAllFilesToProcess(T startPath, List<ItemInfo<T>> itemInfoList,
       boolean scanCompleted);
 
   /**
@@ -109,7 +115,7 @@ public interface SPSService {
   /**
    * Clear inodeId present in the processing queue.
    */
-  void clearQueue(long inodeId);
+  void clearQueue(T spsPath);
 
   /**
    * @return the configuration.
@@ -119,10 +125,10 @@ public interface SPSService {
   /**
    * Marks the scanning of directory if finished.
    *
-   * @param inodeId
-   *          - directory inode id.
+   * @param spsPath
+   *          - satisfier path
    */
-  void markScanCompletedForPath(Long inodeId);
+  void markScanCompletedForPath(T spsPath);
 
   /**
    * Notify the details of storage movement attempt finished blocks.

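A hedged sketch of how a path-keyed (external) caller would drive this
generic interface; the satisfier instance and the /dirA items are
illustrative assumptions, not code from the patch.

import java.util.Arrays;

import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;

class SpsQueueSketch {
  static void enqueue(SPSService<String> sps) {
    // Queue one file; scanCompleted=false keeps the directory scan open.
    sps.addFileToProcess(new ItemInfo<String>("/dirA", "/dirA/f0", 0), false);
    // Bulk-add the remaining items for the same start path; the
    // scanCompleted flag marks the directory scan as fully done.
    sps.addAllFilesToProcess("/dirA",
        Arrays.asList(new ItemInfo<String>("/dirA", "/dirA/f1", 0)), true);
  }
}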
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 6b449aa..08a26e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -66,7 +66,7 @@ import com.google.common.base.Preconditions;
  * storage policy type in Namespace, but physical block storage movement will
  * not happen until user runs "Mover Tool" explicitly for such files. The
  * StoragePolicySatisfier Daemon thread implemented for addressing the case
- * where users may want to physically move the blocks by a dedidated daemon (can
+ * where users may want to physically move the blocks by a dedicated daemon (can
 * run inside Namenode or stand alone) instead of running mover tool explicitly.
  * Just calling client API to satisfyStoragePolicy on a file/dir will
  * automatically trigger to move its physical storage locations as expected in
@@ -77,19 +77,19 @@ import com.google.common.base.Preconditions;
  * physical block movements.
  */
 @InterfaceAudience.Private
-public class StoragePolicySatisfier implements SPSService, Runnable {
+public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
   public static final Logger LOG =
       LoggerFactory.getLogger(StoragePolicySatisfier.class);
   private Daemon storagePolicySatisfierThread;
-  private BlockStorageMovementNeeded storageMovementNeeded;
-  private BlockStorageMovementAttemptedItems storageMovementsMonitor;
+  private BlockStorageMovementNeeded<T> storageMovementNeeded;
+  private BlockStorageMovementAttemptedItems<T> storageMovementsMonitor;
   private volatile boolean isRunning = false;
   private volatile StoragePolicySatisfierMode spsMode =
       StoragePolicySatisfierMode.NONE;
   private int spsWorkMultiplier;
   private long blockCount = 0L;
   private int blockMovementMaxRetry;
-  private Context ctxt;
+  private Context<T> ctxt;
   private BlockMoveTaskHandler blockMoveTaskHandler;
   private final Configuration conf;
 
@@ -135,15 +135,15 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
     }
   }
 
-  public void init(final Context context, final FileIdCollector fileIDCollector,
+  public void init(final Context<T> context,
+      final FileCollector<T> fileIDCollector,
       final BlockMoveTaskHandler blockMovementTaskHandler,
       final BlockMovementListener blockMovementListener) {
     this.ctxt = context;
-    this.storageMovementNeeded =
-        new BlockStorageMovementNeeded(context, fileIDCollector);
-    this.storageMovementsMonitor =
-        new BlockStorageMovementAttemptedItems(this,
-        storageMovementNeeded, blockMovementListener);
+    this.storageMovementNeeded = new BlockStorageMovementNeeded<T>(context,
+        fileIDCollector);
+    this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems<T>(
+        this, storageMovementNeeded, blockMovementListener);
     this.blockMoveTaskHandler = blockMovementTaskHandler;
     this.spsWorkMultiplier = getSPSWorkMultiplier(getConf());
     this.blockMovementMaxRetry = getConf().getInt(
@@ -257,24 +257,24 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
         continue;
       }
       try {
+        ItemInfo<T> itemInfo = null;
+        boolean retryItem = false;
         if (!ctxt.isInSafeMode()) {
-          ItemInfo itemInfo = storageMovementNeeded.get();
+          itemInfo = storageMovementNeeded.get();
           if (itemInfo != null) {
             if(itemInfo.getRetryCount() >= blockMovementMaxRetry){
               LOG.info("Failed to satisfy the policy after "
                   + blockMovementMaxRetry + " retries. Removing inode "
-                  + itemInfo.getFileId() + " from the queue");
+                  + itemInfo.getFile() + " from the queue");
               storageMovementNeeded.removeItemTrackInfo(itemInfo, false);
               continue;
             }
-            long trackId = itemInfo.getFileId();
+            T trackId = itemInfo.getFile();
             BlocksMovingAnalysis status = null;
             DatanodeStorageReport[] liveDnReports;
             BlockStoragePolicy existingStoragePolicy;
             // TODO: presently, context internally acquire the lock
            // and returns the result. Need to discuss to move the lock outside?
-            boolean hasLowRedundancyBlocks = ctxt
-                .hasLowRedundancyBlocks(trackId);
             HdfsFileStatus fileStatus = ctxt.getFileInfo(trackId);
             // Check path existence.
             if (fileStatus == null || fileStatus.isDir()) {
@@ -289,7 +289,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
 
               HdfsLocatedFileStatus file = (HdfsLocatedFileStatus) fileStatus;
               status = analyseBlocksStorageMovementsAndAssignToDN(file,
-                  hasLowRedundancyBlocks, existingStoragePolicy, liveDnReports);
+                  existingStoragePolicy, liveDnReports);
               switch (status.status) {
               // Just add to monitor, so it will be retried after timeout
               case ANALYSIS_SKIPPED_FOR_RETRY:
@@ -302,8 +302,8 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
                       + "movement attempt finished report",
                       status.status, fileStatus.getPath());
                 }
-                this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo
-                    .getStartId(), itemInfo.getFileId(), monotonicNow(),
+                this.storageMovementsMonitor.add(new AttemptedItemInfo<T>(
+                    itemInfo.getStartPath(), itemInfo.getFile(), monotonicNow(),
                     status.assignedBlocks, itemInfo.getRetryCount()));
                 break;
               case NO_BLOCKS_TARGETS_PAIRED:
@@ -312,8 +312,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
                       + " retry queue as none of the blocks found its eligible"
                       + " targets.", trackId, fileStatus.getPath());
                 }
-                itemInfo.increRetryCount();
-                this.storageMovementNeeded.add(itemInfo);
+                retryItem = true;
                 break;
               case FEW_LOW_REDUNDANCY_BLOCKS:
                 if (LOG.isDebugEnabled()) {
@@ -321,8 +320,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
                       + "retry queue as some of the blocks are low redundant.",
                       trackId, fileStatus.getPath());
                 }
-                itemInfo.increRetryCount();
-                this.storageMovementNeeded.add(itemInfo);
+                retryItem = true;
                 break;
               case BLOCKS_FAILED_TO_MOVE:
                 if (LOG.isDebugEnabled()) {
@@ -330,7 +328,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
                       + "retry queue as some of the blocks movement failed.",
                       trackId, fileStatus.getPath());
                 }
-                this.storageMovementNeeded.add(itemInfo);
+                retryItem = true;
                 break;
               // Just clean Xattrs
               case BLOCKS_TARGET_PAIRING_SKIPPED:
@@ -354,6 +352,10 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
           Thread.sleep(3000);
           blockCount = 0L;
         }
+        if (retryItem) {
+          itemInfo.increRetryCount();
+          this.storageMovementNeeded.add(itemInfo);
+        }
       } catch (IOException e) {
         LOG.error("Exception during StoragePolicySatisfier execution - "
             + "will continue next cycle", e);
@@ -377,7 +379,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
   }
 
   private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
-      HdfsLocatedFileStatus fileInfo, boolean hasLowRedundancyBlocks,
+      HdfsLocatedFileStatus fileInfo,
       BlockStoragePolicy existingStoragePolicy,
       DatanodeStorageReport[] liveDns) {
     BlocksMovingAnalysis.Status status =
@@ -403,9 +405,17 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
           new ArrayList<>());
     }
     List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
-
+    boolean hasLowRedundancyBlocks = false;
+    int replication = fileInfo.getReplication();
     for (int i = 0; i < blocks.size(); i++) {
       LocatedBlock blockInfo = blocks.get(i);
+
+      // A block is considered low redundant when its block locations array
+      // length is less than the expected replication factor. If any block is
+      // low redundant, then hasLowRedundancyBlocks will be marked as true.
+      hasLowRedundancyBlocks |= isLowRedundancyBlock(blockInfo, replication,
+          ecPolicy);
+
       List<StorageType> expectedStorageTypes;
       if (blockInfo.isStriped()) {
         if (ErasureCodingPolicyManager
@@ -446,13 +456,15 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
           // policy.
           status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED;
         }
-      } else if (hasLowRedundancyBlocks
-          && status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
-        // Check if the previous block was successfully paired.
-        status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
       }
     }
 
+    // If no blocks were paired and some blocks are low redundant, mark the
+    // status as FEW_LOW_REDUNDANCY_BLOCKS.
+    if (hasLowRedundancyBlocks
+        && status == BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED) {
+      status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
+    }
     List<Block> assignedBlockIds = new ArrayList<Block>();
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
       // Check for at least one block storage movement has been chosen
@@ -471,6 +483,33 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
   }
 
   /**
+   * The given block is considered low redundant when its block locations
+   * length is less than the expected replication factor. For EC blocks, the
+   * expected count is the summation of data + parity blocks.
+   *
+   * @param blockInfo
+   *          block
+   * @param replication
+   *          replication factor of the given file block
+   * @param ecPolicy
+   *          erasure coding policy of the given file block
+   * @return true if the given block is low redundant.
+   */
+  private boolean isLowRedundancyBlock(LocatedBlock blockInfo, int replication,
+      ErasureCodingPolicy ecPolicy) {
+    boolean hasLowRedundancyBlock = false;
+    if (blockInfo.isStriped()) {
+      // For EC blocks, redundancy is the summation of data + parity blocks.
+      replication = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits();
+    }
+    // A block is considered low redundant when its block locations length is
+    // less than the expected replication factor.
+    hasLowRedundancyBlock = blockInfo.getLocations().length < replication ? true
+        : false;
+    return hasLowRedundancyBlock;
+  }
+
+  /**
    * Compute the list of block moving information corresponding to the given
    * blockId. This will check that each block location of the given block is
    * satisfying the expected storage policy. If block location is not satisfied
@@ -863,7 +902,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
   }
 
   @VisibleForTesting
-  BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
+  public BlockStorageMovementAttemptedItems<T> getAttemptedItemsMonitor() {
     return storageMovementsMonitor;
   }
 
@@ -880,7 +919,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
   /**
    * Clear queues for given track id.
    */
-  public void clearQueue(long trackId) {
+  public void clearQueue(T trackId) {
     storageMovementNeeded.clearQueue(trackId);
   }
 
@@ -889,7 +928,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
    * attempted or reported time stamp. This is used by
    * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
    */
-  final static class AttemptedItemInfo extends ItemInfo {
+  final static class AttemptedItemInfo<T> extends ItemInfo<T> {
     private long lastAttemptedOrReportedTime;
     private final List<Block> blocks;
 
@@ -903,7 +942,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
      * @param lastAttemptedOrReportedTime
      *          last attempted or reported time
      */
-    AttemptedItemInfo(long rootId, long trackId,
+    AttemptedItemInfo(T rootId, T trackId,
         long lastAttemptedOrReportedTime,
         List<Block> blocks, int retryCount) {
       super(rootId, trackId, retryCount);
@@ -932,24 +971,33 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
 
   }
 
+  /**
+   * Returns the status of an sps invoked path. This method is used by the
+   * internal satisfy storage policy service.
+   *
+   * @param path
+   *          sps path
+   * @return storage policy satisfy path status
+   * @throws IOException
+   */
   public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
       String path) throws IOException {
     return storageMovementNeeded.getStatus(ctxt.getFileID(path));
   }
 
   @Override
-  public void addFileIdToProcess(ItemInfo trackInfo, boolean scanCompleted) {
+  public void addFileToProcess(ItemInfo<T> trackInfo, boolean scanCompleted) {
     storageMovementNeeded.add(trackInfo, scanCompleted);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Added track info for inode {} to block "
-          + "storageMovementNeeded queue", trackInfo.getFileId());
+          + "storageMovementNeeded queue", trackInfo.getFile());
     }
   }
 
   @Override
-  public void addAllFileIdsToProcess(long startId, List<ItemInfo> itemInfoList,
+  public void addAllFilesToProcess(T startPath, List<ItemInfo<T>> itemInfoList,
       boolean scanCompleted) {
-    getStorageMovementQueue().addAll(startId, itemInfoList, scanCompleted);
+    getStorageMovementQueue().addAll(startPath, itemInfoList, scanCompleted);
   }
 
   @Override
@@ -963,12 +1011,12 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
   }
 
   @VisibleForTesting
-  public BlockStorageMovementNeeded getStorageMovementQueue() {
+  public BlockStorageMovementNeeded<T> getStorageMovementQueue() {
     return storageMovementNeeded;
   }
 
   @Override
-  public void markScanCompletedForPath(Long inodeId) {
+  public void markScanCompletedForPath(T inodeId) {
     getStorageMovementQueue().markScanCompletedForDir(inodeId);
   }
 
@@ -976,7 +1024,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
    * Join main SPS thread.
    */
   public void join() throws InterruptedException {
-    //TODO Add join here on SPS rpc server also
     storagePolicySatisfierThread.join();
   }
 

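To make the new per-block redundancy rule concrete, here is a standalone
restatement (a sketch mirroring isLowRedundancyBlock, not the patch's own
code; the RS-6-3 and 3x figures are example values):

class LowRedundancySketch {
  // Expected count is the replication factor for contiguous blocks, and
  // dataUnits + parityUnits for striped (EC) blocks.
  static boolean lowRedundant(int liveLocations, boolean striped,
      int replication, int dataUnits, int parityUnits) {
    int expected = striped ? dataUnits + parityUnits : replication;
    return liveLocations < expected;
  }

  public static void main(String[] args) {
    // RS-6-3 striped block with 8 live locations: 8 < 9, low redundant.
    System.out.println(lowRedundant(8, true, 0, 6, 3));   // true
    // 3x-replicated block with all 3 locations live: healthy.
    System.out.println(lowRedundant(3, false, 3, 0, 0));  // false
  }
}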
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
index 5bdf6ae..5ec0372 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
 public class StoragePolicySatisfyManager {
   private static final Logger LOG = LoggerFactory
       .getLogger(StoragePolicySatisfyManager.class);
-  private final StoragePolicySatisfier spsService;
+  private final StoragePolicySatisfier<Long> spsService;
   private final boolean storagePolicyEnabled;
   private volatile StoragePolicySatisfierMode mode;
   private final Queue<Long> pathsToBeTraveresed;
@@ -84,7 +84,7 @@ public class StoragePolicySatisfyManager {
     pathsToBeTraveresed = new LinkedList<Long>();
     // instantiate SPS service by just keeps config reference and not starting
     // any supporting threads.
-    spsService = new StoragePolicySatisfier(conf);
+    spsService = new StoragePolicySatisfier<Long>(conf);
     this.namesystem = namesystem;
     this.blkMgr = blkMgr;
   }
@@ -309,7 +309,7 @@ public class StoragePolicySatisfyManager {
   /**
    * @return internal SPS service instance.
    */
-  public SPSService getInternalSPSService() {
+  public SPSService<Long> getInternalSPSService() {
     return this.spsService;
   }
 

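As a sketch of the split this refactor enables (assuming only a plain
Configuration object), the same satisfier class now serves both deployments:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;

class SatisfierBindingSketch {
  static void build(Configuration conf) {
    // NN-internal satisfier: queues keyed by inode id, as in the manager
    // above.
    StoragePolicySatisfier<Long> internal =
        new StoragePolicySatisfier<Long>(conf);
    // External satisfier: the same queues keyed by full path strings.
    StoragePolicySatisfier<String> external =
        new StoragePolicySatisfier<String>(conf);
  }
}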
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
index 9f5cadd..615e297 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
@@ -206,21 +206,11 @@ public interface NamenodeProtocol {
   boolean isRollingUpgrade() throws IOException;
 
   /**
-   * Gets the file path for the given file id. This API used by External SPS.
-   *
-   * @param inodeId
-   *          - file inode id.
-   * @return path
-   */
-  @Idempotent
-  String getFilePath(Long inodeId) throws IOException;
-
-  /**
-   * @return Gets the next available sps path id, otherwise null. This API used
+   * @return the next available sps path, otherwise null. This API is used
    *         by External SPS.
    */
   @AtMostOnce
-  Long getNextSPSPathId() throws IOException;
+  String getNextSPSPath() throws IOException;
 
   /**
    * Verifies whether the given Datanode has the enough estimated size with
@@ -236,15 +226,5 @@ public interface NamenodeProtocol {
   @Idempotent
   boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
       long estimatedSize) throws IOException;
-
-  /**
-   * Check if any low redundancy blocks for given file id. This API used by
-   * External SPS.
-   *
-   * @param inodeID
-   *          - inode id.
-   */
-  @Idempotent
-  boolean hasLowRedundancyBlocks(long inodeID) throws IOException;
 }
 

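A hedged sketch of the external satisfier's discovery loop against the
slimmed protocol: with getFilePath and hasLowRedundancyBlocks gone, the
returned path string is the track id itself. Here nn is an assumed,
already-obtained NamenodeProtocol proxy.

import java.io.IOException;

import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;

class SpsPathPollerSketch {
  static void drain(NamenodeProtocol nn) throws IOException {
    String next;
    // getNextSPSPath() replaces getNextSPSPathId(); no id-to-path or
    // per-file low-redundancy RPCs are needed any more.
    while ((next = nn.getNextSPSPath()) != null) {
      System.out.println("satisfy storage policy for " + next);
    }
  }
}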
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
index 4a762649..7580ba9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
@@ -81,11 +81,11 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
   private final SaslDataTransferClient saslClient;
   private final BlockStorageMovementTracker blkMovementTracker;
   private Daemon movementTrackerThread;
-  private final SPSService service;
+  private final SPSService<String> service;
   private final BlockDispatcher blkDispatcher;
 
   public ExternalSPSBlockMoveTaskHandler(Configuration conf,
-      NameNodeConnector nnc, SPSService spsService) {
+      NameNodeConnector nnc, SPSService<String> spsService) {
     int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
         DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
     moveExecutor = initializeBlockMoverThreadPool(moverThreads);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8467ec24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
index c309209..5d0aee6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.sps;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -46,15 +48,15 @@ import org.slf4j.LoggerFactory;
  * SPS from Namenode state.
  */
 @InterfaceAudience.Private
-public class ExternalSPSContext implements Context {
+public class ExternalSPSContext implements Context<String> {
   public static final Logger LOG =
       LoggerFactory.getLogger(ExternalSPSContext.class);
-  private SPSService service;
+  private SPSService<String> service;
   private NameNodeConnector nnc = null;
   private BlockStoragePolicySuite createDefaultSuite =
       BlockStoragePolicySuite.createDefaultSuite();
 
-  public ExternalSPSContext(SPSService service, NameNodeConnector nnc) {
+  public ExternalSPSContext(SPSService<String> service, NameNodeConnector nnc) {
     this.service = service;
     this.nnc = nnc;
   }
@@ -110,14 +112,12 @@ public class ExternalSPSContext implements Context {
   }
 
   @Override
-  public boolean isFileExist(long inodeId) {
-    String filePath = null;
+  public boolean isFileExist(String filePath) {
     try {
-      filePath = getFilePath(inodeId);
       return nnc.getDistributedFileSystem().exists(new Path(filePath));
     } catch (IllegalArgumentException | IOException e) {
-      LOG.warn("Exception while getting file is for the given path:{} "
-          + "and fileId:{}", filePath, inodeId, e);
+      LOG.warn("Exception while getting file is for the given path:{}",
+          filePath, e);
     }
     return false;
   }
@@ -133,8 +133,8 @@ public class ExternalSPSContext implements Context {
   }
 
   @Override
-  public void removeSPSHint(long inodeId) throws IOException {
-    nnc.getDistributedFileSystem().removeXAttr(new Path(getFilePath(inodeId)),
+  public void removeSPSHint(String inodeId) throws IOException {
+    nnc.getDistributedFileSystem().removeXAttr(new Path(inodeId),
         HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
   }
 
@@ -150,9 +150,15 @@ public class ExternalSPSContext implements Context {
   }
 
   @Override
-  public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
-    return nnc.getDistributedFileSystem().getClient()
-        .getLocatedFileInfo(getFilePath(inodeID), false);
+  public HdfsFileStatus getFileInfo(String path) throws IOException {
+    HdfsLocatedFileStatus fileInfo = null;
+    try {
+      fileInfo = nnc.getDistributedFileSystem().getClient()
+          .getLocatedFileInfo(path, false);
+    } catch (FileNotFoundException e) {
+      LOG.debug("Path:{} doesn't exists!", path, e);
+    }
+    return fileInfo;
   }
 
   @Override
@@ -162,17 +168,6 @@ public class ExternalSPSContext implements Context {
   }
 
   @Override
-  public boolean hasLowRedundancyBlocks(long inodeID) {
-    try {
-      return nnc.getNNProtocolConnection().hasLowRedundancyBlocks(inodeID);
-    } catch (IOException e) {
-      LOG.warn("Failed to check whether fileid:{} has low redundancy blocks.",
-          inodeID, e);
-      return false;
-    }
-  }
-
-  @Override
   public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
       long estimatedSize) {
     // TODO: Instead of calling namenode for checking the available space, it
@@ -190,9 +185,9 @@ public class ExternalSPSContext implements Context {
   }
 
   @Override
-  public Long getNextSPSPathId() {
+  public String getNextSPSPath() {
     try {
-      return nnc.getNNProtocolConnection().getNextSPSPathId();
+      return nnc.getNNProtocolConnection().getNextSPSPath();
     } catch (IOException e) {
       LOG.warn("Exception while getting next sps path id from Namenode.", e);
       return null;
@@ -200,7 +195,7 @@ public class ExternalSPSContext implements Context {
   }
 
   @Override
-  public void removeSPSPathId(long pathId) {
+  public void removeSPSPathId(String pathId) {
     // We need not specifically implement for external.
   }
 
@@ -208,15 +203,4 @@ public class ExternalSPSContext implements Context {
   public void removeAllSPSPathIds() {
     // We need not specifically implement for external.
   }
-
-  @Override
-  public String getFilePath(Long inodeId) {
-    try {
-      return nnc.getNNProtocolConnection().getFilePath(inodeId);
-    } catch (IOException e) {
-      LOG.warn("Exception while getting file path id:{} from Namenode.",
-          inodeId, e);
-      return null;
-    }
-  }
 }
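For reference, a minimal standalone sketch of the path-keyed existence check
the context now performs, without the old inode-id round trip (dfs is an
assumed, already-initialised DistributedFileSystem):

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

class ExistenceCheckSketch {
  static boolean fileExists(DistributedFileSystem dfs, String filePath) {
    try {
      // The path string is used directly; no getFilePath(inodeId) RPC.
      return dfs.exists(new Path(filePath));
    } catch (IllegalArgumentException | IOException e) {
      return false;  // mirrors the patch: warn-and-return-false on failure
    }
  }
}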

