This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ae570f5953 HOTFIX: Correct ordering of input buffer and enforced processing sensors (#12363)
ae570f5953 is described below

commit ae570f59533ae941bbf5ab9ff5e739a5bd855fd6
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Sun Jul 3 10:02:59 2022 -0700

    HOTFIX: Correct ordering of input buffer and enforced processing sensors (#12363)
    
    1. As titled, use the right constructor param ordering.
    2. Also added a few more log lines.
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>, Sagar Rao <sagarmeansoc...@gmail.com>, Hao Li <1127478+lihao...@users.noreply.github.com>
---
 .../src/main/java/org/apache/kafka/streams/KafkaStreams.java  |  7 ++++++-
 .../kafka/streams/processor/internals/PartitionGroup.java     | 11 ++++++-----
 .../apache/kafka/streams/processor/internals/StreamTask.java  |  4 +---
 .../kafka/streams/processor/internals/StreamThread.java       | 11 ++++++++---
 .../kafka/streams/processor/internals/PartitionGroupTest.java |  2 +-
 5 files changed, 22 insertions(+), 13 deletions(-)
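
For context on why the compiler could not catch this: the two swapped
arguments are both of type org.apache.kafka.common.metrics.Sensor, so
passing them to the PartitionGroup constructor in the wrong order compiles
cleanly and only shows up at runtime as each metric recording the other's
values. The hotfix aligns the declared parameter order with the order the
StreamTask call site (and the test) already use. A minimal, self-contained
sketch of the hazard, using illustrative names rather than the real Streams
classes:

    // Stand-in for org.apache.kafka.common.metrics.Sensor, for illustration only.
    interface Sensor {
        void record(double value);
    }

    final class BufferMetrics {
        final Sensor totalInputBufferBytesSensor;
        final Sensor enforcedProcessingSensor;

        // Both parameters share one type, so a call site that lists them in the
        // wrong order still compiles; each field then records the other metric.
        BufferMetrics(final Sensor totalInputBufferBytesSensor,
                      final Sensor enforcedProcessingSensor) {
            this.totalInputBufferBytesSensor = totalInputBufferBytesSensor;
            this.enforcedProcessingSensor = enforcedProcessingSensor;
        }
    }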

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 86ab83f67d..2c95aa85a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -978,6 +978,10 @@ public class KafkaStreams implements AutoCloseable {
         }
         // Initially, all Stream Threads are created with 0 cache size and max buffer size and then resized here.
         resizeThreadCacheAndBufferMemory(numStreamThreads);
+        if (numStreamThreads > 0) {
+            log.info("Initializing {} StreamThread with cache size/max buffer 
size values as {} per thread.",
+                numStreamThreads, getThreadCacheAndBufferMemoryString());
+        }
 
         stateDirCleaner = setupStateDirCleaner();
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, applicationConfigs);
@@ -1143,7 +1147,8 @@ public class KafkaStreams implements AutoCloseable {
                                     + "for it to complete shutdown as this 
will result in deadlock.", streamThread.getName());
                         }
                         
resizeThreadCacheAndBufferMemory(getNumLiveStreamThreads());
-                        log.info("Resizing thread cache/max buffer size due to 
removal of thread {}, new cache size/max buffer size per thread is {}", 
streamThread.getName(), getThreadCacheAndBufferMemoryString());
+                        log.info("Resizing thread cache/max buffer size due to 
removal of thread {}, " +
+                            "new cache size/max buffer size per thread is {}", 
streamThread.getName(), getThreadCacheAndBufferMemoryString());
                         if (groupInstanceID.isPresent() && 
callingThreadIsNotCurrentStreamThread) {
                             final MemberToRemove memberToRemove = new 
MemberToRemove(groupInstanceID.get());
                             final Collection<MemberToRemove> membersToRemove = 
Collections.singletonList(memberToRemove);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 21d3cbfa3f..750699a1ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -64,7 +64,7 @@ public class PartitionGroup {
     private final Sensor enforcedProcessingSensor;
     private final long maxTaskIdleMs;
     private final Sensor recordLatenessSensor;
-    private final Sensor totalBytesSensor;
+    private final Sensor totalInputBufferBytesSensor;
     private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;
 
     private long streamTime;
@@ -93,8 +93,8 @@ public class PartitionGroup {
                    final Map<TopicPartition, RecordQueue> partitionQueues,
                    final Function<TopicPartition, OptionalLong> lagProvider,
                    final Sensor recordLatenessSensor,
+                   final Sensor totalInputBufferBytesSensor,
                    final Sensor enforcedProcessingSensor,
-                   final Sensor totalBytesSensor,
                    final long maxTaskIdleMs) {
         this.logger = logContext.logger(PartitionGroup.class);
         nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
@@ -103,7 +103,7 @@ public class PartitionGroup {
         this.enforcedProcessingSensor = enforcedProcessingSensor;
         this.maxTaskIdleMs = maxTaskIdleMs;
         this.recordLatenessSensor = recordLatenessSensor;
-        this.totalBytesSensor = totalBytesSensor;
+        this.totalInputBufferBytesSensor = totalInputBufferBytesSensor;
         totalBuffered = 0;
         allBuffered = false;
         streamTime = RecordQueue.UNKNOWN;
@@ -230,6 +230,7 @@ public class PartitionGroup {
                 // if partition is removed should delete its queue
                 totalBuffered -= queueEntry.getValue().size();
                 totalBytesBuffered -= queueEntry.getValue().getTotalBytesBuffered();
+                totalInputBufferBytesSensor.record(totalBytesBuffered);
                 queuesIterator.remove();
                 removedPartitions.add(topicPartition);
             }
@@ -275,7 +276,7 @@ public class PartitionGroup {
             if (record != null) {
                 --totalBuffered;
                 totalBytesBuffered -= oldBufferSize - newBufferSize;
-                totalBytesSensor.record(totalBytesBuffered);
+                totalInputBufferBytesSensor.record(totalBytesBuffered);
                 if (queue.isEmpty()) {
                     // if a certain queue has been drained, reset the flag
                     allBuffered = false;
@@ -329,7 +330,7 @@ public class PartitionGroup {
 
         totalBuffered += newSize - oldSize;
         totalBytesBuffered += newBufferSize - oldBufferSize;
-        totalBytesSensor.record(totalBytesBuffered);
+        totalInputBufferBytesSensor.record(totalBytesBuffered);
         return newSize;
     }
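
A note on the PartitionGroup changes above: besides renaming
totalBytesSensor to totalInputBufferBytesSensor, the patch records the
running totalBytesBuffered count on that sensor from every path that
changes it (adding records, draining a record, dropping a removed
partition's queue). A rough sketch of that update-then-record pattern,
using an illustrative tracker class rather than the real PartitionGroup:

    // Illustrative only: a running byte count re-recorded after every change,
    // mirroring how PartitionGroup reports totalBytesBuffered to its sensor.
    final class InputBufferBytesTracker {
        interface Sensor { void record(double value); }  // stand-in for the Kafka Sensor

        private final Sensor totalInputBufferBytesSensor;
        private long totalBytesBuffered = 0L;

        InputBufferBytesTracker(final Sensor totalInputBufferBytesSensor) {
            this.totalInputBufferBytesSensor = totalInputBufferBytesSensor;
        }

        void onBytesAdded(final long bytes) {
            totalBytesBuffered += bytes;
            totalInputBufferBytesSensor.record(totalBytesBuffered);  // record the new total, not the delta
        }

        void onBytesRemoved(final long bytes) {
            totalBytesBuffered -= bytes;
            totalInputBufferBytesSensor.record(totalBytesBuffered);
        }
    }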
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 8514c6ae2e..33751e59d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -176,8 +176,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
         recordInfo = new PartitionGroup.RecordInfo();
 
-        final Sensor enforcedProcessingSensor;
-        enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics);
         final long maxTaskIdleMs = config.maxTaskIdleMs;
         partitionGroup = new PartitionGroup(
             logContext,
@@ -185,7 +183,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
             mainConsumer::currentLag,
             TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics),
             TaskMetrics.totalInputBufferBytesSensor(threadId, taskId, streamsMetrics),
-            enforcedProcessingSensor,
+            TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics),
             maxTaskIdleMs
         );
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 64a4ff5433..1c252c2fda 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -798,7 +798,10 @@ public class StreamThread extends Thread {
                     totalRecordsProcessedSinceLastSummary += processed;
                     final long bufferSize = taskManager.getInputBufferSizeInBytes();
                     if (bufferSize <= maxBufferSizeBytes.get()) {
-                        mainConsumer.resume(mainConsumer.paused());
+                        final Set<TopicPartition> pausedPartitions = mainConsumer.paused();
+                        log.info("Buffered records size {} bytes falls below {}. Resuming all the paused partitions {} in the consumer",
+                            bufferSize, maxBufferSizeBytes.get(), pausedPartitions);
+                        mainConsumer.resume(pausedPartitions);
                     }
                 }
 
@@ -969,12 +972,14 @@ public class StreamThread extends Thread {
             final long bufferSize = taskManager.getInputBufferSizeInBytes();
             // Pausing partitions as the buffer size now exceeds max buffer size
             if (bufferSize > maxBufferSizeBytes.get()) {
-                log.info("Buffered records size {} bytes exceeds {}. Pausing 
the consumer", bufferSize, maxBufferSizeBytes.get());
+                final Set<TopicPartition> nonEmptyPartitions = 
taskManager.nonEmptyPartitions();
+                log.info("Buffered records size {} bytes exceeds {}. Pausing 
partitions {} from the consumer",
+                    bufferSize, maxBufferSizeBytes.get(), nonEmptyPartitions);
                 // Only non-empty partitions are paused here. Reason is that, 
if a task has multiple partitions with
                 // some of them empty, then in that case pausing even empty 
partitions would sacrifice ordered processing
                 // and even lead to temporal deadlock. More explanation can be 
found here:
                 // 
https://issues.apache.org/jira/browse/KAFKA-13152?focusedCommentId=17400647&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17400647
-                mainConsumer.pause(taskManager.nonEmptyPartitions());
+                mainConsumer.pause(nonEmptyPartitions);
             }
         }
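
The StreamThread changes above only make the existing back-pressure path
more observable, but the flow is worth spelling out. A rough, hedged sketch
of it follows; the consumer calls (pause, resume, paused) are the real
KafkaConsumer API, while the BufferedInputView interface is a hypothetical
stand-in for the task-manager methods used in the diff:

    import java.util.Set;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative helper, not the actual StreamThread code: applies the same
    // buffer-size back-pressure rules the patch above makes more observable.
    final class InputBufferBackpressure {
        // Hypothetical view over the buffered input of all tasks on this thread.
        interface BufferedInputView {
            long getInputBufferSizeInBytes();
            Set<TopicPartition> nonEmptyPartitions();
        }

        static void apply(final Consumer<byte[], byte[]> mainConsumer,
                          final BufferedInputView taskManager,
                          final long maxBufferSizeBytes) {
            final long bufferSize = taskManager.getInputBufferSizeInBytes();
            if (bufferSize > maxBufferSizeBytes) {
                // Pause only partitions that still hold buffered records; pausing
                // empty partitions of the same task could break ordered processing
                // and even deadlock (see the KAFKA-13152 comment linked in the diff).
                mainConsumer.pause(taskManager.nonEmptyPartitions());
            } else {
                // The buffer has drained back under the cap: resume whatever the
                // consumer currently reports as paused.
                mainConsumer.resume(mainConsumer.paused());
            }
        }
    }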
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 012373607e..e29cf69d61 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -861,8 +861,8 @@ public class PartitionGroupTest {
             ),
             tp -> OptionalLong.of(0L),
             getValueSensor(metrics, lastLatenessValue),
-            enforcedProcessingSensor,
             getValueSensor(metrics, totalBytesValue),
+            enforcedProcessingSensor,
             maxTaskIdleMs
         );
     }
