merlimat closed pull request #1204: Allow rackaware policy to be notified of 
any rack change
URL: https://github.com/apache/bookkeeper/pull/1204
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackChangeNotifier.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackChangeNotifier.java
new file mode 100644
index 000000000..c66bcaf97
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackChangeNotifier.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.client;
+
+/**
+ * Notifier used by the RackawareEnsemblePlacementPolicy to get notified if a rack changes for a bookie.
+ *
+ * <p>This interface is meant to be implemented by a DNS resolver. While initializing,
+ * RackawareEnsemblePlacementPolicyImpl checks whether its resolver is a RackChangeNotifier and,
+ * if so, passes itself to {@link #registerRackChangeListener}. The implementation is then expected
+ * to call the policy's onBookieRackChange method with the affected bookie addresses whenever a
+ * rack mapping changes (this is what the StaticDNSResolver test resolver does).
+ */
+public interface RackChangeNotifier {
+
+    /**
+     * Register a listener for the rack-aware placement policy.
+     *
+     * @param rackawarePolicy the placement policy that should be told when the rack of a bookie changes
+     */
+    void registerRackChangeListener(RackawareEnsemblePlacementPolicyImpl 
rackawarePolicy);
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 9a587643b..568debf02 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -72,7 +72,7 @@
  *
  * <p>Make most of the class and methods as protected, so it could be extended 
to implement other algorithms.
  */
-class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsemblePlacementPolicy {
+public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsemblePlacementPolicy {
 
     static final Logger LOG = 
LoggerFactory.getLogger(RackawareEnsemblePlacementPolicyImpl.class);
     boolean isWeighted;
@@ -305,6 +305,10 @@ public RackawareEnsemblePlacementPolicyImpl 
initialize(ClientConfiguration conf,
                 if (dnsResolver instanceof Configurable) {
                     ((Configurable) dnsResolver).setConf(conf);
                 }
+
+                if (dnsResolver instanceof RackChangeNotifier) {
+                    ((RackChangeNotifier) 
dnsResolver).registerRackChangeListener(this);
+                }
             } catch (RuntimeException re) {
                 LOG.info("Failed to initialize DNS Resolver {}, used default 
subnet resolver.", dnsResolverName, re);
                 dnsResolver = new DefaultResolver(() -> this.getDefaultRack());
@@ -337,6 +341,22 @@ protected String 
resolveNetworkLocation(BookieSocketAddress addr) {
         return NetUtils.resolveNetworkLocation(dnsResolver, 
addr.getSocketAddress());
     }
 
+    /**
+     * Refresh the topology entry of every known bookie in the given list after its rack changed.
+     *
+     * <p>Addresses that are not present in {@code knownBookies} are ignored. For a known bookie a
+     * fresh node is built with {@code createBookieNode}, the old node is taken out of the topology,
+     * the fresh node is added, and {@code knownBookies} is updated to reference the fresh node.
+     * The whole update runs under the write lock.
+     *
+     * @param bookieAddressList addresses of the bookies whose rack mapping changed
+     */
+    public void onBookieRackChange(List<BookieSocketAddress> bookieAddressList) {
+        rwLock.writeLock().lock();
+        try {
+            for (BookieSocketAddress bookieAddress : bookieAddressList) {
+                BookieNode node = knownBookies.get(bookieAddress);
+                if (node != null) {
+                    // refresh the rack info if it's a known bookie; build the replacement first so a
+                    // failure while creating it does not leave the bookie removed from the topology
+                    BookieNode refreshedNode = createBookieNode(bookieAddress);
+                    topology.remove(node);
+                    topology.add(refreshedNode);
+                    // keep knownBookies pointing at the node that is actually in the topology;
+                    // otherwise a later removal or rack change would operate on the stale node
+                    // and leave the refreshed one behind
+                    knownBookies.put(bookieAddress, refreshedNode);
+                }
+            }
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
     @Override
     public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> 
writableBookies,
             Set<BookieSocketAddress> readOnlyBookies) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index bb7c6928e..1d32c1376 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -875,7 +875,7 @@ public void 
testWeightedPlacementAndNewEnsembleWithoutEnoughBookies() throws Exc
         }
     }
 
-    private int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> 
ensemble, int writeQuorumSize)
+    static int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> 
ensemble, int writeQuorumSize)
             throws Exception {
         int ensembleSize = ensemble.size();
         int numCoveredWriteQuorums = 0;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
new file mode 100644
index 000000000..a6f28bace
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static 
org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.REPP_DNS_RESOLVER_CLASS;
+import static 
org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that the rackaware ensemble placement policy picks up rack changes that are announced
+ * through the rack-change notification of the DNS resolver.
+ */
+public class TestRackawarePolicyNotificationUpdates extends TestCase {
+
+    static final Logger LOG = LoggerFactory.getLogger(TestRackawarePolicyNotificationUpdates.class);
+
+    RackawareEnsemblePlacementPolicy repp;
+    HashedWheelTimer timer;
+    ClientConfiguration conf = new ClientConfiguration();
+
+    /**
+     * Configure the policy to use {@link StaticDNSResolver}, seed the resolver with the local
+     * addresses on the default rack, and initialize the policy with a timer owned by this test.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        conf.setProperty(REPP_DNS_RESOLVER_CLASS, StaticDNSResolver.class.getName());
+
+        StaticDNSResolver.reset();
+        StaticDNSResolver.addNodeToRack(InetAddress.getLocalHost().getHostAddress(),
+                NetworkTopology.DEFAULT_REGION_AND_RACK);
+        StaticDNSResolver.addNodeToRack("127.0.0.1", NetworkTopology.DEFAULT_REGION_AND_RACK);
+        StaticDNSResolver.addNodeToRack("localhost", NetworkTopology.DEFAULT_REGION_AND_RACK);
+        LOG.info("Set up static DNS Resolver.");
+
+        timer = new HashedWheelTimer(new ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
+                conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS, conf.getTimeoutTimerNumTicks());
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+    }
+
+    /**
+     * Release the policy and the timer created in {@link #setUp()}.
+     *
+     * <p>JUnit 3 calls tearDown even when setUp threw, so both fields may still be null here.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (repp != null) {
+                repp.uninitalize();
+            }
+        } finally {
+            // the timer is created by this test, so the test has to stop it
+            if (timer != null) {
+                timer.stop();
+            }
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Move one bookie to a new rack through {@link StaticDNSResolver#changeRack} and check that the
+     * next ensemble reflects the new rack layout.
+     */
+    @Test
+    public void testNotifyRackChange() throws Exception {
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181);
+        BookieSocketAddress addr2 = new BookieSocketAddress("127.0.1.2", 3181);
+        BookieSocketAddress addr3 = new BookieSocketAddress("127.0.1.3", 3181);
+        BookieSocketAddress addr4 = new BookieSocketAddress("127.0.1.4", 3181);
+
+        // update dns mapping: addr1 alone on rack-1, the other three bookies on rack-2
+        StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/rack-1");
+        StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/rack-2");
+        StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/rack-2");
+        StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/rack-2");
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = Sets.newHashSet(addr1, addr2, addr3, addr4);
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, Collections.emptyMap(),
+                Collections.emptySet());
+        // NOTE(review): getNumCoveredWriteQuorums presumably counts the write quorums that span more
+        // than one rack; its body lives in TestRackawareEnsemblePlacementPolicy - confirm there.
+        int numCovered = TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 2);
+        assertTrue(numCovered >= 1 && numCovered < 3);
+        assertTrue(ensemble.contains(addr1));
+
+        // move addr2 from rack-2 to rack-3; changeRack also notifies the registered policy
+        List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
+        List<String> rackList = new ArrayList<>();
+        bookieAddressList.add(addr2);
+        rackList.add("/default-region/rack-3");
+        StaticDNSResolver.changeRack(bookieAddressList, rackList);
+
+        // with three racks in use, every write quorum is expected to be covered
+        ensemble = repp.newEnsemble(3, 2, 1, Collections.emptyMap(), Collections.emptySet());
+        assertEquals(3, TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 2));
+        assertTrue(ensemble.contains(addr1));
+        assertTrue(ensemble.contains(addr2));
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java
index b08586c69..d5cb06710 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java
@@ -22,7 +22,10 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.bookkeeper.client.RackChangeNotifier;
+import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
 import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.NetworkTopology;
 import org.apache.bookkeeper.net.NodeBase;
@@ -32,7 +35,7 @@
 /**
  * Implements {@link DNSToSwitchMapping} via static mappings. Used in test 
cases to simulate racks.
  */
-public class StaticDNSResolver extends AbstractDNSToSwitchMapping {
+public class StaticDNSResolver extends AbstractDNSToSwitchMapping implements 
RackChangeNotifier {
 
     static final Logger LOG = LoggerFactory.getLogger(StaticDNSResolver.class);
 
@@ -84,4 +87,19 @@ public void reloadCachedMappings() {
         // nop
     }
 
+    // Placement policy registered through registerRackChangeListener; stays null until a policy
+    // registers. Static because changeRack is a static entry point.
+    private static RackawareEnsemblePlacementPolicyImpl rackawarePolicy = null;
+
+    /**
+     * Remember the placement policy that {@link #changeRack} has to notify.
+     * A later registration replaces the previously registered policy.
+     *
+     * @param rackawareEnsemblePolicy the policy to notify about rack changes
+     */
+    @Override
+    public void registerRackChangeListener(RackawareEnsemblePlacementPolicyImpl rackawareEnsemblePolicy) {
+        rackawarePolicy = rackawareEnsemblePolicy;
+    }
+
+    /**
+     * Re-map the given bookies to new racks and notify the registered placement policy, if any.
+     *
+     * <p>The two lists are matched by index: the bookie at position {@code i} is mapped to the
+     * rack at position {@code i}.
+     *
+     * @param bookieAddressList bookies whose rack changes
+     * @param rack new rack of each bookie, in the same order as {@code bookieAddressList}
+     */
+    public static void changeRack(List<BookieSocketAddress> bookieAddressList, List<String> rack) {
+        for (int i = 0; i < bookieAddressList.size(); i++) {
+            BookieSocketAddress bkAddress = bookieAddressList.get(i);
+            name2Racks.put(bkAddress.getHostName(), rack.get(i));
+        }
+        // No policy may have registered yet; in that case only the static mapping is updated
+        // instead of failing with a NullPointerException.
+        RackawareEnsemblePlacementPolicyImpl listener = rackawarePolicy;
+        if (listener != null) {
+            listener.onBookieRackChange(bookieAddressList);
+        }
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to