Author: dhruba
Date: Tue Jan  8 02:39:55 2008
New Revision: 609925

URL: http://svn.apache.org/viewvc?rev=609925&view=rev
Log:
HADOOP-2110. Block report processing creates fewer transient objects.
Datanode Protocol version changed from 10 to 11.
(Sanjay Radia via dhruba)


Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockListAsLongs.java   
(with props)
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=609925&r1=609924&r2=609925&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jan  8 02:39:55 2008
@@ -36,6 +36,10 @@
 
     HADOOP-2381.  Support permission information in FileStatus. Client
     Protocol version changed from 21 to 22.  (Raghu Angadi via dhruba)
+
+    HADOOP-2110. Block report processing creates fewer transient objects.
+    Datanode Protocol version changed from 10 to 11.  
+    (Sanjay Radia via dhruba)
     
   NEW FEATURES
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java?rev=609925&r1=609924&r2=609925&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java Tue Jan  8 
02:39:55 2008
@@ -59,7 +59,7 @@
 
   /**
    */
-  public Block(long blkid, long len) {
+  public Block(final long blkid, final long len) {
     this.blkid = blkid;
     this.len = len;
   }
@@ -81,6 +81,10 @@
     this.len = len;
   }
 
+  public void set(long blkid, long len) {
+    this.blkid = blkid;
+    this.len = len;
+  }
   /**
    */
   public long getBlockId() {

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockListAsLongs.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockListAsLongs.java?rev=609925&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockListAsLongs.java 
(added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockListAsLongs.java 
Tue Jan  8 02:39:55 2008
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+/**
+ * This class provides an interface for accessing a list of blocks that
+ * has been implemented as long[].
+ * This class is useful for block reports. Rather than sending block reports
+ * as a Block[] we can send them as a long[].
+ *
+ */
+class BlockListAsLongs {
+  /**
+   * A block as 2 longs
+   *   block-id and block length
+   */
+  private static final int LONGS_PER_BLOCK = 2;
+  
+  private static int index2BlockId(int index) {
+    return index*LONGS_PER_BLOCK;
+  }
+  private static int index2BlockLen(int index) {
+    return (index*LONGS_PER_BLOCK) + 1;
+  }
+  
+  private long[] blockList;
+  
+  /**
+   * Converting a block[] to a long[]
+   * @param blockArray - the input array block[]
+   * @return the output array of long[]
+   */
+  
+  static long[] convertToArrayLongs(final Block[] blockArray) {
+    long[] blocksAsLongs = new long[blockArray.length * LONGS_PER_BLOCK];
+
+    BlockListAsLongs bl = new BlockListAsLongs(blocksAsLongs);
+    assert bl.getNumberOfBlocks() == blockArray.length;
+
+    for (int i = 0; i < blockArray.length; i++) {
+      bl.setBlock(i, blockArray[i]);
+    }
+    return blocksAsLongs;
+  }
+
+  /**
+   * Constructor
+   * @param iBlockList - BlockListAsLongs created from this long[] parameter
+   */
+  BlockListAsLongs(final long[] iBlockList) {
+    if (iBlockList == null) {
+      blockList = new long[0];
+    } else {
+      if (iBlockList.length%LONGS_PER_BLOCK != 0) {
+        // must be multiple of LONGS_PER_BLOCK
+        throw new IllegalArgumentException();
+      }
+      blockList = iBlockList;
+    }
+  }
+
+  
+  /**
+   * The number of blocks
+   * @return - the number of blocks
+   */
+  int getNumberOfBlocks() {
+    return blockList.length/LONGS_PER_BLOCK;
+  }
+  
+  
+  /**
+   * The block-id of the indexTh block
+   * @param index - the block whose block-id is desired
+   * @return the block-id
+   */
+  long getBlockId(final int index)  {
+    return blockList[index2BlockId(index)];
+  }
+  
+  /**
+   * The block-len of the indexTh block
+   * @param index - the block whose block-len is desired
+   * @return - the block-len
+   */
+  long getBlockLen(final int index)  {
+    return blockList[index2BlockLen(index)];
+  }
+  
+  /**
+   * Set the indexTh block
+   * @param index - the index of the block to set
+   * @param b - the indexTh block is set to the value of this block
+   */
+  void setBlock(final int index, final Block b) {
+    blockList[index2BlockId(index)] = b.getBlockId();
+    blockList[index2BlockLen(index)] = b.getNumBytes();
+  }
+}

Propchange: 
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockListAsLongs.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockListAsLongs.java
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=609925&r1=609924&r2=609925&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue Jan  8 
02:39:55 2008
@@ -121,6 +121,14 @@
   private Semaphore balancingSem = new Semaphore(MAX_BALANCING_THREADS);
   long balanceBandwidth;
   private Throttler balancingThrottler;
+  
+  /**
+   * Current system time.
+   * @return current time in msec.
+   */
+  static long now() {
+    return System.currentTimeMillis();
+  }
 
   private static class DataNodeMetrics implements Updater {
     private final MetricsRecord metricsRecord;
@@ -603,8 +611,13 @@
           // Get back a list of local block(s) that are obsolete
           // and can be safely GC'ed.
           //
+          long brStartTime = now();
+          Block[] bReport = data.getBlockReport();
           DatanodeCommand cmd = namenode.blockReport(dnRegistration,
-                                                     data.getBlockReport());
+                  BlockListAsLongs.convertToArrayLongs(bReport));
+          long brTime = now() - brStartTime;
+          LOG.info("BlockReport of " + bReport.length +
+              " blocks got processed in " + brTime + " msecs");
           //
           // If we have sent the first block report, then wait a random
           // time before we start the periodic block reports.

Modified: 
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java?rev=609925&r1=609924&r2=609925&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java 
Tue Jan  8 02:39:55 2008
@@ -318,7 +318,7 @@
   }
 
   void reportDiff(BlocksMap blocksMap,
-                  Block[] newReport,
+                  BlockListAsLongs newReport,
                   Collection<Block> toAdd,
                   Collection<Block> toRemove) {
     // place a deilimiter in the list which separates blocks 
@@ -327,12 +327,20 @@
     boolean added = this.addBlock(delimiter);
     assert added : "Delimiting block cannot be present in the node";
     if(newReport == null)
-      newReport = new Block[0];
+      newReport = new BlockListAsLongs( new long[0]);
     // scan the report and collect newly reported blocks
-    for(Block blk : newReport) {
-      BlockInfo storedBlock = blocksMap.getStoredBlock(blk);
-      if(storedBlock == null || storedBlock.findDatanode(this) < 0) {
-        toAdd.add(blk);
+    // Note we are taking special precaution to limit tmp blocks allocated
+    // as part of this block report - which is why block list is stored as longs
+    Block iblk = new Block(); // a fixed new'ed block to be reused with index i
+    for (int i = 0; i < newReport.getNumberOfBlocks(); ++i) {
+      iblk.set(newReport.getBlockId(i), newReport.getBlockLen(i));
+      BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
+      if(storedBlock == null) { // Brand new block
+        toAdd.add(new Block(iblk));
+        continue;
+      }
+      if(storedBlock.findDatanode(this) < 0) {// Known block, but not on the DN
+        toAdd.add(storedBlock);
         continue;
       }
       // move block to the head of the list

Modified: 
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?rev=609925&r1=609924&r2=609925&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java 
Tue Jan  8 02:39:55 2008
@@ -32,8 +32,9 @@
 interface DatanodeProtocol extends VersionedProtocol {
   /*
    * 10: blockReceived also sends hints for deletion
+   * 11: Block reports as long[]
    */
-  public static final long versionID = 10L;
+  public static final long versionID = 11L;
   
   // error code
   final static int NOTIFY = 0;
@@ -83,9 +84,16 @@
    * and should be deleted.  This function is meant to upload *all*
    * the locally-stored blocks.  It's invoked upon startup and then
    * infrequently afterwards.
+   * @param registration
+   * @param blocks - the block list as an array of longs.
+   *     Each block is represented as 2 longs.
+   *     This is done instead of Block[] to reduce memory used by block reports.
+   *     
+   * @return - the next command for DN to process.
+   * @throws IOException
    */
   public DatanodeCommand blockReport(DatanodeRegistration registration,
-                                     Block blocks[]) throws IOException;
+                                     long[] blocks) throws IOException;
     
   /**
    * blockReceived() allows the DataNode to tell the NameNode about

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=609925&r1=609924&r2=609925&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue 
Jan  8 02:39:55 2008
@@ -2296,11 +2296,12 @@
    * update the (machine-->blocklist) and (block-->machinelist) tables.
    */
   public synchronized Block[] processReport(DatanodeID nodeID, 
-                                            Block newReport[]
+                                            BlockListAsLongs newReport
                                             ) throws IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
-                                    +"from "+nodeID.getName()+" "+newReport.length+" blocks");
+                             + "from " + nodeID.getName()+" " + 
+                             newReport.getNumberOfBlocks()+" blocks");
     }
     DatanodeDescriptor node = getDatanode(nodeID);
     if (node == null) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=609925&r1=609924&r2=609925&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Tue Jan  8 
02:39:55 2008
@@ -602,12 +602,13 @@
   }
 
   public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
-                                     Block blocks[]) throws IOException {
+                                     long[] blocks) throws IOException {
     verifyRequest(nodeReg);
+    BlockListAsLongs blist = new BlockListAsLongs(blocks);
     stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
-                         +"from "+nodeReg.getName()+" "+blocks.length+" blocks");
+           +"from "+nodeReg.getName()+" "+blist.getNumberOfBlocks() +" blocks");
 
-    Block blocksToDelete[] = namesystem.processReport(nodeReg, blocks);
+    Block blocksToDelete[] = namesystem.processReport(nodeReg, blist);
     if (blocksToDelete != null && blocksToDelete.length > 0)
       return new BlockCommand(blocksToDelete);
     if (getFSImage().isUpgradeFinalized())

Modified: 
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java?rev=609925&r1=609924&r2=609925&view=diff
==============================================================================
--- 
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java 
(original)
+++ 
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java 
Tue Jan  8 02:39:55 2008
@@ -693,7 +693,8 @@
       assert daemonId < numThreads : "Wrong daemonId.";
       TinyDatanode dn = datanodes[daemonId];
       long start = System.currentTimeMillis();
-      nameNode.blockReport(dn.dnRegistration, dn.blocks);
+      nameNode.blockReport(dn.dnRegistration,
+          BlockListAsLongs.convertToArrayLongs(dn.blocks));
       long end = System.currentTimeMillis();
       return end-start;
     }


Reply via email to