This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 1854763  MINOR: log 2min processing summary of StreamThread loop 
(#9941)
1854763 is described below

commit 18547633697a29b690a8fb0c24e2f0289ecf8eeb
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Thu Jan 21 16:32:55 2021 -0800

    MINOR: log 2min processing summary of StreamThread loop (#9941)
    
    Remove all INFO-level logging from the main StreamThread loop in favor of a 
summary with a 2min interval
    
    Reviewers: Walker Carlson <[email protected]>, Guozhang Wang 
<[email protected]>
---
 .../streams/processor/internals/StreamThread.java  | 34 ++++++++++++++--------
 1 file changed, 22 insertions(+), 12 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index de93856..863289d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -270,6 +270,12 @@ public class StreamThread extends Thread {
     private final Sensor punctuateRatioSensor;
     private final Sensor commitRatioSensor;
 
+    private static final long LOG_SUMMARY_INTERVAL_MS = 2 * 60 * 1000L; // log 
a summary of processing every 2 minutes
+    private long lastLogSummaryMs = -1L;
+    private long totalRecordsProcessedSinceLastSummary = 0L;
+    private long totalPunctuatorsSinceLastSummary = 0L;
+    private long totalCommittedSinceLastSummary = 0L;
+
     private long now;
     private long lastPollMs;
     private long lastCommitMs;
@@ -634,8 +640,6 @@ public class StreamThread extends Thread {
         advanceNowAndComputeLatency();
 
         int totalProcessed = 0;
-        int totalPunctuated = 0;
-        int totalCommitted = 0;
         long totalCommitLatency = 0L;
         long totalProcessLatency = 0L;
         long totalPunctuateLatency = 0L;
@@ -666,6 +670,7 @@ public class StreamThread extends Thread {
                     processLatencySensor.record(processLatency / (double) 
processed, now);
 
                     totalProcessed += processed;
+                    totalRecordsProcessedSinceLastSummary += processed;
                 }
 
                 log.debug("Processed {} records with {} iterations; invoking 
punctuators if necessary",
@@ -673,7 +678,7 @@ public class StreamThread extends Thread {
                           numIterations);
 
                 final int punctuated = taskManager.punctuate();
-                totalPunctuated += punctuated;
+                totalPunctuatorsSinceLastSummary += punctuated;
                 final long punctuateLatency = advanceNowAndComputeLatency();
                 totalPunctuateLatency += punctuateLatency;
                 if (punctuated > 0) {
@@ -683,7 +688,7 @@ public class StreamThread extends Thread {
                 log.debug("{} punctuators ran.", punctuated);
 
                 final int committed = maybeCommit();
-                totalCommitted += committed;
+                totalCommittedSinceLastSummary += committed;
                 final long commitLatency = advanceNowAndComputeLatency();
                 totalCommitLatency += commitLatency;
                 if (committed > 0) {
@@ -696,7 +701,7 @@ public class StreamThread extends Thread {
                 }
 
                 if (processed == 0) {
-                    // if there is no records to be processed, exit after 
punctuate / commit
+                    // if there are no records to be processed, exit after 
punctuate / commit
                     break;
                 } else if (Math.max(now - lastPollMs, 0) > maxPollTimeMs / 2) {
                     numIterations = numIterations > 1 ? numIterations / 2 : 
numIterations;
@@ -711,12 +716,6 @@ public class StreamThread extends Thread {
             // we record the ratio out of the while loop so that the 
accumulated latency spans over
             // multiple iterations with reasonably large max.num.records and 
hence is less vulnerable to outliers
             taskManager.recordTaskProcessRatio(totalProcessLatency, now);
-
-            // Don't log summary if no new records were processed to avoid 
spamming logs for low-traffic topics
-            if (totalProcessed > 0 || totalPunctuated > 0 || totalCommitted > 
0) {
-                log.info("Processed {} total records, ran {} punctuators, and 
committed {} total tasks",
-                         totalProcessed, totalPunctuated, totalCommitted);
-            }
         }
 
         now = time.milliseconds();
@@ -726,6 +725,17 @@ public class StreamThread extends Thread {
         punctuateRatioSensor.record((double) totalPunctuateLatency / 
runOnceLatency, now);
         pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
         commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, 
now);
+
+        final boolean logProcessingSummary = now - lastLogSummaryMs > 
LOG_SUMMARY_INTERVAL_MS;
+        if (logProcessingSummary) {
+            log.info("Processed {} total records, ran {} punctuators, and 
committed {} total tasks since the last update",
+                 totalRecordsProcessedSinceLastSummary, 
totalPunctuatorsSinceLastSummary, totalCommittedSinceLastSummary);
+
+            totalRecordsProcessedSinceLastSummary = 0L;
+            totalPunctuatorsSinceLastSummary = 0L;
+            totalCommittedSinceLastSummary = 0L;
+            lastLogSummaryMs = now;
+        }
     }
 
     private void initializeAndRestorePhase() {
@@ -791,7 +801,7 @@ public class StreamThread extends Thread {
 
         final int numRecords = records.count();
         if (numRecords > 0) {
-            log.info("Main Consumer poll completed in {} ms and fetched {} 
records", pollLatency, numRecords);
+            log.debug("Main Consumer poll completed in {} ms and fetched {} 
records", pollLatency, numRecords);
         }
 
         pollSensor.record(pollLatency, now);

Reply via email to