Repository: bookkeeper Updated Branches: refs/heads/master 6622b46d4 -> fe6259c7e
BOOKKEEPER-850: Use nanoseconds to calculate poll timeout when doing group commit (Matteo Merli via sijie) Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/fe6259c7 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/fe6259c7 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/fe6259c7 Branch: refs/heads/master Commit: fe6259c7eade644c8d2a5e96aba61c1792d64843 Parents: 6622b46 Author: Sijie Guo <[email protected]> Authored: Tue Apr 21 00:23:48 2015 -0700 Committer: Sijie Guo <[email protected]> Committed: Tue Apr 21 00:23:48 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../org/apache/bookkeeper/bookie/Journal.java | 20 +++++++++++--------- 2 files changed, 13 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/fe6259c7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 308a0a4..c36151d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -69,6 +69,8 @@ Trunk (unreleased changes) BOOKKEEPER-849: Collect stats with sub-milliseconds precision (Matteo Merli via sijie) + BOOKKEEPER-850: Use nanoseconds to calculate poll timeout when doing group commit (Matteo Merli via sijie) + bookkeeper-client: BOOKKEEPER-810: Allow to configure TCP connect timeout (Charles Xie via sijie) http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/fe6259c7/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index cc61aa5..48e5f55 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -494,7 +494,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource { final ServerConfiguration conf; final ForceWriteThread forceWriteThread; // Time after which we will stop grouping and issue the flush - private final long maxGroupWaitInMSec; + private final long maxGroupWaitInNanos; // Threshold after which we flush any buffered journal entries private final long bufferedEntriesThreshold; // Threshold after which we flush any buffered journal writes @@ -546,7 +546,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource { this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB; this.maxBackupJournals = conf.getMaxBackupJournals(); this.forceWriteThread = new ForceWriteThread(this, conf.getJournalAdaptiveGroupWrites()); - this.maxGroupWaitInMSec = conf.getJournalMaxGroupWaitMSec(); + this.maxGroupWaitInNanos = TimeUnit.MILLISECONDS.toNanos(conf.getJournalMaxGroupWaitMSec()); this.bufferedWritesThreshold = conf.getJournalBufferedWritesThreshold(); this.bufferedEntriesThreshold = conf.getJournalBufferedEntriesThreshold(); this.cbThreadPool = Executors.newFixedThreadPool(conf.getNumJournalCallbackThreads(), @@ -554,7 +554,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource { // Unless there is a cap on the max wait (which requires group force writes) // we cannot skip flushing for queue empty - this.flushWhenQueueEmpty = maxGroupWaitInMSec <= 0 || conf.getJournalFlushWhenQueueEmpty(); + this.flushWhenQueueEmpty = maxGroupWaitInNanos <= 0 || conf.getJournalFlushWhenQueueEmpty(); this.removePagesFromCache = conf.getJournalRemovePagesFromCache(); // read last log mark @@ -822,17 +822,19 @@ class Journal extends BookieCriticalThread implements CheckpointSource { if (toFlush.isEmpty()) { qe = queue.take(); } else { - long pollWaitTime = maxGroupWaitInMSec - MathUtils.elapsedMSec(toFlush.getFirst().enqueueTime); - if (flushWhenQueueEmpty || pollWaitTime < 0) { - pollWaitTime = 0; + long pollWaitTimeNanos = maxGroupWaitInNanos - MathUtils.elapsedNanos(toFlush.get(0).enqueueTime); + if (flushWhenQueueEmpty || pollWaitTimeNanos < 0) { + pollWaitTimeNanos = 0; } - qe = queue.poll(pollWaitTime, TimeUnit.MILLISECONDS); + qe = queue.poll(pollWaitTimeNanos, TimeUnit.NANOSECONDS); boolean shouldFlush = false; // We should issue a forceWrite if any of the three conditions below holds good // 1. If the oldest pending entry has been pending for longer than the max wait time - if (maxGroupWaitInMSec > 0 && !groupWhenTimeout && (MathUtils.elapsedMSec(toFlush.getFirst().enqueueTime) > maxGroupWaitInMSec)) { + if (maxGroupWaitInNanos > 0 && !groupWhenTimeout + && (MathUtils.elapsedNanos(toFlush.get(0).enqueueTime) > maxGroupWaitInNanos)) { groupWhenTimeout = true; - } else if (maxGroupWaitInMSec > 0 && groupWhenTimeout && qe != null && MathUtils.elapsedMSec(qe.enqueueTime) < maxGroupWaitInMSec) { + } else if (maxGroupWaitInNanos > 0 && groupWhenTimeout && qe != null + && MathUtils.elapsedNanos(qe.enqueueTime) < maxGroupWaitInNanos) { // when group timeout, it would be better to look forward, as there might be lots of entries already timeout // due to a previous slow write (writing to filesystem which impacted by force write). // Group those entries in the queue
