Repository: bookkeeper Updated Branches: refs/heads/master 96adbf1d6 -> c8255f8c5
BookKeeper client should try not to use bookies with errors/timeouts when forming a new ensemble …hen forming ensembles Author: Siddharth Boobna <[email protected]> Reviewers: Sijie Guo <[email protected]> Closes #11 from sboobna/BOOKKEEPER-889 Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/c8255f8c Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/c8255f8c Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/c8255f8c Branch: refs/heads/master Commit: c8255f8c5f73a2353293ec1d9612dbb98f291ccc Parents: 96adbf1 Author: Siddharth Boobna <[email protected]> Authored: Mon Mar 7 22:06:40 2016 -0800 Committer: Sijie Guo <[email protected]> Committed: Mon Mar 7 22:06:40 2016 -0800 ---------------------------------------------------------------------- .../apache/bookkeeper/client/BKException.java | 10 ++ .../apache/bookkeeper/client/BookKeeper.java | 26 ++++ .../apache/bookkeeper/client/BookieWatcher.java | 54 ++++++- .../bookkeeper/conf/ClientConfiguration.java | 109 ++++++++++++++ .../apache/bookkeeper/proto/BookieClient.java | 27 +++- .../DefaultPerChannelBookieClientPool.java | 12 +- .../proto/PerChannelBookieClient.java | 97 ++++++++---- .../proto/PerChannelBookieClientFactory.java | 2 +- .../proto/PerChannelBookieClientPool.java | 5 + .../client/TestBookieHealthCheck.java | 149 +++++++++++++++++++ .../test/BookKeeperClusterTestCase.java | 33 ++++ .../bookkeeper/test/ReadOnlyBookieTest.java | 14 +- 12 files changed, 497 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java index b2355cd..f2f150e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java @@ -98,6 +98,8 @@ public abstract class BKException extends Exception { return new BKAddEntryQuorumTimeoutException(); case Code.DuplicateEntryIdException: return new BKDuplicateEntryIdException(); + case Code.TimeoutException: + return new BKTimeoutException(); default: return new BKUnexpectedConditionException(); } @@ -131,6 +133,7 @@ public abstract class BKException extends Exception { int LedgerExistException = -20; int AddEntryQuorumTimeoutException = -21; int DuplicateEntryIdException = -22; + int TimeoutException = -23; int IllegalOpException = -100; int LedgerFencedException = -101; @@ -213,6 +216,8 @@ public abstract class BKException extends Exception { return "Invalid operation"; case Code.AddEntryQuorumTimeoutException: return "Add entry quorum wait timed out"; + case Code.TimeoutException: + return "Bookie operation timeout"; default: return "Unexpected condition"; } @@ -392,4 +397,9 @@ public abstract class BKException extends Exception { } } + public static class BKTimeoutException extends BKException { + public BKTimeoutException() { + super(Code.TimeoutException); + } + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index ed744b0..f354bef 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -21,6 +21,7 @@ package 
org.apache.bookkeeper.client; import java.io.IOException; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -28,6 +29,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.base.Preconditions; + import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; @@ -37,6 +39,7 @@ import org.apache.bookkeeper.meta.CleanupLedgerManager; import org.apache.bookkeeper.meta.LedgerIdGenerator; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.meta.LedgerManagerFactory; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieClient; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.stats.NullStatsLogger; @@ -44,6 +47,7 @@ import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.bookkeeper.util.ReflectionUtils; +import org.apache.bookkeeper.util.SafeRunnable; import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.commons.configuration.ConfigurationException; @@ -312,6 +316,8 @@ public class BookKeeper { this.ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk); this.ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager()); this.ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator(); + + scheduleBookieHealthCheckIfEnabled(); } private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf) @@ -336,6 +342,26 @@ public class BookKeeper { } } + void 
scheduleBookieHealthCheckIfEnabled() { + if (conf.isBookieHealthCheckEnabled()) { + scheduler.scheduleAtFixedRate(new SafeRunnable() { + + @Override + public void safeRun() { + checkForFaultyBookies(); + } + }, conf.getBookieHealthCheckIntervalSeconds(), conf.getBookieHealthCheckIntervalSeconds(), + TimeUnit.SECONDS); + } + } + + void checkForFaultyBookies() { + List<BookieSocketAddress> faultyBookies = bookieClient.getFaultyBookies(); + for (BookieSocketAddress faultyBookie : faultyBookies) { + bookieWatcher.quarantineBookie(faultyBookie); + } + } + LedgerManager getLedgerManager() { return ledgerManager; } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java index 65a3417..cadec5d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java @@ -45,6 +45,11 @@ import org.apache.zookeeper.ZooDefs.Ids; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + /** * This class is responsible for maintaining a consistent view of what bookies * are available by reading Zookeeper (and setting watches on the bookie nodes). 
@@ -57,6 +62,7 @@ class BookieWatcher implements Watcher, ChildrenCallback { public static int ZK_CONNECT_BACKOFF_SEC = 1; private static final Set<BookieSocketAddress> EMPTY_SET = new HashSet<BookieSocketAddress>(); + private static final Boolean BOOLEAN = new Boolean(true); // Bookie registration path in ZK private final String bookieRegistrationPath; @@ -65,6 +71,9 @@ class BookieWatcher implements Watcher, ChildrenCallback { final ScheduledExecutorService scheduler; final EnsemblePlacementPolicy placementPolicy; + // Bookies that will not be preferred to be chosen in a new ensemble + final Cache<BookieSocketAddress, Boolean> quarantinedBookies; + SafeRunnable reReadTask = new SafeRunnable() { @Override public void safeRun() { @@ -83,6 +92,16 @@ class BookieWatcher implements Watcher, ChildrenCallback { this.scheduler = scheduler; this.placementPolicy = placementPolicy; readOnlyBookieWatcher = new ReadOnlyBookieWatcher(conf, bk); + this.quarantinedBookies = CacheBuilder.newBuilder() + .expireAfterWrite(conf.getBookieQuarantineTimeSeconds(), TimeUnit.SECONDS) + .removalListener(new RemovalListener<BookieSocketAddress, Boolean>() { + + @Override + public void onRemoval(RemovalNotification<BookieSocketAddress, Boolean> bookie) { + logger.info("Bookie {} is no longer quarantined", bookie.getKey()); + } + + }).build(); } void notifyBookiesChanged(final BookiesListener listener) throws BKException { @@ -235,7 +254,16 @@ class BookieWatcher implements Watcher, ChildrenCallback { */ public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize) throws BKNotEnoughBookiesException { - return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, EMPTY_SET); + try { + // we try to only get from the healthy bookies first + return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, new HashSet<BookieSocketAddress>( + quarantinedBookies.asMap().keySet())); + } catch (BKNotEnoughBookiesException e) { + if (logger.isDebugEnabled()) { + 
logger.debug("Not enough healthy bookies available, using quarantined bookies"); + } + return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, EMPTY_SET); + } } /** @@ -250,7 +278,29 @@ class BookieWatcher implements Watcher, ChildrenCallback { public BookieSocketAddress replaceBookie(List<BookieSocketAddress> existingBookies, int bookieIdx) throws BKNotEnoughBookiesException { BookieSocketAddress addr = existingBookies.get(bookieIdx); - return placementPolicy.replaceBookie(addr, new HashSet<BookieSocketAddress>(existingBookies)); + try { + // we exclude the quarantined bookies also first + Set<BookieSocketAddress> existingAndQuarantinedBookies = new HashSet<BookieSocketAddress>(existingBookies); + existingAndQuarantinedBookies.addAll(quarantinedBookies.asMap().keySet()); + return placementPolicy.replaceBookie(addr, existingAndQuarantinedBookies); + } catch (BKNotEnoughBookiesException e) { + if (logger.isDebugEnabled()) { + logger.debug("Not enough healthy bookies available, using quarantined bookies"); + } + return placementPolicy.replaceBookie(addr, new HashSet<BookieSocketAddress>(existingBookies)); + } + } + + /** + * Quarantine <i>bookie</i> so it will not be preferred to be chosen for new ensembles. 
+ * @param bookie + * @return + */ + public void quarantineBookie(BookieSocketAddress bookie) { + if (quarantinedBookies.getIfPresent(bookie) == null) { + quarantinedBookies.put(bookie, BOOLEAN); + logger.warn("Bookie {} has been quarantined because of read/write errors.", bookie); + } } /** http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index 7a99559..d0750d3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -65,6 +65,12 @@ public class ClientConfiguration extends AbstractConfiguration { protected final static String PCBC_TIMEOUT_TIMER_TICK_DURATION_MS = "pcbcTimeoutTimerTickDurationMs"; protected final static String PCBC_TIMEOUT_TIMER_NUM_TICKS = "pcbcTimeoutTimerNumTicks"; + // Bookie health check settings + protected final static String BOOKIE_HEALTH_CHECK_ENABLED = "bookieHealthCheckEnabled"; + protected final static String BOOKIE_HEALTH_CHECK_INTERVAL_SECONDS = "bookieHealthCheckIntervalSeconds"; + protected final static String BOOKIE_ERROR_THRESHOLD_PER_INTERVAL = "bookieErrorThresholdPerInterval"; + protected final static String BOOKIE_QUARANTINE_TIME_SECONDS = "bookieQuarantineTimeSeconds"; + // Number Woker Threads protected final static String NUM_WORKER_THREADS = "numWorkerThreads"; @@ -691,4 +697,107 @@ public class ClientConfiguration extends AbstractConfiguration { setProperty(TASK_EXECUTION_WARN_TIME_MICROS, warnTime); return this; } + + /** + * Check if bookie health check is enabled. 
+ * + * @return + */ + public boolean isBookieHealthCheckEnabled() { + return getBoolean(BOOKIE_HEALTH_CHECK_ENABLED, false); + } + + /** + * Enables the bookie health check. + * + * <p> + * If the number of read/write errors for a bookie exceeds {@link #getBookieErrorThresholdPerInterval()} per + * interval, that bookie is quarantined for {@link #getBookieQuarantineTimeSeconds()} seconds. During this + * quarantined period, the client will try not to use this bookie when creating new ensembles. + * </p> + * + * By default, the bookie health check is <b>disabled</b>. + * + * @return client configuration + */ + public ClientConfiguration enableBookieHealthCheck() { + setProperty(BOOKIE_HEALTH_CHECK_ENABLED, true); + return this; + } + + /** + * Get the bookie health check interval in seconds. + * + * @return + */ + public int getBookieHealthCheckIntervalSeconds() { + return getInt(BOOKIE_HEALTH_CHECK_INTERVAL_SECONDS, 60); + } + + /** + * Set the bookie health check interval. Default is 60 seconds. + * + * <p> + * Note: Please {@link #enableBookieHealthCheck()} to use this configuration. + * </p> + * + * @param interval + * @param unit + * @return client configuration + */ + public ClientConfiguration setBookieHealthCheckInterval(int interval, TimeUnit unit) { + setProperty(BOOKIE_HEALTH_CHECK_INTERVAL_SECONDS, unit.toSeconds(interval)); + return this; + } + + /** + * Get the error threshold for a bookie to be quarantined. + * + * @return + */ + public long getBookieErrorThresholdPerInterval() { + return getLong(BOOKIE_ERROR_THRESHOLD_PER_INTERVAL, 100); + } + + /** + * Set the error threshold per interval ({@link #getBookieHealthCheckIntervalSeconds()}) for a bookie before it is + * quarantined. Default is 100 errors per minute. + * + * <p> + * Note: Please {@link #enableBookieHealthCheck()} to use this configuration. 
+ * </p> + * + * @param threshold + * @param unit + * @return client configuration + */ + public ClientConfiguration setBookieErrorThresholdPerInterval(long thresholdPerInterval) { + setProperty(BOOKIE_ERROR_THRESHOLD_PER_INTERVAL, thresholdPerInterval); + return this; + } + + /** + * Get the time for which a bookie will be quarantined. + * + * @return + */ + public int getBookieQuarantineTimeSeconds() { + return getInt(BOOKIE_QUARANTINE_TIME_SECONDS, 1800); + } + + /** + * Set the time for which a bookie will be quarantined. Default is 30 minutes. + * + * <p> + * Note: Please {@link #enableBookieHealthCheck()} to use this configuration. + * </p> + * + * @param quarantineTime + * @param unit + * @return client configuration + */ + public ClientConfiguration setBookieQuarantineTime(int quarantineTime, TimeUnit unit) { + setProperty(BOOKIE_QUARANTINE_TIME_SECONDS, unit.toSeconds(quarantineTime)); + return this; + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java index 909cdd0..8a79547 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java @@ -23,15 +23,13 @@ package org.apache.bookkeeper.proto; import static com.google.common.base.Charsets.UTF_8; import java.io.IOException; -import java.util.HashSet; -import java.util.Set; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; -import 
com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; @@ -52,6 +50,9 @@ import org.jboss.netty.util.TimerTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * Implements the client-side part of the BookKeeper protocol. * @@ -70,6 +71,8 @@ public class BookieClient implements PerChannelBookieClientFactory { private final StatsLogger statsLogger; private final int numConnectionsPerBookie; + private final long bookieErrorThresholdPerInterval; + public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor) { this(conf, channelFactory, executor, NullStatsLogger.INSTANCE); } @@ -87,6 +90,7 @@ public class BookieClient implements PerChannelBookieClientFactory { new ThreadFactoryBuilder().setNameFormat("BookieClientTimer-%d").build(), conf.getPCBCTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS, conf.getPCBCTimeoutTimerNumTicks()); + this.bookieErrorThresholdPerInterval = conf.getBookieErrorThresholdPerInterval(); } private int getRc(int rc) { @@ -101,10 +105,23 @@ public class BookieClient implements PerChannelBookieClientFactory { } } + public List<BookieSocketAddress> getFaultyBookies() { + List<BookieSocketAddress> faultyBookies = Lists.newArrayList(); + for (PerChannelBookieClientPool channelPool : channels.values()) { + if (channelPool instanceof DefaultPerChannelBookieClientPool) { + DefaultPerChannelBookieClientPool pool = (DefaultPerChannelBookieClientPool) channelPool; + if (pool.errorCounter.getAndSet(0) >= bookieErrorThresholdPerInterval) { + faultyBookies.add(pool.address); + } + } + } + return faultyBookies; + } + @Override - public PerChannelBookieClient create(BookieSocketAddress address) { + public PerChannelBookieClient 
create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool) { return new PerChannelBookieClient(conf, executor, channelFactory, address, - requestTimer, statsLogger); + requestTimer, statsLogger, pcbcPool); } private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr, Object key) { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java index 920515b..2658634 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java @@ -20,7 +20,8 @@ */ package org.apache.bookkeeper.proto; -import com.google.common.base.Preconditions; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.util.MathUtils; @@ -28,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.base.Preconditions; /** * Provide a simple round-robin style channel pool. 
We could improve it later to do more @@ -42,6 +44,7 @@ class DefaultPerChannelBookieClientPool implements PerChannelBookieClientPool, final BookieSocketAddress address; final PerChannelBookieClient[] clients; final AtomicInteger counter = new AtomicInteger(0); + final AtomicLong errorCounter = new AtomicLong(0); DefaultPerChannelBookieClientPool(PerChannelBookieClientFactory factory, BookieSocketAddress address, @@ -51,7 +54,7 @@ class DefaultPerChannelBookieClientPool implements PerChannelBookieClientPool, this.address = address; this.clients = new PerChannelBookieClient[coreSize]; for (int i = 0; i < coreSize; i++) { - this.clients[i] = factory.create(address); + this.clients[i] = factory.create(address, this); } } @@ -78,6 +81,11 @@ class DefaultPerChannelBookieClientPool implements PerChannelBookieClientPool, } @Override + public void recordError() { + errorCounter.incrementAndGet(); + } + + @Override public void disconnect(boolean wait) { for (PerChannelBookieClient pcbc : clients) { pcbc.disconnect(wait); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 6d8058f..2bd4e9b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -20,17 +20,21 @@ package org.apache.bookkeeper.proto; import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; +import java.util.Collections; import java.util.Queue; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; -import com.google.protobuf.ByteString; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeperClientStats; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest; import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse; import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader; @@ -41,12 +45,9 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse; import org.apache.bookkeeper.proto.BookkeeperProtocol.Request; import org.apache.bookkeeper.proto.BookkeeperProtocol.Response; import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.bookkeeper.util.SafeRunnable; @@ -76,6 +77,9 @@ import org.jboss.netty.util.TimerTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Sets; +import com.google.protobuf.ByteString; + /** * This class manages all details of connection to a particular bookie. 
It also * has reconnect logic if a connection to a bookie fails. @@ -85,6 +89,16 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan static final Logger LOG = LoggerFactory.getLogger(PerChannelBookieClient.class); + // this set contains the bookie error return codes that we do not consider for a bookie to be "faulty" + private static final Set<Integer> expectedBkOperationErrors = Collections.unmodifiableSet(Sets + .newHashSet(BKException.Code.BookieHandleNotAvailableException, + BKException.Code.NoSuchEntryException, + BKException.Code.NoSuchLedgerExistsException, + BKException.Code.LedgerFencedException, + BKException.Code.LedgerExistException, + BKException.Code.DuplicateEntryIdException, + BKException.Code.WriteOnReadOnlyBookieException)); + public static final int MAX_FRAME_LENGTH = 2 * 1024 * 1024; // 2M public static final AtomicLong txnIdGenerator = new AtomicLong(0); @@ -119,14 +133,17 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock(); private final ClientConfiguration conf; + private final PerChannelBookieClientPool pcbcPool; + public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory, BookieSocketAddress addr) { - this(new ClientConfiguration(), executor, channelFactory, addr, null, NullStatsLogger.INSTANCE); + this(new ClientConfiguration(), executor, channelFactory, addr, null, NullStatsLogger.INSTANCE, null); } public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory, BookieSocketAddress addr, - HashedWheelTimer requestTimer, StatsLogger parentStatsLogger) { + HashedWheelTimer requestTimer, StatsLogger parentStatsLogger, + PerChannelBookieClientPool pcbcPool) { this.conf = conf; this.addr = addr; this.executor = executor; @@ -147,6 +164,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler 
implements Chan addEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_ADD_OP); readTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ); addTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD); + + this.pcbcPool = pcbcPool; } private void completeOperation(GenericCallback<PerChannelBookieClient> op, int rc) { @@ -299,7 +318,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan final int entrySize = toSend.readableBytes(); final CompletionKey completionKey = new CompletionKey(txnId, OperationType.ADD_ENTRY); completionObjects.put(completionKey, - new AddCompletion(addEntryOpLogger, cb, ctx, ledgerId, entryId, + new AddCompletion(this, addEntryOpLogger, cb, ctx, ledgerId, entryId, scheduleTimeout(completionKey, addEntryTimeout))); // Build the request and calculate the total size to be included in the packet. @@ -360,7 +379,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan final long txnId = getTxnId(); final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY); completionObjects.put(completionKey, - new ReadCompletion(readEntryOpLogger, cb, ctx, ledgerId, entryId, + new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId, scheduleTimeout(completionKey, readEntryTimeout))); // Build the request and calculate the total size to be included in the packet. 
@@ -415,7 +434,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan final long txnId = getTxnId(); final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY); completionObjects.put(completionKey, - new ReadCompletion(readEntryOpLogger, cb, ctx, ledgerId, entryId, + new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId, scheduleTimeout(completionKey, readEntryTimeout))); // Build the request and calculate the total size to be included in the packet. @@ -619,6 +638,12 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } } + void recordError() { + if (pcbcPool != null) { + pcbcPool.recordError(); + } + } + /** * In the netty pipeline, we need to split packets based on length, so we * use the {@link LengthFieldBasedFrameDecoder}. Other than that all actions @@ -832,27 +857,34 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan static class ReadCompletion extends CompletionValue { final ReadEntryCallback cb; - public ReadCompletion(ReadEntryCallback cb, Object ctx, + public ReadCompletion(final PerChannelBookieClient pcbc, ReadEntryCallback cb, Object ctx, long ledgerId, long entryId) { - this(null, cb, ctx, ledgerId, entryId, null); + this(pcbc, null, cb, ctx, ledgerId, entryId, null); } - public ReadCompletion(final OpStatsLogger readEntryOpLogger, + public ReadCompletion(final PerChannelBookieClient pcbc, final OpStatsLogger readEntryOpLogger, final ReadEntryCallback originalCallback, final Object originalCtx, final long ledgerId, final long entryId, final Timeout timeout) { super(originalCtx, ledgerId, entryId, timeout); final long startTime = MathUtils.nowInNano(); - this.cb = null == readEntryOpLogger ? 
originalCallback : new ReadEntryCallback() { + this.cb = new ReadEntryCallback() { @Override public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx) { cancelTimeout(); - long latency = MathUtils.elapsedNanos(startTime); - if (rc != BKException.Code.OK) { - readEntryOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS); - } else { - readEntryOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); + if (readEntryOpLogger != null) { + long latency = MathUtils.elapsedNanos(startTime); + if (rc != BKException.Code.OK) { + readEntryOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS); + } else { + readEntryOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); + } + } + + if (rc != BKException.Code.OK && !expectedBkOperationErrors.contains(rc)) { + pcbc.recordError(); } + originalCallback.readEntryComplete(rc, ledgerId, entryId, buffer, originalCtx); } }; @@ -863,12 +895,12 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan static class AddCompletion extends CompletionValue { final WriteCallback cb; - public AddCompletion(WriteCallback cb, Object ctx, + public AddCompletion(final PerChannelBookieClient pcbc, WriteCallback cb, Object ctx, long ledgerId, long entryId) { - this(null, cb, ctx, ledgerId, entryId, null); + this(pcbc, null, cb, ctx, ledgerId, entryId, null); } - public AddCompletion(final OpStatsLogger addEntryOpLogger, + public AddCompletion(final PerChannelBookieClient pcbc, final OpStatsLogger addEntryOpLogger, final WriteCallback originalCallback, final Object originalCtx, final long ledgerId, final long entryId, final Timeout timeout) { @@ -878,12 +910,19 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan @Override public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) { cancelTimeout(); - long latency = MathUtils.elapsedNanos(startTime); - if (rc != BKException.Code.OK) 
{ - addEntryOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS); - } else { - addEntryOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); + if (pcbc.addEntryOpLogger != null) { + long latency = MathUtils.elapsedNanos(startTime); + if (rc != BKException.Code.OK) { + pcbc.addEntryOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS); + } else { + pcbc.addEntryOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); + } } + + if (rc != BKException.Code.OK && !expectedBkOperationErrors.contains(rc)) { + pcbc.recordError(); + } + originalCallback.writeComplete(rc, ledgerId, entryId, addr, originalCtx); } }; @@ -943,10 +982,10 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan return; } if (OperationType.ADD_ENTRY == operationType) { - errorOutAddKey(this); + errorOutAddKey(this, BKException.Code.TimeoutException); addTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS); } else { - errorOutReadKey(this); + errorOutReadKey(this, BKException.Code.TimeoutException); readTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS); } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java index bd45e92..69d559e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java @@ -33,5 +33,5 @@ interface PerChannelBookieClientFactory { * * @return the client connected to address. 
*/ - PerChannelBookieClient create(BookieSocketAddress address); + PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java index 81b3ba7..4a735f9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java @@ -41,6 +41,11 @@ interface PerChannelBookieClientPool { void obtain(GenericCallback<PerChannelBookieClient> callback); /** + * record any read/write error on {@link PerChannelBookieClientPool} + */ + void recordError(); + + /** * Disconnect the connections in the pool. * * @param wait http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieHealthCheck.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieHealthCheck.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieHealthCheck.java new file mode 100644 index 0000000..33be97c --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieHealthCheck.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.bookkeeper.client; + +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.client.AsyncCallback.AddCallback; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestBookieHealthCheck extends BookKeeperClusterTestCase { + private final static Logger LOG = LoggerFactory.getLogger(TestBookieHealthCheck.class); + + public TestBookieHealthCheck() { + super(4); + baseClientConf.setAddEntryTimeout(1); + baseClientConf.enableBookieHealthCheck(); + baseClientConf.setBookieHealthCheckInterval(1, TimeUnit.SECONDS); + baseClientConf.setBookieErrorThresholdPerInterval(1); + baseClientConf.setBookieQuarantineTime(5, TimeUnit.SECONDS); + } + + @Test(timeout = 60000) + public void testBkQuarantine() throws Exception { + LedgerHandle lh = bkc.createLedger(2, 2, 2, BookKeeper.DigestType.CRC32, new byte[] {}); + + final int numEntries = 10; + for (int i = 0; i < numEntries; i++) { + byte[] msg = ("msg-" + i).getBytes(); + lh.addEntry(msg); + } + + BookieSocketAddress bookieToQuarantine = lh.getLedgerMetadata().getEnsemble(numEntries).get(0); + sleepBookie(bookieToQuarantine, baseClientConf.getAddEntryTimeout() * 2).await(); + + byte[] tempMsg = "temp-msg".getBytes(); + 
lh.asyncAddEntry(tempMsg, new AddCallback() { + + @Override + public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) { + // no-op + } + }, null); + + // make sure the add entry times out + Thread.sleep(baseClientConf.getAddEntryTimeout() * 2 * 1000); + + // make sure the health check runs once after the timeout + Thread.sleep(baseClientConf.getBookieHealthCheckIntervalSeconds() * 2 * 1000); + + // the bookie watcher should contain the bookieToQuarantine in the quarantine set + Assert.assertTrue(bkc.bookieWatcher.quarantinedBookies.asMap().containsKey(bookieToQuarantine)); + + // the bookie to be left out of the ensemble should always be the quarantined bookie + LedgerHandle lh1 = bkc.createLedger(2, 2, 2, BookKeeper.DigestType.CRC32, new byte[] {}); + LedgerHandle lh2 = bkc.createLedger(3, 3, 3, BookKeeper.DigestType.CRC32, new byte[] {}); + Assert.assertFalse(lh1.getLedgerMetadata().getEnsemble(0).contains(bookieToQuarantine)); + Assert.assertFalse(lh2.getLedgerMetadata().getEnsemble(0).contains(bookieToQuarantine)); + + // the quarantined bookie can still be in the ensemble if we do not have enough healthy bookies + LedgerHandle lh3 = bkc.createLedger(4, 4, 4, BookKeeper.DigestType.CRC32, new byte[] {}); + Assert.assertTrue(lh3.getLedgerMetadata().getEnsemble(0).contains(bookieToQuarantine)); + + // make sure faulty bookie is out of quarantine + Thread.sleep(baseClientConf.getBookieQuarantineTimeSeconds() * 1000); + + // the bookie should not be quarantined anymore + Assert.assertFalse(bkc.bookieWatcher.quarantinedBookies.asMap().containsKey(bookieToQuarantine)); + } + + @Test(timeout = 60000) + public void testNoQuarantineOnBkRestart() throws Exception { + final LedgerHandle lh = bkc.createLedger(2, 2, 2, BookKeeper.DigestType.CRC32, new byte[] {}); + final int numEntries = 20; + BookieSocketAddress bookieToRestart = lh.getLedgerMetadata().getEnsemble(0).get(0); + + // we add entries on a separate thread so that we can restart a bookie on the
current thread + Thread addEntryThread = new Thread() { + public void run() { + for (int i = 0; i < numEntries; i++) { + byte[] msg = ("msg-" + i).getBytes(); + try { + lh.addEntry(msg); + // we add sufficient sleep to make sure all entries are not added before we restart the bookie + Thread.sleep(100); + } catch (Exception e) { + LOG.error("Error sending msg"); + } + } + } + }; + addEntryThread.start(); + restartBookie(bookieToRestart); + + // make sure the health check runs once + Thread.sleep(baseClientConf.getBookieHealthCheckIntervalSeconds() * 2 * 1000); + + // the bookie watcher should not contain the bookieToRestart in the quarantine set + Assert.assertFalse(bkc.bookieWatcher.quarantinedBookies.asMap().containsKey(bookieToRestart)); + } + + @Test(timeout = 60000) + public void testNoQuarantineOnExpectedBkErrors() throws Exception { + final LedgerHandle lh = bkc.createLedger(2, 2, 2, BookKeeper.DigestType.CRC32, new byte[] {}); + final int numEntries = 10; + for (int i = 0; i < numEntries; i++) { + byte[] msg = ("msg-" + i).getBytes(); + lh.addEntry(msg); + } + BookieSocketAddress bookie1 = lh.getLedgerMetadata().getEnsemble(0).get(0); + BookieSocketAddress bookie2 = lh.getLedgerMetadata().getEnsemble(0).get(1); + try { + // we read an entry that is not added + lh.readEntries(10, 10); + } catch (BKException e) { + // ok + } + + // make sure the health check runs once + Thread.sleep(baseClientConf.getBookieHealthCheckIntervalSeconds() * 2 * 1000); + + // the bookie watcher should not contain bookie1 or bookie2 in the quarantine set + Assert.assertFalse(bkc.bookieWatcher.quarantinedBookies.asMap().containsKey(bookie1)); + Assert.assertFalse(bkc.bookieWatcher.quarantinedBookies.asMap().containsKey(bookie2)); + } + +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java ---------------------------------------------------------------------- diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index fce689d..278dc8c 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -351,6 +351,39 @@ public abstract class BookKeeperClusterTestCase { } /** + * Restart a bookie. Also restart the respective auto recovery process, + * if isAutoRecoveryEnabled is true. + * + * @param addr address of the bookie to restart + * @throws InterruptedException + * @throws IOException if no bookie server with the given address is found + * @throws KeeperException + * @throws BookieException + */ + public void restartBookie(BookieSocketAddress addr) throws Exception { + BookieServer toRemove = null; + int toRemoveIndex = 0; + for (BookieServer server : bs) { + if (server.getLocalAddress().equals(addr)) { + server.shutdown(); + toRemove = server; + break; + } + ++toRemoveIndex; + } + if (toRemove != null) { + stopAutoRecoveryService(toRemove); + bs.remove(toRemove); + ServerConfiguration newConfig = bsConfs.remove(toRemoveIndex); + Thread.sleep(1000); + bs.add(startBookie(newConfig)); + bsConfs.add(newConfig); + return; + } + throw new IOException("Bookie not found"); + } + + /** * Restart bookie servers using new configuration settings. Also restart the * respective auto recovery process, if isAutoRecoveryEnabled is true.
* http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c8255f8c/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java index 8cf5618..cf2b251 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java @@ -20,6 +20,11 @@ */ package org.apache.bookkeeper.test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.io.File; import java.util.Enumeration; @@ -33,8 +38,6 @@ import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.conf.ServerConfiguration; import org.junit.Test; -import static org.junit.Assert.*; - /** * Test to verify the readonly feature of bookies */ @@ -139,6 +142,13 @@ public class ReadOnlyBookieTest extends BookKeeperClusterTestCase { // Now add the current ledger dir back to writable dirs list ledgerDirsManager.addToWritableDirs(testDir, true); + // since the bookie transitions to write mode asynchronously, we need to wait for the bookie to be registered on + // zk + while (zkc.exists(baseConf.getZkAvailableBookiesPath() + "/" + + Bookie.getBookieAddress(bsConfs.get(1)).toString(), false) == null) { + Thread.sleep(100); + } + LOG.info("bookie is running {}, readonly {}.", bookie.isRunning(), bookie.isReadOnly()); assertTrue("Bookie should be running and converted back to writable mode", bookie.isRunning() && !bookie.isReadOnly());
