Repository: bookkeeper
Updated Branches:
  refs/heads/master 96adbf1d6 -> c8255f8c5


BookKeeper client should try not to use bookies with errors/timeouts when 
forming a new ensemble

BOOKKEEPER-889: BookKeeper client should try not to use bookies with errors/timeouts when forming ensembles

Author: Siddharth Boobna <[email protected]>

Reviewers: Sijie Guo <[email protected]>

Closes #11 from sboobna/BOOKKEEPER-889


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/c8255f8c
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/c8255f8c
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/c8255f8c

Branch: refs/heads/master
Commit: c8255f8c5f73a2353293ec1d9612dbb98f291ccc
Parents: 96adbf1
Author: Siddharth Boobna <[email protected]>
Authored: Mon Mar 7 22:06:40 2016 -0800
Committer: Sijie Guo <[email protected]>
Committed: Mon Mar 7 22:06:40 2016 -0800

----------------------------------------------------------------------
 .../apache/bookkeeper/client/BKException.java   |  10 ++
 .../apache/bookkeeper/client/BookKeeper.java    |  26 ++++
 .../apache/bookkeeper/client/BookieWatcher.java |  54 ++++++-
 .../bookkeeper/conf/ClientConfiguration.java    | 109 ++++++++++++++
 .../apache/bookkeeper/proto/BookieClient.java   |  27 +++-
 .../DefaultPerChannelBookieClientPool.java      |  12 +-
 .../proto/PerChannelBookieClient.java           |  97 ++++++++----
 .../proto/PerChannelBookieClientFactory.java    |   2 +-
 .../proto/PerChannelBookieClientPool.java       |   5 +
 .../client/TestBookieHealthCheck.java           | 149 +++++++++++++++++++
 .../test/BookKeeperClusterTestCase.java         |  33 ++++
 .../bookkeeper/test/ReadOnlyBookieTest.java     |  14 +-
 12 files changed, 497 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index b2355cd..f2f150e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -98,6 +98,8 @@ public abstract class BKException extends Exception {
             return new BKAddEntryQuorumTimeoutException();
         case Code.DuplicateEntryIdException:
             return new BKDuplicateEntryIdException();
+        case Code.TimeoutException:
+            return new BKTimeoutException();
         default:
             return new BKUnexpectedConditionException();
         }
@@ -131,6 +133,7 @@ public abstract class BKException extends Exception {
         int LedgerExistException = -20;
         int AddEntryQuorumTimeoutException = -21;
         int DuplicateEntryIdException = -22;
+        int TimeoutException = -23;
 
         int IllegalOpException = -100;
         int LedgerFencedException = -101;
@@ -213,6 +216,8 @@ public abstract class BKException extends Exception {
             return "Invalid operation";
         case Code.AddEntryQuorumTimeoutException:
             return "Add entry quorum wait timed out";
+        case Code.TimeoutException:
+            return "Bookie operation timeout";
         default:
             return "Unexpected condition";
         }
@@ -392,4 +397,9 @@ public abstract class BKException extends Exception {
         }
     }
 
