This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new ea3a6417 [FLINK-33201][Connectors/Kafka] Fix memory leak in
CachingTopicSelector
ea3a6417 is described below
commit ea3a6417e290d02c1b87c7d0afbb6db910b8faa8
Author: Yop Lee <[email protected]>
AuthorDate: Thu Sep 21 11:05:56 2023 +0900
[FLINK-33201][Connectors/Kafka] Fix memory leak in CachingTopicSelector
---
.../connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
index 34cf6ef0..92eb625b 100644
---
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
+++
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
@@ -284,7 +284,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
public String apply(IN in) {
final String topic = cache.getOrDefault(in,
topicSelector.apply(in));
cache.put(in, topic);
- if (cache.size() == CACHE_RESET_SIZE) {
+ if (cache.size() >= CACHE_RESET_SIZE) {
cache.clear();
}
return topic;