This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 24d53cb Tests wait for client to see started bookie
24d53cb is described below
commit 24d53cb241d0cd08c4fb99101d963bceb9b0d8cb
Author: Ivan Kelly <[email protected]>
AuthorDate: Wed Nov 29 18:58:03 2017 -0800
Tests wait for client to see started bookie
Up until now we were only waiting for the bookie to exist in the
bookie list, after which we triggered a read. However, when the
registration changes went in, the read wasn't actually populating the
mechanism used to select bookies, so we ended up momentarily having
unusable BookKeeper clients until the correct read was triggered. This
resulted in flakiness in random tests.
This change makes startBookie wait until the client has seen the new
bookie before it returns. It also makes sure that reading the bookie
list actually populates the bookie selection mechanism.
Author: Ivan Kelly <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>, Sijie
Guo <[email protected]>
This closes #775 from ivankelly/no-enough-bookie-flake
---
.../org/apache/bookkeeper/client/BookKeeper.java | 4 +-
.../apache/bookkeeper/client/BookieWatcher.java | 39 ++++++++++++++---
.../bookkeeper/discover/RegistrationClient.java | 8 +++-
.../bookkeeper/discover/ZKRegistrationClient.java | 23 +++++++---
.../bookkeeper/client/BookKeeperTestClient.java | 51 +++++++++++++++++++---
.../bookkeeper/test/BookKeeperClusterTestCase.java | 44 ++++++++-----------
.../apache/bookkeeper/test/ReadOnlyBookieTest.java | 27 +++++-------
7 files changed, 129 insertions(+), 67 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 8e14812..4faea05 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -497,13 +497,13 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
.setNameFormat("BKClientMetaDataPollScheduler-%d");
this.bookieInfoScheduler =
Executors.newSingleThreadScheduledExecutor(tFBuilder.build());
this.bookieInfoReader = new BookieInfoReader(this, conf,
this.bookieInfoScheduler);
- this.bookieWatcher.readBookiesBlocking();
+ this.bookieWatcher.initialBlockingBookieRead();
this.bookieInfoReader.start();
} else {
LOG.info("Weighted ledger placement is not enabled");
this.bookieInfoScheduler = null;
this.bookieInfoReader = new BookieInfoReader(this, conf, null);
- this.bookieWatcher.readBookiesBlocking();
+ this.bookieWatcher.initialBlockingBookieRead();
}
// initialize ledger manager
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index 2bba1d6..04c62e2 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
@@ -70,6 +71,9 @@ class BookieWatcher {
private volatile Set<BookieSocketAddress> writableBookies =
Collections.emptySet();
private volatile Set<BookieSocketAddress> readOnlyBookies =
Collections.emptySet();
+ private CompletableFuture<?> initialWritableBookiesFuture = null;
+ private CompletableFuture<?> initialReadonlyBookiesFuture = null;
+
public BookieWatcher(ClientConfiguration conf,
EnsemblePlacementPolicy placementPolicy,
RegistrationClient registrationClient) {
@@ -138,17 +142,40 @@ class BookieWatcher {
*
* @throws BKException when failed to read bookies
*/
- public void readBookiesBlocking() throws BKException {
- this.registrationClient.watchReadOnlyBookies(bookies ->
processReadOnlyBookiesChanged(bookies.getValue()));
- this.registrationClient.watchWritableBookies(bookies ->
processWritableBookiesChanged(bookies.getValue()));
+ public void initialBlockingBookieRead() throws BKException {
+ CompletableFuture<?> writable;
+ CompletableFuture<?> readonly;
+ synchronized (this) {
+ if (initialReadonlyBookiesFuture == null) {
+ assert initialWritableBookiesFuture == null;
+
+ writable = this.registrationClient.watchWritableBookies(
+ bookies ->
processWritableBookiesChanged(bookies.getValue()));
+
+ readonly = this.registrationClient.watchReadOnlyBookies(
+ bookies ->
processReadOnlyBookiesChanged(bookies.getValue()));
+ initialWritableBookiesFuture = writable;
+ initialReadonlyBookiesFuture = readonly;
+ } else {
+ writable = initialWritableBookiesFuture;
+ readonly = initialReadonlyBookiesFuture;
+ }
+ }
try {
- readOnlyBookies = getReadOnlyBookies();
+ FutureUtils.result(writable, EXCEPTION_FUNC);
+ } catch (BKInterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw ie;
+ }
+ try {
+ FutureUtils.result(readonly, EXCEPTION_FUNC);
+ } catch (BKInterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw ie;
} catch (Exception e) {
log.error("Failed getReadOnlyBookies: ", e);
}
-
- writableBookies = getBookies();
}
/**
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
index 068f1cc..79dc300 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
@@ -87,8 +87,10 @@ public interface RegistrationClient extends AutoCloseable {
* <p>The topology changes of bookies will be propagated to the provided
<i>listener</i>.
*
* @param listener listener to receive the topology changes of bookies.
+ * @return a future which completes when the bookies have been read for
+ * the first time
*/
- void watchWritableBookies(RegistrationListener listener);
+ CompletableFuture<Void> watchWritableBookies(RegistrationListener
listener);
/**
* Unwatch the changes of bookies.
@@ -103,8 +105,10 @@ public interface RegistrationClient extends AutoCloseable {
* <p>The topology changes of bookies will be propagated to the provided
<i>listener</i>.
*
* @param listener listener to receive the topology changes of bookies.
+ * @return a future which completes when the bookies have been read for
+ * the first time
*/
- void watchReadOnlyBookies(RegistrationListener listener);
+ CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener
listener);
/**
* Unwatch the changes of bookies.
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
index e214521..be31520 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
@@ -77,10 +77,12 @@ public class ZKRegistrationClient implements
RegistrationClient {
private boolean closed = false;
private Set<BookieSocketAddress> bookies = null;
private Version version = Version.NEW;
+ private final CompletableFuture<Void> firstRunFuture;
- WatchTask(String regPath) {
+ WatchTask(String regPath, CompletableFuture<Void> firstRunFuture) {
this.regPath = regPath;
this.listeners = new CopyOnWriteArraySet<>();
+ this.firstRunFuture = firstRunFuture;
}
public int getNumListeners() {
@@ -90,7 +92,10 @@ public class ZKRegistrationClient implements
RegistrationClient {
public boolean addListener(RegistrationListener listener) {
if (listeners.add(listener)) {
if (null != bookies) {
- listener.onBookiesChanged(new Versioned<>(bookies,
version));
+ scheduler.execute(() -> {
+ listener.onBookiesChanged(
+ new Versioned<>(bookies, version));
+ });
}
}
return true;
@@ -126,6 +131,7 @@ public class ZKRegistrationClient implements
RegistrationClient {
public void accept(Versioned<Set<BookieSocketAddress>> bookieSet,
Throwable throwable) {
if (throwable != null) {
scheduleWatchTask(ZK_CONNECT_BACKOFF_MS);
+ firstRunFuture.completeExceptionally(throwable);
return;
}
@@ -138,6 +144,7 @@ public class ZKRegistrationClient implements
RegistrationClient {
listener.onBookiesChanged(bookieSet);
}
}
+ FutureUtils.complete(firstRunFuture, null);
}
@Override
@@ -274,15 +281,17 @@ public class ZKRegistrationClient implements
RegistrationClient {
@Override
- public synchronized void watchWritableBookies(RegistrationListener
listener) {
+ public synchronized CompletableFuture<Void>
watchWritableBookies(RegistrationListener listener) {
+ CompletableFuture<Void> f = new CompletableFuture<>();
if (null == watchWritableBookiesTask) {
- watchWritableBookiesTask = new WatchTask(bookieRegistrationPath);
+ watchWritableBookiesTask = new WatchTask(bookieRegistrationPath,
f);
}
watchWritableBookiesTask.addListener(listener);
if (watchWritableBookiesTask.getNumListeners() == 1) {
watchWritableBookiesTask.watch();
}
+ return f;
}
@Override
@@ -299,15 +308,17 @@ public class ZKRegistrationClient implements
RegistrationClient {
}
@Override
- public synchronized void watchReadOnlyBookies(RegistrationListener
listener) {
+ public synchronized CompletableFuture<Void>
watchReadOnlyBookies(RegistrationListener listener) {
+ CompletableFuture<Void> f = new CompletableFuture<>();
if (null == watchReadOnlyBookiesTask) {
- watchReadOnlyBookiesTask = new
WatchTask(bookieReadonlyRegistrationPath);
+ watchReadOnlyBookiesTask = new
WatchTask(bookieReadonlyRegistrationPath, f);
}
watchReadOnlyBookiesTask.addListener(listener);
if (watchReadOnlyBookiesTask.getNumListeners() == 1) {
watchReadOnlyBookiesTask.watch();
}
+ return f;
}
@Override
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
index a42a50d..6b346e1 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
@@ -22,7 +22,13 @@ package org.apache.bookkeeper.client;
*/
import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
@@ -31,6 +37,7 @@ import org.apache.zookeeper.ZooKeeper;
* Test BookKeeperClient which allows access to members we don't
* wish to expose in the public API.
*/
+@Slf4j
public class BookKeeperTestClient extends BookKeeper {
public BookKeeperTestClient(ClientConfiguration conf)
throws IOException, InterruptedException, BKException {
@@ -49,14 +56,44 @@ public class BookKeeperTestClient extends BookKeeper {
return bookieClient;
}
+ public Future<?> waitForReadOnlyBookie(BookieSocketAddress b)
+ throws Exception {
+ return waitForBookieInSet(b, false);
+ }
+
+ public Future<?> waitForWritableBookie(BookieSocketAddress b)
+ throws Exception {
+ return waitForBookieInSet(b, true);
+ }
+
/**
- * Force a read to zookeeper to get list of bookies.
- *
- * @throws InterruptedException
- * @throws KeeperException
+ * Wait for bookie to appear in either the writable set of bookies,
+ * or the read only set of bookies. Also ensure that it doesn't exist
+ * in the other set before completing.
*/
- public void readBookiesBlocking()
- throws InterruptedException, BKException {
- bookieWatcher.readBookiesBlocking();
+ private Future<?> waitForBookieInSet(BookieSocketAddress b,
+ boolean writable) {
+ log.info("Wait for {} to become {}",
+ b, writable ? "writable" : "readonly");
+
+ CompletableFuture<Void> readOnlyFuture = new CompletableFuture<>();
+ CompletableFuture<Void> writableFuture = new CompletableFuture<>();
+
+ RegistrationListener readOnlyListener = (bookies) -> {
+ boolean contains = bookies.getValue().contains(b);
+ if ((!writable && contains) || (writable && !contains)) {
+ readOnlyFuture.complete(null);
+ }
+ };
+ RegistrationListener writableListener = (bookies) -> {
+ boolean contains = bookies.getValue().contains(b);
+ if ((writable && contains) || (!writable && !contains)) {
+ writableFuture.complete(null);
+ }
+ };
+
+ regClient.watchWritableBookies(writableListener);
+ regClient.watchReadOnlyBookies(readOnlyListener);
+ return CompletableFuture.allOf(writableFuture, readOnlyFuture);
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index b5a22f8..83eb8e2 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie;
@@ -558,26 +559,19 @@ public abstract class BookKeeperClusterTestCase {
protected BookieServer startBookie(ServerConfiguration conf)
throws Exception {
BookieServer server = new BookieServer(conf);
- server.start();
-
+ BookieSocketAddress address = Bookie.getBookieAddress(conf);
if (bkc == null) {
bkc = new BookKeeperTestClient(baseClientConf);
}
- int port = conf.getBookiePort();
- String host = InetAddress.getLocalHost().getHostAddress();
- if (conf.getUseHostNameAsBookieID()) {
- host = InetAddress.getLocalHost().getCanonicalHostName();
- }
-
- while (conf.isForceReadOnlyBookie()
- && bkc.getZkHandle().exists(conf.getZkLedgersRootPath()
+"/available/readonly/" + host + ":" + port,
- false) == null) {
- Thread.sleep(100);
- }
+ Future<?> waitForBookie = conf.isForceReadOnlyBookie()
+ ? bkc.waitForReadOnlyBookie(address)
+ : bkc.waitForWritableBookie(address);
+
+ server.start();
- bkc.readBookiesBlocking();
- LOG.info("New bookie on port " + port + " has been created.");
+ waitForBookie.get(30, TimeUnit.SECONDS);
+ LOG.info("New bookie '{}' has been created.", address);
try {
startAutoRecovery(server, conf);
@@ -601,24 +595,20 @@ public abstract class BookKeeperClusterTestCase {
return b;
}
};
- server.start();
+ BookieSocketAddress address = Bookie.getBookieAddress(conf);
if (bkc == null) {
bkc = new BookKeeperTestClient(baseClientConf);
}
+ Future<?> waitForBookie = conf.isForceReadOnlyBookie()
+ ? bkc.waitForReadOnlyBookie(address)
+ : bkc.waitForWritableBookie(address);
- int port = conf.getBookiePort();
- String host = InetAddress.getLocalHost().getHostAddress();
- if (conf.getUseHostNameAsBookieID()) {
- host = InetAddress.getLocalHost().getCanonicalHostName();
- }
- while (bkc.getZkHandle().exists(
- conf.getZkLedgersRootPath() + "/available/" + host + ":" +
port, false) == null) {
- Thread.sleep(500);
- }
+ server.start();
+
+ waitForBookie.get(30, TimeUnit.SECONDS);
+ LOG.info("New bookie '{}' has been created.", address);
- bkc.readBookiesBlocking();
- LOG.info("New bookie on port " + port + " has been created.");
try {
startAutoRecovery(server, conf);
} catch (CompatibilityException ce) {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
index a67e200..6d7ceae 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.fail;
import java.io.File;
import java.util.Enumeration;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
@@ -127,18 +128,13 @@ public class ReadOnlyBookieTest extends
BookKeeperClusterTestCase {
// Expected
}
- // wait for zk to get updated (async) as bookie transitions to
read-only
- while (zkc.exists(baseConf.getZkAvailableBookiesPath() + "/" +
BookKeeperConstants.READONLY + "/"
- + Bookie.getBookieAddress(bsConfs.get(1)).toString(), false)
== null) {
- Thread.sleep(100);
- }
-
+ bkc.waitForReadOnlyBookie(Bookie.getBookieAddress(bsConfs.get(1)))
+ .get(30, TimeUnit.SECONDS);
+
LOG.info("bookie is running {}, readonly {}.", bookie.isRunning(),
bookie.isReadOnly());
assertTrue("Bookie should be running and converted to readonly mode",
bookie.isRunning() && bookie.isReadOnly());
- // refresh the bookkeeper client
- bkc.readBookiesBlocking();
// should fail to create ledger
try {
bkc.createLedger(2, 2, DigestType.MAC, "".getBytes());
@@ -150,18 +146,13 @@ public class ReadOnlyBookieTest extends
BookKeeperClusterTestCase {
// Now add the current ledger dir back to writable dirs list
ledgerDirsManager.addToWritableDirs(testDir, true);
- // since the bookie transitions to write mode asynchronously, we need
to wait for the bookie to be registered on
- // zk
- while (zkc.exists(baseConf.getZkAvailableBookiesPath() + "/"
- + Bookie.getBookieAddress(bsConfs.get(1)).toString(), false)
== null) {
- Thread.sleep(100);
- }
+ bkc.waitForWritableBookie(Bookie.getBookieAddress(bsConfs.get(1)))
+ .get(30, TimeUnit.SECONDS);
LOG.info("bookie is running {}, readonly {}.", bookie.isRunning(),
bookie.isReadOnly());
assertTrue("Bookie should be running and converted back to writable
mode", bookie.isRunning()
&& !bookie.isReadOnly());
- // force client to read bookies
- bkc.readBookiesBlocking();
+
LedgerHandle newLedger = bkc.createLedger(2, 2, DigestType.MAC,
"".getBytes());
for (int i = 0; i < 10; i++) {
newLedger.addEntry("data".getBytes());
@@ -272,7 +263,9 @@ public class ReadOnlyBookieTest extends
BookKeeperClusterTestCase {
startNewBookie();
bs.get(1).getBookie().doTransitionToReadOnlyMode();
try {
- bkc.readBookiesBlocking();
+ bkc.waitForReadOnlyBookie(Bookie.getBookieAddress(bsConfs.get(1)))
+ .get(30, TimeUnit.SECONDS);
+
bkc.createLedger(2, 2, DigestType.CRC32, "".getBytes());
fail("Must throw exception, as there is one readonly bookie");
} catch (BKException e) {
--
To stop receiving notification emails like this one, please contact
[email protected].