http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java new file mode 100644 index 0000000..3c41a7c --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -0,0 +1,554 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.bookkeeper.client; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.conf.Configurable; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.net.DNSToSwitchMapping; +import org.apache.bookkeeper.net.NetUtils; +import org.apache.bookkeeper.net.NetworkTopology; +import org.apache.bookkeeper.net.NetworkTopologyImpl; +import org.apache.bookkeeper.net.Node; +import org.apache.bookkeeper.net.NodeBase; +import org.apache.bookkeeper.net.ScriptBasedMapping; +import org.apache.bookkeeper.net.StabilizeNetworkTopology; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.util.ReflectionUtils; +import org.jboss.netty.util.HashedWheelTimer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +/** + * Simple rackware ensemble placement policy. + * + * Make most of the class and methods as protected, so it could be extended to implement other algorithms. 
+ */ +class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacementPolicy { + + static final Logger LOG = LoggerFactory.getLogger(RackawareEnsemblePlacementPolicyImpl.class); + + public static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass"; + public static final String REPP_RANDOM_READ_REORDERING = "ensembleRandomReadReordering"; + + static final int RACKNAME_DISTANCE_FROM_LEAVES = 1; + + static class DefaultResolver implements DNSToSwitchMapping { + + @Override + public List<String> resolve(List<String> names) { + List<String> rNames = new ArrayList<String>(names.size()); + for (@SuppressWarnings("unused") String name : names) { + rNames.add(NetworkTopology.DEFAULT_RACK); + } + return rNames; + } + + @Override + public void reloadCachedMappings() { + // nop + } + + } + + // for now, we just maintain the writable bookies' topology + protected NetworkTopology topology; + protected DNSToSwitchMapping dnsResolver; + protected HashedWheelTimer timer; + protected final Map<BookieSocketAddress, BookieNode> knownBookies; + protected BookieNode localNode; + protected final ReentrantReadWriteLock rwLock; + protected ImmutableSet<BookieSocketAddress> readOnlyBookies = null; + protected boolean reorderReadsRandom = false; + protected boolean enforceDurability = false; + protected int stabilizePeriodSeconds = 0; + protected StatsLogger statsLogger = null; + + RackawareEnsemblePlacementPolicyImpl() { + this(false); + } + + RackawareEnsemblePlacementPolicyImpl(boolean enforceDurability) { + this.enforceDurability = enforceDurability; + topology = new NetworkTopologyImpl(); + knownBookies = new HashMap<BookieSocketAddress, BookieNode>(); + + rwLock = new ReentrantReadWriteLock(); + } + + protected BookieNode createBookieNode(BookieSocketAddress addr) { + return new BookieNode(addr, resolveNetworkLocation(addr)); + } + + /** + * Initialize the policy. 
+ * + * @param dnsResolver the object used to resolve addresses to their network address + * @return initialized ensemble placement policy + */ + protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dnsResolver, + HashedWheelTimer timer, + boolean reorderReadsRandom, + int stabilizePeriodSeconds, + StatsLogger statsLogger) { + this.statsLogger = statsLogger; + this.reorderReadsRandom = reorderReadsRandom; + this.stabilizePeriodSeconds = stabilizePeriodSeconds; + this.dnsResolver = dnsResolver; + this.timer = timer; + + // create the network topology + if (stabilizePeriodSeconds > 0) { + this.topology = new StabilizeNetworkTopology(timer, stabilizePeriodSeconds); + } else { + this.topology = new NetworkTopologyImpl(); + } + + BookieNode bn; + try { + bn = createBookieNode(new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(), 0)); + } catch (UnknownHostException e) { + LOG.error("Failed to get local host address : ", e); + bn = null; + } + localNode = bn; + LOG.info("Initialize rackaware ensemble placement policy @ {} @ {} : {}.", + new Object[] { localNode, null == localNode ? 
"Unknown" : localNode.getNetworkLocation(), + dnsResolver.getClass().getName() }); + return this; + } + + @Override + public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf, + Optional<DNSToSwitchMapping> optionalDnsResolver, + HashedWheelTimer timer, + FeatureProvider featureProvider, + StatsLogger statsLogger) { + DNSToSwitchMapping dnsResolver; + if (optionalDnsResolver.isPresent()) { + dnsResolver = optionalDnsResolver.get(); + } else { + String dnsResolverName = conf.getString(REPP_DNS_RESOLVER_CLASS, ScriptBasedMapping.class.getName()); + try { + dnsResolver = ReflectionUtils.newInstance(dnsResolverName, DNSToSwitchMapping.class); + if (dnsResolver instanceof Configurable) { + ((Configurable) dnsResolver).setConf(conf); + } + } catch (RuntimeException re) { + LOG.info("Failed to initialize DNS Resolver {}, used default subnet resolver.", dnsResolverName, re); + dnsResolver = new DefaultResolver(); + } + } + return initialize( + dnsResolver, + timer, + conf.getBoolean(REPP_RANDOM_READ_REORDERING, false), + conf.getNetworkTopologyStabilizePeriodSeconds(), + statsLogger); + } + + @Override + public void uninitalize() { + // do nothing + } + + protected String resolveNetworkLocation(BookieSocketAddress addr) { + return NetUtils.resolveNetworkLocation(dnsResolver, addr.getSocketAddress()); + } + + @Override + public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> writableBookies, + Set<BookieSocketAddress> readOnlyBookies) { + rwLock.writeLock().lock(); + try { + ImmutableSet<BookieSocketAddress> joinedBookies, leftBookies, deadBookies; + Set<BookieSocketAddress> oldBookieSet = knownBookies.keySet(); + // left bookies : bookies in known bookies, but not in new writable bookie cluster. 
+ leftBookies = Sets.difference(oldBookieSet, writableBookies).immutableCopy(); + // joined bookies : bookies in new writable bookie cluster, but not in known bookies + joinedBookies = Sets.difference(writableBookies, oldBookieSet).immutableCopy(); + // dead bookies. + deadBookies = Sets.difference(leftBookies, readOnlyBookies).immutableCopy(); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Cluster changed : left bookies are {}, joined bookies are {}, while dead bookies are {}.", + new Object[] { leftBookies, joinedBookies, deadBookies }); + } + handleBookiesThatLeft(leftBookies); + handleBookiesThatJoined(joinedBookies); + + if (!readOnlyBookies.isEmpty()) { + this.readOnlyBookies = ImmutableSet.copyOf(readOnlyBookies); + } + + return deadBookies; + } finally { + rwLock.writeLock().unlock(); + } + } + + @Override + public void handleBookiesThatLeft(Set<BookieSocketAddress> leftBookies) { + for (BookieSocketAddress addr : leftBookies) { + BookieNode node = knownBookies.remove(addr); + if(null != node) { + topology.remove(node); + if (LOG.isDebugEnabled()) { + LOG.debug("Cluster changed : bookie {} left from cluster.", addr); + } + } + } + } + + @Override + public void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) { + // node joined + for (BookieSocketAddress addr : joinedBookies) { + BookieNode node = createBookieNode(addr); + topology.add(node); + knownBookies.put(addr, node); + if (LOG.isDebugEnabled()) { + LOG.debug("Cluster changed : bookie {} joined the cluster.", addr); + } + } + } + + protected Set<Node> convertBookiesToNodes(Set<BookieSocketAddress> excludeBookies) { + Set<Node> nodes = new HashSet<Node>(); + for (BookieSocketAddress addr : excludeBookies) { + BookieNode bn = knownBookies.get(addr); + if (null == bn) { + bn = createBookieNode(addr); + } + nodes.add(bn); + } + return nodes; + } + + @Override + public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, + 
Set<BookieSocketAddress> excludeBookies) + throws BKNotEnoughBookiesException { + return newEnsembleInternal(ensembleSize, writeQuorumSize, excludeBookies, null, null); + } + + protected ArrayList<BookieSocketAddress> newEnsembleInternal(int ensembleSize, + int writeQuorumSize, + Set<BookieSocketAddress> excludeBookies, + Ensemble<BookieNode> parentEnsemble, + Predicate<BookieNode> parentPredicate) + throws BKNotEnoughBookiesException { + return newEnsembleInternal( + ensembleSize, + writeQuorumSize, + writeQuorumSize, + excludeBookies, + parentEnsemble, + parentPredicate); + } + + @Override + public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, + int writeQuorumSize, + int ackQuorumSize, + Set<BookieSocketAddress> excludeBookies, + Ensemble<BookieNode> parentEnsemble, + Predicate<BookieNode> parentPredicate) + throws BKNotEnoughBookiesException { + return newEnsembleInternal( + ensembleSize, + writeQuorumSize, + ackQuorumSize, + excludeBookies, + parentEnsemble, + parentPredicate); + } + + protected ArrayList<BookieSocketAddress> newEnsembleInternal( + int ensembleSize, + int writeQuorumSize, + int ackQuorumSize, + Set<BookieSocketAddress> excludeBookies, + Ensemble<BookieNode> parentEnsemble, + Predicate<BookieNode> parentPredicate) throws BKNotEnoughBookiesException { + rwLock.readLock().lock(); + try { + Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies); + RRTopologyAwareCoverageEnsemble ensemble = + new RRTopologyAwareCoverageEnsemble( + ensembleSize, + writeQuorumSize, + ackQuorumSize, + RACKNAME_DISTANCE_FROM_LEAVES, + parentEnsemble, + parentPredicate); + BookieNode prevNode = null; + int numRacks = topology.getNumOfRacks(); + // only one rack, use the random algorithm. 
+ if (numRacks < 2) { + List<BookieNode> bns = selectRandom(ensembleSize, excludeNodes, TruePredicate.instance, + ensemble); + ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>(ensembleSize); + for (BookieNode bn : bns) { + addrs.add(bn.getAddr()); + } + return addrs; + } + // pick nodes by racks, to ensure there is at least two racks per write quorum. + for (int i = 0; i < ensembleSize; i++) { + String curRack; + if (null == prevNode) { + if ((null == localNode) || + localNode.getNetworkLocation().equals(NetworkTopology.DEFAULT_RACK)) { + curRack = NodeBase.ROOT; + } else { + curRack = localNode.getNetworkLocation(); + } + } else { + curRack = "~" + prevNode.getNetworkLocation(); + } + prevNode = selectFromNetworkLocation(curRack, excludeNodes, ensemble, ensemble); + } + ArrayList<BookieSocketAddress> bookieList = ensemble.toList(); + if (ensembleSize != bookieList.size()) { + LOG.error("Not enough {} bookies are available to form an ensemble : {}.", + ensembleSize, bookieList); + throw new BKNotEnoughBookiesException(); + } + return bookieList; + } finally { + rwLock.readLock().unlock(); + } + } + + @Override + public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, + Collection<BookieSocketAddress> currentEnsemble, + BookieSocketAddress bookieToReplace, + Set<BookieSocketAddress> excludeBookies) + throws BKNotEnoughBookiesException { + rwLock.readLock().lock(); + try { + excludeBookies.addAll(currentEnsemble); + BookieNode bn = knownBookies.get(bookieToReplace); + if (null == bn) { + bn = createBookieNode(bookieToReplace); + } + + Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies); + // add the bookie to replace in exclude set + excludeNodes.add(bn); + if (LOG.isDebugEnabled()) { + LOG.debug("Try to choose a new bookie to replace {}, excluding {}.", bookieToReplace, + excludeNodes); + } + // pick a candidate from same rack to replace + BookieNode candidate = selectFromNetworkLocation( 
+ bn.getNetworkLocation(), + excludeNodes, + TruePredicate.instance, + EnsembleForReplacementWithNoConstraints.instance); + if (LOG.isDebugEnabled()) { + LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bn); + } + return candidate.getAddr(); + } finally { + rwLock.readLock().unlock(); + } + } + + @Override + public BookieNode selectFromNetworkLocation( + String networkLoc, + Set<Node> excludeBookies, + Predicate<BookieNode> predicate, + Ensemble<BookieNode> ensemble) + throws BKNotEnoughBookiesException { + // select one from local rack + try { + return selectRandomFromRack(networkLoc, excludeBookies, predicate, ensemble); + } catch (BKNotEnoughBookiesException e) { + LOG.warn("Failed to choose a bookie from {} : " + + "excluded {}, fallback to choose bookie randomly from the cluster.", + networkLoc, excludeBookies); + // randomly choose one from whole cluster, ignore the provided predicate. + return selectRandom(1, excludeBookies, predicate, ensemble).get(0); + } + } + + protected String getRemoteRack(BookieNode node) { + return "~" + node.getNetworkLocation(); + } + + /** + * Choose random node under a given network path. + * + * @param netPath + * network path + * @param excludeBookies + * exclude bookies + * @param predicate + * predicate to check whether the target is a good target. + * @param ensemble + * ensemble structure + * @return chosen bookie. 
+ */ + protected BookieNode selectRandomFromRack(String netPath, Set<Node> excludeBookies, Predicate<BookieNode> predicate, + Ensemble<BookieNode> ensemble) throws BKNotEnoughBookiesException { + List<Node> leaves = new ArrayList<Node>(topology.getLeaves(netPath)); + Collections.shuffle(leaves); + for (Node n : leaves) { + if (excludeBookies.contains(n)) { + continue; + } + if (!(n instanceof BookieNode) || !predicate.apply((BookieNode) n, ensemble)) { + continue; + } + BookieNode bn = (BookieNode) n; + // got a good candidate + if (ensemble.addNode(bn)) { + // add the candidate to exclude set + excludeBookies.add(bn); + } + return bn; + } + throw new BKNotEnoughBookiesException(); + } + + /** + * Choose a random node from whole cluster. + * + * @param numBookies + * number bookies to choose + * @param excludeBookies + * bookies set to exclude. + * @param ensemble + * ensemble to hold the bookie chosen. + * @return the bookie node chosen. + * @throws BKNotEnoughBookiesException + */ + protected List<BookieNode> selectRandom(int numBookies, + Set<Node> excludeBookies, + Predicate<BookieNode> predicate, + Ensemble<BookieNode> ensemble) + throws BKNotEnoughBookiesException { + return selectRandomInternal(new ArrayList<BookieNode>(knownBookies.values()), numBookies, excludeBookies, predicate, ensemble); + } + + protected List<BookieNode> selectRandomInternal(List<BookieNode> bookiesToSelectFrom, + int numBookies, + Set<Node> excludeBookies, + Predicate<BookieNode> predicate, + Ensemble<BookieNode> ensemble) + throws BKNotEnoughBookiesException { + Collections.shuffle(bookiesToSelectFrom); + List<BookieNode> newBookies = new ArrayList<BookieNode>(numBookies); + for (BookieNode bookie : bookiesToSelectFrom) { + if (excludeBookies.contains(bookie)) { + continue; + } + + // When durability is being enforced; we must not violate the + // predicate even when selecting a random bookie; as durability + // guarantee is not best effort; correctness is implied by it + if 
(enforceDurability && !predicate.apply(bookie, ensemble)) { + continue; + } + + if (ensemble.addNode(bookie)) { + excludeBookies.add(bookie); + newBookies.add(bookie); + --numBookies; + } + + if (numBookies == 0) { + return newBookies; + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to find {} bookies : excludeBookies {}, allBookies {}.", new Object[] { + numBookies, excludeBookies, bookiesToSelectFrom }); + } + throw new BKNotEnoughBookiesException(); + } + + + + @Override + public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) { + int ensembleSize = ensemble.size(); + List<Integer> finalList = new ArrayList<Integer>(writeSet.size()); + List<Long> observedFailuresList = new ArrayList<Long>(writeSet.size()); + List<Integer> readOnlyList = new ArrayList<Integer>(writeSet.size()); + List<Integer> unAvailableList = new ArrayList<Integer>(writeSet.size()); + for (Integer idx : writeSet) { + BookieSocketAddress address = ensemble.get(idx); + Long lastFailedEntryOnBookie = bookieFailureHistory.get(address); + if (null == knownBookies.get(address)) { + // there isn't too much differences between readonly bookies from unavailable bookies. since there + // is no write requests to them, so we shouldn't try reading from readonly bookie in prior to writable + // bookies. 
+ if ((null == readOnlyBookies) || !readOnlyBookies.contains(address)) { + unAvailableList.add(idx); + } else { + readOnlyList.add(idx); + } + } else { + if ((lastFailedEntryOnBookie == null) || (lastFailedEntryOnBookie < 0)) { + finalList.add(idx); + } else { + observedFailuresList.add(lastFailedEntryOnBookie * ensembleSize + idx); + } + } + } + + if (reorderReadsRandom) { + Collections.shuffle(finalList); + Collections.shuffle(readOnlyList); + Collections.shuffle(unAvailableList); + } + + Collections.sort(observedFailuresList); + + for(long value: observedFailuresList) { + finalList.add((int)(value % ensembleSize)); + } + + finalList.addAll(readOnlyList); + finalList.addAll(unAvailableList); + return finalList; + } +}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java new file mode 100644 index 0000000..abdcb61 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java @@ -0,0 +1,602 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.bookkeeper.client; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.google.common.base.Optional; + + +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.net.DNSToSwitchMapping; +import org.apache.bookkeeper.net.NetworkTopology; +import org.apache.bookkeeper.net.Node; +import org.apache.bookkeeper.net.NodeBase; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.util.BookKeeperConstants; +import org.apache.commons.lang3.tuple.Pair; +import org.jboss.netty.util.HashedWheelTimer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicy { + static final Logger LOG = LoggerFactory.getLogger(RegionAwareEnsemblePlacementPolicy.class); + + public static final String REPP_REGIONS_TO_WRITE = "reppRegionsToWrite"; + public static final String REPP_MINIMUM_REGIONS_FOR_DURABILITY = "reppMinimumRegionsForDurability"; + public static final String REPP_ENABLE_DURABILITY_ENFORCEMENT_IN_REPLACE = "reppEnableDurabilityEnforcementInReplace"; + public static final String REPP_DISABLE_DURABILITY_FEATURE_NAME = "reppDisableDurabilityFeatureName"; + public static final String REPP_DISALLOW_BOOKIE_PLACEMENT_IN_REGION_FEATURE_NAME = "reppDisallowBookiePlacementInRegionFeatureName"; + public static final String REPP_DISABLE_DURABILITY_ENFORCEMENT_FEATURE = "reppDisableDurabilityEnforcementFeature"; + public static final String REPP_ENABLE_VALIDATION = "reppEnableValidation"; + public static 
final String REGION_AWARE_ANOMALOUS_ENSEMBLE = "region_aware_anomalous_ensemble"; + static final int MINIMUM_REGIONS_FOR_DURABILITY_DEFAULT = 2; + static final int REGIONID_DISTANCE_FROM_LEAVES = 2; + static final String UNKNOWN_REGION = "UnknownRegion"; + static final int REMOTE_NODE_IN_REORDER_SEQUENCE = 2; + + protected final Map<String, TopologyAwareEnsemblePlacementPolicy> perRegionPlacement; + protected final ConcurrentMap<BookieSocketAddress, String> address2Region; + protected FeatureProvider featureProvider; + protected String disallowBookiePlacementInRegionFeatureName; + protected String myRegion = null; + protected int minRegionsForDurability = 0; + protected boolean enableValidation = true; + protected boolean enforceDurabilityInReplace = false; + protected Feature disableDurabilityFeature; + + RegionAwareEnsemblePlacementPolicy() { + super(); + perRegionPlacement = new HashMap<String, TopologyAwareEnsemblePlacementPolicy>(); + address2Region = new ConcurrentHashMap<BookieSocketAddress, String>(); + } + + protected String getRegion(BookieSocketAddress addr) { + String region = address2Region.get(addr); + if (null == region) { + String networkLocation = resolveNetworkLocation(addr); + if (NetworkTopology.DEFAULT_RACK.equals(networkLocation)) { + region = UNKNOWN_REGION; + } else { + String[] parts = networkLocation.split(NodeBase.PATH_SEPARATOR_STR); + if (parts.length <= 1) { + region = UNKNOWN_REGION; + } else { + region = parts[1]; + } + } + address2Region.putIfAbsent(addr, region); + } + return region; + } + + protected String getLocalRegion(BookieNode node) { + if (null == node || null == node.getAddr()) { + return UNKNOWN_REGION; + } + return getRegion(node.getAddr()); + } + + @Override + public void handleBookiesThatLeft(Set<BookieSocketAddress> leftBookies) { + super.handleBookiesThatLeft(leftBookies); + + for(TopologyAwareEnsemblePlacementPolicy policy: perRegionPlacement.values()) { + policy.handleBookiesThatLeft(leftBookies); + } + } + + 
@Override + public void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) { + Map<String, Set<BookieSocketAddress>> perRegionClusterChange = new HashMap<String, Set<BookieSocketAddress>>(); + + // node joined + for (BookieSocketAddress addr : joinedBookies) { + BookieNode node = createBookieNode(addr); + topology.add(node); + knownBookies.put(addr, node); + String region = getLocalRegion(node); + if (null == perRegionPlacement.get(region)) { + perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy() + .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds, statsLogger)); + } + + Set<BookieSocketAddress> regionSet = perRegionClusterChange.get(region); + if (null == regionSet) { + regionSet = new HashSet<BookieSocketAddress>(); + regionSet.add(addr); + perRegionClusterChange.put(region, regionSet); + } else { + regionSet.add(addr); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Cluster changed : bookie {} joined the cluster.", addr); + } + } + + for(String region: perRegionPlacement.keySet()) { + Set<BookieSocketAddress> regionSet = perRegionClusterChange.get(region); + if (null == regionSet) { + regionSet = new HashSet<BookieSocketAddress>(); + } + perRegionPlacement.get(region).handleBookiesThatJoined(regionSet); + } + } + + @Override + public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf, + Optional<DNSToSwitchMapping> optionalDnsResolver, + HashedWheelTimer timer, + FeatureProvider featureProvider, + StatsLogger statsLogger) { + super.initialize(conf, optionalDnsResolver, timer, featureProvider, statsLogger); + myRegion = getLocalRegion(localNode); + enableValidation = conf.getBoolean(REPP_ENABLE_VALIDATION, true); + + // We have to statically provide regions we want the writes to go through and how many regions + // are required for durability. 
This decision cannot be driven by the active bookies as the + // current topology will not be indicative of constraints that must be enforced for durability + String regionsString = conf.getString(REPP_REGIONS_TO_WRITE, null); + if (null != regionsString) { + // Regions are specified as + // R1;R2;... + String[] regions = regionsString.split(";"); + for (String region: regions) { + perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy(true) + .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds, statsLogger)); + } + minRegionsForDurability = conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY, MINIMUM_REGIONS_FOR_DURABILITY_DEFAULT); + if (minRegionsForDurability > 0) { + enforceDurability = true; + enforceDurabilityInReplace = conf.getBoolean(REPP_ENABLE_DURABILITY_ENFORCEMENT_IN_REPLACE, true); + } + if (regions.length < minRegionsForDurability) { + throw new IllegalArgumentException("Regions provided are insufficient to meet the durability constraints"); + } + } + this.featureProvider = featureProvider; + this.disallowBookiePlacementInRegionFeatureName = conf.getString(REPP_DISALLOW_BOOKIE_PLACEMENT_IN_REGION_FEATURE_NAME); + this.disableDurabilityFeature = conf.getFeature(REPP_DISABLE_DURABILITY_ENFORCEMENT_FEATURE, null); + if (null == disableDurabilityFeature) { + this.disableDurabilityFeature = + featureProvider.getFeature( + conf.getString(REPP_DISABLE_DURABILITY_FEATURE_NAME, + BookKeeperConstants.FEATURE_REPP_DISABLE_DURABILITY_ENFORCEMENT)); + } + return this; + } + + protected List<BookieNode> selectRandomFromRegions(Set<String> availableRegions, + int numBookies, + Set<Node> excludeBookies, + Predicate<BookieNode> predicate, + Ensemble<BookieNode> ensemble) + throws BKException.BKNotEnoughBookiesException { + List<BookieNode> availableBookies = new ArrayList<BookieNode>(); + for(BookieNode bookieNode: knownBookies.values()) { + if (availableRegions.contains(getLocalRegion(bookieNode))) { + 
availableBookies.add(bookieNode); + } + } + + return selectRandomInternal(availableBookies, numBookies, excludeBookies, predicate, ensemble); + } + + + @Override + public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, + Set<BookieSocketAddress> excludeBookies) throws BKException.BKNotEnoughBookiesException { + + int effectiveMinRegionsForDurability = disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability; + + // All of these conditions indicate bad configuration + if (ackQuorumSize < effectiveMinRegionsForDurability) { + throw new IllegalArgumentException("Ack Quorum size provided are insufficient to meet the durability constraints"); + } else if (ensembleSize < writeQuorumSize) { + throw new IllegalArgumentException("write quorum (" + writeQuorumSize + ") cannot exceed ensemble size (" + ensembleSize + ")"); + } else if (writeQuorumSize < ackQuorumSize) { + throw new IllegalArgumentException("ack quorum (" + ackQuorumSize + ") cannot exceed write quorum size (" + writeQuorumSize + ")"); + } else if (effectiveMinRegionsForDurability > 0) { + // We must survive the failure of numRegions - effectiveMinRegionsForDurability. 
When these + // regions have failed we would spread the replicas over the remaining + // effectiveMinRegionsForDurability regions; we have to make sure that the ack quorum is large + // enough such that there is a configuration for spreading the replicas across + // effectiveMinRegionsForDurability - 1 regions + if (ackQuorumSize <= (writeQuorumSize - (writeQuorumSize / effectiveMinRegionsForDurability))) { + throw new IllegalArgumentException("ack quorum (" + ackQuorumSize + ") " + + "violates the requirement to satisfy durability constraints when running in degraded mode"); + } + } + + rwLock.readLock().lock(); + try { + Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies); + Set<String> availableRegions = new HashSet<String>(); + for (String region: perRegionPlacement.keySet()) { + if ((null == disallowBookiePlacementInRegionFeatureName) || + !featureProvider.scope(region).getFeature(disallowBookiePlacementInRegionFeatureName).isAvailable()) { + availableRegions.add(region); + } + } + int numRegionsAvailable = availableRegions.size(); + + // If we were unable to get region information or all regions are disallowed which is + // an invalid configuration; default to random selection from the set of nodes + if (numRegionsAvailable < 1) { + // We cant disallow all regions; if we did, raise an alert to draw attention + if (perRegionPlacement.keySet().size() >= 1) { + LOG.error("No regions available, invalid configuration"); + } + List<BookieNode> bns = selectRandom(ensembleSize, excludeNodes, TruePredicate.instance, + EnsembleForReplacementWithNoConstraints.instance); + ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>(ensembleSize); + for (BookieNode bn : bns) { + addrs.add(bn.getAddr()); + } + return addrs; + } + + // Single region, fall back to RackAwareEnsemblePlacement + if (numRegionsAvailable < 2) { + RRTopologyAwareCoverageEnsemble ensemble = new RRTopologyAwareCoverageEnsemble(ensembleSize, + writeQuorumSize, + 
ackQuorumSize, + REGIONID_DISTANCE_FROM_LEAVES, + effectiveMinRegionsForDurability > 0 ? new HashSet<String>(perRegionPlacement.keySet()) : null, + effectiveMinRegionsForDurability); + TopologyAwareEnsemblePlacementPolicy nextPolicy = perRegionPlacement.get(availableRegions.iterator().next()); + return nextPolicy.newEnsemble(ensembleSize, writeQuorumSize, writeQuorumSize, excludeBookies, ensemble, ensemble); + } + + int remainingEnsemble = ensembleSize; + int remainingWriteQuorum = writeQuorumSize; + + // Equally distribute the nodes across all regions to whatever extent possible + // with the hierarchy in mind + // Try and place as many nodes in a region as possible, the ones that cannot be + // accommodated are placed on other regions + // Within each region try and follow rack aware placement + Map<String, Pair<Integer,Integer>> regionsWiseAllocation = new HashMap<String, Pair<Integer,Integer>>(); + for (String region: availableRegions) { + regionsWiseAllocation.put(region, Pair.of(0,0)); + } + int remainingEnsembleBeforeIteration; + Set<String> regionsReachedMaxAllocation = new HashSet<String>(); + RRTopologyAwareCoverageEnsemble ensemble; + int iteration = 0; + do { + LOG.info("RegionAwareEnsemblePlacementPolicy#newEnsemble Iteration {}", iteration++); + int numRemainingRegions = numRegionsAvailable - regionsReachedMaxAllocation.size(); + ensemble = new RRTopologyAwareCoverageEnsemble(ensembleSize, + writeQuorumSize, + ackQuorumSize, + REGIONID_DISTANCE_FROM_LEAVES, + // We pass all regions we know off to the coverage ensemble as + // regardless of regions that are available; constraints are + // always applied based on all possible regions + effectiveMinRegionsForDurability > 0 ? 
new HashSet<String>(perRegionPlacement.keySet()) : null, + effectiveMinRegionsForDurability); + remainingEnsembleBeforeIteration = remainingEnsemble; + for (Map.Entry<String, Pair<Integer, Integer>> regionEntry: regionsWiseAllocation.entrySet()) { + String region = regionEntry.getKey(); + final Pair<Integer, Integer> currentAllocation = regionEntry.getValue(); + TopologyAwareEnsemblePlacementPolicy policyWithinRegion = perRegionPlacement.get(region); + if (!regionsReachedMaxAllocation.contains(region)) { + if (numRemainingRegions <= 0) { + LOG.error("Inconsistent State: This should never happen"); + throw new BKException.BKNotEnoughBookiesException(); + } + + int addToEnsembleSize = Math.min(remainingEnsemble, (remainingEnsembleBeforeIteration + numRemainingRegions - 1) / numRemainingRegions); + boolean success = false; + while(addToEnsembleSize > 0) { + int addToWriteQuorum = Math.max(1, Math.min(remainingWriteQuorum, Math.round(1.0f * writeQuorumSize * addToEnsembleSize / ensembleSize))); + + // Temp ensemble will be merged back into the ensemble only if we are able to successfully allocate + // the target number of bookies in this region; if we fail because we dont have enough bookies; then we + // retry the process with a smaller target + RRTopologyAwareCoverageEnsemble tempEnsemble = new RRTopologyAwareCoverageEnsemble(ensemble); + int newEnsembleSize = currentAllocation.getLeft() + addToEnsembleSize; + int newWriteQuorumSize = currentAllocation.getRight() + addToWriteQuorum; + try { + policyWithinRegion.newEnsemble(newEnsembleSize, newWriteQuorumSize, newWriteQuorumSize, excludeBookies, tempEnsemble, tempEnsemble); + ensemble = tempEnsemble; + remainingEnsemble -= addToEnsembleSize; + remainingWriteQuorum -= writeQuorumSize; + regionsWiseAllocation.put(region, Pair.of(newEnsembleSize, newWriteQuorumSize)); + success = true; + LOG.info("Allocated {} bookies in region {} : {}", + new Object[]{newEnsembleSize, region, ensemble}); + break; + } catch 
(BKException.BKNotEnoughBookiesException exc) { + LOG.warn("Could not allocate {} bookies in region {}, try allocating {} bookies", + new Object[] {newEnsembleSize, region, (newEnsembleSize - 1) }); + addToEnsembleSize--; + } + } + + // we couldn't allocate additional bookies from the region, + // it should have reached its max allocation. + if (!success) { + regionsReachedMaxAllocation.add(region); + } + } + + if (regionsReachedMaxAllocation.contains(region)) { + if (currentAllocation.getLeft() > 0) { + LOG.info("Allocating {} bookies in region {} : ensemble {} exclude {}", + new Object[]{currentAllocation.getLeft(), region, excludeBookies, ensemble}); + policyWithinRegion.newEnsemble( + currentAllocation.getLeft(), + currentAllocation.getRight(), + currentAllocation.getRight(), + excludeBookies, + ensemble, + ensemble); + LOG.info("Allocated {} bookies in region {} : {}", + new Object[]{currentAllocation.getLeft(), region, ensemble}); + } + } + } + + if (regionsReachedMaxAllocation.containsAll(regionsWiseAllocation.keySet())) { + break; + } + } while ((remainingEnsemble > 0) && (remainingEnsemble < remainingEnsembleBeforeIteration)); + + ArrayList<BookieSocketAddress> bookieList = ensemble.toList(); + if (ensembleSize != bookieList.size()) { + LOG.error("Not enough {} bookies are available to form an ensemble : {}.", + ensembleSize, bookieList); + throw new BKException.BKNotEnoughBookiesException(); + } + + if(enableValidation && !ensemble.validate()) { + LOG.error("Not enough {} bookies are available to form a valid ensemble : {}.", + ensembleSize, bookieList); + throw new BKException.BKNotEnoughBookiesException(); + } + + return ensemble.toList(); + } finally { + rwLock.readLock().unlock(); + } + } + + @Override + public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace, + Set<BookieSocketAddress> excludeBookies) throws 
BKException.BKNotEnoughBookiesException { + rwLock.readLock().lock(); + try { + boolean enforceDurability = enforceDurabilityInReplace && !disableDurabilityFeature.isAvailable(); + int effectiveMinRegionsForDurability = enforceDurability ? minRegionsForDurability : 1; + Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies); + RRTopologyAwareCoverageEnsemble ensemble = new RRTopologyAwareCoverageEnsemble(ensembleSize, + writeQuorumSize, + ackQuorumSize, + REGIONID_DISTANCE_FROM_LEAVES, + effectiveMinRegionsForDurability > 0 ? new HashSet<String>(perRegionPlacement.keySet()) : null, + effectiveMinRegionsForDurability); + + BookieNode bookieNodeToReplace = knownBookies.get(bookieToReplace); + if (null == bookieNodeToReplace) { + bookieNodeToReplace = createBookieNode(bookieToReplace); + } + excludeNodes.add(bookieNodeToReplace); + + for(BookieSocketAddress bookieAddress: currentEnsemble) { + if (bookieAddress.equals(bookieToReplace)) { + continue; + } + + BookieNode bn = knownBookies.get(bookieAddress); + if (null == bn) { + bn = createBookieNode(bookieAddress); + } + + excludeNodes.add(bn); + + if (!ensemble.apply(bn, ensemble)) { + LOG.warn("Anomalous ensemble detected"); + if (null != statsLogger) { + statsLogger.getCounter(REGION_AWARE_ANOMALOUS_ENSEMBLE).inc(); + } + enforceDurability = false; + } + + ensemble.addNode(bn); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Try to choose a new bookie to replace {}, excluding {}.", bookieToReplace, + excludeNodes); + } + // pick a candidate from same rack to replace + BookieNode candidate = replaceFromRack(bookieNodeToReplace, excludeNodes, + ensemble, ensemble, enforceDurability); + if (LOG.isDebugEnabled()) { + LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bookieNodeToReplace); + } + return candidate.getAddr(); + } finally { + rwLock.readLock().unlock(); + } + } + + protected BookieNode replaceFromRack(BookieNode bookieNodeToReplace, + Set<Node> excludeBookies, + Predicate<BookieNode> 
predicate, + Ensemble<BookieNode> ensemble, + boolean enforceDurability) + throws BKException.BKNotEnoughBookiesException { + Set<String> availableRegions = new HashSet<String>(); + for (String region: perRegionPlacement.keySet()) { + if ((null == disallowBookiePlacementInRegionFeatureName) || + !featureProvider.scope(region).getFeature(disallowBookiePlacementInRegionFeatureName).isAvailable()) { + availableRegions.add(region); + } + } + String regionForBookieToReplace = getLocalRegion(bookieNodeToReplace); + if (availableRegions.contains(regionForBookieToReplace)) { + TopologyAwareEnsemblePlacementPolicy regionPolicy = perRegionPlacement.get(regionForBookieToReplace); + if (null != regionPolicy) { + try { + // select one from local rack => it falls back to selecting a node from the region + // if the rack does not have an available node, selecting from the same region + // should not violate durability constraints so we can simply not have to check + // for that. + return regionPolicy.selectFromNetworkLocation( + bookieNodeToReplace.getNetworkLocation(), + excludeBookies, + TruePredicate.instance, + EnsembleForReplacementWithNoConstraints.instance); + } catch (BKException.BKNotEnoughBookiesException e) { + LOG.warn("Failed to choose a bookie from {} : " + + "excluded {}, fallback to choose bookie randomly from the cluster.", + bookieNodeToReplace.getNetworkLocation(), excludeBookies); + } + } + } + + // randomly choose one from all the regions that are available, ignore the provided predicate if we are not + // enforcing durability. + return selectRandomFromRegions(availableRegions, 1, + excludeBookies, + enforceDurability ? predicate : TruePredicate.instance, + enforceDurability ? 
ensemble : EnsembleForReplacementWithNoConstraints.instance).get(0); + } + + @Override + public final List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) { + if (UNKNOWN_REGION.equals(myRegion)) { + return super.reorderReadSequence(ensemble, writeSet, bookieFailureHistory); + } else { + int ensembleSize = ensemble.size(); + List<Integer> finalList = new ArrayList<Integer>(writeSet.size()); + List<Integer> localList = new ArrayList<Integer>(writeSet.size()); + List<Long> localFailures = new ArrayList<Long>(writeSet.size()); + List<Integer> remoteList = new ArrayList<Integer>(writeSet.size()); + List<Long> remoteFailures = new ArrayList<Long>(writeSet.size()); + List<Integer> readOnlyList = new ArrayList<Integer>(writeSet.size()); + List<Integer> unAvailableList = new ArrayList<Integer>(writeSet.size()); + for (Integer idx : writeSet) { + BookieSocketAddress address = ensemble.get(idx); + String region = getRegion(address); + Long lastFailedEntryOnBookie = bookieFailureHistory.get(address); + if (null == knownBookies.get(address)) { + // there isn't too much differences between readonly bookies from unavailable bookies. since there + // is no write requests to them, so we shouldn't try reading from readonly bookie in prior to writable + // bookies. 
+ if ((null == readOnlyBookies) || !readOnlyBookies.contains(address)) { + unAvailableList.add(idx); + } else { + readOnlyList.add(idx); + } + } else if (region.equals(myRegion)) { + if ((lastFailedEntryOnBookie == null) || (lastFailedEntryOnBookie < 0)) { + localList.add(idx); + } else { + localFailures.add(lastFailedEntryOnBookie * ensembleSize + idx); + } + } else { + if ((lastFailedEntryOnBookie == null) || (lastFailedEntryOnBookie < 0)) { + remoteList.add(idx); + } else { + remoteFailures.add(lastFailedEntryOnBookie * ensembleSize + idx); + } + } + } + + // Given that idx is less than ensemble size the order of the elements in these two lists + // is determined by the lastFailedEntryOnBookie + Collections.sort(localFailures); + Collections.sort(remoteFailures); + + if (reorderReadsRandom) { + Collections.shuffle(localList); + Collections.shuffle(remoteList); + Collections.shuffle(readOnlyList); + Collections.shuffle(unAvailableList); + } + + // nodes within a region are ordered as follows + // (Random?) 
list of nodes that have no history of failure + // Nodes with Failure history are ordered in the reverse + // order of the most recent entry that generated an error + for(long value: localFailures) { + localList.add((int)(value % ensembleSize)); + } + + for(long value: remoteFailures) { + remoteList.add((int)(value % ensembleSize)); + } + + // Insert a node from the remote region at the specified location so we + // try more than one region within the max allowed latency + for (int i = 0; i < REMOTE_NODE_IN_REORDER_SEQUENCE; i++) { + if (localList.size() > 0) { + finalList.add(localList.remove(0)); + } else { + break; + } + } + + if (remoteList.size() > 0) { + finalList.add(remoteList.remove(0)); + } + + // Add all the local nodes + finalList.addAll(localList); + finalList.addAll(remoteList); + finalList.addAll(readOnlyList); + finalList.addAll(unAvailableList); + return finalList; + } + } + + @Override + public final List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) { + if (UNKNOWN_REGION.equals(myRegion)) { + return super.reorderReadLACSequence(ensemble, writeSet, bookieFailureHistory); + } + List<Integer> finalList = reorderReadSequence(ensemble, writeSet, bookieFailureHistory); + + if (finalList.size() < ensemble.size()) { + for (int i = 0; i < ensemble.size(); i++) { + if (!finalList.contains(i)) { + finalList.add(i); + } + } + } + return finalList; + + } +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java new file mode 100644 index 0000000..c222827 --- 
/dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.net.NodeBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for ensemble placement policies that take the network topology
+ * (racks and/or regions) into account when choosing bookies.
+ *
+ * <p>It provides the {@link BookieNode} node type, trivial predicate/ensemble helpers,
+ * and {@link RRTopologyAwareCoverageEnsemble}, which enforces rack/region coverage
+ * constraints on round-robin write quorums.
+ */
+abstract class TopologyAwareEnsemblePlacementPolicy implements ITopologyAwareEnsemblePlacementPolicy<TopologyAwareEnsemblePlacementPolicy.BookieNode> {
+    static final Logger LOG = LoggerFactory.getLogger(TopologyAwareEnsemblePlacementPolicy.class);
+
+    /**
+     * Predicate that accepts every candidate.
+     */
+    protected static class TruePredicate implements Predicate<BookieNode> {
+
+        public static final TruePredicate instance = new TruePredicate();
+
+        @Override
+        public boolean apply(BookieNode candidate, Ensemble<BookieNode> chosenNodes) {
+            return true;
+        }
+
+    }
+
+    /**
+     * Ensemble that accepts every node, records nothing, and always validates.
+     *
+     */
+    protected static class EnsembleForReplacementWithNoConstraints implements Ensemble<BookieNode> {
+
+        public static final EnsembleForReplacementWithNoConstraints instance = new EnsembleForReplacementWithNoConstraints();
+        // Kept for compatibility; toList() no longer returns this shared, mutable list.
+        static final ArrayList<BookieSocketAddress> EMPTY_LIST = new ArrayList<BookieSocketAddress>(0);
+
+        @Override
+        public boolean addNode(BookieNode node) {
+            // do nothing
+            return true;
+        }
+
+        /**
+         * @return a new empty list on every call, since this ensemble never records nodes
+         */
+        @Override
+        public ArrayList<BookieSocketAddress> toList() {
+            // A fresh list rather than a shared static one: the return type is a mutable
+            // ArrayList, and anything a caller appended would otherwise leak into every later
+            // call on this singleton.
+            return new ArrayList<BookieSocketAddress>(0);
+        }
+
+        /**
+         * Validates if an ensemble is valid
+         *
+         * @return true if the ensemble is valid; false otherwise
+         */
+        @Override
+        public boolean validate() {
+            return true;
+        }
+
+    }
+
+    /**
+     * Topology node for a bookie. Equality and hashing use the node name, which is created
+     * from the bookie's socket address.
+     */
+    protected static class BookieNode extends NodeBase {
+
+        private final BookieSocketAddress addr; // identifier of a bookie node.
+
+        BookieNode(BookieSocketAddress addr, String networkLoc) {
+            super(addr.toString(), networkLoc);
+            this.addr = addr;
+        }
+
+        public BookieSocketAddress getAddr() {
+            return addr;
+        }
+
+        @Override
+        public int hashCode() {
+            return name.hashCode();
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof BookieNode)) {
+                return false;
+            }
+            BookieNode other = (BookieNode) obj;
+            return getName().equals(other.getName());
+        }
+
+        @Override
+        public String toString() {
+            return String.format("<Bookie:%s>", name);
+        }
+
+    }
+
+    /**
+     * A predicate checking the rack coverage for write quorum in {@link RoundRobinDistributionSchedule},
+     * which ensures that a write quorum should be covered by at least two racks.
+     *
+     * <p>When {@code minRacksOrRegionsForDurability > 0} a durability check is used instead: for
+     * values above 1 it ensures that no {@code minRacksOrRegionsForDurability - 1} racks/regions
+     * together hold {@code ackQuorumSize} or more of the bookies in a write quorum (or in the
+     * whole ensemble when the ensemble size equals the write quorum size).
+     */
+    protected static class RRTopologyAwareCoverageEnsemble implements Predicate<BookieNode>, Ensemble<BookieNode> {
+
+        /**
+         * Per-quorum state used to decide whether a candidate may join that quorum.
+         */
+        protected interface CoverageSet {
+            /** @return true if {@code candidate} can be added without violating this set's constraint */
+            boolean apply(BookieNode candidate);
+            /** Records {@code candidate} as a member of this set. */
+            void addBookie(BookieNode candidate);
+            /** @return an independent copy of this set's state */
+            public CoverageSet duplicate();
+        }
+
+        /**
+         * Requires the members of a write quorum (of size two or more) to span at least two
+         * distinct network locations at {@code distanceFromLeaves}.
+         */
+        protected class RackQuorumCoverageSet implements CoverageSet {
+            HashSet<String> racksOrRegionsInQuorum = new HashSet<String>();
+            int seenBookies = 0;
+
+            @Override
+            public boolean apply(BookieNode candidate) {
+                // If we don't have sufficient members in the write quorum; then we can't enforce
+                // rack/region diversity
+                if (writeQuorumSize < 2) {
+                    return true;
+                }
+
+                // Only the last member of the quorum is constrained: it must not leave the
+                // whole quorum in a single rack/region.
+                if (seenBookies + 1 == writeQuorumSize) {
+                    return racksOrRegionsInQuorum.size() > (racksOrRegionsInQuorum.contains(candidate.getNetworkLocation(distanceFromLeaves)) ? 1 : 0);
+                }
+                return true;
+            }
+
+            @Override
+            public void addBookie(BookieNode candidate) {
+                ++seenBookies;
+                racksOrRegionsInQuorum.add(candidate.getNetworkLocation(distanceFromLeaves));
+            }
+
+            @Override
+            public RackQuorumCoverageSet duplicate() {
+                RackQuorumCoverageSet ret = new RackQuorumCoverageSet();
+                ret.racksOrRegionsInQuorum = Sets.newHashSet(this.racksOrRegionsInQuorum);
+                ret.seenBookies = this.seenBookies;
+                return ret;
+            }
+        }
+
+        /**
+         * Tracks how many bookies are allocated to each rack/region and rejects a candidate if
+         * accepting it would let some {@code minRacksOrRegionsForDurability - 1} racks/regions
+         * together hold {@code ackQuorumSize} or more bookies.
+         */
+        protected class RackOrRegionDurabilityCoverageSet implements CoverageSet {
+            HashMap<String, Integer> allocationToRacksOrRegions = new HashMap<String, Integer>();
+
+            RackOrRegionDurabilityCoverageSet() {
+                for (String rackOrRegion: racksOrRegions) {
+                    allocationToRacksOrRegions.put(rackOrRegion, 0);
+                }
+            }
+
+            @Override
+            public RackOrRegionDurabilityCoverageSet duplicate() {
+                RackOrRegionDurabilityCoverageSet ret = new RackOrRegionDurabilityCoverageSet();
+                ret.allocationToRacksOrRegions = Maps.newHashMap(this.allocationToRacksOrRegions);
+                return ret;
+            }
+
+            /**
+             * Checks that every way of adding {@code subsetSize} more elements of
+             * {@code remainingRacksOrRegions} (or all of them, if fewer remain) keeps the sum of
+             * their current allocations within {@code maxAllowedSum}. Racks/regions without an
+             * allocation entry are recorded with a count of zero.
+             *
+             * @return true if no such subset exceeds the limit
+             */
+            private boolean checkSumOfSubsetWithinLimit(final Set<String> includedRacksOrRegions,
+                                                        final Set<String> remainingRacksOrRegions,
+                                                        int subsetSize,
+                                                        int maxAllowedSum) {
+                if (remainingRacksOrRegions.isEmpty() || (subsetSize <= 0)) {
+                    if (maxAllowedSum < 0) {
+                        LOG.trace("CHECK FAILED: RacksOrRegions Included {} Remaining {}, subsetSize {}, maxAllowedSum {}", new Object[]{
+                            includedRacksOrRegions, remainingRacksOrRegions, subsetSize, maxAllowedSum
+                        });
+                    }
+                    return (maxAllowedSum >= 0);
+                }
+
+                for(String rackOrRegion: remainingRacksOrRegions) {
+                    Integer currentAllocation = allocationToRacksOrRegions.get(rackOrRegion);
+                    if (currentAllocation == null) {
+                        allocationToRacksOrRegions.put(rackOrRegion, 0);
+                        currentAllocation = 0;
+                    }
+
+                    if (currentAllocation > maxAllowedSum) {
+                        LOG.trace("CHECK FAILED: RacksOrRegions Included {} Candidate {}, subsetSize {}, maxAllowedSum {}", new Object[]{
+                            includedRacksOrRegions, rackOrRegion, subsetSize, maxAllowedSum
+                        });
+                        return false;
+                    } else {
+                        Set<String> remainingElements = new HashSet<String>(remainingRacksOrRegions);
+                        Set<String> includedElements = new HashSet<String>(includedRacksOrRegions);
+                        includedElements.add(rackOrRegion);
+                        remainingElements.remove(rackOrRegion);
+                        if (!checkSumOfSubsetWithinLimit(includedElements,
+                            remainingElements,
+                            subsetSize - 1,
+                            maxAllowedSum - currentAllocation)) {
+                            return false;
+                        }
+                    }
+                }
+
+                return true;
+            }
+
+            @Override
+            public boolean apply(BookieNode candidate) {
+                if (minRacksOrRegionsForDurability <= 1) {
+                    return true;
+                }
+
+                String candidateRackOrRegion = candidate.getNetworkLocation(distanceFromLeaves);
+                candidateRackOrRegion = candidateRackOrRegion.startsWith(NodeBase.PATH_SEPARATOR_STR) ?
+                    candidateRackOrRegion.substring(1) : candidateRackOrRegion;
+                final Set<String> remainingRacksOrRegions = new HashSet<String>(racksOrRegions);
+                remainingRacksOrRegions.remove(candidateRackOrRegion);
+                final Set<String> includedRacksOrRegions = new HashSet<String>();
+                includedRacksOrRegions.add(candidateRackOrRegion);
+
+                // If minRacksOrRegionsForDurability are required for durability; we must ensure that
+                // no subset of (minRacksOrRegionsForDurability - 1) regions have ackQuorumSize
+                // We are only modifying candidateRackOrRegion if we accept this bookie, so let's only
+                // find sets that contain this candidateRackOrRegion
+                Integer currentAllocation = allocationToRacksOrRegions.get(candidateRackOrRegion);
+                if (currentAllocation == null) {
+                    LOG.info("Detected a region that was not initialized {}", candidateRackOrRegion);
+                    if (candidateRackOrRegion.equals(NetworkTopology.DEFAULT_REGION)) {
+                        LOG.error("Failed to resolve network location {}", candidate);
+                    } else if (!racksOrRegions.contains(candidateRackOrRegion)) {
+                        LOG.error("Unknown region detected {}", candidateRackOrRegion);
+                    }
+                    allocationToRacksOrRegions.put(candidateRackOrRegion, 0);
+                    currentAllocation = 0;
+                }
+
+                // After accepting the candidate its rack/region holds currentAllocation + 1 bookies,
+                // so the other (minRacksOrRegionsForDurability - 2) members of any subset may hold
+                // at most (ackQuorumSize - 1) - (currentAllocation + 1) bookies between them.
+                int inclusiveLimit = (ackQuorumSize - 1) - (currentAllocation + 1);
+                return checkSumOfSubsetWithinLimit(includedRacksOrRegions,
+                    remainingRacksOrRegions, minRacksOrRegionsForDurability - 2, inclusiveLimit);
+            }
+
+            @Override
+            public void addBookie(BookieNode candidate) {
+                String candidateRackOrRegion = candidate.getNetworkLocation(distanceFromLeaves);
+                candidateRackOrRegion = candidateRackOrRegion.startsWith(NodeBase.PATH_SEPARATOR_STR) ?
+                    candidateRackOrRegion.substring(1) : candidateRackOrRegion;
+                int oldCount = 0;
+                if (null != allocationToRacksOrRegions.get(candidateRackOrRegion)) {
+                    oldCount = allocationToRacksOrRegions.get(candidateRackOrRegion);
+                }
+                allocationToRacksOrRegions.put(candidateRackOrRegion, oldCount + 1);
+            }
+        }
+
+        final int distanceFromLeaves;
+        final int ensembleSize;
+        final int writeQuorumSize;
+        final int ackQuorumSize;
+        final int minRacksOrRegionsForDurability;
+        final ArrayList<BookieNode> chosenNodes;
+        final Set<String> racksOrRegions;
+        private final CoverageSet[] quorums;
+        final Predicate<BookieNode> parentPredicate;
+        final Ensemble<BookieNode> parentEnsemble;
+
+        /**
+         * Copy constructor. The copy owns its chosen-node list and coverage-set state, so adding
+         * nodes to it does not change {@code that}; the parent predicate and parent ensemble are
+         * shared, not copied.
+         */
+        protected RRTopologyAwareCoverageEnsemble(RRTopologyAwareCoverageEnsemble that) {
+            this.distanceFromLeaves = that.distanceFromLeaves;
+            this.ensembleSize = that.ensembleSize;
+            this.writeQuorumSize = that.writeQuorumSize;
+            this.ackQuorumSize = that.ackQuorumSize;
+            this.chosenNodes = Lists.newArrayList(that.chosenNodes);
+            this.quorums = new CoverageSet[that.quorums.length];
+            for (int i = 0; i < that.quorums.length; i++) {
+                if (null != that.quorums[i]) {
+                    this.quorums[i] = that.quorums[i].duplicate();
+                } else {
+                    this.quorums[i] = null;
+                }
+            }
+            this.parentPredicate = that.parentPredicate;
+            this.parentEnsemble = that.parentEnsemble;
+            if (null != that.racksOrRegions) {
+                this.racksOrRegions = new HashSet<String>(that.racksOrRegions);
+            } else {
+                this.racksOrRegions = null;
+            }
+            this.minRacksOrRegionsForDurability = that.minRacksOrRegionsForDurability;
+        }
+
+        protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
+                                                  int writeQuorumSize,
+                                                  int ackQuorumSize,
+                                                  int distanceFromLeaves,
+                                                  Set<String> racksOrRegions,
+                                                  int minRacksOrRegionsForDurability) {
+            this(ensembleSize, writeQuorumSize, ackQuorumSize, distanceFromLeaves, null, null,
+                 racksOrRegions, minRacksOrRegionsForDurability);
+        }
+
+        protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
+                                                  int writeQuorumSize,
+                                                  int ackQuorumSize,
+                                                  int distanceFromLeaves,
+                                                  Ensemble<BookieNode> parentEnsemble,
+                                                  Predicate<BookieNode> parentPredicate) {
+            this(ensembleSize, writeQuorumSize, ackQuorumSize, distanceFromLeaves, parentEnsemble, parentPredicate,
+                 null, 0);
+        }
+
+        /**
+         * @param ensembleSize ensemble size; also the number of write-quorum slots tracked
+         * @param writeQuorumSize write quorum size (round-robin window over the ensemble)
+         * @param ackQuorumSize ack quorum size, used by the durability check
+         * @param distanceFromLeaves topology level at which network locations are compared
+         * @param parentEnsemble ensemble that accepted nodes are also added to; may be null
+         * @param parentPredicate extra predicate a candidate must also satisfy; may be null
+         * @param racksOrRegions known racks/regions; must be non-null when
+         *        {@code minRacksOrRegionsForDurability > 0}
+         * @param minRacksOrRegionsForDurability if positive, durability coverage sets are used;
+         *        otherwise the two-rack write quorum check applies
+         */
+        protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
+                                                  int writeQuorumSize,
+                                                  int ackQuorumSize,
+                                                  int distanceFromLeaves,
+                                                  Ensemble<BookieNode> parentEnsemble,
+                                                  Predicate<BookieNode> parentPredicate,
+                                                  Set<String> racksOrRegions,
+                                                  int minRacksOrRegionsForDurability) {
+            this.ensembleSize = ensembleSize;
+            this.writeQuorumSize = writeQuorumSize;
+            this.ackQuorumSize = ackQuorumSize;
+            this.distanceFromLeaves = distanceFromLeaves;
+            this.chosenNodes = new ArrayList<BookieNode>(ensembleSize);
+            if (minRacksOrRegionsForDurability > 0) {
+                this.quorums = new RackOrRegionDurabilityCoverageSet[ensembleSize];
+            } else {
+                this.quorums = new RackQuorumCoverageSet[ensembleSize];
+            }
+            this.parentEnsemble = parentEnsemble;
+            this.parentPredicate = parentPredicate;
+            this.racksOrRegions = racksOrRegions;
+            this.minRacksOrRegionsForDurability = minRacksOrRegionsForDurability;
+        }
+
+        /**
+         * Returns the coverage set for write-quorum slot {@code idx}, creating it on first use
+         * with the type selected by {@code minRacksOrRegionsForDurability}.
+         */
+        private CoverageSet getOrCreateCoverageSet(int idx) {
+            if (null == quorums[idx]) {
+                if (minRacksOrRegionsForDurability > 0) {
+                    quorums[idx] = new RackOrRegionDurabilityCoverageSet();
+                } else {
+                    quorums[idx] = new RackQuorumCoverageSet();
+                }
+            }
+            return quorums[idx];
+        }
+
+        /**
+         * Checks whether {@code candidate} can be added at the next position of this ensemble
+         * without violating a coverage constraint or the parent predicate. Returns false if
+         * {@code ensemble} is not this instance.
+         */
+        @Override
+        public boolean apply(BookieNode candidate, Ensemble<BookieNode> ensemble) {
+            if (ensemble != this) {
+                return false;
+            }
+
+            // An ensemble cannot contain the same node twice
+            if (chosenNodes.contains(candidate)) {
+                return false;
+            }
+
+            if ((ensembleSize == writeQuorumSize) && (minRacksOrRegionsForDurability > 0)) {
+                // Every write quorum spans the whole ensemble, so one coverage set suffices.
+                if (!getOrCreateCoverageSet(0).apply(candidate)) {
+                    return false;
+                }
+            } else {
+                // The candidate would take position chosenNodes.size(); check every round-robin
+                // write quorum that would include that position.
+                int candidatePos = chosenNodes.size();
+                int startPos = candidatePos - writeQuorumSize + 1;
+                for (int i = startPos; i <= candidatePos; i++) {
+                    int idx = (i + ensembleSize) % ensembleSize;
+                    if (!getOrCreateCoverageSet(idx).apply(candidate)) {
+                        return false;
+                    }
+                }
+            }
+
+            return ((null == parentPredicate) || parentPredicate.apply(candidate, parentEnsemble));
+        }
+
+        /**
+         * Records {@code node} in every coverage set whose write quorum includes its position,
+         * appends it to this ensemble, then forwards it to the parent ensemble, if any.
+         *
+         * @return false if the node is already present, otherwise the parent's result (or true)
+         */
+        @Override
+        public boolean addNode(BookieNode node) {
+            // An ensemble cannot contain the same node twice
+            if (chosenNodes.contains(node)) {
+                return false;
+            }
+
+            if ((ensembleSize == writeQuorumSize) && (minRacksOrRegionsForDurability > 0)) {
+                getOrCreateCoverageSet(0).addBookie(node);
+            } else {
+                int candidatePos = chosenNodes.size();
+                int startPos = candidatePos - writeQuorumSize + 1;
+                for (int i = startPos; i <= candidatePos; i++) {
+                    int idx = (i + ensembleSize) % ensembleSize;
+                    getOrCreateCoverageSet(idx).addBookie(node);
+                }
+            }
+            chosenNodes.add(node);
+
+            return ((null == parentEnsemble) || parentEnsemble.addNode(node));
+        }
+
+        @Override
+        public ArrayList<BookieSocketAddress> toList() {
+            ArrayList<BookieSocketAddress> addresses = new ArrayList<BookieSocketAddress>(ensembleSize);
+            for (BookieNode bn : chosenNodes) {
+                addresses.add(bn.getAddr());
+            }
+            return addresses;
+        }
+
+        /**
+         * Validates if an ensemble is valid: no bookie appears twice and, when
+         * {@code minRacksOrRegionsForDurability} is non-zero, the chosen bookies span at least
+         * that many distinct network locations.
+         *
+         * @return true if the ensemble is valid; false otherwise
+         */
+        @Override
+        public boolean validate() {
+            HashSet<BookieSocketAddress> addresses = new HashSet<BookieSocketAddress>(ensembleSize);
+            HashSet<String> distinctRacksOrRegions = new HashSet<String>();
+            for (BookieNode bn : chosenNodes) {
+                // Set.add returns false when the address is already present, i.e. a duplicate bookie.
+                if (!addresses.add(bn.getAddr())) {
+                    return false;
+                }
+                distinctRacksOrRegions.add(bn.getNetworkLocation(distanceFromLeaves));
+            }
+
+            return ((minRacksOrRegionsForDurability == 0) ||
+                    (distinctRacksOrRegions.size() >= minRacksOrRegionsForDurability));
+        }
+
+        @Override
+        public String toString() {
+            return chosenNodes.toString();
+        }
+    }
+
+    /**
+     * Default read ordering: returns {@code writeSet} unchanged.
+     */
+    @Override
+    public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+        return writeSet;
+    }
+
+    /**
+     * Returns {@link #reorderReadSequence}'s result followed by every ensemble index it does not
+     * already contain, in ascending order.
+     */
+    @Override
+    public List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+        List<Integer> retList = new ArrayList<Integer>(reorderReadSequence(ensemble, writeSet, bookieFailureHistory));
+        if (retList.size() < ensemble.size()) {
+            for (int i = 0; i < ensemble.size(); i++) {
+                if (!retList.contains(i)) {
+                    retList.add(i);
+                }
+            }
+        }
+        return retList;
+    }
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index 57c3790..8e76bb7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -66,6 +66,8 @@ public class ClientConfiguration extends AbstractConfiguration { protected final static String TIMEOUT_TASK_INTERVAL_MILLIS = "timeoutTaskIntervalMillis"; protected final static String PCBC_TIMEOUT_TIMER_TICK_DURATION_MS = "pcbcTimeoutTimerTickDurationMs"; protected final static String PCBC_TIMEOUT_TIMER_NUM_TICKS = "pcbcTimeoutTimerNumTicks"; + protected final static String TIMEOUT_TIMER_TICK_DURATION_MS = "timeoutTimerTickDurationMs"; + protected final static String TIMEOUT_TIMER_NUM_TICKS = "timeoutTimerNumTicks"; // Bookie health check settings protected final static
String BOOKIE_HEALTH_CHECK_ENABLED = "bookieHealthCheckEnabled"; @@ -78,6 +80,7 @@ public class ClientConfiguration extends AbstractConfiguration { // Ensemble Placement Policy protected final static String ENSEMBLE_PLACEMENT_POLICY = "ensemblePlacementPolicy"; + protected final static String NETWORK_TOPOLOGY_STABILIZE_PERIOD_SECONDS = "networkTopologyStabilizePeriodSeconds"; // Stats protected final static String ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats"; @@ -329,6 +332,48 @@ public class ClientConfiguration extends AbstractConfiguration { } /** + * Get the tick duration in milliseconds that used for timeout timer. + * + * @return tick duration in milliseconds + */ + public long getTimeoutTimerTickDurationMs() { + return getLong(TIMEOUT_TIMER_TICK_DURATION_MS, 100); + } + + /** + * Set the tick duration in milliseconds that used for timeout timer. + * + * @param tickDuration + * tick duration in milliseconds. + * @return client configuration. + */ + public ClientConfiguration setTimeoutTimerTickDurationMs(long tickDuration) { + setProperty(TIMEOUT_TIMER_TICK_DURATION_MS, tickDuration); + return this; + } + + /** + * Get number of ticks that used for timeout timer. + * + * @return number of ticks that used for timeout timer. + */ + public int getTimeoutTimerNumTicks() { + return getInt(TIMEOUT_TIMER_NUM_TICKS, 1024); + } + + /** + * Set number of ticks that used for timeout timer. + * + * @param numTicks + * number of ticks that used for timeout timer. + * @return client configuration. + */ + public ClientConfiguration setTimeoutTimerNumTicks(int numTicks) { + setProperty(TIMEOUT_TIMER_NUM_TICKS, numTicks); + return this; + } + + /** * Get client netty connect timeout in millis. * * @return client netty connect timeout in millis. @@ -666,10 +711,10 @@ public class ClientConfiguration extends AbstractConfiguration { * @return ensemble placement policy class. */ public Class<? 
extends EnsemblePlacementPolicy> getEnsemblePlacementPolicy() - throws ConfigurationException { + throws ConfigurationException { return ReflectionUtils.getClass(this, ENSEMBLE_PLACEMENT_POLICY, - RackawareEnsemblePlacementPolicy.class, - EnsemblePlacementPolicy.class, + RackawareEnsemblePlacementPolicy.class, + EnsemblePlacementPolicy.class, defaultLoader); } @@ -685,6 +730,27 @@ public class ClientConfiguration extends AbstractConfiguration { } /** + * Get the network topology stabilize period in seconds. if it is zero, this feature is turned off. + * + * @return network topology stabilize period in seconds. + */ + public int getNetworkTopologyStabilizePeriodSeconds() { + return getInt(NETWORK_TOPOLOGY_STABILIZE_PERIOD_SECONDS, 0); + } + + /** + * Set the network topology stabilize period in seconds. + * + * @see #getNetworkTopologyStabilizePeriodSeconds() + * @param seconds stabilize period in seconds + * @return client configuration. + */ + public ClientConfiguration setNetworkTopologyStabilizePeriodSeconds(int seconds) { + setProperty(NETWORK_TOPOLOGY_STABILIZE_PERIOD_SECONDS, seconds); + return this; + } + + /** * Whether to enable recording task execution stats. * * @return flag to enable/disable recording task execution stats. 
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java index cba1f7e..99ed038 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -// This code has been copied from hadoop-common 0.23.1 + package org.apache.bookkeeper.net; import java.util.HashSet; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java index eb0f6f3..8947abf 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java @@ -63,7 +63,7 @@ public class BookieSocketAddress { } // Public getters - public String getHostname() { + public String getHostName() { return hostname; } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java index 96acbc2..d7ff251 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -// This code has been copied from hadoop-common 0.23.1 package org.apache.bookkeeper.net; import java.util.ArrayList; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java index a5dce93..d09e422 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java @@ -69,6 +69,10 @@ public class DNS { // This is formed by reversing the IP numbers and appending in-addr.arpa // String[] parts = hostIp.getHostAddress().split("\\."); + if(parts.length !=4) { + //Not proper address. May be IPv6 + throw new NamingException("IPV6"); + } String reverseIP = parts[3] + "." + parts[2] + "." + parts[1] + "." 
+ parts[0] + ".in-addr.arpa"; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java index 35f9a36..6156993 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -// This code has been copied from hadoop-common 0.23.1 package org.apache.bookkeeper.net; import java.util.List;