HDFS-13165: [SPS]: Collects successfully moved block details via IBR. 
Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2acc50b8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2acc50b8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2acc50b8

Branch: refs/heads/HDFS-10285
Commit: 2acc50b826fa8b00f2b09d9546c4b3215b89d46d
Parents: 75ccc13
Author: Rakesh Radhakrishnan <rake...@apache.org>
Authored: Sun Apr 29 11:06:59 2018 +0530
Committer: Uma Maheswara Rao Gangumalla <umamah...@apache.org>
Committed: Sun Aug 12 03:06:05 2018 -0700

----------------------------------------------------------------------
 .../DatanodeProtocolClientSideTranslatorPB.java |  11 +-
 .../DatanodeProtocolServerSideTranslatorPB.java |   4 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |  25 ---
 .../server/blockmanagement/BlockManager.java    |  86 +++++++++-
 .../sps/BlockMovementAttemptFinished.java       |  24 ++-
 .../common/sps/BlockStorageMovementTracker.java | 109 +-----------
 .../sps/BlocksMovementsStatusHandler.java       |  70 +-------
 .../hdfs/server/datanode/BPServiceActor.java    |  14 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |   7 +-
 .../datanode/StoragePolicySatisfyWorker.java    |  48 ++----
 .../namenode/FSDirSatisfyStoragePolicyOp.java   |  13 +-
 .../hdfs/server/namenode/FSDirXAttrOp.java      |   8 +-
 .../hdfs/server/namenode/FSDirectory.java       |   5 +-
 .../hdfs/server/namenode/FSNamesystem.java      |  30 ++--
 .../hadoop/hdfs/server/namenode/NameNode.java   |  19 ++-
 .../hdfs/server/namenode/NameNodeRpcServer.java |  46 +++--
 .../sps/BlockStorageMovementAttemptedItems.java | 167 +++++++++++++------
 .../hdfs/server/namenode/sps/SPSService.java    |  19 ++-
 .../namenode/sps/StoragePolicySatisfier.java    | 154 +++++++++++------
 .../hdfs/server/protocol/DatanodeProtocol.java  |   4 +-
 .../sps/ExternalSPSBlockMoveTaskHandler.java    |  32 ++--
 .../sps/ExternalStoragePolicySatisfier.java     |   3 +-
 .../src/main/proto/DatanodeProtocol.proto       |   9 -
 .../src/main/resources/hdfs-default.xml         |  41 +++++
 .../TestNameNodePrunesMissingStorages.java      |   4 +-
 .../datanode/InternalDataNodeTestUtils.java     |   4 +-
 .../SimpleBlocksMovementsStatusHandler.java     |  88 ++++++++++
 .../server/datanode/TestBPOfferService.java     |  12 +-
 .../hdfs/server/datanode/TestBlockRecovery.java |   4 +-
 .../server/datanode/TestDataNodeLifeline.java   |   7 +-
 .../TestDatanodeProtocolRetryPolicy.java        |   4 +-
 .../server/datanode/TestFsDatasetCache.java     |   4 +-
 .../TestStoragePolicySatisfyWorker.java         |  76 +--------
 .../hdfs/server/datanode/TestStorageReport.java |   4 +-
 .../server/namenode/NNThroughputBenchmark.java  |   9 +-
 .../hdfs/server/namenode/NameNodeAdapter.java   |   4 +-
 .../hdfs/server/namenode/TestDeadDatanode.java  |   5 +-
 .../namenode/TestNameNodeReconfigure.java       |  17 +-
 .../TestBlockStorageMovementAttemptedItems.java |  88 ++++++----
 .../sps/TestStoragePolicySatisfier.java         |  73 ++++++--
 ...stStoragePolicySatisfierWithStripedFile.java |  40 +++--
 .../sps/TestExternalStoragePolicySatisfier.java |  44 ++---
 42 files changed, 776 insertions(+), 659 deletions(-)
----------------------------------------------------------------------
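
The gist of the change: datanodes no longer piggyback a
BlocksStorageMoveAttemptFinished payload on the heartbeat. Once a moved
replica lands on its target storage, the target datanode reports it through
the ordinary incremental block report (IBR), and BlockManager forwards it to
the SPS service (see the notifyStorageMovementAttemptFinishedBlk() hook added
below). A rough sketch of the DN-side report, assuming the stock
DatanodeProtocol IBR API; the method and variable names here
(reportMovedReplica, reg, movedBlock, targetStorage) are illustrative and not
part of this patch:

import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;

/** Illustrative only: how a completed move surfaces at the NN via IBR. */
void reportMovedReplica(DatanodeProtocol namenode, DatanodeRegistration reg,
    String blockPoolId, DatanodeStorage targetStorage, Block movedBlock)
    throws IOException {
  ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(movedBlock,
      BlockStatus.RECEIVED_BLOCK, null /* no deletion hint */);
  StorageReceivedDeletedBlocks[] report = {new StorageReceivedDeletedBlocks(
      targetStorage, new ReceivedDeletedBlockInfo[] {rdbi})};
  // On the NN side this follows the normal IBR path into
  // BlockManager.addBlock(), which now also notifies the SPS service.
  namenode.blockReceivedAndDeleted(reg, blockPoolId, report);
}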


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index dcc0705..e4125dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlock
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -139,8 +138,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
-      @Nonnull SlowDiskReports slowDisks,
-      BlocksStorageMoveAttemptFinished storageMovementFinishedBlks)
+      @Nonnull SlowDiskReports slowDisks)
           throws IOException {
     HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
         .setRegistration(PBHelper.convert(registration))
@@ -165,13 +163,6 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       builder.addAllSlowDisks(PBHelper.convertSlowDiskInfo(slowDisks));
     }
 
-    // Adding blocks movement results to the heart beat request.
-    if (storageMovementFinishedBlks != null
-        && storageMovementFinishedBlks.getBlocks() != null) {
-      builder.setStorageMoveAttemptFinishedBlks(
-          PBHelper.convertBlksMovReport(storageMovementFinishedBlks));
-    }
-
     HeartbeatResponseProto resp;
     try {
       resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index b5bb80a..5cba284 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -122,9 +122,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
           request.getXceiverCount(), request.getFailedVolumes(),
           volumeFailureSummary, request.getRequestFullBlockReportLease(),
           PBHelper.convertSlowPeerInfo(request.getSlowPeersList()),
-          PBHelper.convertSlowDiskInfo(request.getSlowDisksList()),
-          PBHelper.convertBlksMovReport(
-              request.getStorageMoveAttemptFinishedBlks()));
+          PBHelper.convertSlowDiskInfo(request.getSlowDisksList()));
     } catch (IOException e) {
       throw new ServiceException(e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 38f72c0..f51f839 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -57,7 +57,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerRepo
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMoveAttemptFinishedProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@@ -105,7 +104,6 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStr
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@@ -971,29 +969,6 @@ public class PBHelper {
     return SlowDiskReports.create(slowDisksMap);
   }
 
-  public static BlocksStorageMoveAttemptFinished convertBlksMovReport(
-      BlocksStorageMoveAttemptFinishedProto proto) {
-
-    List<BlockProto> blocksList = proto.getBlocksList();
-    Block[] blocks = new Block[blocksList.size()];
-    for (int i = 0; i < blocksList.size(); i++) {
-      BlockProto blkProto = blocksList.get(i);
-      blocks[i] = PBHelperClient.convert(blkProto);
-    }
-    return new BlocksStorageMoveAttemptFinished(blocks);
-  }
-
-  public static BlocksStorageMoveAttemptFinishedProto convertBlksMovReport(
-      BlocksStorageMoveAttemptFinished blocksMoveAttemptFinished) {
-    BlocksStorageMoveAttemptFinishedProto.Builder builder =
-        BlocksStorageMoveAttemptFinishedProto.newBuilder();
-    Block[] blocks = blocksMoveAttemptFinished.getBlocks();
-    for (Block block : blocks) {
-      builder.addBlocks(PBHelperClient.convert(block));
-    }
-    return builder.build();
-  }
-
   public static JournalInfo convert(JournalInfoProto info) {
     int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
     int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index e7979b4..42e246c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -92,6 +93,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@@ -427,8 +429,11 @@ public class BlockManager implements BlockStatsMXBean {
 
   private final BlockIdManager blockIdManager;
 
-  /** For satisfying block storage policies. */
-  private final StoragePolicySatisfyManager spsManager;
+  /**
+   * For satisfying block storage policies. Instantiated only when SPS is
+   * enabled internally or externally.
+   */
+  private StoragePolicySatisfyManager spsManager;
 
   /** Minimum live replicas needed for the datanode to be transitioned
    * from ENTERING_MAINTENANCE to IN_MAINTENANCE.
@@ -469,8 +474,7 @@ public class BlockManager implements BlockStatsMXBean {
         DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
         * 1000L);
 
-    // sps manager manages the user invoked sps paths and does the movement.
-    spsManager = new StoragePolicySatisfyManager(conf, namesystem, this);
+    createSPSManager(conf);
 
     blockTokenSecretManager = createBlockTokenSecretManager(conf);
 
@@ -699,7 +703,9 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   public void close() {
-    getSPSManager().stop();
+    if (getSPSManager() != null) {
+      getSPSManager().stop();
+    }
     bmSafeMode.close();
     try {
       redundancyThread.interrupt();
@@ -713,7 +719,9 @@ public class BlockManager implements BlockStatsMXBean {
     datanodeManager.close();
     pendingReconstruction.stop();
     blocksMap.close();
-    getSPSManager().stopGracefully();
+    if (getSPSManager() != null) {
+      getSPSManager().stopGracefully();
+    }
   }
 
   /** @return the datanodeManager */
@@ -3881,6 +3889,21 @@ public class BlockManager implements BlockStatsMXBean {
     }
     processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
         delHintNode);
+
+    // notify SPS about the reported block
+    notifyStorageMovementAttemptFinishedBlk(storageInfo, block);
+  }
+
+  private void notifyStorageMovementAttemptFinishedBlk(
+      DatanodeStorageInfo storageInfo, Block block) {
+    if (getSPSManager() != null) {
+      SPSService<Long> sps = getSPSManager().getInternalSPSService();
+      if (sps.isRunning()) {
+        sps.notifyStorageMovementAttemptFinishedBlk(
+            storageInfo.getDatanodeDescriptor(), storageInfo.getStorageType(),
+            block);
+      }
+    }
   }
   
   private void processAndHandleReportedBlock(
@@ -5026,6 +5049,57 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
+   * Create SPS manager instance. It manages the user-invoked SPS paths and
+   * does the movement.
+   *
+   * @param conf
+   *          configuration
+   * @return true if the instance is successfully created, false otherwise.
+   */
+  private boolean createSPSManager(final Configuration conf) {
+    return createSPSManager(conf, null);
+  }
+
+  /**
+   * Create SPS manager instance. It manages the user-invoked SPS paths and
+   * does the movement.
+   *
+   * @param conf
+   *          configuration
+   * @param spsMode
+   *          satisfier mode
+   * @return true if the instance is successfully created, false otherwise.
+   */
+  public boolean createSPSManager(final Configuration conf,
+      final String spsMode) {
+    // sps manager manages the user invoked sps paths and does the movement.
+    // StoragePolicySatisfier(SPS) configs
+    boolean storagePolicyEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY,
+        DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT);
+    String modeVal = spsMode;
+    if (org.apache.commons.lang.StringUtils.isBlank(modeVal)) {
+      modeVal = conf.get(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+          DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
+    }
+    StoragePolicySatisfierMode mode = StoragePolicySatisfierMode
+        .fromString(modeVal);
+    if (!storagePolicyEnabled || mode == StoragePolicySatisfierMode.NONE) {
+      LOG.info("Storage policy satisfier is disabled");
+      return false;
+    }
+    spsManager = new StoragePolicySatisfyManager(conf, namesystem, this);
+    return true;
+  }
+
+  /**
+   * Nullify SPS manager as this feature is fully disabled.
+   */
+  public void disableSPS() {
+    spsManager = null;
+  }
+
+  /**
    * @return sps manager.
    */
   public StoragePolicySatisfyManager getSPSManager() {
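
Since spsManager is no longer final and stays null while the satisfier is
disabled, callers throughout this patch switch to a guard-then-use pattern
(see the FSDirSatisfyStoragePolicyOp, FSDirXAttrOp and FSDirectory hunks
below). A condensed sketch of that pattern; blockManager and inodeId are
placeholders:

StoragePolicySatisfyManager spsManager = blockManager.getSPSManager();
if (spsManager != null && spsManager.isEnabled()) {
  // Safe to queue work: the manager is instantiated and SPS is enabled.
  spsManager.addPathId(inodeId);
}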

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java
index 419d806..29c5e9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.common.sps;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 
@@ -33,6 +34,7 @@ public class BlockMovementAttemptFinished {
   private final Block block;
   private final DatanodeInfo src;
   private final DatanodeInfo target;
+  private final StorageType targetType;
   private final BlockMovementStatus status;
 
   /**
@@ -44,14 +46,17 @@ public class BlockMovementAttemptFinished {
    *          src datanode
    * @param target
    *          target datanode
+   * @param targetType
+   *          target storage type
    * @param status
    *          movement status
    */
   public BlockMovementAttemptFinished(Block block, DatanodeInfo src,
-      DatanodeInfo target, BlockMovementStatus status) {
+      DatanodeInfo target, StorageType targetType, BlockMovementStatus status) {
     this.block = block;
     this.src = src;
     this.target = target;
+    this.targetType = targetType;
     this.status = status;
   }
 
@@ -64,6 +69,20 @@ public class BlockMovementAttemptFinished {
   }
 
   /**
+   * @return the target datanode to which the block was moved.
+   */
+  public DatanodeInfo getTargetDatanode() {
+    return target;
+  }
+
+  /**
+   * @return target storage type.
+   */
+  public StorageType getTargetType() {
+    return targetType;
+  }
+
+  /**
    * @return block movement status code.
    */
   public BlockMovementStatus getStatus() {
@@ -74,7 +93,8 @@ public class BlockMovementAttemptFinished {
   public String toString() {
     return new StringBuilder().append("Block movement attempt finished(\n  ")
         .append(" block : ").append(block).append(" src node: ").append(src)
-        .append(" target node: ").append(target).append(" movement status: ")
+        .append(" target node: ").append(target).append(" target type: ")
+        .append(targetType).append(" movement status: ")
         .append(status).append(")").toString();
   }
 }
\ No newline at end of file
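
Recording the target StorageType lets the NN-side SPS match a replica
reported via IBR against the expected (target datanode, storage type) pair.
A hedged construction example; block, src and target are placeholders, and
the status constant is assumed from the existing BlockMovementStatus enum:

BlockMovementAttemptFinished finished = new BlockMovementAttemptFinished(
    block, src, target, StorageType.ARCHIVE,
    BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS);
// toString() now includes the target type, which shows up in debug logs:
LOG.debug("Completed block movement. {}", finished);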

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java
index b20d6cf..4ee415e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java
@@ -17,17 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.common.sps;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,13 +34,10 @@ import org.slf4j.LoggerFactory;
 public class BlockStorageMovementTracker implements Runnable {
   private static final Logger LOG = LoggerFactory
       .getLogger(BlockStorageMovementTracker.class);
-  private final CompletionService<BlockMovementAttemptFinished> moverCompletionService;
+  private final CompletionService<BlockMovementAttemptFinished>
+      moverCompletionService;
   private final BlocksMovementsStatusHandler blksMovementsStatusHandler;
 
-  // Keeps the information - block vs its list of future move tasks
-  private final Map<Block, List<Future<BlockMovementAttemptFinished>>> moverTaskFutures;
-  private final Map<Block, List<BlockMovementAttemptFinished>> movementResults;
-
   private volatile boolean running = true;
 
   /**
@@ -60,53 +52,21 @@ public class BlockStorageMovementTracker implements Runnable {
       CompletionService<BlockMovementAttemptFinished> moverCompletionService,
       BlocksMovementsStatusHandler handler) {
     this.moverCompletionService = moverCompletionService;
-    this.moverTaskFutures = new HashMap<>();
     this.blksMovementsStatusHandler = handler;
-    this.movementResults = new HashMap<>();
   }
 
   @Override
   public void run() {
     while (running) {
-      if (moverTaskFutures.size() <= 0) {
-        try {
-          synchronized (moverTaskFutures) {
-            // Waiting for mover tasks.
-            moverTaskFutures.wait(2000);
-          }
-        } catch (InterruptedException ignore) {
-          // Sets interrupt flag of this thread.
-          Thread.currentThread().interrupt();
-        }
-      }
       try {
-        Future<BlockMovementAttemptFinished> future =
-            moverCompletionService.take();
+        Future<BlockMovementAttemptFinished> future = moverCompletionService
+            .take();
         if (future != null) {
           BlockMovementAttemptFinished result = future.get();
           LOG.debug("Completed block movement. {}", result);
-          Block block = result.getBlock();
-          List<Future<BlockMovementAttemptFinished>> blocksMoving =
-              moverTaskFutures.get(block);
-          if (blocksMoving == null) {
-            LOG.warn("Future task doesn't exist for block : {} ", block);
-            continue;
-          }
-          blocksMoving.remove(future);
-
-          List<BlockMovementAttemptFinished> resultPerTrackIdList =
-              addMovementResultToBlockIdList(result);
-
-          // Completed all the scheduled blocks movement under this 'trackId'.
-          if (blocksMoving.isEmpty() || moverTaskFutures.get(block) == null) {
-            synchronized (moverTaskFutures) {
-              moverTaskFutures.remove(block);
-            }
-            if (running) {
-              // handle completed or inprogress blocks movements per trackId.
-              blksMovementsStatusHandler.handle(resultPerTrackIdList);
-            }
-            movementResults.remove(block);
+          if (running && blksMovementsStatusHandler != null) {
+            // handle completed block movement.
+            blksMovementsStatusHandler.handle(result);
           }
         }
       } catch (InterruptedException e) {
@@ -122,63 +82,10 @@ public class BlockStorageMovementTracker implements Runnable {
     }
   }
 
-  private List<BlockMovementAttemptFinished> addMovementResultToBlockIdList(
-      BlockMovementAttemptFinished result) {
-    Block block = result.getBlock();
-    List<BlockMovementAttemptFinished> perBlockIdList;
-    synchronized (movementResults) {
-      perBlockIdList = movementResults.get(block);
-      if (perBlockIdList == null) {
-        perBlockIdList = new ArrayList<>();
-        movementResults.put(block, perBlockIdList);
-      }
-      perBlockIdList.add(result);
-    }
-    return perBlockIdList;
-  }
-
-  /**
-   * Add future task to the tracking list to check the completion status of the
-   * block movement.
-   *
-   * @param blockID
-   *          block identifier
-   * @param futureTask
-   *          future task used for moving the respective block
-   */
-  public void addBlock(Block block,
-      Future<BlockMovementAttemptFinished> futureTask) {
-    synchronized (moverTaskFutures) {
-      List<Future<BlockMovementAttemptFinished>> futures =
-          moverTaskFutures.get(block);
-      // null for the first task
-      if (futures == null) {
-        futures = new ArrayList<>();
-        moverTaskFutures.put(block, futures);
-      }
-      futures.add(futureTask);
-      // Notify waiting tracker thread about the newly added tasks.
-      moverTaskFutures.notify();
-    }
-  }
-
-  /**
-   * Clear the pending movement and movement result queues.
-   */
-  public void removeAll() {
-    synchronized (moverTaskFutures) {
-      moverTaskFutures.clear();
-    }
-    synchronized (movementResults) {
-      movementResults.clear();
-    }
-  }
-
   /**
-   * Sets running flag to false and clear the pending movement result queues.
+   * Sets running flag to false.
    */
   public void stopTracking() {
     running = false;
-    removeAll();
   }
 }
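
The tracker thus reduces to a plain CompletionService drain loop; the
per-block future bookkeeping disappears because the namenode now confirms
moves via IBRs rather than a DN-side result report. A minimal wiring sketch
under that assumption (thread-pool size, handler and blockMovingTask are
placeholders; Daemon is org.apache.hadoop.util.Daemon):

ExecutorService pool = Executors.newFixedThreadPool(10);
CompletionService<BlockMovementAttemptFinished> completionService =
    new ExecutorCompletionService<>(pool);
BlockStorageMovementTracker tracker =
    new BlockStorageMovementTracker(completionService, handler);
new Daemon(tracker).start();          // drains results as tasks complete
completionService.submit(blockMovingTask); // no addBlock() bookkeeping now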

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java
index f9f3954..ab67424 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java
@@ -18,78 +18,22 @@
 
 package org.apache.hadoop.hdfs.server.common.sps;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.Block;
 
 /**
- * Blocks movements status handler, which is used to collect details of the
- * completed block movements and later these attempted finished(with success or
- * failure) blocks can be accessed to notify respective listeners, if any.
+ * Blocks movements status handler, which can be used to collect details of the
+ * completed block movements.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class BlocksMovementsStatusHandler {
-  private final List<Block> blockIdVsMovementStatus = new ArrayList<>();
-
-  /**
-   * Collect all the storage movement attempt finished blocks. Later this will
-   * be send to namenode via heart beat.
-   *
-   * @param moveAttemptFinishedBlks
-   *          set of storage movement attempt finished blocks
-   */
-  public void handle(
-      List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
-    List<Block> blocks = new ArrayList<>();
-
-    for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
-      blocks.add(item.getBlock());
-    }
-    // Adding to the tracking report list. Later this can be accessed to know
-    // the attempted block movements.
-    synchronized (blockIdVsMovementStatus) {
-      blockIdVsMovementStatus.addAll(blocks);
-    }
-  }
+public interface BlocksMovementsStatusHandler {
 
   /**
-   * @return unmodifiable list of storage movement attempt finished blocks.
-   */
-  public List<Block> getMoveAttemptFinishedBlocks() {
-    List<Block> moveAttemptFinishedBlks = new ArrayList<>();
-    // 1. Adding all the completed block ids.
-    synchronized (blockIdVsMovementStatus) {
-      if (blockIdVsMovementStatus.size() > 0) {
-        moveAttemptFinishedBlks = Collections
-            .unmodifiableList(blockIdVsMovementStatus);
-      }
-    }
-    return moveAttemptFinishedBlks;
-  }
-
-  /**
-   * Remove the storage movement attempt finished blocks from the tracking list.
+   * Collect the storage movement attempt finished block.
    *
-   * @param moveAttemptFinishedBlks
-   *          set of storage movement attempt finished blocks
-   */
-  public void remove(List<Block> moveAttemptFinishedBlks) {
-    if (moveAttemptFinishedBlks != null) {
-      blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks);
-    }
-  }
-
-  /**
-   * Clear the blockID vs movement status tracking map.
+   * @param moveAttemptFinishedBlk
+   *          storage movement attempt finished block
    */
-  public void removeAll() {
-    synchronized (blockIdVsMovementStatus) {
-      blockIdVsMovementStatus.clear();
-    }
-  }
+  void handle(BlockMovementAttemptFinished moveAttemptFinishedBlk);
 }
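
Turning the handler into a single-method interface lets each consumer decide
what to do with a finished attempt; the new test helper
SimpleBlocksMovementsStatusHandler in the diffstat plays exactly this role.
A minimal hedged implementation (CountingStatusHandler is illustrative, not
part of the patch):

class CountingStatusHandler implements BlocksMovementsStatusHandler {
  private final java.util.concurrent.atomic.AtomicLong completed =
      new java.util.concurrent.atomic.AtomicLong();

  @Override
  public void handle(BlockMovementAttemptFinished moveAttemptFinishedBlk) {
    // One callback per finished (success or failure) move attempt.
    completed.incrementAndGet();
  }

  long getCompletedCount() {
    return completed.get();
  }
}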

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index b7beda4..dab8ae9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -514,12 +514,6 @@ class BPServiceActor implements Runnable {
             SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
             SlowDiskReports.EMPTY_REPORT;
 
-    // Get the blocks storage move attempt finished blocks
-    List<Block> results = dn.getStoragePolicySatisfyWorker()
-        .getBlocksMovementsStatusHandler().getMoveAttemptFinishedBlocks();
-    BlocksStorageMoveAttemptFinished storageMoveAttemptFinishedBlks =
-        getStorageMoveAttemptFinishedBlocks(results);
-
     HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
         reports,
         dn.getFSDataset().getCacheCapacity(),
@@ -530,19 +524,13 @@ class BPServiceActor implements Runnable {
         volumeFailureSummary,
         requestBlockReportLease,
         slowPeers,
-        slowDisks,
-        storageMoveAttemptFinishedBlks);
+        slowDisks);
 
     if (outliersReportDue) {
       // If the report was due and successfully sent, schedule the next one.
       scheduler.scheduleNextOutlierReport();
     }
 
-    // Remove the blocks movement results after successfully transferring
-    // to namenode.
-    dn.getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler()
-        .remove(results);
-
     return response;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 4ee364b..a714602 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1427,7 +1427,7 @@ public class DataNode extends ReconfigurableBase
     ecWorker = new ErasureCodingWorker(getConf(), this);
     blockRecoveryWorker = new BlockRecoveryWorker(this);
     storagePolicySatisfyWorker =
-        new StoragePolicySatisfyWorker(getConf(), this);
+        new StoragePolicySatisfyWorker(getConf(), this, null);
     storagePolicySatisfyWorker.start();
 
     blockPoolManager = new BlockPoolManager(this);
@@ -2137,11 +2137,6 @@ public class DataNode extends ReconfigurableBase
       notifyAll();
     }
     tracer.close();
-
-    // Waiting to finish SPS worker thread.
-    if (storagePolicySatisfyWorker != null) {
-      storagePolicySatisfyWorker.waitToFinishWorkerThread();
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index af6137c..0157205 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -24,7 +24,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -38,19 +37,17 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
 import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
 import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
 import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker;
 import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
-import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * StoragePolicySatisfyWorker handles the storage policy satisfier commands.
  * These commands would be issued from NameNode as part of Datanode's heart beat
@@ -67,19 +64,19 @@ public class StoragePolicySatisfyWorker {
 
   private final int moverThreads;
   private final ExecutorService moveExecutor;
-  private final CompletionService<BlockMovementAttemptFinished> moverCompletionService;
-  private final BlocksMovementsStatusHandler handler;
+  private final CompletionService<BlockMovementAttemptFinished>
+      moverCompletionService;
   private final BlockStorageMovementTracker movementTracker;
   private Daemon movementTrackerThread;
   private final BlockDispatcher blkDispatcher;
 
-  public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
+  public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode,
+      BlocksMovementsStatusHandler handler) {
     this.datanode = datanode;
-    // Defaulting to 10. This is to minimise the number of move ops.
+    // Defaulting to 10. This is to minimize the number of move ops.
     moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 10);
     moveExecutor = initializeBlockMoverThreadPool(moverThreads);
     moverCompletionService = new ExecutorCompletionService<>(moveExecutor);
-    handler = new BlocksMovementsStatusHandler();
     movementTracker = new BlockStorageMovementTracker(moverCompletionService,
         handler);
     movementTrackerThread = new Daemon(movementTracker);
@@ -88,7 +85,6 @@ public class StoragePolicySatisfyWorker {
     int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
     blkDispatcher = new BlockDispatcher(dnConf.getSocketTimeout(),
         ioFileBufferSize, dnConf.getConnectToDnViaHostname());
-    // TODO: Needs to manage the number of concurrent moves per DataNode.
   }
 
   /**
@@ -100,22 +96,17 @@ public class StoragePolicySatisfyWorker {
   }
 
   /**
-   * Stop StoragePolicySatisfyWorker, which will stop block movement tracker
-   * thread.
+   * Stop StoragePolicySatisfyWorker, which will terminate executor service and
+   * stop block movement tracker thread.
    */
   void stop() {
     movementTracker.stopTracking();
     movementTrackerThread.interrupt();
-  }
-
-  /**
-   * Timed wait to stop BlockStorageMovement tracker daemon thread.
-   */
-  void waitToFinishWorkerThread() {
+    moveExecutor.shutdown();
     try {
-      movementTrackerThread.join(3000);
-    } catch (InterruptedException ignore) {
-      // ignore
+      moveExecutor.awaitTermination(500, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while waiting for mover thread to terminate", e);
     }
   }
 
@@ -160,10 +151,7 @@ public class StoragePolicySatisfyWorker {
           : "Source and Target storage type shouldn't be same!";
       BlockMovingTask blockMovingTask = new BlockMovingTask(blockPoolID,
           blkMovingInfo);
-      Future<BlockMovementAttemptFinished> moveCallable = moverCompletionService
-          .submit(blockMovingTask);
-      movementTracker.addBlock(blkMovingInfo.getBlock(),
-          moveCallable);
+      moverCompletionService.submit(blockMovingTask);
     }
   }
 
@@ -185,7 +173,8 @@ public class StoragePolicySatisfyWorker {
     public BlockMovementAttemptFinished call() {
       BlockMovementStatus status = moveBlock();
       return new BlockMovementAttemptFinished(blkMovingInfo.getBlock(),
-          blkMovingInfo.getSource(), blkMovingInfo.getTarget(), status);
+          blkMovingInfo.getSource(), blkMovingInfo.getTarget(),
+          blkMovingInfo.getTargetStorageType(), status);
     }
 
     private BlockMovementStatus moveBlock() {
@@ -217,11 +206,6 @@ public class StoragePolicySatisfyWorker {
     }
   }
 
-  @VisibleForTesting
-  BlocksMovementsStatusHandler getBlocksMovementsStatusHandler() {
-    return handler;
-  }
-
   /**
    * Drop the in-progress SPS work queues.
    */
@@ -229,7 +213,5 @@ public class StoragePolicySatisfyWorker {
     LOG.info("Received request to drop StoragePolicySatisfierWorker queues. "
         + "So, none of the SPS Worker queued block movements will"
         + " be scheduled.");
-    movementTracker.removeAll();
-    handler.removeAll();
   }
 }
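
The worker now takes its handler from the caller instead of owning one;
DataNode passes null (see the DataNode.java hunk above) because, with
IBR-based reporting, the datanode no longer accumulates results itself. A
hedged usage sketch reusing the illustrative CountingStatusHandler from
above:

StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(
    conf, datanode, new CountingStatusHandler());
worker.start();
// later, during shutdown:
worker.stop(); // now also shuts down the mover thread pool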

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
index 45d6218..3f873d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
 
 import com.google.common.collect.Lists;
 
@@ -102,7 +103,11 @@ final class FSDirSatisfyStoragePolicyOp {
 
         // Adding directory in the pending queue, so FileInodeIdCollector
         // process directory child in batch and recursively
-        fsd.getBlockManager().getSPSManager().addPathId(inode.getId());
+        StoragePolicySatisfyManager spsManager =
+            fsd.getBlockManager().getSPSManager();
+        if (spsManager != null) {
+          spsManager.addPathId(inode.getId());
+        }
       }
     } finally {
       fsd.writeUnlock();
@@ -116,7 +121,11 @@ final class FSDirSatisfyStoragePolicyOp {
     } else {
       // Adding directory in the pending queue, so FileInodeIdCollector process
       // directory child in batch and recursively
-      fsd.getBlockManager().getSPSManager().addPathId(inode.getId());
+      StoragePolicySatisfyManager spsManager =
+          fsd.getBlockManager().getSPSManager();
+      if (spsManager != null) {
+        spsManager.addPathId(inode.getId());
+      }
       return true;
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
index 1150a72..3b68979 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
 import org.apache.hadoop.security.AccessControlException;
 
 import java.io.FileNotFoundException;
@@ -209,8 +210,11 @@ class FSDirXAttrOp {
       for (XAttr xattr : toRemove) {
         if (XATTR_SATISFY_STORAGE_POLICY
             .equals(XAttrHelper.getPrefixedName(xattr))) {
-          fsd.getBlockManager().getSPSManager().getInternalSPSService()
-              .clearQueue(inode.getId());
+          StoragePolicySatisfyManager spsManager =
+              fsd.getBlockManager().getSPSManager();
+          if (spsManager != null) {
+            spsManager.getInternalSPSService().clearQueue(inode.getId());
+          }
           break;
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 6539b51..2a976d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo.UpdatedReplicationInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
 import org.apache.hadoop.hdfs.util.ByteArray;
 import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
@@ -1401,7 +1402,9 @@ public class FSDirectory implements Closeable {
       if (!inode.isSymlink()) {
         final XAttrFeature xaf = inode.getXAttrFeature();
         addEncryptionZone((INodeWithAdditionalFields) inode, xaf);
-        if (namesystem.getBlockManager().getSPSManager().isEnabled()) {
+        StoragePolicySatisfyManager spsManager =
+            namesystem.getBlockManager().getSPSManager();
+        if (spsManager != null && spsManager.isEnabled()) {
           addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf);
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 8c5a410..e1ceecd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -259,7 +259,6 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
-import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
@@ -268,7 +267,6 @@ import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
 import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
 import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
 import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -1292,7 +1290,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir,
             edekCacheLoaderDelay, edekCacheLoaderInterval);
       }
-      blockManager.getSPSManager().start();
+      if (blockManager.getSPSManager() != null) {
+        blockManager.getSPSManager().start();
+      }
     } finally {
       startingActiveService = false;
       blockManager.checkSafeMode();
@@ -1322,7 +1322,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     LOG.info("Stopping services started for active state");
     writeLock();
     try {
-      if (blockManager != null) {
+      if (blockManager != null && blockManager.getSPSManager() != null) {
         blockManager.getSPSManager().stop();
       }
       stopSecretManager();
@@ -1363,7 +1363,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         // Don't want to keep replication queues when not in Active.
         blockManager.clearQueues();
         blockManager.setInitializedReplQueues(false);
-        blockManager.getSPSManager().stopGracefully();
+        if (blockManager.getSPSManager() != null) {
+          blockManager.getSPSManager().stopGracefully();
+        }
       }
     } finally {
       writeUnlock("stopActiveServices");
@@ -2272,7 +2274,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           DFS_STORAGE_POLICY_ENABLED_KEY));
     }
     // checks sps status
-    if (!blockManager.getSPSManager().isEnabled() || (blockManager
+    boolean disabled = (blockManager.getSPSManager() == null);
+    if (disabled || (blockManager
         .getSPSManager().getMode() == StoragePolicySatisfierMode.INTERNAL
         && !blockManager.getSPSManager().isInternalSatisfierRunning())) {
       throw new UnsupportedActionException(
@@ -3970,8 +3973,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
-      @Nonnull SlowDiskReports slowDisks,
-      BlocksStorageMoveAttemptFinished blksMovementsFinished)
+      @Nonnull SlowDiskReports slowDisks)
           throws IOException {
     readLock();
     try {
@@ -3987,18 +3989,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         blockReportLeaseId =  blockManager.requestBlockReportLeaseId(nodeReg);
       }
 
-      // Handle blocks movement results sent by the coordinator datanode.
-      SPSService sps = blockManager.getSPSManager().getInternalSPSService();
-      if (!sps.isRunning()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(
-              "Storage policy satisfier is not running. So, ignoring storage"
-                  + "  movement attempt finished block info sent by DN");
-        }
-      } else {
-        sps.notifyStorageMovementAttemptFinishedBlks(blksMovementsFinished);
-      }
-
       //create ha status
       final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
           haContext.getState().getServiceState(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 4e3a3ba..7f78d2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -2147,7 +2147,24 @@ public class NameNode extends ReconfigurableBase implements
     }
     StoragePolicySatisfierMode mode = StoragePolicySatisfierMode
         .fromString(newVal);
-    namesystem.getBlockManager().getSPSManager().changeModeEvent(mode);
+    if (mode == StoragePolicySatisfierMode.NONE) {
+      // disabling sps service
+      if (namesystem.getBlockManager().getSPSManager() != null) {
+        namesystem.getBlockManager().getSPSManager().changeModeEvent(mode);
+        namesystem.getBlockManager().disableSPS();
+      }
+    } else {
+      // enabling sps service
+      boolean spsCreated = (namesystem.getBlockManager()
+          .getSPSManager() != null);
+      if (!spsCreated) {
+        spsCreated = namesystem.getBlockManager().createSPSManager(getConf(),
+            newVal);
+      }
+      if (spsCreated) {
+        namesystem.getBlockManager().getSPSManager().changeModeEvent(mode);
+      }
+    }
     return newVal;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 1590423..57e827d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -156,8 +156,8 @@ import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -1517,16 +1517,14 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       int failedVolumes, VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
-      @Nonnull SlowDiskReports slowDisks,
-      BlocksStorageMoveAttemptFinished storageMovementFinishedBlks)
+      @Nonnull SlowDiskReports slowDisks)
           throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     return namesystem.handleHeartbeat(nodeReg, report,
         dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
         failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
-        slowPeers, slowDisks,
-        storageMovementFinishedBlks);
+        slowPeers, slowDisks);
   }
 
   @Override // DatanodeProtocol
@@ -2543,10 +2541,12 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     if (nn.isStandbyState()) {
       throw new StandbyException("Not supported by Standby Namenode.");
     }
-    boolean isSPSRunning = namesystem.getBlockManager().getSPSManager()
-        .isInternalSatisfierRunning();
+    StoragePolicySatisfyManager spsMgr =
+        namesystem.getBlockManager().getSPSManager();
+    boolean isInternalSatisfierRunning = (spsMgr != null
+        ? spsMgr.isInternalSatisfierRunning() : false);
     namesystem.logAuditEvent(true, operationName, null);
-    return isSPSRunning;
+    return isInternalSatisfierRunning;
   }
 
   @Override
@@ -2556,6 +2556,14 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     if (nn.isStandbyState()) {
       throw new StandbyException("Not supported by Standby Namenode.");
     }
+    if (namesystem.getBlockManager().getSPSManager() == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Satisfier is not running inside namenode, so status "
+            + "can't be returned.");
+      }
+      throw new IOException("Satisfier is not running inside namenode, "
+          + "so status can't be returned.");
+    }
     return namesystem.getBlockManager().getSPSManager()
         .checkStoragePolicySatisfyPathStatus(path);
   }
@@ -2568,16 +2576,20 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     if (nn.isStandbyState()) {
       throw new StandbyException("Not supported by Standby Namenode.");
     }
-    // Check that SPS daemon service is running inside namenode
-    if (namesystem.getBlockManager().getSPSManager()
-        .getMode() == StoragePolicySatisfierMode.INTERNAL) {
-      LOG.debug("SPS service is internally enabled and running inside "
-          + "namenode, so external SPS is not allowed to fetch the path Ids");
-      throw new IOException("SPS service is internally enabled and running"
-          + " inside namenode, so external SPS is not allowed to fetch"
-          + " the path Ids");
+    // Check that SPS is enabled externally
+    StoragePolicySatisfyManager spsMgr =
+        namesystem.getBlockManager().getSPSManager();
+    StoragePolicySatisfierMode spsMode = (spsMgr != null ? spsMgr.getMode()
+        : StoragePolicySatisfierMode.NONE);
+    if (spsMode != StoragePolicySatisfierMode.EXTERNAL) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SPS service mode is {}, so external SPS service is "
+            + "not allowed to fetch the path Ids", spsMode);
+      }
+      throw new IOException("SPS service mode is " + spsMode + ", so "
+          + "external SPS service is not allowed to fetch the path Ids");
     }
-    Long pathId = namesystem.getBlockManager().getSPSManager().getNextPathId();
+    Long pathId = spsMgr.getNextPathId();
     if (pathId == null) {
       return null;
     }
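A hedged sketch of the consumer side of this guard (not part of the patch): an external satisfier polls for path ids until the queue drains, treating the IOException as "SPS is not enabled in EXTERNAL mode". The interface and class names below are illustrative stand-ins, not Hadoop APIs.

import java.io.IOException;

/** Illustrative client-side contract mirroring the server method above. */
interface SpsPathSource {
  /** @return the next SPS path id, or null if nothing is queued. */
  Long getNextSPSPath() throws IOException;
}

/** Minimal external-SPS polling loop against the assumed contract. */
class ExternalSpsPathPoller {
  private final SpsPathSource nn;

  ExternalSpsPathPoller(SpsPathSource nn) {
    this.nn = nn;
  }

  void drainPathIds() throws IOException {
    Long pathId;
    // The server throws IOException when SPS mode is not EXTERNAL.
    while ((pathId = nn.getNextSPSPath()) != null) {
      System.out.println("satisfying path id " + pathId);
    }
  }
}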

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
index d2f0bb2..5b25491 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
@@ -17,21 +17,28 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.sps;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY;
-
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.StorageTypeNodePair;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,10 +67,13 @@ public class BlockStorageMovementAttemptedItems<T> {
    * processing and sent to DNs.
    */
   private final List<AttemptedItemInfo<T>> storageMovementAttemptedItems;
-  private final List<Block> movementFinishedBlocks;
+  private Map<Block, Set<StorageTypeNodePair>> scheduledBlkLocs;
+  // Maintains a separate queue of the movement finished blocks. This queue
+  // is used to update the storageMovementAttemptedItems list asynchronously.
+  private final BlockingQueue<Block> movementFinishedBlocks;
   private volatile boolean monitorRunning = true;
   private Daemon timerThread = null;
-  private final BlockMovementListener blkMovementListener;
+  private BlockMovementListener blkMovementListener;
   //
   // It might take anywhere between 5 to 10 minutes before
   // a request is timed out.
@@ -94,7 +104,8 @@ public class BlockStorageMovementAttemptedItems<T> {
         DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT);
     this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
     storageMovementAttemptedItems = new ArrayList<>();
-    movementFinishedBlocks = new ArrayList<>();
+    scheduledBlkLocs = new HashMap<>();
+    movementFinishedBlocks = new LinkedBlockingQueue<>();
     this.blkMovementListener = blockMovementListener;
   }
 
@@ -105,29 +116,67 @@ public class BlockStorageMovementAttemptedItems<T> {
    * @param itemInfo
    *          - tracking info
    */
-  public void add(AttemptedItemInfo<T> itemInfo) {
+  public void add(T startPath, T file, long monotonicNow,
+      Map<Block, Set<StorageTypeNodePair>> assignedBlocks, int retryCount) {
+    AttemptedItemInfo<T> itemInfo = new AttemptedItemInfo<T>(startPath, file,
+        monotonicNow, assignedBlocks.keySet(), retryCount);
     synchronized (storageMovementAttemptedItems) {
       storageMovementAttemptedItems.add(itemInfo);
     }
+    synchronized (scheduledBlkLocs) {
+      scheduledBlkLocs.putAll(assignedBlocks);
+    }
   }
 
   /**
-   * Add the storage movement attempt finished blocks to
-   * storageMovementFinishedBlocks.
+   * Notify the storage movement attempt finished block.
    *
-   * @param moveAttemptFinishedBlks
-   *          storage movement attempt finished blocks
+   * @param reportedDn
+   *          reported datanode
+   * @param type
+   *          storage type
+   * @param reportedBlock
+   *          reported block
    */
-  public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
-    if (moveAttemptFinishedBlks.length == 0) {
-      return;
+  public void notifyReportedBlock(DatanodeInfo reportedDn, StorageType type,
+      Block reportedBlock) {
+    synchronized (scheduledBlkLocs) {
+      if (scheduledBlkLocs.isEmpty()) {
+        return;
+      }
+      matchesReportedBlock(reportedDn, type, reportedBlock);
     }
-    synchronized (movementFinishedBlocks) {
-      movementFinishedBlocks.addAll(Arrays.asList(moveAttemptFinishedBlks));
+  }
+
+  private void matchesReportedBlock(DatanodeInfo reportedDn, StorageType type,
+      Block reportedBlock) {
+    Set<StorageTypeNodePair> blkLocs = scheduledBlkLocs.get(reportedBlock);
+    if (blkLocs == null) {
+      return; // unknown block, simply skip.
     }
-    // External listener if it is plugged-in
-    if (blkMovementListener != null) {
-      blkMovementListener.notifyMovementTriedBlocks(moveAttemptFinishedBlks);
+
+    for (StorageTypeNodePair dn : blkLocs) {
+      boolean foundDn = dn.getDatanodeInfo().compareTo(reportedDn) == 0;
+      boolean foundType = dn.getStorageType().equals(type);
+      if (foundDn && foundType) {
+        blkLocs.remove(dn);
+        // Notify the external listener, if one is plugged in.
+        if (blkMovementListener != null) {
+          blkMovementListener
+              .notifyMovementTriedBlocks(new Block[] {reportedBlock});
+        }
+        // All the block locations have reported.
+        if (blkLocs.isEmpty()) {
+          movementFinishedBlocks.add(reportedBlock);
+          scheduledBlkLocs.remove(reportedBlock); // clean-up reported block
+        }
+        return; // found
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reported block:{} not found in attempted blocks. Datanode:{}"
+          + ", StorageType:{}", reportedBlock, reportedDn, type);
     }
   }
 
@@ -203,14 +252,12 @@ public class BlockStorageMovementAttemptedItems<T> {
         if (now > itemInfo.getLastAttemptedOrReportedTime()
             + selfRetryTimeout) {
           T file = itemInfo.getFile();
-          synchronized (movementFinishedBlocks) {
-            ItemInfo<T> candidate = new ItemInfo<T>(itemInfo.getStartPath(),
-                file, itemInfo.getRetryCount() + 1);
-            blockStorageMovementNeeded.add(candidate);
-            iter.remove();
-            LOG.info("TrackID: {} becomes timed out and moved to needed "
-                + "retries queue for next iteration.", file);
-          }
+          ItemInfo<T> candidate = new ItemInfo<T>(itemInfo.getStartPath(), file,
+              itemInfo.getRetryCount() + 1);
+          blockStorageMovementNeeded.add(candidate);
+          iter.remove();
+          LOG.info("TrackID: {} becomes timed out and moved to needed "
+              + "retries queue for next iteration.", file);
         }
       }
     }
@@ -218,29 +265,25 @@ public class BlockStorageMovementAttemptedItems<T> {
 
   @VisibleForTesting
   void blockStorageMovementReportedItemsCheck() throws IOException {
-    synchronized (movementFinishedBlocks) {
-      Iterator<Block> finishedBlksIter = movementFinishedBlocks.iterator();
-      while (finishedBlksIter.hasNext()) {
-        Block blk = finishedBlksIter.next();
-        synchronized (storageMovementAttemptedItems) {
-          Iterator<AttemptedItemInfo<T>> iterator =
-              storageMovementAttemptedItems.iterator();
-          while (iterator.hasNext()) {
-            AttemptedItemInfo<T> attemptedItemInfo = iterator.next();
-            attemptedItemInfo.getBlocks().remove(blk);
-            if (attemptedItemInfo.getBlocks().isEmpty()) {
-              // TODO: try add this at front of the Queue, so that this element
-              // gets the chance first and can be cleaned from queue quickly as
-              // all movements already done.
-              blockStorageMovementNeeded.add(new ItemInfo<T>(attemptedItemInfo
-                  .getStartPath(), attemptedItemInfo.getFile(),
-                  attemptedItemInfo.getRetryCount() + 1));
-              iterator.remove();
-            }
+    // Remove all available blocks from this queue and process them.
+    Collection<Block> finishedBlks = new ArrayList<>();
+    movementFinishedBlocks.drainTo(finishedBlks);
+
+    // Update attempted items list
+    for (Block blk : finishedBlks) {
+      synchronized (storageMovementAttemptedItems) {
+        Iterator<AttemptedItemInfo<T>> iterator = storageMovementAttemptedItems
+            .iterator();
+        while (iterator.hasNext()) {
+          AttemptedItemInfo<T> attemptedItemInfo = iterator.next();
+          attemptedItemInfo.getBlocks().remove(blk);
+          if (attemptedItemInfo.getBlocks().isEmpty()) {
+            blockStorageMovementNeeded.add(new ItemInfo<T>(
+                attemptedItemInfo.getStartPath(), attemptedItemInfo.getFile(),
+                attemptedItemInfo.getRetryCount() + 1));
+            iterator.remove();
           }
         }
-        // Remove attempted blocks from movementFinishedBlocks list.
-        finishedBlksIter.remove();
       }
     }
   }
@@ -252,15 +295,29 @@ public class BlockStorageMovementAttemptedItems<T> {
 
   @VisibleForTesting
   public int getAttemptedItemsCount() {
-    return storageMovementAttemptedItems.size();
+    synchronized (storageMovementAttemptedItems) {
+      return storageMovementAttemptedItems.size();
+    }
   }
 
   public void clearQueues() {
-    synchronized (movementFinishedBlocks) {
-      movementFinishedBlocks.clear();
-    }
+    movementFinishedBlocks.clear();
     synchronized (storageMovementAttemptedItems) {
       storageMovementAttemptedItems.clear();
     }
+    synchronized (scheduledBlkLocs) {
+      scheduledBlkLocs.clear();
+    }
+  }
+
+  /**
+   * Sets external listener for testing.
+   *
+   * @param blkMoveListener
+   *          block movement listener callback object
+   */
+  @VisibleForTesting
+  void setBlockMovementListener(BlockMovementListener blkMoveListener) {
+    this.blkMovementListener = blkMoveListener;
   }
 }
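The monitor above replaces the lock-protected finished-blocks list with a LinkedBlockingQueue: IBR handlers add() finished blocks while the checker thread drainTo()s everything queued in one sweep, so neither side holds a shared lock across the scan. A self-contained sketch of the same pattern; the class name and block ids are illustrative.

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DrainToDemo {
  // Producers (IBR handlers) add finished blocks; a consumer drains batches.
  private final BlockingQueue<Long> finished = new LinkedBlockingQueue<>();

  void onBlockReported(long blockId) {
    finished.add(blockId); // non-blocking, thread-safe add
  }

  void periodicCheck() {
    Collection<Long> batch = new ArrayList<>();
    finished.drainTo(batch); // removes all currently queued elements
    for (Long blockId : batch) {
      // Update tracking state without blocking the producers.
      System.out.println("finished block " + blockId);
    }
  }

  public static void main(String[] args) {
    DrainToDemo demo = new DrainToDemo();
    demo.onBlockReported(1001L);
    demo.onBlockReported(1002L);
    demo.periodicCheck(); // prints both block ids
  }
}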

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
index 71d8fd1..5032377 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
@@ -22,8 +22,10 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 
 /**
  * An interface for SPSService, which exposes life cycle and processing APIs.
@@ -131,11 +133,16 @@ public interface SPSService<T> {
   void markScanCompletedForPath(T spsPath);
 
   /**
-   * Notify the details of storage movement attempt finished blocks.
+   * Notifies that the given node has reported a block whose storage
+   * movement attempt has finished.
    *
-   * @param moveAttemptFinishedBlks
-   *          - array contains all the blocks that are attempted to move
+   * @param dnInfo
+   *          - reported datanode
+   * @param storageType
+   *          - storage type
+   * @param block
+   *          - block that is attempted to move
    */
-  void notifyStorageMovementAttemptFinishedBlks(
-      BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks);
+  void notifyStorageMovementAttemptFinishedBlk(DatanodeInfo dnInfo,
+      StorageType storageType, Block block);
 }
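With this signature change the callback granularity moves from one blocks array per heartbeat to one (datanode, storage type, block) tuple per incremental block report. A hedged sketch of the reporting side; the bridge class is illustrative, and in the patch the real wiring lives in BlockManager's IBR processing.

import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;

/** Illustrative bridge from IBR processing to the SPS callback. */
class IbrToSpsBridge<T> {
  private final SPSService<T> sps;

  IbrToSpsBridge(SPSService<T> sps) {
    this.sps = sps;
  }

  /** Invoked for every replica added through an incremental block report. */
  void onBlockReceived(DatanodeInfo dn, StorageType type, Block block) {
    // One tuple per reported replica; the attempted-items monitor matches it
    // against the scheduled (block -> {storage type, node}) pairs and marks
    // the block finished once all scheduled locations have reported.
    sps.notifyStorageMovementAttemptFinishedBlk(dn, type, block);
  }
}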

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 1c7a580..cbd6001 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -24,9 +24,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -50,7 +53,6 @@ import org.apache.hadoop.hdfs.server.balancer.Matcher;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
@@ -83,8 +85,6 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
   private BlockStorageMovementNeeded<T> storageMovementNeeded;
   private BlockStorageMovementAttemptedItems<T> storageMovementsMonitor;
   private volatile boolean isRunning = false;
-  private volatile StoragePolicySatisfierMode spsMode =
-      StoragePolicySatisfierMode.NONE;
   private int spsWorkMultiplier;
   private long blockCount = 0L;
   private int blockMovementMaxRetry;
@@ -128,11 +128,12 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
     }
 
     private Status status = null;
-    private List<Block> assignedBlocks = null;
+    private Map<Block, Set<StorageTypeNodePair>> assignedBlocks = null;
 
-    BlocksMovingAnalysis(Status status, List<Block> blockMovingInfo) {
+    BlocksMovingAnalysis(Status status,
+        Map<Block, Set<StorageTypeNodePair>> assignedBlocks) {
       this.status = status;
-      this.assignedBlocks = blockMovingInfo;
+      this.assignedBlocks = assignedBlocks;
     }
   }
 
@@ -164,7 +165,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
           serviceMode);
       return;
     }
-    if (spsMode == StoragePolicySatisfierMode.INTERNAL
+    if (serviceMode == StoragePolicySatisfierMode.INTERNAL
         && ctxt.isMoverRunning()) {
       isRunning = false;
       LOG.error(
@@ -175,14 +176,13 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
     }
     if (reconfigStart) {
       LOG.info("Starting {} StoragePolicySatisfier, as admin requested to "
-          + "start it.", StringUtils.toLowerCase(spsMode.toString()));
+          + "start it.", StringUtils.toLowerCase(serviceMode.toString()));
     } else {
       LOG.info("Starting {} StoragePolicySatisfier.",
-          StringUtils.toLowerCase(spsMode.toString()));
+          StringUtils.toLowerCase(serviceMode.toString()));
     }
 
     isRunning = true;
-    this.spsMode = serviceMode;
     // Ensure that all the previously submitted block movements(if any) have to
     // be stopped in all datanodes.
     addDropSPSWorkCommandsToAllDNs();
@@ -297,36 +297,36 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
                 // be removed on storage movement attempt finished report.
               case BLOCKS_TARGETS_PAIRED:
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("Block analysis status:{} for the file path:{}."
+                  LOG.debug("Block analysis status:{} for the file id:{}."
                       + " Adding to attempt monitor queue for the storage "
                       + "movement attempt finished report",
-                      status.status, fileStatus.getPath());
+                      status.status, fileStatus.getFileId());
                 }
-                this.storageMovementsMonitor.add(new AttemptedItemInfo<T>(
-                    itemInfo.getStartPath(), itemInfo.getFile(), monotonicNow(),
-                    status.assignedBlocks, itemInfo.getRetryCount()));
+                this.storageMovementsMonitor.add(itemInfo.getStartPath(),
+                    itemInfo.getFile(), monotonicNow(), status.assignedBlocks,
+                    itemInfo.getRetryCount());
                 break;
               case NO_BLOCKS_TARGETS_PAIRED:
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("Adding trackID:{} for the file path:{} back to"
+                  LOG.debug("Adding trackID:{} for the file id:{} back to"
                       + " retry queue as none of the blocks found its eligible"
-                      + " targets.", trackId, fileStatus.getPath());
+                      + " targets.", trackId, fileStatus.getFileId());
                 }
                 retryItem = true;
                 break;
               case FEW_LOW_REDUNDANCY_BLOCKS:
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("Adding trackID:{} for the file path:{} back to "
+                  LOG.debug("Adding trackID:{} for the file id:{} back to "
                       + "retry queue as some of the blocks are low redundant.",
-                      trackId, fileStatus.getPath());
+                      trackId, fileStatus.getFileId());
                 }
                 retryItem = true;
                 break;
               case BLOCKS_FAILED_TO_MOVE:
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("Adding trackID:{} for the file path:{} back to "
+                  LOG.debug("Adding trackID:{} for the file id:{} back to "
                       + "retry queue as some of the blocks movement failed.",
-                      trackId, fileStatus.getPath());
+                      trackId, fileStatus.getFileId());
                 }
                 retryItem = true;
                 break;
@@ -334,9 +334,9 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
               case BLOCKS_TARGET_PAIRING_SKIPPED:
               case BLOCKS_ALREADY_SATISFIED:
               default:
-                LOG.info("Block analysis status:{} for the file path:{}."
+                LOG.info("Block analysis status:{} for the file id:{}."
                     + " So, Cleaning up the Xattrs.", status.status,
-                    fileStatus.getPath());
+                    fileStatus.getFileId());
                 storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
                 break;
               }
@@ -389,19 +389,19 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
     if (!lastBlkComplete) {
       // Postpone, currently file is under construction
       LOG.info("File: {} is under construction. So, postpone"
-          + " this to the next retry iteration", fileInfo.getPath());
+          + " this to the next retry iteration", fileInfo.getFileId());
       return new BlocksMovingAnalysis(
           BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY,
-          new ArrayList<>());
+          new HashMap<>());
     }
 
     List<LocatedBlock> blocks = locatedBlocks.getLocatedBlocks();
     if (blocks.size() == 0) {
       LOG.info("File: {} is not having any blocks."
-          + " So, skipping the analysis.", fileInfo.getPath());
+          + " So, skipping the analysis.", fileInfo.getFileId());
       return new BlocksMovingAnalysis(
           BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
-          new ArrayList<>());
+          new HashMap<>());
     }
     List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
     boolean hasLowRedundancyBlocks = false;
@@ -432,7 +432,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
               + "So, ignoring to move the blocks");
           return new BlocksMovingAnalysis(
               BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
-              new ArrayList<>());
+              new HashMap<>());
         }
       } else {
         expectedStorageTypes = existingStoragePolicy
@@ -465,13 +465,21 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
         && status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
       status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
     }
-    List<Block> assignedBlockIds = new ArrayList<Block>();
+    Map<Block, Set<StorageTypeNodePair>> assignedBlocks = new HashMap<>();
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
       // Check for at least one block storage movement has been chosen
       try {
         blockMoveTaskHandler.submitMoveTask(blkMovingInfo);
         LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
-        assignedBlockIds.add(blkMovingInfo.getBlock());
+        StorageTypeNodePair nodeStorage = new StorageTypeNodePair(
+            blkMovingInfo.getTargetStorageType(), blkMovingInfo.getTarget());
+        Set<StorageTypeNodePair> nodesWithStorage = assignedBlocks
+            .get(blkMovingInfo.getBlock());
+        if (nodesWithStorage == null) {
+          nodesWithStorage = new HashSet<>();
+          assignedBlocks.put(blkMovingInfo.getBlock(), nodesWithStorage);
+        }
+        nodesWithStorage.add(nodeStorage);
         blockCount++;
       } catch (IOException e) {
         LOG.warn("Exception while scheduling movement task", e);
@@ -479,7 +487,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
         status = BlocksMovingAnalysis.Status.BLOCKS_FAILED_TO_MOVE;
       }
     }
-    return new BlocksMovingAnalysis(status, assignedBlockIds);
+    return new BlocksMovingAnalysis(status, assignedBlocks);
   }
 
   /**
@@ -545,6 +553,11 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
           new ArrayList<StorageTypeNodePair>();
       List<DatanodeInfo> existingBlockStorages = new ArrayList<DatanodeInfo>(
           Arrays.asList(storages));
+
+      // Add existing storages into the exclude nodes to avoid choosing them
+      // as remote targets later.
+      List<DatanodeInfo> excludeNodes = new ArrayList<>(existingBlockStorages);
+
       // if expected type exists in source node already, local movement would be
       // possible, so let's find such sources first.
       Iterator<DatanodeInfo> iterator = existingBlockStorages.iterator();
@@ -582,7 +595,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
       foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove(
           blockMovingInfos, blockInfo, sourceWithStorageMap,
           expectedStorageTypes, targetDns,
-          ecPolicy);
+          ecPolicy, excludeNodes);
     }
     return foundMatchingTargetNodesForBlock;
   }
@@ -601,6 +614,10 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
    *          - Expecting storages to move
    * @param targetDns
    *          - Available DNs for expected storage types
+   * @param ecPolicy
+   *          - erasure coding policy of the SPS invoked file
+   * @param excludeNodes
+   *          - existing source nodes, which have a replica copy
    * @return false if some of the block locations failed to find target node to
    *         satisfy the storage policy
    */
@@ -609,9 +626,8 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
       List<StorageTypeNodePair> sourceWithStorageList,
       List<StorageType> expectedTypes,
       EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>> targetDns,
-      ErasureCodingPolicy ecPolicy) {
+      ErasureCodingPolicy ecPolicy, List<DatanodeInfo> excludeNodes) {
     boolean foundMatchingTargetNodesForBlock = true;
-    List<DatanodeInfo> excludeNodes = new ArrayList<>();
 
     // Looping over all the source node locations and choose the target
     // storage within same node if possible. This is done separately to
@@ -638,10 +654,12 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
           expectedTypes.remove(chosenTarget.storageType);
         }
       }
-      // To avoid choosing this excludeNodes as targets later
-      excludeNodes.add(existingTypeNodePair.dn);
     }
-
+    // If all the sources and targets are paired within the same node, then
+    // simply return.
+    if (expectedTypes.isEmpty()) {
+      return foundMatchingTargetNodesForBlock;
+    }
     // Looping over all the source node locations. Choose a remote target
     // storage node if it was not found out within same node.
     for (int i = 0; i < sourceWithStorageList.size(); i++) {
@@ -824,14 +842,29 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
   /**
    * Keeps datanode with its respective storage type.
    */
-  private static final class StorageTypeNodePair {
+  static final class StorageTypeNodePair {
     private final StorageType storageType;
     private final DatanodeInfo dn;
 
-    private StorageTypeNodePair(StorageType storageType, DatanodeInfo dn) {
+    StorageTypeNodePair(StorageType storageType, DatanodeInfo dn) {
       this.storageType = storageType;
       this.dn = dn;
     }
+
+    public DatanodeInfo getDatanodeInfo() {
+      return dn;
+    }
+
+    public StorageType getStorageType() {
+      return storageType;
+    }
+
+    @Override
+    public String toString() {
+      return new StringBuilder().append("StorageTypeNodePair(")
+          .append("DatanodeInfo: ").append(dn).append(", StorageType: ")
+          .append(storageType).append(")").toString();
+    }
   }
 
   private EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>>
@@ -1043,18 +1076,19 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
   }
 
   /**
-   * Receives set of storage movement attempt finished blocks report.
+   * Receives storage movement attempt finished block report.
    *
-   * @param moveAttemptFinishedBlks
-   *          set of storage movement attempt finished blocks.
+   * @param dnInfo
+   *          reported datanode
+   * @param storageType
+   *          storage type
+   * @param block
+   *          movement attempt finished block.
    */
-  public void notifyStorageMovementAttemptFinishedBlks(
-      BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks) {
-    if (moveAttemptFinishedBlks.getBlocks().length <= 0) {
-      return;
-    }
-    storageMovementsMonitor
-        .notifyMovementTriedBlocks(moveAttemptFinishedBlks.getBlocks());
+  @Override
+  public void notifyStorageMovementAttemptFinishedBlk(DatanodeInfo dnInfo,
+      StorageType storageType, Block block) {
+    storageMovementsMonitor.notifyReportedBlock(dnInfo, storageType, block);
   }
 
   @VisibleForTesting
@@ -1086,7 +1120,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
    */
   final static class AttemptedItemInfo<T> extends ItemInfo<T> {
     private long lastAttemptedOrReportedTime;
-    private final List<Block> blocks;
+    private final Set<Block> blocks;
 
     /**
      * AttemptedItemInfo constructor.
@@ -1097,10 +1131,14 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
      *          trackId for file.
      * @param lastAttemptedOrReportedTime
      *          last attempted or reported time
+     * @param blocks
+     *          scheduled blocks
+     * @param retryCount
+     *          file retry count
      */
     AttemptedItemInfo(T rootId, T trackId,
         long lastAttemptedOrReportedTime,
-        List<Block> blocks, int retryCount) {
+        Set<Block> blocks, int retryCount) {
       super(rootId, trackId, retryCount);
       this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
       this.blocks = blocks;
@@ -1121,10 +1159,9 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
       this.lastAttemptedOrReportedTime = monotonicNow();
     }
 
-    List<Block> getBlocks() {
+    Set<Block> getBlocks() {
       return this.blocks;
     }
-
   }
 
   /**
@@ -1241,4 +1278,15 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
         "It should be a positive, non-zero integer value.");
     return spsWorkMultiplier;
   }
+
+  /**
+   * Sets external listener for testing.
+   *
+   * @param blkMovementListener
+   *          block movement listener callback object
+   */
+  @VisibleForTesting
+  void setBlockMovementListener(BlockMovementListener blkMovementListener) {
+    storageMovementsMonitor.setBlockMovementListener(blkMovementListener);
+  }
 }
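A side note on the assignedBlocks bookkeeping above: the get/new/put sequence is the classic multimap-insert idiom and could equally be written with Java 8's Map.computeIfAbsent. A simplified, runnable sketch; Long ids and Strings stand in for Block and StorageTypeNodePair.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class AssignedBlocksDemo {
  public static void main(String[] args) {
    // Keyed by block id for brevity; the patch keys by Block and stores
    // StorageTypeNodePair values.
    Map<Long, Set<String>> assignedBlocks = new HashMap<>();

    // computeIfAbsent creates the value set on first use, then reuses it.
    assignedBlocks.computeIfAbsent(1001L, k -> new HashSet<>()).add("DISK@dn1");
    assignedBlocks.computeIfAbsent(1001L, k -> new HashSet<>()).add("ARCHIVE@dn2");

    // Prints {1001=[DISK@dn1, ARCHIVE@dn2]} (set iteration order may vary).
    System.out.println(assignedBlocks);
  }
}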

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2acc50b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index fcc2df1..311b68f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -112,7 +112,6 @@ public interface DatanodeProtocol {
    * @param slowPeers Details of peer DataNodes that were detected as being
    *                  slow to respond to packet writes. Empty report if no
    *                  slow peers were detected by the DataNode.
-   * @param storageMovFinishedBlks array of movement attempt finished blocks
    * @throws IOException on error
    */
   @Idempotent
@@ -126,8 +125,7 @@ public interface DatanodeProtocol {
                                       VolumeFailureSummary volumeFailureSummary,
                                        boolean requestFullBlockReportLease,
                                        @Nonnull SlowPeerReports slowPeers,
-                                       @Nonnull SlowDiskReports slowDisks,
-                                       BlocksStorageMoveAttemptFinished storageMovFinishedBlks)
+                                       @Nonnull SlowDiskReports slowDisks)
       throws IOException;
 
   /**
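To make the slimmed heartbeat contract concrete, a hedged caller sketch against the new signature; the wrapper class is illustrative, the argument order follows the interface in this branch, and the movement-finished-blocks parameter is gone because completion now arrives through ordinary incremental block reports.

import java.io.IOException;

import javax.annotation.Nonnull;

import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;

/** Illustrative wrapper showing the reduced sendHeartbeat parameter list. */
class HeartbeatCaller {
  HeartbeatResponse heartbeat(DatanodeProtocol nn, DatanodeRegistration reg,
      StorageReport[] reports, long cacheCapacity, long cacheUsed,
      int xmitsInProgress, int xceiverCount, int failedVolumes,
      VolumeFailureSummary volumeFailureSummary, boolean requestLease,
      @Nonnull SlowPeerReports slowPeers, @Nonnull SlowDiskReports slowDisks)
      throws IOException {
    // No BlocksStorageMoveAttemptFinished argument any more; the namenode
    // learns about finished moves from incremental block reports instead.
    return nn.sendHeartbeat(reg, reports, cacheCapacity, cacheUsed,
        xmitsInProgress, xceiverCount, failedVolumes, volumeFailureSummary,
        requestLease, slowPeers, slowDisks);
  }
}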


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
