This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 1854763 MINOR: log 2min processing summary of StreamThread loop (#9941)
1854763 is described below
commit 18547633697a29b690a8fb0c24e2f0289ecf8eeb
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Thu Jan 21 16:32:55 2021 -0800
MINOR: log 2min processing summary of StreamThread loop (#9941)
Remove all INFO-level logging from the main StreamThread loop in favor of a
single INFO-level processing summary that is logged at most once every 2 minutes
Reviewers: Walker Carlson <[email protected]>, Guozhang Wang
<[email protected]>
---
.../streams/processor/internals/StreamThread.java | 34 ++++++++++++++--------
1 file changed, 22 insertions(+), 12 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index de93856..863289d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -270,6 +270,12 @@ public class StreamThread extends Thread {
private final Sensor punctuateRatioSensor;
private final Sensor commitRatioSensor;
+ private static final long LOG_SUMMARY_INTERVAL_MS = 2 * 60 * 1000L; // log
a summary of processing every 2 minutes
+ private long lastLogSummaryMs = -1L;
+ private long totalRecordsProcessedSinceLastSummary = 0L;
+ private long totalPunctuatorsSinceLastSummary = 0L;
+ private long totalCommittedSinceLastSummary = 0L;
+
private long now;
private long lastPollMs;
private long lastCommitMs;
@@ -634,8 +640,6 @@ public class StreamThread extends Thread {
advanceNowAndComputeLatency();
int totalProcessed = 0;
- int totalPunctuated = 0;
- int totalCommitted = 0;
long totalCommitLatency = 0L;
long totalProcessLatency = 0L;
long totalPunctuateLatency = 0L;
@@ -666,6 +670,7 @@ public class StreamThread extends Thread {
processLatencySensor.record(processLatency / (double)
processed, now);
totalProcessed += processed;
+ totalRecordsProcessedSinceLastSummary += processed;
}
log.debug("Processed {} records with {} iterations; invoking
punctuators if necessary",
@@ -673,7 +678,7 @@ public class StreamThread extends Thread {
numIterations);
final int punctuated = taskManager.punctuate();
- totalPunctuated += punctuated;
+ totalPunctuatorsSinceLastSummary += punctuated;
final long punctuateLatency = advanceNowAndComputeLatency();
totalPunctuateLatency += punctuateLatency;
if (punctuated > 0) {
@@ -683,7 +688,7 @@ public class StreamThread extends Thread {
log.debug("{} punctuators ran.", punctuated);
final int committed = maybeCommit();
- totalCommitted += committed;
+ totalCommittedSinceLastSummary += committed;
final long commitLatency = advanceNowAndComputeLatency();
totalCommitLatency += commitLatency;
if (committed > 0) {
@@ -696,7 +701,7 @@ public class StreamThread extends Thread {
}
if (processed == 0) {
- // if there is no records to be processed, exit after
punctuate / commit
+ // if there are no records to be processed, exit after
punctuate / commit
break;
} else if (Math.max(now - lastPollMs, 0) > maxPollTimeMs / 2) {
numIterations = numIterations > 1 ? numIterations / 2 :
numIterations;
@@ -711,12 +716,6 @@ public class StreamThread extends Thread {
// we record the ratio out of the while loop so that the
accumulated latency spans over
// multiple iterations with reasonably large max.num.records and
hence is less vulnerable to outliers
taskManager.recordTaskProcessRatio(totalProcessLatency, now);
-
- // Don't log summary if no new records were processed to avoid
spamming logs for low-traffic topics
- if (totalProcessed > 0 || totalPunctuated > 0 || totalCommitted >
0) {
- log.info("Processed {} total records, ran {} punctuators, and
committed {} total tasks",
- totalProcessed, totalPunctuated, totalCommitted);
- }
}
now = time.milliseconds();
@@ -726,6 +725,17 @@ public class StreamThread extends Thread {
punctuateRatioSensor.record((double) totalPunctuateLatency /
runOnceLatency, now);
pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
commitRatioSensor.record((double) totalCommitLatency / runOnceLatency,
now);
+
+ final boolean logProcessingSummary = now - lastLogSummaryMs >
LOG_SUMMARY_INTERVAL_MS;
+ if (logProcessingSummary) {
+ log.info("Processed {} total records, ran {} punctuators, and
committed {} total tasks since the last update",
+ totalRecordsProcessedSinceLastSummary,
totalPunctuatorsSinceLastSummary, totalCommittedSinceLastSummary);
+
+ totalRecordsProcessedSinceLastSummary = 0L;
+ totalPunctuatorsSinceLastSummary = 0L;
+ totalCommittedSinceLastSummary = 0L;
+ lastLogSummaryMs = now;
+ }
}
private void initializeAndRestorePhase() {
@@ -791,7 +801,7 @@ public class StreamThread extends Thread {
final int numRecords = records.count();
if (numRecords > 0) {
- log.info("Main Consumer poll completed in {} ms and fetched {}
records", pollLatency, numRecords);
+ log.debug("Main Consumer poll completed in {} ms and fetched {}
records", pollLatency, numRecords);
}
pollSensor.record(pollLatency, now);