This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ee565f5f6b KAFKA-13939: Only track dirty keys if logging is enabled. (#12263) ee565f5f6b is described below commit ee565f5f6b97c84d4f7f895fcb79188822284414 Author: jnewhouse <jnewho...@quantcast.com> AuthorDate: Thu Jun 16 14:27:38 2022 -0700 KAFKA-13939: Only track dirty keys if logging is enabled. (#12263) InMemoryTimeOrderedKeyValueBuffer keeps a Set of keys that have been seen in order to log them for durability. This set is neither used nor cleared when logging is not enabled, so populating it creates a memory leak. This change stops populating the set if logging is not enabled. Reviewers: Divij Vaidya <di...@amazon.com>, Kvicii <42023367+kvi...@users.noreply.github.com>, Guozhang Wang <wangg...@gmail.com> --- .../state/internals/InMemoryTimeOrderedKeyValueBuffer.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java index 5894023bbe..5403f9e703 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java @@ -423,7 +423,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere delegate.remove(); index.remove(next.getKey().key()); - dirtyKeys.add(next.getKey().key()); + if (loggingEnabled) { + dirtyKeys.add(next.getKey().key()); + } memBufferSize -= computeRecordSize(next.getKey().key(), bufferValue); @@ -497,7 +499,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere serializedKey, new BufferValue(serializedPriorValue, serialChange.oldValue, serialChange.newValue, recordContext) ); - dirtyKeys.add(serializedKey); + if (loggingEnabled) { +
dirtyKeys.add(serializedKey); + } updateBufferMetrics(); }