This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-3.6
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/branch-3.6 by this push:
     new 66e9eda  ZOOKEEPER-3651: try to fix flaky NettyServerCnxnFactoryTest
66e9eda is described below

commit 66e9eda68539c7eda5c67af4f12003382aba8529
Author: Mate Szalay-Beko <[email protected]>
AuthorDate: Mon Dec 16 22:09:36 2019 +0100

    ZOOKEEPER-3651: try to fix flaky NettyServerCnxnFactoryTest
    
    The testOutstandingHandshakeLimit test is flaky; I tried to fix it in this commit.
    - I added extra comments and did some restructuring in the code.
    - Avoiding starting unnecessary ZooKeeper servers for tests that don't require them
    - Decreasing the number of client connections the test tries to initiate
    - Increasing the timeout to make sure the connections get established
    - Filtering the 'SyncConnected' events in the client watcher to make sure
    the given connection is really established before counting it
    
    I think the last two points above should fix the flakiness. I tried to run 
the
    test in docker, and before the fix it failed for me once in every 4-5 executions.
    After applying these changes I re-executed it 100 times without failure.
    
    If these fixes are not enough, then we can introduce some 
only-visible-by-test
    method to add a sleep to the SSL handshake process in the production code to
    force handshakes to happen in parallel. However, it would be nice to avoid that.
    Let's hope that these fixes will be enough.
    
    Author: Mate Szalay-Beko <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Norbert Kalmar 
<[email protected]>
    
    Closes #1184 from symat/ZOOKEEPER-3651
    
    (cherry picked from commit 20daae7d5fa934629e7825ed72e66ad76a94d6aa)
    Signed-off-by: Enrico Olivelli <[email protected]>
---
 .../server/NettyServerCnxnFactoryTest.java         | 150 ++++++++++++++++-----
 1 file changed, 114 insertions(+), 36 deletions(-)

diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnFactoryTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnFactoryTest.java
index afb97b1..76136c4 100644
--- 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnFactoryTest.java
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnFactoryTest.java
@@ -43,24 +43,33 @@ public class NettyServerCnxnFactoryTest extends ClientBase {
     private static final Logger LOG = LoggerFactory
             .getLogger(NettyServerCnxnFactoryTest.class);
 
-    final LinkedBlockingQueue<ZooKeeper> zks = new 
LinkedBlockingQueue<ZooKeeper>();
+    ClientX509Util x509Util;
+    final LinkedBlockingQueue<ZooKeeper> zooKeeperClients = new 
LinkedBlockingQueue<>();
+
 
     @Override
     public void setUp() throws Exception {
         System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
                 "org.apache.zookeeper.server.NettyServerCnxnFactory");
-        super.setUp();
+
+        // by default, we don't start any ZooKeeper server, as not all the 
tests are needing it.
     }
 
     @Override
     public void tearDown() throws Exception {
-        System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
 
-        // clean up
-        for (ZooKeeper zk : zks) {
+        System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
+        if (x509Util != null) {
+            SSLAuthTest.clearSecureSetting(x509Util);
+        }
+        for (ZooKeeper zk : zooKeeperClients) {
             zk.close();
         }
-        super.tearDown();
+
+        //stopping the server only if it was started
+        if (serverFactory != null) {
+            super.tearDown();
+        }
     }
 
     @Test
@@ -96,63 +105,132 @@ public class NettyServerCnxnFactoryTest extends 
ClientBase {
         Assert.assertTrue(factory.getParentChannel().isActive());
     }
 
+    /*
+     * In this test we are flooding the server with SSL connections, and 
expecting that not
+     * all the connection will succeed at once. Some of the connections should 
be closed,
+     * as there is a maximum number of parallel SSL handshake the server is 
willing to do
+     * for security reasons.
+     */
     @Test
     public void testOutstandingHandshakeLimit() throws Exception {
 
+        // setting up SSL params, but disable some debug logs
+        x509Util = SSLAuthTest.setUpSecure();
+        System.clearProperty("javax.net.debug");
+
+        // starting a single server (it will be closed in the tearDown)
+        setUpWithServerId(1);
+
+        // initializing the statistics
         SimpleCounter tlsHandshakeExceeded = (SimpleCounter) 
ServerMetrics.getMetrics().TLS_HANDSHAKE_EXCEEDED;
         tlsHandshakeExceeded.reset();
         Assert.assertEquals(tlsHandshakeExceeded.get(), 0);
 
-        ClientX509Util x509Util = SSLAuthTest.setUpSecure();
+        // setting the HandshakeLimit to 3, so only 3 SSL handshakes can 
happen in parallel
         NettyServerCnxnFactory factory = (NettyServerCnxnFactory) 
serverFactory;
         factory.setSecure(true);
-        factory.setOutstandingHandshakeLimit(10);
+        factory.setOutstandingHandshakeLimit(3);
 
+        // starting the threads that will try to connect to the server
+        // we will have 3 threads, each of them establishing 3 connections
         int threadNum = 3;
-        int cnxnPerThread = 10;
-        Thread[] cnxnWorker = new Thread[threadNum];
-
+        int cnxnPerThread = 3;
+        int cnxnLimit = threadNum * cnxnPerThread;
         AtomicInteger cnxnCreated = new AtomicInteger(0);
         CountDownLatch latch = new CountDownLatch(1);
-
+        Thread[] cnxnWorker = new Thread[threadNum];
         for (int i = 0; i < cnxnWorker.length; i++) {
-            cnxnWorker[i] = new Thread() {
-                @Override
-                public void run() {
-                    for (int i = 0; i < cnxnPerThread; i++) {
-                        try {
-                            zks.add(new ZooKeeper(hostPort, 3000, new 
Watcher() {
-                                @Override
-                                public void process(WatchedEvent event) {
-                                    int created = cnxnCreated.addAndGet(1);
-                                    if (created == threadNum * cnxnPerThread) {
-                                        latch.countDown();
-                                    }
-                                }
-                            }));
-                        } catch (Exception e) {
-                            LOG.info("Error while creating zk client", e);
-                        }
-                    }
-                }
-            };
+            cnxnWorker[i] = new ClientConnectionGenerator(i, cnxnPerThread, 
cnxnCreated, cnxnLimit, latch, zooKeeperClients);
             cnxnWorker[i].start();
         }
 
-        Assert.assertThat(latch.await(3, TimeUnit.SECONDS), Matchers.is(true));
-        LOG.info("created {} connections", threadNum * cnxnPerThread);
+        // we might need to wait potentially for a longer time for all the 
connection to get established,
+        // as the ZooKeeper Server will close some of the connections and the 
clients will have to re-try
+        boolean allConnectionsCreatedInTime = latch.await(30, 
TimeUnit.SECONDS);
+        int actualConnections = cnxnCreated.get();
+        LOG.info("created {} connections", actualConnections);
+        if (!allConnectionsCreatedInTime) {
+          Assert.fail(String.format("Only %d out of %d connections created!", 
actualConnections, cnxnLimit));
+        }
 
-        // Assert throttling not 0
+        // Assert the server refused some of the connections because the 
handshake limit was reached
+        // (throttling should be greater than 0)
         long handshakeThrottledNum = tlsHandshakeExceeded.get();
         LOG.info("TLS_HANDSHAKE_EXCEEDED: {}", handshakeThrottledNum);
         Assert.assertThat("The number of handshake throttled should be "
                 + "greater than 0", handshakeThrottledNum, 
Matchers.greaterThan(0L));
 
-        // Assert there is no outstanding handshake anymore
+        // Assert there is no outstanding handshake anymore, all the clients 
connected in the end
         int outstandingHandshakeNum = factory.getOutstandingHandshakeNum();
         LOG.info("outstanding handshake is {}", outstandingHandshakeNum);
         Assert.assertThat("The outstanding handshake number should be 0 "
                 + "after all cnxns established", outstandingHandshakeNum, 
Matchers.is(0));
+    }
 
+
+    private final class ClientConnectionWatcher implements Watcher {
+
+        private final AtomicInteger cnxnCreated;
+        private final int cnxnLimit;
+        private final int cnxnThreadId;
+        private final int cnxnId;
+        private final CountDownLatch latch;
+
+        public ClientConnectionWatcher(AtomicInteger cnxnCreated, int 
cnxnLimit, int cnxnThreadId,
+                                       int cnxnId, CountDownLatch latch) {
+            this.cnxnCreated = cnxnCreated;
+            this.cnxnLimit = cnxnLimit;
+            this.cnxnThreadId = cnxnThreadId;
+            this.cnxnId = cnxnId;
+            this.latch = latch;
+        }
+
+        @Override
+        public void process(WatchedEvent event) {
+            LOG.info(String.format("WATCHER [thread: %d, cnx:%d] - new event: 
%s", cnxnThreadId, cnxnId, event.toString()));
+            if (event.getState() == Event.KeeperState.SyncConnected) {
+              int created = cnxnCreated.addAndGet(1);
+              if (created == cnxnLimit) {
+                latch.countDown();
+              }
+            }
+        }
     }
+
+
+    private final class ClientConnectionGenerator extends Thread {
+
+        private final int cnxnThreadId;
+        private final int cnxnPerThread;
+        private final AtomicInteger cnxnCreated;
+        private final int cnxnLimit;
+        private final CountDownLatch latch;
+        private final LinkedBlockingQueue<ZooKeeper> zks;
+
+        private ClientConnectionGenerator(int cnxnThreadId, int cnxnPerThread,
+                                          AtomicInteger cnxnCreated, int 
cnxnLimit,
+                                          CountDownLatch latch,
+                                          LinkedBlockingQueue<ZooKeeper> zks) {
+            this.cnxnThreadId = cnxnThreadId;
+            this.cnxnPerThread = cnxnPerThread;
+            this.cnxnCreated = cnxnCreated;
+            this.cnxnLimit = cnxnLimit;
+            this.latch = latch;
+            this.zks = zks;
+        }
+
+        @Override
+        public void run() {
+
+            for (int j = 0; j < cnxnPerThread; j++) {
+                try {
+                    zks.add(new ZooKeeper(hostPort, 30000,
+                                          new 
ClientConnectionWatcher(cnxnCreated, cnxnLimit, cnxnThreadId, j, latch)));
+                } catch (Exception e) {
+                    LOG.info("Error while creating zk client", e);
+                }
+            }
+        }
+    }
+
 }

Reply via email to