Repository: hadoop
Updated Branches:
  refs/heads/HDFS-10285 c6a1e5ab4 -> b1afd1f24 (forced update)


HDFS-11334: [SPS]: NN switch and rescheduling movements can lead to more than one coordinator for the same file blocks. Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2363781e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2363781e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2363781e

Branch: refs/heads/HDFS-10285
Commit: 2363781ed8fab36aa56a03a2f2619d7d60d8703e
Parents: 9f7c005
Author: Uma Maheswara Rao G <uma.ganguma...@intel.com>
Authored: Tue Apr 18 15:23:58 2017 -0700
Committer: Rakesh Radhakrishnan <rake...@apache.org>
Committed: Sun Jul 15 20:17:49 2018 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   2 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |   6 +
 .../server/blockmanagement/DatanodeManager.java |  12 ++
 .../hdfs/server/datanode/BPServiceActor.java    |   4 +-
 .../datanode/BlockStorageMovementTracker.java   |  37 +++-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  12 +-
 .../datanode/StoragePolicySatisfyWorker.java    |  95 +++++++++--
 .../BlockStorageMovementAttemptedItems.java     |  80 ++++++---
 .../server/namenode/StoragePolicySatisfier.java |  15 +-
 .../protocol/BlocksStorageMovementResult.java   |   6 +-
 .../src/main/proto/DatanodeProtocol.proto       |   1 +
 .../TestStoragePolicySatisfyWorker.java         |  68 ++++----
 .../TestStoragePolicySatisfierWithHA.java       | 170 +++++++++++++++++--
 13 files changed, 413 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2363781e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 00152cc..b5341a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -619,7 +619,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY =
       "dfs.storage.policy.satisfier.self.retry.timeout.millis";
   public static final int DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT =
-      30 * 60 * 1000;
+      20 * 60 * 1000;
 
   public static final String  DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
   public static final int     DFS_DATANODE_DEFAULT_PORT = 9866;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2363781e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 0c03608..996b986 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -985,6 +985,9 @@ public class PBHelper {
       case FAILURE:
         status = Status.FAILURE;
         break;
+      case IN_PROGRESS:
+        status = Status.IN_PROGRESS;
+        break;
       default:
         throw new AssertionError("Unknown status: " + resultProto.getStatus());
       }
@@ -1011,6 +1014,9 @@ public class PBHelper {
       case FAILURE:
         status = BlocksStorageMovementResultProto.Status.FAILURE;
         break;
+      case IN_PROGRESS:
+        status = BlocksStorageMovementResultProto.Status.IN_PROGRESS;
+        break;
       default:
         throw new AssertionError("Unknown status: " + report.getStatus());
       }
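
A note for reviewers: the two PBHelper hunks must stay symmetric -- every value added to the proto enum needs a matching case in both the proto-to-Java and the Java-to-proto converter, or the default branch throws AssertionError on the first report carrying the new value. A minimal, JDK-only sketch of that invariant, with hypothetical enums standing in for the generated proto types:

enum WireStatus { SUCCESS, FAILURE, IN_PROGRESS }
enum LocalStatus { SUCCESS, FAILURE, IN_PROGRESS }

final class StatusCodec {
  // Wire-to-local: must be total over WireStatus, or decoding a new value
  // fails exactly like the AssertionError in PBHelper.
  static LocalStatus decode(WireStatus w) {
    switch (w) {
    case SUCCESS: return LocalStatus.SUCCESS;
    case FAILURE: return LocalStatus.FAILURE;
    case IN_PROGRESS: return LocalStatus.IN_PROGRESS;
    default: throw new AssertionError("Unknown status: " + w);
    }
  }

  // Local-to-wire: the mirror image, kept in lockstep with decode().
  static WireStatus encode(LocalStatus s) {
    switch (s) {
    case SUCCESS: return WireStatus.SUCCESS;
    case FAILURE: return WireStatus.FAILURE;
    case IN_PROGRESS: return WireStatus.IN_PROGRESS;
    default: throw new AssertionError("Unknown status: " + s);
    }
  }
}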

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2363781e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index da340a8..2d7c80e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1091,6 +1091,18 @@ public class DatanodeManager {
           nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion());
           nodeS.setDisallowed(false); // Node is in the include list
 
+          // Set the dropSPSWork flag to true, so that
+          // DNA_DROP_SPS_WORK_COMMAND is sent to the datanode in the first
+          // heartbeat response after the node registration. This avoids
+          // receiving trackId responses from multiple co-ordinator datanodes:
+          // after the SPS monitor times out, it retries the files that were
+          // scheduled to a DN disconnected longer than the heartbeat expiry
+          // by choosing a new co-ordinator datanode. If the expired datanode
+          // then reconnects after SPS has rescheduled, the reconnected and
+          // the new DN co-ordinators would report different movement results
+          // for the same trackId.
+          nodeS.setDropSPSWork(true);
+
           // resolve network location
           if(this.rejectUnresolvedTopologyDN) {
             nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
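
The mechanism in this hunk is a latch-and-consume flag: registration latches dropSPSWork on the node descriptor, and the heartbeat path consumes it once to emit the drop command. A minimal, JDK-only sketch of that handoff with hypothetical class names (these are not the actual Hadoop types):

import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in for the per-datanode descriptor kept by the namenode.
class NodeState {
  private final AtomicBoolean dropSPSWork = new AtomicBoolean(false);

  // Registration path: latch the flag, as setDropSPSWork(true) does above.
  void markForSPSDrop() {
    dropSPSWork.set(true);
  }

  // Heartbeat path: getAndSet(false) consumes the flag exactly once, so the
  // drop command is attached to a single heartbeat response.
  boolean consumeDropSPSWork() {
    return dropSPSWork.getAndSet(false);
  }
}

A heartbeat builder would attach a DNA_DROP_SPS_WORK_COMMAND equivalent whenever consumeDropSPSWork() returns true, so the stale co-ordinator discards its queued trackIds before it can report results that race with the newly chosen co-ordinator.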

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2363781e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 0f93fb0..f537f49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -536,7 +536,7 @@ class BPServiceActor implements Runnable {
 
     // Remove the blocks movement results after successfully transferring
     // to namenode.
-    dn.getStoragePolicySatisfyWorker().getBlocksMovementsCompletionHandler()
+    dn.getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler()
         .remove(blksMovementResults);
 
     return response;
@@ -544,7 +544,7 @@ class BPServiceActor implements Runnable {
 
   private BlocksStorageMovementResult[] getBlocksMovementResults() {
     List<BlocksStorageMovementResult> trackIdVsMovementStatus = dn
-        .getStoragePolicySatisfyWorker().getBlocksMovementsCompletionHandler()
+        .getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler()
         .getBlksMovementResults();
     BlocksStorageMovementResult[] blksMovementResult =
         new BlocksStorageMovementResult[trackIdVsMovementStatus.size()];

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2363781e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
index e623cef..99858bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -28,7 +29,7 @@ import java.util.concurrent.Future;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler;
+import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsStatusHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,32 +42,34 @@ public class BlockStorageMovementTracker implements Runnable {
   private static final Logger LOG = LoggerFactory
       .getLogger(BlockStorageMovementTracker.class);
   private final CompletionService<BlockMovementResult> moverCompletionService;
-  private final BlocksMovementsCompletionHandler blksMovementscompletionHandler;
+  private final BlocksMovementsStatusHandler blksMovementsStatusHandler;
 
   // Keeps the information - trackID vs its list of blocks
   private final Map<Long, List<Future<BlockMovementResult>>> moverTaskFutures;
   private final Map<Long, List<BlockMovementResult>> movementResults;
 
+  private volatile boolean running = true;
+
   /**
    * BlockStorageMovementTracker constructor.
    *
    * @param moverCompletionService
    *          completion service.
    * @param handler
-   *          blocks movements completion handler
+   *          blocks movements status handler
    */
   public BlockStorageMovementTracker(
       CompletionService<BlockMovementResult> moverCompletionService,
-      BlocksMovementsCompletionHandler handler) {
+      BlocksMovementsStatusHandler handler) {
     this.moverCompletionService = moverCompletionService;
     this.moverTaskFutures = new HashMap<>();
-    this.blksMovementscompletionHandler = handler;
+    this.blksMovementsStatusHandler = handler;
     this.movementResults = new HashMap<>();
   }
 
   @Override
   public void run() {
-    while (true) {
+    while (running) {
       if (moverTaskFutures.size() <= 0) {
         try {
           synchronized (moverTaskFutures) {
@@ -95,8 +98,8 @@ public class BlockStorageMovementTracker implements Runnable {
             synchronized (moverTaskFutures) {
               moverTaskFutures.remove(trackId);
             }
-            // handle completed blocks movements per trackId.
-            blksMovementscompletionHandler.handle(resultPerTrackIdList);
+            // Handle completed or in-progress block movements per trackId.
+            blksMovementsStatusHandler.handle(resultPerTrackIdList);
             movementResults.remove(trackId);
           }
         }
@@ -158,4 +161,22 @@ public class BlockStorageMovementTracker implements Runnable {
       movementResults.clear();
     }
   }
+
+  /**
+   * @return a snapshot of the trackIds whose scheduled block movements have
+   *         not yet all completed.
+   */
+  Set<Long> getInProgressTrackIds() {
+    synchronized (moverTaskFutures) {
+      // Copy the key set; returning the live view would let callers iterate
+      // it outside this lock.
+      return new HashSet<>(moverTaskFutures.keySet());
+    }
+  }
+
+  /**
+   * Sets the running flag to false and clears the pending movement result queues.
+   */
+  public void stopTracking() {
+    running = false;
+    removeAll();
+  }
 }
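
The shutdown protocol added here is the usual pair: a volatile flag stops new loop iterations, and an interrupt (issued by StoragePolicySatisfyWorker.stop(), below) unblocks the take()-style wait -- flipping the flag alone would leave the thread parked. A minimal, JDK-only sketch of the same stoppable consumer loop (hypothetical names, Integer results for brevity):

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class StoppableTracker implements Runnable {
  private final CompletionService<Integer> completionService;
  private volatile boolean running = true;

  StoppableTracker(CompletionService<Integer> cs) {
    this.completionService = cs;
  }

  @Override
  public void run() {
    while (running) {
      try {
        Future<Integer> done = completionService.take(); // blocks
        System.out.println("completed: " + done.get());
      } catch (InterruptedException e) {
        // stop() interrupts the blocked take(); exit the loop cleanly.
        Thread.currentThread().interrupt();
        return;
      } catch (ExecutionException e) {
        System.err.println("task failed: " + e.getCause());
      }
    }
  }

  // Call from the owning thread: flag first, then interrupt the tracker.
  void stop(Thread trackerThread) {
    running = false;
    trackerThread.interrupt();
  }
}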

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2363781e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index abc1f1e..196d4c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1425,6 +1425,7 @@ public class DataNode extends ReconfigurableBase
     blockRecoveryWorker = new BlockRecoveryWorker(this);
     storagePolicySatisfyWorker =
         new StoragePolicySatisfyWorker(getConf(), this);
+    storagePolicySatisfyWorker.start();
 
     blockPoolManager = new BlockPoolManager(this);
     blockPoolManager.refreshNamenodes(getConf());
@@ -1976,7 +1977,11 @@ public class DataNode extends ReconfigurableBase
         }
       }
     }
-    
+
+    // stop storagePolicySatisfyWorker
+    if (storagePolicySatisfyWorker != null) {
+      storagePolicySatisfyWorker.stop();
+    }
     List<BPOfferService> bposArray = (this.blockPoolManager == null)
         ? new ArrayList<BPOfferService>()
         : this.blockPoolManager.getAllNamenodeThreads();
@@ -2129,6 +2134,11 @@ public class DataNode extends ReconfigurableBase
       notifyAll();
     }
     tracer.close();
+
+    // Wait for the SPS worker thread to finish.
+    if (storagePolicySatisfyWorker != null) {
+      storagePolicySatisfyWorker.waitToFinishWorkerThread();
+    }
   }
 
   /**
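
The DataNode changes wire the worker into the standard daemon lifecycle: start explicitly after construction, interrupt on shutdown, then join with a bound so shutdown cannot hang on a stuck tracker. A compact sketch of that pattern (plain Thread instead of Hadoop's Daemon, which is an assumption of this sketch):

class WorkerLifecycle {
  private final Thread worker;

  WorkerLifecycle(Runnable task) {
    worker = new Thread(task, "BlockStorageMovementTracker");
    worker.setDaemon(true);
  }

  void start() {
    worker.start();
  }

  void stop() {
    worker.interrupt(); // unblock any waiting take()/sleep()
  }

  void waitToFinish() {
    try {
      worker.join(3000); // bounded wait, mirroring the 3s join above
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
    }
  }
}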

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2363781e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index a96ac98..f4f97dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
+import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -31,7 +32,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
@@ -87,10 +90,13 @@ public class StoragePolicySatisfyWorker {
   private final int moverThreads;
   private final ExecutorService moveExecutor;
   private final CompletionService<BlockMovementResult> moverCompletionService;
-  private final BlocksMovementsCompletionHandler handler;
+  private final BlocksMovementsStatusHandler handler;
   private final BlockStorageMovementTracker movementTracker;
   private Daemon movementTrackerThread;
 
+  private long inprogressTrackIdsCheckInterval = 30 * 1000; // 30 seconds.
+  private long nextInprogressRecheckTime;
+
   public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
     this.datanode = datanode;
     this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
@@ -99,15 +105,52 @@ public class StoragePolicySatisfyWorker {
         DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
     moveExecutor = initializeBlockMoverThreadPool(moverThreads);
     moverCompletionService = new ExecutorCompletionService<>(moveExecutor);
-    handler = new BlocksMovementsCompletionHandler();
+    handler = new BlocksMovementsStatusHandler();
     movementTracker = new BlockStorageMovementTracker(moverCompletionService,
         handler);
     movementTrackerThread = new Daemon(movementTracker);
     movementTrackerThread.setName("BlockStorageMovementTracker");
-    movementTrackerThread.start();
+
+    // Interval for rechecking the in-progress trackIds; proportional to the
+    // heartbeat interval (five heartbeat periods), converted to millis to
+    // match monotonicNow().
+    final long heartbeatIntervalSeconds = conf.getTimeDuration(
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
+    inprogressTrackIdsCheckInterval = TimeUnit.SECONDS
+        .toMillis(5 * heartbeatIntervalSeconds);
+    // Schedule the first in-progress recheck at a future time stamp.
+    nextInprogressRecheckTime = monotonicNow()
+        + inprogressTrackIdsCheckInterval;
+
     // TODO: Needs to manage the number of concurrent moves per DataNode.
   }
 
+  /**
+   * Start StoragePolicySatisfyWorker, which will start block movement tracker
+   * thread to track the completion of block movements.
+   */
+  void start() {
+    movementTrackerThread.start();
+  }
+
+  /**
+   * Stop StoragePolicySatisfyWorker, which will stop block movement tracker
+   * thread.
+   */
+  void stop() {
+    movementTrackerThread.interrupt();
+    movementTracker.stopTracking();
+  }
+
+  /**
+   * Timed wait for the BlockStorageMovementTracker daemon thread to finish.
+   */
+  void waitToFinishWorkerThread() {
+    try {
+      movementTrackerThread.join(3000);
+    } catch (InterruptedException ie) {
+      // Preserve the interrupt status for the shutting-down caller.
+      Thread.currentThread().interrupt();
+    }
+  }
+
   private ThreadPoolExecutor initializeBlockMoverThreadPool(int num) {
     LOG.debug("Block mover to satisfy storage policy; pool threads={}", num);
 
@@ -352,11 +395,11 @@ public class StoragePolicySatisfyWorker {
   }
 
   /**
-   * Blocks movements completion handler, which is used to collect details of
-   * the completed list of block movements and this status(success or failure)
-   * will be send to the namenode via heartbeat.
+   * Blocks movements status handler, used to collect details of the completed
+   * or in-progress block movements; this status (success, failure or
+   * in-progress) will be sent to the namenode via heartbeat.
    */
-  static class BlocksMovementsCompletionHandler {
+  class BlocksMovementsStatusHandler {
     private final List<BlocksStorageMovementResult> trackIdVsMovementStatus =
         new ArrayList<>();
 
@@ -395,14 +438,21 @@ public class StoragePolicySatisfyWorker {
      * @return unmodifiable list of blocks storage movement results.
      */
     List<BlocksStorageMovementResult> getBlksMovementResults() {
+      List<BlocksStorageMovementResult> movementResults = new ArrayList<>();
+      // 1. Add all the completed trackIds. Copy into the mutable local list
+      // rather than wrapping an unmodifiable view, since the in-progress
+      // results are appended below.
       synchronized (trackIdVsMovementStatus) {
-        if (trackIdVsMovementStatus.size() <= 0) {
-          return new ArrayList<>();
+        if (trackIdVsMovementStatus.size() > 0) {
+          movementResults.addAll(trackIdVsMovementStatus);
         }
-        List<BlocksStorageMovementResult> results = Collections
-            .unmodifiableList(trackIdVsMovementStatus);
-        return results;
       }
+      // 2. Append the in-progress trackIds after the completed ones.
+      Set<Long> inProgressTrackIds = getInProgressTrackIds();
+      for (Long trackId : inProgressTrackIds) {
+        movementResults.add(new BlocksStorageMovementResult(trackId,
+            BlocksStorageMovementResult.Status.IN_PROGRESS));
+      }
+      return Collections.unmodifiableList(movementResults);
     }
 
     /**
@@ -433,7 +483,7 @@ public class StoragePolicySatisfyWorker {
   }
 
   @VisibleForTesting
-  BlocksMovementsCompletionHandler getBlocksMovementsCompletionHandler() {
+  BlocksMovementsStatusHandler getBlocksMovementsStatusHandler() {
     return handler;
   }
 
@@ -447,4 +497,23 @@ public class StoragePolicySatisfyWorker {
     movementTracker.removeAll();
     handler.removeAll();
   }
+
+  /**
+   * Gets the set of trackIds which are in progress. The snapshot is taken at
+   * most once per recheck interval (proportional to the heartbeat interval,
+   * see the constructor); between rechecks an empty set is returned.
+   *
+   * @return collection of trackIds which are in progress
+   */
+  private Set<Long> getInProgressTrackIds() {
+    Set<Long> trackIds = new HashSet<>();
+    long now = monotonicNow();
+    // Collect only once the recheck time has been reached.
+    if (now >= nextInprogressRecheckTime) {
+      trackIds = movementTracker.getInProgressTrackIds();
+
+      // schedule the next recheck interval
+      nextInprogressRecheckTime = now + inprogressTrackIdsCheckInterval;
+    }
+    return trackIds;
+  }
 }
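
getBlksMovementResults() merges two sources: the completed results, always, plus a snapshot of in-progress trackIds taken at most once per recheck interval, so the IN_PROGRESS keep-alives roughly follow the heartbeat cadence instead of being recomputed on every heartbeat. A minimal, JDK-only sketch of that monotonic-clock throttle (hypothetical class, System.nanoTime() standing in for monotonicNow()):

import java.util.HashSet;
import java.util.Set;

class ThrottledSnapshot {
  private final long intervalMillis;
  private long nextRecheckTime;
  private final Set<Long> inProgress = new HashSet<>();

  ThrottledSnapshot(long intervalMillis) {
    this.intervalMillis = intervalMillis;
    this.nextRecheckTime = now() + intervalMillis;
  }

  private static long now() {
    return System.nanoTime() / 1_000_000; // monotonic millis
  }

  synchronized void track(long trackId) {
    inProgress.add(trackId);
  }

  synchronized void done(long trackId) {
    inProgress.remove(trackId);
  }

  // Returns a defensive copy once per interval, an empty set otherwise.
  synchronized Set<Long> maybeSnapshot() {
    long current = now();
    if (current >= nextRecheckTime) {
      nextRecheckTime = current + intervalMillis;
      return new HashSet<>(inProgress);
    }
    return new HashSet<>();
  }
}

The defensive copy matters for the same reason as the addAll() above: the caller appends IN_PROGRESS entries to the merged result list, so handing out a live or unmodifiable view would either race or throw.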

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2363781e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index 26b98d8..f2406da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,9 +40,11 @@ import com.google.common.annotations.VisibleForTesting;
 * A monitor class for checking whether block storage movements finished or not.
  * If block storage movement results from datanode indicates about the movement
  * success, then it will just remove the entries from tracking. If it reports
- * failure, then it will add back to needed block storage movements list. If no
- * DN reports about movement for longer time, then such items will be retries
- * automatically after timeout. The default timeout would be 30mins.
+ * failure, then it will add the item back to the needed block storage
+ * movements list. If it reports in_progress, the block movements are still
+ * underway and the coordinator is still tracking them. If no DN reports about
+ * a movement for a long time, such items are retried automatically after a
+ * timeout. The default timeout is 20 minutes.
  */
 public class BlockStorageMovementAttemptedItems {
   private static final Logger LOG =
@@ -57,10 +60,10 @@ public class BlockStorageMovementAttemptedItems {
   private Daemon timerThread = null;
   private final StoragePolicySatisfier sps;
   //
-  // It might take anywhere between 30 to 60 minutes before
+  // It might take anywhere from 20 to 60 minutes before
   // a request is timed out.
   //
-  private long selfRetryTimeout = 30 * 60 * 1000;
+  private long selfRetryTimeout = 20 * 60 * 1000;
 
   //
   // It might take anywhere between 5 to 10 minutes before
@@ -159,35 +162,35 @@ public class BlockStorageMovementAttemptedItems {
 
   /**
    * This class contains information of an attempted trackID. Information such
-   * as, (a)last attempted time stamp, (b)whether all the blocks in the trackID
-   * were attempted and blocks movement has been scheduled to satisfy storage
-   * policy. This is used by
+   * as, (a)last attempted or reported time stamp, (b)whether all the blocks in
+   * the trackID were attempted and blocks movement has been scheduled to
+   * satisfy storage policy. This is used by
    * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
    */
   private final static class ItemInfo {
-    private final long lastAttemptedTimeStamp;
+    private long lastAttemptedOrReportedTime;
     private final boolean allBlockLocsAttemptedToSatisfy;
 
     /**
      * ItemInfo constructor.
      *
-     * @param lastAttemptedTimeStamp
-     *          last attempted time stamp
+     * @param lastAttemptedOrReportedTime
+     *          last attempted or reported time
      * @param allBlockLocsAttemptedToSatisfy
      *          whether all the blocks in the trackID were attempted and blocks
      *          movement has been scheduled to satisfy storage policy
      */
-    private ItemInfo(long lastAttemptedTimeStamp,
+    private ItemInfo(long lastAttemptedOrReportedTime,
         boolean allBlockLocsAttemptedToSatisfy) {
-      this.lastAttemptedTimeStamp = lastAttemptedTimeStamp;
+      this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
       this.allBlockLocsAttemptedToSatisfy = allBlockLocsAttemptedToSatisfy;
     }
 
     /**
-     * @return last attempted time stamp.
+     * @return last attempted or reported time stamp.
      */
-    private long getLastAttemptedTimeStamp() {
-      return lastAttemptedTimeStamp;
+    private long getLastAttemptedOrReportedTime() {
+      return lastAttemptedOrReportedTime;
     }
 
     /**
@@ -200,6 +203,14 @@ public class BlockStorageMovementAttemptedItems {
     private boolean isAllBlockLocsAttemptedToSatisfy() {
       return allBlockLocsAttemptedToSatisfy;
     }
+
+    /**
+     * Update lastAttemptedOrReportedTime so that the expiration time is
+     * postponed into the future.
+     */
+    private void touchLastReportedTimeStamp() {
+      this.lastAttemptedOrReportedTime = monotonicNow();
+    }
   }
 
   /**
@@ -234,7 +245,8 @@ public class BlockStorageMovementAttemptedItems {
       while (iter.hasNext()) {
         Entry<Long, ItemInfo> entry = iter.next();
         ItemInfo itemInfo = entry.getValue();
-        if (now > itemInfo.getLastAttemptedTimeStamp() + selfRetryTimeout) {
+        if (now > itemInfo.getLastAttemptedOrReportedTime()
+            + selfRetryTimeout) {
           Long blockCollectionID = entry.getKey();
           synchronized (storageMovementAttemptedResults) {
             if (!isExistInResult(blockCollectionID)) {
@@ -273,6 +285,7 @@ public class BlockStorageMovementAttemptedItems {
       Iterator<BlocksStorageMovementResult> resultsIter =
           storageMovementAttemptedResults.iterator();
       while (resultsIter.hasNext()) {
+        boolean isInprogress = false;
         // TrackID need to be retried in the following cases:
         // 1) All or few scheduled block(s) movement has been failed.
         // 2) All the scheduled block(s) movement has been succeeded but there
@@ -282,16 +295,19 @@ public class BlockStorageMovementAttemptedItems {
         BlocksStorageMovementResult storageMovementAttemptedResult = resultsIter
             .next();
         synchronized (storageMovementAttemptedItems) {
-          if (storageMovementAttemptedResult
-              .getStatus() == BlocksStorageMovementResult.Status.FAILURE) {
+          Status status = storageMovementAttemptedResult.getStatus();
+          ItemInfo itemInfo;
+          switch (status) {
+          case FAILURE:
             blockStorageMovementNeeded
                 .add(storageMovementAttemptedResult.getTrackId());
             LOG.warn("Blocks storage movement results for the tracking id: {}"
                 + " is reported from co-ordinating datanode, but result"
                 + " status is FAILURE. So, added for retry",
                 storageMovementAttemptedResult.getTrackId());
-          } else {
-            ItemInfo itemInfo = storageMovementAttemptedItems
+            break;
+          case SUCCESS:
+            itemInfo = storageMovementAttemptedItems
                 .get(storageMovementAttemptedResult.getTrackId());
 
             // ItemInfo could be null. One case is, before the blocks movements
@@ -320,10 +336,26 @@ public class BlockStorageMovementAttemptedItems {
               this.sps.notifyBlkStorageMovementFinished(
                   storageMovementAttemptedResult.getTrackId());
             }
+            break;
+          case IN_PROGRESS:
+            isInprogress = true;
+            itemInfo = storageMovementAttemptedItems
+                .get(storageMovementAttemptedResult.getTrackId());
+            if (itemInfo != null) {
+              // Push the retry-expiration time out to the next cycle.
+              itemInfo.touchLastReportedTimeStamp();
+            }
+            break;
+          default:
+            LOG.error("Unknown status: {}", status);
+            break;
+          }
+          // Remove the trackID from the attempted list once the attempt has
+          // completed (success or failure).
+          if (!isInprogress) {
+            storageMovementAttemptedItems
+                .remove(storageMovementAttemptedResult.getTrackId());
           }
-          // Remove trackID from the attempted list, if any.
-          storageMovementAttemptedItems
-              .remove(storageMovementAttemptedResult.getTrackId());
         }
         // Remove trackID from results as processed above.
         resultsIter.remove();
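
The namenode-side bookkeeping reduces to a timeout map: every attempted trackId carries a last-attempted-or-reported timestamp, an IN_PROGRESS report touches it to postpone the retry, and the periodic scan re-queues whatever has gone quiet past selfRetryTimeout. A minimal, JDK-only sketch of that structure (hypothetical class; the real code keeps richer ItemInfo state):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class AttemptTimeoutMap {
  private final Map<Long, Long> lastReported = new HashMap<>();
  private final long retryTimeoutMillis;

  AttemptTimeoutMap(long retryTimeoutMillis) {
    this.retryTimeoutMillis = retryTimeoutMillis;
  }

  private static long now() {
    return System.nanoTime() / 1_000_000; // monotonic millis
  }

  // A movement was scheduled for this trackId.
  synchronized void attempted(long trackId) {
    lastReported.put(trackId, now());
  }

  // IN_PROGRESS report: equivalent of touchLastReportedTimeStamp().
  synchronized void touch(long trackId) {
    lastReported.computeIfPresent(trackId, (id, ts) -> now());
  }

  // Periodic scan: remove and return trackIds that are due for a retry.
  synchronized List<Long> takeExpired() {
    List<Long> expired = new ArrayList<>();
    long cutoff = now() - retryTimeoutMillis;
    Iterator<Map.Entry<Long, Long>> it = lastReported.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Long, Long> e = it.next();
      if (e.getValue() < cutoff) {
        expired.add(e.getKey());
        it.remove();
      }
    }
    return expired;
  }
}

With a live co-ordinator, the touch() calls arrive well inside the 20-minute timeout, so only trackIds whose co-ordinator has actually disappeared are re-queued -- which is exactly the multi-coordinator scenario this change closes.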

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2363781e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 8cf9920..8be0a2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -108,6 +108,11 @@ public class StoragePolicySatisfier implements Runnable {
     } else {
       LOG.info("Starting StoragePolicySatisfier.");
     }
+
+    // Ensure that any previously submitted block movements are stopped on
+    // all datanodes.
+    addDropSPSWorkCommandsToAllDNs();
+
     storagePolicySatisfierThread = new Daemon(this);
     storagePolicySatisfierThread.setName("StoragePolicySatisfier");
     storagePolicySatisfierThread.start();
@@ -133,7 +138,7 @@ public class StoragePolicySatisfier implements Runnable {
       LOG.info("Stopping StoragePolicySatisfier, as admin requested to "
           + "deactivate it.");
       this.clearQueuesWithNotification();
-      this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
+      addDropSPSWorkCommandsToAllDNs();
     } else {
       LOG.info("Stopping StoragePolicySatisfier.");
     }
@@ -170,6 +175,14 @@ public class StoragePolicySatisfier implements Runnable {
     return namesystem.isFileOpenedForWrite(moverId);
   }
 
+  /**
+   * Adds drop commands for all datanodes, stopping any in-flight satisfier
+   * block movements.
+   */
+  private void addDropSPSWorkCommandsToAllDNs() {
+    this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
+  }
+
   @Override
   public void run() {
     boolean isMoverRunning = !checkIfMoverRunning();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2363781e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
index 713b83b..b484eb1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
@@ -35,9 +35,13 @@ public class BlocksStorageMovementResult {
    * retry these failed blocks movements. Example selected target node is no
    * more running or no space. So, retrying by selecting new target node might
    * work.
+   *
+   * <p>
+   * IN_PROGRESS - Some or all of the blocks associated with the track id are
+   * still moving.
    */
   public static enum Status {
-    SUCCESS, FAILURE;
+    SUCCESS, FAILURE, IN_PROGRESS;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2363781e/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 899dc7e..080f7fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -192,6 +192,7 @@ message BlocksStorageMovementResultProto {
   enum Status {
     SUCCESS = 1; // block movement succeeded
     FAILURE = 2; // block movement failed and needs to retry
+    IN_PROGRESS = 3; // block movement is still in progress
   }
   required uint64 trackID = 1;
   required Status status = 2;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2363781e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index 86b8b50..8fbbf33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -176,16 +176,21 @@ public class TestStoragePolicySatisfyWorker {
 
     StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf,
         src);
-    List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
-    BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
-        lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
-        lb.getStorageTypes()[0], StorageType.ARCHIVE);
-    blockMovingInfos.add(blockMovingInfo);
-    INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
-    worker.processBlockMovingTasks(inode.getId(),
-        cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
-
-    waitForBlockMovementCompletion(worker, inode.getId(), 1, 30000);
+    try {
+      worker.start();
+      List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+      BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
+          lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
+          lb.getStorageTypes()[0], StorageType.ARCHIVE);
+      blockMovingInfos.add(blockMovingInfo);
+      INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
+      worker.processBlockMovingTasks(inode.getId(),
+          cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
+
+      waitForBlockMovementCompletion(worker, inode.getId(), 1, 30000);
+    } finally {
+      worker.stop();
+    }
   }
 
   /**
@@ -212,24 +217,29 @@ public class TestStoragePolicySatisfyWorker {
 
     StoragePolicySatisfyWorker worker =
         new StoragePolicySatisfyWorker(conf, src);
-    List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
-    List<LocatedBlock> locatedBlocks =
-        dfs.getClient().getLocatedBlocks(file, 0).getLocatedBlocks();
-    for (LocatedBlock locatedBlock : locatedBlocks) {
-      BlockMovingInfo blockMovingInfo =
-          prepareBlockMovingInfo(locatedBlock.getBlock().getLocalBlock(),
-              locatedBlock.getLocations()[0], targetDnInfo,
-              locatedBlock.getStorageTypes()[0], StorageType.ARCHIVE);
-      blockMovingInfos.add(blockMovingInfo);
+    worker.start();
+    try {
+      List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+      List<LocatedBlock> locatedBlocks =
+          dfs.getClient().getLocatedBlocks(file, 0).getLocatedBlocks();
+      for (LocatedBlock locatedBlock : locatedBlocks) {
+        BlockMovingInfo blockMovingInfo =
+            prepareBlockMovingInfo(locatedBlock.getBlock().getLocalBlock(),
+                locatedBlock.getLocations()[0], targetDnInfo,
+                locatedBlock.getStorageTypes()[0], StorageType.ARCHIVE);
+        blockMovingInfos.add(blockMovingInfo);
+      }
+      INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
+      worker.processBlockMovingTasks(inode.getId(),
+          cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
+      // Wait till the results queue builds up.
+      waitForBlockMovementResult(worker, inode.getId(), 30000);
+      worker.dropSPSWork();
+      assertTrue(worker.getBlocksMovementsStatusHandler()
+          .getBlksMovementResults().size() == 0);
+    } finally {
+      worker.stop();
     }
-    INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
-    worker.processBlockMovingTasks(inode.getId(),
-        cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
-    // Wait till results queue build up
-    waitForBlockMovementResult(worker, inode.getId(), 30000);
-    worker.dropSPSWork();
-    assertTrue(worker.getBlocksMovementsCompletionHandler()
-        .getBlksMovementResults().size() == 0);
   }
 
   private void waitForBlockMovementResult(
@@ -239,7 +249,7 @@ public class TestStoragePolicySatisfyWorker {
       @Override
       public Boolean get() {
         List<BlocksStorageMovementResult> completedBlocks = worker
-            .getBlocksMovementsCompletionHandler().getBlksMovementResults();
+            .getBlocksMovementsStatusHandler().getBlksMovementResults();
         return completedBlocks.size() > 0;
       }
     }, 100, timeout);
@@ -252,7 +262,7 @@ public class TestStoragePolicySatisfyWorker {
       @Override
       public Boolean get() {
         List<BlocksStorageMovementResult> completedBlocks = worker
-            .getBlocksMovementsCompletionHandler().getBlksMovementResults();
+            .getBlocksMovementsStatusHandler().getBlksMovementResults();
         int failedCount = 0;
         for (BlocksStorageMovementResult blkMovementResult : completedBlocks) {
           if (blkMovementResult.getStatus() ==

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2363781e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java
index 4d226ff..c88d5be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java
@@ -17,51 +17,90 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
-
-import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tests that StoragePolicySatisfier is able to work with HA enabled.
  */
 public class TestStoragePolicySatisfierWithHA {
   private MiniDFSCluster cluster = null;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestStoragePolicySatisfierWithHA.class);
 
-  @Before
-  public void setUp() throws IOException {
-    Configuration conf = new Configuration();
+  private final Configuration config = new HdfsConfiguration();
+  private static final int DEFAULT_BLOCK_SIZE = 1024;
+  private DistributedFileSystem dfs = null;
+
+  private StorageType[][] allDiskTypes =
+      new StorageType[][]{{StorageType.DISK, StorageType.DISK},
+          {StorageType.DISK, StorageType.DISK},
+          {StorageType.DISK, StorageType.DISK}};
+  private int numOfDatanodes = 3;
+  private int storagesPerDatanode = 2;
+  private long capacity = 2 * 256 * 1024 * 1024;
+  private int nnIndex = 0;
+
+  private void createCluster() throws IOException {
+    config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    startCluster(config, allDiskTypes, numOfDatanodes, storagesPerDatanode,
+        capacity);
+    dfs = cluster.getFileSystem(nnIndex);
+  }
+
+  private void startCluster(final Configuration conf,
+      StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
+      long nodeCapacity) throws IOException {
+    long[][] capacities = new long[numberOfDatanodes][storagesPerDn];
+    for (int i = 0; i < numberOfDatanodes; i++) {
+      for (int j = 0; j < storagesPerDn; j++) {
+        capacities[i][j] = nodeCapacity;
+      }
+    }
     cluster = new MiniDFSCluster.Builder(conf)
         .nnTopology(MiniDFSNNTopology.simpleHATopology())
-        .numDataNodes(1)
-        .build();
+        .numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn)
+        .storageTypes(storageTypes).storageCapacities(capacities).build();
+    cluster.waitActive();
+    cluster.transitionToActive(0);
   }
 
   /**
    * Tests to verify that SPS should run/stop automatically when NN state
    * changes between Standby and Active.
    */
-  @Test(timeout = 100000)
+  @Test(timeout = 90000)
   public void testWhenNNHAStateChanges() throws IOException {
     try {
-      DistributedFileSystem fs;
+      createCluster();
       boolean running;
 
-      cluster.waitActive();
-      fs = cluster.getFileSystem(0);
+      dfs = cluster.getFileSystem(1);
 
       try {
-        fs.getClient().isStoragePolicySatisfierRunning();
+        dfs.getClient().isStoragePolicySatisfierRunning();
         Assert.fail("Call this function to Standby NN should "
             + "raise an exception.");
       } catch (RemoteException e) {
@@ -72,14 +111,15 @@ public class TestStoragePolicySatisfierWithHA {
       }
 
       cluster.transitionToActive(0);
-      running = fs.getClient().isStoragePolicySatisfierRunning();
+      dfs = cluster.getFileSystem(0);
+      running = dfs.getClient().isStoragePolicySatisfierRunning();
       Assert.assertTrue("StoragePolicySatisfier should be active "
           + "when NN transits from Standby to Active mode.", running);
 
       // NN transits from Active to Standby
       cluster.transitionToStandby(0);
       try {
-        fs.getClient().isStoragePolicySatisfierRunning();
+        dfs.getClient().isStoragePolicySatisfierRunning();
         Assert.fail("NN in Standby again, call this function should "
             + "raise an exception.");
       } catch (RemoteException e) {
@@ -106,4 +146,104 @@ public class TestStoragePolicySatisfierWithHA {
       cluster.shutdown();
     }
   }
+
+  /**
+   * Test to verify that a namenode switchover adds DNA_DROP_SPS_WORK_COMMAND
+   * for all the datanodes, which later ensures that the SPS queues are
+   * dropped at each datanode.
+   */
+  @Test(timeout = 90000)
+  public void testNamenodeSwitchoverShouldDropSPSWork() throws Exception {
+    try {
+      createCluster();
+
+      FSNamesystem fsn = cluster.getNamesystem(0);
+      ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+      List<DatanodeDescriptor> listOfDns = new ArrayList<>();
+      for (DataNode dn : dataNodes) {
+        DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn,
+            dn.getDatanodeId());
+        listOfDns.add(dnd);
+      }
+      cluster.shutdownDataNodes();
+
+      cluster.transitionToStandby(0);
+      LOG.info("**Transition to Active**");
+      cluster.transitionToActive(1);
+
+      // Verify that Standby-to-Active transition should set drop SPS flag to
+      // true. This will ensure that DNA_DROP_SPS_WORK_COMMAND will be
+      // propagated to datanode during heartbeat response.
+      int retries = 20;
+      boolean dropSPSWork = false;
+      while (retries > 0) {
+        for (DatanodeDescriptor dnd : listOfDns) {
+          dropSPSWork = dnd.shouldDropSPSWork();
+          if (!dropSPSWork) {
+            retries--;
+            Thread.sleep(250);
+            break;
+          }
+        }
+        if (dropSPSWork) {
+          break;
+        }
+      }
+      Assert.assertTrue("Didn't drop SPS work", dropSPSWork);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test to verify that SPS work will be dropped once the datanode is marked
+   * as expired. Internally the 'dropSPSWork' flag is set to true on
+   * expiration, and on reconnection DNA_DROP_SPS_WORK_COMMAND is sent to that
+   * datanode.
+   */
+  @Test(timeout = 90000)
+  public void testDeadDatanode() throws Exception {
+    int heartbeatExpireInterval = 2 * 2000;
+    config.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+        3000);
+    config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000L);
+    createCluster();
+
+    DataNode dn = cluster.getDataNodes().get(0);
+    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+
+    FSNamesystem fsn = cluster.getNamesystem(0);
+    DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn,
+        dn.getDatanodeId());
+    boolean isDead = false;
+    int retries = 20;
+    while (retries > 0) {
+      isDead = dnd.getLastUpdateMonotonic() < (monotonicNow()
+          - heartbeatExpireInterval);
+      if (isDead) {
+        break;
+      }
+      retries--;
+      Thread.sleep(250);
+    }
+    Assert.assertTrue("Datanode is alive", isDead);
+    // Re-enable the datanode heartbeat so that the expired datanode
+    // reconnects to the namenode.
+    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
+
+    // Verify that datanode expiration will set drop SPS flag to
+    // true. This will ensure that DNA_DROP_SPS_WORK_COMMAND will be
+    // propagated to datanode during reconnection.
+    boolean dropSPSWork = false;
+    retries = 50;
+    while (retries > 0) {
+      dropSPSWork = dnd.shouldDropSPSWork();
+      if (dropSPSWork) {
+        break;
+      }
+      retries--;
+      Thread.sleep(100);
+    }
+    Assert.assertTrue("Didn't drop SPS work", dropSPSWork);
+  }
 }
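
The hand-rolled retry loops in both tests could also be written with GenericTestUtils.waitFor, which this class already imports (and which TestStoragePolicySatisfyWorker uses above). A hypothetical rewrite of the dropSPSWork poll -- the 100ms poll and 10s ceiling are assumptions, and dnd must be final for the anonymous Supplier:

// Requires: import com.google.common.base.Supplier;
GenericTestUtils.waitFor(new Supplier<Boolean>() {
  @Override
  public Boolean get() {
    return dnd.shouldDropSPSWork();
  }
}, 100, 10000);

waitFor throws TimeoutException on expiry, so the explicit retries counter and the final Assert.assertTrue become unnecessary.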

