jiazhai closed pull request #817: Client times out requests in batch rather 
than individually
URL: https://github.com/apache/bookkeeper/pull/817
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign pull request (one opened from a fork) once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
 
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index 9c52788eb..450609110 100644
--- 
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++ 
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -24,7 +24,10 @@
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -33,6 +36,7 @@
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -158,9 +162,11 @@ public static void main(String[] args)
                 .name("BenchBookieClientScheduler")
                 .numThreads(1)
                 .build();
+        ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("BookKeeperClientScheduler"));
 
         ClientConfiguration conf = new ClientConfiguration();
-        BookieClient bc = new BookieClient(conf, eventLoop, executor);
+        BookieClient bc = new BookieClient(conf, eventLoop, executor, 
scheduler, NullStatsLogger.INSTANCE);
         LatencyCallback lc = new LatencyCallback();
 
         ThroughputCallback tc = new ThroughputCallback();
@@ -220,6 +226,7 @@ public static void main(String[] args)
         LOG.info("Throughput: " + ((long) entryCount) * 1000 / (endTime - 
startTime));
 
         bc.close();
+        scheduler.shutdown();
         eventLoop.shutdownGracefully();
         executor.shutdown();
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 99e948467..d0f8a64fc 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -146,6 +146,7 @@
     final int explicitLacInterval;
     final boolean delayEnsembleChange;
     final boolean reorderReadSequence;
+    final long addEntryQuorumTimeoutNanos;
 
     final Optional<SpeculativeRequestExecutionPolicy> 
readSpeculativeRequestPolicy;
     final Optional<SpeculativeRequestExecutionPolicy> 
readLACSpeculativeRequestPolicy;
@@ -487,9 +488,9 @@ private BookKeeper(ClientConfiguration conf,
             this.readLACSpeculativeRequestPolicy = 
Optional.<SpeculativeRequestExecutionPolicy>absent();
         }
 
-
         // initialize bookie client
