Modified: zookeeper/trunk/src/c/tests/TestZookeeperInit.cc
URL: 
http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/TestZookeeperInit.cc?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/TestZookeeperInit.cc (original)
+++ zookeeper/trunk/src/c/tests/TestZookeeperInit.cc Sat Nov 17 14:03:03 2012
@@ -93,7 +93,7 @@ public:
     void testBasic()
     {
         const string EXPECTED_HOST("127.0.0.1:2121");
-        const int EXPECTED_ADDRS_COUNT =1;
+        const unsigned int EXPECTED_ADDRS_COUNT =1;
         const int EXPECTED_RECV_TIMEOUT=10000;
         clientid_t cid;
         memset(&cid,0xFE,sizeof(cid));
@@ -101,16 +101,16 @@ public:
         zh=zookeeper_init(EXPECTED_HOST.c_str(),watcher,EXPECTED_RECV_TIMEOUT,
                 &cid,(void*)1,0);
 
-        CPPUNIT_ASSERT(zh!=0);
+        CPPUNIT_ASSERT(zh != NULL);
         CPPUNIT_ASSERT(zh->fd == -1);
-        CPPUNIT_ASSERT(zh->hostname!=0);
-        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs_count);
+        CPPUNIT_ASSERT(zh->hostname != NULL);
+        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs.count);
         CPPUNIT_ASSERT_EQUAL(EXPECTED_HOST,string(zh->hostname));
-        CPPUNIT_ASSERT(zh->state == NOTCONNECTED_STATE_DEF);
+        CPPUNIT_ASSERT(zh->state == ZOO_NOTCONNECTED_STATE);
         CPPUNIT_ASSERT(zh->context == (void*)1);
         CPPUNIT_ASSERT_EQUAL(EXPECTED_RECV_TIMEOUT,zh->recv_timeout);
         CPPUNIT_ASSERT(zh->watcher == watcher);
-        CPPUNIT_ASSERT(zh->connect_index==0);
+        CPPUNIT_ASSERT(zh->addrs.next==0);
         CPPUNIT_ASSERT(zh->primer_buffer.buffer==zh->primer_storage_buffer);
         CPPUNIT_ASSERT(zh->primer_buffer.curr_offset ==0);
         CPPUNIT_ASSERT(zh->primer_buffer.len == 
sizeof(zh->primer_storage_buffer));
@@ -136,15 +136,15 @@ public:
     void testAddressResolution()
     {
         const char EXPECTED_IPS[][4]={{127,0,0,1}};
-        const int EXPECTED_ADDRS_COUNT =COUNTOF(EXPECTED_IPS);
+        const unsigned int EXPECTED_ADDRS_COUNT =COUNTOF(EXPECTED_IPS);
 
         zoo_deterministic_conn_order(1);
         zh=zookeeper_init("127.0.0.1:2121",0,10000,0,0,0);
 
         CPPUNIT_ASSERT(zh!=0);
-        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs_count);
-        for(int i=0;i<zh->addrs_count;i++){
-            sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs[i];
+        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs.count);
+        for(int i=0;i<zh->addrs.count;i++){
+            sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs.data[i];
             
CPPUNIT_ASSERT(memcmp(EXPECTED_IPS[i],&addr->sin_addr,sizeof(addr->sin_addr))==0);
             CPPUNIT_ASSERT_EQUAL(2121,(int)ntohs(addr->sin_port));
         }
@@ -153,16 +153,16 @@ public:
     {
         const string EXPECTED_HOST("127.0.0.1:2121,127.0.0.2:3434");
         const char EXPECTED_IPS[][4]={{127,0,0,1},{127,0,0,2}};
-        const int EXPECTED_ADDRS_COUNT =COUNTOF(EXPECTED_IPS);
+        const unsigned int EXPECTED_ADDRS_COUNT =COUNTOF(EXPECTED_IPS);
 
         zoo_deterministic_conn_order(1);
         zh=zookeeper_init(EXPECTED_HOST.c_str(),0,1000,0,0,0);
 
         CPPUNIT_ASSERT(zh!=0);
-        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs_count);
+        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs.count);
 
