This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.6
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit e2a5e7c47faeadce206405a92b2468c6c7e84615
Author: Ivan Kelly <[email protected]>
AuthorDate: Wed Nov 29 18:58:03 2017 -0800

    Tests wait for client to see started bookie
    
    Up until now we were only waiting for the bookie to exist in the
    bookie list, after which we triggered a read. However, when the
    registration changes went in, the read wasn't actually populating the
    mechanism used to select bookies, so we ended up momentarily having
    unusable bookkeeper clients until the correct read triggered. This
    resulted in flakiness in random tests.
    
    This change makes it so we wait until the client has seen a bookie
    before startBookie will return. It also makes sure that reading the
    bookie list actually populates the bookie selection mechanism.
    
    Author: Ivan Kelly <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>, Sijie 
Guo <[email protected]>
    
    This closes #775 from ivankelly/no-enough-bookie-flake
---
 .../org/apache/bookkeeper/client/BookKeeper.java   |  4 +-
 .../apache/bookkeeper/client/BookieWatcher.java    | 39 ++++++++++++++---
 .../bookkeeper/discover/RegistrationClient.java    |  8 +++-
 .../bookkeeper/discover/ZKRegistrationClient.java  | 23 +++++++---
 .../bookkeeper/client/BookKeeperTestClient.java    | 51 +++++++++++++++++++---
 .../bookkeeper/test/BookKeeperClusterTestCase.java | 44 ++++++++-----------
 .../apache/bookkeeper/test/ReadOnlyBookieTest.java | 27 +++++-------
 7 files changed, 129 insertions(+), 67 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 8e14812..4faea05 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -497,13 +497,13 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
                     .setNameFormat("BKClientMetaDataPollScheduler-%d");
             this.bookieInfoScheduler = 
