Yash Mayya created KAFKA-14342:
----------------------------------

             Summary: KafkaOffsetBackingStore should clear offsets for source partitions on tombstone messages
                 Key: KAFKA-14342
                 URL: https://issues.apache.org/jira/browse/KAFKA-14342
             Project: Kafka
          Issue Type: Improvement
          Components: KafkaConnect
            Reporter: Yash Mayya
            Assignee: Yash Mayya


[KafkaOffsetBackingStore|https://github.com/apache/kafka/blob/56d588d55ac313c0efca586a3bcd984c99a89018/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L70]
 is used to track source connector offsets using a backing Kafka topic. It 
implements interface methods to get and set offsets using a 
[KafkaBasedLog|https://github.com/apache/kafka/blob/56d588d55ac313c0efca586a3bcd984c99a89018/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L80].
 It also maintains an in-memory map containing {partition, offset} entries for 
source connectors (which is populated via the consumer callback mechanism from 
the KafkaBasedLog). When a tombstone offset (i.e. a Kafka message with a null 
value) is encountered for a source partition, the map is simply updated to make 
the value null for the corresponding partition key; the key itself is kept. For 
source connectors which have a lot of source partitions that are "closed" 
frequently, this is problematic because the map only ever grows. Imagine a file 
source connector which reads data 
from all files in a directory line-by-line (and where file appends are not 
tracked) - each file corresponds to a source partition here, and the offset 
would be the line number in the file. If there are millions of files being 
read, this can bring down the Connect worker due to JVM heap exhaustion (OOM) 
caused by the in-memory map in KafkaOffsetBackingStore growing too large. Even 
if the connector writes tombstone offsets for the last record in a source 
partition, this doesn't help completely since we don't currently remove entries 
from KafkaOffsetBackingStore's in-memory offset map (so the source partition 
keys will stick around) - even though we indicate 
[here|https://github.com/apache/kafka/blob/56d588d55ac313c0efca586a3bcd984c99a89018/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java#L37]
 that tombstones can be used to "delete" offsets.
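
To illustrate the difference, here is a minimal, hypothetical sketch. It is not the actual KafkaOffsetBackingStore code: the class OffsetMapSketch, its two methods, and the use of String/Long in place of serialized keys and values are all assumptions made for the example. It contrasts storing a null value on a tombstone with removing the entry.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only; names and types are assumptions, not Connect runtime code.
class OffsetMapSketch {

    // Behavior described above: a tombstone (null offset) overwrites the value
    // with null, so the partition key stays in the map.
    static void applyKeepingKey(Map<String, Long> offsets, String partition, Long offset) {
        offsets.put(partition, offset);
    }

    // Proposed behavior: a tombstone removes the entry for the partition.
    static void applyClearingOnTombstone(Map<String, Long> offsets, String partition, Long offset) {
        if (offset == null) {
            offsets.remove(partition);
        } else {
            offsets.put(partition, offset);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> current = new HashMap<>();
        Map<String, Long> proposed = new HashMap<>();

        // Simulate 1000 files that are each read once and then "closed" via a tombstone.
        for (int i = 0; i < 1000; i++) {
            String partition = "file-" + i;
            applyKeepingKey(current, partition, 42L);
            applyKeepingKey(current, partition, null);
            applyClearingOnTombstone(proposed, partition, 42L);
            applyClearingOnTombstone(proposed, partition, null);
        }

        System.out.println("entries kept when tombstones store null: " + current.size());
        System.out.println("entries kept when tombstones remove the key: " + proposed.size());
    }
}
```

In this sketch the first map still holds one entry per closed file, while the second is empty once every file has been tombstoned.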



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