+    public static class BKTimeoutException extends BKException {
+        public BKTimeoutException() {
+            super(Code.TimeoutException);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index ed744b0..f354bef 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -21,6 +21,7 @@
 package org.apache.bookkeeper.client;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -28,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -37,6 +39,7 @@ import org.apache.bookkeeper.meta.CleanupLedgerManager;
 import org.apache.bookkeeper.meta.LedgerIdGenerator;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -44,6 +47,7 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.configuration.ConfigurationException;
@@ -312,6 +316,8 @@ public class BookKeeper {
         this.ledgerManagerFactory = 
LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
         this.ledgerManager = new 
CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
         this.ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator();
+
+        scheduleBookieHealthCheckIfEnabled();
     }
 
     private EnsemblePlacementPolicy 
initializeEnsemblePlacementPolicy(ClientConfiguration conf)
@@ -336,6 +342,26 @@ public class BookKeeper {
         }
     }
 
+    void scheduleBookieHealthCheckIfEnabled() {
+        if (conf.isBookieHealthCheckEnabled()) {
+            scheduler.scheduleAtFixedRate(new SafeRunnable() {
+
+                @Override
+                public void safeRun() {
+                    checkForFaultyBookies();
+                }
+                    }, conf.getBookieHealthCheckIntervalSeconds(), 
conf.getBookieHealthCheckIntervalSeconds(),
+                    TimeUnit.SECONDS);
+        }
+    }
+
+    void checkForFaultyBookies() {
+        List<BookieSocketAddress> faultyBookies = 
bookieClient.getFaultyBookies();
+        for (BookieSocketAddress faultyBookie : faultyBookies) {
+            bookieWatcher.quarantineBookie(faultyBookie);
+        }
+    }
+
     LedgerManager getLedgerManager() {
         return ledgerManager;
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index 65a3417..cadec5d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -45,6 +45,11 @@ import org.apache.zookeeper.ZooDefs.Ids;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
 /**
  * This class is responsible for maintaining a consistent view of what bookies
  * are available by reading Zookeeper (and setting watches on the bookie 
nodes).
@@ -57,6 +62,7 @@ class BookieWatcher implements Watcher, ChildrenCallback {
 
     public static int ZK_CONNECT_BACKOFF_SEC = 1;
     private static final Set<BookieSocketAddress> EMPTY_SET = new 
HashSet<BookieSocketAddress>();
+    private static final Boolean BOOLEAN = new Boolean(true);
 
     // Bookie registration path in ZK
     private final String bookieRegistrationPath;
@@ -65,6 +71,9 @@ class BookieWatcher implements Watcher, ChildrenCallback {
     final ScheduledExecutorService scheduler;
     final EnsemblePlacementPolicy placementPolicy;
 
+    // Bookies that will not be preferred to be chosen in a new ensemble
+    final Cache<BookieSocketAddress, Boolean> quarantinedBookies;
+
     SafeRunnable reReadTask = new SafeRunnable() {
         @Override
         public void safeRun() {
@@ -83,6 +92,16 @@ class BookieWatcher implements Watcher, ChildrenCallback {
         this.scheduler = scheduler;
         this.placementPolicy = placementPolicy;
         readOnlyBookieWatcher = new ReadOnlyBookieWatcher(conf, bk);
+        this.quarantinedBookies = CacheBuilder.newBuilder()
+                .expireAfterWrite(conf.getBookieQuarantineTimeSeconds(), 
TimeUnit.SECONDS)
+                .removalListener(new RemovalListener<BookieSocketAddress, 
Boolean>() {
+
+                    @Override
+                    public void 
onRemoval(RemovalNotification<BookieSocketAddress, Boolean> bookie) {
+                        logger.info("Bookie {} is no longer quarantined", 
bookie.getKey());
+                    }
+
+                }).build();
     }
 
     void notifyBookiesChanged(final BookiesListener listener) throws 
BKException {
@@ -235,7 +254,16 @@ class BookieWatcher implements Watcher, ChildrenCallback {
      */
     public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize)
             throws BKNotEnoughBookiesException {
-        return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, 
EMPTY_SET);
+        try {
+            // first, try to choose only from healthy (non-quarantined) bookies
+            return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, 
new HashSet<BookieSocketAddress>(
+                    quarantinedBookies.asMap().keySet()));
+        } catch (BKNotEnoughBookiesException e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Not enough healthy bookies available, using 
quarantined bookies");
+            }
+            return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, 
EMPTY_SET);
+        }
     }
 
     /**
@@ -250,7 +278,29 @@ class BookieWatcher implements Watcher, ChildrenCallback {
     public BookieSocketAddress replaceBookie(List<BookieSocketAddress> 
existingBookies, int bookieIdx)
             throws BKNotEnoughBookiesException {
         BookieSocketAddress addr = existingBookies.get(bookieIdx);
-        return placementPolicy.replaceBookie(addr, new 
HashSet<BookieSocketAddress>(existingBookies));
+        try {
+            // first, also exclude the quarantined bookies from the candidates
+            Set<BookieSocketAddress> existingAndQuarantinedBookies = new 
HashSet<BookieSocketAddress>(existingBookies);
+            
existingAndQuarantinedBookies.addAll(quarantinedBookies.asMap().keySet());
+            return placementPolicy.replaceBookie(addr, 
existingAndQuarantinedBookies);
+        } catch (BKNotEnoughBookiesException e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Not enough healthy bookies available, using 
quarantined bookies");
+            }
+            return placementPolicy.replaceBookie(addr, new 
HashSet<BookieSocketAddress>(existingBookies));
+        }
+    }
+
+    /**
+     * Quarantine <i>bookie</i> so it will not be preferred to be chosen for 
new ensembles.
+     * @param bookie
+     *            the bookie to quarantine
+     */
+    public void quarantineBookie(BookieSocketAddress bookie) {
+        if (quarantinedBookies.getIfPresent(bookie) == null) {
+            quarantinedBookies.put(bookie, BOOLEAN);
+            logger.warn("Bookie {} has been quarantined because of read/write 
errors.", bookie);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 7a99559..d0750d3 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -65,6 +65,12 @@ public class ClientConfiguration extends 
AbstractConfiguration {
     protected final static String PCBC_TIMEOUT_TIMER_TICK_DURATION_MS = 
"pcbcTimeoutTimerTickDurationMs";
     protected final static String PCBC_TIMEOUT_TIMER_NUM_TICKS = 
"pcbcTimeoutTimerNumTicks";
 
+    // Bookie health check settings
+    protected final static String BOOKIE_HEALTH_CHECK_ENABLED = 
"bookieHealthCheckEnabled";
+    protected final static String BOOKIE_HEALTH_CHECK_INTERVAL_SECONDS = 
"bookieHealthCheckIntervalSeconds";
+    protected final static String BOOKIE_ERROR_THRESHOLD_PER_INTERVAL = 
"bookieErrorThresholdPerInterval";
+    protected final static String BOOKIE_QUARANTINE_TIME_SECONDS = 
"bookieQuarantineTimeSeconds";
+
     // Number Woker Threads
     protected final static String NUM_WORKER_THREADS = "numWorkerThreads";
 
@@ -691,4 +697,107 @@ public class ClientConfiguration extends 
AbstractConfiguration {
         setProperty(TASK_EXECUTION_WARN_TIME_MICROS, warnTime);
         return this;
     }
+
+    /**
+     * Check if bookie health check is enabled.
+     * 
+     * @return true if the bookie health check is enabled (default: false)
+     */
+    public boolean isBookieHealthCheckEnabled() {
+        return getBoolean(BOOKIE_HEALTH_CHECK_ENABLED, false);
+    }
+
+    /**
+     * Enables the bookie health check.
+     * 
+     * <p>
+     * If the number of read/write errors for a bookie exceeds {@link 
#getBookieErrorThresholdPerInterval()} per
+     * interval, that bookie is quarantined for {@link 
#getBookieQuarantineTimeSeconds()} seconds. During this
+     * quarantined period, the client will try not to use this bookie when 
creating new ensembles.
+     * </p>
+     * 
+     * By default, the bookie health check is <b>disabled</b>.
+     * 
+     * @return client configuration
+     */
+    public ClientConfiguration enableBookieHealthCheck() {
+        setProperty(BOOKIE_HEALTH_CHECK_ENABLED, true);
+        return this;
+    }
+
+    /**
+     * Get the bookie health check interval in seconds.
+     * 
+     * @return the bookie health check interval in seconds (default: 60)
+     */
+    public int getBookieHealthCheckIntervalSeconds() {
+        return getInt(BOOKIE_HEALTH_CHECK_INTERVAL_SECONDS, 60);
+    }
+
+    /**
+     * Set the bookie health check interval. Default is 60 seconds.
+     * 
+     * <p>
+     * Note: Please call {@link #enableBookieHealthCheck()} to use this 
configuration.
+     * </p>
+     * 
+     * @param interval
+     * @param unit
+     * @return client configuration
+     */
+    public ClientConfiguration setBookieHealthCheckInterval(int interval, 
TimeUnit unit) {
+        setProperty(BOOKIE_HEALTH_CHECK_INTERVAL_SECONDS, 
unit.toSeconds(interval));
+        return this;
+    }
+
+    /**
+     * Get the error threshold for a bookie to be quarantined.
+     * 
+     * @return the error threshold per health check interval (default: 100)
+     */
+    public long getBookieErrorThresholdPerInterval() {
+        return getLong(BOOKIE_ERROR_THRESHOLD_PER_INTERVAL, 100);
+    }
+
+    /**
+     * Set the error threshold per interval ({@link 
#getBookieHealthCheckIntervalSeconds()}) for a bookie before it is
+     * quarantined. Default is 100 errors per minute.
+     * 
+     * <p>
+     * Note: Please call {@link #enableBookieHealthCheck()} to use this 
configuration.
+     * </p>
+     * 
+     * @param thresholdPerInterval
+     *            number of errors within one interval at which the bookie is quarantined
+     * @return client configuration
+     */
+    public ClientConfiguration setBookieErrorThresholdPerInterval(long 
thresholdPerInterval) {
+        setProperty(BOOKIE_ERROR_THRESHOLD_PER_INTERVAL, thresholdPerInterval);
+        return this;
+    }
+
+    /**
+     * Get the time for which a bookie will be quarantined.
+     * 
+     * @return the bookie quarantine time in seconds (default: 1800)
+     */
+    public int getBookieQuarantineTimeSeconds() {
+        return getInt(BOOKIE_QUARANTINE_TIME_SECONDS, 1800);
+    }
+
+    /**
+     * Set the time for which a bookie will be quarantined. Default is 30 
minutes.
+     * 
+     * <p>
+     * Note: Please call {@link #enableBookieHealthCheck()} to use this 
configuration.
+     * </p>
+     * 
+     * @param quarantineTime
+     * @param unit
+     * @return client configuration
+     */
+    public ClientConfiguration setBookieQuarantineTime(int quarantineTime, 
TimeUnit unit) {
+        setProperty(BOOKIE_QUARANTINE_TIME_SECONDS, 
unit.toSeconds(quarantineTime));
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 909cdd0..8a79547 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -23,15 +23,13 @@ package org.apache.bookkeeper.proto;
 import static com.google.common.base.Charsets.UTF_8;
 
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -52,6 +50,9 @@ import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Implements the client-side part of the BookKeeper protocol.
  *
@@ -70,6 +71,8 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
     private final StatsLogger statsLogger;
     private final int numConnectionsPerBookie;
 
+    private final long bookieErrorThresholdPerInterval;
+
     public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory 
channelFactory, OrderedSafeExecutor executor) {
         this(conf, channelFactory, executor, NullStatsLogger.INSTANCE);
     }
@@ -87,6 +90,7 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
                 new 
ThreadFactoryBuilder().setNameFormat("BookieClientTimer-%d").build(),
                 conf.getPCBCTimeoutTimerTickDurationMs(), 
TimeUnit.MILLISECONDS,
                 conf.getPCBCTimeoutTimerNumTicks());
+        this.bookieErrorThresholdPerInterval = 
conf.getBookieErrorThresholdPerInterval();
     }
 
     private int getRc(int rc) {
@@ -101,10 +105,23 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
         }
     }
 
+    public List<BookieSocketAddress> getFaultyBookies() {
+        List<BookieSocketAddress> faultyBookies = Lists.newArrayList();
+        for (PerChannelBookieClientPool channelPool : channels.values()) {
+            if (channelPool instanceof DefaultPerChannelBookieClientPool) {
+                DefaultPerChannelBookieClientPool pool = 
(DefaultPerChannelBookieClientPool) channelPool;
+                if (pool.errorCounter.getAndSet(0) >= 
bookieErrorThresholdPerInterval) {
+                    faultyBookies.add(pool.address);
+                }
+            }
+        }
+        return faultyBookies;
+    }
+
     @Override
-    public PerChannelBookieClient create(BookieSocketAddress address) {
+    public PerChannelBookieClient create(BookieSocketAddress address, 
PerChannelBookieClientPool pcbcPool) {
         return new PerChannelBookieClient(conf, executor, channelFactory, 
address,
-                                          requestTimer, statsLogger);
+                                          requestTimer, statsLogger, pcbcPool);
     }
 
     private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr, 
Object key) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
index 920515b..2658634 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
@@ -20,7 +20,8 @@
  */
 package org.apache.bookkeeper.proto;
 
