Repository: bookkeeper Updated Branches: refs/heads/master 5662416d8 -> 6cfecea6c
BOOKKEEPER-862: Add tracing and stats to OrderedSafeExecutor for debugging slow tasks (Leigh Stewart via sijie) Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/6cfecea6 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/6cfecea6 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/6cfecea6 Branch: refs/heads/master Commit: 6cfecea6c3e2b6e327fb53ac85f1894df81a10b2 Parents: 5662416 Author: Sijie Guo <si...@apache.org> Authored: Tue Oct 6 01:05:18 2015 -0700 Committer: Sijie Guo <si...@apache.org> Committed: Tue Oct 6 01:05:18 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../bookkeeper/benchmark/BenchBookie.java | 5 +- .../apache/bookkeeper/client/BookKeeper.java | 9 +- .../bookkeeper/client/LedgerDeleteOp.java | 5 + .../client/LedgerFragmentReplicator.java | 4 + .../apache/bookkeeper/client/LedgerHandle.java | 38 ++++ .../apache/bookkeeper/client/LedgerOpenOp.java | 4 + .../bookkeeper/client/ReadOnlyLedgerHandle.java | 5 + .../bookkeeper/conf/ClientConfiguration.java | 46 ++++ .../apache/bookkeeper/proto/BookieClient.java | 6 +- .../bookkeeper/proto/PacketProcessorBaseV3.java | 4 + .../proto/PerChannelBookieClient.java | 17 ++ .../bookkeeper/proto/ReadEntryProcessor.java | 5 + .../bookkeeper/proto/WriteEntryProcessor.java | 6 + .../bookkeeper/util/OrderedSafeExecutor.java | 221 ++++++++++++++++--- .../proto/TestPerChannelBookieClient.java | 21 +- .../bookkeeper/test/BookieClientTest.java | 5 +- .../apache/bookkeeper/test/LoopbackClient.java | 5 +- .../server/benchmark/BookieBenchmark.java | 6 +- .../server/persistence/ReadAheadCache.java | 6 +- 20 files changed, 372 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/CHANGES.txt ---------------------------------------------------------------------- diff 
--git a/CHANGES.txt b/CHANGES.txt index 36d4372..14a9bed 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -91,6 +91,8 @@ Trunk (unreleased changes) BOOKKEEPER-802: Bookkeeper protocol documentation (ivank via sijie) + BOOKKEEPER-862: Add tracing and stats to OrderedSafeExecutor for debugging slow tasks (Leigh Stewart via sijie) + bookkeeper-client: BOOKKEEPER-810: Allow to configure TCP connect timeout (Charles Xie via sijie) http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java ---------------------------------------------------------------------- diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java index 258a3fb..89ffb82 100644 --- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java +++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java @@ -142,7 +142,10 @@ public class BenchBookie { ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors .newCachedThreadPool()); - OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "BenchBookieClientScheduler"); + OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder() + .name("BenchBookieClientScheduler") + .numThreads(1) + .build(); ClientConfiguration conf = new ClientConfiguration(); BookieClient bc = new BookieClient(conf, channelFactory, executor); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index 6fe1371..6bb71fa 100644 --- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -295,8 +295,13 @@ public class BookKeeper { this.placementPolicy = initializeEnsemblePlacementPolicy(conf); // initialize main worker pool - this.mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads(), - "BookKeeperClientWorker"); + this.mainWorkerPool = OrderedSafeExecutor.newBuilder() + .name("BookKeeperClientWorker") + .numThreads(conf.getNumWorkerThreads()) + .statsLogger(statsLogger) + .traceTaskExecution(conf.getEnableTaskExecutionStats()) + .traceTaskWarnTimeMicroSec(conf.getTaskExecutionWarnTimeMicros()) + .build(); // initialize bookie client this.bookieClient = new BookieClient(conf, this.channelFactory, this.mainWorkerPool, statsLogger); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java index 13ce1fd..50fe54a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java @@ -89,4 +89,9 @@ class LedgerDeleteOp extends OrderedSafeGenericCallback<Void> { } cb.deleteComplete(rc, this.ctx); } + + @Override + public String toString() { + return String.format("LedgerDeleteOp(%d)", ledgerId); + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java index 2078245..b4c8cc8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java @@ -412,6 +412,10 @@ public class LedgerFragmentReplicator { newBookie); } } + @Override + public String toString() { + return String.format("ReReadMetadataForUpdateEnsemble(%d)", lh.getId()); + } }); return; } else if (rc != BKException.Code.OK) { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index f0d79b2..4ed3c03 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -377,6 +377,11 @@ public class LedgerHandle { } } } + + @Override + public String toString() { + return String.format("ReReadMetadataForClose(%d)", ledgerId); + } }); } else if (rc != BKException.Code.OK) { LOG.error("Error update ledger metadata for ledger " + ledgerId + " : " + rc); @@ -385,11 +390,21 @@ public class LedgerHandle { cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx); } } + + @Override + public String toString() { + return String.format("WriteLedgerConfigForClose(%d)", ledgerId); + } }; writeLedgerConfig(new CloseCb()); } + + @Override + public String toString() { + return String.format("CloseLedgerHandle(%d)", ledgerId); + } }); } @@ -597,6 +612,10 @@ public class LedgerHandle { entryId, lastAddConfirmed, currentLength, data, offset, length); op.initiate(toSend, length); } + @Override + public String 
toString() { + return String.format("AsyncAddEntry(lid=%d, eid=%d)", ledgerId, entryId); + } }); } catch (RejectedExecutionException e) { cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException), @@ -938,6 +957,11 @@ public class LedgerHandle { // the failed bookie has been replaced unsetSuccessAndSendWriteRequest(ensembleInfo.bookieIndex); } + + @Override + public String toString() { + return String.format("ChangeEnsemble(%d)", ledgerId); + } }; /** @@ -1059,6 +1083,10 @@ public class LedgerHandle { return true; } + @Override + public String toString() { + return String.format("ReReadLedgerMetadata(%d)", ledgerId); + } }; void unsetSuccessAndSendWriteRequest(final int bookieIndex) { @@ -1119,6 +1147,11 @@ public class LedgerHandle { recover(cb); } } + + @Override + public String toString() { + return String.format("ReReadMetadataForRecover(%d)", ledgerId); + } }); } else if (rc == BKException.Code.OK) { new LedgerRecoveryOp(LedgerHandle.this, cb).initiate(); @@ -1127,6 +1160,11 @@ public class LedgerHandle { cb.operationComplete(rc, null); } } + + @Override + public String toString() { + return String.format("WriteLedgerConfigForRecover(%d)", ledgerId); + } }); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java index 69a09e9..cc97866 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java @@ -176,6 +176,10 @@ class LedgerOpenOp implements GenericCallback<LedgerMetadata> { openComplete(bk.getReturnRc(BKException.Code.LedgerRecoveryException), null); } } + @Override + public String toString() { + return 
String.format("Recover(%d)", ledgerId); + } }); } else { lh.asyncReadLastConfirmed(new ReadLastConfirmedCallback() { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java index 8de4092..711f209 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java @@ -57,6 +57,11 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListene ReadOnlyLedgerHandle.this.metadata = this.m; } } + + @Override + public String toString() { + return String.format("MetadataUpdater(%d)", ledgerId); + } } ReadOnlyLedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata, http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index ae63c70..dde6d3a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -70,6 +70,10 @@ public class ClientConfiguration extends AbstractConfiguration { // Ensemble Placement Policy protected final static String ENSEMBLE_PLACEMENT_POLICY = "ensemblePlacementPolicy"; + // Stats + protected final static String ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats"; + protected 
final static String TASK_EXECUTION_WARN_TIME_MICROS = "taskExecutionWarnTimeMicros"; + /** * Construct a default client-side configuration */ @@ -619,4 +623,46 @@ public class ClientConfiguration extends AbstractConfiguration { setProperty(ENSEMBLE_PLACEMENT_POLICY, policyClass.getName()); return this; } + + /** + * Whether to enable recording task execution stats. + * + * @return flag to enable/disable recording task execution stats. + */ + public boolean getEnableTaskExecutionStats() { + return getBoolean(ENABLE_TASK_EXECUTION_STATS, false); + } + + /** + * Enable/Disable recording task execution stats. + * + * @param enabled + * flag to enable/disable recording task execution stats. + * @return client configuration. + */ + public ClientConfiguration setEnableTaskExecutionStats(boolean enabled) { + setProperty(ENABLE_TASK_EXECUTION_STATS, enabled); + return this; + } + + /** + * Get task execution duration which triggers a warning. + * + * @return time in microseconds which triggers a warning. + */ + public long getTaskExecutionWarnTimeMicros() { + return getLong(TASK_EXECUTION_WARN_TIME_MICROS, TimeUnit.SECONDS.toMicros(1)); + } + + /** + * Set task execution duration which triggers a warning. + * + * @param warnTime + * time in microseconds which triggers a warning. + * @return client configuration. 
+ */ + public ClientConfiguration setTaskExecutionWarnTimeMicros(long warnTime) { + setProperty(TASK_EXECUTION_WARN_TIME_MICROS, warnTime); + return this; + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java index f455c90..87d1865 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java @@ -318,8 +318,10 @@ public class BookieClient implements PerChannelBookieClientFactory { "BookKeeper-NIOBoss-%d").build()), Executors.newCachedThreadPool(tfb.setNameFormat( "BookKeeper-NIOWorker-%d").build())); - OrderedSafeExecutor executor = new OrderedSafeExecutor(1, - "BookieClientWorker"); + OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder() + .name("BookieClientWorker") + .numThreads(1) + .build(); BookieClient bc = new BookieClient(new ClientConfiguration(), channelFactory, executor); BookieSocketAddress addr = new BookieSocketAddress(args[0], Integer.parseInt(args[1])); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java index 0d7bbee..9ffca53 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java @@ -71,4 +71,8 @@ public 
abstract class PacketProcessorBaseV3 { return header.build(); } + @Override + public String toString() { + return request.toString(); + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 34b0362..6d8058f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -550,6 +550,11 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan readCompletion.cb.readEntryComplete(rc, readCompletion.ledgerId, readCompletion.entryId, null, readCompletion.ctx); } + + @Override + public String toString() { + return String.format("ErrorOutReadKey(%s)", key); + } }); } @@ -577,6 +582,11 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan addr, addCompletion.ctx); LOG.debug("Invoked callback method: {}", addCompletion.entryId); } + + @Override + public String toString() { + return String.format("ErrorOutAddKey(%s)", key); + } }); } @@ -724,6 +734,13 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan break; } } + + @Override + public String toString() { + return String.format("HandleResponse(Txn=%d, Type=%s, Entry=(%d, %d))", + header.getTxnId(), header.getOperation(), + completionValue.ledgerId, completionValue.entryId); + } }); } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java ---------------------------------------------------------------------- diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java index 7c17bb1..43360fa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java @@ -132,4 +132,9 @@ class ReadEntryProcessor extends PacketProcessorBase { requestProcessor.readRequestStats); } } + + @Override + public String toString() { + return String.format("ReadEntry(%d, %d)", request.getLedgerId(), request.getEntryId()); + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java index f493d73..b314998 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java @@ -100,4 +100,10 @@ class WriteEntryProcessor extends PacketProcessorBase implements WriteCallback { ResponseBuilder.buildAddResponse(request), requestProcessor.addRequestStats); } + + @Override + public String toString() { + return String.format("WriteEntry(%d, %d)", + request.getLedgerId(), request.getEntryId()); + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java index 
597c886..f1d0e9f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java @@ -17,16 +17,25 @@ */ package org.apache.bookkeeper.util; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Random; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,42 +56,142 @@ import org.slf4j.LoggerFactory; * */ public class OrderedSafeExecutor { - final ExecutorService threads[]; + final static long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1); + final String name; + final ThreadPoolExecutor threads[]; final long threadIds[]; + final BlockingQueue<Runnable> queues[]; final Random rand = new Random(); + final OpStatsLogger taskExecutionStats; + final OpStatsLogger taskPendingStats; + final boolean traceTaskExecution; + final long warnTimeMicroSec; + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + private String name = "OrderedSafeExecutor"; + private int numThreads = Runtime.getRuntime().availableProcessors(); + private 
ThreadFactory threadFactory = null; + private StatsLogger statsLogger = NullStatsLogger.INSTANCE; + private boolean traceTaskExecution = false; + private long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT; + + public Builder name(String name) { + this.name = name; + return this; + } + + public Builder numThreads(int num) { + this.numThreads = num; + return this; + } + + public Builder threadFactory(ThreadFactory threadFactory) { + this.threadFactory = threadFactory; + return this; + } + + public Builder statsLogger(StatsLogger statsLogger) { + this.statsLogger = statsLogger; + return this; + } + + public Builder traceTaskExecution(boolean enabled) { + this.traceTaskExecution = enabled; + return this; + } + + public Builder traceTaskWarnTimeMicroSec(long warnTimeMicroSec) { + this.warnTimeMicroSec = warnTimeMicroSec; + return this; + } + + public OrderedSafeExecutor build() { + if (null == threadFactory) { + threadFactory = Executors.defaultThreadFactory(); + } + return new OrderedSafeExecutor(name, numThreads, threadFactory, statsLogger, + traceTaskExecution, warnTimeMicroSec); + } + + } + + private class TimedRunnable extends SafeRunnable { + final SafeRunnable runnable; + final long initNanos; + + TimedRunnable(SafeRunnable runnable) { + this.runnable = runnable; + this.initNanos = MathUtils.nowInNano(); + } + + @Override + public void safeRun() { + long startNanos = MathUtils.nowInNano(); + taskPendingStats.registerSuccessfulEvent(startNanos - initNanos, TimeUnit.NANOSECONDS); /* record time spent queued (submit -> start), not the raw submit timestamp */ + this.runnable.safeRun(); + long elapsedMicroSec = MathUtils.elapsedMicroSec(startNanos); + taskExecutionStats.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS); + if (elapsedMicroSec >= warnTimeMicroSec) { + logger.warn("Runnable {}:{} took too long {} micros to execute.", + new Object[] { runnable, runnable.getClass(), elapsedMicroSec }); + } + } + } + + 
@Deprecated + public OrderedSafeExecutor(int numThreads, String threadName) { + this(StringUtils.isBlank(threadName) ? "OrderedSafeExecutor" : threadName, numThreads, /* legacy ctor defaulted blank names instead of failing the isBlank precondition */
Executors.defaultThreadFactory(), NullStatsLogger.INSTANCE, + false, WARN_TIME_MICRO_SEC_DEFAULT); + } /** * Constructs Safe executor * * @param numThreads * - number of threads - * @param threadName - * - name of the thread + * @param baseName + * - base name of executor threads + * @param threadFactory + * - for constructing threads + * @param statsLogger + * - for reporting executor stats + * @param traceTaskExecution + * - should we stat task execution + * @param warnTimeMicroSec + * - log long task exec warning after this interval */ - public OrderedSafeExecutor(int numThreads, String threadName) { - if (numThreads <= 0) { - throw new IllegalArgumentException(); - } - if (StringUtils.isBlank(threadName)) { - // sets default name - threadName = "OrderedSafeExecutor"; - } - threads = new ExecutorService[numThreads]; + @SuppressWarnings("unchecked") + private OrderedSafeExecutor(String baseName, int numThreads, ThreadFactory threadFactory, + StatsLogger statsLogger, boolean traceTaskExecution, + long warnTimeMicroSec) { + Preconditions.checkArgument(numThreads > 0); + Preconditions.checkArgument(!StringUtils.isBlank(baseName)); + + this.warnTimeMicroSec = warnTimeMicroSec; + name = baseName; + threads = new ThreadPoolExecutor[numThreads]; threadIds = new long[numThreads]; + queues = new BlockingQueue[numThreads]; for (int i = 0; i < numThreads; i++) { - StringBuilder thName = new StringBuilder(threadName); - thName.append("-"); - thName.append(i); - thName.append("-%d"); - ThreadFactoryBuilder tfb = new ThreadFactoryBuilder() - .setNameFormat(thName.toString()); - threads[i] = Executors.newSingleThreadExecutor(tfb.build()); - final int tid = i; + queues[i] = new LinkedBlockingQueue<Runnable>(); + threads[i] = new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, queues[i], + new ThreadFactoryBuilder() + .setNameFormat(name + "-orderedsafeexecutor-" + i + "-%d") + .setThreadFactory(threadFactory) + .build()); + + // Save thread ids + final int idx = i; try { - 
threads[i].submit(new SafeRunnable() { + threads[idx].submit(new SafeRunnable() { @Override public void safeRun() { - threadIds[tid] = Thread.currentThread().getId(); + threadIds[idx] = Thread.currentThread().getId(); } }).get(); } catch (InterruptedException e) { @@ -90,7 +199,47 @@ public class OrderedSafeExecutor { } catch (ExecutionException e) { throw new RuntimeException("Couldn't start thread " + i, e); } + + // Register gauges + statsLogger.registerGauge(String.format("%s-queue-%d", name, idx), new Gauge<Number>() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return queues[idx].size(); + } + }); + statsLogger.registerGauge(String.format("%s-completed-tasks-%d", name, idx), new Gauge<Number>() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return threads[idx].getCompletedTaskCount(); + } + }); + statsLogger.registerGauge(String.format("%s-total-tasks-%d", name, idx), new Gauge<Number>() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return threads[idx].getTaskCount(); + } + }); } + + // Stats + this.taskExecutionStats = statsLogger.scope(name).getOpStatsLogger("task_execution"); + this.taskPendingStats = statsLogger.scope(name).getOpStatsLogger("task_queued"); + this.traceTaskExecution = traceTaskExecution; } ExecutorService chooseThread() { @@ -113,11 +262,19 @@ public class OrderedSafeExecutor { } + private SafeRunnable timedRunnable(SafeRunnable r) { + if (traceTaskExecution) { + return new TimedRunnable(r); + } else { + return r; + } + } + /** * schedules a one time action to execute */ public void submit(SafeRunnable r) { - chooseThread().submit(r); + chooseThread().submit(timedRunnable(r)); } /** @@ -126,7 +283,7 @@ public class OrderedSafeExecutor { * @param r */ public void submitOrdered(Object orderingKey, SafeRunnable r) { - 
chooseThread(orderingKey).submit(r); + chooseThread(orderingKey).submit(timedRunnable(r)); } private long getThreadID(Object orderingKey) { @@ -184,11 +341,17 @@ public class OrderedSafeExecutor { } else { try { executor.submitOrdered(orderingKey, new SafeRunnable() { - @Override - public void safeRun() { - safeOperationComplete(rc, result); - } - }); + @Override + public void safeRun() { + safeOperationComplete(rc, result); + } + @Override + public String toString() { + return String.format("Callback(key=%s, name=%s)", + orderingKey, + OrderedSafeGenericCallback.this); + } + }); } catch (RejectedExecutionException re) { LOG.warn("Failed to submit callback for {} : ", orderingKey, re); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java index 675c2fd..ac6bd8d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java @@ -70,8 +70,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); - OrderedSafeExecutor executor = new OrderedSafeExecutor(1, - "BKClientOrderedSafeExecutor"); + OrderedSafeExecutor executor = getOrderedSafeExecutor(); BookieSocketAddress addr = getBookie(0); for (int i = 0; i < 1000; i++) { @@ -89,6 +88,15 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { executor.shutdown(); } + public OrderedSafeExecutor getOrderedSafeExecutor() { + return 
OrderedSafeExecutor.newBuilder() + .name("PCBC") + .numThreads(1) + .traceTaskExecution(true) + .traceTaskWarnTimeMicroSec(TimeUnit.MILLISECONDS.toMicros(100)) + .build(); + } + /** * Test race scenario found in {@link https://issues.apache.org/jira/browse/BOOKKEEPER-5} * where multiple clients try to connect a channel simultaneously. If not synchronised @@ -106,8 +114,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); - OrderedSafeExecutor executor = new OrderedSafeExecutor(1, - "BKClientOrderedSafeExecutor"); + OrderedSafeExecutor executor = getOrderedSafeExecutor(); BookieSocketAddress addr = getBookie(0); for (int i = 0; i < 100; i++) { @@ -140,8 +147,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); - OrderedSafeExecutor executor = new OrderedSafeExecutor(1, - "BKClientOrderedSafeExecutor"); + OrderedSafeExecutor executor = getOrderedSafeExecutor(); BookieSocketAddress addr = getBookie(0); final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr); @@ -238,8 +244,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); - final OrderedSafeExecutor executor = new OrderedSafeExecutor(1, - "BKClientOrderedSafeExecutor"); + final OrderedSafeExecutor executor = getOrderedSafeExecutor(); BookieSocketAddress addr = getBookie(0); final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr); 
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java index 285cf71..a170cee 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java @@ -73,7 +73,10 @@ public class BookieClientTest { bs.start(); channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors .newCachedThreadPool()); - executor = new OrderedSafeExecutor(2, "BKClientOrderedSafeExecutor"); + executor = OrderedSafeExecutor.newBuilder() + .name("BKClientOrderedSafeExecutor") + .numThreads(2) + .build(); } @After http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java index 778a804..3a36129 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java @@ -96,7 +96,10 @@ class LoopbackClient implements WriteCallback { LoopbackClient lb; ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors .newCachedThreadPool()); - OrderedSafeExecutor executor = new OrderedSafeExecutor(2, "BookieClientScheduler"); + OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder() + .name("BookieClientScheduler") + .numThreads(2) + 
.build(); try { BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", Integer.valueOf(args[2]).intValue()); lb = new LoopbackClient(channelFactory, executor, begin, limit.intValue()); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java b/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java index 7985d39..d58883d 100644 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java +++ b/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java @@ -41,8 +41,10 @@ public class BookieBenchmark extends AbstractBenchmark { BookieClient bkc; BookieSocketAddress addr; ClientSocketChannelFactory channelFactory; - OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "BookieBenchmarkScheduler"); - + OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder() + .name("BookieBenchmarkScheduler") + .numThreads(1) + .build(); public BookieBenchmark(String bookieHostPort) throws Exception { channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java index 2235282..48be3e8 100644 --- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java +++ b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java @@ 
-154,8 +154,10 @@ public class ReadAheadCache implements PersistenceManager, HedwigJMXService { this.realPersistenceManager = realPersistenceManager; this.cfg = cfg; numCacheWorkers = cfg.getNumReadAheadCacheThreads(); - cacheWorkers = new OrderedSafeExecutor(numCacheWorkers, - "ReadAheadCacheScheduler"); + cacheWorkers = OrderedSafeExecutor.newBuilder() + .name("ReadAheadCacheScheduler") + .numThreads(numCacheWorkers) + .build(); reloadConf(cfg); }