This is an automated email from the ASF dual-hosted git repository.

ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 511fc81  Make bookie watcher an interface
511fc81 is described below

commit 511fc810d753dbdf147a2e2eabb9065192cdf7e5
Author: Ivan Kelly <[email protected]>
AuthorDate: Mon Aug 13 22:51:07 2018 +0200

    Make bookie watcher an interface
    
    So that it can be mocked out easily for testing.
    This patch also contains a simple mock.
    
    Author: Ivan Kelly <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>
    
    This closes #1594 from ivankelly/bookie-watcher-iface
---
 .../org/apache/bookkeeper/client/BookKeeper.java   |   4 +-
 .../apache/bookkeeper/client/BookieWatcher.java    | 234 ++-------------------
 .../{BookieWatcher.java => BookieWatcherImpl.java} |  34 +--
 3 files changed, 26 insertions(+), 246 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 44f970f..09d2526 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -133,7 +133,7 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
     boolean ownEventLoopGroup = false;
 
     final BookieClient bookieClient;
-    final BookieWatcher bookieWatcher;
+    final BookieWatcherImpl bookieWatcher;
 
     final OrderedExecutor mainWorkerPool;
     final OrderedScheduler scheduler;
@@ -509,7 +509,7 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
         // initialize bookie client
         this.bookieClient = new BookieClient(conf, this.eventLoopGroup, 
this.mainWorkerPool,
                                              scheduler, statsLogger);
