This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 786da27 Client times out requests in batch rather than individually
786da27 is described below
commit 786da273b0f90c1b35b7244b0408959fd034d411
Author: Ivan Kelly <[email protected]>
AuthorDate: Wed Dec 13 16:43:25 2017 +0800
Client times out requests in batch rather than individually
This patch is a squash of f59b597f, 00fc5cd2, 1658eb61 & 833dc307 from the
yahoo-4.3 branch. The change removes the HashWheelTimer from
PerChannelBookieClient and instead runs through all outstanding ops
periodically, timing them out as necessary.
The motivation for this change is to reduce the number of objects created
per add/read request. With HashWheelTimer a TimerTask needs to be added to the
Timer, which allocates and returns a Timeout object. We had to be careful to
cancel this timeout on success.
The new approach is simpler, doesn't require any allocation per request,
and doesn't require any cancellation. Functionally the end result is the same -
very slow requests are timed out.
The patch also makes some additions to the original changes (due to the
codebase moving since it was originally written).
- PendingAddOps are changed to use the same mechanism for quorum timeouts.
- The checkTimeout method is moved to the PCBCPool so we can run the check
on non-connected PCBC instances. This is necessary for TLS.
Author: Ivan Kelly <[email protected]>
Author: Siddharth Boobna <[email protected]>
Author: Matteo Merli <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>
This closes #817 from ivankelly/yahoo-bp-9
---
.../apache/bookkeeper/benchmark/BenchBookie.java | 9 +-
.../org/apache/bookkeeper/client/BookKeeper.java | 6 +-
.../org/apache/bookkeeper/client/LedgerHandle.java | 30 +++++
.../org/apache/bookkeeper/client/PendingAddOp.java | 36 ++----
.../bookkeeper/conf/ClientConfiguration.java | 41 ++++++-
.../org/apache/bookkeeper/proto/BookieClient.java | 64 ++++++----
.../proto/DefaultPerChannelBookieClientPool.java | 7 ++
.../bookkeeper/proto/PerChannelBookieClient.java | 135 +++++++++++----------
.../proto/PerChannelBookieClientPool.java | 6 +
.../client/TestGetBookieInfoTimeout.java | 10 +-
.../apache/bookkeeper/test/BookieClientTest.java | 17 ++-
.../org/apache/bookkeeper/test/LoopbackClient.java | 121 ------------------
12 files changed, 236 insertions(+), 246 deletions(-)
diff --git
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index 9c52788..4506091 100644
---
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -24,7 +24,10 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -33,6 +36,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -158,9 +162,11 @@ public class BenchBookie {
.name("BenchBookieClientScheduler")
.numThreads(1)
.build();
+ ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(
+ new DefaultThreadFactory("BookKeeperClientScheduler"));
ClientConfiguration conf = new ClientConfiguration();
- BookieClient bc = new BookieClient(conf, eventLoop, executor);
+ BookieClient bc = new BookieClient(conf, eventLoop, executor,
scheduler, NullStatsLogger.INSTANCE);
LatencyCallback lc = new LatencyCallback();
ThroughputCallback tc = new ThroughputCallback();
@@ -220,6 +226,7 @@ public class BenchBookie {
LOG.info("Throughput: " + ((long) entryCount) * 1000 / (endTime -
startTime));
bc.close();
+ scheduler.shutdown();
eventLoop.shutdownGracefully();
executor.shutdown();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 99e9484..d0f8a64 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -146,6 +146,7 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
final int explicitLacInterval;
final boolean delayEnsembleChange;
final boolean reorderReadSequence;
+ final long addEntryQuorumTimeoutNanos;
final Optional<SpeculativeRequestExecutionPolicy>
readSpeculativeRequestPolicy;
final Optional<SpeculativeRequestExecutionPolicy>
readLACSpeculativeRequestPolicy;
@@ -487,9 +488,9 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
this.readLACSpeculativeRequestPolicy =
Optional.<SpeculativeRequestExecutionPolicy>absent();
}
-
// initialize bookie client
- this.bookieClient = new BookieClient(conf, this.eventLoopGroup,
this.mainWorkerPool, statsLogger);
+ this.bookieClient = new BookieClient(conf, this.eventLoopGroup,
this.mainWorkerPool,
+ scheduler, statsLogger);
this.bookieWatcher = new BookieWatcher(conf, this.placementPolicy,
regClient);
if (conf.getDiskWeightBasedPlacementEnabled()) {
LOG.info("Weighted ledger placement enabled");
@@ -520,6 +521,7 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
LOG.debug("Explicit LAC Interval : {}", this.explicitLacInterval);
}
+ this.addEntryQuorumTimeoutNanos =
TimeUnit.SECONDS.toNanos(conf.getAddEntryQuorumTimeout());
scheduleBookieHealthCheckIfEnabled();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 78f3e75..5c15376 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -45,6 +45,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -103,6 +104,7 @@ public class LedgerHandle implements WriteHandle {
final LoadingCache<BookieSocketAddress, Long> bookieFailureHistory;
final boolean enableParallelRecoveryRead;
final int recoveryReadBatchSize;
+ ScheduledFuture<?> timeoutFuture = null;
/**
* Invalid entry id. This value is returned from methods which
@@ -182,6 +184,19 @@ public class LedgerHandle implements WriteHandle {
}
});
initializeExplicitLacFlushPolicy();
+
+ if (bk.getConf().getAddEntryQuorumTimeout() > 0) {
+ SafeRunnable monitor = new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ monitorPendingAddOps();
+ }
+ };
+ this.timeoutFuture = bk.scheduler.scheduleAtFixedRate(monitor,
+
bk.getConf().getTimeoutMonitorIntervalSec(),
+
bk.getConf().getTimeoutMonitorIntervalSec(),
+
TimeUnit.SECONDS);
+ }
}
protected void initializeExplicitLacFlushPolicy() {
@@ -335,6 +350,9 @@ public class LedgerHandle implements WriteHandle {
SyncCloseCallback callback = new SyncCloseCallback(result);
asyncClose(callback, null);
explicitLacFlushPolicy.stopExplicitLacFlush();
+ if (timeoutFuture != null) {
+ timeoutFuture.cancel(false);
+ }
return result;
}
@@ -1371,6 +1389,18 @@ public class LedgerHandle implements WriteHandle {
asyncCloseInternal(NoopCloseCallback.instance, null, rc);
}
+ private void monitorPendingAddOps() {
+ int timedOut = 0;
+ for (PendingAddOp op : pendingAddOps) {
+ if (op.maybeTimeout()) {
+ timedOut++;
+ }
+ }
+ if (timedOut > 0) {
+ LOG.info("Timed out {} add ops", timedOut);
+ }
+ }
+
void errorOutPendingAdds(int rc) {
errorOutPendingAdds(rc, drainPendingAddsToErrorOut());
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index d414711..e42bbf3 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -25,8 +25,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
@@ -51,7 +49,7 @@ import org.slf4j.LoggerFactory;
*
*
*/
-class PendingAddOp extends SafeRunnable implements WriteCallback, TimerTask {
+class PendingAddOp extends SafeRunnable implements WriteCallback {
private static final Logger LOG =
LoggerFactory.getLogger(PendingAddOp.class);
ByteBuf payload;
@@ -68,8 +66,7 @@ class PendingAddOp extends SafeRunnable implements
WriteCallback, TimerTask {
boolean isRecoveryAdd = false;
long requestTimeNanos;
- int timeoutSec;
- Timeout timeout = null;
+ long timeoutNanos;
OpStatsLogger addOpLogger;
long currentLedgerLength;
@@ -91,14 +88,11 @@ class PendingAddOp extends SafeRunnable implements
WriteCallback, TimerTask {
op.completed = false;
op.ackSet = lh.distributionSchedule.getAckSet();
op.addOpLogger = lh.bk.getAddOpLogger();
- if (op.timeout != null) {
- op.timeout.cancel();
- }
- op.timeout = null;
- op.timeoutSec = lh.bk.getConf().getAddEntryQuorumTimeout();
+ op.timeoutNanos = lh.bk.addEntryQuorumTimeoutNanos;
op.pendingWriteRequests = 0;
op.callbackTriggered = false;
op.hasRun = false;
+ op.requestTimeNanos = Long.MAX_VALUE;
return op;
}
@@ -131,9 +125,12 @@ class PendingAddOp extends SafeRunnable implements
WriteCallback, TimerTask {
++pendingWriteRequests;
}
- @Override
- public void run(Timeout timeout) {
- timeoutQuorumWait();
+ boolean maybeTimeout() {
+ if (MathUtils.elapsedNanos(requestTimeNanos) >= timeoutNanos) {
+ timeoutQuorumWait();
+ return true;
+ }
+ return false;
}
void timeoutQuorumWait() {
@@ -220,11 +217,6 @@ class PendingAddOp extends SafeRunnable implements
WriteCallback, TimerTask {
return;
}
- if (timeoutSec > -1) {
- this.timeout = lh.bk.getBookieClient().scheduleTimeout(
- this, timeoutSec, TimeUnit.SECONDS);
- }
-
this.requestTimeNanos = MathUtils.nowInNano();
checkNotNull(lh);
checkNotNull(lh.macManager);
@@ -340,10 +332,6 @@ class PendingAddOp extends SafeRunnable implements
WriteCallback, TimerTask {
}
void submitCallback(final int rc) {
- if (null != timeout) {
- timeout.cancel();
- }
-
if (LOG.isDebugEnabled()) {
LOG.debug("Submit callback (lid:{}, eid: {}). rc:{}", lh.getId(),
entryId, rc);
}
@@ -429,10 +417,6 @@ class PendingAddOp extends SafeRunnable implements
WriteCallback, TimerTask {
pendingWriteRequests = 0;
callbackTriggered = false;
hasRun = false;
- if (timeout != null) {
- timeout.cancel();
- }
- timeout = null;
recyclerHandle.recycle(this);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 361a383..966fd42 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -88,6 +88,7 @@ public class ClientConfiguration extends
AbstractConfiguration {
protected static final String ADD_ENTRY_TIMEOUT_SEC = "addEntryTimeoutSec";
protected static final String ADD_ENTRY_QUORUM_TIMEOUT_SEC =
"addEntryQuorumTimeoutSec";
protected static final String READ_ENTRY_TIMEOUT_SEC =
"readEntryTimeoutSec";
+ protected static final String TIMEOUT_MONITOR_INTERVAL_SEC =
"timeoutMonitorIntervalSec";
protected static final String TIMEOUT_TASK_INTERVAL_MILLIS =
"timeoutTaskIntervalMillis";
protected static final String EXPLICIT_LAC_INTERVAL =
"explicitLacInterval";
protected static final String PCBC_TIMEOUT_TIMER_TICK_DURATION_MS =
"pcbcTimeoutTimerTickDurationMs";
@@ -678,11 +679,41 @@ public class ClientConfiguration extends
AbstractConfiguration {
}
/**
- * Get the interval between successive executions of the
PerChannelBookieClient's
- * TimeoutTask. This value is in milliseconds. Every X milliseconds, the
timeout task
- * will be executed and it will error out entries that have timed out.
+ * Get the interval between successive executions of the operation timeout
monitor. This value is in seconds.
+ *
+ * @see #setTimeoutMonitorIntervalSec(long)
+ * @return the interval at which request timeouts will be checked
+ */
+ public long getTimeoutMonitorIntervalSec() {
+ int minTimeout = Math.min(Math.min(getAddEntryQuorumTimeout(),
+ getAddEntryTimeout()),
getReadEntryTimeout());
+ return getLong(TIMEOUT_MONITOR_INTERVAL_SEC, Math.max(minTimeout / 2,
1));
+ }
+
+ /**
+ * Set the interval between successive executions of the operation timeout
monitor. The value in seconds.
+ * Every X seconds, all outstanding add and read operations are checked to
see if they have been running
+ * for longer than their configured timeout. Any that have been will be
errored out.
+ *
+ * <p>This timeout should be set to a value which is a fraction of the
values of
+ * {@link #getAddEntryQuorumTimeout}, {@link #getAddEntryTimeout} and
{@link #getReadEntryTimeout},
+ * so that these timeouts run in a timely fashion.
+ *
+ * @param timeoutInterval The timeout monitor interval, in seconds
+ * @return client configuration
+ */
+ public ClientConfiguration setTimeoutMonitorIntervalSec(long
timeoutInterval) {
+ setProperty(TIMEOUT_MONITOR_INTERVAL_SEC,
Long.toString(timeoutInterval));
+ return this;
+ }
+
+ /**
+ * Get the interval between successive executions of the
PerChannelBookieClient's TimeoutTask. This value is in
+ * milliseconds. Every X milliseconds, the timeout task will be executed
and it will error out entries that have
+ * timed out.
*
* <p>We do it more aggressive to not accumulate pending requests due to
slow responses.
+ *
* @return the interval at which request timeouts will be checked
*/
@Deprecated
@@ -729,6 +760,7 @@ public class ClientConfiguration extends
AbstractConfiguration {
*
* @return tick duration in milliseconds
*/
+ @Deprecated
public long getPCBCTimeoutTimerTickDurationMs() {
return getLong(PCBC_TIMEOUT_TIMER_TICK_DURATION_MS, 100);
}
@@ -745,6 +777,7 @@ public class ClientConfiguration extends
AbstractConfiguration {
* tick duration in milliseconds.
* @return client configuration.
*/
+ @Deprecated
public ClientConfiguration setPCBCTimeoutTimerTickDurationMs(long
tickDuration) {
setProperty(PCBC_TIMEOUT_TIMER_TICK_DURATION_MS, tickDuration);
return this;
@@ -759,6 +792,7 @@ public class ClientConfiguration extends
AbstractConfiguration {
*
* @return number of ticks that used for timeout timer.
*/
+ @Deprecated
public int getPCBCTimeoutTimerNumTicks() {
return getInt(PCBC_TIMEOUT_TIMER_NUM_TICKS, 1024);
}
@@ -775,6 +809,7 @@ public class ClientConfiguration extends
AbstractConfiguration {
* number of ticks that used for timeout timer.
* @return client configuration.
*/
+ @Deprecated
public ClientConfiguration setPCBCTimeoutTimerNumTicks(int numTicks) {
setProperty(PCBC_TIMEOUT_TIMER_NUM_TICKS, numTicks);
return this;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index ceac697..9b1ca9c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -23,23 +23,23 @@ package org.apache.bookkeeper.proto;
import static com.google.common.base.Charsets.UTF_8;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ExtensionRegistry;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.HashedWheelTimer;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -76,10 +76,12 @@ public class BookieClient implements
PerChannelBookieClientFactory {
AtomicLong totalBytesOutstanding = new AtomicLong();
OrderedSafeExecutor executor;
+ ScheduledExecutorService scheduler;
+ ScheduledFuture<?> timeoutFuture;
+
EventLoopGroup eventLoopGroup;
final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool>
channels =
new ConcurrentHashMap<BookieSocketAddress,
PerChannelBookieClientPool>();
- final HashedWheelTimer requestTimer;
private final ClientAuthProvider.Factory authProviderFactory;
private final ExtensionRegistry registry;
@@ -93,12 +95,8 @@ public class BookieClient implements
PerChannelBookieClientFactory {
private final long bookieErrorThresholdPerInterval;
public BookieClient(ClientConfiguration conf, EventLoopGroup
eventLoopGroup,
- OrderedSafeExecutor executor) throws IOException {
- this(conf, eventLoopGroup, executor, NullStatsLogger.INSTANCE);
- }
-
- public BookieClient(ClientConfiguration conf, EventLoopGroup
eventLoopGroup,
- OrderedSafeExecutor executor, StatsLogger statsLogger)
throws IOException {
+ OrderedSafeExecutor executor, ScheduledExecutorService
scheduler,
+ StatsLogger statsLogger) throws IOException {
this.conf = conf;
this.eventLoopGroup = eventLoopGroup;
this.executor = executor;
@@ -110,11 +108,21 @@ public class BookieClient implements
PerChannelBookieClientFactory {
this.statsLogger = statsLogger;
this.numConnectionsPerBookie = conf.getNumChannelsPerBookie();
- this.requestTimer = new HashedWheelTimer(
- new
ThreadFactoryBuilder().setNameFormat("BookieClientTimer-%d").build(),
- conf.getPCBCTimeoutTimerTickDurationMs(),
TimeUnit.MILLISECONDS,
- conf.getPCBCTimeoutTimerNumTicks());
this.bookieErrorThresholdPerInterval =
conf.getBookieErrorThresholdPerInterval();
+
+ this.scheduler = scheduler;
+ if (conf.getAddEntryTimeout() > 0 || conf.getReadEntryTimeout() > 0) {
+ SafeRunnable monitor = new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ monitorPendingOperations();
+ }
+ };
+ this.timeoutFuture = this.scheduler.scheduleAtFixedRate(monitor,
+
conf.getTimeoutMonitorIntervalSec(),
+
conf.getTimeoutMonitorIntervalSec(),
+
TimeUnit.SECONDS);
+ }
}
private int getRc(int rc) {
@@ -145,8 +153,8 @@ public class BookieClient implements
PerChannelBookieClientFactory {
@Override
public PerChannelBookieClient create(BookieSocketAddress address,
PerChannelBookieClientPool pcbcPool,
SecurityHandlerFactory shFactory) throws SecurityException {
- return new PerChannelBookieClient(conf, executor, eventLoopGroup,
address, requestTimer, statsLogger,
- authProviderFactory, registry, pcbcPool, shFactory);
+ return new PerChannelBookieClient(conf, executor, eventLoopGroup,
address, statsLogger,
+ authProviderFactory, registry,
pcbcPool, shFactory);
}
private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
@@ -521,12 +529,14 @@ public class BookieClient implements
PerChannelBookieClientFactory {
}
}
- public boolean isClosed() {
- return closed;
+ private void monitorPendingOperations() {
+ for (PerChannelBookieClientPool clientPool : channels.values()) {
+ clientPool.checkTimeoutOnPendingOperations();
+ }
}
- public Timeout scheduleTimeout(TimerTask task, long timeoutSec, TimeUnit
timeUnit) {
- return requestTimer.newTimeout(task, timeoutSec, timeUnit);
+ public boolean isClosed() {
+ return closed;
}
public void close() {
@@ -538,11 +548,13 @@ public class BookieClient implements
PerChannelBookieClientFactory {
}
channels.clear();
authProviderFactory.close();
+
+ if (timeoutFuture != null) {
+ timeoutFuture.cancel(false);
+ }
} finally {
closeLock.writeLock().unlock();
}
- // Shut down the timeout executor.
- this.requestTimer.stop();
}
private static class Counter {
@@ -599,7 +611,10 @@ public class BookieClient implements
PerChannelBookieClientFactory {
.name("BookieClientWorker")
.numThreads(1)
.build();
- BookieClient bc = new BookieClient(new ClientConfiguration(),
eventLoopGroup, executor);
+ ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(
+ new DefaultThreadFactory("BookKeeperClientScheduler"));
+ BookieClient bc = new BookieClient(new ClientConfiguration(),
eventLoopGroup, executor,
+ scheduler,
NullStatsLogger.INSTANCE);
BookieSocketAddress addr = new BookieSocketAddress(args[0],
Integer.parseInt(args[1]));
for (int i = 0; i < 100000; i++) {
@@ -608,6 +623,7 @@ public class BookieClient implements
PerChannelBookieClientFactory {
}
counter.wait(0);
System.out.println("Total = " + counter.total());
+ scheduler.shutdown();
eventLoopGroup.shutdownGracefully();
executor.shutdown();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
index 3ac5cb6..41233cf 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
@@ -95,6 +95,13 @@ class DefaultPerChannelBookieClientPool implements
PerChannelBookieClientPool,
}
@Override
+ public void checkTimeoutOnPendingOperations() {
+ for (int i = 0; i < clients.length; i++) {
+ clients[i].checkTimeoutOnPendingOperations();
+ }
+ }
+
+ @Override
public void recordError() {
errorCounter.incrementAndGet();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 97bdaea..b128bc8 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -54,11 +54,8 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.ssl.SslHandler;
-import io.netty.util.HashedWheelTimer;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
@@ -70,12 +67,14 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiPredicate;
import javax.net.ssl.SSLPeerUnverifiedException;
@@ -147,9 +146,8 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
final BookieSocketAddress addr;
final EventLoopGroup eventLoopGroup;
final OrderedSafeExecutor executor;
- final HashedWheelTimer requestTimer;
- final int addEntryTimeout;
- final int readEntryTimeout;
+ final long addEntryTimeoutNanos;
+ final long readEntryTimeoutNanos;
final int maxFrameSize;
final int getBookieInfoTimeout;
final int startTLSTimeout;
@@ -203,7 +201,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup
eventLoopGroup,
BookieSocketAddress addr) throws
SecurityException {
- this(new ClientConfiguration(), executor, eventLoopGroup, addr, null,
NullStatsLogger.INSTANCE, null, null,
+ this(new ClientConfiguration(), executor, eventLoopGroup, addr,
NullStatsLogger.INSTANCE, null, null,
null);
}
@@ -211,24 +209,22 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
BookieSocketAddress addr,
ClientAuthProvider.Factory
authProviderFactory,
ExtensionRegistry extRegistry) throws
SecurityException {
- this(new ClientConfiguration(), executor, eventLoopGroup, addr, null,
NullStatsLogger.INSTANCE,
+ this(new ClientConfiguration(), executor, eventLoopGroup, addr,
NullStatsLogger.INSTANCE,
authProviderFactory, extRegistry, null);
}
public PerChannelBookieClient(ClientConfiguration conf,
OrderedSafeExecutor executor,
EventLoopGroup eventLoopGroup,
BookieSocketAddress addr,
- HashedWheelTimer requestTimer, StatsLogger
parentStatsLogger,
- ClientAuthProvider.Factory
authProviderFactory,
+ StatsLogger parentStatsLogger,
ClientAuthProvider.Factory authProviderFactory,
ExtensionRegistry extRegistry,
PerChannelBookieClientPool pcbcPool) throws
SecurityException {
- this(conf, executor, eventLoopGroup, addr, null,
NullStatsLogger.INSTANCE,
+ this(conf, executor, eventLoopGroup, addr, NullStatsLogger.INSTANCE,
authProviderFactory, extRegistry, pcbcPool, null);
}
public PerChannelBookieClient(ClientConfiguration conf,
OrderedSafeExecutor executor,
EventLoopGroup eventLoopGroup,
BookieSocketAddress addr,
- HashedWheelTimer requestTimer, StatsLogger
parentStatsLogger,
- ClientAuthProvider.Factory
authProviderFactory,
+ StatsLogger parentStatsLogger,
ClientAuthProvider.Factory authProviderFactory,
ExtensionRegistry extRegistry,
PerChannelBookieClientPool pcbcPool,
SecurityHandlerFactory shFactory) throws
SecurityException {
@@ -242,9 +238,8 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
this.eventLoopGroup = eventLoopGroup;
}
this.state = ConnectionState.DISCONNECTED;
- this.requestTimer = requestTimer;
- this.addEntryTimeout = conf.getAddEntryTimeout();
- this.readEntryTimeout = conf.getReadEntryTimeout();
+ this.addEntryTimeoutNanos =
TimeUnit.SECONDS.toNanos(conf.getAddEntryTimeout());
+ this.readEntryTimeoutNanos =
TimeUnit.SECONDS.toNanos(conf.getReadEntryTimeout());
this.getBookieInfoTimeout = conf.getBookieInfoTimeout();
this.startTLSTimeout = conf.getStartTLSTimeout();
this.useV2WireProtocol = conf.getUseV2WireProtocol();
@@ -768,6 +763,30 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
writeAndFlush(channel, completionKey, getBookieInfoRequest);
}
+ private static final BiPredicate<CompletionKey, CompletionValue>
timeoutCheck = (key, value) -> {
+ return value.maybeTimeout();
+ };
+
+ public void checkTimeoutOnPendingOperations() {
+ int timedOutOperations = completionObjects.removeIf(timeoutCheck);
+
+ synchronized (this) {
+ Iterator<CompletionValue> iterator =
completionObjectsV2Conflicts.values().iterator();
+ while (iterator.hasNext()) {
+ CompletionValue value = iterator.next();
+ if (value.maybeTimeout()) {
+ ++timedOutOperations;
+ iterator.remove();
+ }
+ }
+ }
+
+ if (timedOutOperations > 0) {
+ LOG.info("Timed-out {} operations to channel {} for {}",
+ timedOutOperations, channel, addr);
+ }
+ }
+
/**
* Disconnects the bookie client. It can be reused.
*/
@@ -819,6 +838,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
cf.awaitUninterruptibly();
}
}
+
}
private ChannelFuture closeChannel(Channel c) {
@@ -1235,14 +1255,12 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
protected long ledgerId;
protected long entryId;
protected long startTime;
- protected Timeout timeout;
public CompletionValue(String operationName,
Object ctx,
long ledgerId, long entryId,
OpStatsLogger opLogger,
- OpStatsLogger timeoutOpLogger,
- Timeout timeout) {
+ OpStatsLogger timeoutOpLogger) {
this.operationName = operationName;
this.ctx = ctx;
this.ledgerId = ledgerId;
@@ -1250,19 +1268,13 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
this.startTime = MathUtils.nowInNano();
this.opLogger = opLogger;
this.timeoutOpLogger = timeoutOpLogger;
- this.timeout = timeout;
}
private long latency() {
return MathUtils.elapsedNanos(startTime);
}
- void cancelTimeoutAndLogOp(int rc) {
- Timeout t = timeout;
- if (null != t) {
- t.cancel();
- }
-
+ void logOpResult(int rc) {
if (rc != BKException.Code.OK) {
opLogger.registerFailedEvent(latency(), TimeUnit.NANOSECONDS);
} else {
@@ -1275,6 +1287,15 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
}
+ boolean maybeTimeout() {
+ if (MathUtils.elapsedNanos(startTime) >= readEntryTimeoutNanos) {
+ timeout();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
void timeout() {
errorOut(BKException.Code.TimeoutException);
timeoutOpLogger.registerSuccessfulEvent(latency(),
@@ -1344,14 +1365,13 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
final long ledgerId) {
super("WriteLAC",
originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
- writeLacOpLogger, writeLacTimeoutOpLogger,
- scheduleTimeout(key, addEntryTimeout));
+ writeLacOpLogger, writeLacTimeoutOpLogger);
this.cb = new WriteLacCallback() {
@Override
public void writeLacComplete(int rc, long ledgerId,
BookieSocketAddress addr,
Object ctx) {
- cancelTimeoutAndLogOp(rc);
+ logOpResult(rc);
originalCallback.writeLacComplete(rc, ledgerId,
addr, originalCtx);
key.release();
@@ -1392,15 +1412,14 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
ReadLacCallback originalCallback,
final Object ctx, final long ledgerId) {
super("ReadLAC", ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
- readLacOpLogger, readLacTimeoutOpLogger,
- scheduleTimeout(key, readEntryTimeout));
+ readLacOpLogger, readLacTimeoutOpLogger);
this.cb = new ReadLacCallback() {
@Override
public void readLacComplete(int rc, long ledgerId,
ByteBuf lacBuffer,
ByteBuf lastEntryBuffer,
Object ctx) {
- cancelTimeoutAndLogOp(rc);
+ logOpResult(rc);
originalCallback.readLacComplete(
rc, ledgerId, lacBuffer, lastEntryBuffer, ctx);
key.release();
@@ -1452,15 +1471,14 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
final Object originalCtx,
long ledgerId, final long entryId) {
super("Read", originalCtx, ledgerId, entryId,
- readEntryOpLogger, readTimeoutOpLogger,
- scheduleTimeout(key, readEntryTimeout));
+ readEntryOpLogger, readTimeoutOpLogger);
this.cb = new ReadEntryCallback() {
@Override
public void readEntryComplete(int rc, long ledgerId,
long entryId, ByteBuf buffer,
Object ctx) {
- cancelTimeoutAndLogOp(rc);
+ logOpResult(rc);
originalCallback.readEntryComplete(rc,
ledgerId, entryId,
buffer,
originalCtx);
@@ -1550,12 +1568,11 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
public StartTLSCompletion(final CompletionKey key) {
super("StartTLS", null, -1, -1,
- startTLSOpLogger, startTLSTimeoutOpLogger,
- scheduleTimeout(key, startTLSTimeout));
+ startTLSOpLogger, startTLSTimeoutOpLogger);
this.cb = new StartTLSCallback() {
@Override
public void startTLSComplete(int rc, Object ctx) {
- cancelTimeoutAndLogOp(rc);
+ logOpResult(rc);
key.release();
}
};
@@ -1602,13 +1619,12 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
final GetBookieInfoCallback
origCallback,
final Object origCtx) {
super("GetBookieInfo", origCtx, 0L, 0L,
- getBookieInfoOpLogger, getBookieInfoTimeoutOpLogger,
- scheduleTimeout(key, getBookieInfoTimeout));
+ getBookieInfoOpLogger, getBookieInfoTimeoutOpLogger);
this.cb = new GetBookieInfoCallback() {
@Override
public void getBookieInfoComplete(int rc, BookieInfo bInfo,
Object ctx) {
- cancelTimeoutAndLogOp(rc);
+ logOpResult(rc);
origCallback.getBookieInfoComplete(rc, bInfo, origCtx);
key.release();
}
@@ -1668,8 +1684,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
WriteCallback originalCallback = null;
AddCompletion(Recycler.Handle<AddCompletion> handle) {
- super("Add", null, -1, -1,
- addEntryOpLogger, addTimeoutOpLogger, null);
+ super("Add", null, -1, -1, addEntryOpLogger, addTimeoutOpLogger);
this.handle = handle;
}
@@ -1683,20 +1698,29 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
this.ledgerId = ledgerId;
this.entryId = entryId;
this.startTime = MathUtils.nowInNano();
- this.timeout = scheduleTimeout(key, addEntryTimeout);
}
@Override
public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr,
Object ctx) {
- cancelTimeoutAndLogOp(rc);
+ logOpResult(rc);
originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx);
key.release();
handle.recycle(this);
}
@Override
+ boolean maybeTimeout() {
+ if (MathUtils.elapsedNanos(startTime) >= addEntryTimeoutNanos) {
+ timeout();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
public void errorOut() {
errorOut(BKException.Code.BookieHandleNotAvailableException);
}
@@ -1739,14 +1763,6 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
return new V3CompletionKey(txnId, operationType);
}
- Timeout scheduleTimeout(CompletionKey key, long timeout) {
- if (null != requestTimer) {
- return requestTimer.newTimeout(key, timeout, TimeUnit.SECONDS);
- } else {
- return null;
- }
- }
-
class V3CompletionKey extends CompletionKey {
public V3CompletionKey(long txnId, OperationType operationType) {
@@ -1774,7 +1790,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
- abstract class CompletionKey implements TimerTask {
+ abstract class CompletionKey {
final long txnId;
OperationType operationType;
@@ -1784,17 +1800,6 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
this.operationType = operationType;
}
- @Override
- public void run(Timeout timeout) throws Exception {
- if (timeout.isCancelled()) {
- return;
- }
- CompletionValue completion = completionObjects.remove(this);
- if (completion != null) {
- completion.timeout();
- }
- }
-
public void release() {}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
index bd07a4e..80f00a5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
@@ -46,6 +46,12 @@ interface PerChannelBookieClientPool {
void recordError();
/**
+ * Check if any ops on any channel needs to be timed out.
+ * This is called on all channels, even if the channel is not yet
connected.
+ */
+ void checkTimeoutOnPendingOperations();
+
+ /**
* Disconnect the connections in the pool.
*
* @param wait
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
index d2bff33..bf6c230 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
@@ -25,8 +25,11 @@ import static org.junit.Assert.assertTrue;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -36,6 +39,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.junit.After;
@@ -53,6 +57,7 @@ public class TestGetBookieInfoTimeout extends
BookKeeperClusterTestCase {
DigestType digestType;
public EventLoopGroup eventLoopGroup;
public OrderedSafeExecutor executor;
+ private ScheduledExecutorService scheduler;
public TestGetBookieInfoTimeout() {
super(10);
@@ -68,10 +73,13 @@ public class TestGetBookieInfoTimeout extends
BookKeeperClusterTestCase {
.name("BKClientOrderedSafeExecutor")
.numThreads(2)
.build();
+ scheduler = Executors.newSingleThreadScheduledExecutor(
+ new DefaultThreadFactory("BookKeeperClientScheduler"));
}
@After
public void tearDown() throws Exception {
+ scheduler.shutdown();
eventLoopGroup.shutdownGracefully();
executor.shutdown();
}
@@ -99,7 +107,7 @@ public class TestGetBookieInfoTimeout extends
BookKeeperClusterTestCase {
// try to get bookie info from the sleeping bookie. It should fail
with timeout error
BookieSocketAddress addr = new
BookieSocketAddress(bookieToSleep.getSocketAddress().getHostString(),
bookieToSleep.getPort());
- BookieClient bc = new BookieClient(cConf, eventLoopGroup, executor);
+ BookieClient bc = new BookieClient(cConf, eventLoopGroup, executor,
scheduler, NullStatsLogger.INSTANCE);
long flags =
BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
|
BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 6bf05ae..b799635 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -27,12 +27,15 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.Code;
@@ -48,6 +51,7 @@ import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCall
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.junit.After;
@@ -64,6 +68,7 @@ public class BookieClientTest {
public EventLoopGroup eventLoopGroup;
public OrderedSafeExecutor executor;
+ private ScheduledExecutorService scheduler;
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
@Before
@@ -83,10 +88,13 @@ public class BookieClientTest {
.name("BKClientOrderedSafeExecutor")
.numThreads(2)
.build();
+ scheduler = Executors.newSingleThreadScheduledExecutor(
+ new DefaultThreadFactory("BookKeeperClientScheduler"));
}
@After
public void tearDown() throws Exception {
+ scheduler.shutdown();
bs.shutdown();
recursiveDelete(tmpDir);
eventLoopGroup.shutdownGracefully();
@@ -146,7 +154,8 @@ public class BookieClientTest {
BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port);
ResultStruct arc = new ResultStruct();
- BookieClient bc = new BookieClient(new ClientConfiguration(),
eventLoopGroup, executor);
+ BookieClient bc = new BookieClient(new ClientConfiguration(),
eventLoopGroup, executor,
+ scheduler,
NullStatsLogger.INSTANCE);
ByteBuf bb = createByteBuffer(1, 1, 1);
bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc,
BookieProtocol.FLAG_NONE);
synchronized (arc) {
@@ -246,7 +255,8 @@ public class BookieClientTest {
public void testNoLedger() throws Exception {
ResultStruct arc = new ResultStruct();
BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port);
- BookieClient bc = new BookieClient(new ClientConfiguration(),
eventLoopGroup, executor);
+ BookieClient bc = new BookieClient(new ClientConfiguration(),
eventLoopGroup, executor,
+ scheduler,
NullStatsLogger.INSTANCE);
synchronized (arc) {
bc.readEntry(addr, 2, 13, recb, arc);
arc.wait(1000);
@@ -257,7 +267,8 @@ public class BookieClientTest {
@Test
public void testGetBookieInfo() throws IOException, InterruptedException {
BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port);
- BookieClient bc = new BookieClient(new ClientConfiguration(), new
NioEventLoopGroup(), executor);
+ BookieClient bc = new BookieClient(new ClientConfiguration(), new
NioEventLoopGroup(), executor,
+ scheduler,
NullStatsLogger.INSTANCE);
long flags =
BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
|
BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
deleted file mode 100644
index fc20476..0000000
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.bookkeeper.test;
-
-import io.netty.buffer.Unpooled;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookieClient;
-import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class tests BookieClient. It just sends the a new entry to itself.
- */
-class LoopbackClient implements WriteCallback {
-
- private static final Logger LOG =
LoggerFactory.getLogger(LoopbackClient.class);
-
- BookieClient client;
- static int recvTimeout = 2000;
- long begin = 0;
- int limit;
- OrderedSafeExecutor executor;
-
- static class Counter {
- int c;
- int limit;
-
- Counter(int limit) {
- this.c = 0;
- this.limit = limit;
- }
-
- synchronized void increment() {
- if (++c == limit) {
- this.notify();
- }
- }
- }
-
- LoopbackClient(EventLoopGroup eventLoopGroup, OrderedSafeExecutor
executor, long begin, int limit)
- throws IOException {
- this.client = new BookieClient(new ClientConfiguration(),
eventLoopGroup, executor);
- this.begin = begin;
- }
-
- void write(long ledgerId, long entry, byte[] data, BookieSocketAddress
addr, WriteCallback cb, Object ctx)
- throws IOException, InterruptedException {
- LOG.info("Ledger id: " + ledgerId + ", Entry: " + entry);
- byte[] passwd = new byte[20];
- Arrays.fill(passwd, (byte) 'a');
-
- client.addEntry(addr, ledgerId, passwd, entry,
Unpooled.wrappedBuffer(data), cb, ctx, BookieProtocol.FLAG_NONE);
- }
-
- @Override
- public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx) {
- Counter counter = (Counter) ctx;
- counter.increment();
- }
-
- public static void main(String args[]) {
- byte[] data = new byte[Integer.parseInt(args[0])];
- Integer limit = Integer.parseInt(args[1]);
- Counter c = new Counter(limit);
- long ledgerId = Long.valueOf("0").longValue();
- long begin = System.currentTimeMillis();
-
- LoopbackClient lb;
- EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
- OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
- .name("BookieClientScheduler")
- .numThreads(2)
- .build();
- try {
- BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1",
Integer.valueOf(args[2]).intValue());
- lb = new LoopbackClient(eventLoopGroup, executor, begin,
limit.intValue());
-
- for (int i = 0; i < limit; i++) {
- lb.write(ledgerId, i, data, addr, lb, c);
- }
-
- synchronized (c) {
- c.wait();
- System.out.println("Time to write all entries: " +
(System.currentTimeMillis() - begin));
- }
- } catch (IOException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
-}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].