This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 786da27  Client times out requests in batch rather than individually
786da27 is described below

commit 786da273b0f90c1b35b7244b0408959fd034d411
Author: Ivan Kelly <[email protected]>
AuthorDate: Wed Dec 13 16:43:25 2017 +0800

    Client times out requests in batch rather than individually
    
    This patch is a squash of f59b597f, 00fc5cd2, 1658eb61 & 833dc307 from the 
yahoo-4.3 branch. The change removes the HashWheelTimer from 
PerChannelBookieClient and instead runs through all outstanding ops 
periodically, timing them out as necessary.
    
    The motivation for this change is to reduce the number of objects created 
per add/read request. With HashWheelTimer a TimerTask needs to be added to the 
Timer, which allocates and returns a Timeout object. We had to be careful to 
cancel this timeout on success.
    
    The new approach is simpler, doesn't require any allocation per request, 
and doesn't require any cancellation. Functionally the end result is the same - 
very slow requests are timed out.
    
    The patch also makes some additions to the original changes (due to the 
codebase moving since it was originally written).
    - PendingAddOps are changed to use the same mechanism for quorum timeouts.
    - The checkTimeout method is moved to the PCBCPool so we can run the check 
on non-connected PCBC instances. This is necessary for TLS.
    
    Author: Ivan Kelly <[email protected]>
    Author: Siddharth Boobna <[email protected]>
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>
    
    This closes #817 from ivankelly/yahoo-bp-9
---
 .../apache/bookkeeper/benchmark/BenchBookie.java   |   9 +-
 .../org/apache/bookkeeper/client/BookKeeper.java   |   6 +-
 .../org/apache/bookkeeper/client/LedgerHandle.java |  30 +++++
 .../org/apache/bookkeeper/client/PendingAddOp.java |  36 ++----
 .../bookkeeper/conf/ClientConfiguration.java       |  41 ++++++-
 .../org/apache/bookkeeper/proto/BookieClient.java  |  64 ++++++----
 .../proto/DefaultPerChannelBookieClientPool.java   |   7 ++
 .../bookkeeper/proto/PerChannelBookieClient.java   | 135 +++++++++++----------
 .../proto/PerChannelBookieClientPool.java          |   6 +
 .../client/TestGetBookieInfoTimeout.java           |  10 +-
 .../apache/bookkeeper/test/BookieClientTest.java   |  17 ++-
 .../org/apache/bookkeeper/test/LoopbackClient.java | 121 ------------------
 12 files changed, 236 insertions(+), 246 deletions(-)

diff --git 
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
 
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index 9c52788..4506091 100644
--- 
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++ 
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -24,7 +24,10 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -33,6 +36,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -158,9 +162,11 @@ public class BenchBookie {
                 .name("BenchBookieClientScheduler")
                 .numThreads(1)
                 .build();
+        ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("BookKeeperClientScheduler"));
 
         ClientConfiguration conf = new ClientConfiguration();
-        BookieClient bc = new BookieClient(conf, eventLoop, executor);
+        BookieClient bc = new BookieClient(conf, eventLoop, executor, 
scheduler, NullStatsLogger.INSTANCE);
         LatencyCallback lc = new LatencyCallback();
 
         ThroughputCallback tc = new ThroughputCallback();
@@ -220,6 +226,7 @@ public class BenchBookie {
         LOG.info("Throughput: " + ((long) entryCount) * 1000 / (endTime - 
startTime));
 
         bc.close();
+        scheduler.shutdown();
         eventLoop.shutdownGracefully();
         executor.shutdown();
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 99e9484..d0f8a64 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -146,6 +146,7 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
     final int explicitLacInterval;
     final boolean delayEnsembleChange;
     final boolean reorderReadSequence;
+    final long addEntryQuorumTimeoutNanos;
 
     final Optional<SpeculativeRequestExecutionPolicy> 
readSpeculativeRequestPolicy;
     final Optional<SpeculativeRequestExecutionPolicy> 
readLACSpeculativeRequestPolicy;
@@ -487,9 +488,9 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
             this.readLACSpeculativeRequestPolicy = 
Optional.<SpeculativeRequestExecutionPolicy>absent();
         }
 
-
         // initialize bookie client
-        this.bookieClient = new BookieClient(conf, this.eventLoopGroup, 
this.mainWorkerPool, statsLogger);
+        this.bookieClient = new BookieClient(conf, this.eventLoopGroup, 
this.mainWorkerPool,
+                                             scheduler, statsLogger);
         this.bookieWatcher = new BookieWatcher(conf, this.placementPolicy, 
regClient);
         if (conf.getDiskWeightBasedPlacementEnabled()) {
             LOG.info("Weighted ledger placement enabled");
@@ -520,6 +521,7 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
             LOG.debug("Explicit LAC Interval : {}", this.explicitLacInterval);
         }
 
