jiazhai closed pull request #817: Client times out requests in batch rather
than individually
URL: https://github.com/apache/bookkeeper/pull/817
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index 9c52788eb..450609110 100644
---
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -24,7 +24,10 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -33,6 +36,7 @@
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -158,9 +162,11 @@ public static void main(String[] args)
.name("BenchBookieClientScheduler")
.numThreads(1)
.build();
+ ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(
+ new DefaultThreadFactory("BookKeeperClientScheduler"));
ClientConfiguration conf = new ClientConfiguration();
- BookieClient bc = new BookieClient(conf, eventLoop, executor);
+ BookieClient bc = new BookieClient(conf, eventLoop, executor,
scheduler, NullStatsLogger.INSTANCE);
LatencyCallback lc = new LatencyCallback();
ThroughputCallback tc = new ThroughputCallback();
@@ -220,6 +226,7 @@ public static void main(String[] args)
LOG.info("Throughput: " + ((long) entryCount) * 1000 / (endTime -
startTime));
bc.close();
+ scheduler.shutdown();
eventLoop.shutdownGracefully();
executor.shutdown();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 99e948467..d0f8a64fc 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -146,6 +146,7 @@
final int explicitLacInterval;
final boolean delayEnsembleChange;
final boolean reorderReadSequence;
+ final long addEntryQuorumTimeoutNanos;
final Optional<SpeculativeRequestExecutionPolicy>
readSpeculativeRequestPolicy;
final Optional<SpeculativeRequestExecutionPolicy>
readLACSpeculativeRequestPolicy;
@@ -487,9 +488,9 @@ private BookKeeper(ClientConfiguration conf,
this.readLACSpeculativeRequestPolicy =
Optional.<SpeculativeRequestExecutionPolicy>absent();
}
-
// initialize bookie client
- this.bookieClient = new BookieClient(conf, this.eventLoopGroup,
this.mainWorkerPool, statsLogger);
+ this.bookieClient = new BookieClient(conf, this.eventLoopGroup,
this.mainWorkerPool,
+ scheduler, statsLogger);
this.bookieWatcher = new BookieWatcher(conf, this.placementPolicy,
regClient);
if (conf.getDiskWeightBasedPlacementEnabled()) {
LOG.info("Weighted ledger placement enabled");
@@ -520,6 +521,7 @@ private BookKeeper(ClientConfiguration conf,
LOG.debug("Explicit LAC Interval : {}", this.explicitLacInterval);
}
+ this.addEntryQuorumTimeoutNanos =
TimeUnit.SECONDS.toNanos(conf.getAddEntryQuorumTimeout());
scheduleBookieHealthCheckIfEnabled();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 78f3e756a..5c15376f8 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -45,6 +45,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -103,6 +104,7 @@
final LoadingCache<BookieSocketAddress, Long> bookieFailureHistory;
final boolean enableParallelRecoveryRead;
final int recoveryReadBatchSize;
+ ScheduledFuture<?> timeoutFuture = null;
/**
* Invalid entry id. This value is returned from methods which
@@ -182,6 +184,19 @@ public Integer getSample() {
}
});
initializeExplicitLacFlushPolicy();
+
+ if (bk.getConf().getAddEntryQuorumTimeout() > 0) {
+ SafeRunnable monitor = new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ monitorPendingAddOps();
+ }
+ };
+ this.timeoutFuture = bk.scheduler.scheduleAtFixedRate(monitor,
+
bk.getConf().getTimeoutMonitorIntervalSec(),
+
bk.getConf().getTimeoutMonitorIntervalSec(),
+
TimeUnit.SECONDS);
+ }
}
protected void initializeExplicitLacFlushPolicy() {
@@ -335,6 +350,9 @@ public void close()
SyncCloseCallback callback = new SyncCloseCallback(result);
asyncClose(callback, null);
explicitLacFlushPolicy.stopExplicitLacFlush();
+ if (timeoutFuture != null) {
+ timeoutFuture.cancel(false);
+ }
return result;
}
@@ -1371,6 +1389,18 @@ void handleUnrecoverableErrorDuringAdd(int rc) {
asyncCloseInternal(NoopCloseCallback.instance, null, rc);
}
+ private void monitorPendingAddOps() {
+ int timedOut = 0;
+ for (PendingAddOp op : pendingAddOps) {
+ if (op.maybeTimeout()) {
+ timedOut++;
+ }
+ }
+ if (timedOut > 0) {
+ LOG.info("Timed out {} add ops", timedOut);
+ }
+ }
+
void errorOutPendingAdds(int rc) {
errorOutPendingAdds(rc, drainPendingAddsToErrorOut());
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index d41471150..e42bbf3de 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -25,8 +25,6 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
@@ -51,7 +49,7 @@
*
*
*/
-class PendingAddOp extends SafeRunnable implements WriteCallback, TimerTask {
+class PendingAddOp extends SafeRunnable implements WriteCallback {
private static final Logger LOG =
LoggerFactory.getLogger(PendingAddOp.class);
ByteBuf payload;
@@ -68,8 +66,7 @@
boolean isRecoveryAdd = false;
long requestTimeNanos;
- int timeoutSec;
- Timeout timeout = null;
+ long timeoutNanos;
OpStatsLogger addOpLogger;
long currentLedgerLength;
@@ -91,14 +88,11 @@ static PendingAddOp create(LedgerHandle lh, ByteBuf
payload, AddCallback cb, Obj
op.completed = false;
op.ackSet = lh.distributionSchedule.getAckSet();
op.addOpLogger = lh.bk.getAddOpLogger();
- if (op.timeout != null) {
- op.timeout.cancel();
- }
- op.timeout = null;
- op.timeoutSec = lh.bk.getConf().getAddEntryQuorumTimeout();
+ op.timeoutNanos = lh.bk.addEntryQuorumTimeoutNanos;
op.pendingWriteRequests = 0;
op.callbackTriggered = false;
op.hasRun = false;
+ op.requestTimeNanos = Long.MAX_VALUE;
return op;
}
@@ -131,9 +125,12 @@ void sendWriteRequest(int bookieIndex) {
++pendingWriteRequests;
}
- @Override
- public void run(Timeout timeout) {
- timeoutQuorumWait();
+ boolean maybeTimeout() {
+ if (MathUtils.elapsedNanos(requestTimeNanos) >= timeoutNanos) {
+ timeoutQuorumWait();
+ return true;
+ }
+ return false;
}
void timeoutQuorumWait() {
@@ -220,11 +217,6 @@ public void safeRun() {
return;
}
- if (timeoutSec > -1) {
- this.timeout = lh.bk.getBookieClient().scheduleTimeout(
- this, timeoutSec, TimeUnit.SECONDS);
- }
-
this.requestTimeNanos = MathUtils.nowInNano();
checkNotNull(lh);
checkNotNull(lh.macManager);
@@ -340,10 +332,6 @@ void sendAddSuccessCallbacks() {
}
void submitCallback(final int rc) {
- if (null != timeout) {
- timeout.cancel();
- }
-
if (LOG.isDebugEnabled()) {
LOG.debug("Submit callback (lid:{}, eid: {}). rc:{}", lh.getId(),
entryId, rc);
}
@@ -429,10 +417,6 @@ private void recycle() {
pendingWriteRequests = 0;
callbackTriggered = false;
hasRun = false;
- if (timeout != null) {
- timeout.cancel();
- }
- timeout = null;
recyclerHandle.recycle(this);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 361a383e5..966fd428a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -88,6 +88,7 @@
protected static final String ADD_ENTRY_TIMEOUT_SEC = "addEntryTimeoutSec";
protected static final String ADD_ENTRY_QUORUM_TIMEOUT_SEC =
"addEntryQuorumTimeoutSec";
protected static final String READ_ENTRY_TIMEOUT_SEC =
"readEntryTimeoutSec";
+ protected static final String TIMEOUT_MONITOR_INTERVAL_SEC =
"timeoutMonitorIntervalSec";
protected static final String TIMEOUT_TASK_INTERVAL_MILLIS =
"timeoutTaskIntervalMillis";
protected static final String EXPLICIT_LAC_INTERVAL =
"explicitLacInterval";
protected static final String PCBC_TIMEOUT_TIMER_TICK_DURATION_MS =
"pcbcTimeoutTimerTickDurationMs";
@@ -678,11 +679,41 @@ public ClientConfiguration setReadEntryTimeout(int
timeout) {
}
/**
- * Get the interval between successive executions of the
PerChannelBookieClient's
- * TimeoutTask. This value is in milliseconds. Every X milliseconds, the
timeout task
- * will be executed and it will error out entries that have timed out.
+ * Get the interval between successive executions of the operation timeout
monitor. This value is in seconds.
+ *
+ * @see #setTimeoutMonitorIntervalSec(long)
+ * @return the interval at which request timeouts will be checked
+ */
+ public long getTimeoutMonitorIntervalSec() {
+ int minTimeout = Math.min(Math.min(getAddEntryQuorumTimeout(),
+ getAddEntryTimeout()),
getReadEntryTimeout());
+ return getLong(TIMEOUT_MONITOR_INTERVAL_SEC, Math.max(minTimeout / 2,
1));
+ }
+
+ /**
+ * Set the interval between successive executions of the operation timeout
monitor. The value in seconds.
+ * Every X seconds, all outstanding add and read operations are checked to
see if they have been running
+ * for longer than their configured timeout. Any that have been will be
errored out.
+ *
+ * <p>This timeout should be set to a value which is a fraction of the
values of
+ * {@link #getAddEntryQuorumTimeout}, {@link #getAddEntryTimeout} and
{@link #getReadEntryTimeout},
+ * so that these timeouts run in a timely fashion.
+ *
+ * @param timeoutInterval The timeout monitor interval, in seconds
+ * @return client configuration
+ */
+ public ClientConfiguration setTimeoutMonitorIntervalSec(long
timeoutInterval) {
+ setProperty(TIMEOUT_MONITOR_INTERVAL_SEC,
Long.toString(timeoutInterval));
+ return this;
+ }
+
+ /**
+ * Get the interval between successive executions of the
PerChannelBookieClient's TimeoutTask. This value is in
+ * milliseconds. Every X milliseconds, the timeout task will be executed
and it will error out entries that have
+ * timed out.
*
* <p>We do it more aggressive to not accumulate pending requests due to
slow responses.
+ *
* @return the interval at which request timeouts will be checked
*/
@Deprecated
@@ -729,6 +760,7 @@ public ClientConfiguration setExplictLacInterval(int
interval) {
*
* @return tick duration in milliseconds
*/
+ @Deprecated
public long getPCBCTimeoutTimerTickDurationMs() {
return getLong(PCBC_TIMEOUT_TIMER_TICK_DURATION_MS, 100);
}
@@ -745,6 +777,7 @@ public long getPCBCTimeoutTimerTickDurationMs() {
* tick duration in milliseconds.
* @return client configuration.
*/
+ @Deprecated
public ClientConfiguration setPCBCTimeoutTimerTickDurationMs(long
tickDuration) {
setProperty(PCBC_TIMEOUT_TIMER_TICK_DURATION_MS, tickDuration);
return this;
@@ -759,6 +792,7 @@ public ClientConfiguration
setPCBCTimeoutTimerTickDurationMs(long tickDuration)
*
* @return number of ticks that used for timeout timer.
*/
+ @Deprecated
public int getPCBCTimeoutTimerNumTicks() {
return getInt(PCBC_TIMEOUT_TIMER_NUM_TICKS, 1024);
}
@@ -775,6 +809,7 @@ public int getPCBCTimeoutTimerNumTicks() {
* number of ticks that used for timeout timer.
* @return client configuration.
*/
+ @Deprecated
public ClientConfiguration setPCBCTimeoutTimerNumTicks(int numTicks) {
setProperty(PCBC_TIMEOUT_TIMER_NUM_TICKS, numTicks);
return this;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index ceac6978a..9b1ca9cc9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -23,23 +23,23 @@
import static com.google.common.base.Charsets.UTF_8;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ExtensionRegistry;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.HashedWheelTimer;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -76,10 +76,12 @@
AtomicLong totalBytesOutstanding = new AtomicLong();
OrderedSafeExecutor executor;
+ ScheduledExecutorService scheduler;
+ ScheduledFuture<?> timeoutFuture;
+
EventLoopGroup eventLoopGroup;
final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool>
channels =
new ConcurrentHashMap<BookieSocketAddress,
PerChannelBookieClientPool>();
- final HashedWheelTimer requestTimer;
private final ClientAuthProvider.Factory authProviderFactory;
private final ExtensionRegistry registry;
@@ -93,12 +95,8 @@
private final long bookieErrorThresholdPerInterval;
public BookieClient(ClientConfiguration conf, EventLoopGroup
eventLoopGroup,
- OrderedSafeExecutor executor) throws IOException {
- this(conf, eventLoopGroup, executor, NullStatsLogger.INSTANCE);
- }
-
- public BookieClient(ClientConfiguration conf, EventLoopGroup
eventLoopGroup,
- OrderedSafeExecutor executor, StatsLogger statsLogger)
throws IOException {
+ OrderedSafeExecutor executor, ScheduledExecutorService
scheduler,
+ StatsLogger statsLogger) throws IOException {
this.conf = conf;
this.eventLoopGroup = eventLoopGroup;
this.executor = executor;
@@ -110,11 +108,21 @@ public BookieClient(ClientConfiguration conf,
EventLoopGroup eventLoopGroup,
this.statsLogger = statsLogger;
this.numConnectionsPerBookie = conf.getNumChannelsPerBookie();
- this.requestTimer = new HashedWheelTimer(
- new
ThreadFactoryBuilder().setNameFormat("BookieClientTimer-%d").build(),
- conf.getPCBCTimeoutTimerTickDurationMs(),
TimeUnit.MILLISECONDS,
- conf.getPCBCTimeoutTimerNumTicks());
this.bookieErrorThresholdPerInterval =
conf.getBookieErrorThresholdPerInterval();
+
+ this.scheduler = scheduler;
+ if (conf.getAddEntryTimeout() > 0 || conf.getReadEntryTimeout() > 0) {
+ SafeRunnable monitor = new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ monitorPendingOperations();
+ }
+ };
+ this.timeoutFuture = this.scheduler.scheduleAtFixedRate(monitor,
+
conf.getTimeoutMonitorIntervalSec(),
+
conf.getTimeoutMonitorIntervalSec(),
+
TimeUnit.SECONDS);
+ }
}
private int getRc(int rc) {
@@ -145,8 +153,8 @@ private int getRc(int rc) {
@Override
public PerChannelBookieClient create(BookieSocketAddress address,
PerChannelBookieClientPool pcbcPool,
SecurityHandlerFactory shFactory) throws SecurityException {
- return new PerChannelBookieClient(conf, executor, eventLoopGroup,
address, requestTimer, statsLogger,
- authProviderFactory, registry, pcbcPool, shFactory);
+ return new PerChannelBookieClient(conf, executor, eventLoopGroup,
address, statsLogger,
+ authProviderFactory, registry,
pcbcPool, shFactory);
}
private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
@@ -521,12 +529,14 @@ public void safeRun() {
}
}
- public boolean isClosed() {
- return closed;
+ private void monitorPendingOperations() {
+ for (PerChannelBookieClientPool clientPool : channels.values()) {
+ clientPool.checkTimeoutOnPendingOperations();
+ }
}
- public Timeout scheduleTimeout(TimerTask task, long timeoutSec, TimeUnit
timeUnit) {
- return requestTimer.newTimeout(task, timeoutSec, timeUnit);
+ public boolean isClosed() {
+ return closed;
}
public void close() {
@@ -538,11 +548,13 @@ public void close() {
}
channels.clear();
authProviderFactory.close();
+
+ if (timeoutFuture != null) {
+ timeoutFuture.cancel(false);
+ }
} finally {
closeLock.writeLock().unlock();
}
- // Shut down the timeout executor.
- this.requestTimer.stop();
}
private static class Counter {
@@ -599,7 +611,10 @@ public void writeComplete(int rc, long ledger, long entry,
BookieSocketAddress a
.name("BookieClientWorker")
.numThreads(1)
.build();
- BookieClient bc = new BookieClient(new ClientConfiguration(),
eventLoopGroup, executor);
+ ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(
+ new DefaultThreadFactory("BookKeeperClientScheduler"));
+ BookieClient bc = new BookieClient(new ClientConfiguration(),
eventLoopGroup, executor,
+ scheduler,
NullStatsLogger.INSTANCE);
BookieSocketAddress addr = new BookieSocketAddress(args[0],
Integer.parseInt(args[1]));
for (int i = 0; i < 100000; i++) {
@@ -608,6 +623,7 @@ public void writeComplete(int rc, long ledger, long entry,
BookieSocketAddress a
}
counter.wait(0);
System.out.println("Total = " + counter.total());
+ scheduler.shutdown();
eventLoopGroup.shutdownGracefully();
executor.shutdown();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
index 3ac5cb6d9..41233cf83 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
@@ -94,6 +94,13 @@ public void obtain(GenericCallback<PerChannelBookieClient>
callback, long key) {
clients[idx].connectIfNeededAndDoOp(callback);
}
+ @Override
+ public void checkTimeoutOnPendingOperations() {
+ for (int i = 0; i < clients.length; i++) {
+ clients[i].checkTimeoutOnPendingOperations();
+ }
+ }
+
@Override
public void recordError() {
errorCounter.incrementAndGet();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index c2c8af1ee..c982b6712 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -54,11 +54,8 @@
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.ssl.SslHandler;
-import io.netty.util.HashedWheelTimer;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
@@ -70,12 +67,14 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiPredicate;
import javax.net.ssl.SSLPeerUnverifiedException;
@@ -147,9 +146,8 @@
final BookieSocketAddress addr;
final EventLoopGroup eventLoopGroup;
final OrderedSafeExecutor executor;
- final HashedWheelTimer requestTimer;
- final int addEntryTimeout;
- final int readEntryTimeout;
+ final long addEntryTimeoutNanos;
+ final long readEntryTimeoutNanos;
final int maxFrameSize;
final int getBookieInfoTimeout;
final int startTLSTimeout;
@@ -203,7 +201,7 @@
public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup
eventLoopGroup,
BookieSocketAddress addr) throws
SecurityException {
- this(new ClientConfiguration(), executor, eventLoopGroup, addr, null,
NullStatsLogger.INSTANCE, null, null,
+ this(new ClientConfiguration(), executor, eventLoopGroup, addr,
NullStatsLogger.INSTANCE, null, null,
null);
}
@@ -211,24 +209,22 @@ public PerChannelBookieClient(OrderedSafeExecutor
executor, EventLoopGroup event
BookieSocketAddress addr,
ClientAuthProvider.Factory
authProviderFactory,
ExtensionRegistry extRegistry) throws
SecurityException {
- this(new ClientConfiguration(), executor, eventLoopGroup, addr, null,
NullStatsLogger.INSTANCE,
+ this(new ClientConfiguration(), executor, eventLoopGroup, addr,
NullStatsLogger.INSTANCE,
authProviderFactory, extRegistry, null);
}
public PerChannelBookieClient(ClientConfiguration conf,
OrderedSafeExecutor executor,
EventLoopGroup eventLoopGroup,
BookieSocketAddress addr,
- HashedWheelTimer requestTimer, StatsLogger
parentStatsLogger,
- ClientAuthProvider.Factory
authProviderFactory,
+ StatsLogger parentStatsLogger,
ClientAuthProvider.Factory authProviderFactory,
ExtensionRegistry extRegistry,
PerChannelBookieClientPool pcbcPool) throws
SecurityException {
- this(conf, executor, eventLoopGroup, addr, null,
NullStatsLogger.INSTANCE,
+ this(conf, executor, eventLoopGroup, addr, NullStatsLogger.INSTANCE,
authProviderFactory, extRegistry, pcbcPool, null);
}
public PerChannelBookieClient(ClientConfiguration conf,
OrderedSafeExecutor executor,
EventLoopGroup eventLoopGroup,
BookieSocketAddress addr,
- HashedWheelTimer requestTimer, StatsLogger
parentStatsLogger,
- ClientAuthProvider.Factory
authProviderFactory,
+ StatsLogger parentStatsLogger,
ClientAuthProvider.Factory authProviderFactory,
ExtensionRegistry extRegistry,
PerChannelBookieClientPool pcbcPool,
SecurityHandlerFactory shFactory) throws
SecurityException {
@@ -242,9 +238,8 @@ public PerChannelBookieClient(ClientConfiguration conf,
OrderedSafeExecutor exec
this.eventLoopGroup = eventLoopGroup;
}
this.state = ConnectionState.DISCONNECTED;
- this.requestTimer = requestTimer;
- this.addEntryTimeout = conf.getAddEntryTimeout();
- this.readEntryTimeout = conf.getReadEntryTimeout();
+ this.addEntryTimeoutNanos =
TimeUnit.SECONDS.toNanos(conf.getAddEntryTimeout());
+ this.readEntryTimeoutNanos =
TimeUnit.SECONDS.toNanos(conf.getReadEntryTimeout());
this.getBookieInfoTimeout = conf.getBookieInfoTimeout();
this.startTLSTimeout = conf.getStartTLSTimeout();
this.useV2WireProtocol = conf.getUseV2WireProtocol();
@@ -768,6 +763,30 @@ public void getBookieInfo(final long requested,
GetBookieInfoCallback cb, Object
writeAndFlush(channel, completionKey, getBookieInfoRequest);
}
+ private static final BiPredicate<CompletionKey, CompletionValue>
timeoutCheck = (key, value) -> {
+ return value.maybeTimeout();
+ };
+
+ public void checkTimeoutOnPendingOperations() {
+ int timedOutOperations = completionObjects.removeIf(timeoutCheck);
+
+ synchronized (this) {
+ Iterator<CompletionValue> iterator =
completionObjectsV2Conflicts.values().iterator();
+ while (iterator.hasNext()) {
+ CompletionValue value = iterator.next();
+ if (value.maybeTimeout()) {
+ ++timedOutOperations;
+ iterator.remove();
+ }
+ }
+ }
+
+ if (timedOutOperations > 0) {
+ LOG.info("Timed-out {} operations to channel {} for {}",
+ timedOutOperations, channel, addr);
+ }
+ }
+
/**
* Disconnects the bookie client. It can be reused.
*/
@@ -819,6 +838,7 @@ private void closeInternal(boolean permanent, boolean wait)
{
cf.awaitUninterruptibly();
}
}
+
}
private ChannelFuture closeChannel(Channel c) {
@@ -1235,14 +1255,12 @@ public void operationComplete(Future<Channel> future)
throws Exception {
protected long ledgerId;
protected long entryId;
protected long startTime;
- protected Timeout timeout;
public CompletionValue(String operationName,
Object ctx,
long ledgerId, long entryId,
OpStatsLogger opLogger,
- OpStatsLogger timeoutOpLogger,
- Timeout timeout) {
+ OpStatsLogger timeoutOpLogger) {
this.operationName = operationName;
this.ctx = ctx;
this.ledgerId = ledgerId;
@@ -1250,19 +1268,13 @@ public CompletionValue(String operationName,
this.startTime = MathUtils.nowInNano();
this.opLogger = opLogger;
this.timeoutOpLogger = timeoutOpLogger;
- this.timeout = timeout;
}
private long latency() {
return MathUtils.elapsedNanos(startTime);
}
- void cancelTimeoutAndLogOp(int rc) {
- Timeout t = timeout;
- if (null != t) {
- t.cancel();
- }
-
+ void logOpResult(int rc) {
if (rc != BKException.Code.OK) {
opLogger.registerFailedEvent(latency(), TimeUnit.NANOSECONDS);
} else {
@@ -1275,6 +1287,15 @@ void cancelTimeoutAndLogOp(int rc) {
}
}
+ boolean maybeTimeout() {
+ if (MathUtils.elapsedNanos(startTime) >= readEntryTimeoutNanos) {
+ timeout();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
void timeout() {
errorOut(BKException.Code.TimeoutException);
timeoutOpLogger.registerSuccessfulEvent(latency(),
@@ -1344,14 +1365,13 @@ public WriteLacCompletion(final CompletionKey key,
final long ledgerId) {
super("WriteLAC",
originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
- writeLacOpLogger, writeLacTimeoutOpLogger,
- scheduleTimeout(key, addEntryTimeout));
+ writeLacOpLogger, writeLacTimeoutOpLogger);
this.cb = new WriteLacCallback() {
@Override
public void writeLacComplete(int rc, long ledgerId,
BookieSocketAddress addr,
Object ctx) {
- cancelTimeoutAndLogOp(rc);
+ logOpResult(rc);
originalCallback.writeLacComplete(rc, ledgerId,
addr, originalCtx);
key.release();
@@ -1392,15 +1412,14 @@ public ReadLacCompletion(final CompletionKey key,
ReadLacCallback originalCallback,
final Object ctx, final long ledgerId) {
super("ReadLAC", ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
- readLacOpLogger, readLacTimeoutOpLogger,
- scheduleTimeout(key, readEntryTimeout));
+ readLacOpLogger, readLacTimeoutOpLogger);
this.cb = new ReadLacCallback() {
@Override
public void readLacComplete(int rc, long ledgerId,
ByteBuf lacBuffer,
ByteBuf lastEntryBuffer,
Object ctx) {
- cancelTimeoutAndLogOp(rc);
+ logOpResult(rc);
originalCallback.readLacComplete(
rc, ledgerId, lacBuffer, lastEntryBuffer, ctx);
key.release();
@@ -1452,15 +1471,14 @@ public ReadCompletion(final CompletionKey key,
final Object originalCtx,
long ledgerId, final long entryId) {
super("Read", originalCtx, ledgerId, entryId,
- readEntryOpLogger, readTimeoutOpLogger,
- scheduleTimeout(key, readEntryTimeout));
+ readEntryOpLogger, readTimeoutOpLogger);
this.cb = new ReadEntryCallback() {
@Override
public void readEntryComplete(int rc, long ledgerId,
long entryId, ByteBuf buffer,
Object ctx) {
- cancelTimeoutAndLogOp(rc);
+ logOpResult(rc);
originalCallback.readEntryComplete(rc,
ledgerId, entryId,
buffer,
originalCtx);
@@ -1550,12 +1568,11 @@ private void handleReadResponse(long ledgerId,
public StartTLSCompletion(final CompletionKey key) {
super("StartTLS", null, -1, -1,
- startTLSOpLogger, startTLSTimeoutOpLogger,
- scheduleTimeout(key, startTLSTimeout));
+ startTLSOpLogger, startTLSTimeoutOpLogger);
this.cb = new StartTLSCallback() {
@Override
public void startTLSComplete(int rc, Object ctx) {
- cancelTimeoutAndLogOp(rc);
+ logOpResult(rc);
key.release();
}
};
@@ -1602,13 +1619,12 @@ public GetBookieInfoCompletion(final CompletionKey key,
final GetBookieInfoCallback
origCallback,
final Object origCtx) {
super("GetBookieInfo", origCtx, 0L, 0L,
- getBookieInfoOpLogger, getBookieInfoTimeoutOpLogger,
- scheduleTimeout(key, getBookieInfoTimeout));
+ getBookieInfoOpLogger, getBookieInfoTimeoutOpLogger);
this.cb = new GetBookieInfoCallback() {
@Override
public void getBookieInfoComplete(int rc, BookieInfo bInfo,
Object ctx) {
- cancelTimeoutAndLogOp(rc);
+ logOpResult(rc);
origCallback.getBookieInfoComplete(rc, bInfo, origCtx);
key.release();
}
@@ -1668,8 +1684,7 @@ AddCompletion acquireAddCompletion(final CompletionKey
key,
WriteCallback originalCallback = null;
AddCompletion(Recycler.Handle<AddCompletion> handle) {
- super("Add", null, -1, -1,
- addEntryOpLogger, addTimeoutOpLogger, null);
+ super("Add", null, -1, -1, addEntryOpLogger, addTimeoutOpLogger);
this.handle = handle;
}
@@ -1683,19 +1698,28 @@ void reset(final CompletionKey key,
this.ledgerId = ledgerId;
this.entryId = entryId;
this.startTime = MathUtils.nowInNano();
- this.timeout = scheduleTimeout(key, addEntryTimeout);
}
@Override
public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr,
Object ctx) {
- cancelTimeoutAndLogOp(rc);
+ logOpResult(rc);
originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx);
key.release();
handle.recycle(this);
}
+ @Override
+ boolean maybeTimeout() {
+ if (MathUtils.elapsedNanos(startTime) >= addEntryTimeoutNanos) {
+ timeout();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
@Override
public void errorOut() {
errorOut(BKException.Code.BookieHandleNotAvailableException);
@@ -1739,14 +1763,6 @@ CompletionKey newCompletionKey(long txnId, OperationType
operationType) {
return new V3CompletionKey(txnId, operationType);
}
- Timeout scheduleTimeout(CompletionKey key, long timeout) {
- if (null != requestTimer) {
- return requestTimer.newTimeout(key, timeout, TimeUnit.SECONDS);
- } else {
- return null;
- }
- }
-
class V3CompletionKey extends CompletionKey {
public V3CompletionKey(long txnId, OperationType operationType) {
@@ -1774,7 +1790,7 @@ public String toString() {
}
- abstract class CompletionKey implements TimerTask {
+ abstract class CompletionKey {
final long txnId;
OperationType operationType;
@@ -1784,17 +1800,6 @@ public String toString() {
this.operationType = operationType;
}
- @Override
- public void run(Timeout timeout) throws Exception {
- if (timeout.isCancelled()) {
- return;
- }
- CompletionValue completion = completionObjects.remove(this);
- if (completion != null) {
- completion.timeout();
- }
- }
-
public void release() {}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
index bd07a4e1b..80f00a595 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
@@ -45,6 +45,12 @@
*/
void recordError();
+ /**
+ * Check if any ops on any channel needs to be timed out.
+ * This is called on all channels, even if the channel is not yet
connected.
+ */
+ void checkTimeoutOnPendingOperations();
+
/**
* Disconnect the connections in the pool.
*
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
index d2bff33ec..bf6c2305f 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
@@ -25,8 +25,11 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -36,6 +39,7 @@
import org.apache.bookkeeper.proto.BookieClient;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.junit.After;
@@ -53,6 +57,7 @@
DigestType digestType;
public EventLoopGroup eventLoopGroup;
public OrderedSafeExecutor executor;
+ private ScheduledExecutorService scheduler;
public TestGetBookieInfoTimeout() {
super(10);
@@ -68,10 +73,13 @@ public void setUp() throws Exception {
.name("BKClientOrderedSafeExecutor")
.numThreads(2)
.build();
+ scheduler = Executors.newSingleThreadScheduledExecutor(
+ new DefaultThreadFactory("BookKeeperClientScheduler"));
}
@After
public void tearDown() throws Exception {
+ scheduler.shutdown();
eventLoopGroup.shutdownGracefully();
executor.shutdown();
}
@@ -99,7 +107,7 @@ public void testGetBookieInfoTimeout() throws Exception {
// try to get bookie info from the sleeping bookie. It should fail
with timeout error
BookieSocketAddress addr = new
BookieSocketAddress(bookieToSleep.getSocketAddress().getHostString(),
bookieToSleep.getPort());
- BookieClient bc = new BookieClient(cConf, eventLoopGroup, executor);
+ BookieClient bc = new BookieClient(cConf, eventLoopGroup, executor,
scheduler, NullStatsLogger.INSTANCE);
long flags =
BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
|
BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 6bf05ae39..b799635c1 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -27,12 +27,15 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.Code;
@@ -48,6 +51,7 @@
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.junit.After;
@@ -64,6 +68,7 @@
public EventLoopGroup eventLoopGroup;
public OrderedSafeExecutor executor;
+ private ScheduledExecutorService scheduler;
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
@Before
@@ -83,10 +88,13 @@ public void setUp() throws Exception {
.name("BKClientOrderedSafeExecutor")
.numThreads(2)
.build();
+ scheduler = Executors.newSingleThreadScheduledExecutor(
+ new DefaultThreadFactory("BookKeeperClientScheduler"));
}
@After
public void tearDown() throws Exception {
+ scheduler.shutdown();
bs.shutdown();
recursiveDelete(tmpDir);
eventLoopGroup.shutdownGracefully();
@@ -146,7 +154,8 @@ public void testWriteGaps() throws Exception {
BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port);
ResultStruct arc = new ResultStruct();
- BookieClient bc = new BookieClient(new ClientConfiguration(),
eventLoopGroup, executor);
+ BookieClient bc = new BookieClient(new ClientConfiguration(),
eventLoopGroup, executor,
+ scheduler,
NullStatsLogger.INSTANCE);
ByteBuf bb = createByteBuffer(1, 1, 1);
bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc,
BookieProtocol.FLAG_NONE);
synchronized (arc) {
@@ -246,7 +255,8 @@ private ByteBuf createByteBuffer(int i, long lid, long eid)
{
public void testNoLedger() throws Exception {
ResultStruct arc = new ResultStruct();
BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port);
- BookieClient bc = new BookieClient(new ClientConfiguration(),
eventLoopGroup, executor);
+ BookieClient bc = new BookieClient(new ClientConfiguration(),
eventLoopGroup, executor,
+ scheduler,
NullStatsLogger.INSTANCE);
synchronized (arc) {
bc.readEntry(addr, 2, 13, recb, arc);
arc.wait(1000);
@@ -257,7 +267,8 @@ public void testNoLedger() throws Exception {
@Test
public void testGetBookieInfo() throws IOException, InterruptedException {
BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port);
- BookieClient bc = new BookieClient(new ClientConfiguration(), new
NioEventLoopGroup(), executor);
+ BookieClient bc = new BookieClient(new ClientConfiguration(), new
NioEventLoopGroup(), executor,
+ scheduler,
NullStatsLogger.INSTANCE);
long flags =
BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
|
BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
deleted file mode 100644
index fc2047658..000000000
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.bookkeeper.test;
-
-import io.netty.buffer.Unpooled;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookieClient;
-import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class tests BookieClient. It just sends the a new entry to itself.
- */
-class LoopbackClient implements WriteCallback {
-
- private static final Logger LOG =
LoggerFactory.getLogger(LoopbackClient.class);
-
- BookieClient client;
- static int recvTimeout = 2000;
- long begin = 0;
- int limit;
- OrderedSafeExecutor executor;
-
- static class Counter {
- int c;
- int limit;
-
- Counter(int limit) {
- this.c = 0;
- this.limit = limit;
- }
-
- synchronized void increment() {
- if (++c == limit) {
- this.notify();
- }
- }
- }
-
- LoopbackClient(EventLoopGroup eventLoopGroup, OrderedSafeExecutor
executor, long begin, int limit)
- throws IOException {
- this.client = new BookieClient(new ClientConfiguration(),
eventLoopGroup, executor);
- this.begin = begin;
- }
-
- void write(long ledgerId, long entry, byte[] data, BookieSocketAddress
addr, WriteCallback cb, Object ctx)
- throws IOException, InterruptedException {
- LOG.info("Ledger id: " + ledgerId + ", Entry: " + entry);
- byte[] passwd = new byte[20];
- Arrays.fill(passwd, (byte) 'a');
-
- client.addEntry(addr, ledgerId, passwd, entry,
Unpooled.wrappedBuffer(data), cb, ctx, BookieProtocol.FLAG_NONE);
- }
-
- @Override
- public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx) {
- Counter counter = (Counter) ctx;
- counter.increment();
- }
-
- public static void main(String args[]) {
- byte[] data = new byte[Integer.parseInt(args[0])];
- Integer limit = Integer.parseInt(args[1]);
- Counter c = new Counter(limit);
- long ledgerId = Long.valueOf("0").longValue();
- long begin = System.currentTimeMillis();
-
- LoopbackClient lb;
- EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
- OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
- .name("BookieClientScheduler")
- .numThreads(2)
- .build();
- try {
- BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1",
Integer.valueOf(args[2]).intValue());
- lb = new LoopbackClient(eventLoopGroup, executor, begin,
limit.intValue());
-
- for (int i = 0; i < limit; i++) {
- lb.write(ledgerId, i, data, addr, lb, c);
- }
-
- synchronized (c) {
- c.wait();
- System.out.println("Time to write all entries: " +
(System.currentTimeMillis() - begin));
- }
- } catch (IOException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
-}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services