This is an automated email from the ASF dual-hosted git repository.

nkalmar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d8caf9  ZOOKEEPER-3756: Members slow to rejoin quorum using Kubernetes
4d8caf9 is described below

commit 4d8caf9851f63073145996fd4a535278014d01be
Author: Mate Szalay-Beko <szalay.beko.m...@gmail.com>
AuthorDate: Mon Mar 23 16:20:05 2020 +0100

    ZOOKEEPER-3756: Members slow to rejoin quorum using Kubernetes
    
    Whenever we close the current leader ZooKeeper server, a new leader election
    is triggered. During the new election, a connection will be established between
    all the servers by calling the synchronized 'connectOne' method in
    QuorumCnxManager. The method will open the socket and send a single small
    initial message to the other server, usually very quickly. If the destination
    host is unreachable, it should fail immediately.
    
    However, when we use Kubernetes, the destination host is always reachable,
    as it points to a Kubernetes service. If the actual container / pod is not
    available, the 'socket.connect' method will time out (by default after 5 seconds)
    instead of failing immediately with NoRouteToHostException. As the 'connectOne'
    method is synchronized, this timeout blocks the creation of other
    connections, so a single unreachable host can cause a timeout in the leader
    election protocol.
    
    One workaround is to decrease the socket connection timeout with the
    '-Dzookeeper.cnxTimeout' system property, but the proper fix is to make the
    connection initiation fully asynchronous, as using a very low timeout can
    have its own side effects. Fortunately, most of the initial message sending
    is already asynchronous: since the SASL authentication can take more time, the
    second (authentication + initial message sending) part of the initiation protocol
    is already called in a separate thread when Quorum SASL authentication is enabled.
    
    In the following patch I made the whole connection initiation async, by
    always using the async executor (not only when Quorum SASL is enabled) and
    also moving the socket.connect call into the async thread.
    
    I also created a unit test to verify my fix. I added a static socket factory that can be
    changed by the tests using a package-private setter method. Before I applied my changes,
    the test failed (producing the same error logs as we see in the original Jira ticket) and
    timed out, as no leader election succeeded within 15 seconds.
    After the changes the test runs very quickly, in 1-2 seconds.
    
    Note: due to the multiAddress changes, we will need different PRs for the 3.5 branch
    and for the 3.6+ branches. I will submit the other PR once this one has been reviewed.
    
    Author: Mate Szalay-Beko <szalay.beko.m...@gmail.com>
    Author: Mate Szalay-Beko <msza...@cloudera.com>
    
    Reviewers: Enrico Olivelli <eolive...@apache.org>, Norbert Kalmar <nkal...@apache.org>
    
    Closes #1289 from symat/ZOOKEEPER-3756-master
---
 .../zookeeper/server/quorum/QuorumCnxManager.java  | 170 +++++++++------------
 ...uorumCnxManagerSocketConnectionTimeoutTest.java | 112 ++++++++++++++
 2 files changed, 188 insertions(+), 94 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
index 66f6883..0838492 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
@@ -27,7 +27,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.NoRouteToHostException;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
@@ -58,6 +57,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import javax.net.ssl.SSLSocket;
 import org.apache.zookeeper.common.NetUtils;
