This is an automated email from the ASF dual-hosted git repository.

reddycharan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new b4ca453  Move common placementpolicy components to 
TopologyAwareEnsemblePlacementPolicy.
b4ca453 is described below

commit b4ca4537b62e75f71e3329186652f982025175a2
Author: Charan Reddy Guttapalem <[email protected]>
AuthorDate: Fri May 10 09:59:17 2019 -0700

    Move common placementpolicy components to 
TopologyAwareEnsemblePlacementPolicy.
    
    Descriptions of the changes in this PR:
    
    - Move components/methods that are common to all placement policies
    from RackawareEnsemblePlacementPolicyImpl to
    TopologyAwareEnsemblePlacementPolicy, so that any new placement policy
    implementation can extend TopologyAwareEnsemblePlacementPolicy and reuse
    those common/generic components.
    - This change makes no functional change; it only reorganizes code.
    
    
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>
    
    This closes #2089 from reddycharan/3azplacement
---
 .../client/DefaultEnsemblePlacementPolicy.java     |   2 +-
 .../RackawareEnsemblePlacementPolicyImpl.java      | 162 +------------
 .../TopologyAwareEnsemblePlacementPolicy.java      | 265 ++++++++++++++++++++-
 .../bookkeeper/client/WeightedRandomSelection.java | 139 +----------
 ...ction.java => WeightedRandomSelectionImpl.java} |  33 ++-
 .../client/TestWeightedRandomSelection.java        |   2 +-
 6 files changed, 298 insertions(+), 305 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index c130b5d..f6bb1af 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -195,7 +195,7 @@ public class DefaultEnsemblePlacementPolicy implements 
