HDFS-6682. Add a metric to expose the timestamp of the oldest under-replicated block. (aajisaka)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/02c01815
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/02c01815
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/02c01815

Branch: refs/heads/YARN-1197
Commit: 02c01815eca656814febcdaca6115e5f53b9c746
Parents: ab3197c
Author: Akira Ajisaka <aajis...@apache.org>
Authored: Fri Jul 24 11:37:23 2015 +0900
Committer: Akira Ajisaka <aajis...@apache.org>
Committed: Fri Jul 24 11:37:23 2015 +0900

----------------------------------------------------------------------
 .../hadoop-common/src/site/markdown/Metrics.md  |  1 +
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 ++
 .../server/blockmanagement/BlockManager.java    |  4 ++
 .../blockmanagement/UnderReplicatedBlocks.java  | 33 ++++++++++++--
 .../hdfs/server/namenode/FSNamesystem.java      |  9 +++-
 .../TestUnderReplicatedBlocks.java              | 48 ++++++++++++++++++++
 6 files changed, 93 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/02c01815/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index 646cda5..2e6c095 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -201,6 +201,7 @@ Each metrics record contains tags such as HAState and Hostname as additional inf
 | Name | Description |
 |:---- |:---- |
 | `MissingBlocks` | Current number of missing blocks |
+| `TimeOfTheOldestBlockToBeReplicated` | The timestamp of the oldest block to be replicated. If there are no under-replicated or corrupt blocks, return 0. |
 | `ExpiredHeartbeats` | Total number of expired heartbeats |
 | `TransactionsSinceLastCheckpoint` | Total number of transactions since last checkpoint |
 | `TransactionsSinceLastLogRoll` | Total number of transactions since last edit log roll |

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02c01815/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index bcc1e25..f86d41e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -747,6 +747,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8730. Clean up the import statements in ClientProtocol.
     (Takanobu Asanuma via wheat9)
 
+    HDFS-6682. Add a metric to expose the timestamp of the oldest
+    under-replicated block. (aajisaka)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02c01815/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 7dce2a8..64603d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -171,6 +171,10 @@ public class BlockManager implements BlockStatsMXBean {
   public int getPendingDataNodeMessageCount() {
     return pendingDNMessages.count();
   }
+  /** Used by metrics. */
+  public long getTimeOfTheOldestBlockToBeReplicated() {
+    return neededReplications.getTimeOfTheOldestBlockToBeReplicated();
+  }
 
   /**replicationRecheckInterval is how often namenode checks for new replication work*/
   private final long replicationRecheckInterval;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02c01815/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
index 000416e..d8aec99 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
@@ -18,10 +18,15 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.util.Time;
 
 /**
  * Keep prioritized queues of under replicated blocks.
@@ -82,6 +87,9 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
 
   /** The number of corrupt blocks with replication factor 1 */
   private int corruptReplOneBlocks = 0;
+  /** Keep timestamp when a block is put into the queue. */
+  private final Map<BlockInfo, Long> timestampsMap =
+      Collections.synchronizedMap(new LinkedHashMap<BlockInfo, Long>());
 
   /** Create an object. */
   UnderReplicatedBlocks() {
@@ -91,12 +99,13 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
   }
 
   /**
-   * Empty the queues.
+   * Empty the queues and timestamps.
    */
   void clear() {
     for (int i = 0; i < LEVEL; i++) {
       priorityQueues.get(i).clear();
     }
+    timestampsMap.clear();
   }
 
   /** Return the total number of under replication blocks */
@@ -119,6 +128,20 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
     return size;
   }
 
+  /**
+   * Return the smallest timestamp of the under-replicated/corrupt blocks.
+   * If there are no under-replicated or corrupt blocks, return 0.
+   */
+  long getTimeOfTheOldestBlockToBeReplicated() {
+    synchronized (timestampsMap) {
+      if (timestampsMap.isEmpty()) {
+        return 0;
+      }
+      // Since we are using LinkedHashMap, the first value is the smallest.
+      return timestampsMap.entrySet().iterator().next().getValue();
+    }
+  }
+
   /** Return the number of corrupt blocks */
   synchronized int getCorruptBlockSize() {
     return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
@@ -197,7 +220,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
               + " has only {} replicas and need {} replicas so is added to" +
               " neededReplications at priority level {}", block, curReplicas,
           expectedReplicas, priLevel);
-
+      timestampsMap.put(block, Time.now());
       return true;
     }
     return false;
@@ -242,8 +265,9 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
     if(priLevel >= 0 && priLevel < LEVEL
         && priorityQueues.get(priLevel).remove(block)) {
       NameNode.blockStateChangeLog.debug(
-        "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block {}" +
-            " from priority queue {}", block, priLevel);
+          "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block {}" +
+              " from priority queue {}", block, priLevel);
+      timestampsMap.remove(block);
       return true;
     } else {
       // Try to remove the block from all queues if the block was
@@ -253,6 +277,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
           NameNode.blockStateChangeLog.debug(
               "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block" +
                   " {} from priority queue {}", block, priLevel);
+          timestampsMap.remove(block);
           return true;
         }
       }
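
The bookkeeping above relies on LinkedHashMap's insertion-order iteration: add()
appends a timestamp, remove() deletes it, and after any number of removals the
first remaining entry is always the oldest queued block. A minimal standalone
sketch of that property (not part of this commit; the class and key names are
illustrative only):

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class OldestEntryDemo {
      public static void main(String[] args) {
        // LinkedHashMap iterates in insertion order, so the first entry is
        // always the oldest one still present in the map.
        Map<String, Long> timestamps =
            Collections.synchronizedMap(new LinkedHashMap<String, Long>());
        timestamps.put("block1", 100L);
        timestamps.put("block2", 200L);
        timestamps.remove("block1");
        // Iterating a synchronized map requires holding its lock, mirroring
        // the synchronized block in getTimeOfTheOldestBlockToBeReplicated().
        synchronized (timestamps) {
          long oldest = timestamps.isEmpty()
              ? 0 : timestamps.entrySet().iterator().next().getValue();
          System.out.println(oldest); // prints 200
        }
      }
    }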

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02c01815/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 0b44431..0a2422e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3770,7 +3770,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     // not locking
     return blockManager.getMissingReplOneBlocksCount();
   }
-  
+
+  @Metric({"TimeOfTheOldestBlockToBeReplicated",
+      "The timestamp of the oldest block to be replicated. If there are no " +
+      "under-replicated or corrupt blocks, return 0."})
+  public long getTimeOfTheOldestBlockToBeReplicated() {
+    return blockManager.getTimeOfTheOldestBlockToBeReplicated();
+  }
+
   @Metric({"ExpiredHeartbeats", "Number of expired heartbeats"})
   public int getExpiredHeartbeats() {
     return datanodeStatistics.getExpiredHeartbeats();
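
With the @Metric annotation above, the value is published alongside the other
FSNamesystem metrics and should be readable over JMX. A hedged sketch of a
standalone reader (not part of this commit; the JMX service URL and port are
placeholders that assume remote JMX is enabled on the NameNode JVM, and the
MBean name is assumed from Hadoop's usual
"Hadoop:service=NameNode,name=FSNamesystem" naming):

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class OldestBlockMetricProbe {
      public static void main(String[] args) throws Exception {
        // Placeholder endpoint: adjust host/port to the NameNode's JMX setup.
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://namenode-host:8004/jmxrmi");
        try (JMXConnector jmxc = JMXConnectorFactory.connect(url)) {
          MBeanServerConnection conn = jmxc.getMBeanServerConnection();
          // Bean name assumed from Hadoop's metrics2 JMX naming convention.
          ObjectName name =
              new ObjectName("Hadoop:service=NameNode,name=FSNamesystem");
          Object value =
              conn.getAttribute(name, "TimeOfTheOldestBlockToBeReplicated");
          System.out.println("TimeOfTheOldestBlockToBeReplicated = " + value);
        }
      }
    }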

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02c01815/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
index 27b35f0..7615cee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.util.Time;
 import org.junit.Test;
 
 import java.util.Iterator;
@@ -146,4 +148,50 @@ public class TestUnderReplicatedBlocks {
 
   }
 
+  @Test
+  public void testGetTimeOfTheOldestBlockToBeReplicated() {
+    UnderReplicatedBlocks blocks = new UnderReplicatedBlocks();
+    BlockInfo block1 = new BlockInfoContiguous(new Block(1), (short) 1);
+    BlockInfo block2 = new BlockInfoContiguous(new Block(2), (short) 1);
+
+    // if there are no under-replicated or corrupt blocks, return 0
+    assertEquals(0L, blocks.getTimeOfTheOldestBlockToBeReplicated());
+
+    // add block1, add block2, remove block1, remove block2
+    long time1 = Time.now();
+    blocks.add(block1, 1, 0, 3);
+    long time2 = Time.now();
+    assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() >= time1);
+    assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() <= time2);
+
+    blocks.add(block2, 2, 0, 3);
+    long time3 = Time.now();
+    assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() >= time1);
+    assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() <= time2);
+
+    blocks.remove(block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
+    assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() >= time2);
+    assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() <= time3);
+
+    blocks.remove(block2, UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED);
+    assertEquals(0L, blocks.getTimeOfTheOldestBlockToBeReplicated());
+
+    // add block2, add block1, remove block1, remove block2
+    time1 = Time.now();
+    blocks.add(block2, 2, 0, 3);
+    time2 = Time.now();
+    assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() >= time1);
+    assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() <= time2);
+
+    blocks.add(block1, 1, 0, 3);
+    assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() >= time1);
+    assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() <= time2);
+
+    blocks.remove(block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
+    assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() >= time1);
+    assertTrue(blocks.getTimeOfTheOldestBlockToBeReplicated() <= time2);
+
+    blocks.remove(block2, UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED);
+    assertEquals(0L, blocks.getTimeOfTheOldestBlockToBeReplicated());
+  }
 }
