This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.15 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 18c998cfb41ba622216c6a0f96d402f283163ae8 Author: LinChen <[email protected]> AuthorDate: Thu Jul 7 15:52:00 2022 +0800 Metrics: add journalCbQueueLatency (#3364) (cherry picked from commit f7732921e72021e2f42dff32b12796f290249a33) --- .../apache/bookkeeper/bookie/BookKeeperServerStats.java | 1 + .../main/java/org/apache/bookkeeper/bookie/Journal.java | 16 +++++++++++++++- .../org/apache/bookkeeper/bookie/stats/JournalStats.java | 9 +++++++++ 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java index 5ee006bd08..f13747f9f6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java @@ -163,6 +163,7 @@ public interface BookKeeperServerStats { String INDEX_INMEM_ILLEGAL_STATE_DELETE = "INDEX_INMEM_ILLEGAL_STATE_DELETE"; String JOURNAL_FORCE_WRITE_QUEUE_SIZE = "JOURNAL_FORCE_WRITE_QUEUE_SIZE"; String JOURNAL_CB_QUEUE_SIZE = "JOURNAL_CB_QUEUE_SIZE"; + String JOURNAL_CB_QUEUED_LATENCY = "JOURNAL_CB_QUEUED_LATENCY"; String JOURNAL_NUM_FORCE_WRITES = "JOURNAL_NUM_FORCE_WRITES"; String JOURNAL_NUM_FLUSH_EMPTY_QUEUE = "JOURNAL_NUM_FLUSH_EMPTY_QUEUE"; String JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES = "JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES"; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 193b557312..9de7dcda57 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -303,15 +303,18 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { WriteCallback cb; Object ctx; long enqueueTime; + long 
enqueueCbThreadPooleQueueTime; boolean ackBeforeSync; OpStatsLogger journalAddEntryStats; + OpStatsLogger journalCbQueuedLatency; Counter journalCbQueueSize; Counter callbackTime; static QueueEntry create(ByteBuf entry, boolean ackBeforeSync, long ledgerId, long entryId, WriteCallback cb, Object ctx, long enqueueTime, OpStatsLogger journalAddEntryStats, - Counter journalCbQueueSize, Counter callbackTime) { + Counter journalCbQueueSize, OpStatsLogger journalCbQueuedLatency, + Counter callbackTime) { QueueEntry qe = RECYCLER.get(); qe.entry = entry; qe.ackBeforeSync = ackBeforeSync; @@ -321,13 +324,20 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { qe.entryId = entryId; qe.enqueueTime = enqueueTime; qe.journalAddEntryStats = journalAddEntryStats; + qe.journalCbQueuedLatency = journalCbQueuedLatency; qe.journalCbQueueSize = journalCbQueueSize; qe.callbackTime = callbackTime; return qe; } + public void setEnqueueCbThreadPooleQueueTime(long enqueueCbThreadPooleQueueTime) { + this.enqueueCbThreadPooleQueueTime = enqueueCbThreadPooleQueueTime; + } + @Override public void run() { + journalCbQueuedLatency.registerSuccessfulEvent( + MathUtils.elapsedNanos(enqueueCbThreadPooleQueueTime), TimeUnit.NANOSECONDS); long startTime = System.nanoTime(); if (LOG.isDebugEnabled()) { LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId); @@ -392,6 +402,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { for (int i = 0; i < forceWriteWaiters.size(); i++) { QueueEntry qe = forceWriteWaiters.get(i); if (qe != null) { + qe.setEnqueueCbThreadPooleQueueTime(MathUtils.nowInNano()); cbThreadPool.execute(qe); } } @@ -952,6 +963,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { entry, ackBeforeSync, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(), journalStats.getJournalAddEntryStats(), journalStats.getJournalCbQueueSize(), + journalStats.getJournalCbQueuedLatency(), 
callbackTime)); } @@ -961,6 +973,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { BookieImpl.METAENTRY_ID_FORCE_LEDGER, cb, ctx, MathUtils.nowInNano(), journalStats.getJournalForceLedgerStats(), journalStats.getJournalCbQueueSize(), + journalStats.getJournalCbQueuedLatency(), callbackTime)); // Increment afterwards because the add operation could fail. journalStats.getJournalQueueSize().inc(); @@ -1127,6 +1140,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { if (entry != null && (!syncData || entry.ackBeforeSync)) { toFlush.set(i, null); numEntriesToFlush--; + entry.setEnqueueCbThreadPooleQueueTime(MathUtils.nowInNano()); cbThreadPool.execute(entry); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java index 133936de3e..a35d74d566 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java @@ -23,6 +23,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_ADD_ENTRY; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CB_QUEUED_LATENCY; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CREATION_LATENCY; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FLUSH_LATENCY; @@ -153,6 +154,13 @@ public class JournalStats { help = "The journal callback queue size" ) private final Counter journalCbQueueSize; + + @StatsDoc( + name = 
JOURNAL_CB_QUEUED_LATENCY, + help = "The journal callback queued latency" + ) + private final OpStatsLogger journalCbQueuedLatency; + @StatsDoc( name = JOURNAL_NUM_FLUSH_MAX_WAIT, help = "The number of journal flushes triggered by MAX_WAIT time" @@ -203,6 +211,7 @@ public class JournalStats { journalQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_QUEUE_SIZE); forceWriteQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE); journalCbQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE); + journalCbQueuedLatency = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_CB_QUEUED_LATENCY); flushMaxWaitCounter = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_WAIT); flushMaxOutstandingBytesCounter = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES);
