This is an automated email from the ASF dual-hosted git repository. ableegoldman pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 89029aca02cdcd08737a1681172234b04daaccee Author: A. Sophie Blee-Goldman <[email protected]> AuthorDate: Fri Nov 13 13:25:43 2020 -0800 MINOR: demote "Committing task offsets" log to DEBUG (#9489) Demote "committing offsets" log message to DEBUG and promote/add summarizing INFO level logs in the main StreamThread loop Reviewers: Boyang Chen <[email protected]>, Walker Carlson <[email protected]>, John Roesler <[email protected]> --- .../streams/processor/internals/StreamThread.java | 18 +++++++++++++----- .../kafka/streams/processor/internals/TaskManager.java | 2 +- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index a5e4f86..332a035 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -629,6 +629,8 @@ public class StreamThread extends Thread { advanceNowAndComputeLatency(); int totalProcessed = 0; + int totalPunctuated = 0; + int totalCommitted = 0; long totalCommitLatency = 0L; long totalProcessLatency = 0L; long totalPunctuateLatency = 0L; @@ -666,6 +668,7 @@ public class StreamThread extends Thread { numIterations); final int punctuated = taskManager.punctuate(); + totalPunctuated += punctuated; final long punctuateLatency = advanceNowAndComputeLatency(); totalPunctuateLatency += punctuateLatency; if (punctuated > 0) { @@ -675,6 +678,7 @@ public class StreamThread extends Thread { log.debug("{} punctuators ran.", punctuated); final int committed = maybeCommit(); + totalCommitted += committed; final long commitLatency = advanceNowAndComputeLatency(); totalCommitLatency += commitLatency; if (committed > 0) { @@ -702,6 +706,10 @@ public class StreamThread extends Thread { // we record the ratio out of the while loop so that the accumulated latency spans over // multiple iterations with reasonably large max.num.records and hence is less vulnerable to outliers taskManager.recordTaskProcessRatio(totalProcessLatency, now); + + log.info("Processed {} total records, ran {} punctuators, and committed {} total tasks " + + "for active tasks {} and standby tasks {}", + totalProcessed, totalPunctuated, totalCommitted, taskManager.activeTaskIds(), taskManager.standbyTaskIds()); } now = time.milliseconds(); @@ -754,7 +762,7 @@ public class StreamThread extends Thread { // to unblock the restoration as soon as possible records = pollRequests(Duration.ZERO); } else if (state == State.PARTITIONS_REVOKED) { - // try to fetch som records with zero poll millis to unblock + // try to fetch some records with zero poll millis to unblock // other useful work while waiting for the join response records = pollRequests(Duration.ZERO); } else if (state == State.RUNNING || state == State.STARTING) { @@ -773,13 +781,13 @@ public class StreamThread extends Thread { final long pollLatency = advanceNowAndComputeLatency(); - if (log.isDebugEnabled()) { - log.debug("Main Consumer poll completed in {} ms and fetched {} records", pollLatency, records.count()); - } + final int numRecords = records.count(); + log.info("Main Consumer poll completed in {} ms and fetched {} records", pollLatency, numRecords); + pollSensor.record(pollLatency, now); if (!records.isEmpty()) { - pollRecordsSensor.record(records.count(), now); + pollRecordsSensor.record(numRecords, now); taskManager.addRecordsToTasks(records); } return pollLatency; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index d0c6a1e..eb551bd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1060,7 +1060,7 @@ public class TaskManager { } private void commitOffsetsOrTransaction(final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> offsetsPerTask) { - log.info("Committing task offsets {}", offsetsPerTask); + log.debug("Committing task offsets {}", offsetsPerTask); if (!offsetsPerTask.isEmpty()) { if (processingMode == EXACTLY_ONCE_ALPHA) {
