merlimat closed pull request #1204: Allow rackaware policy to be notified of any rack change
URL: https://github.com/apache/bookkeeper/pull/1204
This pull request was merged from a forked repository. Because GitHub hides the original diff of a fork-based pull request once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackChangeNotifier.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackChangeNotifier.java new file mode 100644 index 000000000..c66bcaf97 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackChangeNotifier.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bookkeeper.client; + +/** + * Notifier used by the RackawareEnsemblePlacementPolicy to get notified if a rack changes for a bookie. + */ +public interface RackChangeNotifier { + + /** + * Register a listener for the rack-aware placement policy. 
+ * + * @param rackawarePolicy + */ + void registerRackChangeListener(RackawareEnsemblePlacementPolicyImpl rackawarePolicy); +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index 9a587643b..568debf02 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -72,7 +72,7 @@ * * <p>Make most of the class and methods as protected, so it could be extended to implement other algorithms. */ -class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacementPolicy { +public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacementPolicy { static final Logger LOG = LoggerFactory.getLogger(RackawareEnsemblePlacementPolicyImpl.class); boolean isWeighted; @@ -305,6 +305,10 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf, if (dnsResolver instanceof Configurable) { ((Configurable) dnsResolver).setConf(conf); } + + if (dnsResolver instanceof RackChangeNotifier) { + ((RackChangeNotifier) dnsResolver).registerRackChangeListener(this); + } } catch (RuntimeException re) { LOG.info("Failed to initialize DNS Resolver {}, used default subnet resolver.", dnsResolverName, re); dnsResolver = new DefaultResolver(() -> this.getDefaultRack()); @@ -337,6 +341,22 @@ protected String resolveNetworkLocation(BookieSocketAddress addr) { return NetUtils.resolveNetworkLocation(dnsResolver, addr.getSocketAddress()); } + public void onBookieRackChange(List<BookieSocketAddress> bookieAddressList) { + rwLock.writeLock().lock(); + try { + for (BookieSocketAddress bookieAddress : bookieAddressList) { + BookieNode node = knownBookies.get(bookieAddress); + if (node != null) { + // refresh the 
rack info if its a known bookie + topology.remove(node); + topology.add(createBookieNode(bookieAddress)); + } + } + } finally { + rwLock.writeLock().unlock(); + } + } + @Override public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> writableBookies, Set<BookieSocketAddress> readOnlyBookies) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index bb7c6928e..1d32c1376 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -875,7 +875,7 @@ public void testWeightedPlacementAndNewEnsembleWithoutEnoughBookies() throws Exc } } - private int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> ensemble, int writeQuorumSize) + static int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> ensemble, int writeQuorumSize) throws Exception { int ensembleSize = ensemble.size(); int numCoveredWriteQuorums = 0; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java new file mode 100644 index 000000000..a6f28bace --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.client; + +import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.REPP_DNS_RESOLVER_CLASS; +import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL; + +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import io.netty.util.HashedWheelTimer; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import junit.framework.TestCase; + +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.net.DNSToSwitchMapping; +import org.apache.bookkeeper.net.NetworkTopology; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.util.StaticDNSResolver; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test the rackaware ensemble placement policy. 
+ */ +public class TestRackawarePolicyNotificationUpdates extends TestCase { + + static final Logger LOG = LoggerFactory.getLogger(TestRackawarePolicyNotificationUpdates.class); + + RackawareEnsemblePlacementPolicy repp; + HashedWheelTimer timer; + ClientConfiguration conf = new ClientConfiguration(); + + @Override + protected void setUp() throws Exception { + super.setUp(); + conf.setProperty(REPP_DNS_RESOLVER_CLASS, StaticDNSResolver.class.getName()); + + StaticDNSResolver.reset(); + StaticDNSResolver.addNodeToRack(InetAddress.getLocalHost().getHostAddress(), + NetworkTopology.DEFAULT_REGION_AND_RACK); + StaticDNSResolver.addNodeToRack("127.0.0.1", NetworkTopology.DEFAULT_REGION_AND_RACK); + StaticDNSResolver.addNodeToRack("localhost", NetworkTopology.DEFAULT_REGION_AND_RACK); + LOG.info("Set up static DNS Resolver."); + + timer = new HashedWheelTimer(new ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(), + conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS, conf.getTimeoutTimerNumTicks()); + + repp = new RackawareEnsemblePlacementPolicy(); + repp.initialize(conf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + } + + @Override + protected void tearDown() throws Exception { + repp.uninitalize(); + super.tearDown(); + } + + @Test + public void testNotifyRackChange() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.1.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.1.3", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.1.4", 3181); + + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/rack-1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/rack-2"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/rack-2"); 
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/rack-2"); + + // Update cluster + Set<BookieSocketAddress> addrs = Sets.newHashSet(addr1, addr2, addr3, addr4); + repp.onClusterChanged(addrs, new HashSet<>()); + + ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, Collections.emptyMap(), + Collections.emptySet()); + int numCovered = TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 2); + assertTrue(numCovered >= 1 && numCovered < 3); + assertTrue(ensemble.contains(addr1)); + + List<BookieSocketAddress> bookieAddressList = new ArrayList<>(); + List<String> rackList = new ArrayList<>(); + bookieAddressList.add(addr2); + rackList.add("/default-region/rack-3"); + StaticDNSResolver.changeRack(bookieAddressList, rackList); + + ensemble = repp.newEnsemble(3, 2, 1, Collections.emptyMap(), Collections.emptySet()); + assertEquals(3, TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 2)); + assertTrue(ensemble.contains(addr1)); + assertTrue(ensemble.contains(addr2)); + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java index b08586c69..d5cb06710 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java @@ -22,7 +22,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.bookkeeper.client.RackChangeNotifier; +import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl; import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.net.DNSToSwitchMapping; import org.apache.bookkeeper.net.NetworkTopology; import org.apache.bookkeeper.net.NodeBase; @@ -32,7 +35,7 @@ /** * Implements {@link 
DNSToSwitchMapping} via static mappings. Used in test cases to simulate racks. */ -public class StaticDNSResolver extends AbstractDNSToSwitchMapping { +public class StaticDNSResolver extends AbstractDNSToSwitchMapping implements RackChangeNotifier { static final Logger LOG = LoggerFactory.getLogger(StaticDNSResolver.class); @@ -84,4 +87,19 @@ public void reloadCachedMappings() { // nop } + private static RackawareEnsemblePlacementPolicyImpl rackawarePolicy = null; + + @Override + public void registerRackChangeListener(RackawareEnsemblePlacementPolicyImpl rackawareEnsemblePolicy) { + rackawarePolicy = rackawareEnsemblePolicy; + } + + public static void changeRack(List<BookieSocketAddress> bookieAddressList, List<String> rack) { + for (int i = 0; i < bookieAddressList.size(); i++) { + BookieSocketAddress bkAddress = bookieAddressList.get(i); + name2Racks.put(bkAddress.getHostName(), rack.get(i)); + } + rackawarePolicy.onBookieRackChange(bookieAddressList); + } + } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
