Repository: bookkeeper Updated Branches: refs/heads/master 61390322e -> 63a2a91eb
BOOKKEEPER-888: Dispatch individual callbacks from journal in different threads. Currently, the journal sends all the responses from a single thread after the entries in a batch are synced. Since a thread pool has been configured, it is better to spread the send-response tasks across all the available threads. Author: Matteo Merli <[email protected]> Reviewers: Sijie Guo <[email protected]> Closes #8 from merlimat/bk-888 Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/63a2a91e Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/63a2a91e Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/63a2a91e Branch: refs/heads/master Commit: 63a2a91eba191e92a1902cf5400325dcf4d36089 Parents: 6139032 Author: Matteo Merli <[email protected]> Authored: Mon Feb 8 23:30:29 2016 -0800 Committer: Sijie Guo <[email protected]> Committed: Mon Feb 8 23:30:29 2016 -0800 ---------------------------------------------------------------------- .../org/apache/bookkeeper/bookie/Journal.java | 21 ++++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/63a2a91e/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 48e5f55..08394c1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -268,7 +268,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource { /** * Journal Entry to Record */ - private static class QueueEntry { + private class QueueEntry implements Runnable { 
ByteBuffer entry; long ledgerId; long entryId; @@ -286,15 +286,17 @@ class Journal extends BookieCriticalThread implements CheckpointSource { this.enqueueTime = enqueueTime; } - public void callback() { + @Override + public void run() { if (LOG.isDebugEnabled()) { LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId); } + journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS); cb.writeComplete(0, ledgerId, entryId, null, ctx); } } - private class ForceWriteRequest implements Runnable { + private class ForceWriteRequest { private final JournalChannel logFile; private final LinkedList<QueueEntry> forceWriteWaiters; private boolean shouldClose; @@ -330,7 +332,9 @@ class Journal extends BookieCriticalThread implements CheckpointSource { lastLogMark.setCurLogMark(this.logId, this.lastFlushedPosition); // Notify the waiters that the force write succeeded - cbThreadPool.submit(this); + for (QueueEntry e : this.forceWriteWaiters) { + cbThreadPool.submit(e); + } return this.forceWriteWaiters.size(); } @@ -339,15 +343,6 @@ class Journal extends BookieCriticalThread implements CheckpointSource { } } - @Override - public void run() { - for (QueueEntry e : this.forceWriteWaiters) { - journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(e.enqueueTime), - TimeUnit.NANOSECONDS); - e.callback(); // Process cbs inline - } - } - public void closeFileIfNecessary() { // Close if shouldClose is set if (shouldClose) {
