http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java
index bcc880c..d954d04 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java
@@ -15,16 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-// This code has been copied from hadoop-common 0.23.1
 package org.apache.bookkeeper.net;
 
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class NetUtils {
+    private static final Logger logger = LoggerFactory.getLogger(NetUtils.class);
 
     /**
      * Given a string representation of a host, return its ip address
@@ -58,4 +62,24 @@ public class NetUtils {
         return hostNames;
     }
 
+    public static String resolveNetworkLocation(DNSToSwitchMapping dnsResolver, InetSocketAddress addr) {
+        List<String> names = new ArrayList<String>(1);
+        if (dnsResolver instanceof CachedDNSToSwitchMapping) {
+            names.add(addr.getAddress().getHostAddress());
+        } else {
+            names.add(addr.getHostName());
+        }
+        // resolve network addresses
+        List<String> rNames = dnsResolver.resolve(names);
+        String netLoc;
+        if (null == rNames) {
+            logger.warn("Failed to resolve network location for {}, using default rack for them : {}.", names,
+                NetworkTopology.DEFAULT_RACK);
+            netLoc = NetworkTopology.DEFAULT_RACK;
+        } else {
+            netLoc = rNames.get(0);
+        }
+        return netLoc;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
index 26abc96..18f3ec9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,863 +15,64 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-// This code has been copied from hadoop-common 0.23.1
 package org.apache.bookkeeper.net;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * The class represents a cluster of computer with a tree hierarchical
- * network topology.
- * For example, a cluster may be consists of many data centers filled
- * with racks of computers.
- * In a network topology, leaves represent data nodes (computers) and inner
- * nodes represent switches/routers that manage traffic in/out of data centers
- * or racks.
- *
+ * Network Topology Interface
  */
-public class NetworkTopology {
-
-    public final static String DEFAULT_RACK = "/default-rack";
-    public final static int DEFAULT_HOST_LEVEL = 2;
-    public static final Logger LOG = 
LoggerFactory.getLogger(NetworkTopology.class);
-
-    public static class InvalidTopologyException extends RuntimeException {
-        private static final long serialVersionUID = 1L;
-
-        public InvalidTopologyException(String msg) {
-            super(msg);
-        }
-    }
-
-    /** InnerNode represents a switch/router of a data center or rack.
-     * Different from a leaf node, it has non-null children.
-     */
-    static class InnerNode extends NodeBase {
-        protected List<Node> children = new ArrayList<Node>();
-        private int numOfLeaves;
-
-        /** Construct an InnerNode from a path-like string */
-        InnerNode(String path) {
-            super(path);
-        }
-
-        /** Construct an InnerNode from its name and its network location */
-        InnerNode(String name, String location) {
-            super(name, location);
-        }
-
-        /** Construct an InnerNode
-         * from its name, its network location, its parent, and its level */
-        InnerNode(String name, String location, InnerNode parent, int level) {
-            super(name, location, parent, level);
-        }
-
-        /** @return its children */
-        List<Node> getChildren() {
-            return children;
-        }
-
-        /** @return the number of children this node has */
-        int getNumOfChildren() {
-            return children.size();
-        }
-
-        /** Judge if this node represents a rack
-         * @return true if it has no child or its children are not InnerNodes
-         */
-        boolean isRack() {
-            if (children.isEmpty()) {
-                return true;
-            }
-
-            Node firstChild = children.get(0);
-            if (firstChild instanceof InnerNode) {
-                return false;
-            }
-
-            return true;
-        }
-
-        /** Judge if this node is an ancestor of node <i>n</i>
-         *
-         * @param n a node
-         * @return true if this node is an ancestor of <i>n</i>
-         */
-        boolean isAncestor(Node n) {
-            return getPath(this).equals(NodeBase.PATH_SEPARATOR_STR)
-                    || (n.getNetworkLocation() + 
NodeBase.PATH_SEPARATOR_STR).startsWith(getPath(this)
-                            + NodeBase.PATH_SEPARATOR_STR);
-        }
-
-        /** Judge if this node is the parent of node <i>n</i>
-         *
-         * @param n a node
-         * @return true if this node is the parent of <i>n</i>
-         */
-        boolean isParent(Node n) {
-            return n.getNetworkLocation().equals(getPath(this));
-        }
-
-        /* Return a child name of this node who is an ancestor of node 
<i>n</i> */
-        private String getNextAncestorName(Node n) {
-            if (!isAncestor(n)) {
-                throw new IllegalArgumentException(this + "is not an ancestor 
of " + n);
-            }
-            String name = 
n.getNetworkLocation().substring(getPath(this).length());
-            if (name.charAt(0) == PATH_SEPARATOR) {
-                name = name.substring(1);
-            }
-            int index = name.indexOf(PATH_SEPARATOR);
-            if (index != -1)
-                name = name.substring(0, index);
-            return name;
-        }
-
-        /** Add node <i>n</i> to the subtree of this node
-         * @param n node to be added
-         * @return true if the node is added; false otherwise
-         */
-        boolean add(Node n) {
-            if (!isAncestor(n))
-                throw new IllegalArgumentException(n.getName() + ", which is 
located at " + n.getNetworkLocation()
-                        + ", is not a decendent of " + getPath(this));
-            if (isParent(n)) {
-                // this node is the parent of n; add n directly
-                n.setParent(this);
-                n.setLevel(this.level + 1);
-                for (int i = 0; i < children.size(); i++) {
-                    if (children.get(i).getName().equals(n.getName())) {
-                        children.set(i, n);
-                        return false;
-                    }
-                }
-                children.add(n);
-                numOfLeaves++;
-                return true;
-            } else {
-                // find the next ancestor node
-                String parentName = getNextAncestorName(n);
-                InnerNode parentNode = null;
-                for (int i = 0; i < children.size(); i++) {
-                    if (children.get(i).getName().equals(parentName)) {
-                        parentNode = (InnerNode) children.get(i);
-                        break;
-                    }
-                }
-                if (parentNode == null) {
-                    // create a new InnerNode
-                    parentNode = createParentNode(parentName);
-                    children.add(parentNode);
-                }
-                // add n to the subtree of the next ancestor node
-                if (parentNode.add(n)) {
-                    numOfLeaves++;
-                    return true;
-                } else {
-                    return false;
-                }
-            }
-        }
-
-        /**
-         * Creates a parent node to be added to the list of children.
-         * Creates a node using the InnerNode four argument constructor 
specifying
-         * the name, location, parent, and level of this node.
-         *
-         * <p>To be overridden in subclasses for specific InnerNode 
implementations,
-         * as alternative to overriding the full {@link #add(Node)} method.
-         *
-         * @param parentName The name of the parent node
-         * @return A new inner node
-         * @see InnerNode#InnerNode(String, String, InnerNode, int)
-         */
-        protected InnerNode createParentNode(String parentName) {
-            return new InnerNode(parentName, getPath(this), this, 
this.getLevel() + 1);
-        }
+public interface NetworkTopology {
 
-        /** Remove node <i>n</i> from the subtree of this node
-         * @param n node to be deleted
-         * @return true if the node is deleted; false otherwise
-         */
-        boolean remove(Node n) {
-            String parent = n.getNetworkLocation();
-            String currentPath = getPath(this);
-            if (!isAncestor(n))
-                throw new IllegalArgumentException(n.getName() + ", which is 
located at " + parent
-                        + ", is not a descendent of " + currentPath);
-            if (isParent(n)) {
-                // this node is the parent of n; remove n directly
-                for (int i = 0; i < children.size(); i++) {
-                    if (children.get(i).getName().equals(n.getName())) {
-                        children.remove(i);
-                        numOfLeaves--;
-                        n.setParent(null);
-                        return true;
-                    }
-                }
-                return false;
-            } else {
-                // find the next ancestor node: the parent node
-                String parentName = getNextAncestorName(n);
-                InnerNode parentNode = null;
-                int i;
-                for (i = 0; i < children.size(); i++) {
-                    if (children.get(i).getName().equals(parentName)) {
-                        parentNode = (InnerNode) children.get(i);
-                        break;
-                    }
-                }
-                if (parentNode == null) {
-                    return false;
-                }
-                // remove n from the parent node
-                boolean isRemoved = parentNode.remove(n);
-                // if the parent node has no children, remove the parent node 
too
-                if (isRemoved) {
-                    if (parentNode.getNumOfChildren() == 0) {
-                        children.remove(i);
-                    }
-                    numOfLeaves--;
-                }
-                return isRemoved;
-            }
-        } // end of remove
-
-        /** Given a node's string representation, return a reference to the 
node
-         * @param loc string location of the form /rack/node
-         * @return null if the node is not found or the childnode is there but
-         * not an instance of {@link InnerNode}
-         */
-        private Node getLoc(String loc) {
-            if (loc == null || loc.length() == 0)
-                return this;
-
-            String[] path = loc.split(PATH_SEPARATOR_STR, 2);
-            Node childnode = null;
-            for (int i = 0; i < children.size(); i++) {
-                if (children.get(i).getName().equals(path[0])) {
-                    childnode = children.get(i);
-                }
-            }
-            if (childnode == null)
-                return null; // non-existing node
-            if (path.length == 1)
-                return childnode;
-            if (childnode instanceof InnerNode) {
-                return ((InnerNode) childnode).getLoc(path[1]);
-            } else {
-                return null;
-            }
-        }
-
-        /** get <i>leafIndex</i> leaf of this subtree
-         * if it is not in the <i>excludedNode</i>
-         *
-         * @param leafIndex an indexed leaf of the node
-         * @param excludedNode an excluded node (can be null)
-         * @return
-         */
-        Node getLeaf(int leafIndex, Node excludedNode) {
-            int count = 0;
-            // check if the excluded node a leaf
-            boolean isLeaf = excludedNode == null || !(excludedNode instanceof 
InnerNode);
-            // calculate the total number of excluded leaf nodes
-            int numOfExcludedLeaves = isLeaf ? 1 : ((InnerNode) 
excludedNode).getNumOfLeaves();
-            if (isLeafParent()) { // children are leaves
-                if (isLeaf) { // excluded node is a leaf node
-                    int excludedIndex = children.indexOf(excludedNode);
-                    if (excludedIndex != -1 && leafIndex >= 0) {
-                        // excluded node is one of the children so adjust the 
leaf index
-                        leafIndex = leafIndex >= excludedIndex ? leafIndex + 1 
: leafIndex;
-                    }
-                }
-                // range check
-                if (leafIndex < 0 || leafIndex >= this.getNumOfChildren()) {
-                    return null;
-                }
-                return children.get(leafIndex);
-            } else {
-                for (int i = 0; i < children.size(); i++) {
-                    InnerNode child = (InnerNode) children.get(i);
-                    if (excludedNode == null || excludedNode != child) {
-                        // not the excludedNode
-                        int numOfLeaves = child.getNumOfLeaves();
-                        if (excludedNode != null && 
child.isAncestor(excludedNode)) {
-                            numOfLeaves -= numOfExcludedLeaves;
-                        }
-                        if (count + numOfLeaves > leafIndex) {
-                            // the leaf is in the child subtree
-                            return child.getLeaf(leafIndex - count, 
excludedNode);
-                        } else {
-                            // go to the next child
-                            count = count + numOfLeaves;
-                        }
-                    } else { // it is the excluededNode
-                        // skip it and set the excludedNode to be null
-                        excludedNode = null;
-                    }
-                }
-                return null;
-            }
-        }
-
-        protected boolean isLeafParent() {
-            return isRack();
-        }
-
-        /**
-          * Determine if children a leaves, default implementation calls 
{@link #isRack()}
-          * <p>To be overridden in subclasses for specific InnerNode 
implementations,
-          * as alternative to overriding the full {@link #getLeaf(int, Node)} 
method.
-          *
-          * @return true if children are leaves, false otherwise
-          */
-        protected boolean areChildrenLeaves() {
-            return isRack();
-        }
-
-        /**
-         * Get number of leaves.
-         */
-        int getNumOfLeaves() {
-            return numOfLeaves;
-        }
-    } // end of InnerNode
-
-    /**
-     * the root cluster map
-     */
-    InnerNode clusterMap;
-    /** Depth of all leaf nodes */
-    private int depthOfAllLeaves = -1;
-    /** rack counter */
-    protected int numOfRacks = 0;
-    /** the lock used to manage access */
-    protected ReadWriteLock netlock = new ReentrantReadWriteLock();
-
-    public NetworkTopology() {
-        clusterMap = new InnerNode(InnerNode.ROOT);
-    }
-
-    /** Add a leaf node
-     * Update node counter & rack counter if necessary
-     * @param node node to be added; can be null
-     * @exception IllegalArgumentException if add a node to a leave
-                                           or node to be added is not a leaf
-     */
-    public void add(Node node) {
-        if (node == null)
-            return;
-        String oldTopoStr = this.toString();
-        if (node instanceof InnerNode) {
-            throw new IllegalArgumentException("Not allow to add an inner 
node: " + NodeBase.getPath(node));
-        }
-        int newDepth = NodeBase.locationToDepth(node.getNetworkLocation()) + 1;
-        netlock.writeLock().lock();
-        try {
-            if ((depthOfAllLeaves != -1) && (depthOfAllLeaves != newDepth)) {
-                LOG.error("Error: can't add leaf node at depth " + newDepth + 
" to topology:\n" + oldTopoStr);
-                throw new InvalidTopologyException("Invalid network topology. "
-                        + "You cannot have a rack and a non-rack node at the 
same level of the network topology.");
-            }
-            Node rack = getNodeForNetworkLocation(node);
-            if (rack != null && !(rack instanceof InnerNode)) {
-                throw new IllegalArgumentException("Unexpected data node " + 
node.toString()
-                        + " at an illegal network location");
-            }
-            if (clusterMap.add(node)) {
-                LOG.info("Adding a new node: " + NodeBase.getPath(node));
-                if (rack == null) {
-                    numOfRacks++;
-                }
-                if (!(node instanceof InnerNode)) {
-                    if (depthOfAllLeaves == -1) {
-                        depthOfAllLeaves = node.getLevel();
-                    }
-                }
-            }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("NetworkTopology became:\n" + this.toString());
-            }
-        } finally {
-            netlock.writeLock().unlock();
-        }
-    }
+    public final static String DEFAULT_REGION = "/default-region";
+    public final static String DEFAULT_RACK = "/default-region/default-rack";
 
     /**
-     * Return a reference to the node given its string representation.
-     * Default implementation delegates to {@link #getNode(String)}.
-     *
-     * <p>To be overridden in subclasses for specific NetworkTopology
-     * implementations, as alternative to overriding the full {@link 
#add(Node)}
-     *  method.
+     * Add a node to the network topology
      *
-     * @param node The string representation of this node's network location is
-     * used to retrieve a Node object.
-     * @return a reference to the node; null if the node is not in the tree
-     *
-     * @see #add(Node)
-     * @see #getNode(String)
+     * @param node
+     *          add the node to network topology
      */
-    protected Node getNodeForNetworkLocation(Node node) {
-        return getNode(node.getNetworkLocation());
-    }
+    void add(Node node);
 
     /**
-     * Given a string representation of a rack, return its children
-     * @param loc a path-like string representation of a rack
-     * @return a newly allocated list with all the node's children
-     */
-    public List<Node> getDatanodesInRack(String loc) {
-        netlock.readLock().lock();
-        try {
-            loc = NodeBase.normalize(loc);
-            if (!NodeBase.ROOT.equals(loc)) {
-                loc = loc.substring(1);
-            }
-            InnerNode rack = (InnerNode) clusterMap.getLoc(loc);
-            if (rack == null) {
-                return null;
-            }
-            return new ArrayList<Node>(rack.getChildren());
-        } finally {
-            netlock.readLock().unlock();
-        }
-    }
-
-    /** Remove a node
-     * Update node counter and rack counter if necessary
-     * @param node node to be removed; can be null
-     */
-    public void remove(Node node) {
-        if (node == null)
-            return;
-        if (node instanceof InnerNode) {
-            throw new IllegalArgumentException("Not allow to remove an inner 
node: " + NodeBase.getPath(node));
-        }
-        LOG.info("Removing a node: " + NodeBase.getPath(node));
-        netlock.writeLock().lock();
-        try {
-            if (clusterMap.remove(node)) {
-                InnerNode rack = (InnerNode) 
getNode(node.getNetworkLocation());
-                if (rack == null) {
-                    numOfRacks--;
-                }
-            }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("NetworkTopology became:\n" + this.toString());
-            }
-        } finally {
-            netlock.writeLock().unlock();
-        }
-    }
-
-    /** Check if the tree contains node <i>node</i>
+     * Remove a node from network topology
      *
-     * @param node a node
-     * @return true if <i>node</i> is already in the tree; false otherwise
+     * @param node
+     *          remove the node from network topology
      */
-    public boolean contains(Node node) {
-        if (node == null)
-            return false;
-        netlock.readLock().lock();
-        try {
-            Node parent = node.getParent();
-            for (int level = node.getLevel(); parent != null && level > 0; 
parent = parent.getParent(), level--) {
-                if (parent == clusterMap) {
-                    return true;
-                }
-            }
-        } finally {
-            netlock.readLock().unlock();
-        }
-        return false;
-    }
-
-    /** Given a string representation of a node, return its reference
-     *
-     * @param loc
-     *          a path-like string representation of a node
-     * @return a reference to the node; null if the node is not in the tree
-     */
-    public Node getNode(String loc) {
-        netlock.readLock().lock();
-        try {
-            loc = NodeBase.normalize(loc);
-            if (!NodeBase.ROOT.equals(loc))
-                loc = loc.substring(1);
-            return clusterMap.getLoc(loc);
-        } finally {
-            netlock.readLock().unlock();
-        }
-    }
-
-    /** Given a string representation of a rack for a specific network
-     *  location
-     *
-     * To be overridden in subclasses for specific NetworkTopology
-     * implementations, as alternative to overriding the full
-     * {@link #getRack(String)} method.
-     * @param loc
-     *          a path-like string representation of a network location
-     * @return a rack string
-     */
-    public String getRack(String loc) {
-        return loc;
-    }
-
-    /** @return the total number of racks */
-    public int getNumOfRacks() {
-        netlock.readLock().lock();
-        try {
-            return numOfRacks;
-        } finally {
-            netlock.readLock().unlock();
-        }
-    }
-
-    /** @return the total number of leaf nodes */
-    public int getNumOfLeaves() {
-        netlock.readLock().lock();
-        try {
-            return clusterMap.getNumOfLeaves();
-        } finally {
-            netlock.readLock().unlock();
-        }
-    }
-
-    /** Return the distance between two nodes
-     * It is assumed that the distance from one node to its parent is 1
-     * The distance between two nodes is calculated by summing up their 
distances
-     * to their closest common ancestor.
-     * @param node1 one node
-     * @param node2 another node
-     * @return the distance between node1 and node2 which is zero if they are 
the same
-     *  or {@link Integer#MAX_VALUE} if node1 or node2 do not belong to the 
cluster
-     */
-    public int getDistance(Node node1, Node node2) {
-        if (node1 == node2) {
-            return 0;
-        }
-        Node n1 = node1, n2 = node2;
-        int dis = 0;
-        netlock.readLock().lock();
-        try {
-            int level1 = node1.getLevel(), level2 = node2.getLevel();
-            while (n1 != null && level1 > level2) {
-                n1 = n1.getParent();
-                level1--;
-                dis++;
-            }
-            while (n2 != null && level2 > level1) {
-                n2 = n2.getParent();
-                level2--;
-                dis++;
-            }
-            while (n1 != null && n2 != null && n1.getParent() != 
n2.getParent()) {
-                n1 = n1.getParent();
-                n2 = n2.getParent();
-                dis += 2;
-            }
-        } finally {
-            netlock.readLock().unlock();
-        }
-        if (n1 == null) {
-            LOG.warn("The cluster does not contain node: " + 
NodeBase.getPath(node1));
-            return Integer.MAX_VALUE;
-        }
-        if (n2 == null) {
-            LOG.warn("The cluster does not contain node: " + 
NodeBase.getPath(node2));
-            return Integer.MAX_VALUE;
-        }
-        return dis + 2;
-    }
-
-    /** Check if two nodes are on the same rack
-     * @param node1 one node (can be null)
-     * @param node2 another node (can be null)
-     * @return true if node1 and node2 are on the same rack; false otherwise
-     * @exception IllegalArgumentException when either node1 or node2 is null, 
or
-     * node1 or node2 do not belong to the cluster
-     */
-    public boolean isOnSameRack(Node node1, Node node2) {
-        if (node1 == null || node2 == null) {
-            return false;
-        }
-
-        netlock.readLock().lock();
-        try {
-            return isSameParents(node1, node2);
-        } finally {
-            netlock.readLock().unlock();
-        }
-    }
-
-    /**
-     * Check if network topology is aware of NodeGroup
-     */
-    public boolean isNodeGroupAware() {
-        return false;
-    }
+    void remove(Node node);
 
     /**
-     * Return false directly as not aware of NodeGroup, to be override in 
sub-class
-     */
-    public boolean isOnSameNodeGroup(Node node1, Node node2) {
-        return false;
-    }
-
-    /**
-     * Compare the parents of each node for equality
-     *
-     * <p>To be overridden in subclasses for specific NetworkTopology
-     * implementations, as alternative to overriding the full
-     * {@link #isOnSameRack(Node, Node)} method.
+     * Check if the tree contains node <i>node</i>.
      *
-     * @param node1 the first node to compare
-     * @param node2 the second node to compare
-     * @return true if their parents are equal, false otherwise
-     *
-     * @see #isOnSameRack(Node, Node)
-     */
-    protected boolean isSameParents(Node node1, Node node2) {
-        return node1.getParent() == node2.getParent();
-    }
-
-    final protected static Random r = new Random();
-
-    /** randomly choose one node from <i>scope</i>
-     * if scope starts with ~, choose one from the all nodes except for the
-     * ones in <i>scope</i>; otherwise, choose one from <i>scope</i>
-     * @param scope range of nodes from which a node will be chosen
-     * @return the chosen node
-     */
-    public Node chooseRandom(String scope) {
-        netlock.readLock().lock();
-        try {
-            if (scope.startsWith("~")) {
-                return chooseRandom(NodeBase.ROOT, scope.substring(1));
-            } else {
-                return chooseRandom(scope, null);
-            }
-        } finally {
-            netlock.readLock().unlock();
-        }
-    }
-
-    private Node chooseRandom(String scope, String excludedScope) {
-        if (excludedScope != null) {
-            if (scope.startsWith(excludedScope)) {
-                return null;
-            }
-            if (!excludedScope.startsWith(scope)) {
-                excludedScope = null;
-            }
-        }
-        Node node = getNode(scope);
-        if (!(node instanceof InnerNode)) {
-            return node;
-        }
-        InnerNode innerNode = (InnerNode) node;
-        int numOfDatanodes = innerNode.getNumOfLeaves();
-        if (excludedScope == null) {
-            node = null;
-        } else {
-            node = getNode(excludedScope);
-            if (!(node instanceof InnerNode)) {
-                numOfDatanodes -= 1;
-            } else {
-                numOfDatanodes -= ((InnerNode) node).getNumOfLeaves();
-            }
-        }
-        int leaveIndex = r.nextInt(numOfDatanodes);
-        return innerNode.getLeaf(leaveIndex, node);
-    }
-
-    /** return leaves in <i>scope</i>
-     * @param scope a path string
-     * @return leaves nodes under specific scope
-     */
-    private Set<Node> doGetLeaves(String scope) {
-        Node node = getNode(scope);
-        Set<Node> leafNodes = new HashSet<Node>();
-        if (!(node instanceof InnerNode)) {
-            leafNodes.add(node);
-        } else {
-            InnerNode innerNode = (InnerNode) node;
-            for (int i = 0; i < innerNode.getNumOfLeaves(); i++) {
-                leafNodes.add(innerNode.getLeaf(i, null));
-            }
-        }
-        return leafNodes;
-    }
-
-    public Set<Node> getLeaves(String scope) {
-        netlock.readLock().lock();
-        try {
-            if (scope.startsWith("~")) {
-                Set<Node> allNodes = doGetLeaves(NodeBase.ROOT);
-                Set<Node> excludeNodes = doGetLeaves(scope.substring(1));
-                allNodes.removeAll(excludeNodes);
-                return allNodes;
-            } else {
-                return doGetLeaves(scope);
-            }
-        } finally {
-            netlock.readLock().unlock();
-        }
-    }
-
-    /** return the number of leaves in <i>scope</i> but not in 
<i>excludedNodes</i>
-     * if scope starts with ~, return the number of nodes that are not
-     * in <i>scope</i> and <i>excludedNodes</i>;
-     * @param scope a path string that may start with ~
-     * @param excludedNodes a list of nodes
-     * @return number of available nodes
+     * @param node
+     *          node to check
+     * @return true if <i>node</i> is already in the network topology, otherwise false.
      */
-    public int countNumOfAvailableNodes(String scope, Collection<Node> 
excludedNodes) {
-        boolean isExcluded = false;
-        if (scope.startsWith("~")) {
-            isExcluded = true;
-            scope = scope.substring(1);
-        }
-        scope = NodeBase.normalize(scope);
-        int count = 0; // the number of nodes in both scope & excludedNodes
-        netlock.readLock().lock();
-        try {
-            for (Node node : excludedNodes) {
-                if ((NodeBase.getPath(node) + 
NodeBase.PATH_SEPARATOR_STR).startsWith(scope
-                        + NodeBase.PATH_SEPARATOR_STR)) {
-                    count++;
-                }
-            }
-            Node n = getNode(scope);
-            int scopeNodeCount = 1;
-            if (n instanceof InnerNode) {
-                scopeNodeCount = ((InnerNode) n).getNumOfLeaves();
-            }
-            if (isExcluded) {
-                return clusterMap.getNumOfLeaves() - scopeNodeCount - 
excludedNodes.size() + count;
-            } else {
-                return scopeNodeCount - count;
-            }
-        } finally {
-            netlock.readLock().unlock();
-        }
-    }
-
-    /** convert a network tree to a string */
-    @Override
-    public String toString() {
-        // print the number of racks
-        StringBuilder tree = new StringBuilder();
-        tree.append("Number of racks: ");
-        tree.append(numOfRacks);
-        tree.append("\n");
-        // print the number of leaves
-        int numOfLeaves = getNumOfLeaves();
-        tree.append("Expected number of leaves:");
-        tree.append(numOfLeaves);
-        tree.append("\n");
-        // print nodes
-        for (int i = 0; i < numOfLeaves; i++) {
-            tree.append(NodeBase.getPath(clusterMap.getLeaf(i, null)));
-            tree.append("\n");
-        }
-        return tree.toString();
-    }
+    boolean contains(Node node);
 
     /**
-     * Divide networklocation string into two parts by last separator, and get
-     * the first part here.
-     *
-     * @param networkLocation
+     * Retrieve a node from the network topology
+     * @param loc
      * @return
      */
-    public static String getFirstHalf(String networkLocation) {
-        int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
-        return networkLocation.substring(0, index);
-    }
+    Node getNode(String loc);
 
     /**
-     * Divide networklocation string into two parts by last separator, and get
-     * the second part here.
+     * Returns number of racks in the network topology.
      *
-     * @param networkLocation
-     * @return
+     * @return number of racks in the network topology.
      */
-    public static String getLastHalf(String networkLocation) {
-        int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
-        return networkLocation.substring(index);
-    }
-
-    /** swap two array items */
-    static protected void swap(Node[] nodes, int i, int j) {
-        Node tempNode;
-        tempNode = nodes[j];
-        nodes[j] = nodes[i];
-        nodes[i] = tempNode;
-    }
+    int getNumOfRacks();
 
-    /** Sort nodes array by their distances to <i>reader</i>
-     * It linearly scans the array, if a local node is found, swap it with
-     * the first element of the array.
-     * If a local rack node is found, swap it with the first element following
-     * the local node.
-     * If neither local node or local rack node is found, put a random replica
-     * location at position 0.
-     * It leaves the rest nodes untouched.
-     * @param reader the node that wishes to read a block from one of the nodes
-     * @param nodes the list of nodes containing data for the reader
+    /**
+     * Returns the nodes under a location.
+     *
+     * @param loc
+     *      network location
+     * @return nodes under a location
      */
-    public void pseudoSortByDistance(Node reader, Node[] nodes) {
-        int tempIndex = 0;
-        int localRackNode = -1;
-        if (reader != null) {
-            //scan the array to find the local node & local rack node
-            for (int i = 0; i < nodes.length; i++) {
-                if (tempIndex == 0 && reader == nodes[i]) { //local node
-                    //swap the local node and the node at position 0
-                    if (i != 0) {
-                        swap(nodes, tempIndex, i);
-                    }
-                    tempIndex = 1;
-                    if (localRackNode != -1) {
-                        if (localRackNode == 0) {
-                            localRackNode = i;
-                        }
-                        break;
-                    }
-                } else if (localRackNode == -1 && isOnSameRack(reader, 
nodes[i])) {
-                    //local rack
-                    localRackNode = i;
-                    if (tempIndex != 0)
-                        break;
-                }
-            }
-
-            // swap the local rack node and the node at position tempIndex
-            if (localRackNode != -1 && localRackNode != tempIndex) {
-                swap(nodes, tempIndex, localRackNode);
-                tempIndex++;
-            }
-        }
-
-        // put a random node at position 0 if it is not a local/local-rack node
-        if (tempIndex == 0 && localRackNode == -1 && nodes.length != 0) {
-            swap(nodes, 0, r.nextInt(nodes.length));
-        }
-    }
+    Set<Node> getLeaves(String loc);
 
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
new file mode 100644
index 0000000..78c4fe4
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
@@ -0,0 +1,880 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.net;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The class represents a cluster of computers with a tree-structured,
+ * hierarchical network topology.
+ * For example, a cluster may consist of many data centers filled
+ * with racks of computers.
+ * In a network topology, leaves represent data nodes (computers) and inner
+ * nodes represent switches/routers that manage traffic in/out of data centers
+ * or racks.
+ *
+ */
+public class NetworkTopologyImpl implements NetworkTopology {
+
+    public final static int DEFAULT_HOST_LEVEL = 2;
+    public static final Logger LOG = 
LoggerFactory.getLogger(NetworkTopologyImpl.class);
+
+    /**
+     * Unchecked exception carrying a message that describes an invalid
+     * network topology.
+     */
+    public static class InvalidTopologyException extends RuntimeException {
+        private static final long serialVersionUID = 1L;
+
+        /** @param msg description of the problem, passed to {@link RuntimeException} */
+        public InvalidTopologyException(String msg) {
+            super(msg);
+        }
+    }
+
+    /** InnerNode represents a switch/router of a data center or rack.
+     * Different from a leaf node, it has non-null children.
+     */
+    static class InnerNode extends NodeBase {
+        protected List<Node> children = new ArrayList<Node>();
+        private int numOfLeaves;
+
+        /** Construct an InnerNode from a path-like string */
+        InnerNode(String path) {
+            super(path);
+        }
+
+        /** Construct an InnerNode from its name and its network location */
+        InnerNode(String name, String location) {
+            super(name, location);
+        }
+
+        /** Construct an InnerNode
+         * from its name, its network location, its parent, and its level */
+        InnerNode(String name, String location, InnerNode parent, int level) {
+            super(name, location, parent, level);
+        }
+
+        /** @return its children */
+        List<Node> getChildren() {
+            return children;
+        }
+
+        /** @return the number of children this node has */
+        int getNumOfChildren() {
+            return children.size();
+        }
+
+        /**
+         * Tell whether this node represents a rack.
+         *
+         * @return true if it has no child or its first child is not an InnerNode
+         */
+        boolean isRack() {
+            // Only the first child is inspected.
+            return children.isEmpty() || !(children.get(0) instanceof InnerNode);
+        }
+
+        /** Judge if this node is an ancestor of node <i>n</i>
+         *
+         * @param n a node
+         * @return true if this node is an ancestor of <i>n</i>
+         */
+        boolean isAncestor(Node n) {
+            return getPath(this).equals(NodeBase.PATH_SEPARATOR_STR)
+                    || (n.getNetworkLocation() + 
NodeBase.PATH_SEPARATOR_STR).startsWith(getPath(this)
+                            + NodeBase.PATH_SEPARATOR_STR);
+        }
+
+        /** Judge if this node is the parent of node <i>n</i>
+         *
+         * @param n a node
+         * @return true if this node is the parent of <i>n</i>
+         */
+        boolean isParent(Node n) {
+            return n.getNetworkLocation().equals(getPath(this));
+        }
+
+        /**
+         * Return the name of the child of this node that is an ancestor of node <i>n</i>.
+         * Both callers ({@link #add(Node)} and {@link #remove(Node)}) invoke this only
+         * when this node is not the direct parent of <i>n</i>.
+         *
+         * @param n a node located below this node
+         * @return the first path component of <i>n</i>'s network location that follows
+         *         this node's path
+         * @throws IllegalArgumentException if this node is not an ancestor of <i>n</i>
+         */
+        private String getNextAncestorName(Node n) {
+            if (!isAncestor(n)) {
+                throw new IllegalArgumentException(this + " is not an ancestor of " + n);
+            }
+            // strip this node's own path from the front of n's location
+            String name = n.getNetworkLocation().substring(getPath(this).length());
+            if (name.charAt(0) == PATH_SEPARATOR) {
+                name = name.substring(1);
+            }
+            // keep only the first remaining path component
+            int index = name.indexOf(PATH_SEPARATOR);
+            if (index != -1) {
+                name = name.substring(0, index);
+            }
+            return name;
+        }
+
+        /** Add node <i>n</i> to the subtree of this node.
+         * Intermediate inner nodes on the way to <i>n</i>'s network location are
+         * created on demand through {@link #createParentNode(String)}.
+         * @param n node to be added
+         * @return true if the node is added; false if a child with the same name
+         *         already existed at that location (the existing child is replaced
+         *         by <i>n</i> and the leaf count is left unchanged)
+         * @throws IllegalArgumentException if this node is not an ancestor of <i>n</i>
+         */
+        boolean add(Node n) {
+            if (!isAncestor(n))
+                throw new IllegalArgumentException(n.getName() + ", which is 
located at " + n.getNetworkLocation()
+                        + ", is not a decendent of " + getPath(this));
+            if (isParent(n)) {
+                // this node is the parent of n; add n directly
+                n.setParent(this);
+                n.setLevel(this.level + 1);
+                // children are matched by name: a same-named child is overwritten
+                for (int i = 0; i < children.size(); i++) {
+                    if (children.get(i).getName().equals(n.getName())) {
+                        children.set(i, n);
+                        return false;
+                    }
+                }
+                children.add(n);
+                numOfLeaves++;
+                return true;
+            } else {
+                // find the next ancestor node
+                String parentName = getNextAncestorName(n);
+                InnerNode parentNode = null;
+                for (int i = 0; i < children.size(); i++) {
+                    if (children.get(i).getName().equals(parentName)) {
+                        parentNode = (InnerNode) children.get(i);
+                        break;
+                    }
+                }
+                if (parentNode == null) {
+                    // create a new InnerNode
+                    parentNode = createParentNode(parentName);
+                    children.add(parentNode);
+                }
+                // add n to the subtree of the next ancestor node;
+                // the leaf count of this node grows only if a leaf was really added below
+                if (parentNode.add(n)) {
+                    numOfLeaves++;
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        }
+
+        /**
+         * Creates a parent node to be added to the list of children.
+         * Creates a node using the InnerNode four argument constructor 
specifying
+         * the name, location, parent, and level of this node.
+         *
+         * <p>To be overridden in subclasses for specific InnerNode 
implementations,
+         * as alternative to overriding the full {@link #add(Node)} method.
+         *
+         * @param parentName The name of the parent node
+         * @return A new inner node
+         * @see InnerNode#InnerNode(String, String, InnerNode, int)
+         */
+        protected InnerNode createParentNode(String parentName) {
+            return new InnerNode(parentName, getPath(this), this, 
this.getLevel() + 1);
+        }
+
+        /** Remove node <i>n</i> from the subtree of this node.
+         * An inner child that is left without children by the removal is removed as well.
+         * @param n node to be deleted
+         * @return true if the node is deleted; false if no matching node was found
+         * @throws IllegalArgumentException if this node is not an ancestor of <i>n</i>
+         */
+        boolean remove(Node n) {
+            String parent = n.getNetworkLocation();
+            String currentPath = getPath(this);
+            if (!isAncestor(n))
+                throw new IllegalArgumentException(n.getName() + ", which is 
located at " + parent
+                        + ", is not a descendent of " + currentPath);
+            if (isParent(n)) {
+                // this node is the parent of n; remove n directly
+                // (the child is matched by name, not by reference)
+                for (int i = 0; i < children.size(); i++) {
+                    if (children.get(i).getName().equals(n.getName())) {
+                        children.remove(i);
+                        numOfLeaves--;
+                        n.setParent(null);
+                        return true;
+                    }
+                }
+                return false;
+            } else {
+                // find the next ancestor node: the parent node
+                String parentName = getNextAncestorName(n);
+                InnerNode parentNode = null;
+                // i is declared outside the loop because it is reused below
+                // to drop the child once it has become empty
+                int i;
+                for (i = 0; i < children.size(); i++) {
+                    if (children.get(i).getName().equals(parentName)) {
+                        parentNode = (InnerNode) children.get(i);
+                        break;
+                    }
+                }
+                if (parentNode == null) {
+                    return false;
+                }
+                // remove n from the parent node
+                boolean isRemoved = parentNode.remove(n);
+                // if the parent node has no children, remove the parent node too
+                if (isRemoved) {
+                    if (parentNode.getNumOfChildren() == 0) {
+                        children.remove(i);
+                    }
+                    numOfLeaves--;
+                }
+                return isRemoved;
+            }
+        } // end of remove
+
+        /**
+         * Given a node's location relative to this node, return a reference to the node.
+         *
+         * @param loc location of the form rack/node, without a leading separator
+         *            (the callers in the enclosing class strip it before calling);
+         *            null or an empty string refers to this node itself
+         * @return null if the node is not found or an intermediate child is there but
+         *         is not an instance of {@link InnerNode}
+         */
+        private Node getLoc(String loc) {
+            if (loc == null || loc.length() == 0) {
+                return this;
+            }
+
+            // path[0] is the name of the direct child, path[1] (if any) the rest of the location
+            String[] path = loc.split(PATH_SEPARATOR_STR, 2);
+            Node childnode = null;
+            for (Node child : children) {
+                if (child.getName().equals(path[0])) {
+                    childnode = child;
+                    // add() never stores two children with the same name,
+                    // so there is nothing to gain from scanning further
+                    break;
+                }
+            }
+            if (childnode == null) {
+                return null; // non-existing node
+            }
+            if (path.length == 1) {
+                return childnode;
+            }
+            if (childnode instanceof InnerNode) {
+                return ((InnerNode) childnode).getLoc(path[1]);
+            } else {
+                return null;
+            }
+        }
+
+        /** Get the <i>leafIndex</i>-th leaf of this subtree, counting only leaves
+         * that are not covered by <i>excludedNode</i>.
+         *
+         * @param leafIndex an indexed leaf of the node
+         * @param excludedNode an excluded node (can be null); may be a leaf or an inner node
+         * @return the selected leaf, or null if <i>leafIndex</i> is out of range
+         */
+        Node getLeaf(int leafIndex, Node excludedNode) {
+            int count = 0;
+            // check if the excluded node is a leaf (a null excludedNode is treated as a leaf here)
+            boolean isLeaf = excludedNode == null || !(excludedNode instanceof 
InnerNode);
+            // calculate the total number of excluded leaf nodes
+            int numOfExcludedLeaves = isLeaf ? 1 : ((InnerNode) 
excludedNode).getNumOfLeaves();
+            if (isLeafParent()) { // children are leaves
+                if (isLeaf) { // excluded node is a leaf node
+                    int excludedIndex = children.indexOf(excludedNode);
+                    if (excludedIndex != -1 && leafIndex >= 0) {
+                        // excluded node is one of the children so adjust the leaf index:
+                        // indexes at or after the excluded child shift by one to skip it
+                        leafIndex = leafIndex >= excludedIndex ? leafIndex + 1 
: leafIndex;
+                    }
+                }
+                // range check
+                if (leafIndex < 0 || leafIndex >= this.getNumOfChildren()) {
+                    return null;
+                }
+                return children.get(leafIndex);
+            } else {
+                // children are inner nodes: walk them in order, consuming each
+                // child's (non-excluded) leaf count until leafIndex falls inside one
+                for (int i = 0; i < children.size(); i++) {
+                    InnerNode child = (InnerNode) children.get(i);
+                    if (excludedNode == null || excludedNode != child) {
+                        // not the excludedNode
+                        int numOfLeaves = child.getNumOfLeaves();
+                        if (excludedNode != null && 
child.isAncestor(excludedNode)) {
+                            numOfLeaves -= numOfExcludedLeaves;
+                        }
+                        if (count + numOfLeaves > leafIndex) {
+                            // the leaf is in the child subtree
+                            return child.getLeaf(leafIndex - count, 
excludedNode);
+                        } else {
+                            // go to the next child
+                            count = count + numOfLeaves;
+                        }
+                    } else { // it is the excludedNode
+                        // skip it and set the excludedNode to be null
+                        excludedNode = null;
+                    }
+                }
+                return null;
+            }
+        }
+
+        /**
+         * Tell whether the children of this node are leaves;
+         * {@link #getLeaf(int, Node)} branches on this value.
+         * This implementation returns {@link #isRack()}.
+         *
+         * @return true if children are leaves, false otherwise
+         */
+        protected boolean isLeafParent() {
+            return isRack();
+        }
+
+        /**
+          * Determine if children are leaves; the default implementation calls {@link #isRack()}.
+          * <p>To be overridden in subclasses for specific InnerNode 
implementations,
+          * as alternative to overriding the full {@link #getLeaf(int, Node)} 
method.
+          *
+          * @return true if children are leaves, false otherwise
+          */
+        protected boolean areChildrenLeaves() {
+            return isRack();
+        }
+
+        /**
+         * Get number of leaves.
+         */
+        int getNumOfLeaves() {
+            return numOfLeaves;
+        }
+    } // end of InnerNode
+
+    /**
+     * the root cluster map
+     */
+    InnerNode clusterMap;
+    /** Depth of all leaf nodes */
+    private int depthOfAllLeaves = -1;
+    /** rack counter */
+    protected int numOfRacks = 0;
+    /** the lock used to manage access */
+    protected ReadWriteLock netlock = new ReentrantReadWriteLock();
+
+    public NetworkTopologyImpl() {
+        clusterMap = new InnerNode(InnerNode.ROOT);
+    }
+
+    /**
+     * Add a leaf node.
+     * Update node counter &amp; rack counter if necessary.
+     *
+     * @param node node to be added; can be null, in which case nothing happens
+     * @exception IllegalArgumentException if add a node to a leaf
+     *                                     or node to be added is not a leaf
+     * @exception InvalidTopologyException if the node's depth differs from the depth
+     *                                     of the leaves already in the topology
+     */
+    public void add(Node node) {
+        if (node == null) {
+            return;
+        }
+        if (node instanceof InnerNode) {
+            throw new IllegalArgumentException("Not allow to add an inner node: " + NodeBase.getPath(node));
+        }
+        int newDepth = NodeBase.locationToDepth(node.getNetworkLocation()) + 1;
+        netlock.writeLock().lock();
+        try {
+            if ((depthOfAllLeaves != -1) && (depthOfAllLeaves != newDepth)) {
+                // The topology is rendered only on this failure path: toString() walks
+                // every leaf, which is too expensive to do on each successful add.
+                // Nothing has been modified yet, so this is the pre-add topology.
+                LOG.error("Error: can't add leaf node at depth " + newDepth + " to topology:\n" + this.toString());
+                throw new InvalidTopologyException("Invalid network topology. "
+                        + "You cannot have a rack and a non-rack node at the same level of the network topology.");
+            }
+            Node rack = getNodeForNetworkLocation(node);
+            if (rack != null && !(rack instanceof InnerNode)) {
+                throw new IllegalArgumentException("Unexpected data node " + node.toString()
+                        + " at an illegal network location");
+            }
+            if (clusterMap.add(node)) {
+                LOG.info("Adding a new node: " + NodeBase.getPath(node));
+                if (rack == null) {
+                    // the node's network location did not exist before: a new rack was created
+                    numOfRacks++;
+                }
+                // node is known to be a leaf here (inner nodes were rejected above);
+                // the first leaf fixes the depth every later leaf must have
+                if (depthOfAllLeaves == -1) {
+                    depthOfAllLeaves = node.getLevel();
+                }
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("NetworkTopology became:\n" + this.toString());
+            }
+        } finally {
+            netlock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Return a reference to the node given its string representation.
+     * Default implementation delegates to {@link #getNode(String)}.
+     *
+     * <p>To be overridden in subclasses for specific NetworkTopology
+     * implementations, as alternative to overriding the full {@link 
#add(Node)}
+     *  method.
+     *
+     * @param node The string representation of this node's network location is
+     * used to retrieve a Node object.
+     * @return a reference to the node; null if the node is not in the tree
+     *
+     * @see #add(Node)
+     * @see #getNode(String)
+     */
+    protected Node getNodeForNetworkLocation(Node node) {
+        return getNode(node.getNetworkLocation());
+    }
+
+    /**
+     * Given a string representation of a rack, return its children.
+     *
+     * @param loc a path-like string representation of a rack
+     * @return a newly allocated list with all the node's children, or null if
+     *         <i>loc</i> is not in the tree or does not name an inner node
+     */
+    public List<Node> getDatanodesInRack(String loc) {
+        netlock.readLock().lock();
+        try {
+            loc = NodeBase.normalize(loc);
+            if (!NodeBase.ROOT.equals(loc)) {
+                // the lookup below expects a location without the first character
+                loc = loc.substring(1);
+            }
+            Node rack = clusterMap.getLoc(loc);
+            if (!(rack instanceof InnerNode)) {
+                // not found, or the location names a leaf: there are no children
+                // to report (previously a leaf caused a ClassCastException)
+                return null;
+            }
+            return new ArrayList<Node>(((InnerNode) rack).getChildren());
+        } finally {
+            netlock.readLock().unlock();
+        }
+    }
+
+    /** Remove a leaf node.
+     * Update node counter and rack counter if necessary
+     * @param node node to be removed; can be null, in which case nothing happens
+     * @exception IllegalArgumentException if <i>node</i> is an inner node
+     */
+    @Override
+    public void remove(Node node) {
+        if (node == null)
+            return;
+        if (node instanceof InnerNode) {
+            throw new IllegalArgumentException("Not allow to remove an inner 
node: " + NodeBase.getPath(node));
+        }
+        LOG.info("Removing a node: " + NodeBase.getPath(node));
+        netlock.writeLock().lock();
+        try {
+            if (clusterMap.remove(node)) {
+                // if the node's network location can no longer be found after
+                // the removal, the rack is gone and is no longer counted
+                InnerNode rack = (InnerNode) 
getNode(node.getNetworkLocation());
+                if (rack == null) {
+                    numOfRacks--;
+                }
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("NetworkTopology became:\n" + this.toString());
+            }
+        } finally {
+            netlock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Check whether <i>node</i> belongs to this tree by following its parent
+     * links upwards, at most as many steps as the node's level, looking for the root.
+     *
+     * @param node a node
+     * @return true if <i>node</i> is already in the tree; false otherwise
+     */
+    @Override
+    public boolean contains(Node node) {
+        if (node == null) {
+            return false;
+        }
+        netlock.readLock().lock();
+        try {
+            Node ancestor = node.getParent();
+            int stepsLeft = node.getLevel();
+            while (ancestor != null && stepsLeft > 0) {
+                if (ancestor == clusterMap) {
+                    return true;
+                }
+                ancestor = ancestor.getParent();
+                stepsLeft--;
+            }
+            return false;
+        } finally {
+            netlock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Look up a node by its path-like string representation.
+     *
+     * @param loc
+     *          a path-like string representation of a node
+     * @return a reference to the node; null if the node is not in the tree
+     */
+    @Override
+    public Node getNode(String loc) {
+        netlock.readLock().lock();
+        try {
+            final String normalized = NodeBase.normalize(loc);
+            // the root is looked up as-is; any other path loses its first character
+            final String relative = NodeBase.ROOT.equals(normalized) ? normalized : normalized.substring(1);
+            return clusterMap.getLoc(relative);
+        } finally {
+            netlock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Return the rack string for a specific network location.
+     * This implementation returns <i>loc</i> unchanged.
+     *
+     * <p>To be overridden in subclasses for specific NetworkTopology
+     * implementations.
+     *
+     * @param loc
+     *          a path-like string representation of a network location
+     * @return a rack string
+     */
+    public String getRack(String loc) {
+        return loc;
+    }
+
+    /** @return the total number of racks */
+    @Override
+    public int getNumOfRacks() {
+        netlock.readLock().lock();
+        try {
+            return numOfRacks;
+        } finally {
+            netlock.readLock().unlock();
+        }
+    }
+
+    /** @return the total number of leaf nodes */
+    public int getNumOfLeaves() {
+        netlock.readLock().lock();
+        try {
+            return clusterMap.getNumOfLeaves();
+        } finally {
+            netlock.readLock().unlock();
+        }
+    }
+
+    /** Return the distance between two nodes
+     * It is assumed that the distance from one node to its parent is 1
+     * The distance between two nodes is calculated by summing up their distances
+     * to their closest common ancestor.
+     * Unless both arguments are the same reference, both nodes are dereferenced
+     * without a null check.
+     * @param node1 one node
+     * @param node2 another node
+     * @return the distance between node1 and node2 which is zero if they are the same
+     *  or {@link Integer#MAX_VALUE} if node1 or node2 do not belong to the cluster
+     */
+    public int getDistance(Node node1, Node node2) {
+        if (node1 == node2) {
+            return 0;
+        }
+        Node n1 = node1, n2 = node2;
+        int dis = 0;
+        netlock.readLock().lock();
+        try {
+            int level1 = node1.getLevel(), level2 = node2.getLevel();
+            // lift the deeper node until both are at the same level
+            while (n1 != null && level1 > level2) {
+                n1 = n1.getParent();
+                level1--;
+                dis++;
+            }
+            while (n2 != null && level2 > level1) {
+                n2 = n2.getParent();
+                level2--;
+                dis++;
+            }
+            // climb in lockstep until both nodes have the same parent
+            while (n1 != null && n2 != null && n1.getParent() != 
n2.getParent()) {
+                n1 = n1.getParent();
+                n2 = n2.getParent();
+                dis += 2;
+            }
+        } finally {
+            netlock.readLock().unlock();
+        }
+        // running off the top of the parent chain means the node is not attached
+        if (n1 == null) {
+            LOG.warn("The cluster does not contain node: {}", 
NodeBase.getPath(node1));
+            return Integer.MAX_VALUE;
+        }
+        if (n2 == null) {
+            LOG.warn("The cluster does not contain node: {}", 
NodeBase.getPath(node2));
+            return Integer.MAX_VALUE;
+        }
+        // account for the last hop from each node to the shared parent
+        return dis + 2;
+    }
+
+    /** Check if two nodes are on the same rack
+     * @param node1 one node (can be null)
+     * @param node2 another node (can be null)
+     * @return true if node1 and node2 are on the same rack, as decided by
+     * {@link #isSameParents(Node, Node)}; false otherwise, including when
+     * either node is null
+     */
+    public boolean isOnSameRack(Node node1, Node node2) {
+        if (node1 == null || node2 == null) {
+            return false;
+        }
+
+        netlock.readLock().lock();
+        try {
+            return isSameParents(node1, node2);
+        } finally {
+            netlock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Check if network topology is aware of NodeGroup
+     */
+    public boolean isNodeGroupAware() {
+        return false;
+    }
+
+    /**
+     * Return false directly as not aware of NodeGroup, to be override in 
sub-class
+     */
+    public boolean isOnSameNodeGroup(Node node1, Node node2) {
+        return false;
+    }
+
+    /**
+     * Compare the parents of each node for equality
+     *
+     * <p>To be overridden in subclasses for specific NetworkTopology
+     * implementations, as alternative to overriding the full
+     * {@link #isOnSameRack(Node, Node)} method.
+     *
+     * @param node1 the first node to compare
+     * @param node2 the second node to compare
+     * @return true if their parents are equal, false otherwise
+     *
+     * @see #isOnSameRack(Node, Node)
+     */
+    protected boolean isSameParents(Node node1, Node node2) {
+        return node1.getParent() == node2.getParent();
+    }
+
+    final protected static Random r = new Random();
+
+    /** randomly choose one node from <i>scope</i>
+     * if scope starts with ~, choose one from the all nodes except for the
+     * ones in <i>scope</i>; otherwise, choose one from <i>scope</i>
+     * @param scope range of nodes from which a node will be chosen
+     * @return the chosen node
+     */
+    public Node chooseRandom(String scope) {
+        netlock.readLock().lock();
+        try {
+            if (scope.startsWith("~")) {
+                return chooseRandom(NodeBase.ROOT, scope.substring(1));
+            } else {
+                return chooseRandom(scope, null);
+            }
+        } finally {
+            netlock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Randomly choose one leaf under <i>scope</i> that is not under <i>excludedScope</i>.
+     *
+     * @param scope path of the subtree to choose from
+     * @param excludedScope path of a subtree to leave out; can be null
+     * @return the chosen node; the node at <i>scope</i> itself if it is not an inner
+     *         node (null if it does not exist); null if <i>scope</i> lies inside
+     *         <i>excludedScope</i> or no candidate leaf is left
+     */
+    private Node chooseRandom(String scope, String excludedScope) {
+        if (excludedScope != null) {
+            if (scope.startsWith(excludedScope)) {
+                return null;
+            }
+            if (!excludedScope.startsWith(scope)) {
+                // the excluded subtree is outside of scope: nothing to exclude
+                excludedScope = null;
+            }
+        }
+        Node node = getNode(scope);
+        if (!(node instanceof InnerNode)) {
+            return node;
+        }
+        InnerNode innerNode = (InnerNode) node;
+        int numOfDatanodes = innerNode.getNumOfLeaves();
+        Node excludedNode = null;
+        if (excludedScope != null) {
+            excludedNode = getNode(excludedScope);
+            if (excludedNode instanceof InnerNode) {
+                numOfDatanodes -= ((InnerNode) excludedNode).getNumOfLeaves();
+            } else if (excludedNode != null) {
+                numOfDatanodes -= 1;
+            }
+            // an excluded scope that is not in the tree removes no candidate
+        }
+        if (numOfDatanodes <= 0) {
+            // Random.nextInt rejects a non-positive bound; there is nothing to choose from
+            return null;
+        }
+        int leaveIndex = r.nextInt(numOfDatanodes);
+        return innerNode.getLeaf(leaveIndex, excludedNode);
+    }
+
+    /**
+     * Return the leaves in <i>scope</i>.
+     *
+     * @param scope a path string
+     * @return a newly allocated, modifiable set with the leaf nodes under the given
+     *         scope; empty if <i>scope</i> is not in the tree
+     */
+    private Set<Node> doGetLeaves(String scope) {
+        Node node = getNode(scope);
+        Set<Node> leafNodes = new HashSet<Node>();
+        if (node == null) {
+            // unknown scope: report no leaves instead of a set containing null
+            return leafNodes;
+        }
+        if (!(node instanceof InnerNode)) {
+            // the scope names a single leaf
+            leafNodes.add(node);
+        } else {
+            InnerNode innerNode = (InnerNode) node;
+            int numOfLeaves = innerNode.getNumOfLeaves();
+            for (int i = 0; i < numOfLeaves; i++) {
+                leafNodes.add(innerNode.getLeaf(i, null));
+            }
+        }
+        return leafNodes;
+    }
+
+    @Override
+    public Set<Node> getLeaves(String scope) {
+        netlock.readLock().lock();
+        try {
+            if (!scope.startsWith("~")) {
+                return doGetLeaves(scope);
+            }
+            // "~scope": every leaf under the root except the ones under scope
+            Set<Node> remaining = doGetLeaves(NodeBase.ROOT);
+            Set<Node> underScope = doGetLeaves(scope.substring(1));
+            remaining.removeAll(underScope);
+            return remaining;
+        } finally {
+            netlock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Return the number of leaves in <i>scope</i> but not in <i>excludedNodes</i>.
+     * If scope starts with ~, return the number of leaves that are neither
+     * in <i>scope</i> nor in <i>excludedNodes</i>.
+     *
+     * @param scope a path string that may start with ~
+     * @param excludedNodes a list of nodes
+     * @return number of available nodes
+     */
+    public int countNumOfAvailableNodes(String scope, Collection<Node> excludedNodes) {
+        boolean isExcluded = false;
+        if (scope.startsWith("~")) {
+            isExcluded = true;
+            scope = scope.substring(1);
+        }
+        scope = NodeBase.normalize(scope);
+        // compare with a trailing separator on both sides so that a path only matches on a
+        // whole path component, not on a mere string prefix of a longer component
+        final String scopePrefix = scope + NodeBase.PATH_SEPARATOR_STR;
+        int count = 0; // the number of nodes in both scope & excludedNodes
+        netlock.readLock().lock();
+        try {
+            for (Node node : excludedNodes) {
+                if ((NodeBase.getPath(node) + NodeBase.PATH_SEPARATOR_STR).startsWith(scopePrefix)) {
+                    count++;
+                }
+            }
+            Node n = getNode(scope);
+            int scopeNodeCount;
+            if (n instanceof InnerNode) {
+                scopeNodeCount = ((InnerNode) n).getNumOfLeaves();
+            } else {
+                // a single node counts as one leaf; a scope with no node contributes none
+                scopeNodeCount = (n == null) ? 0 : 1;
+            }
+            if (isExcluded) {
+                return clusterMap.getNumOfLeaves() - scopeNodeCount - excludedNodes.size() + count;
+            } else {
+                return scopeNodeCount - count;
+            }
+        } finally {
+            netlock.readLock().unlock();
+        }
+    }
+
+    /** convert a network tree to a string */
+    @Override
+    public String toString() {
+        StringBuilder out = new StringBuilder();
+        // first line: the number of racks
+        out.append("Number of racks: ").append(numOfRacks).append("\n");
+        // second line: the number of leaves, read once and reused as the loop bound
+        final int leafTotal = getNumOfLeaves();
+        out.append("Expected number of leaves:").append(leafTotal).append("\n");
+        // then the path of every leaf, one per line
+        for (int idx = 0; idx < leafTotal; idx++) {
+            out.append(NodeBase.getPath(clusterMap.getLeaf(idx, null))).append("\n");
+        }
+        return out.toString();
+    }
+
+    /**
+     * Split a network location at its last path separator and return the part
+     * in front of that separator.
+     *
+     * @param networkLocation the network location string to split
+     * @return the substring of <i>networkLocation</i> before its last separator
+     * @throws StringIndexOutOfBoundsException if <i>networkLocation</i> contains no separator
+     */
+    public static String getFirstHalf(String networkLocation) {
+        return networkLocation.substring(0, networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR));
+    }
+
+    /**
+     * Split a network location at its last path separator and return the part
+     * that starts at that separator.
+     *
+     * @param networkLocation the network location string to split
+     * @return the substring of <i>networkLocation</i> from its last separator (inclusive) to the end
+     * @throws StringIndexOutOfBoundsException if <i>networkLocation</i> contains no separator
+     */
+    public static String getLastHalf(String networkLocation) {
+        return networkLocation.substring(networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR));
+    }
+
+    /** swap two array items */
+    static protected void swap(Node[] nodes, int i, int j) {
+        Node tempNode;
+        tempNode = nodes[j];
+        nodes[j] = nodes[i];
+        nodes[i] = tempNode;
+    }
+
+    /** Sort nodes array by their distances to <i>reader</i>
+     * It linearly scans the array; if a local node is found, it is swapped with
+     * the first element of the array.
+     * If a local rack node is found, it is swapped with the first element following
+     * the local node.
+     * If neither a local node nor a local rack node is found, a random replica
+     * location is put at position 0.
+     * The remaining nodes are left untouched.
+     * <p>
+     * The array is reordered in place. The local node is recognised by reference
+     * identity (<code>reader == nodes[i]</code>), a local rack node by
+     * <code>isOnSameRack(reader, nodes[i])</code>; only the first match of each kind
+     * is moved. When <i>reader</i> is null the scan is skipped and, for a non-empty
+     * array, a random element is swapped to position 0.
+     * @param reader the node that wishes to read a block from one of the nodes
+     * @param nodes the list of nodes containing data for the reader
+     */
+    public void pseudoSortByDistance(Node reader, Node[] nodes) {
+        int tempIndex = 0; // next front slot to fill: 0 until the local node has been placed
+        int localRackNode = -1; // index of the first same-rack node seen, -1 if none
+        if (reader != null) {
+            //scan the array to find the local node & local rack node
+            for (int i = 0; i < nodes.length; i++) {
+                if (tempIndex == 0 && reader == nodes[i]) { //local node
+                    //swap the local node and the node at position 0
+                    if (i != 0) {
+                        swap(nodes, tempIndex, i);
+                    }
+                    tempIndex = 1;
+                    if (localRackNode != -1) {
+                        if (localRackNode == 0) {
+                            localRackNode = i; // the swap above moved the rack node from slot 0 to slot i
+                        }
+                        break; // both kinds of node have been found
+                    }
+                } else if (localRackNode == -1 && isOnSameRack(reader, 
nodes[i])) {
+                    //local rack
+                    localRackNode = i;
+                    if (tempIndex != 0)
+                        break; // the local node was placed earlier, nothing left to look for
+                }
+            }
+
+            // swap the local rack node and the node at position tempIndex
+            if (localRackNode != -1 && localRackNode != tempIndex) {
+                swap(nodes, tempIndex, localRackNode);
+                tempIndex++;
+            }
+        }
+
+        // put a random node at position 0 if it is not a local/local-rack node
+        if (tempIndex == 0 && localRackNode == -1 && nodes.length != 0) {
+            swap(nodes, 0, r.nextInt(nodes.length));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java
index f11e0a7..62ec6c0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-// This code has been copied from hadoop-common 0.23.1
 package org.apache.bookkeeper.net;
 
 import com.google.common.annotations.Beta;
@@ -31,6 +30,9 @@ import com.google.common.annotations.Beta;
  */
 @Beta
 public interface Node {
+    /** @return the string representation of this node's network location at 
the specified level in the hierarchy*/
+    public String getNetworkLocation(int level);
+
     /** @return the string representation of this node's network location */
     public String getNetworkLocation();
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java
index f1a4b85..13ddfce 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-// This code has been copied from hadoop-common 0.23.1
 package org.apache.bookkeeper.net;
 
 /** A base class that implements interface Node
@@ -183,4 +182,18 @@ public class NodeBase implements Node {
         }
         return depth;
     }
+
+    @Override
+    public String getNetworkLocation(int distanceFromLeaves) {
+        // level 1 is this node; each further level climbs one parent, stopping where no parent exists
+        Node current = this;
+        for (int level = distanceFromLeaves; level > 1; level--) {
+            Node above = current.getParent();
+            if (above == null) {
+                break;
+            }
+            current = above;
+        }
+        return current.getNetworkLocation();
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
index 1671cc8..dd8563c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-// This code has been copied from hadoop-common 0.23.1
+
 package org.apache.bookkeeper.net;
 
 import java.io.File;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
new file mode 100644
index 0000000..5dce906
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.net;
+
+import org.apache.bookkeeper.util.MathUtils;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link NetworkTopology} that rides out flapping zookeeper registrations: a removed node stays
+ * in the wrapped {@link NetworkTopologyImpl} until a delayed task finds it absent for the stabilize period.
+ */
+public class StabilizeNetworkTopology implements NetworkTopology {
+
+    private static final Logger logger = LoggerFactory.getLogger(StabilizeNetworkTopology.class);
+
+    /** Presence bookkeeping for one node; the accessors synchronize on the instance. */
+    static class NodeStatus {
+        long lastPresentTime;
+        boolean tentativeToRemove;
+
+        NodeStatus() {
+            this.lastPresentTime = MathUtils.now();
+        }
+
+        synchronized boolean isTentativeToRemove() {
+            return tentativeToRemove;
+        }
+
+        /** Store the flag; the last-present time is refreshed only when the flag is false. */
+        synchronized NodeStatus updateStatus(boolean tentativeToRemove) {
+            this.tentativeToRemove = tentativeToRemove;
+            if (!tentativeToRemove) {
+                this.lastPresentTime = MathUtils.now();
+            }
+            return this;
+        }
+
+        synchronized long getLastPresentTime() {
+            return this.lastPresentTime;
+        }
+    }
+
+    protected final NetworkTopologyImpl impl; // the topology that actually holds the nodes
+    protected final HashedWheelTimer timer; // runs the delayed removal tasks
+    protected final ConcurrentMap<Node, NodeStatus> nodeStatuses; // presence status per node
+    protected final long stabilizePeriodMillis; // stabilize period in milliseconds
+
+    /** Delayed removal: drops the node from the topology if it is still stale when the timer fires. */
+    private class RemoveNodeTask implements TimerTask {
+        private final Node node;
+
+        RemoveNodeTask(Node node) {
+            this.node = node;
+        }
+
+        @Override
+        public void run(Timeout timeout) throws Exception {
+            if (timeout.isCancelled()) {
+                return;
+            }
+            NodeStatus status = nodeStatuses.get(node);
+            if (null == status) {
+                // no status of this node, remove this node from topology
+                impl.remove(node);
+                return;
+            }
+            if (!status.isTentativeToRemove()) {
+                // the node is no longer marked for removal, keep it
+                return;
+            }
+            long millisSinceLastSeen = MathUtils.now() - status.getLastPresentTime();
+            if (millisSinceLastSeen < stabilizePeriodMillis) {
+                return;
+            }
+            logger.info("Node {} (seen @ {}) becomes stale for {} ms, remove it from the topology.",
+                    new Object[] { node, status.getLastPresentTime(), millisSinceLastSeen });
+            impl.remove(node);
+            nodeStatuses.remove(node, status);
+        }
+    }
+
+    public StabilizeNetworkTopology(HashedWheelTimer timer, int stabilizePeriodSeconds) {
+        this.impl = new NetworkTopologyImpl();
+        this.timer = timer;
+        this.nodeStatuses = new ConcurrentHashMap<Node, NodeStatus>();
+        this.stabilizePeriodMillis = TimeUnit.SECONDS.toMillis(stabilizePeriodSeconds);
+    }
+
+    void updateNode(Node node, boolean tentativeToRemove) {
+        NodeStatus status = nodeStatuses.get(node);
+        if (null == status) {
+            NodeStatus fresh = new NodeStatus();
+            NodeStatus existing = nodeStatuses.putIfAbsent(node, fresh);
+            status = (null == existing) ? fresh : existing; // keep whichever instance is in the map
+        }
+        status.updateStatus(tentativeToRemove);
+    }
+
+    @Override
+    public void add(Node node) {
+        updateNode(node, false); // mark as present before handing the node to the topology
+        this.impl.add(node);
+    }
+
+    @Override
+    public void remove(Node node) {
+        updateNode(node, true); // only mark it; the timer task below does the actual removal
+        timer.newTimeout(new RemoveNodeTask(node), stabilizePeriodMillis, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public boolean contains(Node node) {
+        return impl.contains(node);
+    }
+
+    @Override
+    public Node getNode(String loc) {
+        return impl.getNode(loc);
+    }
+
+    @Override
+    public int getNumOfRacks() {
+        return impl.getNumOfRacks();
+    }
+
+    @Override
+    public Set<Node> getLeaves(String loc) {
+        return impl.getLeaves(loc);
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 21b16fb..274fd36 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -180,7 +180,7 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
         this.extRegistry = extRegistry;
 
         StringBuilder nameBuilder = new StringBuilder();
-        nameBuilder.append(addr.getHostname().replace('.', '_').replace('-', 
'_'))
+        nameBuilder.append(addr.getHostName().replace('.', '_').replace('-', 
'_'))
             .append("_").append(addr.getPort());
 
         this.statsLogger = 
parentStatsLogger.scope(BookKeeperClientStats.CHANNEL_SCOPE)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
index 987de7a..8dc7e2d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
@@ -50,4 +50,6 @@ public class BookKeeperConstants {
      * memtable (for performance consideration)
      */
     public static final long MAX_LOG_SIZE_LIMIT = 1 * 1024 * 1024 * 1024;
+
+    public static final String FEATURE_REPP_DISABLE_DURABILITY_ENFORCEMENT = 
"repp_disable_durability_enforcement";
 }

Reply via email to