-        this.bookieWatcher = new BookieWatcher(
+        this.bookieWatcher = new BookieWatcherImpl(
                 conf, this.placementPolicy, 
metadataDriver.getRegistrationClient(),
                 this.statsLogger.scope(WATCHER_SCOPE));
         if (conf.getDiskWeightBasedPlacementEnabled()) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index d97121d..0f760a9 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -17,177 +17,16 @@
  */
 package org.apache.bookkeeper.client;
 
-import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.NEW_ENSEMBLE_TIME;
-import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.REPLACE_BOOKIE_TIME;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.client.BKException.BKInterruptedException;
+
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
-import org.apache.bookkeeper.client.BKException.MetaStoreException;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.MathUtils;
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-/**
- * This class is responsible for maintaining a consistent view of what bookies
- * are available by reading Zookeeper (and setting watches on the bookie 
nodes).
- * When a bookie fails, the other parts of the code turn to this class to find 
a
- * replacement
- *
- */
-@Slf4j
-class BookieWatcher {
-
-    private static final Function<Throwable, BKException> EXCEPTION_FUNC = 
cause -> {
-        if (cause instanceof BKException) {
-            log.error("Failed to get bookie list : ", cause);
-            return (BKException) cause;
-        } else if (cause instanceof InterruptedException) {
-            log.error("Interrupted reading bookie list : ", cause);
-            return new BKInterruptedException();
-        } else {
-            return new MetaStoreException();
-        }
-    };
-
-    private final ClientConfiguration conf;
-    private final RegistrationClient registrationClient;
-    private final EnsemblePlacementPolicy placementPolicy;
-    private final OpStatsLogger newEnsembleTimer;
-    private final OpStatsLogger replaceBookieTimer;
-
-    // Bookies that will not be preferred to be chosen in a new ensemble
-    final Cache<BookieSocketAddress, Boolean> quarantinedBookies;
-
-    private volatile Set<BookieSocketAddress> writableBookies = 
Collections.emptySet();
-    private volatile Set<BookieSocketAddress> readOnlyBookies = 
Collections.emptySet();
-
-    private CompletableFuture<?> initialWritableBookiesFuture = null;
-    private CompletableFuture<?> initialReadonlyBookiesFuture = null;
-
-    public BookieWatcher(ClientConfiguration conf,
-                         EnsemblePlacementPolicy placementPolicy,
-                         RegistrationClient registrationClient,
-                         StatsLogger statsLogger)  {
-        this.conf = conf;
-        this.placementPolicy = placementPolicy;
-        this.registrationClient = registrationClient;
-        this.quarantinedBookies = CacheBuilder.newBuilder()
-                .expireAfterWrite(conf.getBookieQuarantineTimeSeconds(), 
TimeUnit.SECONDS)
-                .removalListener(new RemovalListener<BookieSocketAddress, 
Boolean>() {
-
-                    @Override
-                    public void 
onRemoval(RemovalNotification<BookieSocketAddress, Boolean> bookie) {
-                        log.info("Bookie {} is no longer quarantined", 
bookie.getKey());
-                    }
 
-                }).build();
-        this.newEnsembleTimer = 
statsLogger.getOpStatsLogger(NEW_ENSEMBLE_TIME);
-        this.replaceBookieTimer = 
statsLogger.getOpStatsLogger(REPLACE_BOOKIE_TIME);
-    }
-
-    public Set<BookieSocketAddress> getBookies() throws BKException {
-        try {
-            return FutureUtils.result(registrationClient.getWritableBookies(), 
EXCEPTION_FUNC).getValue();
-        } catch (BKInterruptedException ie) {
-            Thread.currentThread().interrupt();
-            throw ie;
-        }
-    }
-
-    public Set<BookieSocketAddress> getReadOnlyBookies()
-            throws BKException {
-        try {
-            return FutureUtils.result(registrationClient.getReadOnlyBookies(), 
EXCEPTION_FUNC).getValue();
-        } catch (BKInterruptedException ie) {
-            Thread.currentThread().interrupt();
-            throw ie;
-        }
-    }
-
-    // this callback is already not executed in zookeeper thread
-    private synchronized void 
processWritableBookiesChanged(Set<BookieSocketAddress> newBookieAddrs) {
-        // Update watcher outside ZK callback thread, to avoid deadlock in 
case some other
-        // component is trying to do a blocking ZK operation
-        this.writableBookies = newBookieAddrs;
-        placementPolicy.onClusterChanged(newBookieAddrs, readOnlyBookies);
-        // we don't need to close clients here, because:
-        // a. the dead bookies will be removed from topology, which will not 
be used in new ensemble.
-        // b. the read sequence will be reordered based on znode availability, 
so most of the reads
-        //    will not be sent to them.
-        // c. the close here is just to disconnect the channel, which doesn't 
remove the channel from
-        //    from pcbc map. we don't really need to disconnect the channel 
here, since if a bookie is
-        //    really down, PCBC will disconnect itself based on netty 
callback. if we try to disconnect
-        //    here, it actually introduces side-effects on case d.
-        // d. closing the client here will affect latency if the bookie is 
alive but just being flaky
-        //    on its znode registration due zookeeper session expire.
-        // e. if we want to permanently remove a bookkeeper client, we should 
watch on the cookies' list.
-        // if (bk.getBookieClient() != null) {
-        //     bk.getBookieClient().closeClients(deadBookies);
-        // }
-    }
-
-    private synchronized void 
processReadOnlyBookiesChanged(Set<BookieSocketAddress> readOnlyBookies) {
-        this.readOnlyBookies = readOnlyBookies;
-        placementPolicy.onClusterChanged(writableBookies, readOnlyBookies);
-    }
-
-    /**
-     * Blocks until bookies are read from zookeeper, used in the {@link 
BookKeeper} constructor.
-     *
-     * @throws BKException when failed to read bookies
-     */
-    public void initialBlockingBookieRead() throws BKException {
-        CompletableFuture<?> writable;
-        CompletableFuture<?> readonly;
-        synchronized (this) {
-            if (initialReadonlyBookiesFuture == null) {
-                assert initialWritableBookiesFuture == null;
-
-                writable = this.registrationClient.watchWritableBookies(
-                            bookies -> 
processWritableBookiesChanged(bookies.getValue()));
-
-                readonly = this.registrationClient.watchReadOnlyBookies(
-                            bookies -> 
processReadOnlyBookiesChanged(bookies.getValue()));
-                initialWritableBookiesFuture = writable;
-                initialReadonlyBookiesFuture = readonly;
-            } else {
-                writable = initialWritableBookiesFuture;
-                readonly = initialReadonlyBookiesFuture;
-            }
-        }
-
-        try {
-            FutureUtils.result(writable, EXCEPTION_FUNC);
-        } catch (BKInterruptedException ie) {
-            Thread.currentThread().interrupt();
-            throw ie;
-        }
-        try {
-            FutureUtils.result(readonly, EXCEPTION_FUNC);
-        } catch (BKInterruptedException ie) {
-            Thread.currentThread().interrupt();
-            throw ie;
-        } catch (Exception e) {
-            log.error("Failed getReadOnlyBookies: ", e);
-        }
-    }
+interface BookieWatcher {
+    Set<BookieSocketAddress> getBookies() throws BKException;
+    Set<BookieSocketAddress> getReadOnlyBookies() throws BKException;
 
     /**
      * Create an ensemble with given <i>ensembleSize</i> and 
<i>writeQuorumSize</i>.
@@ -199,27 +38,9 @@ class BookieWatcher {
      * @return list of bookies for new ensemble.
      * @throws BKNotEnoughBookiesException
      */
-    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize,
-        int ackQuorumSize, Map<String, byte[]> customMetadata)
-            throws BKNotEnoughBookiesException {
-        long startTime = MathUtils.nowInNano();
-        List<BookieSocketAddress> socketAddresses;
-        try {
-            socketAddresses = placementPolicy.newEnsemble(ensembleSize,
-                    writeQuorumSize, ackQuorumSize, customMetadata, new 
HashSet<BookieSocketAddress>(
-                            quarantinedBookies.asMap().keySet()));
-            // we try to only get from the healthy bookies first
-            newEnsembleTimer.registerSuccessfulEvent(MathUtils.nowInNano() - 
startTime, TimeUnit.NANOSECONDS);
-        } catch (BKNotEnoughBookiesException e) {
-            if (log.isDebugEnabled()) {
-                log.debug("Not enough healthy bookies available, using 
quarantined bookies");
-            }
-            socketAddresses = placementPolicy.newEnsemble(
-                    ensembleSize, writeQuorumSize, ackQuorumSize, 
customMetadata, new HashSet<>());
-            newEnsembleTimer.registerFailedEvent(MathUtils.nowInNano() - 
startTime, TimeUnit.NANOSECONDS);
-        }
-        return socketAddresses;
-    }
+    List<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize,
+                                          int ackQuorumSize, Map<String, 
byte[]> customMetadata)
+            throws BKNotEnoughBookiesException;
 
     /**
      * Choose a bookie to replace bookie <i>bookieIdx</i> in 
<i>existingBookies</i>.
@@ -230,43 +51,16 @@ class BookieWatcher {
      * @return the bookie to replace.
      * @throws BKNotEnoughBookiesException
      */
-    public BookieSocketAddress replaceBookie(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
-                                             Map<String, byte[]> 
customMetadata,
-                                             List<BookieSocketAddress> 
existingBookies, int bookieIdx,
-                                             Set<BookieSocketAddress> 
excludeBookies)
-            throws BKNotEnoughBookiesException {
-        long startTime = MathUtils.nowInNano();
-        BookieSocketAddress addr = existingBookies.get(bookieIdx);
-        BookieSocketAddress socketAddress;
-        try {
-            // we exclude the quarantined bookies also first
-            Set<BookieSocketAddress> existingAndQuarantinedBookies = new 
HashSet<BookieSocketAddress>(existingBookies);
-            
existingAndQuarantinedBookies.addAll(quarantinedBookies.asMap().keySet());
-            socketAddress = placementPolicy.replaceBookie(
-                    ensembleSize, writeQuorumSize, ackQuorumSize, 
customMetadata,
-                    existingAndQuarantinedBookies, addr, excludeBookies);
-            replaceBookieTimer.registerSuccessfulEvent(MathUtils.nowInNano() - 
startTime, TimeUnit.NANOSECONDS);
-        } catch (BKNotEnoughBookiesException e) {
-            if (log.isDebugEnabled()) {
-                log.debug("Not enough healthy bookies available, using 
quarantined bookies");
-            }
-            socketAddress = placementPolicy.replaceBookie(
-                    ensembleSize, writeQuorumSize, ackQuorumSize, 
customMetadata,
-                    new HashSet<BookieSocketAddress>(existingBookies), addr, 
excludeBookies);
-            replaceBookieTimer.registerFailedEvent(MathUtils.nowInNano() - 
startTime, TimeUnit.NANOSECONDS);
-        }
-        return socketAddress;
-    }
+    BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, 
int ackQuorumSize,
+                                      Map<String, byte[]> customMetadata,
+                                      List<BookieSocketAddress> 
existingBookies, int bookieIdx,
+                                      Set<BookieSocketAddress> excludeBookies)
+            throws BKNotEnoughBookiesException;
+
 
     /**
      * Quarantine <i>bookie</i> so it will not be preferred to be chosen for 
new ensembles.
      * @param bookie
      */
-    public void quarantineBookie(BookieSocketAddress bookie) {
-        if (quarantinedBookies.getIfPresent(bookie) == null) {
-            quarantinedBookies.put(bookie, Boolean.TRUE);
-            log.warn("Bookie {} has been quarantined because of read/write 
errors.", bookie);
-        }
-    }
-
+    void quarantineBookie(BookieSocketAddress bookie);
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
similarity index 92%
copy from 
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
copy to 
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
index d97121d..ff707f0 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
@@ -52,7 +52,7 @@ import org.apache.bookkeeper.stats.StatsLogger;
  *
  */
 @Slf4j
-class BookieWatcher {
+class BookieWatcherImpl implements BookieWatcher {
 
     private static final Function<Throwable, BKException> EXCEPTION_FUNC = 
cause -> {
         if (cause instanceof BKException) {
@@ -81,10 +81,10 @@ class BookieWatcher {
     private CompletableFuture<?> initialWritableBookiesFuture = null;
     private CompletableFuture<?> initialReadonlyBookiesFuture = null;
 
-    public BookieWatcher(ClientConfiguration conf,
-                         EnsemblePlacementPolicy placementPolicy,
-                         RegistrationClient registrationClient,
-                         StatsLogger statsLogger)  {
+    public BookieWatcherImpl(ClientConfiguration conf,
+                             EnsemblePlacementPolicy placementPolicy,
+                             RegistrationClient registrationClient,
+                             StatsLogger statsLogger)  {
         this.conf = conf;
         this.placementPolicy = placementPolicy;
         this.registrationClient = registrationClient;
@@ -102,6 +102,7 @@ class BookieWatcher {
         this.replaceBookieTimer = 
statsLogger.getOpStatsLogger(REPLACE_BOOKIE_TIME);
     }
 
+    @Override
     public Set<BookieSocketAddress> getBookies() throws BKException {
         try {
             return FutureUtils.result(registrationClient.getWritableBookies(), 
EXCEPTION_FUNC).getValue();
@@ -111,6 +112,7 @@ class BookieWatcher {
         }
     }
 
+    @Override
     public Set<BookieSocketAddress> getReadOnlyBookies()
             throws BKException {
         try {
@@ -189,16 +191,7 @@ class BookieWatcher {
         }
     }
 
-    /**
-     * Create an ensemble with given <i>ensembleSize</i> and 
<i>writeQuorumSize</i>.
-     *
-     * @param ensembleSize
-     *          Ensemble Size
-     * @param writeQuorumSize
-     *          Write Quorum Size
-     * @return list of bookies for new ensemble.
-     * @throws BKNotEnoughBookiesException
-     */
+    @Override
     public List<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize,
         int ackQuorumSize, Map<String, byte[]> customMetadata)
             throws BKNotEnoughBookiesException {
@@ -221,15 +214,7 @@ class BookieWatcher {
         return socketAddresses;
     }
 
-    /**
-     * Choose a bookie to replace bookie <i>bookieIdx</i> in 
<i>existingBookies</i>.
-     * @param existingBookies
-     *          list of existing bookies.
-     * @param bookieIdx
-     *          index of the bookie in the list to be replaced.
-     * @return the bookie to replace.
-     * @throws BKNotEnoughBookiesException
-     */
+    @Override
     public BookieSocketAddress replaceBookie(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
                                              Map<String, byte[]> 
customMetadata,
                                              List<BookieSocketAddress> 
existingBookies, int bookieIdx,
@@ -262,6 +247,7 @@ class BookieWatcher {
      * Quarantine <i>bookie</i> so it will not be preferred to be chosen for 
new ensembles.
      * @param bookie
      */
+    @Override
     public void quarantineBookie(BookieSocketAddress bookie) {
         if (quarantinedBookies.getIfPresent(bookie) == null) {
             quarantinedBookies.put(bookie, Boolean.TRUE);

Reply via email to