This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new c0aff9f  Allow rackaware policy to be notified of any rack change
c0aff9f is described below

commit c0aff9f91b79b8b573f50fb4ecd0e300816f6949
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Feb 26 14:11:42 2018 -0800

    Allow rackaware policy to be notified of any rack change
    
    This PR is a port from Yahoo branch: 
https://github.com/yahoo/bookkeeper/commit/dbfd2073b6e687a5b4b9c56a0f0c3436f177db58
    
    It adds a hook so that a placement policy implementation can add a way to 
notify the main rackaware policy that the rack mappings were updated.
    
    This was used to have a specialized policy that keeps the mapping (bookie 
-> rack) in ZooKeeper. Whenever the new bookies are added (or updated), the 
administrator would update the mapping and we need to start using the new 
settings.
    
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Sijie Guo <[email protected]>
    
    This closes #1204 from merlimat/rack-aware-notification
---
 .../bookkeeper/client/RackChangeNotifier.java      |  32 ++++++
 .../RackawareEnsemblePlacementPolicyImpl.java      |  22 +++-
 .../TestRackawareEnsemblePlacementPolicy.java      |   2 +-
 .../TestRackawarePolicyNotificationUpdates.java    | 120 +++++++++++++++++++++
 .../apache/bookkeeper/util/StaticDNSResolver.java  |  20 +++-
 5 files changed, 193 insertions(+), 3 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackChangeNotifier.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackChangeNotifier.java