@@ -186,6 +186,17 @@ public class QuorumCnxManager {
      */
     private final boolean tcpKeepAlive = Boolean.getBoolean("zookeeper.tcpKeepAlive");
 
+
+    /*
+     * Socket factory, allowing the injection of custom socket implementations for testing.
+     * Volatile: tests replace it while connection-executor threads read it.
+     */
+    static final Supplier<Socket> DEFAULT_SOCKET_FACTORY = () -> new Socket();
+    private static volatile Supplier<Socket> SOCKET_FACTORY = DEFAULT_SOCKET_FACTORY;
+    static void setSocketFactory(Supplier<Socket> factory) {
+        SOCKET_FACTORY = factory;
+    }
+
     public static class Message {
 
         Message(ByteBuffer buffer, long sid) {
@@ -316,41 +327,30 @@ public class QuorumCnxManager {
         this.socketTimeout = socketTimeout;
         this.view = view;
         this.listenOnAllIPs = listenOnAllIPs;
+        this.authServer = authServer;
+        this.authLearner = authLearner;
+        this.quorumSaslAuthEnabled = quorumSaslAuthEnabled;
 
-        initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize, quorumSaslAuthEnabled);
+        initializeConnectionExecutor(mySid, quorumCnxnThreadsSize);
 
         // Starts listener thread that waits for connection requests
         listener = new Listener();
         listener.setName("QuorumPeerListener");
     }
 
-    private void initializeAuth(final long mySid, final QuorumAuthServer authServer,
-        final QuorumAuthLearner authLearner, final int quorumCnxnThreadsSize, final boolean quorumSaslAuthEnabled) {
-
-        this.authServer = authServer;
-        this.authLearner = authLearner;
-        this.quorumSaslAuthEnabled = quorumSaslAuthEnabled;
-        if (!this.quorumSaslAuthEnabled) {
-            LOG.debug("Not initializing connection executor as quorum sasl auth is disabled");
-            return;
-        }
-
-        // init connection executors
+    // we always use the Connection Executor during connection initiation (to handle connection
+    // timeouts), and optionally use it when receiving connections (as the Quorum SASL authentication
+    // can take extra time)
+    private void initializeConnectionExecutor(final long mySid, final int quorumCnxnThreadsSize) {
         final AtomicInteger threadIndex = new AtomicInteger(1);
         SecurityManager s = System.getSecurityManager();
         final ThreadGroup group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
-        ThreadFactory daemonThFactory = new ThreadFactory() {
 
-            @Override
-            public Thread newThread(Runnable r) {
-                Thread t = new Thread(
-                    group,
-                    r,
-                    "QuorumConnectionThread-[myid=" + mySid + "]-" + threadIndex.getAndIncrement());
-                return t;
-            }
-        };
-        this.connectionExecutor = new ThreadPoolExecutor(3, quorumCnxnThreadsSize, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), daemonThFactory);
+        final ThreadFactory daemonThFactory = runnable -> new Thread(group, runnable,
+            String.format("QuorumConnectionThread-[myid=%d]-%d", mySid, threadIndex.getAndIncrement()));
+
+        this.connectionExecutor = new ThreadPoolExecutor(3, quorumCnxnThreadsSize, 60, TimeUnit.SECONDS,
+                                                         new SynchronousQueue<>(), daemonThFactory);
         this.connectionExecutor.allowCoreThreadTimeOut(true);
     }
 
@@ -359,20 +359,49 @@ public class QuorumCnxManager {
      *
      * @param sid
      */
-    public void testInitiateConnection(long sid) throws Exception {
+    public void testInitiateConnection(long sid) {
         LOG.debug("Opening channel to server {}", sid);
-        Socket sock = new Socket();
-        setSockOpts(sock);
-        InetSocketAddress address = self.getVotingView().get(sid).electionAddr.getReachableOrOne();
-        sock.connect(address, cnxTO);
-        initiateConnection(sock, sid);
+        initiateConnection(self.getVotingView().get(sid).electionAddr, sid);
     }
 
     /**
+     * First we create the socket, perform SSL handshake and authentication if needed.
+     * Then we perform the initiation protocol.
      * If this server has initiated the connection, then it gives up on the
      * connection if it loses challenge. Otherwise, it keeps the connection.
      */
-    public void initiateConnection(final Socket sock, final Long sid) {
+    public void initiateConnection(final MultipleAddresses electionAddr, final Long sid) {
+        Socket sock = null;
+        try {
+            LOG.debug("Opening channel to server {}", sid);
+            if (self.isSslQuorum()) {
+                sock = self.getX509Util().createSSLSocket();
+            } else {
+                sock = SOCKET_FACTORY.get();
+            }
+            setSockOpts(sock);
+            sock.connect(electionAddr.getReachableOrOne(), cnxTO);
+            if (sock instanceof SSLSocket) {
+                SSLSocket sslSock = (SSLSocket) sock;
+                sslSock.startHandshake();
+                LOG.info("SSL handshake complete with {} - {} - {}",
+                         sslSock.getRemoteSocketAddress(),
+                         sslSock.getSession().getProtocol(),
+                         sslSock.getSession().getCipherSuite());
+            }
+
+            LOG.debug("Connected to server {} using election address: {}:{}",
+                      sid, sock.getInetAddress(), sock.getPort());
+        } catch (X509Exception e) {
+            LOG.warn("Cannot open secure channel to {} at election address {}", sid, electionAddr, e);
+            closeSocket(sock);
+            return;
+        } catch (UnresolvedAddressException | IOException e) {
+            LOG.warn("Cannot open channel to {} at election address {}", sid, electionAddr, e);
+            closeSocket(sock);
+            return;
+        }
+
         try {
             startConnection(sock, sid);
         } catch (IOException e) {
@@ -389,16 +418,15 @@ public class QuorumCnxManager {
      * Server will initiate the connection request to its peer server
      * asynchronously via separate connection thread.
      */
-    public void initiateConnectionAsync(final Socket sock, final Long sid) {
+    public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid) {
         if (!inprogressConnections.add(sid)) {
             // simply return as there is a connection request to
             // server 'sid' already in progress.
             LOG.debug("Connection request to server id: {} is already in progress, so skipping this request", sid);
-            closeSocket(sock);
-            return;
+            return true;
         }
         try {
-            connectionExecutor.execute(new QuorumConnectionReqThread(sock, sid));
+            connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid));
             connectionThreadCnt.incrementAndGet();
         } catch (Throwable e) {
             // Imp: Safer side catching all type of exceptions and remove 'sid'
@@ -406,27 +434,27 @@ public class QuorumCnxManager {
             // connection requests from this 'sid' in case of errors.
             inprogressConnections.remove(sid);
             LOG.error("Exception while submitting quorum connection request", e);
-            closeSocket(sock);
+            return false;
         }
+        return true;
     }
 
     /**
      * Thread to send connection request to peer server.
      */
     private class QuorumConnectionReqThread extends ZooKeeperThread {
-
-        final Socket sock;
+        final MultipleAddresses electionAddr;
         final Long sid;
-        QuorumConnectionReqThread(final Socket sock, final Long sid) {
+        QuorumConnectionReqThread(final MultipleAddresses electionAddr, final Long sid) {
             super("QuorumConnectionReqThread-" + sid);
-            this.sock = sock;
+            this.electionAddr = electionAddr;
             this.sid = sid;
         }
 
         @Override
         public void run() {
             try {
-                initiateConnection(sock, sid);
+                initiateConnection(electionAddr, sid);
             } finally {
                 inprogressConnections.remove(sid);
             }
@@ -679,6 +707,7 @@ public class QuorumCnxManager {
 
     /**
      * Try to establish a connection to server with id sid using its electionAddr.
+     * The function will return quickly and the connection will be established asynchronously.
      *
      * VisibleForTesting.
      *
@@ -697,62 +726,15 @@ public class QuorumCnxManager {
             return true;
         }
 
-        Socket sock = null;
-        try {
-            LOG.debug("Opening channel to server {}", sid);
-            if (self.isSslQuorum()) {
-                sock = self.getX509Util().createSSLSocket();
-            } else {
-                sock = new Socket();
-            }
-            setSockOpts(sock);
-            sock.connect(electionAddr.getReachableOrOne(), cnxTO);
-            if (sock instanceof SSLSocket) {
-                SSLSocket sslSock = (SSLSocket) sock;
-                sslSock.startHandshake();
-                LOG.info("SSL handshake complete with {} - {} - {}",
-                         sslSock.getRemoteSocketAddress(),
-                         sslSock.getSession().getProtocol(),
-                         sslSock.getSession().getCipherSuite());
-            }
-
-            LOG.debug("Connected to server {} using election address: {}:{}",
-                      sid, sock.getInetAddress(), sock.getPort());
-            // Sends connection request asynchronously if the quorum
-            // sasl authentication is enabled. This is required because
-            // sasl server authentication process may take few seconds to
-            // finish, this may delay next peer connection requests.
-            if (quorumSaslAuthEnabled) {
-                initiateConnectionAsync(sock, sid);
-            } else {
-                initiateConnection(sock, sid);
-            }
-            return true;
-        } catch (UnresolvedAddressException e) {
-            // Sun doesn't include the address that causes this
-            // exception to be thrown, also UAE cannot be wrapped cleanly
-            // so we log the exception in order to capture this critical
-            // detail.
-            LOG.warn("Cannot open channel to {} at election address {}", sid, electionAddr, e);
-            closeSocket(sock);
-            throw e;
-        } catch (X509Exception e) {
-            LOG.warn("Cannot open secure channel to {} at election address {}", sid, electionAddr, e);
-            closeSocket(sock);
-            return false;
-        } catch (NoRouteToHostException e) {
-            LOG.warn("None of the addresses ({}) are reachable for sid {}", electionAddr, sid, e);
-            closeSocket(sock);
-            return false;
-        } catch (IOException e) {
-            LOG.warn("Cannot open channel to {} at election address {}", sid, electionAddr, e);
-            closeSocket(sock);
-            return false;
-        }
+        // we always initiate connections asynchronously, since the socket connection may time out
+        // or the SSL handshake may take too long, and we don't want the rest of the connections
+        // to wait
+        return initiateConnectionAsync(electionAddr, sid);
     }
 
     /**
      * Try to establish a connection to server with id sid.
+     * The function will return quickly and the connection will be established asynchronously.
      *
      *  @param sid  server id
      */
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumCnxManagerSocketConnectionTimeoutTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumCnxManagerSocketConnectionTimeoutTest.java
new file mode 100644
index 0000000..ab1c077
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumCnxManagerSocketConnectionTimeoutTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.QuorumUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QuorumCnxManagerSocketConnectionTimeoutTest extends ZKTestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(QuorumCnxManagerSocketConnectionTimeoutTest.class);
+    private QuorumUtil qu;
+
+    @Before
+    public void setUp() throws Exception {
+        // starting a 3 node ensemble without observers
+        qu = new QuorumUtil(1, 2);
+        qu.startAll();
+    }
+
+    /**
+     * Testing an error case reported in ZOOKEEPER-3756:
+     *
+     * When a new leader election happens after a ZooKeeper server restarted, in Kubernetes
+     * the rest of the servers cannot initiate a connection to the restarted one, but they
+     * get a SocketTimeoutException instead of an immediate IOException. The leader election was
+     * timing out faster than the socket.connect call, so we ended up with cycles of broken
+     * leader elections.
+     *
+     * The fix was to make the connection initiation asynchronous, so one 'broken' connection
+     * doesn't block the whole leader election, even in case of SocketTimeoutException.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testSocketConnectionTimeoutDuringConnectingToElectionAddress() throws Exception {
+
+        int leaderId = qu.getLeaderServer();
+
+        // use a custom socket factory that will cause timeout instead of connecting to the
+        // leader election port of the current leader
+        final InetSocketAddress leaderElectionAddress =
+            qu.getLeaderQuorumPeer().getElectionAddress().getOne();
+        QuorumCnxManager.setSocketFactory(() -> new SocketStub(leaderElectionAddress));
+
+        qu.shutdown(leaderId);
+
+        assertTrue("Timeout during waiting for current leader to go down",
+                   ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer(leaderId).clientPort,
+                                                ClientBase.CONNECTION_TIMEOUT));
+
+        String errorMessage = "No new leader was elected";
+        waitFor(errorMessage, () -> qu.leaderExists() && qu.getLeaderServer() != leaderId, 15);
+    }
+
+    static final class SocketStub extends Socket {
+
+        private final InetSocketAddress addressToTimeout;
+
+        SocketStub(InetSocketAddress addressToTimeout) {
+            this.addressToTimeout = addressToTimeout;
+        }
+
+        @Override
+        public void connect(SocketAddress endpoint, int timeout) throws IOException {
+            if (addressToTimeout.equals(endpoint)) {
+                try {
+                    Thread.sleep(timeout);
+                } catch (InterruptedException e) {
+                    LOG.warn("interrupted SocketStub.connect", e);
+                    Thread.currentThread().interrupt();
+                }
+                throw new SocketTimeoutException("timeout reached in SocketStub.connect()");
+            }
+            super.connect(endpoint, timeout);
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        QuorumCnxManager.setSocketFactory(QuorumCnxManager.DEFAULT_SOCKET_FACTORY);
+        qu.shutdownAll();
+    }
+
+}

Reply via email to