-        for(int i=0;i<zh->addrs_count;i++){
-            sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs[i];
+        for(int i=0;i<zh->addrs.count;i++){
+            sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs.data[i];
             
CPPUNIT_ASSERT(memcmp(EXPECTED_IPS[i],&addr->sin_addr,sizeof(addr->sin_addr))==0);
             if(i<1)
                 CPPUNIT_ASSERT_EQUAL(2121,(int)ntohs(addr->sin_port));
@@ -174,16 +174,16 @@ public:
     { 
         const string EXPECTED_HOST("127.0.0.1:2121,  127.0.0.2:3434");
         const char EXPECTED_IPS[][4]={{127,0,0,1},{127,0,0,2}};
-        const int EXPECTED_ADDRS_COUNT =COUNTOF(EXPECTED_IPS);
+        const unsigned int EXPECTED_ADDRS_COUNT =COUNTOF(EXPECTED_IPS);
 
         zoo_deterministic_conn_order(1);
         zh=zookeeper_init(EXPECTED_HOST.c_str(),0,1000,0,0,0);
 
         CPPUNIT_ASSERT(zh!=0);
-        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs_count);
+        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs.count);
 
-        for(int i=0;i<zh->addrs_count;i++){
-            sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs[i];
+        for(int i=0;i<zh->addrs.count;i++){
+            sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs.data[i];
             
CPPUNIT_ASSERT(memcmp(EXPECTED_IPS[i],&addr->sin_addr,sizeof(addr->sin_addr))==0);
             if(i<1)
                 CPPUNIT_ASSERT_EQUAL(2121,(int)ntohs(addr->sin_port));
@@ -277,7 +277,7 @@ public:
     void testPermuteAddrsList()
     {
         const char EXPECTED[][5]={"\0\0\0\0","\1\1\1\1","\2\2\2\2","\3\3\3\3"};
-        const int EXPECTED_ADDR_COUNT=COUNTOF(EXPECTED);
+        const unsigned int EXPECTED_ADDR_COUNT=COUNTOF(EXPECTED);
 
         const int RAND_SEQ[]={0,1,1,-1};
         const int RAND_SIZE=COUNTOF(RAND_SEQ);
@@ -286,11 +286,11 @@ public:
         
zh=zookeeper_init("0.0.0.0:123,1.1.1.1:123,2.2.2.2:123,3.3.3.3:123",0,1000,0,0,0);
 
         CPPUNIT_ASSERT(zh!=0);
-        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDR_COUNT,zh->addrs_count);
+        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDR_COUNT,zh->addrs.count);
         const string EXPECTED_SEQ("3210");
         char ACTUAL_SEQ[EXPECTED_ADDR_COUNT+1]; 
ACTUAL_SEQ[EXPECTED_ADDR_COUNT]=0;
-        for(int i=0;i<zh->addrs_count;i++){
-            sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs[i];
+        for(int i=0;i<zh->addrs.count;i++){
+            sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs.data[i];
             // match the first byte of the EXPECTED and of the actual address
             ACTUAL_SEQ[i]=((char*)&addr->sin_addr)[0]+'0';
         }

Modified: zookeeper/trunk/src/c/tests/ZKMocks.cc
URL: 
http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/ZKMocks.cc?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/ZKMocks.cc (original)
+++ zookeeper/trunk/src/c/tests/ZKMocks.cc Sat Nov 17 14:03:03 2012
@@ -507,7 +507,13 @@ void ZookeeperServer::notifyBufferSent(c
 void forceConnected(zhandle_t* zh){
     // simulate connected state
     zh->state=ZOO_CONNECTED_STATE;
+    
+    // Simulate we're connected to the first host in our host list
     zh->fd=ZookeeperServer::FD;
+    assert(zh->addrs.count > 0);
+    zh->addr_cur = zh->addrs.data[0];
+    zh->addrs.next++;
+
     zh->input_buffer=0;
     gettimeofday(&zh->last_recv,0);    
     gettimeofday(&zh->last_send,0);    

Modified: 
zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml
URL: 
http://svn.apache.org/viewvc/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- 
zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml
 (original)
+++ 
zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml
 Sat Nov 17 14:03:03 2012
@@ -541,6 +541,40 @@
       and the second client will be disconnected (causing the pair to attempt 
to re-establish
       it's connection/session indefinitely).</para>
 
+    <para> <emphasis role="bold">Updating the list of servers</emphasis>.  We 
allow a client to 
+      update the connection string by providing a new comma separated list of 
host:port pairs, 
+      each corresponding to a ZooKeeper server. The function invokes a 
probabilistic load-balancing 
+      algorithm which may cause the client to disconnect from its current host 
with the goal
+      to achieve expected uniform number of connections per server in the new 
list. 
+      In case the current host to which the client is connected is not in the 
new list
+      this call will always cause the connection to be dropped. Otherwise, the 
decision
+         is based on whether the number of servers has increased or decreased 
and by how much.</para>
+
+    <para>
+      For example, if the previous connection string contained 3 hosts and now 
the list contains
+      these 3 hosts and 2 more hosts, 40% of clients connected to each of the 
3 hosts will
+      move to one of the new hosts in order to balance the load. The algorithm 
will cause the client 
+      to drop its connection to the current host to which it is connected with 
probability 0.4 and in this 
+         case cause the client to connect to one of the 2 new hosts, chosen at 
random.
+    </para>
+
+       <para>
+         Another example -- suppose we have 5 hosts and now update the list to 
remove 2 of the hosts, 
+         the clients connected to the 3 remaining hosts will stay connected, 
whereas all clients connected 
+         to the 2 removed hosts will need to move to one of the 3 hosts, 
chosen at random. If the connection
+         is dropped, the client moves to a special mode where it chooses a new 
server to connect to using the
+         probabilistic algorithm, and not just round robin. 
+    </para>
+
+    <para>
+         In the first example, each client decides to disconnect with 
probability 0.4 but once the decision is
+         made, it will try to connect to a random new server and only if it 
cannot connect to any of the new 
+         servers will it try to connect to the old ones. After finding a 
server, or trying all servers in the 
+         new list and failing to connect, the client moves back to the normal 
mode of operation where it picks
+         an arbitrary server from the connectString and attempts to connect to 
it. If that fails, it will continue
+         trying different random servers in round robin. (See above for the 
algorithm used to initially choose a server.)
+    </para>
+
   </section>
 
   <section id="ch_zkWatches">

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: 
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Sat Nov 
17 14:03:03 2012
@@ -19,6 +19,7 @@
 package org.apache.zookeeper;
 
 import org.apache.zookeeper.AsyncCallback.*;
+import org.apache.zookeeper.ClientCnxn.SendThread;
 import org.apache.zookeeper.OpResult.ErrorResult;
 import org.apache.zookeeper.client.ConnectStringParser;
 import org.apache.zookeeper.client.HostProvider;
@@ -33,7 +34,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.net.UnknownHostException;
 import java.util.*;
 
 /**
@@ -94,6 +98,61 @@ public class ZooKeeper {
         LOG = LoggerFactory.getLogger(ZooKeeper.class);
         Environment.logEnv("Client environment:", LOG);
     }
+    
+    private final StaticHostProvider hostProvider;
+    
+    /**
+     * This function allows a client to update the connection string by 
providing 
+     * a new comma separated list of host:port pairs, each corresponding to a 
+     * ZooKeeper server. 
+     * <p>
+     * The function invokes a <a 
href="https://issues.apache.org/jira/browse/ZOOKEEPER-1355">
+     * probabilistic load-balancing algorithm</a> which may cause the client 
to disconnect from 
+     * its current host with the goal to achieve expected uniform number of 
connections per server 
+     * in the new list. In case the current host to which the client is 
connected is not in the new
+     * list this call will always cause the connection to be dropped. 
Otherwise, the decision
+     * is based on whether the number of servers has increased or decreased 
and by how much.
+     * For example, if the previous connection string contained 3 hosts and 
now the list contains
+     * these 3 hosts and 2 more hosts, 40% of clients connected to each of the 
3 hosts will
+     * move to one of the new hosts in order to balance the load. The 
algorithm will disconnect 
+     * from the current host with probability 0.4 and in this case cause the 
client to connect 
+     * to one of the 2 new hosts, chosen at random.
+     * <p>
+     * If the connection is dropped, the client moves to a special mode 
"reconfigMode" where it chooses
+     * a new server to connect to using the probabilistic algorithm. After 
finding a server,
+     * or exhausting all servers in the new list after trying all of them and 
failing to connect,
+     * the client moves back to the normal mode of operation where it will 
pick an arbitrary server
+     * from the connectString and attempt to connect to it. If establishment of
+     * the connection fails, another server in the connect string will be tried
+     * (the order is non-deterministic, as we random shuffle the list), until a
+     * connection is established. The client will continue attempts until the
+     * session is explicitly closed (or the session is expired by the server).
+
+     * @param connectString
+     *            comma separated host:port pairs, each corresponding to a zk
+     *            server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+     *            If the optional chroot suffix is used the example would look
+     *            like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
+     *            where the client would be rooted at "/app/a" and all paths
+     *            would be relative to this root - ie getting/setting/etc...
+     *            "/foo/bar" would result in operations being run on
+     *            "/app/a/foo/bar" (from the server perspective).     
+     *
+     * @throws IOException in cases of network failure     
+     */
+    public void updateServerList(String connectString) throws IOException {
+        ConnectStringParser connectStringParser = new 
ConnectStringParser(connectString);
+        Collection<InetSocketAddress> serverAddresses = 
connectStringParser.getServerAddresses();
+
+        ClientCnxnSocket clientCnxnSocket = 
cnxn.sendThread.getClientCnxnSocket();
+        InetSocketAddress currentHost = (InetSocketAddress) 
clientCnxnSocket.getRemoteSocketAddress();
+
+        boolean reconfigMode = hostProvider.updateServerList(serverAddresses, 
currentHost);
+
+        // cause disconnection - this will cause next to be called
+        // which will in turn call nextReconfigMode
+        if (reconfigMode) clientCnxnSocket.testableCloseSocket();
+    }
 
     public ZooKeeperSaslClient getSaslClient() {
         return cnxn.zooKeeperSaslClient;
@@ -442,7 +501,8 @@ public class ZooKeeper {
 
         ConnectStringParser connectStringParser = new ConnectStringParser(
                 connectString);
-        HostProvider hostProvider = new StaticHostProvider(
+        
+        hostProvider = new StaticHostProvider(
                 connectStringParser.getServerAddresses());
         cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                 hostProvider, sessionTimeout, this, watchManager,
@@ -580,10 +640,10 @@ public class ZooKeeper {
                 + (sessionPasswd == null ? "<null>" : "<hidden>"));
 
         watchManager.defaultWatcher = watcher;
-
+       
         ConnectStringParser connectStringParser = new ConnectStringParser(
                 connectString);
-        HostProvider hostProvider = new StaticHostProvider(
+        hostProvider = new StaticHostProvider(
                 connectStringParser.getServerAddresses());
         cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                 hostProvider, sessionTimeout, this, watchManager,

Modified: 
zookeeper/trunk/src/java/main/org/apache/zookeeper/client/HostProvider.java
URL: 
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/client/HostProvider.java?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/client/HostProvider.java 
(original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/client/HostProvider.java 
Sat Nov 17 14:03:03 2012
@@ -19,6 +19,8 @@
 package org.apache.zookeeper.client;
 
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
 
 /**
  * A set of hosts a ZooKeeper client should connect to.
@@ -58,4 +60,13 @@ public interface HostProvider {
      * The HostProvider may use this notification to reset it's inner state.
      */
     public void onConnected();
+
+    /**
+     * Update the list of servers. This returns true if changing connections 
is necessary for load-balancing, false otherwise.
+     * @param serverAddresses new host list
+     * @param currentHost the host to which this client is currently connected
+     * @return true if changing connections is necessary for load-balancing, 
false otherwise  
+     */
+       boolean updateServerList(Collection<InetSocketAddress> serverAddresses, 
InetSocketAddress currentHost)
+                       throws UnknownHostException;
 }

Modified: 
zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java
URL: 
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- 
zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java
 (original)
+++ 
zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java
 Sat Nov 17 14:03:03 2012
@@ -25,11 +25,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Most simple HostProvider, resolves only on instantiation.
  * 
@@ -38,14 +38,31 @@ public final class StaticHostProvider im
     private static final Logger LOG = LoggerFactory
             .getLogger(StaticHostProvider.class);
 
-    private final List<InetSocketAddress> serverAddresses = new 
ArrayList<InetSocketAddress>(
+    private List<InetSocketAddress> serverAddresses = new 
ArrayList<InetSocketAddress>(
             5);
 
+    private Random sourceOfRandomness;
     private int lastIndex = -1;
 
     private int currentIndex = -1;
 
     /**
+     * The following fields are used to migrate clients during reconfiguration
+     */
+    private boolean reconfigMode = false;
+
+    private final List<InetSocketAddress> oldServers = new 
ArrayList<InetSocketAddress>(
+            5);
+
+    private final List<InetSocketAddress> newServers = new 
ArrayList<InetSocketAddress>(
+            5);
+
+    private int currentIndexOld = -1;
+    private int currentIndexNew = -1;
+
+    private float pOld, pNew;
+
+    /**
      * Constructs a SimpleHostSet.
      * 
      * @param serverAddresses
@@ -56,46 +73,234 @@ public final class StaticHostProvider im
      */
     public StaticHostProvider(Collection<InetSocketAddress> serverAddresses)
             throws UnknownHostException {
+       sourceOfRandomness = new Random(System.currentTimeMillis() ^ 
this.hashCode());
+
+        this.serverAddresses = resolveAndShuffle(serverAddresses);
+        if (this.serverAddresses.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "A HostProvider may not be empty!");
+        }       
+        currentIndex = -1;
+        lastIndex = -1;              
+    }
+
+    /**
+     * Constructs a SimpleHostSet. This constructor is used from 
StaticHostProviderTest to produce deterministic test results
+     * by initializing sourceOfRandomness with the same seed
+     * 
+     * @param serverAddresses
+     *            possibly unresolved ZooKeeper server addresses
+     * @param randomnessSeed a seed used to initialize sourceOfRandomness
+     * @throws UnknownHostException
+     * @throws IllegalArgumentException
+     *             if serverAddresses is empty or resolves to an empty list
+     */
+    public StaticHostProvider(Collection<InetSocketAddress> serverAddresses, 
long randomnessSeed)
+            throws UnknownHostException {
+        sourceOfRandomness = new Random(randomnessSeed);
+
+        this.serverAddresses = resolveAndShuffle(serverAddresses);
+        if (this.serverAddresses.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "A HostProvider may not be empty!");
+        }       
+        currentIndex = -1;
+        lastIndex = -1;              
+    }
+
+    private List<InetSocketAddress> 
resolveAndShuffle(Collection<InetSocketAddress> serverAddresses)
+            throws UnknownHostException {
+        List<InetSocketAddress> tmpList = new 
ArrayList<InetSocketAddress>(serverAddresses.size());       
         for (InetSocketAddress address : serverAddresses) {
             InetAddress resolvedAddresses[] = InetAddress.getAllByName(address
                     .getHostName());
             for (InetAddress resolvedAddress : resolvedAddresses) {
-                this.serverAddresses.add(new InetSocketAddress(resolvedAddress
+                tmpList.add(new InetSocketAddress(resolvedAddress
                         .getHostAddress(), address.getPort()));
             }
         }
+        Collections.shuffle(tmpList, sourceOfRandomness);
+        return tmpList;
+    } 
 
-        if (this.serverAddresses.isEmpty()) {
+
+    /**
+     * Update the list of servers. This returns true if changing connections 
is necessary for load-balancing, false
+        * otherwise. Changing connections is necessary if one of the following 
holds: 
+     * a) the host to which this client is currently connected is not in 
serverAddresses.
+     *    Otherwise (if currentHost is in the new list serverAddresses):   
+     * b) the number of servers in the cluster is increasing - in this case 
the load on currentHost should decrease,
+     *    which means that SOME of the clients connected to it will migrate to 
the new servers. The decision whether
+     *    this client migrates or not (i.e., whether true or false is 
returned) is probabilistic so that the expected 
+     *    number of clients connected to each server is the same.
+     *    
+     * If true is returned, the function sets pOld and pNew that correspond to 
the probability to migrate to one of the
+     * new servers in serverAddresses or one of the old servers (migrating to 
one of the old servers is done only
+     * if our client's currentHost is not in serverAddresses). See 
nextHostInReconfigMode for the selection logic.
+     * 
+     * See <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-1355">ZOOKEEPER-1355</a> for 
the protocol and its evaluation, and
+        * StaticHostProviderTest for the tests that illustrate how load 
balancing works with this policy.
+     * @param serverAddresses new host list
+     * @param currentHost the host to which this client is currently connected
+     * @return true if changing connections is necessary for load-balancing, 
false otherwise  
+     */
+
+
+    @Override
+    public boolean updateServerList(Collection<InetSocketAddress> 
serverAddresses, InetSocketAddress currentHost) throws UnknownHostException {   
     
+        // Resolve server addresses and shuffle them
+        List<InetSocketAddress> resolvedList = 
resolveAndShuffle(serverAddresses);
+        if (resolvedList.isEmpty()) {
             throw new IllegalArgumentException(
                     "A HostProvider may not be empty!");
         }
-        Collections.shuffle(this.serverAddresses);
+        // Check if client's current server is in the new list of servers
+        boolean myServerInNewConfig = false;        
+        for (InetSocketAddress addr : resolvedList) {
+            if (addr.getHostName().equals(currentHost.getHostName())
+                    && addr.getPort() == currentHost.getPort()) {
+                myServerInNewConfig = true;
+                break;
+            }
+        }
+
+        synchronized(this) {
+            reconfigMode = true;
+
+            newServers.clear();
+            oldServers.clear();
+            // Divide the new servers into oldServers that were in the 
previous list
+            // and newServers that were not in the previous list
+            for (InetSocketAddress resolvedAddress : resolvedList) {           
     
+                if (this.serverAddresses.contains(resolvedAddress)) {
+                    oldServers.add(resolvedAddress);
+                } else {
+                    newServers.add(resolvedAddress);
+                }
+            }        
+
+            int numOld = oldServers.size();
+            int numNew = newServers.size();                        
+
+            // number of servers increased
+            if (numOld + numNew > this.serverAddresses.size()) {
+                if (myServerInNewConfig) {
+                    // my server is in new config, but load should be 
decreased.
+                    // Need to decide if this client
+                    // is moving to one of the new servers
+                    if (sourceOfRandomness.nextFloat() <= (1 - ((float) 
this.serverAddresses
+                            .size()) / (numOld + numNew))) {
+                        pNew = 1;
+                        pOld = 0;
+                    } else {
+                        // do nothing special - stay with the current server
+                        reconfigMode = false;
+                    }
+                } else {
+                    // my server is not in new config, and load on old servers 
must
+                    // be decreased, so connect to
+                    // one of the new servers
+                    pNew = 1;
+                    pOld = 0;
+                }
+            } else { // number of servers stayed the same or decreased
+                if (myServerInNewConfig) {
+                    // my server is in new config, and load should be 
increased, so
+                    // stay with this server and do nothing special
+                    reconfigMode = false;
+                } else {
+                    pOld = ((float) (numOld * (this.serverAddresses.size() - 
(numOld + numNew))))
+                            / ((numOld + numNew) * 
(this.serverAddresses.size() - numOld));
+                    pNew = 1 - pOld;
+                }
+            }
+
+            this.serverAddresses = resolvedList;    
+            currentIndexOld = -1;
+            currentIndexNew = -1; 
+            currentIndex = -1;
+            lastIndex = -1;                
+            return reconfigMode;
+        }
     }
 
-    public int size() {
+    public synchronized int size() {
         return serverAddresses.size();
     }
 
+    /**
+     * Get the next server to connect to, when in "reconfigMode", which means 
that 
+     * you've just updated the server list, and are now trying to find some server 
to connect to. 
+     * Once onConnected() is called, reconfigMode is set to false. Similarly, 
if we tried to connect
+     * to all servers in new config and failed, reconfigMode is set to false.
+     * 
+     * While in reconfigMode, we should connect to a server in newServers with 
probability pNew and to servers in
+     * oldServers with probability pOld (which is just 1-pNew). If we tried 
out all servers in either oldServers
+     * or newServers we continue to try servers from the other set, regardless 
of pNew or pOld. If we tried all servers
+     * we give up and go back to the normal round robin mode
+     *
+     * When called, this should be protected by synchronized(this)
+     */
+    private InetSocketAddress nextHostInReconfigMode() {
+        boolean takeNew = (sourceOfRandomness.nextFloat() <= pNew);
+
+        // take one of the new servers if it is possible (there are still such
+        // servers we didn't try),
+        // and either the probability tells us to connect to one of the new
+        // servers or if we already
+        // tried all the old servers
+        if (((currentIndexNew + 1) < newServers.size())
+                && (takeNew || (currentIndexOld + 1) >= oldServers.size())) {
+            ++currentIndexNew;
+            return newServers.get(currentIndexNew);
+        }
+
+        // start taking old servers
+        if ((currentIndexOld + 1) < oldServers.size()) {
+            ++currentIndexOld;
+            return oldServers.get(currentIndexOld);
+        }
+
+        return null;
+    }
+
     public InetSocketAddress next(long spinDelay) {
-        ++currentIndex;
-        if (currentIndex == serverAddresses.size()) {
-            currentIndex = 0;
+        boolean needToSleep = false;
+        InetSocketAddress addr;
+
+        synchronized(this) {
+            if (reconfigMode) {
+                addr = nextHostInReconfigMode();
+                if (addr != null) return addr;                
+                //tried all servers and couldn't connect
+                reconfigMode = false;
+                needToSleep = (spinDelay > 0);
+            }        
+            ++currentIndex;
+            if (currentIndex == serverAddresses.size()) {
+                currentIndex = 0;
+            }            
+            addr = serverAddresses.get(currentIndex);
+            needToSleep = needToSleep || (currentIndex == lastIndex && 
spinDelay > 0);
+            if (lastIndex == -1) { 
+                // We don't want to sleep on the first ever connect attempt.
+                lastIndex = 0;
+            }
         }
-        if (currentIndex == lastIndex && spinDelay > 0) {
+        if (needToSleep) {
             try {
                 Thread.sleep(spinDelay);
             } catch (InterruptedException e) {
                 LOG.warn("Unexpected exception", e);
             }
-        } else if (lastIndex == -1) {
-            // We don't want to sleep on the first ever connect attempt.
-            lastIndex = 0;
         }
 
-        return serverAddresses.get(currentIndex);
+        return addr;
     }
 
-    public void onConnected() {
+    public synchronized void onConnected() {
         lastIndex = currentIndex;
+        reconfigMode = false;
     }
+
 }

Modified: 
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java
URL: 
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java?rev=1410731&r1=1410730&r2=1410731&view=diff
==============================================================================
--- 
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java
 (original)
+++ 
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java
 Sat Nov 17 14:03:03 2012
@@ -29,9 +29,12 @@ import org.junit.Test;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Random;
 
 public class StaticHostProviderTest extends ZKTestCase {
-
+    private Random r = new Random(1);
+    
     @Test
     public void testNextGoesRound() throws UnknownHostException {
         HostProvider hostProvider = getHostProvider(2);
@@ -85,14 +88,236 @@ public class StaticHostProviderTest exte
         assertNotSame(first, second);
     }
 
+    private final double slackPercent = 10;
+    private final int numClients = 10000;
+
+    @Test
+    public void testUpdateClientMigrateOrNot() throws UnknownHostException {
+        HostProvider hostProvider = getHostProvider(4); // 10.10.10.4:1238, 
10.10.10.3:1237, 10.10.10.2:1236, 10.10.10.1:1235
+        Collection<InetSocketAddress> newList = getServerAddresses(3); // 
10.10.10.3:1237, 10.10.10.2:1236, 10.10.10.1:1235
+        
+        InetSocketAddress myServer = new InetSocketAddress("10.10.10.3", 1237);
+        
+        // Number of machines becomes smaller, my server is in the new cluster
+        boolean disconnectRequired = hostProvider.updateServerList(newList, 
myServer);
+        assertTrue(!disconnectRequired);
+
+        // Number of machines stayed the same, my server is in the new cluster
+        disconnectRequired = hostProvider.updateServerList(newList, myServer);
+        assertTrue(!disconnectRequired);
+
+        // Number of machines became smaller, my server is not in the new
+        // cluster
+        newList = getServerAddresses(2); // 10.10.10.2:1236, 10.10.10.1:1235
+        disconnectRequired = hostProvider.updateServerList(newList, myServer);
+        assertTrue(disconnectRequired);
+
+        // Number of machines stayed the same, my server is not in the new
+        // cluster
+        disconnectRequired = hostProvider.updateServerList(newList, myServer);
+        assertTrue(disconnectRequired);
+
+        // Number of machines increased, my server is not in the new cluster
+        newList = new ArrayList<InetSocketAddress>(3);
+        for (int i = 4; i > 1; i--) { // 10.10.10.4:1238, 10.10.10.3:1237, 
10.10.10.2:1236
+            newList.add(new InetSocketAddress("10.10.10." + i, 1234 + i));
+        }
+        myServer = new InetSocketAddress("10.10.10.1", 1235);
+        disconnectRequired = hostProvider.updateServerList(newList, myServer);
+        assertTrue(disconnectRequired);
+
+        // Number of machines increased, my server is in the new cluster
+        // Here whether to move or not depends on the difference of cluster
+        // sizes
+        // With probability 1 - |old|/|new| the client disconnects
+        // In the test below 1-9/10 = 1/10 chance of disconnecting
+        HostProvider[] hostProviderArray = new HostProvider[numClients];
+        newList = getServerAddresses(10);
+        int numDisconnects = 0;
+        for (int i = 0; i < numClients; i++) {
+            hostProviderArray[i] = getHostProvider(9);
+            disconnectRequired = 
hostProviderArray[i].updateServerList(newList, myServer);
+            if (disconnectRequired)
+                numDisconnects++;
+        }
+
+       // should be numClients/10 in expectation; we test that it is below 
numClients/10 plus slackPercent percent (upper bound only)
+        assertTrue(numDisconnects < upperboundCPS(numClients, 10));
+    }
+
+    @Test
+    public void testUpdateMigrationGoesRound() throws UnknownHostException {
+        HostProvider hostProvider = getHostProvider(4);
+        // old list (just the ports): 1238, 1237, 1236, 1235
+        Collection<InetSocketAddress> newList = new 
ArrayList<InetSocketAddress>(
+                10);
+        for (int i = 12; i > 2; i--) { // 1246, 1245, 1244, 1243, 1242, 1241,
+                                       // 1240, 1239, 1238, 1237
+            newList.add(new InetSocketAddress("10.10.10." + i, 1234 + i));
+        }
+
+        // servers from the old list that appear in the new list
+        Collection<InetSocketAddress> oldStaying = new 
ArrayList<InetSocketAddress>(2);
+        for (int i = 4; i > 2; i--) { // 1238, 1237
+            oldStaying.add(new InetSocketAddress("10.10.10." + i, 1234 + i));
+        }
+
+        // servers in the new list that are not in the old list
+        Collection<InetSocketAddress> newComing = new 
ArrayList<InetSocketAddress>(10);
+        for (int i = 12; i > 4; i--) {// 1246, 1245, 1244, 1243, 1242, 1241, 
1240, 1239
+            newComing.add(new InetSocketAddress("10.10.10." + i, 1234 + i));
+        }
+
+        // Number of machines increases, my server is not in the new cluster
+        // load on old servers must be decreased, so must connect to one of the
+        // new servers
+        // i.e., pNew = 1.
+        boolean disconnectRequired = hostProvider.updateServerList(newList, 
new InetSocketAddress("10.10.10.1", 1235));
+        assertTrue(disconnectRequired);
+
+        // This means reconfigMode = true, and nextHostInReconfigMode will be
+        // called from next
+        // Since pNew = 1 we should first try the new servers
+        ArrayList<InetSocketAddress> seen = new ArrayList<InetSocketAddress>();
+        for (int i = 0; i < newComing.size(); i++) {
+            InetSocketAddress addr = hostProvider.next(0);
+            assertTrue(newComing.contains(addr));
+            assertTrue(!seen.contains(addr));
+            seen.add(addr);
+        }
+
+        // Next the old servers
+        seen.clear();
+        for (int i = 0; i < oldStaying.size(); i++) {
+            InetSocketAddress addr = hostProvider.next(0);
+            assertTrue(oldStaying.contains(addr));
+            assertTrue(!seen.contains(addr));
+            seen.add(addr);
+        }
+
+        // And now it goes back to normal next() so it should be everything
+        // together like in testNextGoesRound()
+        InetSocketAddress first = hostProvider.next(0);
+        assertTrue(first != null);
+        for (int i = 0; i < newList.size() - 1; i++) {
+            hostProvider.next(0);
+        }
+
+        assertEquals(first, hostProvider.next(0));
+    }
+
+    @Test
+    public void testUpdateLoadBalancing() throws UnknownHostException {
+        // Start with 9 servers and 10000 clients
+        boolean disconnectRequired;
+        HostProvider[] hostProviderArray = new HostProvider[numClients];
+        InetSocketAddress[] curHostForEachClient = new 
InetSocketAddress[numClients];
+        int[] numClientsPerHost = new int[9];
+
+        // initialization
+        for (int i = 0; i < numClients; i++) {
+            hostProviderArray[i] = getHostProvider(9);
+            curHostForEachClient[i] = hostProviderArray[i].next(0);
+            numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+        }
+
+        for (int i = 0; i < 9; i++) {
+            assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 9));
+            assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 9));
+            numClientsPerHost[i] = 0; // prepare for next test
+        }
+
+        // remove host number 8 (the last one in a list of 9 hosts)
+        Collection<InetSocketAddress> newList = getServerAddresses(8);
+
+        for (int i = 0; i < numClients; i++) {
+            disconnectRequired = 
hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
+            if (disconnectRequired) curHostForEachClient[i] = 
hostProviderArray[i].next(0);
+            numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+        }
+
+        for (int i = 0; i < 8; i++) {
+            assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 8));
+            assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 8));
+            numClientsPerHost[i] = 0; // prepare for next test
+        }
+        assertTrue(numClientsPerHost[8] == 0);
+
+        // remove hosts number 6 and 7 (the currently last two in the list)
+        newList = getServerAddresses(6);
+
+        for (int i = 0; i < numClients; i++) {
+            disconnectRequired = 
hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
+            if (disconnectRequired) curHostForEachClient[i] = 
hostProviderArray[i].next(0);
+            numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+        }
+
+        for (int i = 0; i < 6; i++) {
+            assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 6));
+            assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 6));
+            numClientsPerHost[i] = 0; // prepare for next test
+        }
+        assertTrue(numClientsPerHost[6] == 0);
+        assertTrue(numClientsPerHost[7] == 0);
+        assertTrue(numClientsPerHost[8] == 0);
+
+        // remove host number 0 (the first one in the current list)
+        // and add back hosts 6, 7 and 8
+        newList = new ArrayList<InetSocketAddress>(8);
+        for (int i = 9; i > 1; i--) {
+            newList.add(new InetSocketAddress("10.10.10." + i, 1234 + i));
+        }
+
+        for (int i = 0; i < numClients; i++) {
+            disconnectRequired = 
hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
+            if (disconnectRequired) curHostForEachClient[i] = 
hostProviderArray[i].next(0);
+            numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+        }
+
+        assertTrue(numClientsPerHost[0] == 0);
+
+        for (int i = 1; i < 9; i++) {
+            assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 8));
+            assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 8));
+            numClientsPerHost[i] = 0; // prepare for next test
+        }
+
+        // add back host number 0
+        newList = getServerAddresses(9);
+
+        for (int i = 0; i < numClients; i++) {
+            disconnectRequired = 
hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
+            if (disconnectRequired) curHostForEachClient[i] = 
hostProviderArray[i].next(0);
+            numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+        }
+
+        for (int i = 0; i < 9; i++) {
+            assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 9));
+            assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 9));
+        }
+    }
+
     private StaticHostProvider getHostProvider(int size)
             throws UnknownHostException {
+        return new StaticHostProvider(getServerAddresses(size), r.nextLong());
+    }
+
+    private Collection<InetSocketAddress> getServerAddresses(int size) {
         ArrayList<InetSocketAddress> list = new ArrayList<InetSocketAddress>(
                 size);
         while (size > 0) {
-            list.add(new InetSocketAddress("10.10.10." + size, 1234));
+            list.add(new InetSocketAddress("10.10.10." + size, 1234 + size));
             --size;
         }
-        return new StaticHostProvider(list);
+        return list;
+    }
+
+    private double lowerboundCPS(int numClients, int numServers) {
+        return (1 - slackPercent/100.0) * numClients / numServers;
+    }
+
+    private double upperboundCPS(int numClients, int numServers) {
+        return (1 + slackPercent/100.0) * numClients / numServers;
     }
+
 }


Reply via email to