http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
index 7580ba9..f5225d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
@@ -20,13 +20,10 @@ package org.apache.hadoop.hdfs.server.sps;
 
 import java.io.IOException;
 import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -39,7 +36,6 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
@@ -48,15 +44,14 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.balancer.KeyManager;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
 import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
 import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
 import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker;
 import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
-import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
 import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler;
 import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
@@ -105,12 +100,14 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
     int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
     blkDispatcher = new BlockDispatcher(HdfsConstants.READ_TIMEOUT,
         ioFileBufferSize, connectToDnViaHostname);
+
+    startMovementTracker();
   }
 
   /**
    * Initializes block movement tracker daemon and starts the thread.
    */
-  public void init() {
+  private void startMovementTracker() {
     movementTrackerThread = new Daemon(this.blkMovementTracker);
     movementTrackerThread.setName("BlockStorageMovementTracker");
     movementTrackerThread.start();
@@ -156,24 +153,16 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
     // dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
     LOG.debug("Received BlockMovingTask {}", blkMovingInfo);
     BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo);
-    Future<BlockMovementAttemptFinished> moveCallable = mCompletionServ
-        .submit(blockMovingTask);
-    blkMovementTracker.addBlock(blkMovingInfo.getBlock(), moveCallable);
+    mCompletionServ.submit(blockMovingTask);
   }
 
   private class ExternalBlocksMovementsStatusHandler
-      extends BlocksMovementsStatusHandler {
+      implements BlocksMovementsStatusHandler {
     @Override
-    public void handle(
-        List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
-      List<Block> blocks = new ArrayList<>();
-      for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
-        blocks.add(item.getBlock());
-      }
-      BlocksStorageMoveAttemptFinished blkAttempted =
-          new BlocksStorageMoveAttemptFinished(
-          blocks.toArray(new Block[blocks.size()]));
-      service.notifyStorageMovementAttemptFinishedBlks(blkAttempted);
+    public void handle(BlockMovementAttemptFinished attemptedMove) {
+      service.notifyStorageMovementAttemptFinishedBlk(
+          attemptedMove.getTargetDatanode(), attemptedMove.getTargetType(),
+          attemptedMove.getBlock());
     }
   }
 
@@ -194,6 +183,7 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
       BlockMovementStatus blkMovementStatus = moveBlock();
       return new BlockMovementAttemptFinished(blkMovingInfo.getBlock(),
           blkMovingInfo.getSource(), blkMovingInfo.getTarget(),
+          blkMovingInfo.getTargetStorageType(),
           blkMovementStatus);
     }
 

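The handler rework above changes BlocksMovementsStatusHandler from an abstract class receiving a batched List<BlockMovementAttemptFinished> into an interface with a per-block handle(BlockMovementAttemptFinished) callback, and each finished attempt now carries its target datanode and storage type. A minimal sketch of a conforming implementation (illustrative only, not part of this commit; the class and logger names are hypothetical):

    import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
    import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LoggingMovementStatusHandler implements BlocksMovementsStatusHandler {
      private static final Logger LOG =
          LoggerFactory.getLogger(LoggingMovementStatusHandler.class);

      @Override
      public void handle(BlockMovementAttemptFinished attemptedMove) {
        // One callback per finished attempt; no batching or array
        // conversion is needed, unlike the old handle(List<...>) override.
        LOG.info("Attempted move of {} to {} (type {})",
            attemptedMove.getBlock(), attemptedMove.getTargetDatanode(),
            attemptedMove.getTargetType());
      }
    }
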
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
index 6fc35ea..236b887 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
@@ -86,7 +86,6 @@ public final class ExternalStoragePolicySatisfier {
           new ExternalBlockMovementListener();
       ExternalSPSBlockMoveTaskHandler externalHandler =
           new ExternalSPSBlockMoveTaskHandler(spsConf, nnc, sps);
-      externalHandler.init();
       sps.init(context, new ExternalSPSFilePathCollector(sps), externalHandler,
           blkMoveListener);
       sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
@@ -147,7 +146,7 @@ public final class ExternalStoragePolicySatisfier {
       for (Block block : moveAttemptFinishedBlks) {
         actualBlockMovements.add(block);
       }
-      LOG.info("Movement attempted blocks", actualBlockMovements);
+      LOG.info("Movement attempted blocks:{}", actualBlockMovements);
     }
   }
 }

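The one-line logging fix above is worth noting: SLF4J only renders an argument where the format string contains a {} placeholder, so the original call silently dropped the block list. A hedged illustration:

    // Before: no placeholder, so actualBlockMovements never appears in the log.
    LOG.info("Movement attempted blocks", actualBlockMovements);
    // After: the {} placeholder substitutes the collected block list.
    LOG.info("Movement attempted blocks:{}", actualBlockMovements);
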
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 7c35494..baf7ec7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -185,14 +185,6 @@ message BlockMovingInfoProto {
 }
 
 /**
- * Blocks for which storage movements has been attempted and finished
- * with either success or failure.
- */
-message BlocksStorageMoveAttemptFinishedProto {
-  repeated BlockProto blocks = 1;
-}
-
-/**
  * registration - Information of the datanode registering with the namenode
  */
 message RegisterDatanodeRequestProto {
@@ -249,7 +241,6 @@ message HeartbeatRequestProto {
   optional bool requestFullBlockReportLease = 9 [ default = false ];
   repeated SlowPeerReportProto slowPeers = 10;
   repeated SlowDiskReportProto slowDisks = 11;
-  optional BlocksStorageMoveAttemptFinishedProto storageMoveAttemptFinishedBlks = 12;
 }
 
 /**

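With BlocksStorageMoveAttemptFinishedProto and its optional heartbeat field removed, the finished-move report no longer rides on the datanode heartbeat, which is why every sendHeartbeat call site and Mockito stub in the test diffs below drops the trailing argument. A hedged before/after sketch of one call site, with the unchanged middle parameters elided:

    // Before: the heartbeat carried a (possibly empty) finished-move report.
    //   dataNodeProto.sendHeartbeat(dnRegistration, rep, ...,
    //       SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
    //       new BlocksStorageMoveAttemptFinished(null));
    // After: the parameter is gone; completion is apparently learned
    // per block instead (see notifyReportedBlock in the attempted-items
    // tests further down).
    //   dataNodeProto.sendHeartbeat(dnRegistration, rep, ...,
    //       SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
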
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index faeeebd..cf01354 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4592,6 +4592,47 @@
 </property>
 
 <property>
+  <name>dfs.storage.policy.satisfier.max.outstanding.paths</name>
+  <value>10000</value>
+  <description>
+    Defines the maximum number of paths to satisfy that can be queued up in the
+    Satisfier call queue in a period of time. Default value is 10000.
+  </description>
+</property>
+
+<property>
+  <name>dfs.storage.policy.satisfier.address</name>
+  <value>0.0.0.0:0</value>
+  <description>
+    The hostname used for a keytab based Kerberos login. Keytab based login
+    is required when dfs.storage.policy.satisfier.mode is external.
+  </description>
+</property>
+
+<property>
+  <name>dfs.storage.policy.satisfier.keytab.file</name>
+  <value></value>
+  <description>
+    The keytab file used by external StoragePolicySatisfier to login as its
+    service principal. The principal name is configured with
+    dfs.storage.policy.satisfier.kerberos.principal. Keytab based login
+    is required when dfs.storage.policy.satisfier.mode is external.
+  </description>
+</property>
+
+<property>
+  <name>dfs.storage.policy.satisfier.kerberos.principal</name>
+  <value></value>
+  <description>
+    The StoragePolicySatisfier principal. This is typically set to
+    satisfier/_h...@realm.tld. The StoragePolicySatisfier will substitute
+    _HOST with its own fully qualified hostname at startup. The _HOST placeholder
+    allows using the same configuration setting on different servers. Keytab
+    based login is required when dfs.storage.policy.satisfier.mode is external.
+  </description>
+</property>
+
+<property>
   <name>dfs.pipeline.ecn</name>
   <value>false</value>
   <description>

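Taken together, the four new properties configure a Kerberized external satisfier. A hedged sketch of setting them programmatically (property names come from the entries above; the keytab path, principal, and realm values are placeholders):

    import org.apache.hadoop.conf.Configuration;

    public class ExternalSpsConfSketch {
      public static Configuration build() {
        Configuration conf = new Configuration();
        // Keytab-based login is required when
        // dfs.storage.policy.satisfier.mode is external.
        conf.set("dfs.storage.policy.satisfier.address", "0.0.0.0:0");
        conf.set("dfs.storage.policy.satisfier.keytab.file",
            "/etc/security/keytabs/sps.service.keytab");  // placeholder path
        conf.set("dfs.storage.policy.satisfier.kerberos.principal",
            "satisfier/_HOST@EXAMPLE.COM");               // placeholder realm
        // Cap on queued satisfier paths; 10000 is the shipped default.
        conf.setInt("dfs.storage.policy.satisfier.max.outstanding.paths",
            10000);
        return conf;
      }
    }
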
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index f247370..05b6d30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -117,8 +116,7 @@ public class TestNameNodePrunesMissingStorages {
       cluster.stopDataNode(0);
       cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
           0, null, true, SlowPeerReports.EMPTY_REPORT,
-          SlowDiskReports.EMPTY_REPORT,
-          new BlocksStorageMoveAttemptFinished(null));
+          SlowDiskReports.EMPTY_REPORT);
 
       // Check that the missing storage was pruned.
       assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
index d13d717..b453991 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -168,8 +167,7 @@ public class InternalDataNodeTestUtils {
             Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class),
             Mockito.anyBoolean(),
             Mockito.any(SlowPeerReports.class),
-            Mockito.any(SlowDiskReports.class),
-            Mockito.any(BlocksStorageMoveAttemptFinished.class))).thenReturn(
+            Mockito.any(SlowDiskReports.class))).thenReturn(
         new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
             HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
             .nextLong() | 1L));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimpleBlocksMovementsStatusHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimpleBlocksMovementsStatusHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimpleBlocksMovementsStatusHandler.java
new file mode 100644
index 0000000..b361ce5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimpleBlocksMovementsStatusHandler.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
+import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
+
+/**
+ * Blocks movements status handler, which is used to collect details of the
+ * completed block movements and later these attempted finished(with success or
+ * failure) blocks can be accessed to notify respective listeners, if any.
+ */
+public class SimpleBlocksMovementsStatusHandler
+    implements BlocksMovementsStatusHandler {
+  private final List<Block> blockIdVsMovementStatus = new ArrayList<>();
+
+  /**
+   * Collect all the storage movement attempt finished blocks. Later this will
+   * be send to namenode via heart beat.
+   *
+   * @param moveAttemptFinishedBlk
+   *          storage movement attempt finished block
+   */
+  public void handle(BlockMovementAttemptFinished moveAttemptFinishedBlk) {
+    // Adding to the tracking report list. Later this can be accessed to know
+    // the attempted block movements.
+    synchronized (blockIdVsMovementStatus) {
+      blockIdVsMovementStatus.add(moveAttemptFinishedBlk.getBlock());
+    }
+  }
+
+  /**
+   * @return unmodifiable list of storage movement attempt finished blocks.
+   */
+  public List<Block> getMoveAttemptFinishedBlocks() {
+    List<Block> moveAttemptFinishedBlks = new ArrayList<>();
+    // 1. Adding all the completed block ids.
+    synchronized (blockIdVsMovementStatus) {
+      if (blockIdVsMovementStatus.size() > 0) {
+        moveAttemptFinishedBlks = Collections
+            .unmodifiableList(blockIdVsMovementStatus);
+      }
+    }
+    return moveAttemptFinishedBlks;
+  }
+
+  /**
+   * Remove the storage movement attempt finished blocks from the tracking list.
+   *
+   * @param moveAttemptFinishedBlks
+   *          set of storage movement attempt finished blocks
+   */
+  public void remove(List<Block> moveAttemptFinishedBlks) {
+    if (moveAttemptFinishedBlks != null) {
+      blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks);
+    }
+  }
+
+  /**
+   * Clear the blockID vs movement status tracking map.
+   */
+  public void removeAll() {
+    synchronized (blockIdVsMovementStatus) {
+      blockIdVsMovementStatus.clear();
+    }
+  }
+}

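The new SimpleBlocksMovementsStatusHandler is consumed by injection: TestStoragePolicySatisfyWorker below now passes it into the worker's constructor instead of pulling a handler back out of the worker. A hedged usage sketch based on that test:

    // Inject a collecting handler, schedule moves, then poll its snapshot.
    SimpleBlocksMovementsStatusHandler handler =
        new SimpleBlocksMovementsStatusHandler();
    StoragePolicySatisfyWorker worker =
        new StoragePolicySatisfyWorker(conf, datanode, handler);
    worker.start();
    // ... submit BlockMovingInfo tasks via processBlockMovingTasks() ...
    List<Block> finished = handler.getMoveAttemptFinishedBlocks();
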
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 0fa1696..d0c3a83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -124,8 +123,8 @@ public class TestBPOfferService {
     Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
     Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn"))
         .when(mockDn).getMetrics();
-    Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn)).when(mockDn)
-        .getStoragePolicySatisfyWorker();
+    Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn, null))
+        .when(mockDn).getStoragePolicySatisfyWorker();
 
     // Set up a simulated dataset with our fake BP
     mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
@@ -160,8 +159,7 @@ public class TestBPOfferService {
           Mockito.any(VolumeFailureSummary.class),
           Mockito.anyBoolean(),
           Mockito.any(SlowPeerReports.class),
-          Mockito.any(SlowDiskReports.class),
-          Mockito.any(BlocksStorageMoveAttemptFinished.class));
+          Mockito.any(SlowDiskReports.class));
     mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
     datanodeCommands[nnIdx] = new DatanodeCommand[0];
     return mock;
@@ -380,8 +378,8 @@ public class TestBPOfferService {
     Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
     Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")).
       when(mockDn).getMetrics();
-    Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn)).when(mockDn)
-        .getStoragePolicySatisfyWorker();
+    Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn, null))
+        .when(mockDn).getStoragePolicySatisfyWorker();
     final AtomicInteger count = new AtomicInteger();
     Mockito.doAnswer(new Answer<Void>() {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 052eb87..07fd4ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -93,7 +93,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -233,8 +232,7 @@ public class TestBlockRecovery {
             Mockito.any(VolumeFailureSummary.class),
             Mockito.anyBoolean(),
             Mockito.any(SlowPeerReports.class),
-            Mockito.any(SlowDiskReports.class),
-            Mockito.any(BlocksStorageMoveAttemptFinished.class)))
+            Mockito.any(SlowDiskReports.class)))
         .thenReturn(new HeartbeatResponse(
             new DatanodeCommand[0],
             new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
index 0dd15c3..28427bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@@ -173,8 +172,7 @@ public class TestDataNodeLifeline {
             any(VolumeFailureSummary.class),
             anyBoolean(),
             any(SlowPeerReports.class),
-            any(SlowDiskReports.class),
-            any(BlocksStorageMoveAttemptFinished.class));
+            any(SlowDiskReports.class));
 
     // Intercept lifeline to trigger latch count-down on each call.
     doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
@@ -239,8 +237,7 @@ public class TestDataNodeLifeline {
             any(VolumeFailureSummary.class),
             anyBoolean(),
             any(SlowPeerReports.class),
-            any(SlowDiskReports.class),
-            any(BlocksStorageMoveAttemptFinished.class));
+            any(SlowDiskReports.class));
 
     // While waiting on the latch for the expected number of heartbeat messages,
     // poll DataNode tracking information.  We expect that the DataNode always

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index d47da69..bb1d9ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -223,8 +222,7 @@ public class TestDatanodeProtocolRetryPolicy {
            Mockito.any(VolumeFailureSummary.class),
            Mockito.anyBoolean(),
            Mockito.any(SlowPeerReports.class),
-           Mockito.any(SlowDiskReports.class),
-           Mockito.any(BlocksStorageMoveAttemptFinished.class));
+           Mockito.any(SlowDiskReports.class));
 
     dn = new DataNode(conf, locations, null, null) {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index 3732b2e..2dbd5b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -66,7 +66,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.Page
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -209,8 +208,7 @@ public class TestFsDatasetCache {
           (StorageReport[]) any(), anyLong(), anyLong(),
           anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
           anyBoolean(), any(SlowPeerReports.class),
-          any(SlowDiskReports.class),
-          any(BlocksStorageMoveAttemptFinished.class));
+          any(SlowDiskReports.class));
     } finally {
       lock.writeLock().unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index 06a66f7..51d3254 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.junit.Assert.assertTrue;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -35,8 +33,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
@@ -173,8 +171,10 @@ public class TestStoragePolicySatisfyWorker {
     DatanodeInfo targetDnInfo = DFSTestUtil
         .getLocalDatanodeInfo(src.getXferPort());
 
+    SimpleBlocksMovementsStatusHandler handler =
+        new SimpleBlocksMovementsStatusHandler();
     StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf,
-        src);
+        src, handler);
     try {
       worker.start();
       List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
@@ -184,81 +184,19 @@ public class TestStoragePolicySatisfyWorker {
       blockMovingInfos.add(blockMovingInfo);
       worker.processBlockMovingTasks(cluster.getNamesystem().getBlockPoolId(),
           blockMovingInfos);
-
-      waitForBlockMovementCompletion(worker, 1, 30000);
+      waitForBlockMovementCompletion(handler, 1, 30000);
     } finally {
       worker.stop();
     }
   }
 
-  /**
-   * Tests that drop SPS work method clears all the queues.
-   *
-   * @throws Exception
-   */
-  @Test(timeout = 120000)
-  public void testDropSPSWork() throws Exception {
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(20).build();
-
-    cluster.waitActive();
-    final DistributedFileSystem dfs = cluster.getFileSystem();
-    final String file = "/testDropSPSWork";
-    DFSTestUtil.createFile(dfs, new Path(file), false, 1024, 50 * 100,
-        DEFAULT_BLOCK_SIZE, (short) 2, 0, false, null);
-
-    // move to ARCHIVE
-    dfs.setStoragePolicy(new Path(file), "COLD");
-
-    DataNode src = cluster.getDataNodes().get(2);
-    DatanodeInfo targetDnInfo =
-        DFSTestUtil.getLocalDatanodeInfo(src.getXferPort());
-
-    StoragePolicySatisfyWorker worker =
-        new StoragePolicySatisfyWorker(conf, src);
-    worker.start();
-    try {
-      List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
-      List<LocatedBlock> locatedBlocks =
-          dfs.getClient().getLocatedBlocks(file, 0).getLocatedBlocks();
-      for (LocatedBlock locatedBlock : locatedBlocks) {
-        BlockMovingInfo blockMovingInfo =
-            prepareBlockMovingInfo(locatedBlock.getBlock().getLocalBlock(),
-                locatedBlock.getLocations()[0], targetDnInfo,
-                locatedBlock.getStorageTypes()[0], StorageType.ARCHIVE);
-        blockMovingInfos.add(blockMovingInfo);
-      }
-      worker.processBlockMovingTasks(cluster.getNamesystem().getBlockPoolId(),
-          blockMovingInfos);
-      // Wait till results queue build up
-      waitForBlockMovementResult(worker, 30000);
-      worker.dropSPSWork();
-      assertTrue(worker.getBlocksMovementsStatusHandler()
-          .getMoveAttemptFinishedBlocks().size() == 0);
-    } finally {
-      worker.stop();
-    }
-  }
-
-  private void waitForBlockMovementResult(
-      final StoragePolicySatisfyWorker worker, int timeout) throws Exception {
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        List<Block> completedBlocks = worker.getBlocksMovementsStatusHandler()
-            .getMoveAttemptFinishedBlocks();
-        return completedBlocks.size() > 0;
-      }
-    }, 100, timeout);
-  }
-
   private void waitForBlockMovementCompletion(
-      final StoragePolicySatisfyWorker worker,
+      final SimpleBlocksMovementsStatusHandler handler,
       int expectedFinishedItemsCount, int timeout) throws Exception {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        List<Block> completedBlocks = worker.getBlocksMovementsStatusHandler()
-            .getMoveAttemptFinishedBlocks();
+        List<Block> completedBlocks = handler.getMoveAttemptFinishedBlocks();
         int finishedCount = completedBlocks.size();
         LOG.info("Block movement completed count={}, expected={} and 
actual={}",
             completedBlocks.size(), expectedFinishedItemsCount, finishedCount);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index 20402f2..5f62ddb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -111,8 +110,7 @@ public class TestStorageReport {
         anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
         Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
         Mockito.any(SlowPeerReports.class),
-        Mockito.any(SlowDiskReports.class),
-        Mockito.any(BlocksStorageMoveAttemptFinished.class));
+        Mockito.any(SlowDiskReports.class));
 
     StorageReport[] reports = captor.getValue();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index ec00ae7..3a3c471 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -56,7 +56,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -957,8 +956,8 @@ public class NNThroughputBenchmark implements Tool {
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
           0L, 0L, 0, 0, 0, null, true,
-          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
-          new BlocksStorageMoveAttemptFinished(null)).getCommands();
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
+          .getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -1008,8 +1007,8 @@ public class NNThroughputBenchmark implements Tool {
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
           rep, 0L, 0L, 0, 0, 0, null, true,
-          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
-          new BlocksStorageMoveAttemptFinished(null)).getCommands();
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
+          .getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 899bb82..b85527a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -131,8 +130,7 @@ public class NameNodeAdapter {
     return namesystem.handleHeartbeat(nodeReg,
         BlockManagerTestUtil.getStorageReportsForDatanode(dd),
         dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
-        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
-        new BlocksStorageMoveAttemptFinished(null));
+        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
   }
 
   public static boolean setReplication(final FSNamesystem ns,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 65628b9..df74107 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -140,8 +139,8 @@ public class TestDeadDatanode {
         false, 0, 0, 0, 0, 0) };
     DatanodeCommand[] cmd =
         dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
-            SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
-            new BlocksStorageMoveAttemptFinished(null)).getCommands();
+            SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
+        .getCommands();
     assertEquals(1, cmd.length);
     assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
         .getAction());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
index 47ea39f..ee0b2e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
 
@@ -250,10 +251,9 @@ public class TestNameNodeReconfigure {
         StoragePolicySatisfierMode.INTERNAL.toString());
 
     // Since DFS_STORAGE_POLICY_ENABLED_KEY is disabled, SPS can't be enabled.
-    assertEquals("SPS shouldn't start as "
-        + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY + " is disabled", false,
-            nameNode.getNamesystem().getBlockManager().getSPSManager()
-            .isInternalSatisfierRunning());
+    assertNull("SPS shouldn't start as "
+        + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY + " is disabled",
+            nameNode.getNamesystem().getBlockManager().getSPSManager());
     verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
         StoragePolicySatisfierMode.INTERNAL, false);
 
@@ -352,9 +352,12 @@ public class TestNameNodeReconfigure {
 
   void verifySPSEnabled(final NameNode nameNode, String property,
       StoragePolicySatisfierMode expected, boolean isSatisfierRunning) {
-    assertEquals(property + " has wrong value", isSatisfierRunning, nameNode
-        .getNamesystem().getBlockManager().getSPSManager()
-        .isInternalSatisfierRunning());
+    StoragePolicySatisfyManager spsMgr = nameNode
+            .getNamesystem().getBlockManager().getSPSManager();
+    boolean isInternalSatisfierRunning = spsMgr != null
+        ? spsMgr.isInternalSatisfierRunning() : false;
+    assertEquals(property + " has wrong value", isSatisfierRunning,
+        isInternalSatisfierRunning);
     String actual = nameNode.getConf().get(property,
         DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
     assertEquals(property + " has wrong value", expected,

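The reconfiguration test now has to tolerate a missing manager: with dfs.storage.policy.enabled off, getSPSManager() returns null rather than a stopped satisfier, so the assertion becomes assertNull and the helper grows a null guard. The guard, condensed (names as in the diff above):

    // Null-safe probe of the internal satisfier state.
    StoragePolicySatisfyManager spsMgr =
        nameNode.getNamesystem().getBlockManager().getSPSManager();
    boolean isInternalSatisfierRunning =
        spsMgr != null && spsMgr.isInternalSatisfierRunning();
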
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
index 29af885..ed1fe92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
@@ -22,13 +22,18 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.StorageTypeNodePair;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -92,14 +97,16 @@ public class TestBlockStorageMovementAttemptedItems {
    */
   @Test(timeout = 30000)
   public void testAddReportedMoveAttemptFinishedBlocks() throws Exception {
-    bsmAttemptedItems.start(); // start block movement result monitor thread
     Long item = new Long(1234);
-    List<Block> blocks = new ArrayList<Block>();
-    blocks.add(new Block(item));
-    bsmAttemptedItems.add(new AttemptedItemInfo<Long>(0L, 0L, 0L, blocks, 0));
-    Block[] blockArray = new Block[blocks.size()];
-    blocks.toArray(blockArray);
-    bsmAttemptedItems.notifyMovementTriedBlocks(blockArray);
+    Block block = new Block(item);
+    DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867);
+    Set<StorageTypeNodePair> locs = new HashSet<>();
+    locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo));
+    Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>();
+    blocksMap.put(block, locs);
+    bsmAttemptedItems.add(0L, 0L, 0L, blocksMap, 0);
+    bsmAttemptedItems.notifyReportedBlock(dnInfo, StorageType.ARCHIVE,
+        block);
     assertEquals("Failed to receive result!", 1,
         bsmAttemptedItems.getMovementFinishedBlocksCount());
   }
@@ -111,9 +118,13 @@ public class TestBlockStorageMovementAttemptedItems {
   public void testNoBlockMovementAttemptFinishedReportAdded() throws Exception {
     bsmAttemptedItems.start(); // start block movement report monitor thread
     Long item = new Long(1234);
-    List<Block> blocks = new ArrayList<>();
-    blocks.add(new Block(item));
-    bsmAttemptedItems.add(new AttemptedItemInfo<Long>(0L, 0L, 0L, blocks, 0));
+    Block block = new Block(item);
+    DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867);
+    Set<StorageTypeNodePair> locs = new HashSet<>();
+    locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo));
+    Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>();
+    blocksMap.put(block, locs);
+    bsmAttemptedItems.add(0L, 0L, 0L, blocksMap, 0);
     assertEquals("Shouldn't receive result", 0,
         bsmAttemptedItems.getMovementFinishedBlocksCount());
     assertEquals("Item doesn't exist in the attempted list", 1,
@@ -129,15 +140,18 @@ public class TestBlockStorageMovementAttemptedItems {
   @Test(timeout = 30000)
   public void testPartialBlockMovementShouldBeRetried1() throws Exception {
     Long item = new Long(1234);
-    List<Block> blocks = new ArrayList<>();
-    blocks.add(new Block(item));
-    blocks.add(new Block(5678L));
+    Block block1 = new Block(item);
+    Block block2 = new Block(5678L);
     Long trackID = 0L;
-    bsmAttemptedItems
-        .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0));
-    Block[] blksMovementReport = new Block[1];
-    blksMovementReport[0] = new Block(item);
-    bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
+    DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867);
+    Set<StorageTypeNodePair> locs = new HashSet<>();
+    locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo));
+    Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>();
+    blocksMap.put(block1, locs);
+    blocksMap.put(block2, locs);
+    bsmAttemptedItems.add(trackID, trackID, 0L, blocksMap, 0);
+    bsmAttemptedItems.notifyReportedBlock(dnInfo, StorageType.ARCHIVE,
+        block1);
 
     // start block movement report monitor thread
     bsmAttemptedItems.start();
@@ -155,14 +169,16 @@ public class TestBlockStorageMovementAttemptedItems {
   @Test(timeout = 30000)
   public void testPartialBlockMovementShouldBeRetried2() throws Exception {
     Long item = new Long(1234);
+    Block block = new Block(item);
     Long trackID = 0L;
-    List<Block> blocks = new ArrayList<>();
-    blocks.add(new Block(item));
-    bsmAttemptedItems
-        .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0));
-    Block[] blksMovementReport = new Block[1];
-    blksMovementReport[0] = new Block(item);
-    bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
+    DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867);
+    Set<StorageTypeNodePair> locs = new HashSet<>();
+    locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo));
+    Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>();
+    blocksMap.put(block, locs);
+    bsmAttemptedItems.add(trackID, trackID, 0L, blocksMap, 0);
+    bsmAttemptedItems.notifyReportedBlock(dnInfo, StorageType.ARCHIVE,
+        block);
 
     Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out
 
@@ -183,14 +199,16 @@ public class TestBlockStorageMovementAttemptedItems {
   public void testPartialBlockMovementWithEmptyAttemptedQueue()
       throws Exception {
     Long item = new Long(1234);
+    Block block = new Block(item);
     Long trackID = 0L;
-    List<Block> blocks = new ArrayList<>();
-    blocks.add(new Block(item));
-    bsmAttemptedItems
-        .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0));
-    Block[] blksMovementReport = new Block[1];
-    blksMovementReport[0] = new Block(item);
-    bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
+    DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867);
+    Set<StorageTypeNodePair> locs = new HashSet<>();
+    locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo));
+    Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>();
+    blocksMap.put(block, locs);
+    bsmAttemptedItems.add(trackID, trackID, 0L, blocksMap, 0);
+    bsmAttemptedItems.notifyReportedBlock(dnInfo, StorageType.ARCHIVE,
+        block);
     assertFalse(
         "Should not add in queue again if it is not there in"
             + " storageMovementAttemptedItems",

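These tests show the reworked attempted-items bookkeeping: an attempt is recorded as a map from each block to its expected (storage type, datanode) pairs, and completion arrives per block through notifyReportedBlock rather than as a bulk Block[] via the old notifyMovementTriedBlocks. A condensed sketch of the new shape (names as in the diff):

    // One block expected to land on ARCHIVE storage at a local datanode.
    Block block = new Block(1234L);
    DatanodeInfo dn = DFSTestUtil.getLocalDatanodeInfo(9867);
    Set<StorageTypeNodePair> locs = new HashSet<>();
    locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dn));
    Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>();
    blocksMap.put(block, locs);
    bsmAttemptedItems.add(0L, 0L, 0L, blocksMap, 0);
    // A report naming (dn, ARCHIVE, block) marks that attempt finished.
    bsmAttemptedItems.notifyReportedBlock(dn, StorageType.ARCHIVE, block);
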
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
index 75aeb86..b05717a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -107,6 +108,8 @@ public class TestStoragePolicySatisfier {
   public static final long CAPACITY = 2 * 256 * 1024 * 1024;
   public static final String FILE = "/testMoveToSatisfyStoragePolicy";
   public static final int DEFAULT_BLOCK_SIZE = 1024;
+  private ExternalBlockMovementListener blkMoveListener =
+      new ExternalBlockMovementListener();
 
   /**
    * Sets hdfs cluster.
@@ -1029,6 +1032,9 @@ public class TestStoragePolicySatisfier {
       config.set(DFSConfigKeys
           .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
           "3000");
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+          "5000");
       StorageType[][] newtypes = new StorageType[][] {
           {StorageType.ARCHIVE, StorageType.DISK},
           {StorageType.ARCHIVE, StorageType.DISK},
@@ -1072,6 +1078,9 @@ public class TestStoragePolicySatisfier {
       config.set(DFSConfigKeys
           .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
           "3000");
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+          "5000");
       StorageType[][] newtypes = new StorageType[][] {
           {StorageType.ARCHIVE, StorageType.DISK},
           {StorageType.ARCHIVE, StorageType.DISK},
@@ -1089,7 +1098,7 @@ public class TestStoragePolicySatisfier {
       fs.setStoragePolicy(filePath, "COLD");
       fs.satisfyStoragePolicy(filePath);
       DFSTestUtil.waitExpectedStorageType(filePath.toString(),
-          StorageType.ARCHIVE, 3, 30000, hdfsCluster.getFileSystem());
+          StorageType.ARCHIVE, 3, 60000, hdfsCluster.getFileSystem());
       assertFalse("Log output does not contain expected log message: ",
           logs.getOutput().contains("some of the blocks are low redundant"));
     } finally {
@@ -1425,6 +1434,9 @@ public class TestStoragePolicySatisfier {
       config.set(DFSConfigKeys
           .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
           "3000");
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+          "5000");
       config.setBoolean(DFSConfigKeys
           .DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY,
           false);
@@ -1467,7 +1479,7 @@ public class TestStoragePolicySatisfier {
       for (int i = 1; i <= 10; i++) {
         Path filePath = new Path("/file" + i);
         DFSTestUtil.waitExpectedStorageType(filePath.toString(),
-            StorageType.DISK, 4, 30000, hdfsCluster.getFileSystem());
+            StorageType.DISK, 4, 60000, hdfsCluster.getFileSystem());
       }
       for (int i = 11; i <= 20; i++) {
         Path filePath = new Path("/file" + i);
@@ -1725,20 +1737,16 @@ public class TestStoragePolicySatisfier {
   public void waitForBlocksMovementAttemptReport(
       long expectedMovementFinishedBlocksCount, int timeout)
           throws TimeoutException, InterruptedException {
-    BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
-    final StoragePolicySatisfier<Long> sps =
-        (StoragePolicySatisfier<Long>) blockManager
-        .getSPSManager().getInternalSPSService();
+    Assert.assertNotNull("Didn't set external block move listener",
+        blkMoveListener);
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
+        int actualCount = blkMoveListener.getActualBlockMovements().size();
         LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
             expectedMovementFinishedBlocksCount,
-            ((BlockStorageMovementAttemptedItems<Long>) (sps
-                .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount());
-        return ((BlockStorageMovementAttemptedItems<Long>) (sps
-            .getAttemptedItemsMonitor()))
-                .getMovementFinishedBlocksCount()
+            actualCount);
+        return actualCount
             >= expectedMovementFinishedBlocksCount;
       }
     }, 100, timeout);
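
Note: the rewritten helper above polls the listener's collected block count
instead of casting to the SPS internal attempted-items monitor. For reference,
a minimal sketch of the same poll-until-true pattern (PollUntil is a
hypothetical helper name; the tests get these exact semantics from
GenericTestUtils.waitFor(supplier, interval, timeout)):

import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public final class PollUntil {
  private PollUntil() {
  }

  // Re-evaluates check every intervalMs until it returns true or the
  // timeout elapses, mirroring the waitFor call in the hunk above.
  public static void pollUntil(Supplier<Boolean> check, long intervalMs,
      long timeoutMs) throws TimeoutException, InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (check.get()) {
        return; // condition satisfied before the deadline
      }
      Thread.sleep(intervalMs);
    }
    throw new TimeoutException("Condition not met within " + timeoutMs + "ms");
  }
}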
@@ -1790,11 +1798,54 @@ public class TestStoragePolicySatisfier {
         .numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn)
         .storageTypes(storageTypes).storageCapacities(capacities).build();
     cluster.waitActive();
+
+    // Sets external listener for assertion.
+    blkMoveListener.clear();
+    BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+    final StoragePolicySatisfier<Long> sps =
+        (StoragePolicySatisfier<Long>) blockManager
+        .getSPSManager().getInternalSPSService();
+    sps.setBlockMovementListener(blkMoveListener);
     return cluster;
   }
 
   public void restartNamenode() throws IOException {
     hdfsCluster.restartNameNodes();
     hdfsCluster.waitActive();
+    BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
+    StoragePolicySatisfyManager spsMgr = blockManager.getSPSManager();
+    if (spsMgr != null && spsMgr.isInternalSatisfierRunning()) {
+      // Sets external listener for assertion.
+      blkMoveListener.clear();
+      final StoragePolicySatisfier<Long> sps =
+          (StoragePolicySatisfier<Long>) spsMgr.getInternalSPSService();
+      sps.setBlockMovementListener(blkMoveListener);
+    }
+  }
+
+  /**
+   * Implementation of listener callback, where it collects all the sps move
+   * attempted blocks for assertion.
+   */
+  public static final class ExternalBlockMovementListener
+      implements BlockMovementListener {
+
+    private List<Block> actualBlockMovements = new ArrayList<>();
+
+    @Override
+    public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
+      for (Block block : moveAttemptFinishedBlks) {
+        actualBlockMovements.add(block);
+      }
+      LOG.info("Movement attempted blocks:{}", actualBlockMovements);
+    }
+
+    public List<Block> getActualBlockMovements() {
+      return actualBlockMovements;
+    }
+
+    public void clear() {
+      actualBlockMovements.clear();
+    }
   }
 }
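
Note: ExternalBlockMovementListener above appends to a plain ArrayList that is
written by the SPS movement-tracker thread and read by the polling test
thread. A thread-safe variant (an illustration only, not part of this patch)
could wrap the list in a synchronized view:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;

public final class SafeBlockMovementListener implements BlockMovementListener {

  // Synchronized wrapper so adds from the tracker thread and size() reads
  // from the polling test thread do not race.
  private final List<Block> actualBlockMovements =
      Collections.synchronizedList(new ArrayList<Block>());

  @Override
  public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
    Collections.addAll(actualBlockMovements, moveAttemptFinishedBlks);
  }

  public int movedBlockCount() {
    return actualBlockMovements.size();
  }

  public void clear() {
    actualBlockMovements.clear();
  }
}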

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
index e69a833..857bd6c 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier.ExternalBlockMovementListener;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Before;
@@ -70,6 +71,8 @@ public class TestStoragePolicySatisfierWithStripedFile {
   private int cellSize;
   private int defaultStripeBlockSize;
   private Configuration conf;
+  private ExternalBlockMovementListener blkMoveListener =
+      new ExternalBlockMovementListener();
 
   private ErasureCodingPolicy getEcPolicy() {
     return StripedFileTestUtil.getDefaultECPolicy();
@@ -131,6 +134,15 @@ public class TestStoragePolicySatisfierWithStripedFile {
     HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
     try {
       cluster.waitActive();
+
+      // Sets external listener for assertion.
+      blkMoveListener.clear();
+      BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+      final StoragePolicySatisfier<Long> sps =
+          (StoragePolicySatisfier<Long>) blockManager
+          .getSPSManager().getInternalSPSService();
+      sps.setBlockMovementListener(blkMoveListener);
+
       DistributedFileSystem dfs = cluster.getFileSystem();
       dfs.enableErasureCodingPolicy(
           StripedFileTestUtil.getDefaultECPolicy().getName());
@@ -240,6 +252,15 @@ public class TestStoragePolicySatisfierWithStripedFile {
     HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
     try {
       cluster.waitActive();
+
+      // Sets external listener for assertion.
+      blkMoveListener.clear();
+      BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+      final StoragePolicySatisfier<Long> sps =
+          (StoragePolicySatisfier<Long>) blockManager
+          .getSPSManager().getInternalSPSService();
+      sps.setBlockMovementListener(blkMoveListener);
+
       DistributedFileSystem dfs = cluster.getFileSystem();
       dfs.enableErasureCodingPolicy(
           StripedFileTestUtil.getDefaultECPolicy().getName());
@@ -328,6 +349,9 @@ public class TestStoragePolicySatisfierWithStripedFile {
     conf.set(DFSConfigKeys
         .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
         "3000");
+    conf.set(DFSConfigKeys
+        .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+        "5000");
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(numOfDatanodes)
         .storagesPerDatanode(storagesPerDatanode)
@@ -559,22 +583,16 @@ public class TestStoragePolicySatisfierWithStripedFile {
   private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster,
       long expectedMoveFinishedBlks, int timeout)
           throws TimeoutException, InterruptedException {
-    BlockManager blockManager = cluster.getNamesystem().getBlockManager();
-    final StoragePolicySatisfier<Long> sps =
-        (StoragePolicySatisfier<Long>) blockManager
-        .getSPSManager().getInternalSPSService();
-    Assert.assertNotNull("Failed to get SPS object reference!", sps);
-
+    Assert.assertNotNull("Didn't set external block move listener",
+        blkMoveListener);
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
+        int actualCount = blkMoveListener.getActualBlockMovements().size();
         LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
             expectedMoveFinishedBlks,
-            ((BlockStorageMovementAttemptedItems<Long>) sps
-                .getAttemptedItemsMonitor()).getMovementFinishedBlocksCount());
-        return ((BlockStorageMovementAttemptedItems<Long>) sps
-            .getAttemptedItemsMonitor())
-                .getMovementFinishedBlocksCount() >= expectedMoveFinishedBlks;
+            actualCount);
+        return actualCount >= expectedMoveFinishedBlks;
       }
     }, 100, timeout);
   }
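
Note: because the listener records the moved blocks themselves rather than a
bare counter, a test could also assert on block identity, not just count. A
sketch of that idea (expectedBlock is an assumed test-local Block, and the
fragment would sit inside a test method after the wait helper returns):

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hdfs.protocol.Block;
import org.junit.Assert;

// ...after waitForBlocksMovementAttemptReport(...) has returned:
List<Block> moved = blkMoveListener.getActualBlockMovements();
Set<Long> movedIds = new HashSet<>();
for (Block b : moved) {
  movedIds.add(b.getBlockId()); // collect ids of move-attempted blocks
}
Assert.assertTrue("Block was never move-attempted",
    movedIds.contains(expectedBlock.getBlockId()));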

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93a04547/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
index 28e172a..be243cb 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
@@ -54,11 +54,9 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
 import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementAttemptedItems;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
@@ -92,6 +90,8 @@ public class TestExternalStoragePolicySatisfier
   private File baseDir;
   private StoragePolicySatisfier<String> externalSps;
   private ExternalSPSContext externalCtxt;
+  private ExternalBlockMovementListener blkMoveListener =
+      new ExternalBlockMovementListener();
 
   @After
   public void destroy() throws Exception {
@@ -144,15 +144,12 @@ public class TestExternalStoragePolicySatisfier
     nnc = getNameNodeConnector(getConf());
 
     externalSps = new StoragePolicySatisfier<String>(getConf());
-    externalCtxt = new ExternalSPSContext(externalSps,
-        getNameNodeConnector(conf));
+    externalCtxt = new ExternalSPSContext(externalSps, nnc);
 
-    ExternalBlockMovementListener blkMoveListener =
-        new ExternalBlockMovementListener();
+    blkMoveListener.clear();
     ExternalSPSBlockMoveTaskHandler externalHandler =
         new ExternalSPSBlockMoveTaskHandler(conf, nnc,
             externalSps);
-    externalHandler.init();
     externalSps.init(externalCtxt,
         new ExternalSPSFilePathCollector(externalSps), externalHandler,
         blkMoveListener);
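
Note: the explicit externalHandler.init() call is dropped in this hunk and the
next one, consistent with ExternalSPSBlockMoveTaskHandler now starting its
movement-tracker daemon as part of construction. The general shape of that
change, sketched with hypothetical names:

import org.apache.hadoop.util.Daemon;

public class TrackerOwner {
  private final Daemon trackerThread;

  public TrackerOwner(Runnable tracker) {
    trackerThread = new Daemon(tracker);
    trackerThread.setName("BlockStorageMovementTracker");
    // Starting the thread here removes a separate init() step the caller
    // could forget; the tradeoff is that the thread is live before
    // construction of any subclass has completed.
    trackerThread.start();
  }
}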
@@ -169,33 +166,17 @@ public class TestExternalStoragePolicySatisfier
     getCluster().waitActive();
     externalSps = new StoragePolicySatisfier<>(getConf());
 
-    externalCtxt = new ExternalSPSContext(externalSps,
-        getNameNodeConnector(getConf()));
-    ExternalBlockMovementListener blkMoveListener =
-        new ExternalBlockMovementListener();
+    externalCtxt = new ExternalSPSContext(externalSps, nnc);
+    blkMoveListener.clear();
     ExternalSPSBlockMoveTaskHandler externalHandler =
         new ExternalSPSBlockMoveTaskHandler(getConf(), nnc,
             externalSps);
-    externalHandler.init();
     externalSps.init(externalCtxt,
         new ExternalSPSFilePathCollector(externalSps), externalHandler,
         blkMoveListener);
     externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL);
   }
 
-  private class ExternalBlockMovementListener implements BlockMovementListener {
-
-    private List<Block> actualBlockMovements = new ArrayList<>();
-
-    @Override
-    public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
-      for (Block block : moveAttemptFinishedBlks) {
-        actualBlockMovements.add(block);
-      }
-      LOG.info("Movement attempted blocks", actualBlockMovements);
-    }
-  }
-
   private NameNodeConnector getNameNodeConnector(Configuration conf)
       throws IOException {
     final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
@@ -237,16 +218,15 @@ public class TestExternalStoragePolicySatisfier
   public void waitForBlocksMovementAttemptReport(
       long expectedMovementFinishedBlocksCount, int timeout)
           throws TimeoutException, InterruptedException {
+    Assert.assertNotNull("Didn't set external block move listener",
+        blkMoveListener);
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
+        int actualCount = blkMoveListener.getActualBlockMovements().size();
         LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
-            expectedMovementFinishedBlocksCount,
-            ((BlockStorageMovementAttemptedItems<String>) (externalSps
-                .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount());
-        return ((BlockStorageMovementAttemptedItems<String>) (externalSps
-            .getAttemptedItemsMonitor()))
-                .getMovementFinishedBlocksCount()
+            expectedMovementFinishedBlocksCount, actualCount);
+        return actualCount
             >= expectedMovementFinishedBlocksCount;
       }
     }, 100, timeout);
@@ -352,6 +332,8 @@ public class TestExternalStoragePolicySatisfier
       files.add(FILE);
       DistributedFileSystem fs = getFS();
 
+      // Stops SPS so that the SPS queue builds up many outstanding requests.
+      externalSps.stopGracefully();
       // Creates 4 more files. Send all of them for satisfying the storage
       // policy together.
       for (int i = 0; i < 3; i++) {