-        this.bookieClient = new BookieClient(conf, this.eventLoopGroup, 
this.mainWorkerPool, statsLogger);
+        this.bookieClient = new BookieClient(conf, this.eventLoopGroup, 
this.mainWorkerPool,
+                                             scheduler, statsLogger);
         this.bookieWatcher = new BookieWatcher(conf, this.placementPolicy, 
regClient);
         if (conf.getDiskWeightBasedPlacementEnabled()) {
             LOG.info("Weighted ledger placement enabled");
@@ -520,6 +521,7 @@ private BookKeeper(ClientConfiguration conf,
             LOG.debug("Explicit LAC Interval : {}", this.explicitLacInterval);
         }
 
+        this.addEntryQuorumTimeoutNanos = 
TimeUnit.SECONDS.toNanos(conf.getAddEntryQuorumTimeout());
         scheduleBookieHealthCheckIfEnabled();
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 78f3e756a..5c15376f8 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -45,6 +45,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -103,6 +104,7 @@
     final LoadingCache<BookieSocketAddress, Long> bookieFailureHistory;
     final boolean enableParallelRecoveryRead;
     final int recoveryReadBatchSize;
+    ScheduledFuture<?> timeoutFuture = null;
 
     /**
      * Invalid entry id. This value is returned from methods which
@@ -182,6 +184,19 @@ public Integer getSample() {
                                               }
                                           });
         initializeExplicitLacFlushPolicy();
+
+        if (bk.getConf().getAddEntryQuorumTimeout() > 0) {
+            SafeRunnable monitor = new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        monitorPendingAddOps();
+                    }
+                };
+            this.timeoutFuture = bk.scheduler.scheduleAtFixedRate(monitor,
+                                                                  
bk.getConf().getTimeoutMonitorIntervalSec(),
+                                                                  
bk.getConf().getTimeoutMonitorIntervalSec(),
+                                                                  
TimeUnit.SECONDS);
+        }
     }
 
     protected void initializeExplicitLacFlushPolicy() {
@@ -335,6 +350,9 @@ public void close()
         SyncCloseCallback callback = new SyncCloseCallback(result);
         asyncClose(callback, null);
         explicitLacFlushPolicy.stopExplicitLacFlush();
+        if (timeoutFuture != null) {
+            timeoutFuture.cancel(false);
+        }
         return result;
     }
 
@@ -1371,6 +1389,18 @@ void handleUnrecoverableErrorDuringAdd(int rc) {
         asyncCloseInternal(NoopCloseCallback.instance, null, rc);
     }
 
+    private void monitorPendingAddOps() {
+        int timedOut = 0;
+        for (PendingAddOp op : pendingAddOps) {
+            if (op.maybeTimeout()) {
+                timedOut++;
+            }
+        }
+        if (timedOut > 0) {
+            LOG.info("Timed out {} add ops", timedOut);
+        }
+    }
+
     void errorOutPendingAdds(int rc) {
         errorOutPendingAdds(rc, drainPendingAddsToErrorOut());
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index d41471150..e42bbf3de 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -25,8 +25,6 @@
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
 
 import java.util.Map;
 import java.util.concurrent.RejectedExecutionException;
@@ -51,7 +49,7 @@
  *
  *
  */
-class PendingAddOp extends SafeRunnable implements WriteCallback, TimerTask {
+class PendingAddOp extends SafeRunnable implements WriteCallback {
     private static final Logger LOG = 
LoggerFactory.getLogger(PendingAddOp.class);
 
     ByteBuf payload;
@@ -68,8 +66,7 @@
     boolean isRecoveryAdd = false;
     long requestTimeNanos;
 
-    int timeoutSec;
-    Timeout timeout = null;
+    long timeoutNanos;
 
     OpStatsLogger addOpLogger;
     long currentLedgerLength;
@@ -91,14 +88,11 @@ static PendingAddOp create(LedgerHandle lh, ByteBuf 
payload, AddCallback cb, Obj
         op.completed = false;
         op.ackSet = lh.distributionSchedule.getAckSet();
         op.addOpLogger = lh.bk.getAddOpLogger();
-        if (op.timeout != null) {
-            op.timeout.cancel();
-        }
-        op.timeout = null;
-        op.timeoutSec = lh.bk.getConf().getAddEntryQuorumTimeout();
+        op.timeoutNanos = lh.bk.addEntryQuorumTimeoutNanos;
         op.pendingWriteRequests = 0;
         op.callbackTriggered = false;
         op.hasRun = false;
+        op.requestTimeNanos = Long.MAX_VALUE;
         return op;
     }
 
@@ -131,9 +125,12 @@ void sendWriteRequest(int bookieIndex) {
         ++pendingWriteRequests;
     }
 
-    @Override
-    public void run(Timeout timeout) {
-        timeoutQuorumWait();
+    boolean maybeTimeout() {
+        if (MathUtils.elapsedNanos(requestTimeNanos) >= timeoutNanos) {
+            timeoutQuorumWait();
+            return true;
+        }
+        return false;
     }
 
     void timeoutQuorumWait() {
@@ -220,11 +217,6 @@ public void safeRun() {
             return;
         }
 
-        if (timeoutSec > -1) {
-            this.timeout = lh.bk.getBookieClient().scheduleTimeout(
-                    this, timeoutSec, TimeUnit.SECONDS);
-        }
-
         this.requestTimeNanos = MathUtils.nowInNano();
         checkNotNull(lh);
         checkNotNull(lh.macManager);
@@ -340,10 +332,6 @@ void sendAddSuccessCallbacks() {
     }
 
     void submitCallback(final int rc) {
-        if (null != timeout) {
-            timeout.cancel();
-        }
-
         if (LOG.isDebugEnabled()) {
             LOG.debug("Submit callback (lid:{}, eid: {}). rc:{}", lh.getId(), 
entryId, rc);
         }
@@ -429,10 +417,6 @@ private void recycle() {
         pendingWriteRequests = 0;
         callbackTriggered = false;
         hasRun = false;
-        if (timeout != null) {
-            timeout.cancel();
-        }
-        timeout = null;
 
         recyclerHandle.recycle(this);
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 361a383e5..966fd428a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -88,6 +88,7 @@
     protected static final String ADD_ENTRY_TIMEOUT_SEC = "addEntryTimeoutSec";
     protected static final String ADD_ENTRY_QUORUM_TIMEOUT_SEC = 
"addEntryQuorumTimeoutSec";
     protected static final String READ_ENTRY_TIMEOUT_SEC = 
"readEntryTimeoutSec";
+    protected static final String TIMEOUT_MONITOR_INTERVAL_SEC = 
"timeoutMonitorIntervalSec";
     protected static final String TIMEOUT_TASK_INTERVAL_MILLIS = 
"timeoutTaskIntervalMillis";
     protected static final String EXPLICIT_LAC_INTERVAL = 
"explicitLacInterval";
     protected static final String PCBC_TIMEOUT_TIMER_TICK_DURATION_MS = 
"pcbcTimeoutTimerTickDurationMs";
@@ -678,11 +679,41 @@ public ClientConfiguration setReadEntryTimeout(int 
timeout) {
     }
 
     /**
-     * Get the interval between successive executions of the 
PerChannelBookieClient's
-     * TimeoutTask. This value is in milliseconds. Every X milliseconds, the 
timeout task
-     * will be executed and it will error out entries that have timed out.
+     * Get the interval between successive executions of the operation timeout 
monitor. This value is in seconds.
+     *
+     * @see #setTimeoutMonitorIntervalSec(long)
+     * @return the interval at which request timeouts will be checked
+     */
+    public long getTimeoutMonitorIntervalSec() {
+        int minTimeout = Math.min(Math.min(getAddEntryQuorumTimeout(),
+                                           getAddEntryTimeout()), 
getReadEntryTimeout());
+        return getLong(TIMEOUT_MONITOR_INTERVAL_SEC, Math.max(minTimeout / 2, 
1));
+    }
+
+    /**
+     * Set the interval between successive executions of the operation timeout 
monitor. The value in seconds.
+     * Every X seconds, all outstanding add and read operations are checked to 
see if they have been running
+     * for longer than their configured timeout. Any that have been will be 
errored out.
+     *
+     * <p>This timeout should be set to a value which is a fraction of the 
values of
+     * {@link #getAddEntryQuorumTimeout}, {@link #getAddEntryTimeout} and 
{@link #getReadEntryTimeout},
+     * so that these timeouts run in a timely fashion.
+     *
+     * @param timeoutInterval The timeout monitor interval, in seconds
+     * @return client configuration
+     */
+    public ClientConfiguration setTimeoutMonitorIntervalSec(long 
timeoutInterval) {
+        setProperty(TIMEOUT_MONITOR_INTERVAL_SEC, 
Long.toString(timeoutInterval));
+        return this;
+    }
+
+    /**
+     * Get the interval between successive executions of the 
PerChannelBookieClient's TimeoutTask. This value is in
+     * milliseconds. Every X milliseconds, the timeout task will be executed 
and it will error out entries that have
+     * timed out.
      *
      * <p>We do it more aggressive to not accumulate pending requests due to 
slow responses.
+     *
      * @return the interval at which request timeouts will be checked
      */
     @Deprecated
@@ -729,6 +760,7 @@ public ClientConfiguration setExplictLacInterval(int 
interval) {
      *
      * @return tick duration in milliseconds
      */
+    @Deprecated
     public long getPCBCTimeoutTimerTickDurationMs() {
         return getLong(PCBC_TIMEOUT_TIMER_TICK_DURATION_MS, 100);
     }
@@ -745,6 +777,7 @@ public long getPCBCTimeoutTimerTickDurationMs() {
      *          tick duration in milliseconds.
      * @return client configuration.
      */
+    @Deprecated
     public ClientConfiguration setPCBCTimeoutTimerTickDurationMs(long 
tickDuration) {
         setProperty(PCBC_TIMEOUT_TIMER_TICK_DURATION_MS, tickDuration);
         return this;
@@ -759,6 +792,7 @@ public ClientConfiguration 
setPCBCTimeoutTimerTickDurationMs(long tickDuration)
      *
      * @return number of ticks that used for timeout timer.
      */
+    @Deprecated
     public int getPCBCTimeoutTimerNumTicks() {
         return getInt(PCBC_TIMEOUT_TIMER_NUM_TICKS, 1024);
     }
@@ -775,6 +809,7 @@ public int getPCBCTimeoutTimerNumTicks() {
      *          number of ticks that used for timeout timer.
      * @return client configuration.
      */
+    @Deprecated
     public ClientConfiguration setPCBCTimeoutTimerNumTicks(int numTicks) {
         setProperty(PCBC_TIMEOUT_TIMER_NUM_TICKS, numTicks);
         return this;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index ceac6978a..9b1ca9cc9 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -23,23 +23,23 @@
 import static com.google.common.base.Charsets.UTF_8;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ExtensionRegistry;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.HashedWheelTimer;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
+import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -76,10 +76,12 @@
     AtomicLong totalBytesOutstanding = new AtomicLong();
 
     OrderedSafeExecutor executor;
+    ScheduledExecutorService scheduler;
+    ScheduledFuture<?> timeoutFuture;
+
     EventLoopGroup eventLoopGroup;
     final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool> 
channels =
             new ConcurrentHashMap<BookieSocketAddress, 
PerChannelBookieClientPool>();
-    final HashedWheelTimer requestTimer;
 
     private final ClientAuthProvider.Factory authProviderFactory;
     private final ExtensionRegistry registry;
@@ -93,12 +95,8 @@
     private final long bookieErrorThresholdPerInterval;
 
     public BookieClient(ClientConfiguration conf, EventLoopGroup 
eventLoopGroup,
-            OrderedSafeExecutor executor) throws IOException {
-        this(conf, eventLoopGroup, executor, NullStatsLogger.INSTANCE);
-    }
-
-    public BookieClient(ClientConfiguration conf, EventLoopGroup 
eventLoopGroup,
-                        OrderedSafeExecutor executor, StatsLogger statsLogger) 
throws IOException {
+                        OrderedSafeExecutor executor, ScheduledExecutorService 
scheduler,
+                        StatsLogger statsLogger) throws IOException {
         this.conf = conf;
         this.eventLoopGroup = eventLoopGroup;
         this.executor = executor;
@@ -110,11 +108,21 @@ public BookieClient(ClientConfiguration conf, 
EventLoopGroup eventLoopGroup,
 
         this.statsLogger = statsLogger;
         this.numConnectionsPerBookie = conf.getNumChannelsPerBookie();
-        this.requestTimer = new HashedWheelTimer(
-                new 
ThreadFactoryBuilder().setNameFormat("BookieClientTimer-%d").build(),
-                conf.getPCBCTimeoutTimerTickDurationMs(), 
TimeUnit.MILLISECONDS,
-                conf.getPCBCTimeoutTimerNumTicks());
         this.bookieErrorThresholdPerInterval = 
conf.getBookieErrorThresholdPerInterval();
+
+        this.scheduler = scheduler;
+        if (conf.getAddEntryTimeout() > 0 || conf.getReadEntryTimeout() > 0) {
+            SafeRunnable monitor = new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        monitorPendingOperations();
+                    }
+                };
+            this.timeoutFuture = this.scheduler.scheduleAtFixedRate(monitor,
+                                                                    
conf.getTimeoutMonitorIntervalSec(),
+                                                                    
conf.getTimeoutMonitorIntervalSec(),
+                                                                    
TimeUnit.SECONDS);
+        }
     }
 
     private int getRc(int rc) {
@@ -145,8 +153,8 @@ private int getRc(int rc) {
     @Override
     public PerChannelBookieClient create(BookieSocketAddress address, 
PerChannelBookieClientPool pcbcPool,
             SecurityHandlerFactory shFactory) throws SecurityException {
-        return new PerChannelBookieClient(conf, executor, eventLoopGroup, 
address, requestTimer, statsLogger,
-                authProviderFactory, registry, pcbcPool, shFactory);
+        return new PerChannelBookieClient(conf, executor, eventLoopGroup, 
address, statsLogger,
+                                          authProviderFactory, registry, 
pcbcPool, shFactory);
     }
 
     private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
@@ -521,12 +529,14 @@ public void safeRun() {
         }
     }
 
-    public boolean isClosed() {
-        return closed;
+    private void monitorPendingOperations() {
+        for (PerChannelBookieClientPool clientPool : channels.values()) {
+            clientPool.checkTimeoutOnPendingOperations();
+        }
     }
 
-    public Timeout scheduleTimeout(TimerTask task, long timeoutSec, TimeUnit 
timeUnit) {
-        return requestTimer.newTimeout(task, timeoutSec, timeUnit);
+    public boolean isClosed() {
+        return closed;
     }
 
     public void close() {
@@ -538,11 +548,13 @@ public void close() {
             }
             channels.clear();
             authProviderFactory.close();
+
+            if (timeoutFuture != null) {
+                timeoutFuture.cancel(false);
+            }
         } finally {
             closeLock.writeLock().unlock();
         }
-        // Shut down the timeout executor.
-        this.requestTimer.stop();
     }
 
     private static class Counter {
@@ -599,7 +611,10 @@ public void writeComplete(int rc, long ledger, long entry, 
BookieSocketAddress a
                 .name("BookieClientWorker")
                 .numThreads(1)
                 .build();
-        BookieClient bc = new BookieClient(new ClientConfiguration(), 
eventLoopGroup, executor);
+        ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("BookKeeperClientScheduler"));
+        BookieClient bc = new BookieClient(new ClientConfiguration(), 
eventLoopGroup, executor,
+                                           scheduler, 
NullStatsLogger.INSTANCE);
         BookieSocketAddress addr = new BookieSocketAddress(args[0], 
Integer.parseInt(args[1]));
 
         for (int i = 0; i < 100000; i++) {
@@ -608,6 +623,7 @@ public void writeComplete(int rc, long ledger, long entry, 
BookieSocketAddress a
         }
         counter.wait(0);
         System.out.println("Total = " + counter.total());
+        scheduler.shutdown();
         eventLoopGroup.shutdownGracefully();
         executor.shutdown();
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
index 3ac5cb6d9..41233cf83 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
@@ -94,6 +94,13 @@ public void obtain(GenericCallback<PerChannelBookieClient> 
callback, long key) {
         clients[idx].connectIfNeededAndDoOp(callback);
     }
 
+    @Override
+    public void checkTimeoutOnPendingOperations() {
+        for (int i = 0; i < clients.length; i++) {
+            clients[i].checkTimeoutOnPendingOperations();
+        }
+    }
+
     @Override
     public void recordError() {
         errorCounter.incrementAndGet();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index c2c8af1ee..c982b6712 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -54,11 +54,8 @@
 import io.netty.handler.codec.LengthFieldPrepender;
 import io.netty.handler.codec.TooLongFrameException;
 import io.netty.handler.ssl.SslHandler;
-import io.netty.util.HashedWheelTimer;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 
@@ -70,12 +67,14 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiPredicate;
 
 import javax.net.ssl.SSLPeerUnverifiedException;
 
@@ -147,9 +146,8 @@
     final BookieSocketAddress addr;
     final EventLoopGroup eventLoopGroup;
     final OrderedSafeExecutor executor;
-    final HashedWheelTimer requestTimer;
-    final int addEntryTimeout;
-    final int readEntryTimeout;
+    final long addEntryTimeoutNanos;
+    final long readEntryTimeoutNanos;
     final int maxFrameSize;
     final int getBookieInfoTimeout;
     final int startTLSTimeout;
@@ -203,7 +201,7 @@
 
     public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup 
eventLoopGroup,
                                   BookieSocketAddress addr) throws 
SecurityException {
-        this(new ClientConfiguration(), executor, eventLoopGroup, addr, null, 
NullStatsLogger.INSTANCE, null, null,
+        this(new ClientConfiguration(), executor, eventLoopGroup, addr, 
NullStatsLogger.INSTANCE, null, null,
                 null);
     }
 
@@ -211,24 +209,22 @@ public PerChannelBookieClient(OrderedSafeExecutor 
executor, EventLoopGroup event
                                   BookieSocketAddress addr,
                                   ClientAuthProvider.Factory 
authProviderFactory,
                                   ExtensionRegistry extRegistry) throws 
SecurityException {
-        this(new ClientConfiguration(), executor, eventLoopGroup, addr, null, 
NullStatsLogger.INSTANCE,
+        this(new ClientConfiguration(), executor, eventLoopGroup, addr, 
NullStatsLogger.INSTANCE,
                 authProviderFactory, extRegistry, null);
     }
 
     public PerChannelBookieClient(ClientConfiguration conf, 
OrderedSafeExecutor executor,
                                   EventLoopGroup eventLoopGroup, 
BookieSocketAddress addr,
-                                  HashedWheelTimer requestTimer, StatsLogger 
parentStatsLogger,
-                                  ClientAuthProvider.Factory 
authProviderFactory,
+                                  StatsLogger parentStatsLogger, 
ClientAuthProvider.Factory authProviderFactory,
                                   ExtensionRegistry extRegistry,
                                   PerChannelBookieClientPool pcbcPool) throws 
SecurityException {
-       this(conf, executor, eventLoopGroup, addr, null, 
NullStatsLogger.INSTANCE,
+       this(conf, executor, eventLoopGroup, addr, NullStatsLogger.INSTANCE,
                 authProviderFactory, extRegistry, pcbcPool, null);
     }
 
     public PerChannelBookieClient(ClientConfiguration conf, 
OrderedSafeExecutor executor,
                                   EventLoopGroup eventLoopGroup, 
BookieSocketAddress addr,
-                                  HashedWheelTimer requestTimer, StatsLogger 
parentStatsLogger,
-                                  ClientAuthProvider.Factory 
authProviderFactory,
+                                  StatsLogger parentStatsLogger, 
ClientAuthProvider.Factory authProviderFactory,
                                   ExtensionRegistry extRegistry,
                                   PerChannelBookieClientPool pcbcPool,
                                   SecurityHandlerFactory shFactory) throws 
SecurityException {
@@ -242,9 +238,8 @@ public PerChannelBookieClient(ClientConfiguration conf, 
OrderedSafeExecutor exec
             this.eventLoopGroup = eventLoopGroup;
         }
         this.state = ConnectionState.DISCONNECTED;
-        this.requestTimer = requestTimer;
-        this.addEntryTimeout = conf.getAddEntryTimeout();
-        this.readEntryTimeout = conf.getReadEntryTimeout();
+        this.addEntryTimeoutNanos = 
TimeUnit.SECONDS.toNanos(conf.getAddEntryTimeout());
+        this.readEntryTimeoutNanos = 
TimeUnit.SECONDS.toNanos(conf.getReadEntryTimeout());
         this.getBookieInfoTimeout = conf.getBookieInfoTimeout();
         this.startTLSTimeout = conf.getStartTLSTimeout();
         this.useV2WireProtocol = conf.getUseV2WireProtocol();
@@ -768,6 +763,30 @@ public void getBookieInfo(final long requested, 
GetBookieInfoCallback cb, Object
         writeAndFlush(channel, completionKey, getBookieInfoRequest);
     }
 
+    private static final BiPredicate<CompletionKey, CompletionValue> 
timeoutCheck = (key, value) -> {
+        return value.maybeTimeout();
+    };
+
+    public void checkTimeoutOnPendingOperations() {
+        int timedOutOperations = completionObjects.removeIf(timeoutCheck);
+
+        synchronized (this) {
+            Iterator<CompletionValue> iterator = 
completionObjectsV2Conflicts.values().iterator();
+            while (iterator.hasNext()) {
+                CompletionValue value = iterator.next();
+                if (value.maybeTimeout()) {
+                    ++timedOutOperations;
+                    iterator.remove();
+                }
+            }
+        }
+
+        if (timedOutOperations > 0) {
+            LOG.info("Timed-out {} operations to channel {} for {}",
+                     timedOutOperations, channel, addr);
+        }
+    }
+
     /**
      * Disconnects the bookie client. It can be reused.
      */
@@ -819,6 +838,7 @@ private void closeInternal(boolean permanent, boolean wait) 
{
                 cf.awaitUninterruptibly();
             }
         }
+
     }
 
     private ChannelFuture closeChannel(Channel c) {
@@ -1235,14 +1255,12 @@ public void operationComplete(Future<Channel> future) 
throws Exception {
         protected long ledgerId;
         protected long entryId;
         protected long startTime;
-        protected Timeout timeout;
 
         public CompletionValue(String operationName,
                                Object ctx,
                                long ledgerId, long entryId,
                                OpStatsLogger opLogger,
-                               OpStatsLogger timeoutOpLogger,
-                               Timeout timeout) {
+                               OpStatsLogger timeoutOpLogger) {
             this.operationName = operationName;
             this.ctx = ctx;
             this.ledgerId = ledgerId;
@@ -1250,19 +1268,13 @@ public CompletionValue(String operationName,
             this.startTime = MathUtils.nowInNano();
             this.opLogger = opLogger;
             this.timeoutOpLogger = timeoutOpLogger;
-            this.timeout = timeout;
         }
 
         private long latency() {
             return MathUtils.elapsedNanos(startTime);
         }
 
-        void cancelTimeoutAndLogOp(int rc) {
-            Timeout t = timeout;
-            if (null != t) {
-                t.cancel();
-            }
-
+        void logOpResult(int rc) {
             if (rc != BKException.Code.OK) {
                 opLogger.registerFailedEvent(latency(), TimeUnit.NANOSECONDS);
             } else {
@@ -1275,6 +1287,15 @@ void cancelTimeoutAndLogOp(int rc) {
             }
         }
 
+        boolean maybeTimeout() {
+            if (MathUtils.elapsedNanos(startTime) >= readEntryTimeoutNanos) {
+                timeout();
+                return true;
+            } else {
+                return false;
+            }
+        }
+
         void timeout() {
             errorOut(BKException.Code.TimeoutException);
             timeoutOpLogger.registerSuccessfulEvent(latency(),
@@ -1344,14 +1365,13 @@ public WriteLacCompletion(final CompletionKey key,
                                   final long ledgerId) {
             super("WriteLAC",
                   originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
-                  writeLacOpLogger, writeLacTimeoutOpLogger,
-                  scheduleTimeout(key, addEntryTimeout));
+                  writeLacOpLogger, writeLacTimeoutOpLogger);
             this.cb = new WriteLacCallback() {
                     @Override
                     public void writeLacComplete(int rc, long ledgerId,
                                                  BookieSocketAddress addr,
                                                  Object ctx) {
-                        cancelTimeoutAndLogOp(rc);
+                        logOpResult(rc);
                         originalCallback.writeLacComplete(rc, ledgerId,
                                                           addr, originalCtx);
                         key.release();
@@ -1392,15 +1412,14 @@ public ReadLacCompletion(final CompletionKey key,
                                  ReadLacCallback originalCallback,
                                  final Object ctx, final long ledgerId) {
             super("ReadLAC", ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
-                  readLacOpLogger, readLacTimeoutOpLogger,
-                  scheduleTimeout(key, readEntryTimeout));
+                  readLacOpLogger, readLacTimeoutOpLogger);
             this.cb = new ReadLacCallback() {
                     @Override
                     public void readLacComplete(int rc, long ledgerId,
                                                 ByteBuf lacBuffer,
                                                 ByteBuf lastEntryBuffer,
                                                 Object ctx) {
-                        cancelTimeoutAndLogOp(rc);
+                        logOpResult(rc);
                         originalCallback.readLacComplete(
                                 rc, ledgerId, lacBuffer, lastEntryBuffer, ctx);
                         key.release();
@@ -1452,15 +1471,14 @@ public ReadCompletion(final CompletionKey key,
                               final Object originalCtx,
                               long ledgerId, final long entryId) {
             super("Read", originalCtx, ledgerId, entryId,
-                  readEntryOpLogger, readTimeoutOpLogger,
-                  scheduleTimeout(key, readEntryTimeout));
+                  readEntryOpLogger, readTimeoutOpLogger);
 
             this.cb = new ReadEntryCallback() {
                     @Override
                     public void readEntryComplete(int rc, long ledgerId,
                                                   long entryId, ByteBuf buffer,
                                                   Object ctx) {
-                        cancelTimeoutAndLogOp(rc);
+                        logOpResult(rc);
                         originalCallback.readEntryComplete(rc,
                                                            ledgerId, entryId,
                                                            buffer, originalCtx);
@@ -1550,12 +1568,11 @@ private void handleReadResponse(long ledgerId,
 
         public StartTLSCompletion(final CompletionKey key) {
             super("StartTLS", null, -1, -1,
-                  startTLSOpLogger, startTLSTimeoutOpLogger,
-                  scheduleTimeout(key, startTLSTimeout));
+                  startTLSOpLogger, startTLSTimeoutOpLogger);
             this.cb = new StartTLSCallback() {
                 @Override
                 public void startTLSComplete(int rc, Object ctx) {
-                    cancelTimeoutAndLogOp(rc);
+                    logOpResult(rc);
                     key.release();
                 }
             };
@@ -1602,13 +1619,12 @@ public GetBookieInfoCompletion(final CompletionKey key,
                                        final GetBookieInfoCallback origCallback,
                                        final Object origCtx) {
             super("GetBookieInfo", origCtx, 0L, 0L,
-                  getBookieInfoOpLogger, getBookieInfoTimeoutOpLogger,
-                  scheduleTimeout(key, getBookieInfoTimeout));
+                  getBookieInfoOpLogger, getBookieInfoTimeoutOpLogger);
             this.cb = new GetBookieInfoCallback() {
                 @Override
                 public void getBookieInfoComplete(int rc, BookieInfo bInfo,
                                                   Object ctx) {
-                    cancelTimeoutAndLogOp(rc);
+                    logOpResult(rc);
                     origCallback.getBookieInfoComplete(rc, bInfo, origCtx);
                     key.release();
                 }
@@ -1668,8 +1684,7 @@ AddCompletion acquireAddCompletion(final CompletionKey key,
         WriteCallback originalCallback = null;
 
         AddCompletion(Recycler.Handle<AddCompletion> handle) {
-            super("Add", null, -1, -1,
-                  addEntryOpLogger, addTimeoutOpLogger, null);
+            super("Add", null, -1, -1, addEntryOpLogger, addTimeoutOpLogger);
             this.handle = handle;
         }
 
@@ -1683,19 +1698,28 @@ void reset(final CompletionKey key,
             this.ledgerId = ledgerId;
             this.entryId = entryId;
             this.startTime = MathUtils.nowInNano();
-            this.timeout = scheduleTimeout(key, addEntryTimeout);
         }
 
         @Override
         public void writeComplete(int rc, long ledgerId, long entryId,
                                   BookieSocketAddress addr,
                                   Object ctx) {
-            cancelTimeoutAndLogOp(rc);
+            logOpResult(rc);
             originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx);
             key.release();
             handle.recycle(this);
         }
 
+        @Override
+        boolean maybeTimeout() {
+            if (MathUtils.elapsedNanos(startTime) >= addEntryTimeoutNanos) {
+                timeout();
+                return true;
+            } else {
+                return false;
+            }
+        }
+
         @Override
         public void errorOut() {
             errorOut(BKException.Code.BookieHandleNotAvailableException);
@@ -1739,14 +1763,6 @@ CompletionKey newCompletionKey(long txnId, OperationType operationType) {
         return new V3CompletionKey(txnId, operationType);
     }
 
-    Timeout scheduleTimeout(CompletionKey key, long timeout) {
-        if (null != requestTimer) {
-            return requestTimer.newTimeout(key, timeout, TimeUnit.SECONDS);
-        } else {
-            return null;
-        }
-    }
-
     class V3CompletionKey extends CompletionKey {
 
         public V3CompletionKey(long txnId, OperationType operationType) {
@@ -1774,7 +1790,7 @@ public String toString() {
 
     }
 
-    abstract class CompletionKey implements TimerTask {
+    abstract class CompletionKey {
         final long txnId;
         OperationType operationType;
 
@@ -1784,17 +1800,6 @@ public String toString() {
             this.operationType = operationType;
         }
 
-        @Override
-        public void run(Timeout timeout) throws Exception {
-            if (timeout.isCancelled()) {
-                return;
-            }
-            CompletionValue completion = completionObjects.remove(this);
-            if (completion != null) {
-                completion.timeout();
-            }
-        }
-
         public void release() {}
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
index bd07a4e1b..80f00a595 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
@@ -45,6 +45,12 @@
      */
     void recordError();
 
+    /**
+     * Check if any ops on any channel needs to be timed out.
+     * This is called on all channels, even if the channel is not yet connected.
+     */
+    void checkTimeoutOnPendingOperations();
+
     /**
      * Disconnect the connections in the pool.
      *
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
index d2bff33ec..bf6c2305f 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
@@ -25,8 +25,11 @@
 
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -36,6 +39,7 @@
 import org.apache.bookkeeper.proto.BookieClient;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.After;
@@ -53,6 +57,7 @@
     DigestType digestType;
     public EventLoopGroup eventLoopGroup;
     public OrderedSafeExecutor executor;
+    private ScheduledExecutorService scheduler;
 
     public TestGetBookieInfoTimeout() {
         super(10);
@@ -68,10 +73,13 @@ public void setUp() throws Exception {
                 .name("BKClientOrderedSafeExecutor")
                 .numThreads(2)
                 .build();
+        scheduler = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("BookKeeperClientScheduler"));
     }
 
     @After
     public void tearDown() throws Exception {
+        scheduler.shutdown();
         eventLoopGroup.shutdownGracefully();
         executor.shutdown();
     }
@@ -99,7 +107,7 @@ public void testGetBookieInfoTimeout() throws Exception {
         // try to get bookie info from the sleeping bookie. It should fail with timeout error
         BookieSocketAddress addr = new 
BookieSocketAddress(bookieToSleep.getSocketAddress().getHostString(),
                 bookieToSleep.getPort());
-        BookieClient bc = new BookieClient(cConf, eventLoopGroup, executor);
+        BookieClient bc = new BookieClient(cConf, eventLoopGroup, executor, 
scheduler, NullStatsLogger.INSTANCE);
         long flags = 
BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
                 | 
BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 6bf05ae39..b799635c1 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -27,12 +27,15 @@
 import io.netty.buffer.Unpooled;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.Code;
@@ -48,6 +51,7 @@
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.After;
@@ -64,6 +68,7 @@
 
     public EventLoopGroup eventLoopGroup;
     public OrderedSafeExecutor executor;
+    private ScheduledExecutorService scheduler;
     ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
 
     @Before
@@ -83,10 +88,13 @@ public void setUp() throws Exception {
                 .name("BKClientOrderedSafeExecutor")
                 .numThreads(2)
                 .build();
+        scheduler = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("BookKeeperClientScheduler"));
     }
 
     @After
     public void tearDown() throws Exception {
+        scheduler.shutdown();
         bs.shutdown();
         recursiveDelete(tmpDir);
         eventLoopGroup.shutdownGracefully();
@@ -146,7 +154,8 @@ public void testWriteGaps() throws Exception {
         BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port);
         ResultStruct arc = new ResultStruct();
 
-        BookieClient bc = new BookieClient(new ClientConfiguration(), 
eventLoopGroup, executor);
+        BookieClient bc = new BookieClient(new ClientConfiguration(), 
eventLoopGroup, executor,
+                                           scheduler, 
NullStatsLogger.INSTANCE);
         ByteBuf bb = createByteBuffer(1, 1, 1);
         bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, 
BookieProtocol.FLAG_NONE);
         synchronized (arc) {
@@ -246,7 +255,8 @@ private ByteBuf createByteBuffer(int i, long lid, long eid) 
{
     public void testNoLedger() throws Exception {
         ResultStruct arc = new ResultStruct();
         BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port);
-        BookieClient bc = new BookieClient(new ClientConfiguration(), 
eventLoopGroup, executor);
+        BookieClient bc = new BookieClient(new ClientConfiguration(), 
eventLoopGroup, executor,
+                                           scheduler, 
NullStatsLogger.INSTANCE);
         synchronized (arc) {
             bc.readEntry(addr, 2, 13, recb, arc);
             arc.wait(1000);
@@ -257,7 +267,8 @@ public void testNoLedger() throws Exception {
     @Test
     public void testGetBookieInfo() throws IOException, InterruptedException {
         BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port);
-        BookieClient bc = new BookieClient(new ClientConfiguration(), new 
NioEventLoopGroup(), executor);
+        BookieClient bc = new BookieClient(new ClientConfiguration(), new 
NioEventLoopGroup(), executor,
+                                           scheduler, 
NullStatsLogger.INSTANCE);
         long flags = 
BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
                 | 
BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
deleted file mode 100644
index fc2047658..000000000
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.bookkeeper.test;
-
-import io.netty.buffer.Unpooled;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookieClient;
-import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class tests BookieClient. It just sends the a new entry to itself.
- */
-class LoopbackClient implements WriteCallback {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(LoopbackClient.class);
-
-    BookieClient client;
-    static int recvTimeout = 2000;
-    long begin = 0;
-    int limit;
-    OrderedSafeExecutor executor;
-
-    static class Counter {
-        int c;
-        int limit;
-
-        Counter(int limit) {
-            this.c = 0;
-            this.limit = limit;
-        }
-
-        synchronized void increment() {
-            if (++c == limit) {
-                this.notify();
-            }
-        }
-    }
-
-    LoopbackClient(EventLoopGroup eventLoopGroup, OrderedSafeExecutor 
executor, long begin, int limit)
-            throws IOException {
-        this.client = new BookieClient(new ClientConfiguration(), 
eventLoopGroup, executor);
-        this.begin = begin;
-    }
-
-    void write(long ledgerId, long entry, byte[] data, BookieSocketAddress 
addr, WriteCallback cb, Object ctx)
-            throws IOException, InterruptedException {
-        LOG.info("Ledger id: " + ledgerId + ", Entry: " + entry);
-        byte[] passwd = new byte[20];
-        Arrays.fill(passwd, (byte) 'a');
-
-        client.addEntry(addr, ledgerId, passwd, entry, 
Unpooled.wrappedBuffer(data), cb, ctx, BookieProtocol.FLAG_NONE);
-    }
-
-    @Override
-    public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx) {
-        Counter counter = (Counter) ctx;
-        counter.increment();
-    }
-
-    public static void main(String args[]) {
-        byte[] data = new byte[Integer.parseInt(args[0])];
-        Integer limit = Integer.parseInt(args[1]);
-        Counter c = new Counter(limit);
-        long ledgerId = Long.valueOf("0").longValue();
-        long begin = System.currentTimeMillis();
-
-        LoopbackClient lb;
-        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
-        OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
-                .name("BookieClientScheduler")
-                .numThreads(2)
-                .build();
-        try {
-            BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", 
Integer.valueOf(args[2]).intValue());
-            lb = new LoopbackClient(eventLoopGroup, executor, begin, 
limit.intValue());
-
-            for (int i = 0; i < limit; i++) {
-                lb.write(ledgerId, i, data, addr, lb, c);
-            }
-
-            synchronized (c) {
-                c.wait();
-                System.out.println("Time to write all entries: " + 
(System.currentTimeMillis() - begin));
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
-
-}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to