Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java?view=auto&rev=533966 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java Tue May 1 01:39:50 2007 @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.dfs; + +import java.util.*; + +/* Class for keeping track of under replication blocks + * Blocks have replication priority, with priority 0 indicating the highest + * Blocks have only one replicas has the highest + */ +class UnderReplicatedBlocks { + private static final int LEVEL = 3; + private List<TreeSet<Block>> priorityQueues = new ArrayList<TreeSet<Block>>(); + + /* constructor */ + UnderReplicatedBlocks() { + for(int i=0; i<LEVEL; i++) { + priorityQueues.add(new TreeSet<Block>()); + } + } + + /* Return the total number of under replication blocks */ + synchronized int size() { + int size = 0; + for (int i=0; i<LEVEL; i++) { + size += priorityQueues.get(i).size(); + } + return size; + } + + /* Check if a block is in the neededReplication queue */ + synchronized boolean contains(Block block) { + for(TreeSet<Block> set:priorityQueues) { + if(set.contains(block)) { return true; } + } + return false; + } + + /* Return the priority of a block + * @param block a under replication block + * @param curReplicas current number of replicas of the block + * @param expectedReplicas expected number of replicas of the block + */ + private int getPriority(Block block, + int curReplicas, int expectedReplicas) { + if (curReplicas<=0 || curReplicas>=expectedReplicas) { + return LEVEL; // no need to replicate + } else if(curReplicas==1) { + return 0; // highest priority + } else if(curReplicas*3<expectedReplicas) { + return 1; + } else { + return 2; + } + } + + /* add a block to a under replication queue according to its priority + * @param block a under replication block + * @param curReplicas current number of replicas of the block + * @param expectedReplicas expected number of replicas of the block + */ + synchronized boolean add( + Block block, int curReplicas, int expectedReplicas) { + if(curReplicas<=0 || expectedReplicas <= curReplicas) { + return false; + } + int priLevel = getPriority(block, curReplicas, expectedReplicas); + if(priorityQueues.get(priLevel).add(block)) { + NameNode.stateChangeLog.debug( + "BLOCK* NameSystem.UnderReplicationBlock.add:" + + block.getBlockName() + + " has only "+curReplicas + + " replicas and need " + expectedReplicas + + " replicas so is added to neededReplications" + + " at priority level " + priLevel); + return true; + } + return false; + } + + /* remove a block from a under replication queue */ + synchronized boolean remove(Block block, + int oldReplicas, int oldExpectedReplicas) { + int priLevel = getPriority(block, oldReplicas, oldExpectedReplicas); + return remove(block, priLevel); + } + + /* remove a block from a under replication queue given a priority*/ + private boolean remove(Block block, int priLevel) { + if(priLevel >= 0 && priLevel < LEVEL + && priorityQueues.get(priLevel).remove(block)) { + NameNode.stateChangeLog.debug( + "BLOCK* NameSystem.UnderReplicationBlock.remove: " + + "Removing block " + block.getBlockName() + + " from priority queue "+ priLevel); + return true; + } else { + for(int i=0; i<LEVEL; i++) { + if(i!=priLevel && priorityQueues.get(i).remove(block)) { + NameNode.stateChangeLog.debug( + "BLOCK* NameSystem.UnderReplicationBlock.remove: " + + "Removing block " + block.getBlockName() + + " from priority queue "+ i); + return true; + } + } + } + return false; + } + + /* update the priority level of a block */ + synchronized void update(Block block, int curReplicas, int curExpectedReplicas, + int curReplicasDelta, int expectedReplicasDelta) { + int oldReplicas = curReplicas-curReplicasDelta; + int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; + int curPri = getPriority(block, curReplicas, curExpectedReplicas); + int oldPri = getPriority(block, oldReplicas, oldExpectedReplicas); + NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + + block + + " curReplicas " + curReplicas + + " curExpectedReplicas " + curExpectedReplicas + + " oldReplicas " + oldReplicas + + " oldExpectedReplicas " + oldExpectedReplicas + + " curPri " + curPri + + " oldPri " + oldPri); + if(oldPri != LEVEL && oldPri != curPri) { + remove(block, oldPri); + } + if(curPri != LEVEL && oldPri != curPri + && priorityQueues.get(curPri).add(block)) { + NameNode.stateChangeLog.debug( + "BLOCK* NameSystem.UnderReplicationBlock.update:" + + block.getBlockName() + + " has only "+curReplicas + + " replicas and need " + curExpectedReplicas + + " replicas so is added to neededReplications" + + " at priority level " + curPri); + } + } + + /* return a iterator of all the under replication blocks */ + synchronized Iterator<Block> iterator() { + return new Iterator<Block>() { + private int level; + private List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>(); + + { + level=0; + for(int i=0; i<LEVEL; i++) { + iterators.add(priorityQueues.get(i).iterator()); + } + } + + private void update() { + while(level< LEVEL-1 && !iterators.get(level).hasNext()) { + level++; + } + } + + public Block next() { + update(); + return iterators.get(level).next(); + } + + public boolean hasNext() { + update(); + return iterators.get(level).hasNext(); + } + + public void remove() { + iterators.get(level).remove(); + } + }; + } +}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java?view=diff&rev=533966&r1=533965&r2=533966 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java Tue May 1 01:39:50 2007 @@ -3,7 +3,7 @@ import junit.framework.TestCase; public class TestHost2NodesMap extends TestCase { - static private FSNamesystem.Host2NodesMap map = new FSNamesystem.Host2NodesMap(); + static private Host2NodesMap map = new Host2NodesMap(); private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] { new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"), new DatanodeDescriptor(new DatanodeID("h2:5020", "0", -1), "/d1/r1"), Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java?view=diff&rev=533966&r1=533965&r2=533966 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java Tue May 1 01:39:50 2007 @@ -15,7 +15,7 @@ private static final Configuration CONF = new Configuration(); private static final NetworkTopology cluster; private static NameNode namenode; - private static FSNamesystem.ReplicationTargetChooser replicator; + private static ReplicationTargetChooser replicator; private static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] { new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"),