This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2a0c31e7687 MINOR: improve StreamThread periodic processing log
(#18430)
2a0c31e7687 is described below
commit 2a0c31e76876e2aa1a0b71025b05dd68b6445be8
Author: Almog Gavra <[email protected]>
AuthorDate: Thu Jan 9 11:01:43 2025 -0800
MINOR: improve StreamThread periodic processing log (#18430)
The current log is really helpful, this PR adds a bit more information to
that log to help debug some issues. In particular, it is interesting to be able
to debug situations that have long intervals between polls. It also includes a
reference to how long it has been since it last logged so you don't have to
find the previous time it was logged to compute quick per-second ratios.
Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
.../apache/kafka/streams/processor/internals/StreamThread.java | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 2d9c1b0b11b..d2ff9ff22ea 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -311,6 +311,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
private long lastLogSummaryMs = -1L;
private long totalRecordsProcessedSinceLastSummary = 0L;
private long totalPunctuatorsSinceLastSummary = 0L;
+ private long totalPolledSinceLastSummary = 0L;
private long totalCommittedSinceLastSummary = 0L;
private long now;
@@ -960,6 +961,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
final long pollLatency;
taskManager.resumePollingForPartitionsWithAvailableSpace();
pollLatency = pollPhase();
+ totalPolledSinceLastSummary += 1;
// Shutdown hook could potentially be triggered and transit the thread
state to PENDING_SHUTDOWN during #pollRequests().
// The task manager internal states could be uninitialized if the
state transition happens during #onPartitionsAssigned().
@@ -1075,12 +1077,14 @@ public class StreamThread extends Thread implements
ProcessingThread {
pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
commitRatioSensor.record((double) totalCommitLatency / runOnceLatency,
now);
- if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs >
logSummaryIntervalMs) {
- log.info("Processed {} total records, ran {} punctuators, and
committed {} total tasks since the last update",
- totalRecordsProcessedSinceLastSummary,
totalPunctuatorsSinceLastSummary, totalCommittedSinceLastSummary);
+ final long timeSinceLastLog = now - lastLogSummaryMs;
+ if (logSummaryIntervalMs > 0 && timeSinceLastLog >
logSummaryIntervalMs) {
+ log.info("Processed {} total records, ran {} punctuators, polled
{} times and committed {} total tasks since the last update {}ms ago",
+ totalRecordsProcessedSinceLastSummary,
totalPunctuatorsSinceLastSummary, totalPolledSinceLastSummary,
totalCommittedSinceLastSummary, timeSinceLastLog);
totalRecordsProcessedSinceLastSummary = 0L;
totalPunctuatorsSinceLastSummary = 0L;
+ totalPolledSinceLastSummary = 0L;
totalCommittedSinceLastSummary = 0L;
lastLogSummaryMs = now;
}