http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index d3c5cb1..2f621e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -156,7 +156,7 @@ import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -1517,14 +1517,15 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
       @Nonnull SlowDiskReports slowDisks,
-      BlocksStorageMovementResult[] blkMovementStatus) throws IOException {
+      BlocksStorageMoveAttemptFinished storageMovementFinishedBlks)
+          throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     return namesystem.handleHeartbeat(nodeReg, report,
         dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
         failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
         slowPeers, slowDisks,
-        blkMovementStatus);
+        storageMovementFinishedBlks);
   }
 
   @Override // DatanodeProtocol

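The heartbeat now carries a single BlocksStorageMoveAttemptFinished object instead of an array of per-trackId results. As a minimal caller-side sketch (only the types come from this patch; the Block values here are made up):

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;

    // Wrap the blocks whose movement was attempted (success and failure alike)
    // and pass the report as the last argument of DatanodeProtocol#sendHeartbeat.
    Block[] attempted = { new Block(1000L, 100L, 1L) };
    BlocksStorageMoveAttemptFinished finishedReport =
        new BlocksStorageMoveAttemptFinished(attempted);
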
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index a4372d5..a28a806 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.util.Time.monotonicNow;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -44,7 +46,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.util.Daemon;
@@ -82,25 +84,38 @@ public class StoragePolicySatisfier implements Runnable {
   /**
    * Represents the collective analysis status for all blocks.
    */
-  private enum BlocksMovingAnalysisStatus {
-    // Represents that, the analysis skipped due to some conditions. A such
-    // condition is if block collection is in incomplete state.
-    ANALYSIS_SKIPPED_FOR_RETRY,
-    // Represents that, all block storage movement needed blocks found its
-    // targets.
-    ALL_BLOCKS_TARGETS_PAIRED,
-    // Represents that, only fewer or none of the block storage movement needed
-    // block found its eligible targets.
-    FEW_BLOCKS_TARGETS_PAIRED,
-    // Represents that, none of the blocks found for block storage movements.
-    BLOCKS_ALREADY_SATISFIED,
-    // Represents that, the analysis skipped due to some conditions.
-    // Example conditions are if no blocks really exists in block collection or
-    // if analysis is not required on ec files with unsuitable storage policies
-    BLOCKS_TARGET_PAIRING_SKIPPED,
-    // Represents that, All the reported blocks are satisfied the policy but
-    // some of the blocks are low redundant.
-    FEW_LOW_REDUNDANCY_BLOCKS
+  private static class BlocksMovingAnalysis {
+
+    enum Status {
+      // The analysis was skipped because of some condition, e.g. the block
+      // collection is in an incomplete state.
+      ANALYSIS_SKIPPED_FOR_RETRY,
+      // Some or all of the blocks found a respective target for the storage
+      // movement.
+      BLOCKS_TARGETS_PAIRED,
+      // None of the blocks found a respective target for the storage
+      // movement.
+      NO_BLOCKS_TARGETS_PAIRED,
+      // No blocks were found that need storage movement.
+      BLOCKS_ALREADY_SATISFIED,
+      // The analysis was skipped because of some condition, e.g. no blocks
+      // really exist in the block collection, or analysis is not required
+      // for EC files with unsuitable storage policies.
+      BLOCKS_TARGET_PAIRING_SKIPPED,
+      // All the reported blocks satisfy the policy, but some of them are
+      // low redundant.
+      FEW_LOW_REDUNDANCY_BLOCKS
+    }
+
+    private Status status = null;
+    private List<Block> assignedBlocks = null;
+
+    BlocksMovingAnalysis(Status status, List<Block> assignedBlocks) {
+      this.status = status;
+      this.assignedBlocks = assignedBlocks;
+    }
   }
 
   public StoragePolicySatisfier(final Namesystem namesystem,
@@ -118,8 +133,7 @@ public class StoragePolicySatisfier implements Runnable {
         conf.getLong(
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
-        storageMovementNeeded,
-        this);
+        storageMovementNeeded);
     this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf);
   }
 
@@ -232,21 +246,25 @@ public class StoragePolicySatisfier implements Runnable {
                 namesystem.getBlockCollection(trackId);
             // Check blockCollectionId existence.
             if (blockCollection != null) {
-              BlocksMovingAnalysisStatus status =
+              BlocksMovingAnalysis status =
                   analyseBlocksStorageMovementsAndAssignToDN(blockCollection);
-              switch (status) {
+              switch (status.status) {
               // Just add to monitor, so it will be retried after timeout
               case ANALYSIS_SKIPPED_FOR_RETRY:
-                // Just add to monitor, so it will be tracked for result and
-                // be removed on successful storage movement result.
-              case ALL_BLOCKS_TARGETS_PAIRED:
-                this.storageMovementsMonitor.add(itemInfo, true);
+                // Just add to monitor, so it will be tracked for report and
+                // be removed on storage movement attempt finished report.
+              case BLOCKS_TARGETS_PAIRED:
+                this.storageMovementsMonitor.add(new AttemptedItemInfo(
+                    itemInfo.getStartId(), itemInfo.getTrackId(),
+                    monotonicNow(), status.assignedBlocks));
                 break;
-              // Add to monitor with allBlcoksAttemptedToSatisfy flag false, so
-              // that it will be tracked and still it will be consider for retry
-              // as analysis was not found targets for storage movement blocks.
-              case FEW_BLOCKS_TARGETS_PAIRED:
-                this.storageMovementsMonitor.add(itemInfo, false);
+              case NO_BLOCKS_TARGETS_PAIRED:
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Adding trackID " + trackId
+                      + " back to retry queue as none of the blocks"
+                      + " found its eligible targets.");
+                }
+                this.storageMovementNeeded.add(itemInfo);
                 break;
               case FEW_LOW_REDUNDANCY_BLOCKS:
                 if (LOG.isDebugEnabled()) {
@@ -310,10 +328,10 @@ public class StoragePolicySatisfier implements Runnable {
     return;
   }
 
-  private BlocksMovingAnalysisStatus analyseBlocksStorageMovementsAndAssignToDN(
+  private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
       BlockCollection blockCollection) {
-    BlocksMovingAnalysisStatus status =
-        BlocksMovingAnalysisStatus.BLOCKS_ALREADY_SATISFIED;
+    BlocksMovingAnalysis.Status status =
+        BlocksMovingAnalysis.Status.BLOCKS_ALREADY_SATISFIED;
     byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
     BlockStoragePolicy existingStoragePolicy =
         blockManager.getStoragePolicy(existingStoragePolicyID);
@@ -322,17 +340,18 @@ public class StoragePolicySatisfier implements Runnable {
       // So, should we add back? or leave it to user
       LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
           + " this to the next retry iteration", blockCollection.getId());
-      return BlocksMovingAnalysisStatus.ANALYSIS_SKIPPED_FOR_RETRY;
+      return new BlocksMovingAnalysis(
+          BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY,
+          new ArrayList<>());
     }
 
-    // First datanode will be chosen as the co-ordinator node for storage
-    // movements. Later this can be optimized if needed.
-    DatanodeDescriptor coordinatorNode = null;
     BlockInfo[] blocks = blockCollection.getBlocks();
     if (blocks.length == 0) {
       LOG.info("BlockCollectionID: {} file is not having any blocks."
           + " So, skipping the analysis.", blockCollection.getId());
-      return BlocksMovingAnalysisStatus.BLOCKS_TARGET_PAIRING_SKIPPED;
+      return new BlocksMovingAnalysis(
+          BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
+          new ArrayList<>());
     }
     List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
 
@@ -352,7 +371,9 @@ public class StoragePolicySatisfier implements Runnable {
           LOG.warn("The storage policy " + existingStoragePolicy.getName()
               + " is not suitable for Striped EC files. "
               + "So, ignoring to move the blocks");
-          return BlocksMovingAnalysisStatus.BLOCKS_TARGET_PAIRING_SKIPPED;
+          return new BlocksMovingAnalysis(
+              BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
+              new ArrayList<>());
         }
       } else {
         expectedStorageTypes = existingStoragePolicy
@@ -370,30 +391,35 @@ public class StoragePolicySatisfier implements Runnable {
           new LinkedList<StorageType>(Arrays.asList(storageTypes));
       if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
           existing, true)) {
-        boolean computeStatus = computeBlockMovingInfos(blockMovingInfos,
+        boolean blocksPaired = computeBlockMovingInfos(blockMovingInfos,
             blockInfo, expectedStorageTypes, existing, storages);
-        if (computeStatus
-            && status != BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED
-            && !blockManager.hasLowRedundancyBlocks(blockCollection)) {
-          status = BlocksMovingAnalysisStatus.ALL_BLOCKS_TARGETS_PAIRED;
+        if (blocksPaired) {
+          status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED;
         } else {
-          status = BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED;
+          // none of the blocks found its eligible targets for satisfying the
+          // storage policy.
+          status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED;
         }
       } else {
         if (blockManager.hasLowRedundancyBlocks(blockCollection)) {
-          status = BlocksMovingAnalysisStatus.FEW_LOW_REDUNDANCY_BLOCKS;
+          status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
         }
       }
     }
 
-    assignBlockMovingInfosToCoordinatorDn(blockCollection.getId(),
-        blockMovingInfos, coordinatorNode);
-    int count = 0;
+    List<Block> assignedBlockIds = new ArrayList<Block>();
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
-      count = count + blkMovingInfo.getSources().length;
+      // Check whether a target has been chosen for this block's movement; if
+      // so, queue the task on that target datanode.
+      if (blkMovingInfo.getTarget() != null) {
+        // assign block storage movement task to the target node
+        ((DatanodeDescriptor) blkMovingInfo.getTarget())
+            .addBlocksToMoveStorage(blkMovingInfo);
+        LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
+        assignedBlockIds.add(blkMovingInfo.getBlock());
+        blockCount++;
+      }
     }
-    blockCount = blockCount + count;
-    return status;
+    return new BlocksMovingAnalysis(status, assignedBlockIds);
   }
 
   /**
@@ -468,41 +494,6 @@ public class StoragePolicySatisfier implements Runnable {
     return foundMatchingTargetNodesForBlock;
   }
 
-  private void assignBlockMovingInfosToCoordinatorDn(long blockCollectionID,
-      List<BlockMovingInfo> blockMovingInfos,
-      DatanodeDescriptor coordinatorNode) {
-
-    if (blockMovingInfos.size() < 1) {
-      // TODO: Major: handle this case. I think we need retry cases to
-      // be implemented. Idea is, if some files are not getting storage movement
-      // chances, then we can just retry limited number of times and exit.
-      return;
-    }
-
-    // For now, first datanode will be chosen as the co-ordinator. Later
-    // this can be optimized if needed.
-    coordinatorNode = (DatanodeDescriptor) blockMovingInfos.get(0)
-        .getSources()[0];
-
-    boolean needBlockStorageMovement = false;
-    for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
-      // Check for atleast one block storage movement has been chosen
-      if (blkMovingInfo.getTargets().length > 0){
-        needBlockStorageMovement = true;
-        break;
-      }
-    }
-    if (!needBlockStorageMovement) {
-      // Simply return as there is no targets selected for scheduling the block
-      // movement.
-      return;
-    }
-
-    // 'BlockCollectionId' is used as the tracking ID. All the blocks under this
-    // blockCollectionID will be added to this datanode.
-    coordinatorNode.addBlocksToMoveStorage(blockCollectionID, blockMovingInfos);
-  }
-
   /**
   * Find the good target node for each source node for which block storages was
    * misplaced.
@@ -526,10 +517,6 @@ public class StoragePolicySatisfier implements Runnable {
       List<StorageType> expected,
       StorageTypeNodeMap locsForExpectedStorageTypes) {
     boolean foundMatchingTargetNodesForBlock = true;
-    List<DatanodeInfo> sourceNodes = new ArrayList<>();
-    List<StorageType> sourceStorageTypes = new ArrayList<>();
-    List<DatanodeInfo> targetNodes = new ArrayList<>();
-    List<StorageType> targetStorageTypes = new ArrayList<>();
     List<DatanodeDescriptor> excludeNodes = new ArrayList<>();
 
     // Looping over all the source node locations and choose the target
@@ -544,10 +531,15 @@ public class StoragePolicySatisfier implements Runnable {
         StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
             blockInfo, existingTypeNodePair.dn, expected);
         if (chosenTarget != null) {
-          sourceNodes.add(existingTypeNodePair.dn);
-          sourceStorageTypes.add(existingTypeNodePair.storageType);
-          targetNodes.add(chosenTarget.dn);
-          targetStorageTypes.add(chosenTarget.storageType);
+          if (blockInfo.isStriped()) {
+            buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
+                existingTypeNodePair.storageType, chosenTarget.dn,
+                chosenTarget.storageType, blockMovingInfos);
+          } else {
+            buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
+                existingTypeNodePair.storageType, chosenTarget.dn,
+                chosenTarget.storageType, blockMovingInfos);
+          }
           expected.remove(chosenTarget.storageType);
           // TODO: We can increment scheduled block count for this node?
         }
@@ -563,7 +555,7 @@ public class StoragePolicySatisfier implements Runnable {
       StorageTypeNodePair chosenTarget = null;
       // Chosen the target storage within same datanode. So just skipping this
       // source node.
-      if (sourceNodes.contains(existingTypeNodePair.dn)) {
+      if (checkIfAlreadyChosen(blockMovingInfos, existingTypeNodePair.dn)) {
         continue;
       }
       if (chosenTarget == null && blockManager.getDatanodeManager()
@@ -586,10 +578,16 @@ public class StoragePolicySatisfier implements Runnable {
                 Matcher.ANY_OTHER, locsForExpectedStorageTypes, excludeNodes);
       }
       if (null != chosenTarget) {
-        sourceNodes.add(existingTypeNodePair.dn);
-        sourceStorageTypes.add(existingTypeNodePair.storageType);
-        targetNodes.add(chosenTarget.dn);
-        targetStorageTypes.add(chosenTarget.storageType);
+        if (blockInfo.isStriped()) {
+          buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
+              existingTypeNodePair.storageType, chosenTarget.dn,
+              chosenTarget.storageType, blockMovingInfos);
+        } else {
+          buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
+              existingTypeNodePair.storageType, chosenTarget.dn,
+              chosenTarget.storageType, blockMovingInfos);
+        }
+
         expected.remove(chosenTarget.storageType);
         excludeNodes.add(chosenTarget.dn);
         // TODO: We can increment scheduled block count for this node?
@@ -605,47 +603,33 @@ public class StoragePolicySatisfier implements Runnable {
       foundMatchingTargetNodesForBlock = false;
     }
 
-    blockMovingInfos.addAll(getBlockMovingInfos(blockInfo, sourceNodes,
-        sourceStorageTypes, targetNodes, targetStorageTypes));
     return foundMatchingTargetNodesForBlock;
   }
 
-  private List<BlockMovingInfo> getBlockMovingInfos(BlockInfo blockInfo,
-      List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
-      List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes) {
-    List<BlockMovingInfo> blkMovingInfos = new ArrayList<>();
-    // No source-target node pair exists.
-    if (sourceNodes.size() <= 0) {
-      return blkMovingInfos;
-    }
-
-    if (blockInfo.isStriped()) {
-      buildStripedBlockMovingInfos(blockInfo, sourceNodes, sourceStorageTypes,
-          targetNodes, targetStorageTypes, blkMovingInfos);
-    } else {
-      buildContinuousBlockMovingInfos(blockInfo, sourceNodes,
-          sourceStorageTypes, targetNodes, targetStorageTypes, blkMovingInfos);
+  private boolean checkIfAlreadyChosen(List<BlockMovingInfo> blockMovingInfos,
+      DatanodeDescriptor dn) {
+    for (BlockMovingInfo blockMovingInfo : blockMovingInfos) {
+      if (blockMovingInfo.getSource().equals(dn)) {
+        return true;
+      }
     }
-    return blkMovingInfos;
+    return false;
   }
 
   private void buildContinuousBlockMovingInfos(BlockInfo blockInfo,
-      List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
-      List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes,
+      DatanodeInfo sourceNode, StorageType sourceStorageType,
+      DatanodeInfo targetNode, StorageType targetStorageType,
       List<BlockMovingInfo> blkMovingInfos) {
     Block blk = new Block(blockInfo.getBlockId(), blockInfo.getNumBytes(),
         blockInfo.getGenerationStamp());
-    BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk,
-        sourceNodes.toArray(new DatanodeInfo[sourceNodes.size()]),
-        targetNodes.toArray(new DatanodeInfo[targetNodes.size()]),
-        sourceStorageTypes.toArray(new StorageType[sourceStorageTypes.size()]),
-        targetStorageTypes.toArray(new StorageType[targetStorageTypes.size()]));
+    BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
+        targetNode, sourceStorageType, targetStorageType);
     blkMovingInfos.add(blkMovingInfo);
   }
 
   private void buildStripedBlockMovingInfos(BlockInfo blockInfo,
-      List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
-      List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes,
+      DatanodeInfo sourceNode, StorageType sourceStorageType,
+      DatanodeInfo targetNode, StorageType targetStorageType,
       List<BlockMovingInfo> blkMovingInfos) {
     // For a striped block, it needs to construct internal block at the given
     // index of a block group. Here it is iterating over all the block indices
@@ -655,30 +639,17 @@ public class StoragePolicySatisfier implements Runnable {
     for (StorageAndBlockIndex si : sBlockInfo.getStorageAndIndexInfos()) {
       if (si.getBlockIndex() >= 0) {
         DatanodeDescriptor dn = si.getStorage().getDatanodeDescriptor();
-        DatanodeInfo[] srcNode = new DatanodeInfo[1];
-        StorageType[] srcStorageType = new StorageType[1];
-        DatanodeInfo[] targetNode = new DatanodeInfo[1];
-        StorageType[] targetStorageType = new StorageType[1];
-        for (int i = 0; i < sourceNodes.size(); i++) {
-          DatanodeInfo node = sourceNodes.get(i);
-          if (node.equals(dn)) {
-            srcNode[0] = node;
-            srcStorageType[0] = sourceStorageTypes.get(i);
-            targetNode[0] = targetNodes.get(i);
-            targetStorageType[0] = targetStorageTypes.get(i);
-
-            // construct internal block
-            long blockId = blockInfo.getBlockId() + si.getBlockIndex();
-            long numBytes = StripedBlockUtil.getInternalBlockLength(
-                sBlockInfo.getNumBytes(), sBlockInfo.getCellSize(),
-                sBlockInfo.getDataBlockNum(), si.getBlockIndex());
-            Block blk = new Block(blockId, numBytes,
-                blockInfo.getGenerationStamp());
-            BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, srcNode,
-                targetNode, srcStorageType, targetStorageType);
-            blkMovingInfos.add(blkMovingInfo);
-            break; // found matching source-target nodes
-          }
+        if (sourceNode.equals(dn)) {
+          // construct internal block
+          long blockId = blockInfo.getBlockId() + si.getBlockIndex();
+          long numBytes = StripedBlockUtil.getInternalBlockLength(
+              sBlockInfo.getNumBytes(), sBlockInfo.getCellSize(),
+              sBlockInfo.getDataBlockNum(), si.getBlockIndex());
+          Block blk = new Block(blockId, numBytes,
+              blockInfo.getGenerationStamp());
+          BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
+              targetNode, sourceStorageType, targetStorageType);
+          blkMovingInfos.add(blkMovingInfo);
         }
       }
     }
@@ -817,18 +788,18 @@ public class StoragePolicySatisfier implements Runnable {
   }
 
   /**
-   * Receives the movement results of collection of blocks associated to a
-   * trackId.
+   * Receives the report of blocks whose storage movement attempt has finished.
    *
-   * @param blksMovementResults
-   *          movement status of the set of blocks associated to a trackId.
+   * @param moveAttemptFinishedBlks
+   *          set of storage movement attempt finished blocks.
    */
-  void handleBlocksStorageMovementResults(
-      BlocksStorageMovementResult[] blksMovementResults) {
-    if (blksMovementResults.length <= 0) {
+  void handleStorageMovementAttemptFinishedBlks(
+      BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks) {
+    if (moveAttemptFinishedBlks.getBlocks().length <= 0) {
       return;
     }
-    storageMovementsMonitor.addResults(blksMovementResults);
+    storageMovementsMonitor
+        .addReportedMovedBlocks(moveAttemptFinishedBlks.getBlocks());
   }
 
   @VisibleForTesting
@@ -906,4 +877,52 @@ public class StoragePolicySatisfier implements Runnable {
       return (startId != trackId);
     }
   }
+
+  /**
+   * This class contains information about the attempted blocks and their
+   * last attempted or reported time stamp. This is used by
+   * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
+   */
+  final static class AttemptedItemInfo extends ItemInfo {
+    private long lastAttemptedOrReportedTime;
+    private final List<Block> blocks;
+
+    /**
+     * AttemptedItemInfo constructor.
+     *
+     * @param rootId
+     *          rootId for trackId
+     * @param trackId
+     *          trackId for file.
+     * @param lastAttemptedOrReportedTime
+     *          last attempted or reported time
+     * @param blocks
+     *          blocks assigned for movement in this attempt
+     */
+    AttemptedItemInfo(long rootId, long trackId,
+        long lastAttemptedOrReportedTime,
+        List<Block> blocks) {
+      super(rootId, trackId);
+      this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
+      this.blocks = blocks;
+    }
+
+    /**
+     * @return last attempted or reported time stamp.
+     */
+    long getLastAttemptedOrReportedTime() {
+      return lastAttemptedOrReportedTime;
+    }
+
+    /**
+     * Update lastAttemptedOrReportedTime, so that the expiration time will be
+     * postponed further into the future.
+     */
+    void touchLastReportedTimeStamp() {
+      this.lastAttemptedOrReportedTime = monotonicNow();
+    }
+
+    List<Block> getBlocks() {
+      return this.blocks;
+    }
+
+  }
 }

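Condensed from the hunks above, the per-file dispatch in run() now keys off the analysis result and hands only the actually-assigned blocks to the monitor. A sketch of the logic inside StoragePolicySatisfier (field and method names are as in this patch; the local variable name is ours):

    BlocksMovingAnalysis result =
        analyseBlocksStorageMovementsAndAssignToDN(blockCollection);
    switch (result.status) {
    case ANALYSIS_SKIPPED_FOR_RETRY:
      // fall through: both cases are tracked until timeout or DN report
    case BLOCKS_TARGETS_PAIRED:
      // Track the attempted blocks until the DN reports the attempt finished.
      storageMovementsMonitor.add(new AttemptedItemInfo(
          itemInfo.getStartId(), itemInfo.getTrackId(),
          monotonicNow(), result.assignedBlocks));
      break;
    case NO_BLOCKS_TARGETS_PAIRED:
      // No eligible targets this round; retry from the needed queue.
      storageMovementNeeded.add(itemInfo);
      break;
    default:
      break;
    }
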
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
index 5dcf4e7..e90317d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.protocol;
 
-import java.util.Arrays;
 import java.util.Collection;
 
 import org.apache.hadoop.fs.StorageType;
@@ -29,22 +28,15 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  * given set of blocks to specified target DataNodes to fulfill the block
  * storage policy.
  *
- * Upon receiving this command, this DataNode coordinates all the block movement
- * by passing the details to
+ * Upon receiving this command, this DataNode passes the array of block movement
+ * details to
  * {@link org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker}
- * service. After the block movement this DataNode sends response back to the
- * NameNode about the movement status.
- *
- * The coordinator datanode will use 'trackId' identifier to coordinate the
- * block movement of the given set of blocks. TrackId is a unique identifier
- * that represents a group of blocks. Namenode will generate this unique value
- * and send it to the coordinator datanode along with the
- * BlockStorageMovementCommand. Datanode will monitor the completion of the
- * block movements that grouped under this trackId and notifies Namenode about
- * the completion status.
+ * service. Later, StoragePolicySatisfyWorker will schedule the block movement
+ * tasks for these blocks and monitor the completion of each task. After a
+ * block movement attempt is finished (with success or failure), the DataNode
+ * sends a response back to the NameNode with the finished-attempt details.
  */
 public class BlockStorageMovementCommand extends DatanodeCommand {
-  private final long trackID;
   private final String blockPoolId;
   private final Collection<BlockMovingInfo> blockMovingTasks;
 
@@ -53,30 +45,17 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
    *
    * @param action
    *          protocol specific action
-   * @param trackID
-   *          unique identifier to monitor the given set of block movements
-   * @param blockPoolId
-   *          block pool ID
    * @param blockMovingInfos
    *          block to storage info that will be used for movement
    */
-  public BlockStorageMovementCommand(int action, long trackID,
-      String blockPoolId, Collection<BlockMovingInfo> blockMovingInfos) {
+  public BlockStorageMovementCommand(int action, String blockPoolId,
+      Collection<BlockMovingInfo> blockMovingInfos) {
     super(action);
-    this.trackID = trackID;
     this.blockPoolId = blockPoolId;
     this.blockMovingTasks = blockMovingInfos;
   }
 
   /**
-   * Returns trackID, which will be used to monitor the block movement assigned
-   * to this coordinator datanode.
-   */
-  public long getTrackID() {
-    return trackID;
-  }
-
-  /**
    * Returns block pool ID.
    */
   public String getBlockPoolId() {
@@ -95,33 +74,29 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
    */
   public static class BlockMovingInfo {
     private Block blk;
-    private DatanodeInfo[] sourceNodes;
-    private DatanodeInfo[] targetNodes;
-    private StorageType[] sourceStorageTypes;
-    private StorageType[] targetStorageTypes;
+    private DatanodeInfo sourceNode;
+    private DatanodeInfo targetNode;
+    private StorageType sourceStorageType;
+    private StorageType targetStorageType;
 
     /**
      * Block to storage info constructor.
      *
      * @param block
-     *          block
-     * @param sourceDnInfos
-     *          node that can be the sources of a block move
-     * @param targetDnInfos
-     *          target datanode info
-     * @param srcStorageTypes
+     *          block info
+     * @param sourceDnInfo
+     *          node that can be the source of a block move
+     * @param targetDnInfo
+     *          target datanode info
+     * @param srcStorageType
       *          type of source storage media
-     * @param targetStorageTypes
-     *          type of destin storage media
+     * @param targetStorageType
+     *          type of destination storage media
       */
-    public BlockMovingInfo(Block block,
-        DatanodeInfo[] sourceDnInfos, DatanodeInfo[] targetDnInfos,
-        StorageType[] srcStorageTypes, StorageType[] targetStorageTypes) {
+    public BlockMovingInfo(Block block, DatanodeInfo sourceDnInfo,
+        DatanodeInfo targetDnInfo, StorageType srcStorageType,
+        StorageType targetStorageType) {
       this.blk = block;
-      this.sourceNodes = sourceDnInfos;
-      this.targetNodes = targetDnInfos;
-      this.sourceStorageTypes = srcStorageTypes;
-      this.targetStorageTypes = targetStorageTypes;
+      this.sourceNode = sourceDnInfo;
+      this.targetNode = targetDnInfo;
+      this.sourceStorageType = srcStorageType;
+      this.targetStorageType = targetStorageType;
     }
 
     public void addBlock(Block block) {
@@ -129,35 +104,33 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
     }
 
     public Block getBlock() {
-      return this.blk;
+      return blk;
     }
 
-    public DatanodeInfo[] getSources() {
-      return sourceNodes;
+    public DatanodeInfo getSource() {
+      return sourceNode;
     }
 
-    public DatanodeInfo[] getTargets() {
-      return targetNodes;
+    public DatanodeInfo getTarget() {
+      return targetNode;
     }
 
-    public StorageType[] getTargetStorageTypes() {
-      return targetStorageTypes;
+    public StorageType getTargetStorageType() {
+      return targetStorageType;
     }
 
-    public StorageType[] getSourceStorageTypes() {
-      return sourceStorageTypes;
+    public StorageType getSourceStorageType() {
+      return sourceStorageType;
     }
 
     @Override
     public String toString() {
       return new StringBuilder().append("BlockMovingInfo(\n  ")
           .append("Moving block: ").append(blk).append(" From: ")
-          .append(Arrays.asList(sourceNodes)).append(" To: [")
-          .append(Arrays.asList(targetNodes)).append("\n  ")
-          .append(" sourceStorageTypes: ")
-          .append(Arrays.toString(sourceStorageTypes))
-          .append(" targetStorageTypes: ")
-          .append(Arrays.toString(targetStorageTypes)).append(")").toString();
+          .append(sourceNode).append(" To: [").append(targetNode).append("\n  ")
+          .append(" sourceStorageType: ").append(sourceStorageType)
+          .append(" targetStorageType: ").append(targetStorageType).append(")")
+          .toString();
     }
   }
 }

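With the coordinator gone, a command now carries one source/target pair per block and no trackID. A hypothetical construction (the datanode infos, block pool id, and the action constant are assumed to come from the caller):

    import java.util.Collections;
    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
    import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;

    // One movement task: move a replica of the block from srcDn (DISK)
    // to targetDn (ARCHIVE).
    BlockMovingInfo task = new BlockMovingInfo(new Block(1000L, 100L, 1L),
        srcDn, targetDn, StorageType.DISK, StorageType.ARCHIVE);
    BlockStorageMovementCommand cmd =
        new BlockStorageMovementCommand(action, blockPoolId,
            Collections.singletonList(task));
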
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMoveAttemptFinished.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMoveAttemptFinished.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMoveAttemptFinished.java
new file mode 100644
index 0000000..c837e013
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMoveAttemptFinished.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * This class represents the blocks for which a storage movement attempt has
+ * been made by the datanodes. The movementFinishedBlocks array contains all
+ * the blocks whose movement was attempted, whether the attempt finished with
+ * success or failure.
+ */
+public class BlocksStorageMoveAttemptFinished {
+
+  private final Block[] movementFinishedBlocks;
+
+  public BlocksStorageMoveAttemptFinished(Block[] moveAttemptFinishedBlocks) {
+    this.movementFinishedBlocks = moveAttemptFinishedBlocks;
+  }
+
+  public Block[] getBlocks() {
+    return movementFinishedBlocks;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder().append("BlocksStorageMoveAttemptFinished(\n  ")
+        .append("  blocks: ").append(Arrays.toString(movementFinishedBlocks))
+        .append(")").toString();
+  }
+}

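On the NameNode side, the report feeds straight into the attempted-items monitor. A defensive sketch of the consumption path (the null guard here is an addition; the handler in the StoragePolicySatisfier hunk above checks only the array length):

    // Hand the DN-reported blocks to the attempted-items monitor.
    void onMoveAttemptFinished(BlocksStorageMoveAttemptFinished finished) {
      if (finished == null || finished.getBlocks() == null
          || finished.getBlocks().length == 0) {
        return; // nothing was attempted since the last heartbeat
      }
      storageMovementsMonitor.addReportedMovedBlocks(finished.getBlocks());
    }
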
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
deleted file mode 100644
index 7f749ec4..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.protocol;
-
-/**
- * This class represents, movement status of a set of blocks associated to a
- * track Id.
- */
-public class BlocksStorageMovementResult {
-
-  private final long trackId;
-  private final Status status;
-
-  /**
-   * SUCCESS - If all the blocks associated to track id has moved successfully
-   * or maximum possible movements done.
-   *
-   * <p>
-   * FAILURE - If any of its(trackId) blocks movement failed and requires to
-   * retry these failed blocks movements. Example selected target node is no
-   * more running or no space. So, retrying by selecting new target node might
-   * work.
-   *
-   * <p>
-   * IN_PROGRESS - If all or some of the blocks associated to track id are
-   * still moving.
-   */
-  public enum Status {
-    SUCCESS, FAILURE, IN_PROGRESS;
-  }
-
-  /**
-   * BlocksStorageMovementResult constructor.
-   *
-   * @param trackId
-   *          tracking identifier
-   * @param status
-   *          block movement status
-   */
-  public BlocksStorageMovementResult(long trackId, Status status) {
-    this.trackId = trackId;
-    this.status = status;
-  }
-
-  public long getTrackId() {
-    return trackId;
-  }
-
-  public Status getStatus() {
-    return status;
-  }
-
-  @Override
-  public String toString() {
-    return new StringBuilder().append("BlocksStorageMovementResult(\n  ")
-        .append("track id: ").append(trackId).append("  status: ")
-        .append(status).append(")").toString();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 5e1f148..fcc2df1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -112,8 +112,7 @@ public interface DatanodeProtocol {
    * @param slowPeers Details of peer DataNodes that were detected as being
    *                  slow to respond to packet writes. Empty report if no
    *                  slow peers were detected by the DataNode.
-   * @param blksMovementResults array of movement status of a set of blocks
-   *                            associated to a trackId.
+   * @param storageMovFinishedBlks report of blocks whose movement attempts
+   *                               have finished
    * @throws IOException on error
    */
   @Idempotent
@@ -128,7 +127,7 @@ public interface DatanodeProtocol {
                                        boolean requestFullBlockReportLease,
                                        @Nonnull SlowPeerReports slowPeers,
                                        @Nonnull SlowDiskReports slowDisks,
-                                       BlocksStorageMovementResult[] blksMovementResults)
+                                       BlocksStorageMoveAttemptFinished storageMovFinishedBlks)
       throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 080f7fa..7c35494 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -162,9 +162,8 @@ message BlockECReconstructionCommandProto {
  * Block storage movement command
  */
 message BlockStorageMovementCommandProto {
-  required uint64 trackID = 1;
-  required string blockPoolId = 2;
-  repeated BlockStorageMovementProto blockStorageMovement = 3;
+  required string blockPoolId = 1;
+  repeated BlockMovingInfoProto blockMovingInfo = 2;
 }
 
 /**
@@ -177,25 +176,20 @@ message DropSPSWorkCommandProto {
 /**
  * Block storage movement information
  */
-message BlockStorageMovementProto {
+message BlockMovingInfoProto {
   required BlockProto block = 1;
-  required DatanodeInfosProto sourceDnInfos = 2;
-  required DatanodeInfosProto targetDnInfos = 3;
-  required StorageTypesProto sourceStorageTypes = 4;
-  required StorageTypesProto targetStorageTypes = 5;
+  required DatanodeInfoProto sourceDnInfo = 2;
+  required DatanodeInfoProto targetDnInfo = 3;
+  required StorageTypeProto sourceStorageType = 4;
+  required StorageTypeProto targetStorageType = 5;
 }
 
 /**
- * Movement status of the set of blocks associated to a trackId.
+ * Blocks for which storage movement has been attempted and finished,
+ * with either success or failure.
  */
-message BlocksStorageMovementResultProto {
-  enum Status {
-    SUCCESS = 1; // block movement succeeded
-    FAILURE = 2; // block movement failed and needs to retry
-    IN_PROGRESS = 3; // block movement is still in progress
-  }
-  required uint64 trackID = 1;
-  required Status status = 2;
+message BlocksStorageMoveAttemptFinishedProto {
+  repeated BlockProto blocks = 1;
 }
 
 /**
@@ -255,7 +249,7 @@ message HeartbeatRequestProto {
   optional bool requestFullBlockReportLease = 9 [ default = false ];
   repeated SlowPeerReportProto slowPeers = 10;
   repeated SlowDiskReportProto slowDisks = 11;
-  repeated BlocksStorageMovementResultProto blksMovementResults = 12;
+  optional BlocksStorageMoveAttemptFinishedProto storageMoveAttemptFinishedBlks = 12;
 }
 
 /**

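The wire change mirrors the Java types: the heartbeat gains one optional message holding only repeated BlockProtos. A sketch of building it with the usual protobuf-java generated builders (the exact nesting of the generated classes is assumed):

    // BlockProto fields follow hdfs.proto: blockId, genStamp, numBytes.
    BlocksStorageMoveAttemptFinishedProto finished =
        BlocksStorageMoveAttemptFinishedProto.newBuilder()
            .addBlocks(BlockProto.newBuilder()
                .setBlockId(1000L)
                .setGenStamp(1L)
                .setNumBytes(100L)
                .build())
            .build();
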
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index e0ca175..e16341e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4523,24 +4523,35 @@
 
 <property>
   <name>dfs.storage.policy.satisfier.recheck.timeout.millis</name>
-  <value>300000</value>
+  <value>60000</value>
   <description>
     Blocks storage movements monitor re-check interval in milliseconds.
    This check will verify whether any blocks storage movement results arrived from DN
    and also verify if any of file blocks movements not at all reported to DN
    since dfs.storage.policy.satisfier.self.retry.timeout.
-    The default value is 5 * 60 * 1000 (5 mins)
+    The default value is 1 * 60 * 1000 (1 min)
   </description>
 </property>
 
 <property>
   <name>dfs.storage.policy.satisfier.self.retry.timeout.millis</name>
-  <value>1800000</value>
+  <value>300000</value>
   <description>
-    If any of file related block movements not at all reported by coordinator datanode,
+    If any of file related block movements not at all reported by datanode,
     then after this timeout(in milliseconds), the item will be added back to movement needed list
     at namenode which will be retried for block movements.
-    The default value is 30 * 60 * 1000 (30 mins)
+    The default value is 5 * 60 * 1000 (5 mins)
+  </description>
+</property>
+
+<property>
+  <name>dfs.storage.policy.satisfier.low.max-streams.preference</name>
+  <value>false</value>
+  <description>
+    If true, blocks to move tasks will share equal ratio of number of highest-priority
+    replication streams (dfs.namenode.replication.max-streams) with pending replica and
+    erasure-coded reconstruction tasks. If false, blocks to move tasks will only use
+    the delta number of replication streams. The default value is false.
   </description>
 </property>
 
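The retuned timers and the new throttling knob can also be set programmatically. A sketch using the keys above (the values shown are just the new defaults):

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // Monitor re-check interval: 1 minute (was 5).
    conf.setLong("dfs.storage.policy.satisfier.recheck.timeout.millis", 60000L);
    // Self-retry timeout: 5 minutes (was 30).
    conf.setLong("dfs.storage.policy.satisfier.self.retry.timeout.millis", 300000L);
    // Keep SPS on the delta of replication streams rather than an equal share.
    conf.setBoolean("dfs.storage.policy.satisfier.low.max-streams.preference", false);
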

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
index c8a9466..5defbd0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
@@ -106,7 +106,7 @@ Following 2 options will allow users to move the blocks based on new policy set.
 ### <u>S</u>torage <u>P</u>olicy <u>S</u>atisfier (SPS)
 
 When user changes the storage policy on a file/directory, user can call `HdfsAdmin` API `satisfyStoragePolicy()` to move the blocks as per the new policy set.
-The SPS daemon thread runs along with namenode and periodically scans for the storage mismatches between new policy set and the physical blocks placed. This will only track the files/directories for which user invoked satisfyStoragePolicy. If SPS identifies some blocks to be moved for a file, then it will schedule block movement tasks to datanodes. A Coordinator DataNode(C-DN) will track all block movements associated to a file and notify to namenode about movement success/failure. If there are any failures in movement, the SPS will re-attempt by sending new block movement task.
+The SPS daemon thread runs along with namenode and periodically scans for the storage mismatches between new policy set and the physical blocks placed. This will only track the files/directories for which user invoked satisfyStoragePolicy. If SPS identifies some blocks to be moved for a file, then it will schedule block movement tasks to datanodes. If there are any failures in movement, the SPS will re-attempt by sending new block movement tasks.
 
 SPS can be enabled and disabled dynamically without restarting the Namenode.
 
@@ -129,10 +129,10 @@ Detailed design documentation can be found at [Storage Policy Satisfier(SPS) (HD
    enabled and vice versa.
 
 *   **dfs.storage.policy.satisfier.recheck.timeout.millis** - A timeout to re-check the processed block storage movement
-   command results from Co-ordinator Datanode.
+   command results from Datanodes.
 
 *   **dfs.storage.policy.satisfier.self.retry.timeout.millis** - A timeout to retry if no block movement results reported from
-   Co-ordinator Datanode in this configured timeout.
+   Datanode in this configured timeout.
 
 ### Mover - A New Data Migration Tool
 
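End to end, the user-facing flow this page describes stays two calls. A hedged sketch against the `HdfsAdmin` API named above (the URI, path, and policy name are examples):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsAdmin;

    HdfsAdmin admin =
        new HdfsAdmin(URI.create("hdfs://namenode:8020"), new Configuration());
    // Change the policy, then ask SPS to move the existing blocks to match it.
    admin.setStoragePolicy(new Path("/data/cold"), "COLD");
    admin.satisfyStoragePolicy(new Path("/data/cold"));
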

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index 9530e20..f247370 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -117,7 +117,8 @@ public class TestNameNodePrunesMissingStorages {
       cluster.stopDataNode(0);
       cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
           0, null, true, SlowPeerReports.EMPTY_REPORT,
-          SlowDiskReports.EMPTY_REPORT, new BlocksStorageMovementResult[0]);
+          SlowDiskReports.EMPTY_REPORT,
+          new BlocksStorageMoveAttemptFinished(null));
 
       // Check that the missing storage was pruned.
       assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
index bd831d6..d13d717 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -169,7 +169,7 @@ public class InternalDataNodeTestUtils {
             Mockito.anyBoolean(),
             Mockito.any(SlowPeerReports.class),
             Mockito.any(SlowDiskReports.class),
-            Mockito.any(BlocksStorageMovementResult[].class))).thenReturn(
+            Mockito.any(BlocksStorageMoveAttemptFinished.class))).thenReturn(
         new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
             HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
             .nextLong() | 1L));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 3d006e0..0fa1696 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -49,7 +49,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -161,7 +161,7 @@ public class TestBPOfferService {
           Mockito.anyBoolean(),
           Mockito.any(SlowPeerReports.class),
           Mockito.any(SlowDiskReports.class),
-          Mockito.any(BlocksStorageMovementResult[].class));
+          Mockito.any(BlocksStorageMoveAttemptFinished.class));
     mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
     datanodeCommands[nnIdx] = new DatanodeCommand[0];
     return mock;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index a05fdfd..052eb87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -93,7 +93,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -234,7 +234,7 @@ public class TestBlockRecovery {
             Mockito.anyBoolean(),
             Mockito.any(SlowPeerReports.class),
             Mockito.any(SlowDiskReports.class),
-            Mockito.any(BlocksStorageMovementResult[].class)))
+            Mockito.any(BlocksStorageMoveAttemptFinished.class)))
         .thenReturn(new HeartbeatResponse(
             new DatanodeCommand[0],
             new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
index b15b530..0dd15c3 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
@@ -50,7 +50,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@@ -174,7 +174,7 @@ public class TestDataNodeLifeline {
             anyBoolean(),
             any(SlowPeerReports.class),
             any(SlowDiskReports.class),
-            any(BlocksStorageMovementResult[].class));
+            any(BlocksStorageMoveAttemptFinished.class));
 
     // Intercept lifeline to trigger latch count-down on each call.
     doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
@@ -240,7 +240,7 @@ public class TestDataNodeLifeline {
             anyBoolean(),
             any(SlowPeerReports.class),
             any(SlowDiskReports.class),
-            any(BlocksStorageMovementResult[].class));
+            any(BlocksStorageMoveAttemptFinished.class));
 
     // While waiting on the latch for the expected number of heartbeat messages,
     // poll DataNode tracking information.  We expect that the DataNode always

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index d7ac3f9..d47da69 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -44,7 +44,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -224,7 +224,7 @@ public class TestDatanodeProtocolRetryPolicy {
            Mockito.anyBoolean(),
            Mockito.any(SlowPeerReports.class),
            Mockito.any(SlowDiskReports.class),
-           Mockito.any(BlocksStorageMovementResult[].class));
+           Mockito.any(BlocksStorageMoveAttemptFinished.class));
 
     dn = new DataNode(conf, locations, null, null) {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index b9f21a0..3732b2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -66,7 +66,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.Page
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -210,7 +210,7 @@ public class TestFsDatasetCache {
           anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
           anyBoolean(), any(SlowPeerReports.class),
           any(SlowDiskReports.class),
-          (BlocksStorageMovementResult[]) any());
+          any(BlocksStorageMoveAttemptFinished.class));
     } finally {
       lock.writeLock().unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index b84b1d2..3681cae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -36,8 +36,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.namenode.INode;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
@@ -180,11 +178,10 @@ public class TestStoragePolicySatisfyWorker {
           lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
           lb.getStorageTypes()[0], StorageType.ARCHIVE);
       blockMovingInfos.add(blockMovingInfo);
-      INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
-      worker.processBlockMovingTasks(inode.getId(),
-          cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
+      worker.processBlockMovingTasks(cluster.getNamesystem().getBlockPoolId(),
+          blockMovingInfos);
 
-      waitForBlockMovementCompletion(worker, inode.getId(), 1, 30000);
+      waitForBlockMovementCompletion(worker, 1, 30000);
     } finally {
       worker.stop();
     }
@@ -226,50 +223,42 @@ public class TestStoragePolicySatisfyWorker {
                 locatedBlock.getStorageTypes()[0], StorageType.ARCHIVE);
         blockMovingInfos.add(blockMovingInfo);
       }
-      INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
-      worker.processBlockMovingTasks(inode.getId(),
-          cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
+      worker.processBlockMovingTasks(cluster.getNamesystem().getBlockPoolId(),
+          blockMovingInfos);
+      // Wait till the results queue builds up
-      waitForBlockMovementResult(worker, inode.getId(), 30000);
+      waitForBlockMovementResult(worker, 30000);
       worker.dropSPSWork();
       assertTrue(worker.getBlocksMovementsStatusHandler()
-          .getBlksMovementResults().size() == 0);
+          .getMoveAttemptFinishedBlocks().size() == 0);
     } finally {
       worker.stop();
     }
   }
 
   private void waitForBlockMovementResult(
-      final StoragePolicySatisfyWorker worker, final long inodeId, int timeout)
-          throws Exception {
+      final StoragePolicySatisfyWorker worker, int timeout) throws Exception {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        List<BlocksStorageMovementResult> completedBlocks = worker
-            .getBlocksMovementsStatusHandler().getBlksMovementResults();
+        List<Block> completedBlocks = worker.getBlocksMovementsStatusHandler()
+            .getMoveAttemptFinishedBlocks();
         return completedBlocks.size() > 0;
       }
     }, 100, timeout);
   }
 
   private void waitForBlockMovementCompletion(
-      final StoragePolicySatisfyWorker worker, final long inodeId,
-      int expectedFailedItemsCount, int timeout) throws Exception {
+      final StoragePolicySatisfyWorker worker,
+      int expectedFinishedItemsCount, int timeout) throws Exception {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        List<BlocksStorageMovementResult> completedBlocks = worker
-            .getBlocksMovementsStatusHandler().getBlksMovementResults();
-        int failedCount = 0;
-        for (BlocksStorageMovementResult blkMovementResult : completedBlocks) {
-          if (blkMovementResult.getStatus() ==
-              BlocksStorageMovementResult.Status.FAILURE) {
-            failedCount++;
-          }
-        }
+        List<Block> completedBlocks = worker.getBlocksMovementsStatusHandler()
+            .getMoveAttemptFinishedBlocks();
+        int finishedCount = completedBlocks.size();
         LOG.info("Block movement completed count={}, expected={} and actual={}",
-            completedBlocks.size(), expectedFailedItemsCount, failedCount);
-        return expectedFailedItemsCount == failedCount;
+            completedBlocks.size(), expectedFinishedItemsCount, finishedCount);
+        return expectedFinishedItemsCount == finishedCount;
       }
     }, 100, timeout);
   }
@@ -304,8 +293,7 @@ public class TestStoragePolicySatisfyWorker {
   private BlockMovingInfo prepareBlockMovingInfo(Block block,
       DatanodeInfo src, DatanodeInfo destin, StorageType storageType,
       StorageType targetStorageType) {
-    return new BlockMovingInfo(block, new DatanodeInfo[] {src},
-        new DatanodeInfo[] {destin}, new StorageType[] {storageType},
-        new StorageType[] {targetStorageType});
+    return new BlockMovingInfo(block, src, destin, storageType,
+        targetStorageType);
   }
 }
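
The rewritten test above captures the API reshaping on the worker side: BlockMovingInfo now takes a single source/target pair instead of DatanodeInfo[]/StorageType[] arrays, processBlockMovingTasks is keyed by block pool with no inode id, and finished attempts surface as plain Blocks. A condensed sketch of the new flow, reusing the test's own lb, targetDnInfo, worker, and cluster variables (Arrays.asList here is illustrative; the test itself builds an ArrayList):

    // Single source/target pair replaces the old array-based constructor.
    BlockMovingInfo info = new BlockMovingInfo(
        lb.getBlock().getLocalBlock(),  // block to move
        lb.getLocations()[0],           // source datanode
        targetDnInfo,                   // target datanode
        lb.getStorageTypes()[0],        // current storage type
        StorageType.ARCHIVE);           // desired storage type

    // Tasks are scoped to the block pool; the inode id parameter is gone.
    worker.processBlockMovingTasks(
        cluster.getNamesystem().getBlockPoolId(), Arrays.asList(info));

    // Completed move attempts are exposed as plain Blocks.
    List<Block> finished = worker.getBlocksMovementsStatusHandler()
        .getMoveAttemptFinishedBlocks();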

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index df120ca..20402f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -112,7 +112,7 @@ public class TestStorageReport {
         Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
         Mockito.any(SlowPeerReports.class),
         Mockito.any(SlowDiskReports.class),
-        Mockito.any(BlocksStorageMovementResult[].class));
+        Mockito.any(BlocksStorageMoveAttemptFinished.class));
 
     StorageReport[] reports = captor.getValue();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index 1e016f7..ec00ae7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -56,7 +56,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -958,7 +958,7 @@ public class NNThroughputBenchmark implements Tool {
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
           0L, 0L, 0, 0, 0, null, true,
           SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
-          new BlocksStorageMovementResult[0]).getCommands();
+          new BlocksStorageMoveAttemptFinished(null)).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -1009,7 +1009,7 @@ public class NNThroughputBenchmark implements Tool {
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
           rep, 0L, 0L, 0, 0, 0, null, true,
           SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
-          new BlocksStorageMovementResult[0]).getCommands();
+          new BlocksStorageMoveAttemptFinished(null)).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 4584add..899bb82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -132,7 +132,7 @@ public class NameNodeAdapter {
         BlockManagerTestUtil.getStorageReportsForDatanode(dd),
         dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
         SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
-        new BlocksStorageMovementResult[0]);
+        new BlocksStorageMoveAttemptFinished(null));
   }
 
   public static boolean setReplication(final FSNamesystem ns,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
index 7918821..f79326f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
@@ -18,10 +18,17 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.util.Time.monotonicNow;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.AttemptedItemInfo;
 
 import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,9 +49,8 @@ public class TestBlockStorageMovementAttemptedItems {
     unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
         Mockito.mock(Namesystem.class),
         Mockito.mock(StoragePolicySatisfier.class), 100);
-    StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
     bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
-        selfRetryTimeout, unsatisfiedStorageMovementFiles, sps);
+        selfRetryTimeout, unsatisfiedStorageMovementFiles);
   }
 
   @After
@@ -76,120 +82,115 @@ public class TestBlockStorageMovementAttemptedItems {
     return isItemFound;
   }
 
+  /**
+   * Verify that reporting moved blocks queues up the block info.
+   */
   @Test(timeout = 30000)
-  public void testAddResultWithFailureResult() throws Exception {
-    bsmAttemptedItems.start(); // start block movement result monitor thread
-    Long item = new Long(1234);
-    bsmAttemptedItems.add(new ItemInfo(0L, item), true);
-    bsmAttemptedItems.addResults(
-        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
-    assertTrue(checkItemMovedForRetry(item, 200));
-  }
-
-  @Test(timeout = 30000)
-  public void testAddResultWithSucessResult() throws Exception {
+  public void testAddReportedMoveAttemptFinishedBlocks() throws Exception {
     bsmAttemptedItems.start(); // start block movement result monitor thread
     Long item = new Long(1234);
-    bsmAttemptedItems.add(new ItemInfo(0L, item), true);
-    bsmAttemptedItems.addResults(
-        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
-    assertFalse(checkItemMovedForRetry(item, 200));
+    List<Block> blocks = new ArrayList<Block>();
+    blocks.add(new Block(item));
+    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks));
+    Block[] blockArray = new Block[blocks.size()];
+    blocks.toArray(blockArray);
+    bsmAttemptedItems.addReportedMovedBlocks(blockArray);
+    assertEquals("Failed to receive result!", 1,
+        bsmAttemptedItems.getMovementFinishedBlocksCount());
   }
 
+  /**
+   * Verify empty moved blocks reporting queue.
+   */
   @Test(timeout = 30000)
-  public void testNoResultAdded() throws Exception {
-    bsmAttemptedItems.start(); // start block movement result monitor thread
+  public void testNoBlockMovementAttemptFinishedReportAdded() throws Exception {
+    bsmAttemptedItems.start(); // start block movement report monitor thread
     Long item = new Long(1234);
-    bsmAttemptedItems.add(new ItemInfo(0L, item), true);
-    // After self retry timeout, it should be added back for retry
-    assertTrue("Failed to add to the retry list",
-        checkItemMovedForRetry(item, 600));
-    assertEquals("Failed to remove from the attempted list", 0,
+    List<Block> blocks = new ArrayList<>();
+    blocks.add(new Block(item));
+    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks));
+    assertEquals("Shouldn't receive result", 0,
+        bsmAttemptedItems.getMovementFinishedBlocksCount());
+    assertEquals("Item doesn't exist in the attempted list", 1,
         bsmAttemptedItems.getAttemptedItemsCount());
   }
 
   /**
-   * Partial block movement with BlocksStorageMovementResult#SUCCESS. Here,
-   * first occurrence is #blockStorageMovementResultCheck() and then
+   * Partial block movement with
+   * BlockMovementStatus#DN_BLK_STORAGE_MOVEMENT_SUCCESS. Here, first occurrence
+   * is #blockStorageMovementReportedItemsCheck() and then
    * #blocksStorageMovementUnReportedItemsCheck().
    */
   @Test(timeout = 30000)
   public void testPartialBlockMovementShouldBeRetried1() throws Exception {
     Long item = new Long(1234);
-    bsmAttemptedItems.add(new ItemInfo(0L, item), false);
-    bsmAttemptedItems.addResults(
-        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
-
-    // start block movement result monitor thread
+    List<Block> blocks = new ArrayList<>();
+    blocks.add(new Block(item));
+    blocks.add(new Block(5678L));
+    Long trackID = 0L;
+    bsmAttemptedItems
+        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks));
+    Block[] blksMovementReport = new Block[1];
+    blksMovementReport[0] = new Block(item);
+    bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
+
+    // start block movement report monitor thread
     bsmAttemptedItems.start();
     assertTrue("Failed to add to the retry list",
-        checkItemMovedForRetry(item, 5000));
+        checkItemMovedForRetry(trackID, 5000));
     assertEquals("Failed to remove from the attempted list", 0,
         bsmAttemptedItems.getAttemptedItemsCount());
   }
 
   /**
-   * Partial block movement with BlocksStorageMovementResult#SUCCESS. Here,
-   * first occurrence is #blocksStorageMovementUnReportedItemsCheck() and then
-   * #blockStorageMovementResultCheck().
+   * Partial block movement. Here, first occurrence is
+   * #blocksStorageMovementUnReportedItemsCheck() and then
+   * #blockStorageMovementReportedItemsCheck().
    */
   @Test(timeout = 30000)
   public void testPartialBlockMovementShouldBeRetried2() throws Exception {
     Long item = new Long(1234);
-    bsmAttemptedItems.add(new ItemInfo(0L, item), false);
-    bsmAttemptedItems.addResults(
-        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
+    Long trackID = 0L;
+    List<Block> blocks = new ArrayList<>();
+    blocks.add(new Block(item));
+    bsmAttemptedItems
+        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks));
+    Block[] blksMovementReport = new Block[1];
+    blksMovementReport[0] = new Block(item);
+    bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
 
     Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out
 
     bsmAttemptedItems.blocksStorageMovementUnReportedItemsCheck();
-    bsmAttemptedItems.blockStorageMovementResultCheck();
+    bsmAttemptedItems.blockStorageMovementReportedItemsCheck();
 
     assertTrue("Failed to add to the retry list",
-        checkItemMovedForRetry(item, 5000));
+        checkItemMovedForRetry(trackID, 5000));
     assertEquals("Failed to remove from the attempted list", 0,
         bsmAttemptedItems.getAttemptedItemsCount());
   }
 
   /**
-   * Partial block movement with only BlocksStorageMovementResult#FAILURE
-   * result and storageMovementAttemptedItems list is empty.
+   * Partial block movement with only BlocksStorageMoveAttemptFinished report
+   * and storageMovementAttemptedItems list is empty.
    */
   @Test(timeout = 30000)
   public void testPartialBlockMovementWithEmptyAttemptedQueue()
       throws Exception {
     Long item = new Long(1234);
-    bsmAttemptedItems.addResults(
-        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item, BlocksStorageMovementResult.Status.FAILURE)});
-    bsmAttemptedItems.blockStorageMovementResultCheck();
+    Long trackID = 0L;
+    List<Block> blocks = new ArrayList<>();
+    blocks.add(new Block(item));
+    bsmAttemptedItems
+        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks));
+    Block[] blksMovementReport = new Block[1];
+    blksMovementReport[0] = new Block(item);
+    bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
     assertFalse(
         "Should not add in queue again if it is not there in"
             + " storageMovementAttemptedItems",
-        checkItemMovedForRetry(item, 5000));
-    assertEquals("Failed to remove from the attempted list", 0,
-        bsmAttemptedItems.getAttemptedItemsCount());
-  }
-
-  /**
-   * Partial block movement with BlocksStorageMovementResult#FAILURE result and
-   * storageMovementAttemptedItems.
-   */
-  @Test(timeout = 30000)
-  public void testPartialBlockMovementShouldBeRetried4() throws Exception {
-    Long item = new Long(1234);
-    bsmAttemptedItems.add(new ItemInfo(0L, item), false);
-    bsmAttemptedItems.addResults(
-        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
-    bsmAttemptedItems.blockStorageMovementResultCheck();
-    assertTrue("Failed to add to the retry list",
-        checkItemMovedForRetry(item, 5000));
-    assertEquals("Failed to remove from the attempted list", 0,
+        checkItemMovedForRetry(trackID, 5000));
+    assertEquals("Failed to remove from the attempted list", 1,
         bsmAttemptedItems.getAttemptedItemsCount());
   }
 }
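
The reworked tests walk the new attempted-items lifecycle: an AttemptedItemInfo records the blocks dispatched for one tracked item, datanode reports arrive as arrays of moved Blocks via addReportedMovedBlocks, and an item whose blocks are only partially reported is pushed back for retry once the self-retry timeout fires. A condensed sketch of that lifecycle (the semantics of the three leading long arguments to AttemptedItemInfo are not spelled out in these hunks; the tests pass a track id and zeros):

    // Dispatch two blocks under one tracked item.
    List<Block> blocks = new ArrayList<>();
    blocks.add(new Block(1234L));
    blocks.add(new Block(5678L));
    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks));

    // Only one of the two blocks comes back as moved...
    bsmAttemptedItems.addReportedMovedBlocks(new Block[] {new Block(1234L)});

    // ...so the monitor's blockStorageMovementReportedItemsCheck() and
    // blocksStorageMovementUnReportedItemsCheck() passes eventually time the
    // item out and re-queue it for another movement attempt.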

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7e8b1c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 36beaa8..65628b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -44,7 +44,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -141,7 +141,7 @@ public class TestDeadDatanode {
     DatanodeCommand[] cmd =
         dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
             SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
-            new BlocksStorageMovementResult[0]).getCommands();
+            new BlocksStorageMoveAttemptFinished(null)).getCommands();
     assertEquals(1, cmd.length);
     assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
         .getAction());

