This is an automated email from the ASF dual-hosted git repository.
reddycharan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new b4ca453 Move common placementpolicy components to
TopologyAwareEnsemblePlacementPolicy.
b4ca453 is described below
commit b4ca4537b62e75f71e3329186652f982025175a2
Author: Charan Reddy Guttapalem <[email protected]>
AuthorDate: Fri May 10 09:59:17 2019 -0700
Move common placementpolicy components to
TopologyAwareEnsemblePlacementPolicy.
Descriptions of the changes in this PR:
- Move components/methods that are generic to any placement policy from
RackawareEnsemblePlacementPolicyImpl to
TopologyAwareEnsemblePlacementPolicy, so that any new placement policy
implementation can extend TopologyAwareEnsemblePlacementPolicy and reuse
those common, generic components.
- This change introduces no functional change; it only reorganizes code.
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #2089 from reddycharan/3azplacement
---
.../client/DefaultEnsemblePlacementPolicy.java | 2 +-
.../RackawareEnsemblePlacementPolicyImpl.java | 162 +------------
.../TopologyAwareEnsemblePlacementPolicy.java | 265 ++++++++++++++++++++-
.../bookkeeper/client/WeightedRandomSelection.java | 139 +----------
...ction.java => WeightedRandomSelectionImpl.java} | 33 ++-
.../client/TestWeightedRandomSelection.java | 2 +-
6 files changed, 298 insertions(+), 305 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index c130b5d..f6bb1af 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -195,7 +195,7 @@ public class DefaultEnsemblePlacementPolicy implements
EnsemblePlacementPolicy {
this.isWeighted = conf.getDiskWeightBasedPlacementEnabled();
if (this.isWeighted) {
this.maxWeightMultiple =
conf.getBookieMaxWeightMultipleForWeightBasedPlacement();
- this.weightedSelection = new
WeightedRandomSelection<BookieSocketAddress>(this.maxWeightMultiple);
+ this.weightedSelection = new
WeightedRandomSelectionImpl<BookieSocketAddress>(this.maxWeightMultiple);
}
return this;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index ed35d49..be06e13 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -46,10 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Supplier;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
@@ -88,10 +85,7 @@ import org.slf4j.LoggerFactory;
public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsemblePlacementPolicy {
static final Logger LOG =
LoggerFactory.getLogger(RackawareEnsemblePlacementPolicyImpl.class);
- boolean isWeighted;
int maxWeightMultiple;
- private Map<BookieNode, WeightedObject> bookieInfoMap = new
HashMap<BookieNode, WeightedObject>();
- private WeightedRandomSelection<BookieNode> weightedSelection;
protected int minNumRacksPerWriteQuorum;
protected boolean enforceMinNumRacksPerWriteQuorum;
@@ -112,109 +106,10 @@ public class RackawareEnsemblePlacementPolicyImpl
extends TopologyAwareEnsembleP
static final int UNAVAIL_MASK = 0x40 << 24;
static final int MASK_BITS = 0xFFF << 20;
- static class DefaultResolver implements DNSToSwitchMapping {
-
- final Supplier<String> defaultRackSupplier;
-
- public DefaultResolver(Supplier<String> defaultRackSupplier) {
- checkNotNull(defaultRackSupplier, "defaultRackSupplier should not
be null");
- this.defaultRackSupplier = defaultRackSupplier;
- }
-
- @Override
- public List<String> resolve(List<String> names) {
- List<String> rNames = new ArrayList<String>(names.size());
- for (@SuppressWarnings("unused") String name : names) {
- final String defaultRack = defaultRackSupplier.get();
- checkNotNull(defaultRack, "defaultRack cannot be null");
- rNames.add(defaultRack);
- }
- return rNames;
- }
-
- @Override
- public void reloadCachedMappings() {
- // nop
- }
-
- }
-
- /**
- * Decorator for any existing dsn resolver.
- * Backfills returned data with appropriate default rack info.
- */
- static class DNSResolverDecorator implements DNSToSwitchMapping {
-
- final Supplier<String> defaultRackSupplier;
- final DNSToSwitchMapping resolver;
- @StatsDoc(
- name = FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER,
- help = "total number of times Resolver failed to resolve rack
information of a node"
- )
- final Counter failedToResolveNetworkLocationCounter;
-
- DNSResolverDecorator(DNSToSwitchMapping resolver, Supplier<String>
defaultRackSupplier,
- Counter failedToResolveNetworkLocationCounter) {
- checkNotNull(resolver, "Resolver cannot be null");
- checkNotNull(defaultRackSupplier, "defaultRackSupplier should not
be null");
- this.defaultRackSupplier = defaultRackSupplier;
- this.resolver = resolver;
- this.failedToResolveNetworkLocationCounter =
failedToResolveNetworkLocationCounter;
- }
-
- public List<String> resolve(List<String> names) {
- if (names == null) {
- return Collections.emptyList();
- }
- final String defaultRack = defaultRackSupplier.get();
- checkNotNull(defaultRack, "Default rack cannot be null");
-
- List<String> rNames = resolver.resolve(names);
- if (rNames != null && rNames.size() == names.size()) {
- for (int i = 0; i < rNames.size(); ++i) {
- if (rNames.get(i) == null) {
- LOG.warn("Failed to resolve network location for {},
using default rack for it : {}.",
- names.get(i), defaultRack);
- failedToResolveNetworkLocationCounter.inc();
- rNames.set(i, defaultRack);
- }
- }
- return rNames;
- }
-
- LOG.warn("Failed to resolve network location for {}, using default
rack for them : {}.", names,
- defaultRack);
- rNames = new ArrayList<>(names.size());
-
- for (int i = 0; i < names.size(); ++i) {
- failedToResolveNetworkLocationCounter.inc();
- rNames.add(defaultRack);
- }
- return rNames;
- }
-
- @Override
- public boolean useHostName() {
- return resolver.useHostName();
- }
-
- @Override
- public void reloadCachedMappings() {
- resolver.reloadCachedMappings();
- }
- }
-
- // for now, we just maintain the writable bookies' topology
- protected NetworkTopology topology;
- protected DNSToSwitchMapping dnsResolver;
protected HashedWheelTimer timer;
- protected final Map<BookieSocketAddress, BookieNode> knownBookies;
// Use a loading cache so slow bookies are expired. Use entryId as values.
protected Cache<BookieSocketAddress, Long> slowBookies;
protected BookieNode localNode;
- protected final ReentrantReadWriteLock rwLock;
- // Initialize to empty set
- protected ImmutableSet<BookieSocketAddress> readOnlyBookies =
ImmutableSet.of();
protected boolean reorderReadsRandom = false;
protected boolean enforceDurability = false;
protected int stabilizePeriodSeconds = 0;
@@ -222,19 +117,10 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
// looks like these only assigned in the same thread as constructor,
immediately after constructor;
// no need to make volatile
protected StatsLogger statsLogger = null;
+
@StatsDoc(
- name = BOOKIES_JOINED,
- help = "The distribution of number of bookies joined the cluster on
each network topology change"
- )
- protected OpStatsLogger bookiesJoinedCounter = null;
- @StatsDoc(
- name = BOOKIES_LEFT,
- help = "The distribution of number of bookies left the cluster on each
network topology change"
- )
- protected OpStatsLogger bookiesLeftCounter = null;
- @StatsDoc(
- name = READ_REQUESTS_REORDERED,
- help = "The distribution of number of bookies reordered on each read
request"
+ name = READ_REQUESTS_REORDERED,
+ help = "The distribution of number of bookies reordered on each
read request"
)
protected OpStatsLogger readReorderedCounter = null;
@StatsDoc(
@@ -257,9 +143,6 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
RackawareEnsemblePlacementPolicyImpl(boolean enforceDurability) {
this.enforceDurability = enforceDurability;
topology = new NetworkTopologyImpl();
- knownBookies = new HashMap<BookieSocketAddress, BookieNode>();
-
- rwLock = new ReentrantReadWriteLock();
}
protected BookieNode createBookieNode(BookieSocketAddress addr) {
@@ -341,7 +224,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
this.isWeighted = isWeighted;
if (this.isWeighted) {
this.maxWeightMultiple = maxWeightMultiple;
- this.weightedSelection = new
WeightedRandomSelection<BookieNode>(this.maxWeightMultiple);
+ this.weightedSelection = new
WeightedRandomSelectionImpl<BookieNode>(this.maxWeightMultiple);
LOG.info("Weight based placement with max multiple of " +
this.maxWeightMultiple);
} else {
LOG.info("Not weighted");
@@ -553,14 +436,6 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
return nodes;
}
- private static Set<String> getNetworkLocations(Set<Node> bookieNodes) {
- Set<String> networkLocs = new HashSet<>();
- for (Node bookieNode : bookieNodes) {
- networkLocs.add(bookieNode.getNetworkLocation());
- }
- return networkLocs;
- }
-
/*
* this method should be called in readlock scope of 'rwLock'
*/
@@ -883,7 +758,8 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
return null;
}
- WeightedRandomSelection<BookieNode> wRSelection = new
WeightedRandomSelection<BookieNode>(maxWeightMultiple);
+ WeightedRandomSelection<BookieNode> wRSelection = new
WeightedRandomSelectionImpl<BookieNode>(
+ maxWeightMultiple);
wRSelection.updateMap(rackMap);
return wRSelection;
}
@@ -1000,7 +876,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
rackMap.put(n, new BookieInfo());
}
}
- wRSelection = new
WeightedRandomSelection<BookieNode>(this.maxWeightMultiple);
+ wRSelection = new
WeightedRandomSelectionImpl<BookieNode>(this.maxWeightMultiple);
wRSelection.updateMap(rackMap);
}
} else {
@@ -1287,30 +1163,6 @@ public class RackawareEnsemblePlacementPolicyImpl
extends TopologyAwareEnsembleP
return writeSet;
}
- /**
- * Shuffle all the entries of an array that matches a mask.
- * It assumes all entries with the same mask are contiguous in the array.
- */
- static void shuffleWithMask(DistributionSchedule.WriteSet writeSet,
- int mask, int bits) {
- int first = -1;
- int last = -1;
- for (int i = 0; i < writeSet.size(); i++) {
- if ((writeSet.get(i) & bits) == mask) {
- if (first == -1) {
- first = i;
- }
- last = i;
- }
- }
- if (first != -1) {
- for (int i = last + 1; i > first; i--) {
- int swapWith = ThreadLocalRandom.current().nextInt(i);
- writeSet.set(swapWith, writeSet.set(i,
writeSet.get(swapWith)));
- }
- }
- }
-
// this method should be called in readlock scope of 'rwlock'
@Override
public boolean
isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int
writeQuorumSize,
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
index 7fa7555..355632b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
@@ -17,19 +17,38 @@
*/
package org.apache.bookkeeper.client;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_JOINED;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_LEFT;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER;
+
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.net.NetUtils;
import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.net.Node;
import org.apache.bookkeeper.net.NodeBase;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,15 +56,34 @@ abstract class TopologyAwareEnsemblePlacementPolicy
implements
ITopologyAwareEnsemblePlacementPolicy<TopologyAwareEnsemblePlacementPolicy.BookieNode>
{
static final Logger LOG =
LoggerFactory.getLogger(TopologyAwareEnsemblePlacementPolicy.class);
- protected static class TruePredicate implements Predicate<BookieNode> {
+ protected final Map<BookieSocketAddress, BookieNode> knownBookies = new
HashMap<BookieSocketAddress, BookieNode>();
+ protected final ReentrantReadWriteLock rwLock = new
ReentrantReadWriteLock();
+ protected Map<BookieNode, WeightedObject> bookieInfoMap = new
HashMap<BookieNode, WeightedObject>();
+ // Initialize to empty set
+ protected ImmutableSet<BookieSocketAddress> readOnlyBookies =
ImmutableSet.of();
+ boolean isWeighted;
+ protected WeightedRandomSelection<BookieNode> weightedSelection;
+ // for now, we just maintain the writable bookies' topology
+ protected NetworkTopology topology;
+ protected DNSToSwitchMapping dnsResolver;
+ @StatsDoc(
+ name = BOOKIES_JOINED,
+ help = "The distribution of number of bookies joined the cluster
on each network topology change"
+ )
+ protected OpStatsLogger bookiesJoinedCounter = null;
+ @StatsDoc(
+ name = BOOKIES_LEFT,
+ help = "The distribution of number of bookies left the cluster on each
network topology change"
+ )
+ protected OpStatsLogger bookiesLeftCounter = null;
+ protected static class TruePredicate implements Predicate<BookieNode> {
public static final TruePredicate INSTANCE = new TruePredicate();
@Override
public boolean apply(BookieNode candidate, Ensemble chosenNodes) {
return true;
}
-
}
protected static class EnsembleForReplacementWithNoConstraints implements
Ensemble<BookieNode> {
@@ -487,6 +525,129 @@ abstract class TopologyAwareEnsemblePlacementPolicy
implements
}
}
+ static class DefaultResolver implements DNSToSwitchMapping {
+
+ final Supplier<String> defaultRackSupplier;
+
+ public DefaultResolver(Supplier<String> defaultRackSupplier) {
+ checkNotNull(defaultRackSupplier, "defaultRackSupplier should not
be null");
+ this.defaultRackSupplier = defaultRackSupplier;
+ }
+
+ @Override
+ public List<String> resolve(List<String> names) {
+ List<String> rNames = new ArrayList<String>(names.size());
+ for (@SuppressWarnings("unused") String name : names) {
+ final String defaultRack = defaultRackSupplier.get();
+ checkNotNull(defaultRack, "defaultRack cannot be null");
+ rNames.add(defaultRack);
+ }
+ return rNames;
+ }
+
+ @Override
+ public void reloadCachedMappings() {
+ // nop
+ }
+ }
+
+ /**
+ * Decorator for any existing dsn resolver.
+ * Backfills returned data with appropriate default rack info.
+ */
+ static class DNSResolverDecorator implements DNSToSwitchMapping {
+
+ final Supplier<String> defaultRackSupplier;
+ final DNSToSwitchMapping resolver;
+ @StatsDoc(
+ name = FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER,
+ help = "total number of times Resolver failed to resolve rack
information of a node"
+ )
+ final Counter failedToResolveNetworkLocationCounter;
+
+ DNSResolverDecorator(DNSToSwitchMapping resolver, Supplier<String>
defaultRackSupplier,
+ Counter failedToResolveNetworkLocationCounter) {
+ checkNotNull(resolver, "Resolver cannot be null");
+ checkNotNull(defaultRackSupplier, "defaultRackSupplier should not
be null");
+ this.defaultRackSupplier = defaultRackSupplier;
+ this.resolver = resolver;
+ this.failedToResolveNetworkLocationCounter =
failedToResolveNetworkLocationCounter;
+ }
+
+ public List<String> resolve(List<String> names) {
+ if (names == null) {
+ return Collections.emptyList();
+ }
+ final String defaultRack = defaultRackSupplier.get();
+ checkNotNull(defaultRack, "Default rack cannot be null");
+
+ List<String> rNames = resolver.resolve(names);
+ if (rNames != null && rNames.size() == names.size()) {
+ for (int i = 0; i < rNames.size(); ++i) {
+ if (rNames.get(i) == null) {
+ LOG.warn("Failed to resolve network location for {},
using default rack for it : {}.",
+ names.get(i), defaultRack);
+ failedToResolveNetworkLocationCounter.inc();
+ rNames.set(i, defaultRack);
+ }
+ }
+ return rNames;
+ }
+
+ LOG.warn("Failed to resolve network location for {}, using default
rack for them : {}.", names,
+ defaultRack);
+ rNames = new ArrayList<>(names.size());
+
+ for (int i = 0; i < names.size(); ++i) {
+ failedToResolveNetworkLocationCounter.inc();
+ rNames.add(defaultRack);
+ }
+ return rNames;
+ }
+
+ @Override
+ public boolean useHostName() {
+ return resolver.useHostName();
+ }
+
+ @Override
+ public void reloadCachedMappings() {
+ resolver.reloadCachedMappings();
+ }
+ }
+
+ static Set<String> getNetworkLocations(Set<Node> bookieNodes) {
+ Set<String> networkLocs = new HashSet<>();
+ for (Node bookieNode : bookieNodes) {
+ networkLocs.add(bookieNode.getNetworkLocation());
+ }
+ return networkLocs;
+ }
+
+ /**
+ * Shuffle all the entries of an array that matches a mask.
+ * It assumes all entries with the same mask are contiguous in the array.
+ */
+ static void shuffleWithMask(DistributionSchedule.WriteSet writeSet,
+ int mask, int bits) {
+ int first = -1;
+ int last = -1;
+ for (int i = 0; i < writeSet.size(); i++) {
+ if ((writeSet.get(i) & bits) == mask) {
+ if (first == -1) {
+ first = i;
+ }
+ last = i;
+ }
+ }
+ if (first != -1) {
+ for (int i = last + 1; i > first; i--) {
+ int swapWith = ThreadLocalRandom.current().nextInt(i);
+ writeSet.set(swapWith, writeSet.set(i,
writeSet.get(swapWith)));
+ }
+ }
+ }
+
@Override
public DistributionSchedule.WriteSet reorderReadSequence(
List<BookieSocketAddress> ensemble,
@@ -505,4 +666,104 @@ abstract class TopologyAwareEnsemblePlacementPolicy
implements
retList.addMissingIndices(ensemble.size());
return retList;
}
+
+ @Override
+ public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress>
writableBookies,
+ Set<BookieSocketAddress> readOnlyBookies) {
+ rwLock.writeLock().lock();
+ try {
+ ImmutableSet<BookieSocketAddress> joinedBookies, leftBookies,
deadBookies;
+ Set<BookieSocketAddress> oldBookieSet = knownBookies.keySet();
+ // left bookies : bookies in known bookies, but not in new
writable bookie cluster.
+ leftBookies = Sets.difference(oldBookieSet,
writableBookies).immutableCopy();
+ // joined bookies : bookies in new writable bookie cluster, but
not in known bookies
+ joinedBookies = Sets.difference(writableBookies,
oldBookieSet).immutableCopy();
+ // dead bookies.
+ deadBookies = Sets.difference(leftBookies,
readOnlyBookies).immutableCopy();
+ LOG.debug("Cluster changed : left bookies are {}, joined bookies
are {}, while dead bookies are {}.",
+ leftBookies, joinedBookies, deadBookies);
+ handleBookiesThatLeft(leftBookies);
+ handleBookiesThatJoined(joinedBookies);
+ if (this.isWeighted && (leftBookies.size() > 0 ||
joinedBookies.size() > 0)) {
+ this.weightedSelection.updateMap(this.bookieInfoMap);
+ }
+ if (!readOnlyBookies.isEmpty()) {
+ this.readOnlyBookies = ImmutableSet.copyOf(readOnlyBookies);
+ }
+
+ return deadBookies;
+ } finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ /*
+ * this method should be called in writelock scope of 'rwLock'
+ */
+ @Override
+ public void handleBookiesThatLeft(Set<BookieSocketAddress> leftBookies) {
+ for (BookieSocketAddress addr : leftBookies) {
+ try {
+ BookieNode node = knownBookies.remove(addr);
+ if (null != node) {
+ topology.remove(node);
+ if (this.isWeighted) {
+ this.bookieInfoMap.remove(node);
+ }
+
+ bookiesLeftCounter.registerSuccessfulValue(1L);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cluster changed : bookie {} left from
cluster.", addr);
+ }
+ }
+ } catch (Throwable t) {
+ LOG.error("Unexpected exception while handling leaving bookie
{}", addr, t);
+ if (bookiesLeftCounter != null) {
+ bookiesLeftCounter.registerFailedValue(1L);
+ }
+ // no need to re-throw; we want to process the rest of the
bookies
+ // exception anyways will be caught/logged/suppressed in the
ZK's event handler
+ }
+ }
+ }
+
+ /*
+ * this method should be called in writelock scope of 'rwLock'
+ */
+ @Override
+ public void handleBookiesThatJoined(Set<BookieSocketAddress>
joinedBookies) {
+ // node joined
+ for (BookieSocketAddress addr : joinedBookies) {
+ try {
+ BookieNode node = createBookieNode(addr);
+ topology.add(node);
+ knownBookies.put(addr, node);
+ if (this.isWeighted) {
+ this.bookieInfoMap.putIfAbsent(node, new BookieInfo());
+ }
+
+ bookiesJoinedCounter.registerSuccessfulValue(1L);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cluster changed : bookie {} joined the
cluster.", addr);
+ }
+ } catch (Throwable t) {
+ // topology.add() throws unchecked exception
+ LOG.error("Unexpected exception while handling joining bookie
{}", addr, t);
+
+ bookiesJoinedCounter.registerFailedValue(1L);
+ // no need to re-throw; we want to process the rest of the
bookies
+ // exception anyways will be caught/logged/suppressed in the
ZK's event handler
+ }
+ }
+ }
+
+ protected BookieNode createBookieNode(BookieSocketAddress addr) {
+ return new BookieNode(addr, resolveNetworkLocation(addr));
+ }
+
+ protected String resolveNetworkLocation(BookieSocketAddress addr) {
+ return NetUtils.resolveNetworkLocation(dnsResolver,
addr.getSocketAddress());
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java
index 0404250..8a44174 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java
@@ -18,146 +18,19 @@
package org.apache.bookkeeper.client;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Collection;
import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class WeightedRandomSelection<T> {
- static final Logger LOG =
LoggerFactory.getLogger(WeightedRandomSelection.class);
+interface WeightedRandomSelection<T> {
interface WeightedObject {
long getWeight();
}
- Double randomMax;
- int maxProbabilityMultiplier;
- Map<T, WeightedObject> map;
- TreeMap<Double, T> cummulativeMap = new TreeMap<Double, T>();
- ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
-
- WeightedRandomSelection() {
- maxProbabilityMultiplier = -1;
- }
-
- WeightedRandomSelection(int maxMultiplier) {
- this.maxProbabilityMultiplier = maxMultiplier;
- }
-
- public void setMaxProbabilityMultiplier(int max) {
- this.maxProbabilityMultiplier = max;
- }
-
- void updateMap(Map<T, WeightedObject> map) {
- // get the sum total of all the values; this will be used to
- // calculate the weighted probability later on
- Long totalWeight = 0L, min = Long.MAX_VALUE;
- List<WeightedObject> values = new
ArrayList<WeightedObject>(map.values());
- Collections.sort(values, new Comparator<WeightedObject>() {
- public int compare(WeightedObject o1, WeightedObject o2) {
- long diff = o1.getWeight() - o2.getWeight();
- if (diff < 0L) {
- return -1;
- } else if (diff > 0L) {
- return 1;
- } else {
- return 0;
- }
- }
- });
- for (int i = 0; i < values.size(); i++) {
- totalWeight += values.get(i).getWeight();
- if (values.get(i).getWeight() != 0 && min >
values.get(i).getWeight()) {
- min = values.get(i).getWeight();
- }
- }
-
- double median = 0;
- if (totalWeight == 0) {
- // all the values are zeros; assign a value of 1 to all and the
totalWeight equal
- // to the size of the values
- min = 1L;
- median = 1;
- totalWeight = (long) values.size();
- } else {
- int mid = values.size() / 2;
- if ((values.size() % 2) == 1) {
- median = values.get(mid).getWeight();
- } else {
- median = (double) (values.get(mid - 1).getWeight() +
values.get(mid).getWeight()) / 2;
- }
- }
-
- double medianWeight, minWeight;
- medianWeight = median / (double) totalWeight;
- minWeight = (double) min / totalWeight;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Updating weights map. MediaWeight: {} MinWeight: {}",
medianWeight, minWeight);
- }
+ void updateMap(Map<T, WeightedObject> map);
- double maxWeight = maxProbabilityMultiplier * medianWeight;
- Map<T, Double> weightMap = new HashMap<T, Double>();
- for (Map.Entry<T, WeightedObject> e : map.entrySet()) {
- double weightedProbability;
- if (e.getValue().getWeight() > 0) {
- weightedProbability = (double) e.getValue().getWeight() /
(double) totalWeight;
- } else {
- weightedProbability = minWeight;
- }
- if (maxWeight > 0 && weightedProbability > maxWeight) {
- weightedProbability = maxWeight;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Capping the probability to {} for {} Value: {}",
- weightedProbability, e.getKey(), e.getValue());
- }
- }
- weightMap.put(e.getKey(), weightedProbability);
- }
+ T getNextRandom();
- // The probability of picking a bookie randomly is
defaultPickProbability
- // but we change that priority by looking at the weight that each
bookie
- // carries.
- TreeMap<Double, T> tmpCummulativeMap = new TreeMap<Double, T>();
- Double key = 0.0;
- for (Map.Entry<T, Double> e : weightMap.entrySet()) {
- tmpCummulativeMap.put(key, e.getKey());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Key: {} Value: {} AssignedKey: {} AssignedWeight:
{}",
- e.getKey(), e.getValue(), key, e.getValue());
- }
- key += e.getValue();
- }
+ T getNextRandom(Collection<T> selectedNodes);
- rwLock.writeLock().lock();
- try {
- this.map = map;
- cummulativeMap = tmpCummulativeMap;
- randomMax = key;
- } finally {
- rwLock.writeLock().unlock();
- }
- }
-
- T getNextRandom() {
- rwLock.readLock().lock();
- try {
- // pick a random number between 0 and randMax
- Double randomNum = randomMax * Math.random();
- // find the nearest key in the map corresponding to the randomNum
- Double key = cummulativeMap.floorKey(randomNum);
- //LOG.info("Random max: {} CummulativeMap size: {} selected key:
{}", randomMax, cummulativeMap.size(),
- // key);
- return cummulativeMap.get(key);
- } finally {
- rwLock.readLock().unlock();
- }
- }
+ void setMaxProbabilityMultiplier(int max);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelectionImpl.java
similarity index 91%
copy from
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java
copy to
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelectionImpl.java
index 0404250..53130a2 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelectionImpl.java
@@ -19,6 +19,7 @@
package org.apache.bookkeeper.client;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -31,31 +32,25 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class WeightedRandomSelection<T> {
- static final Logger LOG =
LoggerFactory.getLogger(WeightedRandomSelection.class);
+class WeightedRandomSelectionImpl<T> implements WeightedRandomSelection<T> {
+ static final Logger LOG =
LoggerFactory.getLogger(WeightedRandomSelectionImpl.class);
- interface WeightedObject {
- long getWeight();
- }
Double randomMax;
int maxProbabilityMultiplier;
Map<T, WeightedObject> map;
TreeMap<Double, T> cummulativeMap = new TreeMap<Double, T>();
ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
- WeightedRandomSelection() {
+ WeightedRandomSelectionImpl() {
maxProbabilityMultiplier = -1;
}
- WeightedRandomSelection(int maxMultiplier) {
+ WeightedRandomSelectionImpl(int maxMultiplier) {
this.maxProbabilityMultiplier = maxMultiplier;
}
- public void setMaxProbabilityMultiplier(int max) {
- this.maxProbabilityMultiplier = max;
- }
-
- void updateMap(Map<T, WeightedObject> map) {
+ @Override
+ public void updateMap(Map<T, WeightedObject> map) {
// get the sum total of all the values; this will be used to
// calculate the weighted probability later on
Long totalWeight = 0L, min = Long.MAX_VALUE;
@@ -146,7 +141,8 @@ class WeightedRandomSelection<T> {
}
}
- T getNextRandom() {
+ @Override
+ public T getNextRandom() {
rwLock.readLock().lock();
try {
// pick a random number between 0 and randMax
@@ -160,4 +156,15 @@ class WeightedRandomSelection<T> {
rwLock.readLock().unlock();
}
}
+
+ @Override
+ public void setMaxProbabilityMultiplier(int max) {
+ this.maxProbabilityMultiplier = max;
+ }
+
+ @Override
+ public T getNextRandom(Collection<T> selectedNodes) {
+ throw new UnsupportedOperationException("getNextRandom is not
implemented for WeightedRandomSelectionImpl");
+ }
+
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWeightedRandomSelection.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWeightedRandomSelection.java
index fe8f74b..2aa3c0e 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWeightedRandomSelection.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWeightedRandomSelection.java
@@ -59,7 +59,7 @@ public class TestWeightedRandomSelection {
@Before
public void setUp() throws Exception {
- wRS = new WeightedRandomSelection<String>();
+ wRS = new WeightedRandomSelectionImpl<String>();
}
@After