+        this.addEntryQuorumTimeoutNanos = 
TimeUnit.SECONDS.toNanos(conf.getAddEntryQuorumTimeout());
         scheduleBookieHealthCheckIfEnabled();
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 78f3e75..5c15376 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -45,6 +45,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -103,6 +104,7 @@ public class LedgerHandle implements WriteHandle {
     final LoadingCache<BookieSocketAddress, Long> bookieFailureHistory;
     final boolean enableParallelRecoveryRead;
     final int recoveryReadBatchSize;
+    ScheduledFuture<?> timeoutFuture = null;
 
     /**
      * Invalid entry id. This value is returned from methods which
@@ -182,6 +184,19 @@ public class LedgerHandle implements WriteHandle {
                                               }
                                           });
         initializeExplicitLacFlushPolicy();
+
+        if (bk.getConf().getAddEntryQuorumTimeout() > 0) {
+            SafeRunnable monitor = new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        monitorPendingAddOps();
+                    }
+                };
+            this.timeoutFuture = bk.scheduler.scheduleAtFixedRate(monitor,
+                                                                  
bk.getConf().getTimeoutMonitorIntervalSec(),
+                                                                  
bk.getConf().getTimeoutMonitorIntervalSec(),
+                                                                  
TimeUnit.SECONDS);
+        }
     }
 
     protected void initializeExplicitLacFlushPolicy() {
@@ -335,6 +350,9 @@ public class LedgerHandle implements WriteHandle {
         SyncCloseCallback callback = new SyncCloseCallback(result);
         asyncClose(callback, null);
         explicitLacFlushPolicy.stopExplicitLacFlush();
+        if (timeoutFuture != null) {
+            timeoutFuture.cancel(false);
+        }
         return result;
     }
 
@@ -1371,6 +1389,18 @@ public class LedgerHandle implements WriteHandle {
         asyncCloseInternal(NoopCloseCallback.instance, null, rc);
     }
 
+    private void monitorPendingAddOps() {
+        int timedOut = 0;
+        for (PendingAddOp op : pendingAddOps) {
+            if (op.maybeTimeout()) {
+                timedOut++;
+            }
+        }
+        if (timedOut > 0) {
+            LOG.info("Timed out {} add ops", timedOut);
+        }
+    }
+
     void errorOutPendingAdds(int rc) {
         errorOutPendingAdds(rc, drainPendingAddsToErrorOut());
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index d414711..e42bbf3 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -25,8 +25,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
 
 import java.util.Map;
 import java.util.concurrent.RejectedExecutionException;
@@ -51,7 +49,7 @@ import org.slf4j.LoggerFactory;
  *
  *
  */
-class PendingAddOp extends SafeRunnable implements WriteCallback, TimerTask {
+class PendingAddOp extends SafeRunnable implements WriteCallback {
     private static final Logger LOG = 
LoggerFactory.getLogger(PendingAddOp.class);
 
     ByteBuf payload;
@@ -68,8 +66,7 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback, TimerTask {
     boolean isRecoveryAdd = false;
     long requestTimeNanos;
 
-    int timeoutSec;
-    Timeout timeout = null;
+    long timeoutNanos;
 
     OpStatsLogger addOpLogger;
     long currentLedgerLength;
@@ -91,14 +88,11 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback, TimerTask {
         op.completed = false;
         op.ackSet = lh.distributionSchedule.getAckSet();
         op.addOpLogger = lh.bk.getAddOpLogger();
-        if (op.timeout != null) {
-            op.timeout.cancel();
-        }
-        op.timeout = null;
-        op.timeoutSec = lh.bk.getConf().getAddEntryQuorumTimeout();
+        op.timeoutNanos = lh.bk.addEntryQuorumTimeoutNanos;
         op.pendingWriteRequests = 0;
         op.callbackTriggered = false;
         op.hasRun = false;
+        op.requestTimeNanos = Long.MAX_VALUE;
         return op;
     }
 
@@ -131,9 +125,12 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback, TimerTask {
         ++pendingWriteRequests;
     }
 
-    @Override
-    public void run(Timeout timeout) {
-        timeoutQuorumWait();
+    boolean maybeTimeout() {
+        if (MathUtils.elapsedNanos(requestTimeNanos) >= timeoutNanos) {
+            timeoutQuorumWait();
+            return true;
+        }
+        return false;
     }
 
     void timeoutQuorumWait() {
@@ -220,11 +217,6 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback, TimerTask {
             return;
         }
 
-        if (timeoutSec > -1) {
-            this.timeout = lh.bk.getBookieClient().scheduleTimeout(
-                    this, timeoutSec, TimeUnit.SECONDS);
-        }
-
         this.requestTimeNanos = MathUtils.nowInNano();
         checkNotNull(lh);
         checkNotNull(lh.macManager);
@@ -340,10 +332,6 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback, TimerTask {
     }
 
     void submitCallback(final int rc) {
-        if (null != timeout) {
-            timeout.cancel();
-        }
-
         if (LOG.isDebugEnabled()) {
             LOG.debug("Submit callback (lid:{}, eid: {}). rc:{}", lh.getId(), 
entryId, rc);
         }
@@ -429,10 +417,6 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback, TimerTask {
         pendingWriteRequests = 0;
         callbackTriggered = false;
         hasRun = false;
-        if (timeout != null) {
-            timeout.cancel();
-        }
-        timeout = null;
 
         recyclerHandle.recycle(this);
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 361a383..966fd42 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -88,6 +88,7 @@ public class ClientConfiguration extends 
AbstractConfiguration {
     protected static final String ADD_ENTRY_TIMEOUT_SEC = "addEntryTimeoutSec";
     protected static final String ADD_ENTRY_QUORUM_TIMEOUT_SEC = 
"addEntryQuorumTimeoutSec";
     protected static final String READ_ENTRY_TIMEOUT_SEC = 
"readEntryTimeoutSec";
+    protected static final String TIMEOUT_MONITOR_INTERVAL_SEC = 
"timeoutMonitorIntervalSec";
     protected static final String TIMEOUT_TASK_INTERVAL_MILLIS = 
"timeoutTaskIntervalMillis";
     protected static final String EXPLICIT_LAC_INTERVAL = 
"explicitLacInterval";
     protected static final String PCBC_TIMEOUT_TIMER_TICK_DURATION_MS = 
"pcbcTimeoutTimerTickDurationMs";
@@ -678,11 +679,41 @@ public class ClientConfiguration extends 
AbstractConfiguration {
     }
 
     /**
-     * Get the interval between successive executions of the 
PerChannelBookieClient's
-     * TimeoutTask. This value is in milliseconds. Every X milliseconds, the 
timeout task
-     * will be executed and it will error out entries that have timed out.
+     * Get the interval between successive executions of the operation timeout 
monitor. This value is in seconds.
+     *
+     * @see #setTimeoutMonitorIntervalSec(long)
+     * @return the interval at which request timeouts will be checked
+     */
+    public long getTimeoutMonitorIntervalSec() {
+        int minTimeout = Math.min(Math.min(getAddEntryQuorumTimeout(),
+                                           getAddEntryTimeout()), 
getReadEntryTimeout());
+        return getLong(TIMEOUT_MONITOR_INTERVAL_SEC, Math.max(minTimeout / 2, 
1));
+    }
+
+    /**
+     * Set the interval between successive executions of the operation timeout 
monitor. The value in seconds.
+     * Every X seconds, all outstanding add and read operations are checked to 
see if they have been running
+     * for longer than their configured timeout. Any that have been will be 
errored out.
+     *
+     * <p>This timeout should be set to a value which is a fraction of the 
values of
+     * {@link #getAddEntryQuorumTimeout}, {@link #getAddEntryTimeout} and 
{@link #getReadEntryTimeout},
+     * so that these timeouts run in a timely fashion.
+     *
+     * @param timeoutInterval The timeout monitor interval, in seconds
+     * @return client configuration
+     */
+    public ClientConfiguration setTimeoutMonitorIntervalSec(long 
timeoutInterval) {
+        setProperty(TIMEOUT_MONITOR_INTERVAL_SEC, 
Long.toString(timeoutInterval));
+        return this;
+    }
+
+    /**
+     * Get the interval between successive executions of the 
PerChannelBookieClient's TimeoutTask. This value is in
+     * milliseconds. Every X milliseconds, the timeout task will be executed 
and it will error out entries that have
+     * timed out.
      *
      * <p>We do it more aggressive to not accumulate pending requests due to 
slow responses.
+     *
      * @return the interval at which request timeouts will be checked
      */
     @Deprecated
@@ -729,6 +760,7 @@ public class ClientConfiguration extends 
AbstractConfiguration {
      *
      * @return tick duration in milliseconds
      */
+    @Deprecated
     public long getPCBCTimeoutTimerTickDurationMs() {
         return getLong(PCBC_TIMEOUT_TIMER_TICK_DURATION_MS, 100);
     }
@@ -745,6 +777,7 @@ public class ClientConfiguration extends 
AbstractConfiguration {
      *          tick duration in milliseconds.
      * @return client configuration.
      */
+    @Deprecated
     public ClientConfiguration setPCBCTimeoutTimerTickDurationMs(long 
tickDuration) {
         setProperty(PCBC_TIMEOUT_TIMER_TICK_DURATION_MS, tickDuration);
         return this;
@@ -759,6 +792,7 @@ public class ClientConfiguration extends 
AbstractConfiguration {
      *
      * @return number of ticks that used for timeout timer.
      */
+    @Deprecated
     public int getPCBCTimeoutTimerNumTicks() {
         return getInt(PCBC_TIMEOUT_TIMER_NUM_TICKS, 1024);
     }
@@ -775,6 +809,7 @@ public class ClientConfiguration extends 
AbstractConfiguration {
      *          number of ticks that used for timeout timer.
      * @return client configuration.
      */
+    @Deprecated
     public ClientConfiguration setPCBCTimeoutTimerNumTicks(int numTicks) {
         setProperty(PCBC_TIMEOUT_TIMER_NUM_TICKS, numTicks);
         return this;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index ceac697..9b1ca9c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -23,23 +23,23 @@ package org.apache.bookkeeper.proto;
 import static com.google.common.base.Charsets.UTF_8;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ExtensionRegistry;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.HashedWheelTimer;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
+import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -76,10 +76,12 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
     AtomicLong totalBytesOutstanding = new AtomicLong();
 
     OrderedSafeExecutor executor;
+    ScheduledExecutorService scheduler;
+    ScheduledFuture<?> timeoutFuture;
+
     EventLoopGroup eventLoopGroup;
     final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool> 
channels =
             new ConcurrentHashMap<BookieSocketAddress, 
PerChannelBookieClientPool>();
-    final HashedWheelTimer requestTimer;
 
     private final ClientAuthProvider.Factory authProviderFactory;
     private final ExtensionRegistry registry;
@@ -93,12 +95,8 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
     private final long bookieErrorThresholdPerInterval;
 
     public BookieClient(ClientConfiguration conf, EventLoopGroup 
eventLoopGroup,
-            OrderedSafeExecutor executor) throws IOException {
-        this(conf, eventLoopGroup, executor, NullStatsLogger.INSTANCE);
-    }
-
-    public BookieClient(ClientConfiguration conf, EventLoopGroup 
eventLoopGroup,
-                        OrderedSafeExecutor executor, StatsLogger statsLogger) 
throws IOException {
+                        OrderedSafeExecutor executor, ScheduledExecutorService 
scheduler,
+                        StatsLogger statsLogger) throws IOException {
         this.conf = conf;
         this.eventLoopGroup = eventLoopGroup;
         this.executor = executor;
@@ -110,11 +108,21 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
 
         this.statsLogger = statsLogger;
         this.numConnectionsPerBookie = conf.getNumChannelsPerBookie();
-        this.requestTimer = new HashedWheelTimer(
-                new 
ThreadFactoryBuilder().setNameFormat("BookieClientTimer-%d").build(),
-                conf.getPCBCTimeoutTimerTickDurationMs(), 
TimeUnit.MILLISECONDS,
-                conf.getPCBCTimeoutTimerNumTicks());
         this.bookieErrorThresholdPerInterval = 
conf.getBookieErrorThresholdPerInterval();
+
+        this.scheduler = scheduler;
+        if (conf.getAddEntryTimeout() > 0 || conf.getReadEntryTimeout() > 0) {
+            SafeRunnable monitor = new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        monitorPendingOperations();
+                    }
+                };
+            this.timeoutFuture = this.scheduler.scheduleAtFixedRate(monitor,
+                                                                    
conf.getTimeoutMonitorIntervalSec(),
+                                                                    
conf.getTimeoutMonitorIntervalSec(),
+                                                                    
TimeUnit.SECONDS);
+        }
     }
 
     private int getRc(int rc) {
@@ -145,8 +153,8 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
     @Override
     public PerChannelBookieClient create(BookieSocketAddress address, 
PerChannelBookieClientPool pcbcPool,
             SecurityHandlerFactory shFactory) throws SecurityException {
-        return new PerChannelBookieClient(conf, executor, eventLoopGroup, 
address, requestTimer, statsLogger,
-                authProviderFactory, registry, pcbcPool, shFactory);
+        return new PerChannelBookieClient(conf, executor, eventLoopGroup, 
address, statsLogger,
+                                          authProviderFactory, registry, 
pcbcPool, shFactory);
     }
 
     private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
@@ -521,12 +529,14 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
         }
     }
 
-    public boolean isClosed() {
-        return closed;
+    private void monitorPendingOperations() {
+        for (PerChannelBookieClientPool clientPool : channels.values()) {
+            clientPool.checkTimeoutOnPendingOperations();
+        }
     }
 
-    public Timeout scheduleTimeout(TimerTask task, long timeoutSec, TimeUnit 
timeUnit) {
-        return requestTimer.newTimeout(task, timeoutSec, timeUnit);
+    public boolean isClosed() {
+        return closed;
     }
 
     public void close() {
@@ -538,11 +548,13 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
             }
             channels.clear();
             authProviderFactory.close();
+
+            if (timeoutFuture != null) {
+                timeoutFuture.cancel(false);
+            }
         } finally {
             closeLock.writeLock().unlock();
         }
-        // Shut down the timeout executor.
-        this.requestTimer.stop();
     }
 
     private static class Counter {
@@ -599,7 +611,10 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
                 .name("BookieClientWorker")
                 .numThreads(1)
                 .build();
-        BookieClient bc = new BookieClient(new ClientConfiguration(), 
eventLoopGroup, executor);
+        ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("BookKeeperClientScheduler"));
+        BookieClient bc = new BookieClient(new ClientConfiguration(), 
eventLoopGroup, executor,
+                                           scheduler, 
NullStatsLogger.INSTANCE);
         BookieSocketAddress addr = new BookieSocketAddress(args[0], 
Integer.parseInt(args[1]));
 
         for (int i = 0; i < 100000; i++) {
@@ -608,6 +623,7 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
         }
         counter.wait(0);
         System.out.println("Total = " + counter.total());
+        scheduler.shutdown();
         eventLoopGroup.shutdownGracefully();
         executor.shutdown();
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
index 3ac5cb6..41233cf 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
@@ -95,6 +95,13 @@ class DefaultPerChannelBookieClientPool implements 
PerChannelBookieClientPool,
     }
 
     @Override
+    public void checkTimeoutOnPendingOperations() {
+        for (int i = 0; i < clients.length; i++) {
+            clients[i].checkTimeoutOnPendingOperations();
+        }
+    }
+
+    @Override
     public void recordError() {
         errorCounter.incrementAndGet();
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 97bdaea..b128bc8 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -54,11 +54,8 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.codec.LengthFieldPrepender;
 import io.netty.handler.codec.TooLongFrameException;
 import io.netty.handler.ssl.SslHandler;
-import io.netty.util.HashedWheelTimer;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 
@@ -70,12 +67,14 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiPredicate;
 
 import javax.net.ssl.SSLPeerUnverifiedException;
 
@@ -147,9 +146,8 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
     final BookieSocketAddress addr;
     final EventLoopGroup eventLoopGroup;
     final OrderedSafeExecutor executor;
-    final HashedWheelTimer requestTimer;
-    final int addEntryTimeout;
-    final int readEntryTimeout;
+    final long addEntryTimeoutNanos;
+    final long readEntryTimeoutNanos;
     final int maxFrameSize;
     final int getBookieInfoTimeout;
     final int startTLSTimeout;
@@ -203,7 +201,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
 
     public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup 
eventLoopGroup,
                                   BookieSocketAddress addr) throws 
SecurityException {
-        this(new ClientConfiguration(), executor, eventLoopGroup, addr, null, 
NullStatsLogger.INSTANCE, null, null,
+        this(new ClientConfiguration(), executor, eventLoopGroup, addr, 
NullStatsLogger.INSTANCE, null, null,
                 null);
     }
 
@@ -211,24 +209,22 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                                   BookieSocketAddress addr,
                                   ClientAuthProvider.Factory 
authProviderFactory,
                                   ExtensionRegistry extRegistry) throws 
SecurityException {
-        this(new ClientConfiguration(), executor, eventLoopGroup, addr, null, 
NullStatsLogger.INSTANCE,
+        this(new ClientConfiguration(), executor, eventLoopGroup, addr, 
NullStatsLogger.INSTANCE,
                 authProviderFactory, extRegistry, null);
     }
 
     public PerChannelBookieClient(ClientConfiguration conf, 
OrderedSafeExecutor executor,
                                   EventLoopGroup eventLoopGroup, 
BookieSocketAddress addr,
-                                  HashedWheelTimer requestTimer, StatsLogger 
parentStatsLogger,
-                                  ClientAuthProvider.Factory 
authProviderFactory,
+                                  StatsLogger parentStatsLogger, 
ClientAuthProvider.Factory authProviderFactory,
                                   ExtensionRegistry extRegistry,
                                   PerChannelBookieClientPool pcbcPool) throws 
SecurityException {
-       this(conf, executor, eventLoopGroup, addr, null, 
NullStatsLogger.INSTANCE,
+       this(conf, executor, eventLoopGroup, addr, NullStatsLogger.INSTANCE,
                 authProviderFactory, extRegistry, pcbcPool, null);
     }
 
     public PerChannelBookieClient(ClientConfiguration conf, 
OrderedSafeExecutor executor,
                                   EventLoopGroup eventLoopGroup, 
BookieSocketAddress addr,
-                                  HashedWheelTimer requestTimer, StatsLogger 
parentStatsLogger,
-                                  ClientAuthProvider.Factory 
authProviderFactory,
+                                  StatsLogger parentStatsLogger, 
ClientAuthProvider.Factory authProviderFactory,
                                   ExtensionRegistry extRegistry,
                                   PerChannelBookieClientPool pcbcPool,
                                   SecurityHandlerFactory shFactory) throws 
SecurityException {
@@ -242,9 +238,8 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
             this.eventLoopGroup = eventLoopGroup;
         }
         this.state = ConnectionState.DISCONNECTED;
-        this.requestTimer = requestTimer;
-        this.addEntryTimeout = conf.getAddEntryTimeout();
-        this.readEntryTimeout = conf.getReadEntryTimeout();
+        this.addEntryTimeoutNanos = 
TimeUnit.SECONDS.toNanos(conf.getAddEntryTimeout());
+        this.readEntryTimeoutNanos = 
TimeUnit.SECONDS.toNanos(conf.getReadEntryTimeout());
         this.getBookieInfoTimeout = conf.getBookieInfoTimeout();
         this.startTLSTimeout = conf.getStartTLSTimeout();
         this.useV2WireProtocol = conf.getUseV2WireProtocol();
@@ -768,6 +763,30 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         writeAndFlush(channel, completionKey, getBookieInfoRequest);
     }
 
+    private static final BiPredicate<CompletionKey, CompletionValue> 
timeoutCheck = (key, value) -> {
+        return value.maybeTimeout();
+    };
+
+    public void checkTimeoutOnPendingOperations() {
+        int timedOutOperations = completionObjects.removeIf(timeoutCheck);
+
+        synchronized (this) {
+            Iterator<CompletionValue> iterator = 
completionObjectsV2Conflicts.values().iterator();
+            while (iterator.hasNext()) {
+                CompletionValue value = iterator.next();
+                if (value.maybeTimeout()) {
+                    ++timedOutOperations;
+                    iterator.remove();
+                }
+            }
+        }
+
+        if (timedOutOperations > 0) {
+            LOG.info("Timed-out {} operations to channel {} for {}",
+                     timedOutOperations, channel, addr);
+        }
+    }
+
     /**
      * Disconnects the bookie client. It can be reused.
      */
@@ -819,6 +838,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                 cf.awaitUninterruptibly();
             }
         }
+
     }
 
     private ChannelFuture closeChannel(Channel c) {
@@ -1235,14 +1255,12 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         protected long ledgerId;
         protected long entryId;
         protected long startTime;
-        protected Timeout timeout;
 
         public CompletionValue(String operationName,
                                Object ctx,
                                long ledgerId, long entryId,
                                OpStatsLogger opLogger,
-                               OpStatsLogger timeoutOpLogger,
-                               Timeout timeout) {
+                               OpStatsLogger timeoutOpLogger) {
             this.operationName = operationName;
             this.ctx = ctx;
             this.ledgerId = ledgerId;
@@ -1250,19 +1268,13 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
             this.startTime = MathUtils.nowInNano();
             this.opLogger = opLogger;
             this.timeoutOpLogger = timeoutOpLogger;
-            this.timeout = timeout;
         }
 
         private long latency() {
             return MathUtils.elapsedNanos(startTime);
         }
 
-        void cancelTimeoutAndLogOp(int rc) {
-            Timeout t = timeout;
-            if (null != t) {
-                t.cancel();
-            }
-
+        void logOpResult(int rc) {
             if (rc != BKException.Code.OK) {
                 opLogger.registerFailedEvent(latency(), TimeUnit.NANOSECONDS);
             } else {
@@ -1275,6 +1287,15 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
             }
         }
 
+        boolean maybeTimeout() {
+            if (MathUtils.elapsedNanos(startTime) >= readEntryTimeoutNanos) {
+                timeout();
+                return true;
+            } else {
+                return false;
+            }
+        }
+
         void timeout() {
             errorOut(BKException.Code.TimeoutException);
             timeoutOpLogger.registerSuccessfulEvent(latency(),
@@ -1344,14 +1365,13 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                                   final long ledgerId) {
             super("WriteLAC",
                   originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
-                  writeLacOpLogger, writeLacTimeoutOpLogger,
-                  scheduleTimeout(key, addEntryTimeout));
+                  writeLacOpLogger, writeLacTimeoutOpLogger);
             this.cb = new WriteLacCallback() {
                     @Override
                     public void writeLacComplete(int rc, long ledgerId,
                                                  BookieSocketAddress addr,
                                                  Object ctx) {
-                        cancelTimeoutAndLogOp(rc);
+                        logOpResult(rc);
                         originalCallback.writeLacComplete(rc, ledgerId,
                                                           addr, originalCtx);
                         key.release();
@@ -1392,15 +1412,14 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                                  ReadLacCallback originalCallback,
                                  final Object ctx, final long ledgerId) {
             super("ReadLAC", ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
-                  readLacOpLogger, readLacTimeoutOpLogger,
-                  scheduleTimeout(key, readEntryTimeout));
+                  readLacOpLogger, readLacTimeoutOpLogger);
             this.cb = new ReadLacCallback() {
                     @Override
                     public void readLacComplete(int rc, long ledgerId,
                                                 ByteBuf lacBuffer,
                                                 ByteBuf lastEntryBuffer,
                                                 Object ctx) {
-                        cancelTimeoutAndLogOp(rc);
+                        logOpResult(rc);
                         originalCallback.readLacComplete(
                                 rc, ledgerId, lacBuffer, lastEntryBuffer, ctx);
                         key.release();
@@ -1452,15 +1471,14 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                               final Object originalCtx,
                               long ledgerId, final long entryId) {
             super("Read", originalCtx, ledgerId, entryId,
-                  readEntryOpLogger, readTimeoutOpLogger,
-                  scheduleTimeout(key, readEntryTimeout));
+                  readEntryOpLogger, readTimeoutOpLogger);
 
             this.cb = new ReadEntryCallback() {
                     @Override
                     public void readEntryComplete(int rc, long ledgerId,
                                                   long entryId, ByteBuf buffer,
                                                   Object ctx) {
-                        cancelTimeoutAndLogOp(rc);
+                        logOpResult(rc);
                         originalCallback.readEntryComplete(rc,
                                                            ledgerId, entryId,
                                                            buffer, 
originalCtx);
@@ -1550,12 +1568,11 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
 
         public StartTLSCompletion(final CompletionKey key) {
             super("StartTLS", null, -1, -1,
-                  startTLSOpLogger, startTLSTimeoutOpLogger,
-                  scheduleTimeout(key, startTLSTimeout));
+                  startTLSOpLogger, startTLSTimeoutOpLogger);
             this.cb = new StartTLSCallback() {
                 @Override
                 public void startTLSComplete(int rc, Object ctx) {
-                    cancelTimeoutAndLogOp(rc);
+                    logOpResult(rc);
                     key.release();
                 }
             };
@@ -1602,13 +1619,12 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                                        final GetBookieInfoCallback 
origCallback,
                                        final Object origCtx) {
             super("GetBookieInfo", origCtx, 0L, 0L,
-                  getBookieInfoOpLogger, getBookieInfoTimeoutOpLogger,
-                  scheduleTimeout(key, getBookieInfoTimeout));
+                  getBookieInfoOpLogger, getBookieInfoTimeoutOpLogger);
             this.cb = new GetBookieInfoCallback() {
                 @Override
                 public void getBookieInfoComplete(int rc, BookieInfo bInfo,
                                                   Object ctx) {
-                    cancelTimeoutAndLogOp(rc);
+                    logOpResult(rc);
                     origCallback.getBookieInfoComplete(rc, bInfo, origCtx);
                     key.release();
                 }
@@ -1668,8 +1684,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         WriteCallback originalCallback = null;
 
         AddCompletion(Recycler.Handle<AddCompletion> handle) {
-            super("Add", null, -1, -1,
-                  addEntryOpLogger, addTimeoutOpLogger, null);
+            super("Add", null, -1, -1, addEntryOpLogger, addTimeoutOpLogger);
             this.handle = handle;
         }
 
@@ -1683,20 +1698,29 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
             this.ledgerId = ledgerId;
             this.entryId = entryId;
             this.startTime = MathUtils.nowInNano();
-            this.timeout = scheduleTimeout(key, addEntryTimeout);
         }
 
         @Override
         public void writeComplete(int rc, long ledgerId, long entryId,
                                   BookieSocketAddress addr,
                                   Object ctx) {
-            cancelTimeoutAndLogOp(rc);
+            logOpResult(rc);
             originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx);
             key.release();
             handle.recycle(this);
         }
 
         @Override
+        boolean maybeTimeout() {
+            if (MathUtils.elapsedNanos(startTime) >= addEntryTimeoutNanos) {
+                timeout();
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        @Override
         public void errorOut() {
             errorOut(BKException.Code.BookieHandleNotAvailableException);
         }
@@ -1739,14 +1763,6 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         return new V3CompletionKey(txnId, operationType);
     }
 
-    Timeout scheduleTimeout(CompletionKey key, long timeout) {
-        if (null != requestTimer) {
-            return requestTimer.newTimeout(key, timeout, TimeUnit.SECONDS);
-        } else {
-            return null;
-        }
-    }
-
     class V3CompletionKey extends CompletionKey {
 
         public V3CompletionKey(long txnId, OperationType operationType) {
@@ -1774,7 +1790,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
 
     }
 
-    abstract class CompletionKey implements TimerTask {
+    abstract class CompletionKey {
         final long txnId;
         OperationType operationType;
 
@@ -1784,17 +1800,6 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
             this.operationType = operationType;
         }
 
-        @Override
-        public void run(Timeout timeout) throws Exception {
-            if (timeout.isCancelled()) {
-                return;
-            }
-            CompletionValue completion = completionObjects.remove(this);
-            if (completion != null) {
-                completion.timeout();
-            }
-        }
-
         public void release() {}
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
index bd07a4e..80f00a5 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
@@ -46,6 +46,12 @@ interface PerChannelBookieClientPool {
     void recordError();
 
     /**
+     * Check if any ops on any channel needs to be timed out.
+     * This is called on all channels, even if the channel is not yet 
connected.
+     */
+    void checkTimeoutOnPendingOperations();
+
+    /**
      * Disconnect the connections in the pool.
      *
      * @param wait
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
index d2bff33..bf6c230 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
@@ -25,8 +25,11 @@ import static org.junit.Assert.assertTrue;
 
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -36,6 +39,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.After;
@@ -53,6 +57,7 @@ public class TestGetBookieInfoTimeout extends 
BookKeeperClusterTestCase {
     DigestType digestType;
     public EventLoopGroup eventLoopGroup;
     public OrderedSafeExecutor executor;
+    private ScheduledExecutorService scheduler;
 
     public TestGetBookieInfoTimeout() {
         super(10);
@@ -68,10 +73,13 @@ public class TestGetBookieInfoTimeout extends 
BookKeeperClusterTestCase {
                 .name("BKClientOrderedSafeExecutor")
                 .numThreads(2)
                 .build();
+        scheduler = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("BookKeeperClientScheduler"));
     }
 
     @After
     public void tearDown() throws Exception {
+        scheduler.shutdown();
         eventLoopGroup.shutdownGracefully();
         executor.shutdown();
     }
@@ -99,7 +107,7 @@ public class TestGetBookieInfoTimeout extends 
BookKeeperClusterTestCase {
         // try to get bookie info from the sleeping bookie. It should fail 
with timeout error
         BookieSocketAddress addr = new 
BookieSocketAddress(bookieToSleep.getSocketAddress().getHostString(),
                 bookieToSleep.getPort());
-        BookieClient bc = new BookieClient(cConf, eventLoopGroup, executor);
+        BookieClient bc = new BookieClient(cConf, eventLoopGroup, executor, 
scheduler, NullStatsLogger.INSTANCE);
         long flags = 
BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
                 | 
BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 6bf05ae..b799635 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -27,12 +27,15 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.Code;
@@ -48,6 +51,7 @@ import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCall
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.After;
@@ -64,6 +68,7 @@ public class BookieClientTest {
 
     public EventLoopGroup eventLoopGroup;
     public OrderedSafeExecutor executor;
+    private ScheduledExecutorService scheduler;
     ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
 
     @Before
@@ -83,10 +88,13 @@ public class BookieClientTest {
                 .name("BKClientOrderedSafeExecutor")
                 .numThreads(2)
                 .build();
+        scheduler = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("BookKeeperClientScheduler"));
     }
 
     @After
     public void tearDown() throws Exception {
+        scheduler.shutdown();
         bs.shutdown();
         recursiveDelete(tmpDir);
         eventLoopGroup.shutdownGracefully();
@@ -146,7 +154,8 @@ public class BookieClientTest {
         BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port);
         ResultStruct arc = new ResultStruct();
 
-        BookieClient bc = new BookieClient(new ClientConfiguration(), 
eventLoopGroup, executor);
+        BookieClient bc = new BookieClient(new ClientConfiguration(), 
eventLoopGroup, executor,
+                                           scheduler, 
NullStatsLogger.INSTANCE);
         ByteBuf bb = createByteBuffer(1, 1, 1);
         bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, 
BookieProtocol.FLAG_NONE);
         synchronized (arc) {
@@ -246,7 +255,8 @@ public class BookieClientTest {
     public void testNoLedger() throws Exception {
         ResultStruct arc = new ResultStruct();
         BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port);
-        BookieClient bc = new BookieClient(new ClientConfiguration(), 
eventLoopGroup, executor);
+        BookieClient bc = new BookieClient(new ClientConfiguration(), 
eventLoopGroup, executor,
+                                           scheduler, 
NullStatsLogger.INSTANCE);
         synchronized (arc) {
             bc.readEntry(addr, 2, 13, recb, arc);
             arc.wait(1000);
@@ -257,7 +267,8 @@ public class BookieClientTest {
     @Test
     public void testGetBookieInfo() throws IOException, InterruptedException {
         BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port);
-        BookieClient bc = new BookieClient(new ClientConfiguration(), new 
NioEventLoopGroup(), executor);
+        BookieClient bc = new BookieClient(new ClientConfiguration(), new 
NioEventLoopGroup(), executor,
+                                           scheduler, 
NullStatsLogger.INSTANCE);
         long flags = 
BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
                 | 
BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
deleted file mode 100644
index fc20476..0000000
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.bookkeeper.test;
-
-import io.netty.buffer.Unpooled;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookieClient;
-import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class tests BookieClient. It just sends the a new entry to itself.
- */
-class LoopbackClient implements WriteCallback {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(LoopbackClient.class);
-
-    BookieClient client;
-    static int recvTimeout = 2000;
-    long begin = 0;
-    int limit;
-    OrderedSafeExecutor executor;
-
-    static class Counter {
-        int c;
-        int limit;
-
-        Counter(int limit) {
-            this.c = 0;
-            this.limit = limit;
-        }
-
-        synchronized void increment() {
-            if (++c == limit) {
-                this.notify();
-            }
-        }
-    }
-
-    LoopbackClient(EventLoopGroup eventLoopGroup, OrderedSafeExecutor 
executor, long begin, int limit)
-            throws IOException {
-        this.client = new BookieClient(new ClientConfiguration(), 
eventLoopGroup, executor);
-        this.begin = begin;
-    }
-
-    void write(long ledgerId, long entry, byte[] data, BookieSocketAddress 
addr, WriteCallback cb, Object ctx)
-            throws IOException, InterruptedException {
-        LOG.info("Ledger id: " + ledgerId + ", Entry: " + entry);
-        byte[] passwd = new byte[20];
-        Arrays.fill(passwd, (byte) 'a');
-
-        client.addEntry(addr, ledgerId, passwd, entry, 
Unpooled.wrappedBuffer(data), cb, ctx, BookieProtocol.FLAG_NONE);
-    }
-
-    @Override
-    public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx) {
-        Counter counter = (Counter) ctx;
-        counter.increment();
-    }
-
-    public static void main(String args[]) {
-        byte[] data = new byte[Integer.parseInt(args[0])];
-        Integer limit = Integer.parseInt(args[1]);
-        Counter c = new Counter(limit);
-        long ledgerId = Long.valueOf("0").longValue();
-        long begin = System.currentTimeMillis();
-
-        LoopbackClient lb;
-        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
-        OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
-                .name("BookieClientScheduler")
-                .numThreads(2)
-                .build();
-        try {
-            BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", 
Integer.valueOf(args[2]).intValue());
-            lb = new LoopbackClient(eventLoopGroup, executor, begin, 
limit.intValue());
-
-            for (int i = 0; i < limit; i++) {
-                lb.write(ledgerId, i, data, addr, lb, c);
-            }
-
-            synchronized (c) {
-                c.wait();
-                System.out.println("Time to write all entries: " + 
(System.currentTimeMillis() - begin));
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
-
-}

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to