Modified: zookeeper/trunk/src/c/tests/TestZookeeperInit.cc URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/TestZookeeperInit.cc?rev=1410731&r1=1410730&r2=1410731&view=diff ============================================================================== --- zookeeper/trunk/src/c/tests/TestZookeeperInit.cc (original) +++ zookeeper/trunk/src/c/tests/TestZookeeperInit.cc Sat Nov 17 14:03:03 2012 @@ -93,7 +93,7 @@ public: void testBasic() { const string EXPECTED_HOST("127.0.0.1:2121"); - const int EXPECTED_ADDRS_COUNT =1; + const unsigned int EXPECTED_ADDRS_COUNT =1; const int EXPECTED_RECV_TIMEOUT=10000; clientid_t cid; memset(&cid,0xFE,sizeof(cid)); @@ -101,16 +101,16 @@ public: zh=zookeeper_init(EXPECTED_HOST.c_str(),watcher,EXPECTED_RECV_TIMEOUT, &cid,(void*)1,0); - CPPUNIT_ASSERT(zh!=0); + CPPUNIT_ASSERT(zh != NULL); CPPUNIT_ASSERT(zh->fd == -1); - CPPUNIT_ASSERT(zh->hostname!=0); - CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs_count); + CPPUNIT_ASSERT(zh->hostname != NULL); + CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs.count); CPPUNIT_ASSERT_EQUAL(EXPECTED_HOST,string(zh->hostname)); - CPPUNIT_ASSERT(zh->state == NOTCONNECTED_STATE_DEF); + CPPUNIT_ASSERT(zh->state == ZOO_NOTCONNECTED_STATE); CPPUNIT_ASSERT(zh->context == (void*)1); CPPUNIT_ASSERT_EQUAL(EXPECTED_RECV_TIMEOUT,zh->recv_timeout); CPPUNIT_ASSERT(zh->watcher == watcher); - CPPUNIT_ASSERT(zh->connect_index==0); + CPPUNIT_ASSERT(zh->addrs.next==0); CPPUNIT_ASSERT(zh->primer_buffer.buffer==zh->primer_storage_buffer); CPPUNIT_ASSERT(zh->primer_buffer.curr_offset ==0); CPPUNIT_ASSERT(zh->primer_buffer.len == sizeof(zh->primer_storage_buffer)); @@ -136,15 +136,15 @@ public: void testAddressResolution() { const char EXPECTED_IPS[][4]={{127,0,0,1}}; - const int EXPECTED_ADDRS_COUNT =COUNTOF(EXPECTED_IPS); + const unsigned int EXPECTED_ADDRS_COUNT =COUNTOF(EXPECTED_IPS); zoo_deterministic_conn_order(1); zh=zookeeper_init("127.0.0.1:2121",0,10000,0,0,0); CPPUNIT_ASSERT(zh!=0); - 
CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs_count); - for(int i=0;i<zh->addrs_count;i++){ - sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs[i]; + CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs.count); + for(int i=0;i<zh->addrs.count;i++){ + sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs.data[i]; CPPUNIT_ASSERT(memcmp(EXPECTED_IPS[i],&addr->sin_addr,sizeof(addr->sin_addr))==0); CPPUNIT_ASSERT_EQUAL(2121,(int)ntohs(addr->sin_port)); } @@ -153,16 +153,16 @@ public: { const string EXPECTED_HOST("127.0.0.1:2121,127.0.0.2:3434"); const char EXPECTED_IPS[][4]={{127,0,0,1},{127,0,0,2}}; - const int EXPECTED_ADDRS_COUNT =COUNTOF(EXPECTED_IPS); + const unsigned int EXPECTED_ADDRS_COUNT =COUNTOF(EXPECTED_IPS); zoo_deterministic_conn_order(1); zh=zookeeper_init(EXPECTED_HOST.c_str(),0,1000,0,0,0); CPPUNIT_ASSERT(zh!=0); - CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs_count); + CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs.count); - for(int i=0;i<zh->addrs_count;i++){ - sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs[i]; + for(int i=0;i<zh->addrs.count;i++){ + sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs.data[i]; CPPUNIT_ASSERT(memcmp(EXPECTED_IPS[i],&addr->sin_addr,sizeof(addr->sin_addr))==0); if(i<1) CPPUNIT_ASSERT_EQUAL(2121,(int)ntohs(addr->sin_port)); @@ -174,16 +174,16 @@ public: { const string EXPECTED_HOST("127.0.0.1:2121, 127.0.0.2:3434"); const char EXPECTED_IPS[][4]={{127,0,0,1},{127,0,0,2}}; - const int EXPECTED_ADDRS_COUNT =COUNTOF(EXPECTED_IPS); + const unsigned int EXPECTED_ADDRS_COUNT =COUNTOF(EXPECTED_IPS); zoo_deterministic_conn_order(1); zh=zookeeper_init(EXPECTED_HOST.c_str(),0,1000,0,0,0); CPPUNIT_ASSERT(zh!=0); - CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs_count); + CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs.count); - for(int i=0;i<zh->addrs_count;i++){ - sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs[i]; + for(int i=0;i<zh->addrs.count;i++){ + sockaddr_in* addr=(struct 
sockaddr_in*)&zh->addrs.data[i]; CPPUNIT_ASSERT(memcmp(EXPECTED_IPS[i],&addr->sin_addr,sizeof(addr->sin_addr))==0); if(i<1) CPPUNIT_ASSERT_EQUAL(2121,(int)ntohs(addr->sin_port)); @@ -277,7 +277,7 @@ public: void testPermuteAddrsList() { const char EXPECTED[][5]={"\0\0\0\0","\1\1\1\1","\2\2\2\2","\3\3\3\3"}; - const int EXPECTED_ADDR_COUNT=COUNTOF(EXPECTED); + const unsigned int EXPECTED_ADDR_COUNT=COUNTOF(EXPECTED); const int RAND_SEQ[]={0,1,1,-1}; const int RAND_SIZE=COUNTOF(RAND_SEQ); @@ -286,11 +286,11 @@ public: zh=zookeeper_init("0.0.0.0:123,1.1.1.1:123,2.2.2.2:123,3.3.3.3:123",0,1000,0,0,0); CPPUNIT_ASSERT(zh!=0); - CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDR_COUNT,zh->addrs_count); + CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDR_COUNT,zh->addrs.count); const string EXPECTED_SEQ("3210"); char ACTUAL_SEQ[EXPECTED_ADDR_COUNT+1]; ACTUAL_SEQ[EXPECTED_ADDR_COUNT]=0; - for(int i=0;i<zh->addrs_count;i++){ - sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs[i]; + for(int i=0;i<zh->addrs.count;i++){ + sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs.data[i]; // match the first byte of the EXPECTED and of the actual address ACTUAL_SEQ[i]=((char*)&addr->sin_addr)[0]+'0'; }
Modified: zookeeper/trunk/src/c/tests/ZKMocks.cc URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/ZKMocks.cc?rev=1410731&r1=1410730&r2=1410731&view=diff ============================================================================== --- zookeeper/trunk/src/c/tests/ZKMocks.cc (original) +++ zookeeper/trunk/src/c/tests/ZKMocks.cc Sat Nov 17 14:03:03 2012 @@ -507,7 +507,13 @@ void ZookeeperServer::notifyBufferSent(c void forceConnected(zhandle_t* zh){ // simulate connected state zh->state=ZOO_CONNECTED_STATE; + + // Simulate we're connected to the first host in our host list zh->fd=ZookeeperServer::FD; + assert(zh->addrs.count > 0); + zh->addr_cur = zh->addrs.data[0]; + zh->addrs.next++; + zh->input_buffer=0; gettimeofday(&zh->last_recv,0); gettimeofday(&zh->last_send,0); Modified: zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml?rev=1410731&r1=1410730&r2=1410731&view=diff ============================================================================== --- zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml (original) +++ zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml Sat Nov 17 14:03:03 2012 @@ -541,6 +541,40 @@ and the second client will be disconnected (causing the pair to attempt to re-establish it's connection/session indefinitely).</para> + <para> <emphasis role="bold">Updating the list of servers</emphasis>. We allow a client to + update the connection string by providing a new comma separated list of host:port pairs, + each corresponding to a ZooKeeper server. The function invokes a probabilistic load-balancing + algorithm which may cause the client to disconnect from its current host with the goal + to achieve expected uniform number of connections per server in the new list. 
+ In case the current host to which the client is connected is not in the new list, + this call will always cause the connection to be dropped. Otherwise, the decision + is based on whether the number of servers has increased or decreased and by how much.</para> + + <para> + For example, if the previous connection string contained 3 hosts and now the list contains + these 3 hosts and 2 more hosts, 40% (that is, 1 - 3/5) of clients connected to each of the 3 hosts will + move to one of the new hosts in order to balance the load. The algorithm will cause the client + to drop its connection to its current host with probability 0.4 and, in this + case, to connect to one of the 2 new hosts, chosen at random. + </para> + + <para> + Another example -- suppose we have 5 hosts and now update the list to remove 2 of the hosts: + the clients connected to the 3 remaining hosts will stay connected, whereas all clients connected + to the 2 removed hosts will need to move to one of the 3 hosts, chosen at random. If the connection + is dropped, the client moves to a special mode where it chooses a new server to connect to using the + probabilistic algorithm, and not just round robin. + </para> + + <para> + In the first example, each client decides to disconnect with probability 0.4, but once the decision is + made, it will try to connect to a random new server, and only if it cannot connect to any of the new + servers will it try to connect to the old ones. After finding a server, or trying all servers in the + new list and failing to connect, the client moves back to the normal mode of operation, where it picks + an arbitrary server from the connectString and attempts to connect to it. If that fails, it will continue + trying different random servers in round robin.
(see above the algorithm used to initially choose a server) + </para> + </section> <section id="ch_zkWatches"> Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=1410731&r1=1410730&r2=1410731&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Sat Nov 17 14:03:03 2012 @@ -19,6 +19,7 @@ package org.apache.zookeeper; import org.apache.zookeeper.AsyncCallback.*; +import org.apache.zookeeper.ClientCnxn.SendThread; import org.apache.zookeeper.OpResult.ErrorResult; import org.apache.zookeeper.client.ConnectStringParser; import org.apache.zookeeper.client.HostProvider; @@ -33,7 +34,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.net.UnknownHostException; import java.util.*; /** @@ -94,6 +98,61 @@ public class ZooKeeper { LOG = LoggerFactory.getLogger(ZooKeeper.class); Environment.logEnv("Client environment:", LOG); } + + private final StaticHostProvider hostProvider; + + /** + * This function allows a client to update the connection string by providing + * a new comma separated list of host:port pairs, each corresponding to a + * ZooKeeper server. + * <p> + * The function invokes a <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-1355"> + * probabilistic load-balancing algorithm</a> which may cause the client to disconnect from + * its current host with the goal to achieve expected uniform number of connections per server + * in the new list. In case the current host to which the client is connected is not in the new + * list this call will always cause the connection to be dropped. 
 Otherwise, the decision + * is based on whether the number of servers has increased or decreased and by how much. + * For example, if the previous connection string contained 3 hosts and now the list contains + * these 3 hosts and 2 more hosts, 40% of clients connected to each of the 3 hosts will + * move to one of the new hosts in order to balance the load. The algorithm will disconnect + * from the current host with probability 0.4 and in this case cause the client to connect + * to one of the 2 new hosts, chosen at random. + * <p> + * If the connection is dropped, the client moves to a special mode "reconfigMode" where it chooses + * a new server to connect to using the probabilistic algorithm. After finding a server, + * or exhausting all servers in the new list after trying all of them and failing to connect, + * the client moves back to the normal mode of operation where it will pick an arbitrary server + * from the connectString and attempt to connect to it. If establishment of + * the connection fails, another server in the connect string will be tried + * (the order is non-deterministic, as we randomly shuffle the list), until a + * connection is established. The client will continue attempts until the + * session is explicitly closed (or the session is expired by the server). + * + * @param connectString + * comma separated host:port pairs, each corresponding to a zk + * server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" + * Only the host:port pairs are used by this method; if a chroot + * suffix is present (e.g. "127.0.0.1:3000,127.0.0.1:3001/app/a") + * it is parsed but not applied here, so the client's existing root + * (the chroot given at construction, under which "/foo/bar" maps to + * "/app/a/foo/bar" from the server perspective) is left unchanged.
+ * + * @throws IOException in cases of network failure + */ + public void updateServerList(String connectString) throws IOException { + ConnectStringParser connectStringParser = new ConnectStringParser(connectString); + Collection<InetSocketAddress> serverAddresses = connectStringParser.getServerAddresses(); + + ClientCnxnSocket clientCnxnSocket = cnxn.sendThread.getClientCnxnSocket(); + InetSocketAddress currentHost = (InetSocketAddress) clientCnxnSocket.getRemoteSocketAddress(); + + boolean reconfigMode = hostProvider.updateServerList(serverAddresses, currentHost); + + // cause disconnection - this will cause next to be called + // which will in turn call nextReconfigMode + if (reconfigMode) clientCnxnSocket.testableCloseSocket(); + } public ZooKeeperSaslClient getSaslClient() { return cnxn.zooKeeperSaslClient; @@ -442,7 +501,8 @@ public class ZooKeeper { ConnectStringParser connectStringParser = new ConnectStringParser( connectString); - HostProvider hostProvider = new StaticHostProvider( + + hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses()); cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, @@ -580,10 +640,10 @@ public class ZooKeeper { + (sessionPasswd == null ? 
"<null>" : "<hidden>")); watchManager.defaultWatcher = watcher; - + ConnectStringParser connectStringParser = new ConnectStringParser( connectString); - HostProvider hostProvider = new StaticHostProvider( + hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses()); cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/client/HostProvider.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/client/HostProvider.java?rev=1410731&r1=1410730&r2=1410731&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/client/HostProvider.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/client/HostProvider.java Sat Nov 17 14:03:03 2012 @@ -19,6 +19,8 @@ package org.apache.zookeeper.client; import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.Collection; /** * A set of hosts a ZooKeeper client should connect to. @@ -58,4 +60,13 @@ public interface HostProvider { * The HostProvider may use this notification to reset it's inner state. */ public void onConnected(); + + /** + * Update the list of servers. This returns true if changing connections is necessary for load-balancing, false otherwise. 
+ * @param serverAddresses new host list + * @param currentHost the host to which this client is currently connected + * @return true if changing connections is necessary for load-balancing, false otherwise + */ + boolean updateServerList(Collection<InetSocketAddress> serverAddresses, InetSocketAddress currentHost) + throws UnknownHostException; } Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java?rev=1410731&r1=1410730&r2=1410731&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java Sat Nov 17 14:03:03 2012 @@ -25,11 +25,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * Most simple HostProvider, resolves only on instantiation. 
* @@ -38,14 +38,31 @@ public final class StaticHostProvider im private static final Logger LOG = LoggerFactory .getLogger(StaticHostProvider.class); - private final List<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>( + private List<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>( 5); + private Random sourceOfRandomness; private int lastIndex = -1; private int currentIndex = -1; /** + * The following fields are used to migrate clients during reconfiguration + */ + private boolean reconfigMode = false; + + private final List<InetSocketAddress> oldServers = new ArrayList<InetSocketAddress>( + 5); + + private final List<InetSocketAddress> newServers = new ArrayList<InetSocketAddress>( + 5); + + private int currentIndexOld = -1; + private int currentIndexNew = -1; + + private float pOld, pNew; + + /** * Constructs a SimpleHostSet. * * @param serverAddresses @@ -56,46 +73,234 @@ public final class StaticHostProvider im */ public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) throws UnknownHostException { + sourceOfRandomness = new Random(System.currentTimeMillis() ^ this.hashCode()); + + this.serverAddresses = resolveAndShuffle(serverAddresses); + if (this.serverAddresses.isEmpty()) { + throw new IllegalArgumentException( + "A HostProvider may not be empty!"); + } + currentIndex = -1; + lastIndex = -1; + } + + /** + * Constructs a SimpleHostSet. 
This constructor is used from StaticHostProviderTest to produce deterministic test results + * by initializing sourceOfRandomness with the same seed + * + * @param serverAddresses + * possibly unresolved ZooKeeper server addresses + * @param randomnessSeed a seed used to initialize sourceOfRandomnes + * @throws UnknownHostException + * @throws IllegalArgumentException + * if serverAddresses is empty or resolves to an empty list + */ + public StaticHostProvider(Collection<InetSocketAddress> serverAddresses, long randomnessSeed) + throws UnknownHostException { + sourceOfRandomness = new Random(randomnessSeed); + + this.serverAddresses = resolveAndShuffle(serverAddresses); + if (this.serverAddresses.isEmpty()) { + throw new IllegalArgumentException( + "A HostProvider may not be empty!"); + } + currentIndex = -1; + lastIndex = -1; + } + + private List<InetSocketAddress> resolveAndShuffle(Collection<InetSocketAddress> serverAddresses) + throws UnknownHostException { + List<InetSocketAddress> tmpList = new ArrayList<InetSocketAddress>(serverAddresses.size()); for (InetSocketAddress address : serverAddresses) { InetAddress resolvedAddresses[] = InetAddress.getAllByName(address .getHostName()); for (InetAddress resolvedAddress : resolvedAddresses) { - this.serverAddresses.add(new InetSocketAddress(resolvedAddress + tmpList.add(new InetSocketAddress(resolvedAddress .getHostAddress(), address.getPort())); } } + Collections.shuffle(tmpList, sourceOfRandomness); + return tmpList; + } - if (this.serverAddresses.isEmpty()) { + + /** + * Update the list of servers. This returns true if changing connections is necessary for load-balancing, false + * otherwise. Changing connections is necessary if one of the following holds: + * a) the host to which this client is currently connected is not in serverAddresses. 
+ * Otherwise (if currentHost is in the new list serverAddresses): + * b) the number of servers in the cluster is increasing - in this case the load on currentHost should decrease, + * which means that SOME of the clients connected to it will migrate to the new servers. The decision whether + * this client migrates or not (i.e., whether true or false is returned) is probabilistic so that the expected + * number of clients connected to each server is the same. + * + * If true is returned, the function sets pNew and pOld, the probabilities of migrating to one of the + * new servers in serverAddresses or to one of the old servers, respectively (migrating to one of the old servers is done only + * if our client's currentHost is not in serverAddresses). See nextHostInReconfigMode for the selection logic. + * + * See <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-1355">ZOOKEEPER-1355</a> for the protocol and its evaluation, and + * StaticHostProviderTest for the tests that illustrate how load balancing works with this policy.
+ * @param serverAddresses new host list + * @param currentHost the host to which this client is currently connected + * @return true if changing connections is necessary for load-balancing, false otherwise + */ + + + @Override + public boolean updateServerList(Collection<InetSocketAddress> serverAddresses, InetSocketAddress currentHost) throws UnknownHostException { + // Resolve server addresses and shuffle them + List<InetSocketAddress> resolvedList = resolveAndShuffle(serverAddresses); + if (resolvedList.isEmpty()) { throw new IllegalArgumentException( "A HostProvider may not be empty!"); } - Collections.shuffle(this.serverAddresses); + // Check if client's current server is in the new list of servers + boolean myServerInNewConfig = false; + for (InetSocketAddress addr : resolvedList) { + if (addr.getHostName().equals(currentHost.getHostName()) + && addr.getPort() == currentHost.getPort()) { + myServerInNewConfig = true; + break; + } + } + + synchronized(this) { + reconfigMode = true; + + newServers.clear(); + oldServers.clear(); + // Divide the new servers into oldServers that were in the previous list + // and newServers that were not in the previous list + for (InetSocketAddress resolvedAddress : resolvedList) { + if (this.serverAddresses.contains(resolvedAddress)) { + oldServers.add(resolvedAddress); + } else { + newServers.add(resolvedAddress); + } + } + + int numOld = oldServers.size(); + int numNew = newServers.size(); + + // number of servers increased + if (numOld + numNew > this.serverAddresses.size()) { + if (myServerInNewConfig) { + // my server is in new config, but load should be decreased. 
+ // Need to decide if this client + // is moving to one of the new servers + if (sourceOfRandomness.nextFloat() <= (1 - ((float) this.serverAddresses + .size()) / (numOld + numNew))) { + pNew = 1; + pOld = 0; + } else { + // do nothing special - stay with the current server + reconfigMode = false; + } + } else { + // my server is not in new config, and load on old servers must + // be decreased, so connect to + // one of the new servers + pNew = 1; + pOld = 0; + } + } else { // number of servers stayed the same or decreased + if (myServerInNewConfig) { + // my server is in new config, and load should be increased, so + // stay with this server and do nothing special + reconfigMode = false; + } else { + pOld = ((float) (numOld * (this.serverAddresses.size() - (numOld + numNew)))) + / ((numOld + numNew) * (this.serverAddresses.size() - numOld)); + pNew = 1 - pOld; + } + } + + this.serverAddresses = resolvedList; + currentIndexOld = -1; + currentIndexNew = -1; + currentIndex = -1; + lastIndex = -1; + return reconfigMode; + } } - public int size() { + public synchronized int size() { return serverAddresses.size(); } + /** + * Get the next server to connect to, when in "reconfigMode", which means that + * you've just updated the server list, and now trying to find some server to connect to. + * Once onConnected() is called, reconfigMode is set to false. Similarly, if we tried to connect + * to all servers in new config and failed, reconfigMode is set to false. + * + * While in reconfigMode, we should connect to a server in newServers with probability pNew and to servers in + * oldServers with probability pOld (which is just 1-pNew). If we tried out all servers in either oldServers + * or newServers we continue to try servers from the other set, regardless of pNew or pOld. 
If we tried all servers + * we give up and go back to the normal round robin mode + * + * When called, this should be protected by synchronized(this) + */ + private InetSocketAddress nextHostInReconfigMode() { + boolean takeNew = (sourceOfRandomness.nextFloat() <= pNew); + + // take one of the new servers if it is possible (there are still such + // servers we didn't try), + // and either the probability tells us to connect to one of the new + // servers or if we already + // tried all the old servers + if (((currentIndexNew + 1) < newServers.size()) + && (takeNew || (currentIndexOld + 1) >= oldServers.size())) { + ++currentIndexNew; + return newServers.get(currentIndexNew); + } + + // start taking old servers + if ((currentIndexOld + 1) < oldServers.size()) { + ++currentIndexOld; + return oldServers.get(currentIndexOld); + } + + return null; + } + public InetSocketAddress next(long spinDelay) { - ++currentIndex; - if (currentIndex == serverAddresses.size()) { - currentIndex = 0; + boolean needToSleep = false; + InetSocketAddress addr; + + synchronized(this) { + if (reconfigMode) { + addr = nextHostInReconfigMode(); + if (addr != null) return addr; + //tried all servers and couldn't connect + reconfigMode = false; + needToSleep = (spinDelay > 0); + } + ++currentIndex; + if (currentIndex == serverAddresses.size()) { + currentIndex = 0; + } + addr = serverAddresses.get(currentIndex); + needToSleep = needToSleep || (currentIndex == lastIndex && spinDelay > 0); + if (lastIndex == -1) { + // We don't want to sleep on the first ever connect attempt. + lastIndex = 0; + } } - if (currentIndex == lastIndex && spinDelay > 0) { + if (needToSleep) { try { Thread.sleep(spinDelay); } catch (InterruptedException e) { LOG.warn("Unexpected exception", e); } - } else if (lastIndex == -1) { - // We don't want to sleep on the first ever connect attempt. 
- lastIndex = 0; } - return serverAddresses.get(currentIndex); + return addr; } - public void onConnected() { + public synchronized void onConnected() { lastIndex = currentIndex; + reconfigMode = false; } + } Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java?rev=1410731&r1=1410730&r2=1410731&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java Sat Nov 17 14:03:03 2012 @@ -29,9 +29,12 @@ import org.junit.Test; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Random; public class StaticHostProviderTest extends ZKTestCase { - + private Random r = new Random(1); + @Test public void testNextGoesRound() throws UnknownHostException { HostProvider hostProvider = getHostProvider(2); @@ -85,14 +88,236 @@ public class StaticHostProviderTest exte assertNotSame(first, second); } + private final double slackPercent = 10; + private final int numClients = 10000; + + @Test + public void testUpdateClientMigrateOrNot() throws UnknownHostException { + HostProvider hostProvider = getHostProvider(4); // 10.10.10.4:1238, 10.10.10.3:1237, 10.10.10.2:1236, 10.10.10.1:1235 + Collection<InetSocketAddress> newList = getServerAddresses(3); // 10.10.10.3:1237, 10.10.10.2:1236, 10.10.10.1:1235 + + InetSocketAddress myServer = new InetSocketAddress("10.10.10.3", 1237); + + // Number of machines becomes smaller, my server is in the new cluster + boolean disconnectRequired = hostProvider.updateServerList(newList, myServer); + assertTrue(!disconnectRequired); + + // Number of machines stayed the same, my server is in the 
new cluster + disconnectRequired = hostProvider.updateServerList(newList, myServer); + assertTrue(!disconnectRequired); + + // Number of machines became smaller, my server is not in the new + // cluster + newList = getServerAddresses(2); // 10.10.10.2:1236, 10.10.10.1:1235 + disconnectRequired = hostProvider.updateServerList(newList, myServer); + assertTrue(disconnectRequired); + + // Number of machines stayed the same, my server is not in the new + // cluster + disconnectRequired = hostProvider.updateServerList(newList, myServer); + assertTrue(disconnectRequired); + + // Number of machines increased, my server is not in the new cluster + newList = new ArrayList<InetSocketAddress>(3); + for (int i = 4; i > 1; i--) { // 10.10.10.4:1238, 10.10.10.3:1237, 10.10.10.2:1236 + newList.add(new InetSocketAddress("10.10.10." + i, 1234 + i)); + } + myServer = new InetSocketAddress("10.10.10.1", 1235); + disconnectRequired = hostProvider.updateServerList(newList, myServer); + assertTrue(disconnectRequired); + + // Number of machines increased, my server is in the new cluster + // Here whether to move or not depends on the difference of cluster + // sizes + // With probability 1 - |old|/|new| the client disconnects + // In the test below 1-9/10 = 1/10 chance of disconnecting + HostProvider[] hostProviderArray = new HostProvider[numClients]; + newList = getServerAddresses(10); + int numDisconnects = 0; + for (int i = 0; i < numClients; i++) { + hostProviderArray[i] = getHostProvider(9); + disconnectRequired = hostProviderArray[i].updateServerList(newList, myServer); + if (disconnectRequired) + numDisconnects++; + } + + // should be numClients/10 in expectation; we only check it stays below (1 + slackPercent/100) * numClients/10 + assertTrue(numDisconnects < upperboundCPS(numClients, 10)); + } + + @Test + public void testUpdateMigrationGoesRound() throws UnknownHostException { + HostProvider hostProvider = getHostProvider(4); + // old list (just the ports): 1238, 1237, 1236, 1235 + 
Collection<InetSocketAddress> newList = new ArrayList<InetSocketAddress>( + 10); + for (int i = 12; i > 2; i--) { // 1246, 1245, 1244, 1243, 1242, 1241, + // 1240, 1239, 1238, 1237 + newList.add(new InetSocketAddress("10.10.10." + i, 1234 + i)); + } + + // servers from the old list that appear in the new list + Collection<InetSocketAddress> oldStaying = new ArrayList<InetSocketAddress>(2); + for (int i = 4; i > 2; i--) { // 1238, 1237 + oldStaying.add(new InetSocketAddress("10.10.10." + i, 1234 + i)); + } + + // servers in the new list that are not in the old list + Collection<InetSocketAddress> newComing = new ArrayList<InetSocketAddress>(10); + for (int i = 12; i > 4; i--) {// 1246, 1245, 1244, 1243, 1242, 1241, 1240, 1239 + newComing.add(new InetSocketAddress("10.10.10." + i, 1234 + i)); + } + + // Number of machines increases, my server is not in the new cluster + // load on old servers must be decreased, so must connect to one of the + // new servers + // i.e., pNew = 1. + boolean disconnectRequired = hostProvider.updateServerList(newList, new InetSocketAddress("10.10.10.1", 1235)); + assertTrue(disconnectRequired); + + // This means reconfigMode = true, and nextHostInReconfigMode will be + // called from next + // Since pNew = 1 we should first try the new servers + ArrayList<InetSocketAddress> seen = new ArrayList<InetSocketAddress>(); + for (int i = 0; i < newComing.size(); i++) { + InetSocketAddress addr = hostProvider.next(0); + assertTrue(newComing.contains(addr)); + assertTrue(!seen.contains(addr)); + seen.add(addr); + } + + // Next the old servers + seen.clear(); + for (int i = 0; i < oldStaying.size(); i++) { + InetSocketAddress addr = hostProvider.next(0); + assertTrue(oldStaying.contains(addr)); + assertTrue(!seen.contains(addr)); + seen.add(addr); + } + + // And now it goes back to normal next() so it should be everything + // together like in testNextGoesRound() + InetSocketAddress first = hostProvider.next(0); + assertTrue(first != null); + for 
(int i = 0; i < newList.size() - 1; i++) { + hostProvider.next(0); + } + + assertEquals(first, hostProvider.next(0)); + } + + @Test + public void testUpdateLoadBalancing() throws UnknownHostException { + // Start with 9 servers and numClients clients + boolean disconnectRequired; + HostProvider[] hostProviderArray = new HostProvider[numClients]; + InetSocketAddress[] curHostForEachClient = new InetSocketAddress[numClients]; + int[] numClientsPerHost = new int[9]; + + // initialization + for (int i = 0; i < numClients; i++) { + hostProviderArray[i] = getHostProvider(9); + curHostForEachClient[i] = hostProviderArray[i].next(0); + numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++; + } + + for (int i = 0; i < 9; i++) { + assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 9)); + assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 9)); + numClientsPerHost[i] = 0; // prepare for next test + } + + // remove host number 8, i.e. 10.10.10.9:1243 (host k is 10.10.10.(k+1) on port 1235 + k) + Collection<InetSocketAddress> newList = getServerAddresses(8); + + for (int i = 0; i < numClients; i++) { + disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]); + if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0); + numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++; + } + + for (int i = 0; i < 8; i++) { + assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 8)); + assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 8)); + numClientsPerHost[i] = 0; // prepare for next test + } + assertTrue(numClientsPerHost[8] == 0); + + // remove hosts number 6 and 7 (10.10.10.7:1241 and 10.10.10.8:1242) + newList = getServerAddresses(6); + + for (int i = 0; i < numClients; i++) { + disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]); + if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0); + 
numClientsPerHost[curHostForEachClient[i].getPort() - 
1235]++; + } + + for (int i = 0; i < 6; i++) { + assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 6)); + assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 6)); + numClientsPerHost[i] = 0; // prepare for next test + } + assertTrue(numClientsPerHost[6] == 0); + assertTrue(numClientsPerHost[7] == 0); + assertTrue(numClientsPerHost[8] == 0); + + // remove host number 0 (10.10.10.1:1235) + // and add back hosts 6, 7 and 8 + newList = new ArrayList<InetSocketAddress>(8); + for (int i = 9; i > 1; i--) { + newList.add(new InetSocketAddress("10.10.10." + i, 1234 + i)); + } + + for (int i = 0; i < numClients; i++) { + disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]); + if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0); + numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++; + } + + assertTrue(numClientsPerHost[0] == 0); + + for (int i = 1; i < 9; i++) { + assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 8)); + assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 8)); + numClientsPerHost[i] = 0; // prepare for next test + } + + // add back host number 0 + newList = getServerAddresses(9); + + for (int i = 0; i < numClients; i++) { + disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]); + if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0); + numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++; + } + + for (int i = 0; i < 9; i++) { + assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 9)); + assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 9)); + } + } + private StaticHostProvider getHostProvider(int size) throws UnknownHostException { + return new StaticHostProvider(getServerAddresses(size), r.nextLong()); + } + + private Collection<InetSocketAddress> getServerAddresses(int size) { ArrayList<InetSocketAddress> list = new 
ArrayList<InetSocketAddress>( size); while (size > 0) { - list.add(new InetSocketAddress("10.10.10." + size, 1234)); + list.add(new InetSocketAddress("10.10.10." + size, 1234 + size)); --size; } - return new StaticHostProvider(list); + return list; + } + + private double lowerboundCPS(int numClients, int numServers) { + return (1 - slackPercent/100.0) * numClients / numServers; + } + + private double upperboundCPS(int numClients, int numServers) { + return (1 + slackPercent/100.0) * numClients / numServers; } + }