EnsemblePlacementPolicy {
         this.isWeighted = conf.getDiskWeightBasedPlacementEnabled();
         if (this.isWeighted) {
             this.maxWeightMultiple = 
conf.getBookieMaxWeightMultipleForWeightBasedPlacement();
-            this.weightedSelection = new 
WeightedRandomSelection<BookieSocketAddress>(this.maxWeightMultiple);
+            this.weightedSelection = new 
WeightedRandomSelectionImpl<BookieSocketAddress>(this.maxWeightMultiple);
         }
         return this;
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index ed35d49..be06e13 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -46,10 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Supplier;
 
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
@@ -88,10 +85,7 @@ import org.slf4j.LoggerFactory;
 public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsemblePlacementPolicy {
 
     static final Logger LOG = 
LoggerFactory.getLogger(RackawareEnsemblePlacementPolicyImpl.class);
-    boolean isWeighted;
     int maxWeightMultiple;
-    private Map<BookieNode, WeightedObject> bookieInfoMap = new 
HashMap<BookieNode, WeightedObject>();
-    private WeightedRandomSelection<BookieNode> weightedSelection;
 
     protected int minNumRacksPerWriteQuorum;
     protected boolean enforceMinNumRacksPerWriteQuorum;
@@ -112,109 +106,10 @@ public class RackawareEnsemblePlacementPolicyImpl 
extends TopologyAwareEnsembleP
     static final int UNAVAIL_MASK     = 0x40 << 24;
     static final int MASK_BITS        = 0xFFF << 20;
 
-    static class DefaultResolver implements DNSToSwitchMapping {
-
-        final Supplier<String> defaultRackSupplier;
-
-        public DefaultResolver(Supplier<String> defaultRackSupplier) {
-            checkNotNull(defaultRackSupplier, "defaultRackSupplier should not 
be null");
-            this.defaultRackSupplier = defaultRackSupplier;
-        }
-
-        @Override
-        public List<String> resolve(List<String> names) {
-            List<String> rNames = new ArrayList<String>(names.size());
-            for (@SuppressWarnings("unused") String name : names) {
-                final String defaultRack = defaultRackSupplier.get();
-                checkNotNull(defaultRack, "defaultRack cannot be null");
-                rNames.add(defaultRack);
-            }
-            return rNames;
-        }
-
-        @Override
-        public void reloadCachedMappings() {
-            // nop
-        }
-
-    }
-
-    /**
-     * Decorator for any existing dsn resolver.
-     * Backfills returned data with appropriate default rack info.
-     */
-    static class DNSResolverDecorator implements DNSToSwitchMapping {
-
-        final Supplier<String> defaultRackSupplier;
-        final DNSToSwitchMapping resolver;
-        @StatsDoc(
-                name = FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER,
-                help = "total number of times Resolver failed to resolve rack 
information of a node"
-        )
-        final Counter failedToResolveNetworkLocationCounter;
-
-        DNSResolverDecorator(DNSToSwitchMapping resolver, Supplier<String> 
defaultRackSupplier,
-                Counter failedToResolveNetworkLocationCounter) {
-            checkNotNull(resolver, "Resolver cannot be null");
-            checkNotNull(defaultRackSupplier, "defaultRackSupplier should not 
be null");
-            this.defaultRackSupplier = defaultRackSupplier;
-            this.resolver = resolver;
-            this.failedToResolveNetworkLocationCounter = 
failedToResolveNetworkLocationCounter;
-        }
-
-        public List<String> resolve(List<String> names) {
-            if (names == null) {
-                return Collections.emptyList();
-            }
-            final String defaultRack = defaultRackSupplier.get();
-            checkNotNull(defaultRack, "Default rack cannot be null");
-
-            List<String> rNames = resolver.resolve(names);
-            if (rNames != null && rNames.size() == names.size()) {
-                for (int i = 0; i < rNames.size(); ++i) {
-                    if (rNames.get(i) == null) {
-                        LOG.warn("Failed to resolve network location for {}, 
using default rack for it : {}.",
-                                names.get(i), defaultRack);
-                        failedToResolveNetworkLocationCounter.inc();
-                        rNames.set(i, defaultRack);
-                    }
-                }
-                return rNames;
-            }
-
-            LOG.warn("Failed to resolve network location for {}, using default 
rack for them : {}.", names,
-                    defaultRack);
-            rNames = new ArrayList<>(names.size());
-
-            for (int i = 0; i < names.size(); ++i) {
-                failedToResolveNetworkLocationCounter.inc();
-                rNames.add(defaultRack);
-            }
-            return rNames;
-        }
-
-        @Override
-        public boolean useHostName() {
-            return resolver.useHostName();
-        }
-
-        @Override
-        public void reloadCachedMappings() {
-            resolver.reloadCachedMappings();
-        }
-    }
-
-    // for now, we just maintain the writable bookies' topology
-    protected NetworkTopology topology;
-    protected DNSToSwitchMapping dnsResolver;
     protected HashedWheelTimer timer;
-    protected final Map<BookieSocketAddress, BookieNode> knownBookies;
     // Use a loading cache so slow bookies are expired. Use entryId as values.
     protected Cache<BookieSocketAddress, Long> slowBookies;
     protected BookieNode localNode;
-    protected final ReentrantReadWriteLock rwLock;
-    // Initialize to empty set
-    protected ImmutableSet<BookieSocketAddress> readOnlyBookies = 
ImmutableSet.of();
     protected boolean reorderReadsRandom = false;
     protected boolean enforceDurability = false;
     protected int stabilizePeriodSeconds = 0;
@@ -222,19 +117,10 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
     // looks like these only assigned in the same thread as constructor, 
immediately after constructor;
     // no need to make volatile
     protected StatsLogger statsLogger = null;
+
     @StatsDoc(
-        name = BOOKIES_JOINED,
-        help = "The distribution of number of bookies joined the cluster on 
each network topology change"
-    )
-    protected OpStatsLogger bookiesJoinedCounter = null;
-    @StatsDoc(
-        name = BOOKIES_LEFT,
-        help = "The distribution of number of bookies left the cluster on each 
network topology change"
-    )
-    protected OpStatsLogger bookiesLeftCounter = null;
-    @StatsDoc(
-        name = READ_REQUESTS_REORDERED,
-        help = "The distribution of number of bookies reordered on each read 
request"
+            name = READ_REQUESTS_REORDERED,
+            help = "The distribution of number of bookies reordered on each 
read request"
     )
     protected OpStatsLogger readReorderedCounter = null;
     @StatsDoc(
@@ -257,9 +143,6 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
     RackawareEnsemblePlacementPolicyImpl(boolean enforceDurability) {
         this.enforceDurability = enforceDurability;
         topology = new NetworkTopologyImpl();
-        knownBookies = new HashMap<BookieSocketAddress, BookieNode>();
-
-        rwLock = new ReentrantReadWriteLock();
     }
 
     protected BookieNode createBookieNode(BookieSocketAddress addr) {
@@ -341,7 +224,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
         this.isWeighted = isWeighted;
         if (this.isWeighted) {
             this.maxWeightMultiple = maxWeightMultiple;
-            this.weightedSelection = new 
WeightedRandomSelection<BookieNode>(this.maxWeightMultiple);
+            this.weightedSelection = new 
WeightedRandomSelectionImpl<BookieNode>(this.maxWeightMultiple);
             LOG.info("Weight based placement with max multiple of " + 
this.maxWeightMultiple);
         } else {
             LOG.info("Not weighted");
@@ -553,14 +436,6 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
         return nodes;
     }
 
-    private static Set<String> getNetworkLocations(Set<Node> bookieNodes) {
-        Set<String> networkLocs = new HashSet<>();
-        for (Node bookieNode : bookieNodes) {
-            networkLocs.add(bookieNode.getNetworkLocation());
-        }
-        return networkLocs;
-    }
-
     /*
      * this method should be called in readlock scope of 'rwLock'
      */
@@ -883,7 +758,8 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
             return null;
         }
 
-        WeightedRandomSelection<BookieNode> wRSelection = new 
WeightedRandomSelection<BookieNode>(maxWeightMultiple);
+        WeightedRandomSelection<BookieNode> wRSelection = new 
WeightedRandomSelectionImpl<BookieNode>(
+                maxWeightMultiple);
         wRSelection.updateMap(rackMap);
         return wRSelection;
     }
@@ -1000,7 +876,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
                         rackMap.put(n, new BookieInfo());
                     }
                 }
-                wRSelection = new 
WeightedRandomSelection<BookieNode>(this.maxWeightMultiple);
+                wRSelection = new 
WeightedRandomSelectionImpl<BookieNode>(this.maxWeightMultiple);
                 wRSelection.updateMap(rackMap);
             }
         } else {
@@ -1287,30 +1163,6 @@ public class RackawareEnsemblePlacementPolicyImpl 
extends TopologyAwareEnsembleP
         return writeSet;
     }
 
