Added: 
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java?view=auto&rev=533966
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java Tue May  1 01:39:50 2007
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.util.*;
+
+/**
+ * Keeps track of under-replicated blocks in LEVEL priority queues.
+ * Priority 0 is the highest and holds the blocks that have only one replica.
+ * The non-private methods are synchronized on this object; the iterator
+ * returned by iterator() reads the live queues and is not itself synchronized.
+ */
+class UnderReplicatedBlocks {
+  /** Number of priority queues; also the "no replication needed" priority. */
+  private static final int LEVEL = 3;
+  /** One set of blocks per priority level, indexed by priority. */
+  private final List<TreeSet<Block>> priorityQueues =
+    new ArrayList<TreeSet<Block>>();
+
+  /** Creates LEVEL empty priority queues. */
+  UnderReplicatedBlocks() {
+    for (int i = 0; i < LEVEL; i++) {
+      priorityQueues.add(new TreeSet<Block>());
+    }
+  }
+
+  /** @return the total number of under-replicated blocks over all queues */
+  synchronized int size() {
+    int size = 0;
+    for (TreeSet<Block> queue : priorityQueues) {
+      size += queue.size();
+    }
+    return size;
+  }
+
+  /** @return true if the block is present in any of the priority queues */
+  synchronized boolean contains(Block block) {
+    for (TreeSet<Block> queue : priorityQueues) {
+      if (queue.contains(block)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Computes the priority of a block from its replica counts.
+   * @param block an under-replicated block (not used in the computation)
+   * @param curReplicas current number of replicas of the block
+   * @param expectedReplicas expected number of replicas of the block
+   * @return 0 for a single replica, 1 when fewer than a third of the expected
+   *         replicas exist, 2 otherwise; LEVEL when the block has no replica
+   *         or already has enough of them, meaning it is not to be queued
+   */
+  private int getPriority(Block block,
+                          int curReplicas, int expectedReplicas) {
+    if (curReplicas <= 0 || curReplicas >= expectedReplicas) {
+      return LEVEL; // no need to replicate
+    } else if (curReplicas == 1) {
+      return 0; // highest priority
+    } else if (curReplicas * 3 < expectedReplicas) {
+      return 1;
+    } else {
+      return 2;
+    }
+  }
+
+  /**
+   * Adds a block to the queue that matches its priority.
+   * @param block an under-replicated block
+   * @param curReplicas current number of replicas of the block
+   * @param expectedReplicas expected number of replicas of the block
+   * @return true if the block was queued; false if it needs no replication
+   *         or was already in the queue for its priority
+   */
+  synchronized boolean add(
+                           Block block, int curReplicas, int expectedReplicas) {
+    // getPriority() returns LEVEL exactly when nothing has to be replicated.
+    int priLevel = getPriority(block, curReplicas, expectedReplicas);
+    if (priLevel == LEVEL || !priorityQueues.get(priLevel).add(block)) {
+      return false;
+    }
+    NameNode.stateChangeLog.debug(
+        "BLOCK* NameSystem.UnderReplicationBlock.add:"
+        + block.getBlockName()
+        + " has only " + curReplicas
+        + " replicas and need " + expectedReplicas
+        + " replicas so is added to neededReplications"
+        + " at priority level " + priLevel);
+    return true;
+  }
+
+  /**
+   * Removes a block from the queues.
+   * @param block the block to remove
+   * @param oldReplicas replica count used to pick the queue searched first
+   * @param oldExpectedReplicas expected replica count used for the same purpose
+   * @return true if the block was found in some queue and removed
+   */
+  synchronized boolean remove(Block block,
+                              int oldReplicas, int oldExpectedReplicas) {
+    int priLevel = getPriority(block, oldReplicas, oldExpectedReplicas);
+    return remove(block, priLevel);
+  }
+
+  /**
+   * Removes a block, looking first in the queue of the given priority and
+   * then, if it is not there, in every other queue.
+   * @param block the block to remove
+   * @param priLevel priority hint; when it is outside [0, LEVEL) all queues
+   *        are searched
+   * @return true if the block was found and removed
+   */
+  private boolean remove(Block block, int priLevel) {
+    if (priLevel >= 0 && priLevel < LEVEL
+        && priorityQueues.get(priLevel).remove(block)) {
+      NameNode.stateChangeLog.debug(
+          "BLOCK* NameSystem.UnderReplicationBlock.remove: "
+          + "Removing block " + block.getBlockName()
+          + " from priority queue " + priLevel);
+      return true;
+    }
+    // The hint was wrong or out of range: fall back to the other queues.
+    for (int i = 0; i < LEVEL; i++) {
+      if (i != priLevel && priorityQueues.get(i).remove(block)) {
+        NameNode.stateChangeLog.debug(
+            "BLOCK* NameSystem.UnderReplicationBlock.remove: "
+            + "Removing block " + block.getBlockName()
+            + " from priority queue " + i);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Moves a block to the queue matching its new replica counts. Nothing is
+   * changed when the old and the new counts map to the same priority.
+   * @param block the block whose replica counts changed
+   * @param curReplicas current number of replicas
+   * @param curExpectedReplicas current expected number of replicas
+   * @param curReplicasDelta change in the number of replicas
+   * @param expectedReplicasDelta change in the expected number of replicas
+   */
+  synchronized void update(Block block, int curReplicas,
+                           int curExpectedReplicas,
+                           int curReplicasDelta, int expectedReplicasDelta) {
+    int oldReplicas = curReplicas - curReplicasDelta;
+    int oldExpectedReplicas = curExpectedReplicas - expectedReplicasDelta;
+    int curPri = getPriority(block, curReplicas, curExpectedReplicas);
+    int oldPri = getPriority(block, oldReplicas, oldExpectedReplicas);
+    NameNode.stateChangeLog.debug("UnderReplicationBlocks.update "
+        + block
+        + " curReplicas " + curReplicas
+        + " curExpectedReplicas " + curExpectedReplicas
+        + " oldReplicas " + oldReplicas
+        + " oldExpectedReplicas  " + oldExpectedReplicas
+        + " curPri  " + curPri
+        + " oldPri  " + oldPri);
+    if (oldPri == curPri) {
+      return; // the block stays where it is
+    }
+    if (oldPri != LEVEL) {
+      remove(block, oldPri);
+    }
+    if (curPri != LEVEL && priorityQueues.get(curPri).add(block)) {
+      NameNode.stateChangeLog.debug(
+          "BLOCK* NameSystem.UnderReplicationBlock.update:"
+          + block.getBlockName()
+          + " has only " + curReplicas
+          + " replicas and need " + curExpectedReplicas
+          + " replicas so is added to neededReplications"
+          + " at priority level " + curPri);
+    }
+  }
+
+  /**
+   * Returns an iterator over all under-replicated blocks, highest priority
+   * queue first. The returned iterator walks the live queues and its own
+   * methods are not synchronized.
+   */
+  synchronized Iterator<Block> iterator() {
+    return new Iterator<Block>() {
+      /** Index of the queue currently being walked. */
+      private int level = 0;
+      /** One iterator per priority queue, indexed by priority. */
+      private final List<Iterator<Block>> iterators =
+        new ArrayList<Iterator<Block>>();
+      /** Iterator that produced the last element returned by next(). */
+      private Iterator<Block> lastReturned = null;
+
+      {
+        for (TreeSet<Block> queue : priorityQueues) {
+          iterators.add(queue.iterator());
+        }
+      }
+
+      /* Skip exhausted queues, stopping at the last one. */
+      private void update() {
+        while (level < LEVEL - 1 && !iterators.get(level).hasNext()) {
+          level++;
+        }
+      }
+
+      public Block next() {
+        update();
+        Iterator<Block> current = iterators.get(level);
+        Block block = current.next();
+        lastReturned = current;
+        return block;
+      }
+
+      public boolean hasNext() {
+        update();
+        return iterators.get(level).hasNext();
+      }
+
+      public void remove() {
+        // hasNext() may already have advanced level to the next queue, so
+        // remove through the iterator that actually returned the element.
+        if (lastReturned == null) {
+          throw new IllegalStateException();
+        }
+        lastReturned.remove();
+      }
+    };
+  }
+}

Modified: 
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java?view=diff&rev=533966&r1=533965&r2=533966
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java Tue May  1 01:39:50 2007
@@ -3,7 +3,7 @@
 import junit.framework.TestCase;
 
 public class TestHost2NodesMap extends TestCase {
-  static private FSNamesystem.Host2NodesMap map = new 
FSNamesystem.Host2NodesMap();
+  static private Host2NodesMap map = new Host2NodesMap();
   private final static DatanodeDescriptor dataNodes[] = new 
DatanodeDescriptor[] {
     new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"),
     new DatanodeDescriptor(new DatanodeID("h2:5020", "0", -1), "/d1/r1"),

Modified: 
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java?view=diff&rev=533966&r1=533965&r2=533966
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java Tue May  1 01:39:50 2007
@@ -15,7 +15,7 @@
   private static final Configuration CONF = new Configuration();
   private static final NetworkTopology cluster;
   private static NameNode namenode;
-  private static FSNamesystem.ReplicationTargetChooser replicator;
+  private static ReplicationTargetChooser replicator;
   private static DatanodeDescriptor dataNodes[] = 
     new DatanodeDescriptor[] {
       new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"),


Reply via email to