This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new c0aff9f Allow rackaware policy to be notified of any rack change
c0aff9f is described below
commit c0aff9f91b79b8b573f50fb4ecd0e300816f6949
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Feb 26 14:11:42 2018 -0800
Allow rackaware policy to be notified of any rack change
This PR is a port from Yahoo branch:
https://github.com/yahoo/bookkeeper/commit/dbfd2073b6e687a5b4b9c56a0f0c3436f177db58
It adds a hook, the RackChangeNotifier interface, through which an
implementation that tracks rack mappings can notify the main rackaware policy
that the mappings were updated.
This was used to support a specialized policy that keeps the mapping (bookie
-> rack) in ZooKeeper. Whenever bookies are added (or updated), the
administrator updates the mapping and the rackaware policy needs to start
using the new settings.
Author: Matteo Merli <[email protected]>
Reviewers: Sijie Guo <[email protected]>
This closes #1204 from merlimat/rack-aware-notification
---
.../bookkeeper/client/RackChangeNotifier.java | 32 ++++++
.../RackawareEnsemblePlacementPolicyImpl.java | 22 +++-
.../TestRackawareEnsemblePlacementPolicy.java | 2 +-
.../TestRackawarePolicyNotificationUpdates.java | 120 +++++++++++++++++++++
.../apache/bookkeeper/util/StaticDNSResolver.java | 20 +++-
5 files changed, 193 insertions(+), 3 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackChangeNotifier.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackChangeNotifier.java
new file mode 100644
index 0000000..c66bcaf
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackChangeNotifier.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.client;
+
+/**
+ * Notifier that the RackawareEnsemblePlacementPolicy registers with to be notified when the
rack of a bookie changes.
+ */
+public interface RackChangeNotifier {
+
+ /**
+ * Register the rack-aware placement policy as the listener for rack changes.
+ *
+ * @param rackawarePolicy the rack-aware policy to notify when bookie rack mappings change
+ */
+ void registerRackChangeListener(RackawareEnsemblePlacementPolicyImpl
rackawarePolicy);
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 9a58764..568debf 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -72,7 +72,7 @@ import org.slf4j.LoggerFactory;
*
* <p>Make most of the class and methods as protected, so it could be extended
to implement other algorithms.
*/
-class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsemblePlacementPolicy {
+public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsemblePlacementPolicy {
static final Logger LOG =
LoggerFactory.getLogger(RackawareEnsemblePlacementPolicyImpl.class);
boolean isWeighted;
@@ -305,6 +305,10 @@ class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsemblePlacemen
if (dnsResolver instanceof Configurable) {
((Configurable) dnsResolver).setConf(conf);
}
+
+ if (dnsResolver instanceof RackChangeNotifier) {
+ ((RackChangeNotifier)
dnsResolver).registerRackChangeListener(this);
+ }
} catch (RuntimeException re) {
LOG.info("Failed to initialize DNS Resolver {}, used default
subnet resolver.", dnsResolverName, re);
dnsResolver = new DefaultResolver(() -> this.getDefaultRack());
@@ -337,6 +341,22 @@ class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsemblePlacemen
return NetUtils.resolveNetworkLocation(dnsResolver,
addr.getSocketAddress());
}
+ public void onBookieRackChange(List<BookieSocketAddress>
bookieAddressList) {
+ rwLock.writeLock().lock();
+ try {
+ for (BookieSocketAddress bookieAddress : bookieAddressList) {
+ BookieNode node = knownBookies.get(bookieAddress);
+ if (node != null) {
+ // refresh the rack info if it's a known bookie
+ topology.remove(node);
+ topology.add(createBookieNode(bookieAddress));
+ }
+ }
+ } finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
@Override
public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress>
writableBookies,
Set<BookieSocketAddress> readOnlyBookies) {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index bb7c692..1d32c13 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -875,7 +875,7 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
}
}
- private int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress>
ensemble, int writeQuorumSize)
+ static int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress>
ensemble, int writeQuorumSize)
throws Exception {
int ensembleSize = ensemble.size();
int numCoveredWriteQuorums = 0;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
new file mode 100644
index 0000000..a6f28ba
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static
org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.REPP_DNS_RESOLVER_CLASS;
+import static
org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that the rackaware ensemble placement policy reacts to rack change notifications.
+ */
+public class TestRackawarePolicyNotificationUpdates extends TestCase {
+
+ static final Logger LOG =
LoggerFactory.getLogger(TestRackawarePolicyNotificationUpdates.class);
+
+ RackawareEnsemblePlacementPolicy repp;
+ HashedWheelTimer timer;
+ ClientConfiguration conf = new ClientConfiguration();
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ conf.setProperty(REPP_DNS_RESOLVER_CLASS,
StaticDNSResolver.class.getName());
+
+ StaticDNSResolver.reset();
+
StaticDNSResolver.addNodeToRack(InetAddress.getLocalHost().getHostAddress(),
+ NetworkTopology.DEFAULT_REGION_AND_RACK);
+ StaticDNSResolver.addNodeToRack("127.0.0.1",
NetworkTopology.DEFAULT_REGION_AND_RACK);
+ StaticDNSResolver.addNodeToRack("localhost",
NetworkTopology.DEFAULT_REGION_AND_RACK);
+ LOG.info("Set up static DNS Resolver.");
+
+ timer = new HashedWheelTimer(new
ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
+ conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
conf.getTimeoutTimerNumTicks());
+
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(conf, Optional.<DNSToSwitchMapping> empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ repp.uninitalize();
+ super.tearDown();
+ }
+
+ @Test
+ public void testNotifyRackChange() throws Exception {
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.1.2", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.1.3", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.1.4", 3181);
+
+ // update dns mapping
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(),
"/default-region/rack-1");
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(),
"/default-region/rack-2");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(),
"/default-region/rack-2");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(),
"/default-region/rack-2");
+
+ // Update cluster
+ Set<BookieSocketAddress> addrs = Sets.newHashSet(addr1, addr2, addr3,
addr4);
+ repp.onClusterChanged(addrs, new HashSet<>());
+
+ ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2,
Collections.emptyMap(),
+ Collections.emptySet());
+ int numCovered =
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 2);
+ assertTrue(numCovered >= 1 && numCovered < 3);
+ assertTrue(ensemble.contains(addr1));
+
+ List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
+ List<String> rackList = new ArrayList<>();
+ bookieAddressList.add(addr2);
+ rackList.add("/default-region/rack-3");
+ StaticDNSResolver.changeRack(bookieAddressList, rackList);
+
+ ensemble = repp.newEnsemble(3, 2, 1, Collections.emptyMap(),
Collections.emptySet());
+ assertEquals(3,
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 2));
+ assertTrue(ensemble.contains(addr1));
+ assertTrue(ensemble.contains(addr2));
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java
index b08586c..d5cb067 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java
@@ -22,7 +22,10 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.client.RackChangeNotifier;
+import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.net.NodeBase;
@@ -32,7 +35,7 @@ import org.slf4j.LoggerFactory;
/**
* Implements {@link DNSToSwitchMapping} via static mappings. Used in test
cases to simulate racks.
*/
-public class StaticDNSResolver extends AbstractDNSToSwitchMapping {
+public class StaticDNSResolver extends AbstractDNSToSwitchMapping implements
RackChangeNotifier {
static final Logger LOG = LoggerFactory.getLogger(StaticDNSResolver.class);
@@ -84,4 +87,19 @@ public class StaticDNSResolver extends
AbstractDNSToSwitchMapping {
// nop
}
+ private static RackawareEnsemblePlacementPolicyImpl rackawarePolicy = null;
+
+ @Override
+ public void
registerRackChangeListener(RackawareEnsemblePlacementPolicyImpl
rackawareEnsemblePolicy) {
+ rackawarePolicy = rackawareEnsemblePolicy;
+ }
+
+ public static void changeRack(List<BookieSocketAddress> bookieAddressList,
List<String> rack) {
+ for (int i = 0; i < bookieAddressList.size(); i++) {
+ BookieSocketAddress bkAddress = bookieAddressList.get(i);
+ name2Racks.put(bkAddress.getHostName(), rack.get(i));
+ }
+ rackawarePolicy.onBookieRackChange(bookieAddressList);
+ }
+
}
--
To stop receiving notification emails like this one, please contact
[email protected].