This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ae570f5953 HOTFIX: Correct ordering of input buffer and enforced processing sensors (#12363) ae570f5953 is described below commit ae570f59533ae941bbf5ab9ff5e739a5bd855fd6 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Sun Jul 3 10:02:59 2022 -0700 HOTFIX: Correct ordering of input buffer and enforced processing sensors (#12363) 1. As titled, fix the right constructor param ordering. 2. Also added a few more loglines. Reviewers: Matthias J. Sax <matth...@confluent.io>, Sagar Rao <sagarmeansoc...@gmail.com>, Hao Li <1127478+lihao...@users.noreply.github.com> --- .../src/main/java/org/apache/kafka/streams/KafkaStreams.java | 7 ++++++- .../kafka/streams/processor/internals/PartitionGroup.java | 11 ++++++----- .../apache/kafka/streams/processor/internals/StreamTask.java | 4 +--- .../kafka/streams/processor/internals/StreamThread.java | 11 ++++++++--- .../kafka/streams/processor/internals/PartitionGroupTest.java | 2 +- 5 files changed, 22 insertions(+), 13 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 86ab83f67d..2c95aa85a4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -978,6 +978,10 @@ public class KafkaStreams implements AutoCloseable { } // Initially, all Stream Threads are created with 0 cache size and max buffer size and then resized here. resizeThreadCacheAndBufferMemory(numStreamThreads); + if (numStreamThreads > 0) { + log.info("Initializing {} StreamThread with cache size/max buffer size values as {} per thread.", + numStreamThreads, getThreadCacheAndBufferMemoryString()); + } stateDirCleaner = setupStateDirCleaner(); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, applicationConfigs); @@ -1143,7 +1147,8 @@ public class KafkaStreams implements AutoCloseable { + "for it to complete shutdown as this will result in deadlock.", streamThread.getName()); } resizeThreadCacheAndBufferMemory(getNumLiveStreamThreads()); - log.info("Resizing thread cache/max buffer size due to removal of thread {}, new cache size/max buffer size per thread is {}", streamThread.getName(), getThreadCacheAndBufferMemoryString()); + log.info("Resizing thread cache/max buffer size due to removal of thread {}, " + + "new cache size/max buffer size per thread is {}", streamThread.getName(), getThreadCacheAndBufferMemoryString()); if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) { final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceID.get()); final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 21d3cbfa3f..750699a1ec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -64,7 +64,7 @@ public class PartitionGroup { private final Sensor enforcedProcessingSensor; private final long maxTaskIdleMs; private final Sensor recordLatenessSensor; - private final Sensor totalBytesSensor; + private final Sensor totalInputBufferBytesSensor; private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime; private long streamTime; @@ -93,8 +93,8 @@ public class PartitionGroup { final Map<TopicPartition, RecordQueue> partitionQueues, final Function<TopicPartition, OptionalLong> lagProvider, final Sensor recordLatenessSensor, + final Sensor totalInputBufferBytesSensor, final Sensor enforcedProcessingSensor, - final Sensor totalBytesSensor, final long maxTaskIdleMs) { this.logger = logContext.logger(PartitionGroup.class); nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp)); @@ -103,7 +103,7 @@ public class PartitionGroup { this.enforcedProcessingSensor = enforcedProcessingSensor; this.maxTaskIdleMs = maxTaskIdleMs; this.recordLatenessSensor = recordLatenessSensor; - this.totalBytesSensor = totalBytesSensor; + this.totalInputBufferBytesSensor = totalInputBufferBytesSensor; totalBuffered = 0; allBuffered = false; streamTime = RecordQueue.UNKNOWN; @@ -230,6 +230,7 @@ public class PartitionGroup { // if partition is removed should delete its queue totalBuffered -= queueEntry.getValue().size(); totalBytesBuffered -= queueEntry.getValue().getTotalBytesBuffered(); + totalInputBufferBytesSensor.record(totalBytesBuffered); queuesIterator.remove(); removedPartitions.add(topicPartition); } @@ -275,7 +276,7 @@ public class PartitionGroup { if (record != null) { --totalBuffered; totalBytesBuffered -= oldBufferSize - newBufferSize; - totalBytesSensor.record(totalBytesBuffered); + totalInputBufferBytesSensor.record(totalBytesBuffered); if (queue.isEmpty()) { // if a certain queue has been drained, reset the flag allBuffered = false; @@ -329,7 +330,7 @@ public class PartitionGroup { totalBuffered += newSize - oldSize; totalBytesBuffered += newBufferSize - oldBufferSize; - totalBytesSensor.record(totalBytesBuffered); + totalInputBufferBytesSensor.record(totalBytesBuffered); return newSize; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 8514c6ae2e..33751e59d8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -176,8 +176,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, recordInfo = new PartitionGroup.RecordInfo(); - final Sensor enforcedProcessingSensor; - enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics); final long maxTaskIdleMs = config.maxTaskIdleMs; partitionGroup = new PartitionGroup( logContext, @@ -185,7 +183,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, mainConsumer::currentLag, TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics), TaskMetrics.totalInputBufferBytesSensor(threadId, taskId, streamsMetrics), - enforcedProcessingSensor, + TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics), maxTaskIdleMs ); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 64a4ff5433..1c252c2fda 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -798,7 +798,10 @@ public class StreamThread extends Thread { totalRecordsProcessedSinceLastSummary += processed; final long bufferSize = taskManager.getInputBufferSizeInBytes(); if (bufferSize <= maxBufferSizeBytes.get()) { - mainConsumer.resume(mainConsumer.paused()); + final Set<TopicPartition> pausedPartitions = mainConsumer.paused(); + log.info("Buffered records size {} bytes falls below {}. Resuming all the paused partitions {} in the consumer", + bufferSize, maxBufferSizeBytes.get(), pausedPartitions); + mainConsumer.resume(pausedPartitions); } } @@ -969,12 +972,14 @@ public class StreamThread extends Thread { final long bufferSize = taskManager.getInputBufferSizeInBytes(); // Pausing partitions as the buffer size now exceeds max buffer size if (bufferSize > maxBufferSizeBytes.get()) { - log.info("Buffered records size {} bytes exceeds {}. Pausing the consumer", bufferSize, maxBufferSizeBytes.get()); + final Set<TopicPartition> nonEmptyPartitions = taskManager.nonEmptyPartitions(); + log.info("Buffered records size {} bytes exceeds {}. Pausing partitions {} from the consumer", + bufferSize, maxBufferSizeBytes.get(), nonEmptyPartitions); // Only non-empty partitions are paused here. Reason is that, if a task has multiple partitions with // some of them empty, then in that case pausing even empty partitions would sacrifice ordered processing // and even lead to temporal deadlock. More explanation can be found here: // https://issues.apache.org/jira/browse/KAFKA-13152?focusedCommentId=17400647&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17400647 - mainConsumer.pause(taskManager.nonEmptyPartitions()); + mainConsumer.pause(nonEmptyPartitions); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 012373607e..e29cf69d61 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -861,8 +861,8 @@ public class PartitionGroupTest { ), tp -> OptionalLong.of(0L), getValueSensor(metrics, lastLatenessValue), - enforcedProcessingSensor, getValueSensor(metrics, totalBytesValue), + enforcedProcessingSensor, maxTaskIdleMs ); }