Author: suresh
Date: Wed Mar  7 06:43:11 2012
New Revision: 1297863

URL: http://svn.apache.org/viewvc?rev=1297863&view=rev
Log:
HDFS-2476. Merging change r1201991 from trunk to 0.23

Added:
    
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java
      - copied unchanged from r1201991, 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java
    
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java
      - copied unchanged from r1201991, 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java
    
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightHashSet.java
      - copied unchanged from r1201991, 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightHashSet.java
    
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java
      - copied unchanged from r1201991, 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java
Modified:
    
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
    
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
    
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
    
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
    
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java

Modified: 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1297863&r1=1297862&r2=1297863&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt 
(original)
+++ 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt 
Wed Mar  7 06:43:11 2012
@@ -109,6 +109,10 @@ Release 0.23.3 - UNRELEASED
     HDFS-2495. Increase granularity of write operations in ReplicationMonitor
     thus reducing contention for write lock. (Tomasz Nykiel via hairong)
 
+    HDFS-2476. More CPU efficient data structure for under-replicated,
+               over-replicated, and invalidated blocks.
+               (Tomasz Nykiel via todd)
+
   BUG FIXES
  
     HDFS-2481. Unknown protocol: org.apache.hadoop.hdfs.protocol.ClientProtocol.

Modified: 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1297863&r1=1297862&r2=1297863&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
 (original)
+++ 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
 Wed Mar  7 06:43:11 2012
@@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.Daemon;
 