-    /**
-     * Shuffle all the entries of an array that matches a mask.
-     * It assumes all entries with the same mask are contiguous in the array.
-     */
-    static void shuffleWithMask(DistributionSchedule.WriteSet writeSet,
-                                int mask, int bits) {
-        int first = -1;
-        int last = -1;
-        for (int i = 0; i < writeSet.size(); i++) {
-            if ((writeSet.get(i) & bits) == mask) {
-                if (first == -1) {
-                    first = i;
-                }
-                last = i;
-            }
-        }
-        if (first != -1) {
-            for (int i = last + 1; i > first; i--) {
-                int swapWith = ThreadLocalRandom.current().nextInt(i);
-                writeSet.set(swapWith, writeSet.set(i, 
writeSet.get(swapWith)));
-            }
-        }
-    }
-
     // this method should be called in readlock scope of 'rwlock'
     @Override
     public boolean 
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int 
writeQuorumSize,
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
index 7fa7555..355632b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
@@ -17,19 +17,38 @@
  */
 package org.apache.bookkeeper.client;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_JOINED;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_LEFT;
+import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER;
+
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
 
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.net.NetUtils;
 import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.net.Node;
 import org.apache.bookkeeper.net.NodeBase;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.annotations.StatsDoc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,15 +56,34 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
         
