This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new ea3a6417 [FLINK-33201][Connectors/Kafka] Fix memory leak in 
CachingTopicSelector
ea3a6417 is described below

commit ea3a6417e290d02c1b87c7d0afbb6db910b8faa8
Author: Yop Lee <[email protected]>
AuthorDate: Thu Sep 21 11:05:56 2023 +0900

    [FLINK-33201][Connectors/Kafka] Fix memory leak in CachingTopicSelector
---
 .../connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
index 34cf6ef0..92eb625b 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
@@ -284,7 +284,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
         public String apply(IN in) {
             final String topic = cache.getOrDefault(in, 
topicSelector.apply(in));
             cache.put(in, topic);
-            if (cache.size() == CACHE_RESET_SIZE) {
+            if (cache.size() >= CACHE_RESET_SIZE) {
                 cache.clear();
             }
             return topic;

Reply via email to