new file mode 100644
index 0000000..c66bcaf
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackChangeNotifier.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.client;
+
+/**
+ * Notifier used by the RackawareEnsemblePlacementPolicy to get notified if a 
rack changes for a bookie.
+ */
+public interface RackChangeNotifier {
+
+    /**
+     * Register a listener for the rack-aware placement policy.
+     *
+     * @param rackawarePolicy
+     */
+    void registerRackChangeListener(RackawareEnsemblePlacementPolicyImpl 
rackawarePolicy);
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 9a58764..568debf 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -72,7 +72,7 @@ import org.slf4j.LoggerFactory;
  *
  * <p>Make most of the class and methods as protected, so it could be extended 
to implement other algorithms.
  */
-class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsemblePlacementPolicy {
+public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsemblePlacementPolicy {
 
     static final Logger LOG = 
LoggerFactory.getLogger(RackawareEnsemblePlacementPolicyImpl.class);
     boolean isWeighted;
@@ -305,6 +305,10 @@ class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsemblePlacemen
                 if (dnsResolver instanceof Configurable) {
                     ((Configurable) dnsResolver).setConf(conf);
                 }
+
+                if (dnsResolver instanceof RackChangeNotifier) {
+                    ((RackChangeNotifier) 
dnsResolver).registerRackChangeListener(this);
+                }
             } catch (RuntimeException re) {
                 LOG.info("Failed to initialize DNS Resolver {}, used default 
subnet resolver.", dnsResolverName, re);
                 dnsResolver = new DefaultResolver(() -> this.getDefaultRack());
@@ -337,6 +341,22 @@ class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsemblePlacemen
         return NetUtils.resolveNetworkLocation(dnsResolver, 
addr.getSocketAddress());
     }
 
+    public void onBookieRackChange(List<BookieSocketAddress> 
bookieAddressList) {
+        rwLock.writeLock().lock();
+        try {
+            for (BookieSocketAddress bookieAddress : bookieAddressList) {
+                BookieNode node = knownBookies.get(bookieAddress);
+                if (node != null) {
+                    // refresh the rack info if its a known bookie
+                    topology.remove(node);
+                    topology.add(createBookieNode(bookieAddress));
+                }
+            }
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
     @Override
     public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> 
writableBookies,
             Set<BookieSocketAddress> readOnlyBookies) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index bb7c692..1d32c13 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -875,7 +875,7 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
         }
     }
 
-    private int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> 
ensemble, int writeQuorumSize)
+    static int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> 
ensemble, int writeQuorumSize)
             throws Exception {
         int ensembleSize = ensemble.size();
         int numCoveredWriteQuorums = 0;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
new file mode 100644
index 0000000..a6f28ba
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static 
org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.REPP_DNS_RESOLVER_CLASS;
+import static 
org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the rackaware ensemble placement policy.
+ */
+public class TestRackawarePolicyNotificationUpdates extends TestCase {
+
+    static final Logger LOG = 
LoggerFactory.getLogger(TestRackawarePolicyNotificationUpdates.class);
+
+    RackawareEnsemblePlacementPolicy repp;
+    HashedWheelTimer timer;
+    ClientConfiguration conf = new ClientConfiguration();
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        conf.setProperty(REPP_DNS_RESOLVER_CLASS, 
StaticDNSResolver.class.getName());
+
+        StaticDNSResolver.reset();
+        
StaticDNSResolver.addNodeToRack(InetAddress.getLocalHost().getHostAddress(),
+                NetworkTopology.DEFAULT_REGION_AND_RACK);
+        StaticDNSResolver.addNodeToRack("127.0.0.1", 
NetworkTopology.DEFAULT_REGION_AND_RACK);
+        StaticDNSResolver.addNodeToRack("localhost", 
NetworkTopology.DEFAULT_REGION_AND_RACK);
+        LOG.info("Set up static DNS Resolver.");
+
+        timer = new HashedWheelTimer(new 
ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
+                conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS, 
conf.getTimeoutTimerNumTicks());
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping> empty(), timer, 
DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        repp.uninitalize();
+        super.tearDown();
+    }
+
+    @Test
+    public void testNotifyRackChange() throws Exception {
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181);
+        BookieSocketAddress addr2 = new BookieSocketAddress("127.0.1.2", 3181);
+        BookieSocketAddress addr3 = new BookieSocketAddress("127.0.1.3", 3181);
+        BookieSocketAddress addr4 = new BookieSocketAddress("127.0.1.4", 3181);
+
+        // update dns mapping
+        StaticDNSResolver.addNodeToRack(addr1.getHostName(), 
"/default-region/rack-1");
+        StaticDNSResolver.addNodeToRack(addr2.getHostName(), 
"/default-region/rack-2");
+        StaticDNSResolver.addNodeToRack(addr3.getHostName(), 
"/default-region/rack-2");
+        StaticDNSResolver.addNodeToRack(addr4.getHostName(), 
"/default-region/rack-2");
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = Sets.newHashSet(addr1, addr2, addr3, 
addr4);
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, 
Collections.emptyMap(),
+                Collections.emptySet());
+        int numCovered = 
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 2);
+        assertTrue(numCovered >= 1 && numCovered < 3);
+        assertTrue(ensemble.contains(addr1));
+
+        List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
+        List<String> rackList = new ArrayList<>();
+        bookieAddressList.add(addr2);
+        rackList.add("/default-region/rack-3");
+        StaticDNSResolver.changeRack(bookieAddressList, rackList);
+
+        ensemble = repp.newEnsemble(3, 2, 1, Collections.emptyMap(), 
Collections.emptySet());
+        assertEquals(3, 
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 2));
+        assertTrue(ensemble.contains(addr1));
+        assertTrue(ensemble.contains(addr2));
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java
index b08586c..d5cb067 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java
@@ -22,7 +22,10 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.bookkeeper.client.RackChangeNotifier;
+import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
 import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.NetworkTopology;
 import org.apache.bookkeeper.net.NodeBase;
@@ -32,7 +35,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Implements {@link DNSToSwitchMapping} via static mappings. Used in test 
cases to simulate racks.
  */
-public class StaticDNSResolver extends AbstractDNSToSwitchMapping {
+public class StaticDNSResolver extends AbstractDNSToSwitchMapping implements 
RackChangeNotifier {
 
     static final Logger LOG = LoggerFactory.getLogger(StaticDNSResolver.class);
 
@@ -84,4 +87,19 @@ public class StaticDNSResolver extends 
AbstractDNSToSwitchMapping {
         // nop
     }
 
+    private static RackawareEnsemblePlacementPolicyImpl rackawarePolicy = null;
+
+    @Override
+    public void 
registerRackChangeListener(RackawareEnsemblePlacementPolicyImpl 
rackawareEnsemblePolicy) {
+        rackawarePolicy = rackawareEnsemblePolicy;
+    }
+
+    public static void changeRack(List<BookieSocketAddress> bookieAddressList, 
List<String> rack) {
+        for (int i = 0; i < bookieAddressList.size(); i++) {
+            BookieSocketAddress bkAddress = bookieAddressList.get(i);
+            name2Racks.put(bkAddress.getHostName(), rack.get(i));
+        }
+        rackawarePolicy.onBookieRackChange(bookieAddressList);
+    }
+
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to