fix merge conflicts
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/014512c3 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/014512c3 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/014512c3 Branch: refs/heads/sijie/bookkeeper_fallocate Commit: 014512c3563bd06bd90789db0c5d0369fe421a62 Parents: d980296 Author: Sijie Guo <[email protected]> Authored: Thu Nov 17 17:32:15 2016 -0800 Committer: Sijie Guo <[email protected]> Committed: Thu Nov 17 17:32:15 2016 -0800 ---------------------------------------------------------------------- .../bookie/BookKeeperServerStats.java | 1 + .../bookkeeper/bookie/BufferedChannel.java | 8 +++-- .../org/apache/bookkeeper/bookie/Journal.java | 34 +++++++++----------- 3 files changed, 23 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/014512c3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java index 239f923..79c0d61 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java @@ -53,6 +53,7 @@ public interface BookKeeperServerStats { public final static String JOURNAL_FORCE_WRITE_BATCH_ENTRIES = "JOURNAL_FORCE_WRITE_BATCH_ENTRIES"; public final static String JOURNAL_FORCE_WRITE_BATCH_BYTES = "JOURNAL_FORCE_WRITE_BATCH_BYTES"; public final static String JOURNAL_FLUSH_LATENCY = "JOURNAL_FLUSH_LATENCY"; + public final static String JOURNAL_FLUSH_IN_MEM_ADD = "JOURNAL_FLUSH_IN_MEM_ADD"; public final static String JOURNAL_QUEUE_LATENCY = "JOURNAL_QUEUE_LATENCY"; public final static String JOURNAL_PROCESS_TIME_LATENCY = "JOURNAL_PROCESS_TIME_LATENCY"; public final static String JOURNAL_CREATION_LATENCY = "JOURNAL_CREATION_LATENCY"; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/014512c3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java index cb7d914..0492943 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java @@ -24,9 +24,10 @@ package org.apache.bookkeeper.bookie; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import org.apache.bookkeeper.util.ZeroBuffer; import java.util.concurrent.atomic.AtomicLong; +import org.apache.bookkeeper.util.ZeroBuffer; + /** * Provides a buffering layer in front of a FileChannel. */ @@ -64,8 +65,9 @@ public class BufferedChannel extends BufferedReadChannel { * @param src The source ByteBuffer which contains the data to be written. * @throws IOException if a write operation fails. */ - synchronized public void write(ByteBuffer src) throws IOException { + synchronized public int write(ByteBuffer src) throws IOException { int copied = 0; + int flushes = 0; while(src.remaining() > 0) { int truncated = 0; if (writeBuffer.remaining() < src.remaining()) { @@ -78,9 +80,11 @@ public class BufferedChannel extends BufferedReadChannel { // if we have run out of buffer space, we should flush to the file if (writeBuffer.remaining() == 0) { flushInternal(); + ++flushes; } } position += copied; + return flushes; } /** http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/014512c3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index dd62d28..e8adfff 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -30,8 +30,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -39,18 +37,15 @@ import com.google.common.base.Stopwatch; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; -<<<<<<< HEAD import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; -======= -import org.apache.bookkeeper.stats.BookkeeperServerStatsLogger; -import org.apache.bookkeeper.stats.ServerStatsProvider; ->>>>>>> 2d5718f... bookie: fallocate & sync_file_range import org.apache.bookkeeper.util.DaemonThreadFactory; import org.apache.bookkeeper.util.IOUtils; import org.apache.bookkeeper.util.MathUtils; +import org.apache.bookkeeper.util.OrderedSafeExecutor; +import org.apache.bookkeeper.util.SafeRunnable; import org.apache.bookkeeper.util.ZeroBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -295,7 +290,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource { } @Override - public void run() { + public void safeRun() { if (LOG.isDebugEnabled()) { LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId); } @@ -541,10 +536,12 @@ class Journal extends BookieCriticalThread implements CheckpointSource { private final LedgerDirsManager ledgerDirsManager; // Expose Stats + private final StatsLogger statsLogger; private final OpStatsLogger journalAddEntryStats; - private final OpStatsLogger journalSyncStats; + private final OpStatsLogger journalMemAddEntryStats; private final OpStatsLogger journalCreationStats; private final OpStatsLogger journalFlushStats; + private final OpStatsLogger journalMemAddFlushStats; private final OpStatsLogger journalProcessTimeStats; private final OpStatsLogger journalQueueStats; private final OpStatsLogger forceWriteGroupingCountStats; @@ -580,7 +577,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource { this.cbThreadPool = OrderedSafeExecutor.newBuilder() .name("BookieJournal") .numThreads(conf.getNumJournalCallbackThreads()) - .statsLogger(Stats.get().getStatsLogger("journal")) + .statsLogger(statsLogger) .threadFactory(new DaemonThreadFactory()) .build(); @@ -594,10 +591,12 @@ class Journal extends BookieCriticalThread implements CheckpointSource { LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark()); // Expose Stats + this.statsLogger = statsLogger; journalAddEntryStats = statsLogger.getOpStatsLogger(JOURNAL_ADD_ENTRY); - journalSyncStats = statsLogger.getOpStatsLogger(JOURNAL_SYNC); + journalMemAddEntryStats = statsLogger.getOpStatsLogger(JOURNAL_MEM_ADD_ENTRY); journalCreationStats = statsLogger.getOpStatsLogger(JOURNAL_CREATION_LATENCY); journalFlushStats = statsLogger.getOpStatsLogger(JOURNAL_FLUSH_LATENCY); + journalMemAddFlushStats = statsLogger.getOpStatsLogger(JOURNAL_FLUSH_IN_MEM_ADD); journalQueueStats = statsLogger.getOpStatsLogger(JOURNAL_QUEUE_LATENCY); journalProcessTimeStats = statsLogger.getOpStatsLogger(JOURNAL_PROCESS_TIME_LATENCY); forceWriteGroupingCountStats = statsLogger.getOpStatsLogger(JOURNAL_FORCE_WRITE_GROUPING_COUNT); @@ -928,8 +927,8 @@ class Journal extends BookieCriticalThread implements CheckpointSource { if (!enableGroupForceWrites) { logFile.startSyncRange(prevFlushPosition, lastFlushPosition); } - journalFlushLatencyStats.registerSuccessfulEvent( - journalFlushWatcher.stop().elapsed(TimeUnit.MICROSECONDS)); + journalFlushStats.registerSuccessfulEvent( + journalFlushWatcher.stop().elapsedTime(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); // Trace the lifetime of entries through persistence if (LOG.isDebugEnabled()) { @@ -983,9 +982,9 @@ class Journal extends BookieCriticalThread implements CheckpointSource { flushes += bc.write(lenBuff); flushes += bc.write(qe.entry); - journalMemAddFlushTimesStats.registerSuccessfulEvent(flushes); - journalMemAddLatencyStats.registerSuccessfulEvent( - MathUtils.elapsedMicroSec(qe.enqueueTime)); + journalMemAddFlushStats.registerSuccessfulValue(flushes); + journalMemAddEntryStats.registerSuccessfulEvent( + MathUtils.elapsedMicroSec(qe.enqueueTime), TimeUnit.MICROSECONDS); toFlush.add(qe); qe = null; @@ -1018,8 +1017,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource { LOG.info("Shutting down Journal"); forceWriteThread.shutdown(); cbThreadPool.shutdown(); - ; - if (!cbThreadPool.forceShutdown(5, TimeUnit.SECONDS)) { + if (!cbThreadPool.awaitTermination(5, TimeUnit.SECONDS)) { LOG.warn("Couldn't shutdown journal callback thread gracefully. Forcing"); } running = false;
