This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2a0c31e7687 MINOR: improve StreamThread periodic processing log 
(#18430)
2a0c31e7687 is described below

commit 2a0c31e76876e2aa1a0b71025b05dd68b6445be8
Author: Almog Gavra <[email protected]>
AuthorDate: Thu Jan 9 11:01:43 2025 -0800

    MINOR: improve StreamThread periodic processing log (#18430)
    
    The current log is really helpful; this PR adds a bit more information to 
that log to help debug some issues. In particular, it is useful to be able 
to debug situations that have long intervals between polls. It also includes 
the time elapsed since the log was last emitted, so you don't have to 
find the previous time it was logged to compute quick per-second ratios.
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 .../apache/kafka/streams/processor/internals/StreamThread.java | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 2d9c1b0b11b..d2ff9ff22ea 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -311,6 +311,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
     private long lastLogSummaryMs = -1L;
     private long totalRecordsProcessedSinceLastSummary = 0L;
     private long totalPunctuatorsSinceLastSummary = 0L;
+    private long totalPolledSinceLastSummary = 0L;
     private long totalCommittedSinceLastSummary = 0L;
 
     private long now;
@@ -960,6 +961,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         final long pollLatency;
         taskManager.resumePollingForPartitionsWithAvailableSpace();
         pollLatency = pollPhase();
+        totalPolledSinceLastSummary += 1;
 
         // Shutdown hook could potentially be triggered and transit the thread 
state to PENDING_SHUTDOWN during #pollRequests().
         // The task manager internal states could be uninitialized if the 
state transition happens during #onPartitionsAssigned().
@@ -1075,12 +1077,14 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
         commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, 
now);
 
-        if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs > 
logSummaryIntervalMs) {
-            log.info("Processed {} total records, ran {} punctuators, and 
committed {} total tasks since the last update",
-                 totalRecordsProcessedSinceLastSummary, 
totalPunctuatorsSinceLastSummary, totalCommittedSinceLastSummary);
+        final long timeSinceLastLog = now - lastLogSummaryMs;
+        if (logSummaryIntervalMs > 0 && timeSinceLastLog > 
logSummaryIntervalMs) {
+            log.info("Processed {} total records, ran {} punctuators, polled 
{} times and committed {} total tasks since the last update {}ms ago",
+                 totalRecordsProcessedSinceLastSummary, 
totalPunctuatorsSinceLastSummary, totalPolledSinceLastSummary, 
totalCommittedSinceLastSummary, timeSinceLastLog);
 
             totalRecordsProcessedSinceLastSummary = 0L;
             totalPunctuatorsSinceLastSummary = 0L;
+            totalPolledSinceLastSummary = 0L;
             totalCommittedSinceLastSummary = 0L;
             lastLogSummaryMs = now;
         }

Reply via email to