This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 89029aca02cdcd08737a1681172234b04daaccee
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Fri Nov 13 13:25:43 2020 -0800

    MINOR: demote "Committing task offsets" log to DEBUG (#9489)
    
    Demote "committing offsets" log message to DEBUG and promote/add summarizing INFO level logs in the main StreamThread loop
    
    Reviewers: Boyang Chen <[email protected]>, Walker Carlson <[email protected]>, John Roesler <[email protected]>
---
 .../streams/processor/internals/StreamThread.java      | 18 +++++++++++++-----
 .../kafka/streams/processor/internals/TaskManager.java |  2 +-
 2 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a5e4f86..332a035 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -629,6 +629,8 @@ public class StreamThread extends Thread {
         advanceNowAndComputeLatency();
 
         int totalProcessed = 0;
+        int totalPunctuated = 0;
+        int totalCommitted = 0;
         long totalCommitLatency = 0L;
         long totalProcessLatency = 0L;
         long totalPunctuateLatency = 0L;
@@ -666,6 +668,7 @@ public class StreamThread extends Thread {
                           numIterations);
 
                 final int punctuated = taskManager.punctuate();
+                totalPunctuated += punctuated;
                 final long punctuateLatency = advanceNowAndComputeLatency();
                 totalPunctuateLatency += punctuateLatency;
                 if (punctuated > 0) {
@@ -675,6 +678,7 @@ public class StreamThread extends Thread {
                 log.debug("{} punctuators ran.", punctuated);
 
                 final int committed = maybeCommit();
+                totalCommitted += committed;
                 final long commitLatency = advanceNowAndComputeLatency();
                 totalCommitLatency += commitLatency;
                 if (committed > 0) {
@@ -702,6 +706,10 @@ public class StreamThread extends Thread {
             // we record the ratio out of the while loop so that the 
accumulated latency spans over
             // multiple iterations with reasonably large max.num.records and 
hence is less vulnerable to outliers
             taskManager.recordTaskProcessRatio(totalProcessLatency, now);
+
+            log.info("Processed {} total records, ran {} punctuators, and 
committed {} total tasks " +
+                        "for active tasks {} and standby tasks {}",
+                     totalProcessed, totalPunctuated, totalCommitted, 
taskManager.activeTaskIds(), taskManager.standbyTaskIds());
         }
 
         now = time.milliseconds();
@@ -754,7 +762,7 @@ public class StreamThread extends Thread {
             // to unblock the restoration as soon as possible
             records = pollRequests(Duration.ZERO);
         } else if (state == State.PARTITIONS_REVOKED) {
-            // try to fetch som records with zero poll millis to unblock
+            // try to fetch some records with zero poll millis to unblock
             // other useful work while waiting for the join response
             records = pollRequests(Duration.ZERO);
         } else if (state == State.RUNNING || state == State.STARTING) {
@@ -773,13 +781,13 @@ public class StreamThread extends Thread {
 
         final long pollLatency = advanceNowAndComputeLatency();
 
-        if (log.isDebugEnabled()) {
-            log.debug("Main Consumer poll completed in {} ms and fetched {} 
records", pollLatency, records.count());
-        }
+        final int numRecords = records.count();
+        log.info("Main Consumer poll completed in {} ms and fetched {} 
records", pollLatency, numRecords);
+
         pollSensor.record(pollLatency, now);
 
         if (!records.isEmpty()) {
-            pollRecordsSensor.record(records.count(), now);
+            pollRecordsSensor.record(numRecords, now);
             taskManager.addRecordsToTasks(records);
         }
         return pollLatency;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index d0c6a1e..eb551bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -1060,7 +1060,7 @@ public class TaskManager {
     }
 
     private void commitOffsetsOrTransaction(final Map<TaskId, 
Map<TopicPartition, OffsetAndMetadata>> offsetsPerTask) {
-        log.info("Committing task offsets {}", offsetsPerTask);
+        log.debug("Committing task offsets {}", offsetsPerTask);
 
         if (!offsetsPerTask.isEmpty()) {
             if (processingMode == EXACTLY_ONCE_ALPHA) {

Reply via email to