-import com.google.common.base.Preconditions;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.util.MathUtils;
@@ -28,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.base.Preconditions;
 
 /**
  *  Provide a simple round-robin style channel pool. We could improve it later 
to do more
@@ -42,6 +44,7 @@ class DefaultPerChannelBookieClientPool implements 
PerChannelBookieClientPool,
     final BookieSocketAddress address;
     final PerChannelBookieClient[] clients;
     final AtomicInteger counter = new AtomicInteger(0);
+    final AtomicLong errorCounter = new AtomicLong(0);
 
     DefaultPerChannelBookieClientPool(PerChannelBookieClientFactory factory,
                                       BookieSocketAddress address,
@@ -51,7 +54,7 @@ class DefaultPerChannelBookieClientPool implements 
PerChannelBookieClientPool,
         this.address = address;
         this.clients = new PerChannelBookieClient[coreSize];
         for (int i = 0; i < coreSize; i++) {
-            this.clients[i] = factory.create(address);
+            this.clients[i] = factory.create(address, this);
         }
     }
 
@@ -78,6 +81,11 @@ class DefaultPerChannelBookieClientPool implements 
PerChannelBookieClientPool,
     }
 
     @Override
+    public void recordError() {
+        errorCounter.incrementAndGet();
+    }
+
+    @Override
     public void disconnect(boolean wait) {
         for (PerChannelBookieClient pcbc : clients) {
             pcbc.disconnect(wait);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 6d8058f..2bd4e9b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -20,17 +20,21 @@ package org.apache.bookkeeper.proto;
 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayDeque;
+import java.util.Collections;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import com.google.protobuf.ByteString;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeperClientStats;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
@@ -41,12 +45,9 @@ import 
org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
@@ -76,6 +77,9 @@ import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+
 /**
  * This class manages all details of connection to a particular bookie. It also
  * has reconnect logic if a connection to a bookie fails.
@@ -85,6 +89,16 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
 
     static final Logger LOG = 
LoggerFactory.getLogger(PerChannelBookieClient.class);
 
+    // this set contains the bookie error return codes that we do not consider 
for a bookie to be "faulty"
+    private static final Set<Integer> expectedBkOperationErrors = 
Collections.unmodifiableSet(Sets
+            .newHashSet(BKException.Code.BookieHandleNotAvailableException,
+                        BKException.Code.NoSuchEntryException,
+                        BKException.Code.NoSuchLedgerExistsException,
+                        BKException.Code.LedgerFencedException,
+                        BKException.Code.LedgerExistException,
+                        BKException.Code.DuplicateEntryIdException,
+                        BKException.Code.WriteOnReadOnlyBookieException));
+
     public static final int MAX_FRAME_LENGTH = 2 * 1024 * 1024; // 2M
     public static final AtomicLong txnIdGenerator = new AtomicLong(0);
 
@@ -119,14 +133,17 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
     final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
     private final ClientConfiguration conf;
 
+    private final PerChannelBookieClientPool pcbcPool;
+
     public PerChannelBookieClient(OrderedSafeExecutor executor, 
ClientSocketChannelFactory channelFactory,
                                   BookieSocketAddress addr) {
-        this(new ClientConfiguration(), executor, channelFactory, addr, null, 
NullStatsLogger.INSTANCE);
+        this(new ClientConfiguration(), executor, channelFactory, addr, null, 
NullStatsLogger.INSTANCE, null);
     }
 
     public PerChannelBookieClient(ClientConfiguration conf, 
OrderedSafeExecutor executor,
                                   ClientSocketChannelFactory channelFactory, 
BookieSocketAddress addr,
-                                  HashedWheelTimer requestTimer, StatsLogger 
parentStatsLogger) {
+                                  HashedWheelTimer requestTimer, StatsLogger 
parentStatsLogger,
+                                  PerChannelBookieClientPool pcbcPool) {
         this.conf = conf;
         this.addr = addr;
         this.executor = executor;
@@ -147,6 +164,8 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
         addEntryOpLogger = 
statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_ADD_OP);
         readTimeoutOpLogger = 
statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ);
         addTimeoutOpLogger = 
statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD);
+
+        this.pcbcPool = pcbcPool;
     }
 
     private void completeOperation(GenericCallback<PerChannelBookieClient> op, 
int rc) {
@@ -299,7 +318,7 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
         final int entrySize = toSend.readableBytes();
         final CompletionKey completionKey = new CompletionKey(txnId, 
OperationType.ADD_ENTRY);
         completionObjects.put(completionKey,
-                new AddCompletion(addEntryOpLogger, cb, ctx, ledgerId, entryId,
+                new AddCompletion(this, addEntryOpLogger, cb, ctx, ledgerId, 
entryId,
                                   scheduleTimeout(completionKey, 
addEntryTimeout)));
 
         // Build the request and calculate the total size to be included in 
the packet.
@@ -360,7 +379,7 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
         final long txnId = getTxnId();
         final CompletionKey completionKey = new CompletionKey(txnId, 
OperationType.READ_ENTRY);
         completionObjects.put(completionKey,
-                new ReadCompletion(readEntryOpLogger, cb, ctx, ledgerId, 
entryId,
+                new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, 
entryId,
                                    scheduleTimeout(completionKey, 
readEntryTimeout)));
 
         // Build the request and calculate the total size to be included in 
the packet.
@@ -415,7 +434,7 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
         final long txnId = getTxnId();
         final CompletionKey completionKey = new CompletionKey(txnId, 
OperationType.READ_ENTRY);
         completionObjects.put(completionKey,
-                new ReadCompletion(readEntryOpLogger, cb, ctx, ledgerId, 
entryId,
+                new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, 
entryId,
                                    scheduleTimeout(completionKey, 
readEntryTimeout)));
 
         // Build the request and calculate the total size to be included in 
the packet.
@@ -619,6 +638,12 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
         }
     }
 
+    void recordError() {
+        if (pcbcPool != null) {
+            pcbcPool.recordError();
+        }
+    }
+
     /**
      * In the netty pipeline, we need to split packets based on length, so we
      * use the {@link LengthFieldBasedFrameDecoder}. Other than that all 
actions
@@ -832,27 +857,35 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     static class ReadCompletion extends CompletionValue {
         final ReadEntryCallback cb;
 
-        public ReadCompletion(ReadEntryCallback cb, Object ctx,
+        public ReadCompletion(final PerChannelBookieClient pcbc, ReadEntryCallback cb, Object ctx,
                               long ledgerId, long entryId) {
-            this(null, cb, ctx, ledgerId, entryId, null);
+            this(pcbc, null, cb, ctx, ledgerId, entryId, null);
         }
 
-        public ReadCompletion(final OpStatsLogger readEntryOpLogger,
+        public ReadCompletion(final PerChannelBookieClient pcbc, final OpStatsLogger readEntryOpLogger,
                               final ReadEntryCallback originalCallback,
                               final Object originalCtx, final long ledgerId, final long entryId,
                               final Timeout timeout) {
             super(originalCtx, ledgerId, entryId, timeout);
             final long startTime = MathUtils.nowInNano();
-            this.cb = null == readEntryOpLogger ? originalCallback : new ReadEntryCallback() {
+            this.cb = new ReadEntryCallback() {
                 @Override
                 public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx) {
                     cancelTimeout();
-                    long latency = MathUtils.elapsedNanos(startTime);
-                    if (rc != BKException.Code.OK) {
-                        readEntryOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
-                    } else {
-                        readEntryOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
+                    if (readEntryOpLogger != null) {
+                        long latency = MathUtils.elapsedNanos(startTime);
+                        if (rc != BKException.Code.OK) {
+                            readEntryOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
+                        } else {
+                            readEntryOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
+                        }
+                    }
+
+                    // errors listed in expectedBkOperationErrors are not recorded against the bookie
+                    if (pcbc != null && rc != BKException.Code.OK && !expectedBkOperationErrors.contains(rc)) {
+                        pcbc.recordError();
                     }
+
                     originalCallback.readEntryComplete(rc, ledgerId, entryId, buffer, originalCtx);
                 }
             };
@@ -863,12 +895,12 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     static class AddCompletion extends CompletionValue {
         final WriteCallback cb;
 
-        public AddCompletion(WriteCallback cb, Object ctx,
+        public AddCompletion(final PerChannelBookieClient pcbc, WriteCallback cb, Object ctx,
                              long ledgerId, long entryId) {
-            this(null, cb, ctx, ledgerId, entryId, null);
+            this(pcbc, null, cb, ctx, ledgerId, entryId, null);
         }
 
-        public AddCompletion(final OpStatsLogger addEntryOpLogger,
+        public AddCompletion(final PerChannelBookieClient pcbc, final OpStatsLogger addEntryOpLogger,
                              final WriteCallback originalCallback,
                              final Object originalCtx, final long ledgerId, final long entryId,
                              final Timeout timeout) {
@@ -878,12 +910,21 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
                 @Override
                 public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
                     cancelTimeout();
-                    long latency = MathUtils.elapsedNanos(startTime);
-                    if (rc != BKException.Code.OK) {
-                        addEntryOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
-                    } else {
-                        addEntryOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
+                    // honour the logger this completion was constructed with (null disables stats)
+                    if (addEntryOpLogger != null) {
+                        long latency = MathUtils.elapsedNanos(startTime);
+                        if (rc != BKException.Code.OK) {
+                            addEntryOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
+                        } else {
+                            addEntryOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
+                        }
                     }
+
+                    // errors listed in expectedBkOperationErrors are not recorded against the bookie
+                    if (pcbc != null && rc != BKException.Code.OK && !expectedBkOperationErrors.contains(rc)) {
+                        pcbc.recordError();
+                    }
+
                     originalCallback.writeComplete(rc, ledgerId, entryId, addr, originalCtx);
                 }
             };
@@ -943,10 +982,10 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
                 return;
             }
             if (OperationType.ADD_ENTRY == operationType) {
-                errorOutAddKey(this);
+                errorOutAddKey(this, BKException.Code.TimeoutException);
                 addTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
             } else {
-                errorOutReadKey(this);
+                errorOutReadKey(this, BKException.Code.TimeoutException);
                 readTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
             }
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java
index bd45e92..69d559e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java
@@ -33,5 +33,6 @@ interface PerChannelBookieClientFactory {
      *
+     * @param pcbcPool the pool that will own the created client
      * @return the client connected to address.
      */
