This is an automated email from the ASF dual-hosted git repository.
ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 511fc81 Make bookie watcher an interface
511fc81 is described below
commit 511fc810d753dbdf147a2e2eabb9065192cdf7e5
Author: Ivan Kelly <[email protected]>
AuthorDate: Mon Aug 13 22:51:07 2018 +0200
Make bookie watcher an interface
This allows the bookie watcher to be mocked out easily for testing.
This patch also contains a simple mock.
Author: Ivan Kelly <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #1594 from ivankelly/bookie-watcher-iface
---
.../org/apache/bookkeeper/client/BookKeeper.java | 4 +-
.../apache/bookkeeper/client/BookieWatcher.java | 234 ++-------------------
.../{BookieWatcher.java => BookieWatcherImpl.java} | 34 +--
3 files changed, 26 insertions(+), 246 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 44f970f..09d2526 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -133,7 +133,7 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
boolean ownEventLoopGroup = false;
final BookieClient bookieClient;
- final BookieWatcher bookieWatcher;
+ final BookieWatcherImpl bookieWatcher;
final OrderedExecutor mainWorkerPool;
final OrderedScheduler scheduler;
@@ -509,7 +509,7 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
// initialize bookie client
this.bookieClient = new BookieClient(conf, this.eventLoopGroup,
this.mainWorkerPool,
scheduler, statsLogger);
- this.bookieWatcher = new BookieWatcher(
+ this.bookieWatcher = new BookieWatcherImpl(
conf, this.placementPolicy,
metadataDriver.getRegistrationClient(),
this.statsLogger.scope(WATCHER_SCOPE));
if (conf.getDiskWeightBasedPlacementEnabled()) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index d97121d..0f760a9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -17,177 +17,16 @@
*/
package org.apache.bookkeeper.client;
-import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.NEW_ENSEMBLE_TIME;
-import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.REPLACE_BOOKIE_TIME;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.client.BKException.BKInterruptedException;
+
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
-import org.apache.bookkeeper.client.BKException.MetaStoreException;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.MathUtils;
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-/**
- * This class is responsible for maintaining a consistent view of what bookies
- * are available by reading Zookeeper (and setting watches on the bookie
nodes).
- * When a bookie fails, the other parts of the code turn to this class to find
a
- * replacement
- *
- */
-@Slf4j
-class BookieWatcher {
-
- private static final Function<Throwable, BKException> EXCEPTION_FUNC =
cause -> {
- if (cause instanceof BKException) {
- log.error("Failed to get bookie list : ", cause);
- return (BKException) cause;
- } else if (cause instanceof InterruptedException) {
- log.error("Interrupted reading bookie list : ", cause);
- return new BKInterruptedException();
- } else {
- return new MetaStoreException();
- }
- };
-
- private final ClientConfiguration conf;
- private final RegistrationClient registrationClient;
- private final EnsemblePlacementPolicy placementPolicy;
- private final OpStatsLogger newEnsembleTimer;
- private final OpStatsLogger replaceBookieTimer;
-
- // Bookies that will not be preferred to be chosen in a new ensemble
- final Cache<BookieSocketAddress, Boolean> quarantinedBookies;
-
- private volatile Set<BookieSocketAddress> writableBookies =
Collections.emptySet();
- private volatile Set<BookieSocketAddress> readOnlyBookies =
Collections.emptySet();
-
- private CompletableFuture<?> initialWritableBookiesFuture = null;
- private CompletableFuture<?> initialReadonlyBookiesFuture = null;
-
- public BookieWatcher(ClientConfiguration conf,
- EnsemblePlacementPolicy placementPolicy,
- RegistrationClient registrationClient,
- StatsLogger statsLogger) {
- this.conf = conf;
- this.placementPolicy = placementPolicy;
- this.registrationClient = registrationClient;
- this.quarantinedBookies = CacheBuilder.newBuilder()
- .expireAfterWrite(conf.getBookieQuarantineTimeSeconds(),
TimeUnit.SECONDS)
- .removalListener(new RemovalListener<BookieSocketAddress,
Boolean>() {
-
- @Override
- public void
onRemoval(RemovalNotification<BookieSocketAddress, Boolean> bookie) {
- log.info("Bookie {} is no longer quarantined",
bookie.getKey());
- }
- }).build();
- this.newEnsembleTimer =
statsLogger.getOpStatsLogger(NEW_ENSEMBLE_TIME);
- this.replaceBookieTimer =
statsLogger.getOpStatsLogger(REPLACE_BOOKIE_TIME);
- }
-
- public Set<BookieSocketAddress> getBookies() throws BKException {
- try {
- return FutureUtils.result(registrationClient.getWritableBookies(),
EXCEPTION_FUNC).getValue();
- } catch (BKInterruptedException ie) {
- Thread.currentThread().interrupt();
- throw ie;
- }
- }
-
- public Set<BookieSocketAddress> getReadOnlyBookies()
- throws BKException {
- try {
- return FutureUtils.result(registrationClient.getReadOnlyBookies(),
EXCEPTION_FUNC).getValue();
- } catch (BKInterruptedException ie) {
- Thread.currentThread().interrupt();
- throw ie;
- }
- }
-
- // this callback is already not executed in zookeeper thread
- private synchronized void
processWritableBookiesChanged(Set<BookieSocketAddress> newBookieAddrs) {
- // Update watcher outside ZK callback thread, to avoid deadlock in
case some other
- // component is trying to do a blocking ZK operation
- this.writableBookies = newBookieAddrs;
- placementPolicy.onClusterChanged(newBookieAddrs, readOnlyBookies);
- // we don't need to close clients here, because:
- // a. the dead bookies will be removed from topology, which will not
be used in new ensemble.
- // b. the read sequence will be reordered based on znode availability,
so most of the reads
- // will not be sent to them.
- // c. the close here is just to disconnect the channel, which doesn't
remove the channel from
- // pcbc map. we don't really need to disconnect the channel
here, since if a bookie is
- // really down, PCBC will disconnect itself based on netty
callback. if we try to disconnect
- // here, it actually introduces side-effects on case d.
- // d. closing the client here will affect latency if the bookie is
alive but just being flaky
- // on its znode registration due to zookeeper session expiry.
- // e. if we want to permanently remove a bookkeeper client, we should
watch on the cookies' list.
- // if (bk.getBookieClient() != null) {
- // bk.getBookieClient().closeClients(deadBookies);
- // }
- }
-
- private synchronized void
processReadOnlyBookiesChanged(Set<BookieSocketAddress> readOnlyBookies) {
- this.readOnlyBookies = readOnlyBookies;
- placementPolicy.onClusterChanged(writableBookies, readOnlyBookies);
- }
-
- /**
- * Blocks until bookies are read from zookeeper, used in the {@link
BookKeeper} constructor.
- *
- * @throws BKException when failed to read bookies
- */
- public void initialBlockingBookieRead() throws BKException {
- CompletableFuture<?> writable;
- CompletableFuture<?> readonly;
- synchronized (this) {
- if (initialReadonlyBookiesFuture == null) {
- assert initialWritableBookiesFuture == null;
-
- writable = this.registrationClient.watchWritableBookies(
- bookies ->
processWritableBookiesChanged(bookies.getValue()));
-
- readonly = this.registrationClient.watchReadOnlyBookies(
- bookies ->
processReadOnlyBookiesChanged(bookies.getValue()));
- initialWritableBookiesFuture = writable;
- initialReadonlyBookiesFuture = readonly;
- } else {
- writable = initialWritableBookiesFuture;
- readonly = initialReadonlyBookiesFuture;
- }
- }
-
- try {
- FutureUtils.result(writable, EXCEPTION_FUNC);
- } catch (BKInterruptedException ie) {
- Thread.currentThread().interrupt();
- throw ie;
- }
- try {
- FutureUtils.result(readonly, EXCEPTION_FUNC);
- } catch (BKInterruptedException ie) {
- Thread.currentThread().interrupt();
- throw ie;
- } catch (Exception e) {
- log.error("Failed getReadOnlyBookies: ", e);
- }
- }
+interface BookieWatcher {
+ Set<BookieSocketAddress> getBookies() throws BKException;
+ Set<BookieSocketAddress> getReadOnlyBookies() throws BKException;
/**
* Create an ensemble with given <i>ensembleSize</i> and
<i>writeQuorumSize</i>.
@@ -199,27 +38,9 @@ class BookieWatcher {
* @return list of bookies for new ensemble.
* @throws BKNotEnoughBookiesException
*/
- public List<BookieSocketAddress> newEnsemble(int ensembleSize, int
writeQuorumSize,
- int ackQuorumSize, Map<String, byte[]> customMetadata)
- throws BKNotEnoughBookiesException {
- long startTime = MathUtils.nowInNano();
- List<BookieSocketAddress> socketAddresses;
- try {
- socketAddresses = placementPolicy.newEnsemble(ensembleSize,
- writeQuorumSize, ackQuorumSize, customMetadata, new
HashSet<BookieSocketAddress>(
- quarantinedBookies.asMap().keySet()));
- // we try to only get from the healthy bookies first
- newEnsembleTimer.registerSuccessfulEvent(MathUtils.nowInNano() -
startTime, TimeUnit.NANOSECONDS);
- } catch (BKNotEnoughBookiesException e) {
- if (log.isDebugEnabled()) {
- log.debug("Not enough healthy bookies available, using
quarantined bookies");
- }
- socketAddresses = placementPolicy.newEnsemble(
- ensembleSize, writeQuorumSize, ackQuorumSize,
customMetadata, new HashSet<>());
- newEnsembleTimer.registerFailedEvent(MathUtils.nowInNano() -
startTime, TimeUnit.NANOSECONDS);
- }
- return socketAddresses;
- }
+ List<BookieSocketAddress> newEnsemble(int ensembleSize, int
writeQuorumSize,
+ int ackQuorumSize, Map<String,
byte[]> customMetadata)
+ throws BKNotEnoughBookiesException;
/**
* Choose a bookie to replace bookie <i>bookieIdx</i> in
<i>existingBookies</i>.
@@ -230,43 +51,16 @@ class BookieWatcher {
* @return the bookie to replace.
* @throws BKNotEnoughBookiesException
*/
- public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
- Map<String, byte[]>
customMetadata,
- List<BookieSocketAddress>
existingBookies, int bookieIdx,
- Set<BookieSocketAddress>
excludeBookies)
- throws BKNotEnoughBookiesException {
- long startTime = MathUtils.nowInNano();
- BookieSocketAddress addr = existingBookies.get(bookieIdx);
- BookieSocketAddress socketAddress;
- try {
- // we exclude the quarantined bookies also first
- Set<BookieSocketAddress> existingAndQuarantinedBookies = new
HashSet<BookieSocketAddress>(existingBookies);
-
existingAndQuarantinedBookies.addAll(quarantinedBookies.asMap().keySet());
- socketAddress = placementPolicy.replaceBookie(
- ensembleSize, writeQuorumSize, ackQuorumSize,
customMetadata,
- existingAndQuarantinedBookies, addr, excludeBookies);
- replaceBookieTimer.registerSuccessfulEvent(MathUtils.nowInNano() -
startTime, TimeUnit.NANOSECONDS);
- } catch (BKNotEnoughBookiesException e) {
- if (log.isDebugEnabled()) {
- log.debug("Not enough healthy bookies available, using
quarantined bookies");
- }
- socketAddress = placementPolicy.replaceBookie(
- ensembleSize, writeQuorumSize, ackQuorumSize,
customMetadata,
- new HashSet<BookieSocketAddress>(existingBookies), addr,
excludeBookies);
- replaceBookieTimer.registerFailedEvent(MathUtils.nowInNano() -
startTime, TimeUnit.NANOSECONDS);
- }
- return socketAddress;
- }
+ BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize,
int ackQuorumSize,
+ Map<String, byte[]> customMetadata,
+ List<BookieSocketAddress>
existingBookies, int bookieIdx,
+ Set<BookieSocketAddress> excludeBookies)
+ throws BKNotEnoughBookiesException;
+
/**
* Quarantine <i>bookie</i> so it will not be preferred to be chosen for
new ensembles.
* @param bookie
*/
- public void quarantineBookie(BookieSocketAddress bookie) {
- if (quarantinedBookies.getIfPresent(bookie) == null) {
- quarantinedBookies.put(bookie, Boolean.TRUE);
- log.warn("Bookie {} has been quarantined because of read/write
errors.", bookie);
- }
- }
-
+ void quarantineBookie(BookieSocketAddress bookie);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
similarity index 92%
copy from
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
copy to
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
index d97121d..ff707f0 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
@@ -52,7 +52,7 @@ import org.apache.bookkeeper.stats.StatsLogger;
*
*/
@Slf4j
-class BookieWatcher {
+class BookieWatcherImpl implements BookieWatcher {
private static final Function<Throwable, BKException> EXCEPTION_FUNC =
cause -> {
if (cause instanceof BKException) {
@@ -81,10 +81,10 @@ class BookieWatcher {
private CompletableFuture<?> initialWritableBookiesFuture = null;
private CompletableFuture<?> initialReadonlyBookiesFuture = null;
- public BookieWatcher(ClientConfiguration conf,
- EnsemblePlacementPolicy placementPolicy,
- RegistrationClient registrationClient,
- StatsLogger statsLogger) {
+ public BookieWatcherImpl(ClientConfiguration conf,
+ EnsemblePlacementPolicy placementPolicy,
+ RegistrationClient registrationClient,
+ StatsLogger statsLogger) {
this.conf = conf;
this.placementPolicy = placementPolicy;
this.registrationClient = registrationClient;
@@ -102,6 +102,7 @@ class BookieWatcher {
this.replaceBookieTimer =
statsLogger.getOpStatsLogger(REPLACE_BOOKIE_TIME);
}
+ @Override
public Set<BookieSocketAddress> getBookies() throws BKException {
try {
return FutureUtils.result(registrationClient.getWritableBookies(),
EXCEPTION_FUNC).getValue();
@@ -111,6 +112,7 @@ class BookieWatcher {
}
}
+ @Override
public Set<BookieSocketAddress> getReadOnlyBookies()
throws BKException {
try {
@@ -189,16 +191,7 @@ class BookieWatcher {
}
}
- /**
- * Create an ensemble with given <i>ensembleSize</i> and
<i>writeQuorumSize</i>.
- *
- * @param ensembleSize
- * Ensemble Size
- * @param writeQuorumSize
- * Write Quorum Size
- * @return list of bookies for new ensemble.
- * @throws BKNotEnoughBookiesException
- */
+ @Override
public List<BookieSocketAddress> newEnsemble(int ensembleSize, int
writeQuorumSize,
int ackQuorumSize, Map<String, byte[]> customMetadata)
throws BKNotEnoughBookiesException {
@@ -221,15 +214,7 @@ class BookieWatcher {
return socketAddresses;
}
- /**
- * Choose a bookie to replace bookie <i>bookieIdx</i> in
<i>existingBookies</i>.
- * @param existingBookies
- * list of existing bookies.
- * @param bookieIdx
- * index of the bookie in the list to be replaced.
- * @return the bookie to replace.
- * @throws BKNotEnoughBookiesException
- */
+ @Override
public BookieSocketAddress replaceBookie(int ensembleSize, int
writeQuorumSize, int ackQuorumSize,
Map<String, byte[]>
customMetadata,
List<BookieSocketAddress>
existingBookies, int bookieIdx,
@@ -262,6 +247,7 @@ class BookieWatcher {
* Quarantine <i>bookie</i> so it will not be preferred to be chosen for
new ensembles.
* @param bookie
*/
+ @Override
public void quarantineBookie(BookieSocketAddress bookie) {
if (quarantinedBookies.getIfPresent(bookie) == null) {
quarantinedBookies.put(bookie, Boolean.TRUE);