@@ -142,8 +143,8 @@ public class BlockManager {
   // eventually remove these extras.
   // Mapping: StorageID -> TreeSet<Block>
   //
-  public final Map<String, Collection<Block>> excessReplicateMap =
-    new TreeMap<String, Collection<Block>>();
+  public final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap =
+    new TreeMap<String, LightWeightLinkedSet<Block>>();
 
   //
   // Store set of Blocks that need to be replicated 1 or more times.
@@ -1255,7 +1256,7 @@ public class BlockManager {
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
     while(it.hasNext()) {
       DatanodeDescriptor node = it.next();
-      Collection<Block> excessBlocks =
+      LightWeightLinkedSet<Block> excessBlocks =
         excessReplicateMap.get(node.getStorageID());
       if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
         corrupt++;
@@ -1987,7 +1988,7 @@ public class BlockManager {
     for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
          it.hasNext();) {
       DatanodeDescriptor cur = it.next();
-      Collection<Block> excessBlocks = excessReplicateMap.get(cur
+      LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
           .getStorageID());
       if (excessBlocks == null || !excessBlocks.contains(block)) {
         if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
@@ -2105,9 +2106,9 @@ public class BlockManager {
 
   private void addToExcessReplicate(DatanodeInfo dn, Block block) {
     assert namesystem.hasWriteLock();
-    Collection<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
+    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
     if (excessBlocks == null) {
-      excessBlocks = new TreeSet<Block>();
+      excessBlocks = new LightWeightLinkedSet<Block>();
       excessReplicateMap.put(dn.getStorageID(), excessBlocks);
     }
     if (excessBlocks.add(block)) {
@@ -2155,7 +2156,7 @@ public class BlockManager {
       // We've removed a block from a node, so it's definitely no longer
       // in "excess" there.
       //
-      Collection<Block> excessBlocks = excessReplicateMap.get(node
+      LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
           .getStorageID());
       if (excessBlocks != null) {
         if (excessBlocks.remove(block)) {
@@ -2305,8 +2306,8 @@ public class BlockManager {
       } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
         count++;
       } else {
-        Collection<Block> blocksExcess =
-          excessReplicateMap.get(node.getStorageID());
+        LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
+            .getStorageID());
         if (blocksExcess != null && blocksExcess.contains(b)) {
           excess++;
         } else {

Modified: 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1297863&r1=1297862&r2=1297863&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
 (original)
+++ 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
 Wed Mar  7 06:43:11 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.Deprecated
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 
@@ -120,11 +121,11 @@ public class DatanodeDescriptor extends 
   private BlockQueue<BlockInfoUnderConstruction> recoverBlocks =
                                 new BlockQueue<BlockInfoUnderConstruction>();
   /** A set of blocks to be invalidated by this datanode */
-  private Set<Block> invalidateBlocks = new TreeSet<Block>();
+  private LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>();
 
   /* Variables for maintaining number of blocks scheduled to be written to
    * this datanode. This count is approximate and might be slightly bigger
-   * in case of errors (e.g. datanode does not report if an error occurs 
+   * in case of errors (e.g. datanode does not report if an error occurs
    * while writing the block).
    */
   private int currApproxBlocksScheduled = 0;
@@ -400,45 +401,11 @@ public class DatanodeDescriptor extends 
    * Remove the specified number of blocks to be invalidated
    */
   public Block[] getInvalidateBlocks(int maxblocks) {
-    return getBlockArray(invalidateBlocks, maxblocks); 
-  }
-
-  static private Block[] getBlockArray(Collection<Block> blocks, int max) {
-    Block[] blockarray = null;
-    synchronized(blocks) {
-      int available = blocks.size();
-      int n = available;
-      if (max > 0 && n > 0) {
-        if (max < n) {
-          n = max;
-        }
-        // allocate the properly sized block array ... 
-        blockarray = new Block[n];
-
-        // iterate tree collecting n blocks... 
-        Iterator<Block> e = blocks.iterator();
-        int blockCount = 0;
-
-        while (blockCount < n && e.hasNext()) {
-          // insert into array ... 
-          blockarray[blockCount++] = e.next();
-
-          // remove from tree via iterator, if we are removing 
-          // less than total available blocks
-          if (n < available){
-            e.remove();
-          }
-        }
-        assert(blockarray.length == n);
-        
-        // now if the number of blocks removed equals available blocks,
-        // them remove all blocks in one fell swoop via clear
-        if (n == available) { 
-          blocks.clear();
-        }
-      }
+    synchronized (invalidateBlocks) {
+      Block[] deleteList = invalidateBlocks.pollToArray(new Block[Math.min(
+          invalidateBlocks.size(), maxblocks)]);
+      return deleteList.length == 0 ? null : deleteList;
     }
-    return blockarray;
   }
 
   /** Serialization for FSEditLog */

Modified: 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1297863&r1=1297862&r2=1297863&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
 (original)
+++ 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
 Wed Mar  7 06:43:11 2012
@@ -30,8 +30,9 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 
-/** 
+/**
  * Keeps a Collection for every named machine containing blocks
  * that have recently been invalidated and are thought to live
  * on the machine in question.
@@ -39,8 +40,8 @@ import org.apache.hadoop.hdfs.server.nam
 @InterfaceAudience.Private
 class InvalidateBlocks {
   /** Mapping: StorageID -> Collection of Blocks */
-  private final Map<String, Collection<Block>> node2blocks =
-      new TreeMap<String, Collection<Block>>();
+  private final Map<String, LightWeightHashSet<Block>> node2blocks =
+      new TreeMap<String, LightWeightHashSet<Block>>();
   /** The total number of blocks in the map. */
   private long numBlocks = 0L;
 
@@ -67,9 +68,9 @@ class InvalidateBlocks {
    */
   synchronized void add(final Block block, final DatanodeInfo datanode,
       final boolean log) {
-    Collection<Block> set = node2blocks.get(datanode.getStorageID());
+    LightWeightHashSet<Block> set = node2blocks.get(datanode.getStorageID());
     if (set == null) {
-      set = new HashSet<Block>();
+      set = new LightWeightHashSet<Block>();
       node2blocks.put(datanode.getStorageID(), set);
     }
     if (set.add(block)) {
@@ -83,7 +84,7 @@ class InvalidateBlocks {
 
   /** Remove a storage from the invalidatesSet */
   synchronized void remove(final String storageID) {
-    final Collection<Block> blocks = node2blocks.remove(storageID);
+    final LightWeightHashSet<Block> blocks = node2blocks.remove(storageID);
     if (blocks != null) {
       numBlocks -= blocks.size();
     }
@@ -91,7 +92,7 @@ class InvalidateBlocks {
 
   /** Remove the block from the specified storage. */
   synchronized void remove(final String storageID, final Block block) {
-    final Collection<Block> v = node2blocks.get(storageID);
+    final LightWeightHashSet<Block> v = node2blocks.get(storageID);
     if (v != null && v.remove(block)) {
       numBlocks--;
       if (v.isEmpty()) {
@@ -109,8 +110,8 @@ class InvalidateBlocks {
       return;
     }
 
-    for(Map.Entry<String,Collection<Block>> entry : node2blocks.entrySet()) {
-      final Collection<Block> blocks = entry.getValue();
+    for(Map.Entry<String,LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
+      final LightWeightHashSet<Block> blocks = entry.getValue();
       if (blocks.size() > 0) {
         out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks);
       }
@@ -143,21 +144,17 @@ class InvalidateBlocks {
 
   private synchronized List<Block> invalidateWork(
       final String storageId, final DatanodeDescriptor dn) {
-    final Collection<Block> set = node2blocks.get(storageId);
+    final LightWeightHashSet<Block> set = node2blocks.get(storageId);
     if (set == null) {
       return null;
     }
 
     // # blocks that can be sent in one message is limited
     final int limit = datanodeManager.blockInvalidateLimit;
-    final List<Block> toInvalidate = new ArrayList<Block>(limit);
-    final Iterator<Block> it = set.iterator();
-    for(int count = 0; count < limit && it.hasNext(); count++) {
-      toInvalidate.add(it.next());
-      it.remove();
-    }
+    final List<Block> toInvalidate = set.pollN(limit);
+
     // If we send everything in this message, remove this node entry
-    if (!it.hasNext()) {
+    if (set.isEmpty()) {
       remove(storageId);
     }
 

Modified: 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java?rev=1297863&r1=1297862&r2=1297863&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
 (original)
+++ 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
 Wed Mar  7 06:43:11 2012
@@ -24,6 +24,7 @@ import java.util.NavigableSet;
 import java.util.TreeSet;
 
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 
 /**
@@ -80,13 +81,13 @@ class UnderReplicatedBlocks implements I
   /** The queue for corrupt blocks: {@value} */
   static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
   /** the queues themselves */
-  private final List<NavigableSet<Block>> priorityQueues
-      = new ArrayList<NavigableSet<Block>>(LEVEL);
+  private List<LightWeightLinkedSet<Block>> priorityQueues
+      = new ArrayList<LightWeightLinkedSet<Block>>();
 
   /** Create an object. */
   UnderReplicatedBlocks() {
     for (int i = 0; i < LEVEL; i++) {
-      priorityQueues.add(new TreeSet<Block>());
+      priorityQueues.add(new LightWeightLinkedSet<Block>());
     }
   }
 
@@ -123,10 +124,10 @@ class UnderReplicatedBlocks implements I
   synchronized int getCorruptBlockSize() {
     return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
   }
-  
+
   /** Check if a block is in the neededReplication queue */
   synchronized boolean contains(Block block) {
-    for (NavigableSet<Block> set : priorityQueues) {
+    for(LightWeightLinkedSet<Block> set : priorityQueues) {
       if (set.contains(block)) {
         return true;
       }

Modified: 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1297863&r1=1297862&r2=1297863&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
 (original)
+++ 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
 Wed Mar  7 06:43:11 2012
@@ -4013,7 +4013,7 @@ public class FSNamesystem implements Nam
    * @throws IOException
    */
   Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
-      String startBlockAfter) throws IOException {
+       String[] cookieTab) throws IOException {
 
     readLock();
     try {
@@ -4022,23 +4022,27 @@ public class FSNamesystem implements Nam
                               "replication queues have not been initialized.");
       }
       checkSuperuserPrivilege();
-      long startBlockId = 0;
       // print a limited # of corrupt files per call
       int count = 0;
       ArrayList<CorruptFileBlockInfo> corruptFiles = new ArrayList<CorruptFileBlockInfo>();
-      
-      if (startBlockAfter != null) {
-        startBlockId = Block.filename2id(startBlockAfter);
-      }
 
       final Iterator<Block> blkIterator = blockManager.getCorruptReplicaBlockIterator();
+
+      if (cookieTab == null) {
+        cookieTab = new String[] { null };
+      }
+      int skip = getIntCookie(cookieTab[0]);
+      for (int i = 0; i < skip && blkIterator.hasNext(); i++) {
+        blkIterator.next();
+      }
+
       while (blkIterator.hasNext()) {
         Block blk = blkIterator.next();
         INode inode = blockManager.getINode(blk);
+        skip++;
         if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) {
           String src = FSDirectory.getFullPathName(inode);
-          if (((startBlockAfter == null) || (blk.getBlockId() > startBlockId))
-              && (src.startsWith(path))) {
+          if (src.startsWith(path)){
             corruptFiles.add(new CorruptFileBlockInfo(src, blk));
             count++;
             if (count >= DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED)
@@ -4046,13 +4050,32 @@ public class FSNamesystem implements Nam
           }
         }
       }
+      cookieTab[0] = String.valueOf(skip);
       LOG.info("list corrupt file blocks returned: " + count);
       return corruptFiles;
     } finally {
       readUnlock();
     }
   }
-  
+
+  /**
+   * Convert string cookie to integer.
+   */
+  private static int getIntCookie(String cookie){
+    int c;
+    if(cookie == null){
+      c = 0;
+    } else {
+      try{
+        c = Integer.parseInt(cookie);
+      }catch (NumberFormatException e) {
+        c = 0;
+      }
+    }
+    c = Math.max(0, c);
+    return c;
+  }
+
   /**
    * Create delegation token secret manager
    */

Modified: 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1297863&r1=1297862&r2=1297863&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
 (original)
+++ 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
 Wed Mar  7 06:43:11 2012
@@ -729,17 +729,16 @@ class NameNodeRpcServer implements Namen
   @Override // ClientProtocol
   public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
       throws IOException {
+       String[] cookieTab = new String[] { cookie };
     Collection<FSNamesystem.CorruptFileBlockInfo> fbs =
-      namesystem.listCorruptFileBlocks(path, cookie);
-    
+      namesystem.listCorruptFileBlocks(path, cookieTab);
+
     String[] files = new String[fbs.size()];
-    String lastCookie = "";
     int i = 0;
     for(FSNamesystem.CorruptFileBlockInfo fb: fbs) {
       files[i++] = fb.path;
-      lastCookie = fb.block.getBlockName();
     }
-    return new CorruptFileBlocks(files, lastCookie);
+    return new CorruptFileBlocks(files, cookieTab[0]);
   }
 
   /**

Modified: 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1297863&r1=1297862&r2=1297863&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
 (original)
+++ 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
 Wed Mar  7 06:43:11 2012
@@ -113,11 +113,11 @@ public class NamenodeFsck {
   // We return back N files that are corrupt; the list of files returned is
   // ordered by block id; to allow continuation support, pass in the last block
   // # from previous call
-  private String startBlockAfter = null;
-  
+  private String[] currentCookie = new String[] { null };
+
   private final Configuration conf;
   private final PrintWriter out;
-  
+
   /**
    * Filesystem checker.
    * @param conf configuration (namenode config)
@@ -155,11 +155,11 @@ public class NamenodeFsck {
         this.showCorruptFileBlocks = true;
       }
       else if (key.equals("startblockafter")) {
-        this.startBlockAfter = pmap.get("startblockafter")[0]; 
+        this.currentCookie[0] = pmap.get("startblockafter")[0];
       }
     }
   }
-  
+
   /**
    * Check files on DFS, starting from the indicated path.
    */
@@ -215,19 +215,20 @@ public class NamenodeFsck {
       out.close();
     }
   }
- 
+
   private void listCorruptFileBlocks() throws IOException {
     Collection<FSNamesystem.CorruptFileBlockInfo> corruptFiles = namenode.
-      getNamesystem().listCorruptFileBlocks(path, startBlockAfter);
+      getNamesystem().listCorruptFileBlocks(path, currentCookie);
     int numCorruptFiles = corruptFiles.size();
     String filler;
     if (numCorruptFiles > 0) {
       filler = Integer.toString(numCorruptFiles);
-    } else if (startBlockAfter == null) {
+    } else if (currentCookie[0].equals("0")) {
       filler = "no";
     } else {
       filler = "no more";
     }
+    out.println("Cookie:\t" + currentCookie[0]);
     for (FSNamesystem.CorruptFileBlockInfo c : corruptFiles) {
       out.println(c.toString());
     }

Modified: 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java?rev=1297863&r1=1297862&r2=1297863&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
 (original)
+++ 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
 Wed Mar  7 06:43:11 2012
@@ -144,14 +144,15 @@ public class DFSck extends Configured im
       throws IOException {
     int errCode = -1;
     int numCorrupt = 0;
-    String lastBlock = null;
+    int cookie = 0;
     final String noCorruptLine = "has no CORRUPT files";
     final String noMoreCorruptLine = "has no more CORRUPT files";
+    final String cookiePrefix = "Cookie:";
     boolean allDone = false;
     while (!allDone) {
       final StringBuffer url = new StringBuffer(baseUrl);
-      if (lastBlock != null) {
-        url.append("&startblockafter=").append(lastBlock);
+      if (cookie > 0) {
+        url.append("&startblockafter=").append(String.valueOf(cookie));
       }
       URL path = new URL(url.toString());
       SecurityUtil.fetchServiceTicket(path);
@@ -162,29 +163,31 @@ public class DFSck extends Configured im
       try {
         String line = null;
         while ((line = input.readLine()) != null) {
-          if ((line.endsWith(noCorruptLine)) || 
+          if (line.startsWith(cookiePrefix)){
+            try{
+              cookie = Integer.parseInt(line.split("\t")[1]);
+            } catch (Exception e){
+              allDone = true;
+              break;
+            }
+            continue;
+          }
+          if ((line.endsWith(noCorruptLine)) ||
               (line.endsWith(noMoreCorruptLine)) ||
               (line.endsWith(NamenodeFsck.NONEXISTENT_STATUS))) {
             allDone = true;
             break;
           }
           if ((line.isEmpty())
-              || (line.startsWith("FSCK started by")) 
+              || (line.startsWith("FSCK started by"))
               || (line.startsWith("The filesystem under path")))
             continue;
           numCorrupt++;
           if (numCorrupt == 1) {
-            out.println("The list of corrupt files under path '" 
+            out.println("The list of corrupt files under path '"
                 + dir + "' are:");
           }
           out.println(line);
-          try {
-            // Get the block # that we need to send in next call
-            lastBlock = line.split("\t")[0];
-          } catch (Exception e) {
-            allDone = true;
-            break;
-          }
         }
       } finally {
         input.close();

Modified: 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java?rev=1297863&r1=1297862&r2=1297863&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
 (original)
+++ 
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
 Wed Mar  7 06:43:11 2012
@@ -323,9 +323,10 @@ public class TestListCorruptFileBlocks {
       FSNamesystem.CorruptFileBlockInfo[] cfb = corruptFileBlocks
           .toArray(new FSNamesystem.CorruptFileBlockInfo[0]);
       // now get the 2nd and 3rd file that is corrupt
+      String[] cookie = new String[]{"1"};
       Collection<FSNamesystem.CorruptFileBlockInfo> nextCorruptFileBlocks =
         namenode.getNamesystem()
-          .listCorruptFileBlocks("/corruptData", cfb[0].block.getBlockName());
+          .listCorruptFileBlocks("/corruptData", cookie);
       FSNamesystem.CorruptFileBlockInfo[] ncfb = nextCorruptFileBlocks
           .toArray(new FSNamesystem.CorruptFileBlockInfo[0]);
       numCorrupt = nextCorruptFileBlocks.size();
@@ -333,9 +334,9 @@ public class TestListCorruptFileBlocks {
       assertTrue(ncfb[0].block.getBlockName()
           .equalsIgnoreCase(cfb[1].block.getBlockName()));
 
-      corruptFileBlocks = 
-        namenode.getNamesystem().listCorruptFileBlocks("/corruptData",
-          ncfb[1].block.getBlockName());
+      corruptFileBlocks =
+        namenode.getNamesystem()
+          .listCorruptFileBlocks("/corruptData", cookie);
       numCorrupt = corruptFileBlocks.size();
       assertTrue(numCorrupt == 0);
       // Do a listing on a dir which doesn't have any corrupt blocks and


Reply via email to