Executors.newSingleThreadScheduledExecutor(tFBuilder.build());
             this.bookieInfoReader = new BookieInfoReader(this, conf, 
this.bookieInfoScheduler);
-            this.bookieWatcher.readBookiesBlocking();
+            this.bookieWatcher.initialBlockingBookieRead();
             this.bookieInfoReader.start();
         } else {
             LOG.info("Weighted ledger placement is not enabled");
             this.bookieInfoScheduler = null;
             this.bookieInfoReader = new BookieInfoReader(this, conf, null);
-            this.bookieWatcher.readBookiesBlocking();
+            this.bookieWatcher.initialBlockingBookieRead();
         }
 
         // initialize ledger manager
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index 2bba1d6..04c62e2 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import lombok.extern.slf4j.Slf4j;
@@ -70,6 +71,9 @@ class BookieWatcher {
     private volatile Set<BookieSocketAddress> writableBookies = 
Collections.emptySet();
     private volatile Set<BookieSocketAddress> readOnlyBookies = 
Collections.emptySet();
 
+    private CompletableFuture<?> initialWritableBookiesFuture = null;
+    private CompletableFuture<?> initialReadonlyBookiesFuture = null;
+
     public BookieWatcher(ClientConfiguration conf,
                          EnsemblePlacementPolicy placementPolicy,
                          RegistrationClient registrationClient) {
@@ -138,17 +142,40 @@ class BookieWatcher {
      *
      * @throws BKException when failed to read bookies
      */
-    public void readBookiesBlocking() throws BKException {
-        this.registrationClient.watchReadOnlyBookies(bookies -> 
processReadOnlyBookiesChanged(bookies.getValue()));
-        this.registrationClient.watchWritableBookies(bookies -> 
processWritableBookiesChanged(bookies.getValue()));
+    public void initialBlockingBookieRead() throws BKException {
+        CompletableFuture<?> writable;
+        CompletableFuture<?> readonly;
+        synchronized (this) {
+            if (initialReadonlyBookiesFuture == null) {
+                assert initialWritableBookiesFuture == null;
+
+                writable = this.registrationClient.watchWritableBookies(
+                            bookies -> 
processWritableBookiesChanged(bookies.getValue()));
+
+                readonly = this.registrationClient.watchReadOnlyBookies(
+                            bookies -> 
processReadOnlyBookiesChanged(bookies.getValue()));
+                initialWritableBookiesFuture = writable;
+                initialReadonlyBookiesFuture = readonly;
+            } else {
+                writable = initialWritableBookiesFuture;
+                readonly = initialReadonlyBookiesFuture;
+            }
+        }
 
         try {
-            readOnlyBookies = getReadOnlyBookies();
+            FutureUtils.result(writable, EXCEPTION_FUNC);
+        } catch (BKInterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw ie;
+        }
+        try {
+            FutureUtils.result(readonly, EXCEPTION_FUNC);
+        } catch (BKInterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw ie;
         } catch (Exception e) {
             log.error("Failed getReadOnlyBookies: ", e);
         }
-
-        writableBookies = getBookies();
     }
 
     /**
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
index 068f1cc..79dc300 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
@@ -87,8 +87,10 @@ public interface RegistrationClient extends AutoCloseable {
      * <p>The topology changes of bookies will be propagated to the provided 
<i>listener</i>.
      *
      * @param listener listener to receive the topology changes of bookies.
+     * @return a future which completes when the bookies have been read for
+     *         the first time
      */
-    void watchWritableBookies(RegistrationListener listener);
+    CompletableFuture<Void> watchWritableBookies(RegistrationListener 
listener);
 
     /**
      * Unwatch the changes of bookies.
@@ -103,8 +105,10 @@ public interface RegistrationClient extends AutoCloseable {
      * <p>The topology changes of bookies will be propagated to the provided 
<i>listener</i>.
      *
      * @param listener listener to receive the topology changes of bookies.
+     * @return a future which completes when the bookies have been read for
+     *         the first time
      */
-    void watchReadOnlyBookies(RegistrationListener listener);
+    CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener 
listener);
 
     /**
      * Unwatch the changes of bookies.
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
index e214521..be31520 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
@@ -77,10 +77,12 @@ public class ZKRegistrationClient implements 
RegistrationClient {
         private boolean closed = false;
         private Set<BookieSocketAddress> bookies = null;
         private Version version = Version.NEW;
+        private final CompletableFuture<Void> firstRunFuture;
 
-        WatchTask(String regPath) {
+        WatchTask(String regPath, CompletableFuture<Void> firstRunFuture) {
             this.regPath = regPath;
             this.listeners = new CopyOnWriteArraySet<>();
+            this.firstRunFuture = firstRunFuture;
         }
 
         public int getNumListeners() {
@@ -90,7 +92,10 @@ public class ZKRegistrationClient implements 
RegistrationClient {
         public boolean addListener(RegistrationListener listener) {
             if (listeners.add(listener)) {
                 if (null != bookies) {
-                    listener.onBookiesChanged(new Versioned<>(bookies, 
version));
+                    scheduler.execute(() -> {
+                            listener.onBookiesChanged(
+                                    new Versioned<>(bookies, version));
+                        });
                 }
             }
             return true;
@@ -126,6 +131,7 @@ public class ZKRegistrationClient implements 
RegistrationClient {
         public void accept(Versioned<Set<BookieSocketAddress>> bookieSet, 
Throwable throwable) {
             if (throwable != null) {
                 scheduleWatchTask(ZK_CONNECT_BACKOFF_MS);
+                firstRunFuture.completeExceptionally(throwable);
                 return;
             }
 
@@ -138,6 +144,7 @@ public class ZKRegistrationClient implements 
RegistrationClient {
                     listener.onBookiesChanged(bookieSet);
                 }
             }
+            FutureUtils.complete(firstRunFuture, null);
         }
 
         @Override
@@ -274,15 +281,17 @@ public class ZKRegistrationClient implements 
RegistrationClient {
 
 
     @Override
-    public synchronized void watchWritableBookies(RegistrationListener 
listener) {
+    public synchronized CompletableFuture<Void> 
watchWritableBookies(RegistrationListener listener) {
+        CompletableFuture<Void> f = new CompletableFuture<>();
         if (null == watchWritableBookiesTask) {
-            watchWritableBookiesTask = new WatchTask(bookieRegistrationPath);
+            watchWritableBookiesTask = new WatchTask(bookieRegistrationPath, 
f);
         }
 
         watchWritableBookiesTask.addListener(listener);
         if (watchWritableBookiesTask.getNumListeners() == 1) {
             watchWritableBookiesTask.watch();
         }
+        return f;
     }
 
     @Override
@@ -299,15 +308,17 @@ public class ZKRegistrationClient implements 
RegistrationClient {
     }
 
     @Override
-    public synchronized void watchReadOnlyBookies(RegistrationListener 
listener) {
+    public synchronized CompletableFuture<Void> 
watchReadOnlyBookies(RegistrationListener listener) {
+        CompletableFuture<Void> f = new CompletableFuture<>();
         if (null == watchReadOnlyBookiesTask) {
-            watchReadOnlyBookiesTask = new 
WatchTask(bookieReadonlyRegistrationPath);
+            watchReadOnlyBookiesTask = new 
WatchTask(bookieReadonlyRegistrationPath, f);
         }
 
         watchReadOnlyBookiesTask.addListener(listener);
         if (watchReadOnlyBookiesTask.getNumListeners() == 1) {
             watchReadOnlyBookiesTask.watch();
         }
+        return f;
     }
 
     @Override
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
index a42a50d..6b346e1 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
@@ -22,7 +22,13 @@ package org.apache.bookkeeper.client;
  */
 
 import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
@@ -31,6 +37,7 @@ import org.apache.zookeeper.ZooKeeper;
  * Test BookKeeperClient which allows access to members we don't
  * wish to expose in the public API.
  */
+@Slf4j
 public class BookKeeperTestClient extends BookKeeper {
     public BookKeeperTestClient(ClientConfiguration conf)
             throws IOException, InterruptedException, BKException {
@@ -49,14 +56,44 @@ public class BookKeeperTestClient extends BookKeeper {
         return bookieClient;
     }
 
+    public Future<?> waitForReadOnlyBookie(BookieSocketAddress b)
+            throws Exception {
+        return waitForBookieInSet(b, false);
+    }
+
+    public Future<?> waitForWritableBookie(BookieSocketAddress b)
+            throws Exception {
+        return waitForBookieInSet(b, true);
+    }
+
     /**
-     * Force a read to zookeeper to get list of bookies.
-     *
-     * @throws InterruptedException
-     * @throws KeeperException
+     * Wait for bookie to appear in either the writable set of bookies,
+     * or the read only set of bookies. Also ensure that it doesn't exist
+     * in the other set before completing.
      */
-    public void readBookiesBlocking()
-            throws InterruptedException, BKException {
-        bookieWatcher.readBookiesBlocking();
+    private Future<?> waitForBookieInSet(BookieSocketAddress b,
+                                                       boolean writable) {
+        log.info("Wait for {} to become {}",
+                 b, writable ? "writable" : "readonly");
+
+        CompletableFuture<Void> readOnlyFuture = new CompletableFuture<>();
+        CompletableFuture<Void> writableFuture = new CompletableFuture<>();
+
+        RegistrationListener readOnlyListener = (bookies) -> {
+            boolean contains = bookies.getValue().contains(b);
+            if ((!writable && contains) || (writable && !contains)) {
+                readOnlyFuture.complete(null);
+            }
+        };
+        RegistrationListener writableListener = (bookies) -> {
+            boolean contains = bookies.getValue().contains(b);
+            if ((writable && contains) || (!writable && !contains)) {
+                writableFuture.complete(null);
+            }
+        };
+
+        regClient.watchWritableBookies(writableListener);
+        regClient.watchReadOnlyBookies(readOnlyListener);
+        return CompletableFuture.allOf(writableFuture, readOnlyFuture);
     }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index b5a22f8..83eb8e2 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.Bookie;
@@ -558,26 +559,19 @@ public abstract class BookKeeperClusterTestCase {
     protected BookieServer startBookie(ServerConfiguration conf)
             throws Exception {
         BookieServer server = new BookieServer(conf);
-        server.start();
-
+        BookieSocketAddress address = Bookie.getBookieAddress(conf);
         if (bkc == null) {
             bkc = new BookKeeperTestClient(baseClientConf);
         }
 
-        int port = conf.getBookiePort();
-        String host = InetAddress.getLocalHost().getHostAddress();
-        if (conf.getUseHostNameAsBookieID()) {
-            host = InetAddress.getLocalHost().getCanonicalHostName();
-        }
-        
-        while (conf.isForceReadOnlyBookie()
-            && bkc.getZkHandle().exists(conf.getZkLedgersRootPath() 
+"/available/readonly/" + host + ":" + port,
-            false) == null) {
-            Thread.sleep(100);
-        }
+        Future<?> waitForBookie = conf.isForceReadOnlyBookie()
+            ? bkc.waitForReadOnlyBookie(address)
+            : bkc.waitForWritableBookie(address);
+
+        server.start();
 
-        bkc.readBookiesBlocking();
-        LOG.info("New bookie on port " + port + " has been created.");
+        waitForBookie.get(30, TimeUnit.SECONDS);
+        LOG.info("New bookie '{}' has been created.", address);
 
         try {
             startAutoRecovery(server, conf);
@@ -601,24 +595,20 @@ public abstract class BookKeeperClusterTestCase {
                 return b;
             }
         };
-        server.start();
 
+        BookieSocketAddress address = Bookie.getBookieAddress(conf);
         if (bkc == null) {
             bkc = new BookKeeperTestClient(baseClientConf);
         }
+        Future<?> waitForBookie = conf.isForceReadOnlyBookie()
+            ? bkc.waitForReadOnlyBookie(address)
+            : bkc.waitForWritableBookie(address);
 
-        int port = conf.getBookiePort();
-        String host = InetAddress.getLocalHost().getHostAddress();
-        if (conf.getUseHostNameAsBookieID()) {
-            host = InetAddress.getLocalHost().getCanonicalHostName();
-        }
-        while (bkc.getZkHandle().exists(
-                conf.getZkLedgersRootPath() + "/available/" + host + ":" + 
port, false) == null) {
-            Thread.sleep(500);
-        }
+        server.start();
+
+        waitForBookie.get(30, TimeUnit.SECONDS);
+        LOG.info("New bookie '{}' has been created.", address);
 
-        bkc.readBookiesBlocking();
-        LOG.info("New bookie on port " + port + " has been created.");
         try {
             startAutoRecovery(server, conf);
         } catch (CompatibilityException ce) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
index a67e200..6d7ceae 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.util.Enumeration;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
@@ -127,18 +128,13 @@ public class ReadOnlyBookieTest extends 
BookKeeperClusterTestCase {
             // Expected
         }
 
-        // wait for zk to get updated (async) as bookie transitions to 
read-only
-        while (zkc.exists(baseConf.getZkAvailableBookiesPath() + "/" + 
BookKeeperConstants.READONLY + "/"
-                + Bookie.getBookieAddress(bsConfs.get(1)).toString(), false) 
== null) {
-            Thread.sleep(100);
-        }
-        
+        bkc.waitForReadOnlyBookie(Bookie.getBookieAddress(bsConfs.get(1)))
+            .get(30, TimeUnit.SECONDS);
+
         LOG.info("bookie is running {}, readonly {}.", bookie.isRunning(), 
bookie.isReadOnly());
         assertTrue("Bookie should be running and converted to readonly mode",
                 bookie.isRunning() && bookie.isReadOnly());
 
-        // refresh the bookkeeper client
-        bkc.readBookiesBlocking();
         // should fail to create ledger
         try {
             bkc.createLedger(2, 2, DigestType.MAC, "".getBytes());
@@ -150,18 +146,13 @@ public class ReadOnlyBookieTest extends 
BookKeeperClusterTestCase {
         // Now add the current ledger dir back to writable dirs list
         ledgerDirsManager.addToWritableDirs(testDir, true);
 
-        // since the bookie transitions to write mode asynchronously, we need 
to wait for the bookie to be registered on
-        // zk
-        while (zkc.exists(baseConf.getZkAvailableBookiesPath() + "/"
-                + Bookie.getBookieAddress(bsConfs.get(1)).toString(), false) 
== null) {
-            Thread.sleep(100);
-        }
+        bkc.waitForWritableBookie(Bookie.getBookieAddress(bsConfs.get(1)))
+            .get(30, TimeUnit.SECONDS);
 
         LOG.info("bookie is running {}, readonly {}.", bookie.isRunning(), 
bookie.isReadOnly());
         assertTrue("Bookie should be running and converted back to writable 
mode", bookie.isRunning()
                 && !bookie.isReadOnly());
-        // force client to read bookies
-        bkc.readBookiesBlocking();
+
         LedgerHandle newLedger = bkc.createLedger(2, 2, DigestType.MAC, 
"".getBytes());
         for (int i = 0; i < 10; i++) {
             newLedger.addEntry("data".getBytes());
@@ -272,7 +263,9 @@ public class ReadOnlyBookieTest extends 
BookKeeperClusterTestCase {
         startNewBookie();
         bs.get(1).getBookie().doTransitionToReadOnlyMode();
         try {
-            bkc.readBookiesBlocking();
+            bkc.waitForReadOnlyBookie(Bookie.getBookieAddress(bsConfs.get(1)))
+                .get(30, TimeUnit.SECONDS);
+
             bkc.createLedger(2, 2, DigestType.CRC32, "".getBytes());
             fail("Must throw exception, as there is one readonly bookie");
         } catch (BKException e) {

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to