-    PerChannelBookieClient create(BookieSocketAddress address);
+    PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool);
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
index 81b3ba7..4a735f9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
@@ -41,6 +41,11 @@ interface PerChannelBookieClientPool {
     void obtain(GenericCallback<PerChannelBookieClient> callback);
 
     /**
+     * Record a read/write error reported by one of the clients in this pool.
+     */
+    void recordError();
+
+    /**
      * Disconnect the connections in the pool.
      *
      * @param wait

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieHealthCheck.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieHealthCheck.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieHealthCheck.java
new file mode 100644
index 0000000..33be97c
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieHealthCheck.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.client;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestBookieHealthCheck extends BookKeeperClusterTestCase {
+    private final static Logger LOG = LoggerFactory.getLogger(TestBookieHealthCheck.class);
+
+    public TestBookieHealthCheck() {
+        super(4);
+        baseClientConf.setAddEntryTimeout(1);
+        baseClientConf.enableBookieHealthCheck();
+        baseClientConf.setBookieHealthCheckInterval(1, TimeUnit.SECONDS);
+        baseClientConf.setBookieErrorThresholdPerInterval(1);
+        baseClientConf.setBookieQuarantineTime(5, TimeUnit.SECONDS);
+    }
+
+    @Test(timeout = 60000)
+    public void testBkQuarantine() throws Exception {
+        LedgerHandle lh = bkc.createLedger(2, 2, 2, BookKeeper.DigestType.CRC32, new byte[] {});
+
+        final int numEntries = 10;
+        for (int i = 0; i < numEntries; i++) {
+            byte[] msg = ("msg-" + i).getBytes();
+            lh.addEntry(msg);
+        }
+
+        BookieSocketAddress bookieToQuarantine = lh.getLedgerMetadata().getEnsemble(numEntries).get(0);
+        sleepBookie(bookieToQuarantine, baseClientConf.getAddEntryTimeout() * 2).await();
+
+        byte[] tempMsg = "temp-msg".getBytes();
+        lh.asyncAddEntry(tempMsg, new AddCallback() {
+            @Override
+            public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+                // no-op
+            }
+        }, null);
+
+        // make sure the add entry times out
+        Thread.sleep(baseClientConf.getAddEntryTimeout() * 2 * 1000);
+
+        // make sure the health check runs once after the timeout
+        Thread.sleep(baseClientConf.getBookieHealthCheckIntervalSeconds() * 2 * 1000);
+
+        // the bookie watcher should contain the bookieToQuarantine in the quarantine set
+        Assert.assertTrue(bkc.bookieWatcher.quarantinedBookies.asMap().containsKey(bookieToQuarantine));
+
+        // the bookie to be left out of the ensemble should always be the quarantined bookie
+        LedgerHandle lh1 = bkc.createLedger(2, 2, 2, BookKeeper.DigestType.CRC32, new byte[] {});
+        LedgerHandle lh2 = bkc.createLedger(3, 3, 3, BookKeeper.DigestType.CRC32, new byte[] {});
+        Assert.assertFalse(lh1.getLedgerMetadata().getEnsemble(0).contains(bookieToQuarantine));
+        Assert.assertFalse(lh2.getLedgerMetadata().getEnsemble(0).contains(bookieToQuarantine));
+
+        // the quarantined bookie can still be in the ensemble if we do not have enough healthy bookies
+        LedgerHandle lh3 = bkc.createLedger(4, 4, 4, BookKeeper.DigestType.CRC32, new byte[] {});
+        Assert.assertTrue(lh3.getLedgerMetadata().getEnsemble(0).contains(bookieToQuarantine));
+
+        // make sure faulty bookie is out of quarantine
+        Thread.sleep(baseClientConf.getBookieQuarantineTimeSeconds() * 1000);
+
+        // the bookie should not be quarantined anymore
+        Assert.assertFalse(bkc.bookieWatcher.quarantinedBookies.asMap().containsKey(bookieToQuarantine));
+    }
+
+    @Test(timeout = 60000)
+    public void testNoQuarantineOnBkRestart() throws Exception {
+        final LedgerHandle lh = bkc.createLedger(2, 2, 2, BookKeeper.DigestType.CRC32, new byte[] {});
+        final int numEntries = 20;
+        BookieSocketAddress bookieToRestart = lh.getLedgerMetadata().getEnsemble(0).get(0);
+
+        // we add entries on a separate thread so that we can restart a bookie on the current thread
+        Thread addEntryThread = new Thread() {
+            public void run() {
+                for (int i = 0; i < numEntries; i++) {
+                    byte[] msg = ("msg-" + i).getBytes();
+                    try {
+                        lh.addEntry(msg);
+                        // we add sufficient sleep to make sure all entries are not added before we restart the bookie
+                        Thread.sleep(100);
+                    } catch (Exception e) {
+                        LOG.error("Error sending msg", e);
+                    }
+                }
+            }
+        };
+        addEntryThread.start();
+        restartBookie(bookieToRestart);
+
+        // make sure the health check runs once
+        Thread.sleep(baseClientConf.getBookieHealthCheckIntervalSeconds() * 2 * 1000);
+
+        // the bookie watcher should not contain the bookieToRestart in the quarantine set
+        Assert.assertFalse(bkc.bookieWatcher.quarantinedBookies.asMap().containsKey(bookieToRestart));
+        // do not let the writer thread outlive the test and the cluster teardown
+        addEntryThread.join();
+    }
+
+    @Test(timeout = 60000)
+    public void testNoQuarantineOnExpectedBkErrors() throws Exception {
+        final LedgerHandle lh = bkc.createLedger(2, 2, 2, BookKeeper.DigestType.CRC32, new byte[] {});
+        final int numEntries = 10;
+        for (int i = 0; i < numEntries; i++) {
+            byte[] msg = ("msg-" + i).getBytes();
+            lh.addEntry(msg);
+        }
+        BookieSocketAddress bookie1 = lh.getLedgerMetadata().getEnsemble(0).get(0);
+        BookieSocketAddress bookie2 = lh.getLedgerMetadata().getEnsemble(0).get(1);
+        try {
+            // we read an entry that is not added
+            lh.readEntries(10, 10);
+        } catch (BKException e) {
+            // expected: entry 10 was never written
+        }
+
+        // make sure the health check runs once
+        Thread.sleep(baseClientConf.getBookieHealthCheckIntervalSeconds() * 2 * 1000);
+
+        // neither bookie of the ensemble should be in the bookie watcher's quarantine set
+        Assert.assertFalse(bkc.bookieWatcher.quarantinedBookies.asMap().containsKey(bookie1));
+        Assert.assertFalse(bkc.bookieWatcher.quarantinedBookies.asMap().containsKey(bookie2));
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index fce689d..278dc8c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -351,6 +351,44 @@ public abstract class BookKeeperClusterTestCase {
     }
 
     /**
+     * Restart the bookie whose local address is {@code addr}, reusing its
+     * configuration. The bookie's auto recovery service is stopped together
+     * with it; {@link #startBookie} is expected to start it again when
+     * isAutoRecoveryEnabled is true.
+     *
+     * @param addr
+     *          address of the bookie to restart
+     * @throws IOException
+     *          if no bookie in this cluster has the given address
+     * @throws Exception
+     *          if shutting down or starting the bookie fails
+     */
+    public void restartBookie(BookieSocketAddress addr) throws Exception {
+        BookieServer toRemove = null;
+        int toRemoveIndex = 0;
+        for (BookieServer server : bs) {
+            if (server.getLocalAddress().equals(addr)) {
+                server.shutdown();
+                toRemove = server;
+                break;
+            }
+            ++toRemoveIndex;
+        }
+        if (toRemove != null) {
+            stopAutoRecoveryService(toRemove);
+            bs.remove(toRemove);
+            // assumes bsConfs is kept index-aligned with bs
+            ServerConfiguration newConfig = bsConfs.remove(toRemoveIndex);
+            // NOTE(review): fixed pause before restarting; presumably lets the old server release its port
+            Thread.sleep(1000);
+            bs.add(startBookie(newConfig));
+            bsConfs.add(newConfig);
+            return;
+        }
+        throw new IOException("Bookie not found");
+    }
+
+    /**
      * Restart bookie servers using new configuration settings. Also restart the
      * respective auto recovery process, if isAutoRecoveryEnabled is true.
      *

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
index 8cf5618..cf2b251 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
@@ -20,6 +20,11 @@
  */
 package org.apache.bookkeeper.test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.File;
 import java.util.Enumeration;
 
@@ -33,8 +38,6 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
-
 /**
  * Test to verify the readonly feature of bookies
  */
@@ -139,6 +142,13 @@ public class ReadOnlyBookieTest extends BookKeeperClusterTestCase {
         // Now add the current ledger dir back to writable dirs list
         ledgerDirsManager.addToWritableDirs(testDir, true);
 
+        // the bookie transitions back to writable mode asynchronously, so wait
+        // until it has re-registered itself under the available bookies path in zk
+        while (zkc.exists(baseConf.getZkAvailableBookiesPath() + "/"
+                + Bookie.getBookieAddress(bsConfs.get(1)).toString(), false) == null) {
+            Thread.sleep(100);
+        }
+
         LOG.info("bookie is running {}, readonly {}.", bookie.isRunning(), bookie.isReadOnly());
         assertTrue("Bookie should be running and converted back to writable mode", bookie.isRunning()
                 && !bookie.isReadOnly());

Reply via email to