ITopologyAwareEnsemblePlacementPolicy<TopologyAwareEnsemblePlacementPolicy.BookieNode>
 {
     static final Logger LOG = 
LoggerFactory.getLogger(TopologyAwareEnsemblePlacementPolicy.class);
 
-    protected static class TruePredicate implements Predicate<BookieNode> {
+    protected final Map<BookieSocketAddress, BookieNode> knownBookies = new 
HashMap<BookieSocketAddress, BookieNode>();
+    protected final ReentrantReadWriteLock rwLock = new 
ReentrantReadWriteLock();
+    protected Map<BookieNode, WeightedObject> bookieInfoMap = new 
HashMap<BookieNode, WeightedObject>();
+    // Initialize to empty set
+    protected ImmutableSet<BookieSocketAddress> readOnlyBookies = 
ImmutableSet.of();
+    boolean isWeighted;
+    protected WeightedRandomSelection<BookieNode> weightedSelection;
+    // for now, we just maintain the writable bookies' topology
+    protected NetworkTopology topology;
+    protected DNSToSwitchMapping dnsResolver;
+    @StatsDoc(
+            name = BOOKIES_JOINED,
+            help = "The distribution of number of bookies joined the cluster 
on each network topology change"
+    )
+    protected OpStatsLogger bookiesJoinedCounter = null;
+    @StatsDoc(
+        name = BOOKIES_LEFT,
+        help = "The distribution of number of bookies left the cluster on each 
network topology change"
+    )
+    protected OpStatsLogger bookiesLeftCounter = null;
 
+    protected static class TruePredicate implements Predicate<BookieNode> {
         public static final TruePredicate INSTANCE = new TruePredicate();
 
         @Override
         public boolean apply(BookieNode candidate, Ensemble chosenNodes) {
             return true;
         }
-
     }
 
     protected static class EnsembleForReplacementWithNoConstraints implements 
Ensemble<BookieNode> {
@@ -487,6 +525,129 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
         }
     }
 
+    static class DefaultResolver implements DNSToSwitchMapping {
+
+        final Supplier<String> defaultRackSupplier;
+
+        public DefaultResolver(Supplier<String> defaultRackSupplier) {
+            checkNotNull(defaultRackSupplier, "defaultRackSupplier should not 
be null");
+            this.defaultRackSupplier = defaultRackSupplier;
+        }
+
+        @Override
+        public List<String> resolve(List<String> names) {
+            List<String> rNames = new ArrayList<String>(names.size());
+            for (@SuppressWarnings("unused") String name : names) {
+                final String defaultRack = defaultRackSupplier.get();
+                checkNotNull(defaultRack, "defaultRack cannot be null");
+                rNames.add(defaultRack);
+            }
+            return rNames;
+        }
+
+        @Override
+        public void reloadCachedMappings() {
+            // nop
+        }
+    }
+
+    /**
+     * Decorator for any existing dsn resolver.
+     * Backfills returned data with appropriate default rack info.
+     */
+    static class DNSResolverDecorator implements DNSToSwitchMapping {
+
+        final Supplier<String> defaultRackSupplier;
+        final DNSToSwitchMapping resolver;
+        @StatsDoc(
+                name = FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER,
+                help = "total number of times Resolver failed to resolve rack 
information of a node"
+        )
+        final Counter failedToResolveNetworkLocationCounter;
+
+        DNSResolverDecorator(DNSToSwitchMapping resolver, Supplier<String> 
defaultRackSupplier,
+                Counter failedToResolveNetworkLocationCounter) {
+            checkNotNull(resolver, "Resolver cannot be null");
+            checkNotNull(defaultRackSupplier, "defaultRackSupplier should not 
be null");
+            this.defaultRackSupplier = defaultRackSupplier;
+            this.resolver = resolver;
+            this.failedToResolveNetworkLocationCounter = 
failedToResolveNetworkLocationCounter;
+        }
+
+        public List<String> resolve(List<String> names) {
+            if (names == null) {
+                return Collections.emptyList();
+            }
+            final String defaultRack = defaultRackSupplier.get();
+            checkNotNull(defaultRack, "Default rack cannot be null");
+
+            List<String> rNames = resolver.resolve(names);
+            if (rNames != null && rNames.size() == names.size()) {
+                for (int i = 0; i < rNames.size(); ++i) {
+                    if (rNames.get(i) == null) {
+                        LOG.warn("Failed to resolve network location for {}, 
using default rack for it : {}.",
+                                names.get(i), defaultRack);
+                        failedToResolveNetworkLocationCounter.inc();
+                        rNames.set(i, defaultRack);
+                    }
+                }
+                return rNames;
+            }
+
+            LOG.warn("Failed to resolve network location for {}, using default 
rack for them : {}.", names,
+                    defaultRack);
+            rNames = new ArrayList<>(names.size());
+
+            for (int i = 0; i < names.size(); ++i) {
+                failedToResolveNetworkLocationCounter.inc();
+                rNames.add(defaultRack);
+            }
+            return rNames;
+        }
+
+        @Override
+        public boolean useHostName() {
+            return resolver.useHostName();
+        }
+
+        @Override
+        public void reloadCachedMappings() {
+            resolver.reloadCachedMappings();
+        }
+    }
+
+    static Set<String> getNetworkLocations(Set<Node> bookieNodes) {
+        Set<String> networkLocs = new HashSet<>();
+        for (Node bookieNode : bookieNodes) {
+            networkLocs.add(bookieNode.getNetworkLocation());
+        }
+        return networkLocs;
+    }
+
+    /**
+     * Shuffle all the entries of an array that matches a mask.
+     * It assumes all entries with the same mask are contiguous in the array.
+     */
+    static void shuffleWithMask(DistributionSchedule.WriteSet writeSet,
+                                int mask, int bits) {
+        int first = -1;
+        int last = -1;
+        for (int i = 0; i < writeSet.size(); i++) {
+            if ((writeSet.get(i) & bits) == mask) {
+                if (first == -1) {
+                    first = i;
+                }
+                last = i;
+            }
+        }
+        if (first != -1) {
+            for (int i = last + 1; i > first; i--) {
+                int swapWith = ThreadLocalRandom.current().nextInt(i);
+                writeSet.set(swapWith, writeSet.set(i, 
writeSet.get(swapWith)));
+            }
+        }
+    }
+
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
             List<BookieSocketAddress> ensemble,
@@ -505,4 +666,104 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
         retList.addMissingIndices(ensemble.size());
         return retList;
     }
+
+    @Override
+    public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> 
writableBookies,
+            Set<BookieSocketAddress> readOnlyBookies) {
+        rwLock.writeLock().lock();
+        try {
+            ImmutableSet<BookieSocketAddress> joinedBookies, leftBookies, 
deadBookies;
+            Set<BookieSocketAddress> oldBookieSet = knownBookies.keySet();
+            // left bookies : bookies in known bookies, but not in new 
writable bookie cluster.
+            leftBookies = Sets.difference(oldBookieSet, 
writableBookies).immutableCopy();
+            // joined bookies : bookies in new writable bookie cluster, but 
not in known bookies
+            joinedBookies = Sets.difference(writableBookies, 
oldBookieSet).immutableCopy();
+            // dead bookies.
+            deadBookies = Sets.difference(leftBookies, 
readOnlyBookies).immutableCopy();
+            LOG.debug("Cluster changed : left bookies are {}, joined bookies 
are {}, while dead bookies are {}.",
+                    leftBookies, joinedBookies, deadBookies);
+            handleBookiesThatLeft(leftBookies);
+            handleBookiesThatJoined(joinedBookies);
+            if (this.isWeighted && (leftBookies.size() > 0 || 
joinedBookies.size() > 0)) {
+                this.weightedSelection.updateMap(this.bookieInfoMap);
+            }
+            if (!readOnlyBookies.isEmpty()) {
+                this.readOnlyBookies = ImmutableSet.copyOf(readOnlyBookies);
+            }
+
+            return deadBookies;
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /*
+     * this method should be called in writelock scope of 'rwLock'
+     */
+    @Override
+    public void handleBookiesThatLeft(Set<BookieSocketAddress> leftBookies) {
+        for (BookieSocketAddress addr : leftBookies) {
+            try {
+                BookieNode node = knownBookies.remove(addr);
+                if (null != node) {
+                    topology.remove(node);
+                    if (this.isWeighted) {
+                        this.bookieInfoMap.remove(node);
+                    }
+
+                    bookiesLeftCounter.registerSuccessfulValue(1L);
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Cluster changed : bookie {} left from 
cluster.", addr);
+                    }
+                }
+            } catch (Throwable t) {
+                LOG.error("Unexpected exception while handling leaving bookie 
{}", addr, t);
+                if (bookiesLeftCounter != null) {
+                    bookiesLeftCounter.registerFailedValue(1L);
+                }
+                // no need to re-throw; we want to process the rest of the 
bookies
+                // exception anyways will be caught/logged/suppressed in the 
ZK's event handler
+            }
+        }
+    }
+
+    /*
+     * this method should be called in writelock scope of 'rwLock'
+     */
+    @Override
+    public void handleBookiesThatJoined(Set<BookieSocketAddress> 
joinedBookies) {
+        // node joined
+        for (BookieSocketAddress addr : joinedBookies) {
+            try {
+                BookieNode node = createBookieNode(addr);
+                topology.add(node);
+                knownBookies.put(addr, node);
+                if (this.isWeighted) {
+                    this.bookieInfoMap.putIfAbsent(node, new BookieInfo());
+                }
+
+                bookiesJoinedCounter.registerSuccessfulValue(1L);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cluster changed : bookie {} joined the 
cluster.", addr);
+                }
+            } catch (Throwable t) {
+                // topology.add() throws unchecked exception
+                LOG.error("Unexpected exception while handling joining bookie 
{}", addr, t);
+
+                bookiesJoinedCounter.registerFailedValue(1L);
+                // no need to re-throw; we want to process the rest of the 
bookies
+                // exception anyways will be caught/logged/suppressed in the 
ZK's event handler
+            }
+        }
+    }
+
+    protected BookieNode createBookieNode(BookieSocketAddress addr) {
+        return new BookieNode(addr, resolveNetworkLocation(addr));
+    }
+
+    protected String resolveNetworkLocation(BookieSocketAddress addr) {
+        return NetUtils.resolveNetworkLocation(dnsResolver, 
addr.getSocketAddress());
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java
index 0404250..8a44174 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java
@@ -18,146 +18,19 @@
 
 package org.apache.bookkeeper.client;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Collection;
 import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class WeightedRandomSelection<T> {
-    static final Logger LOG = 
LoggerFactory.getLogger(WeightedRandomSelection.class);
 
+interface WeightedRandomSelection<T> {
     interface WeightedObject {
         long getWeight();
     }
-    Double randomMax;
-    int maxProbabilityMultiplier;
-    Map<T, WeightedObject> map;
-    TreeMap<Double, T> cummulativeMap = new TreeMap<Double, T>();
-    ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
-
-    WeightedRandomSelection() {
-        maxProbabilityMultiplier = -1;
-    }
-
-    WeightedRandomSelection(int maxMultiplier) {
-        this.maxProbabilityMultiplier = maxMultiplier;
-    }
-
-    public void setMaxProbabilityMultiplier(int max) {
-        this.maxProbabilityMultiplier = max;
-    }
-
-    void updateMap(Map<T, WeightedObject> map) {
-        // get the sum total of all the values; this will be used to
-        // calculate the weighted probability later on
-        Long totalWeight = 0L, min = Long.MAX_VALUE;
-        List<WeightedObject> values = new 
ArrayList<WeightedObject>(map.values());
-        Collections.sort(values, new Comparator<WeightedObject>() {
-            public int compare(WeightedObject o1, WeightedObject o2) {
-                long diff = o1.getWeight() - o2.getWeight();
-                if (diff < 0L) {
-                    return -1;
-                } else if (diff > 0L) {
-                    return 1;
-                } else {
-                    return 0;
-                }
-            }
-        });
-        for (int i = 0; i < values.size(); i++) {
-            totalWeight += values.get(i).getWeight();
-            if (values.get(i).getWeight() != 0 && min > 
values.get(i).getWeight()) {
-                min = values.get(i).getWeight();
-            }
-        }
-
-        double median = 0;
-        if (totalWeight == 0) {
-            // all the values are zeros; assign a value of 1 to all and the 
totalWeight equal
-            // to the size of the values
-            min = 1L;
-            median = 1;
-            totalWeight = (long) values.size();
-        } else {
-            int mid = values.size() / 2;
-            if ((values.size() % 2) == 1) {
-                median = values.get(mid).getWeight();
-            } else {
-                median = (double) (values.get(mid - 1).getWeight() + 
values.get(mid).getWeight()) / 2;
-            }
-        }
-
-        double medianWeight, minWeight;
-        medianWeight = median / (double) totalWeight;
-        minWeight = (double) min / totalWeight;
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Updating weights map. MediaWeight: {} MinWeight: {}", 
medianWeight, minWeight);
-        }
+    void updateMap(Map<T, WeightedObject> map);
 
-        double maxWeight = maxProbabilityMultiplier * medianWeight;
-        Map<T, Double> weightMap = new HashMap<T, Double>();
-        for (Map.Entry<T, WeightedObject> e : map.entrySet()) {
-            double weightedProbability;
-            if (e.getValue().getWeight() > 0) {
-                weightedProbability = (double) e.getValue().getWeight() / 
(double) totalWeight;
-            } else {
-                weightedProbability = minWeight;
-            }
-            if (maxWeight > 0 && weightedProbability > maxWeight) {
-                weightedProbability = maxWeight;
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Capping the probability to {} for {} Value: {}",
-                            weightedProbability, e.getKey(), e.getValue());
-                }
-            }
-            weightMap.put(e.getKey(), weightedProbability);
-        }
+    T getNextRandom();
 
-        // The probability of picking a bookie randomly is 
defaultPickProbability
-        // but we change that priority by looking at the weight that each 
bookie
-        // carries.
-        TreeMap<Double, T> tmpCummulativeMap = new TreeMap<Double, T>();
-        Double key = 0.0;
-        for (Map.Entry<T, Double> e : weightMap.entrySet()) {
-            tmpCummulativeMap.put(key, e.getKey());
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Key: {} Value: {} AssignedKey: {} AssignedWeight: 
{}",
-                        e.getKey(), e.getValue(), key, e.getValue());
-            }
-            key += e.getValue();
-        }
+    T getNextRandom(Collection<T> selectedNodes);
 
-        rwLock.writeLock().lock();
-        try {
-            this.map = map;
-            cummulativeMap = tmpCummulativeMap;
-            randomMax = key;
-        } finally {
-            rwLock.writeLock().unlock();
-        }
-    }
-
-    T getNextRandom() {
-        rwLock.readLock().lock();
-        try {
-            // pick a random number between 0 and randMax
-            Double randomNum = randomMax * Math.random();
-            // find the nearest key in the map corresponding to the randomNum
-            Double key = cummulativeMap.floorKey(randomNum);
-            //LOG.info("Random max: {} CummulativeMap size: {} selected key: 
{}", randomMax, cummulativeMap.size(),
-            //    key);
-            return cummulativeMap.get(key);
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
+    void setMaxProbabilityMultiplier(int max);
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelectionImpl.java
similarity index 91%
copy from 
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java
copy to 
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelectionImpl.java
index 0404250..53130a2 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelectionImpl.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.client;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -31,31 +32,25 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class WeightedRandomSelection<T> {
-    static final Logger LOG = LoggerFactory.getLogger(WeightedRandomSelection.class);
+class WeightedRandomSelectionImpl<T> implements WeightedRandomSelection<T> {
+    static final Logger LOG = LoggerFactory.getLogger(WeightedRandomSelectionImpl.class);
 
-    interface WeightedObject {
-        long getWeight();
-    }
     Double randomMax;
     int maxProbabilityMultiplier;
     Map<T, WeightedObject> map;
     TreeMap<Double, T> cummulativeMap = new TreeMap<Double, T>();
     ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
 
-    WeightedRandomSelection() {
+    WeightedRandomSelectionImpl() {
         maxProbabilityMultiplier = -1;
     }
 
-    WeightedRandomSelection(int maxMultiplier) {
+    WeightedRandomSelectionImpl(int maxMultiplier) {
         this.maxProbabilityMultiplier = maxMultiplier;
     }
 
-    public void setMaxProbabilityMultiplier(int max) {
-        this.maxProbabilityMultiplier = max;
-    }
-
-    void updateMap(Map<T, WeightedObject> map) {
+    @Override
+    public void updateMap(Map<T, WeightedObject> map) {
         // get the sum total of all the values; this will be used to
         // calculate the weighted probability later on
         Long totalWeight = 0L, min = Long.MAX_VALUE;
@@ -146,7 +141,8 @@ class WeightedRandomSelection<T> {
         }
     }
 
-    T getNextRandom() {
+    @Override
+    public T getNextRandom() {
         rwLock.readLock().lock();
         try {
             // pick a random number between 0 and randMax
@@ -160,4 +156,15 @@ class WeightedRandomSelection<T> {
             rwLock.readLock().unlock();
         }
     }
+
+    @Override
+    public void setMaxProbabilityMultiplier(int max) {
+        this.maxProbabilityMultiplier = max;
+    }
+
+    @Override
+    public T getNextRandom(Collection<T> selectedNodes) {
+        throw new UnsupportedOperationException("getNextRandom is not implemented for WeightedRandomSelectionImpl");
+    }
+
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWeightedRandomSelection.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWeightedRandomSelection.java
index fe8f74b..2aa3c0e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWeightedRandomSelection.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWeightedRandomSelection.java
@@ -59,7 +59,7 @@ public class TestWeightedRandomSelection {
 
     @Before
     public void setUp() throws Exception {
-        wRS = new WeightedRandomSelection<String>();
+        wRS = new WeightedRandomSelectionImpl<String>();
     }
 
     @After

Reply via email to