[ https://issues.apache.org/jira/browse/KAFKA-14079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Christopher L. Shannon updated KAFKA-14079:
-------------------------------------------
Description:
KAFKA-13348 added the ability to ignore producer exceptions by setting {{error.tolerance}} to {{all}}. When this is set to all, a null record metadata is passed to commitRecord() and the task continues.

The issue is that records are tracked by {{SubmittedRecords}}, and the first time an error happens the code does not ack the failed record and simply skips it, so the record is never removed from SubmittedRecords and its offset is never committed before commitRecord() is called.

This leads to a bug where future offsets are no longer committed, and also to a memory leak: the algorithm that removes acked records from the internal map to commit offsets [looks|https://github.com/apache/kafka/blob/3.2.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java#L177] at the head of the Deque in which the records are tracked, and if the head record is unacked it will not process any more removals. As a result, every new record that goes through the task keeps being added but never has its offset committed and is never removed from tracking, until an OOM error occurs.

The fix is to ack the failed records so their offsets can be committed and they can be removed from tracking. This is safe because the records are intended to be skipped and not reprocessed. Metrics also need to be updated accordingly.


was:
KAFKA-13348 added the ability to ignore producer exceptions by setting {{error.tolerance}} to {{all}}. When this is set to all, a null record metadata is passed to commitRecord() and the task continues.

The issue is that records are tracked by {{SubmittedRecords}} and the first time an error happens the code does not remove the record with the error from SubmittedRecords before calling commitRecord(). This leads to a bug where offsets won't be committed anymore and also a memory leak, because the algorithm that removes acked records from the internal map [looks|https://github.com/apache/kafka/blob/3.2.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java#L177] at the head of the Deque where the records are tracked, and if it sees the record is unacked it will not process any more removals. This leads to all new records that go through the task continuing to be added and never committed or removed, until an OOM error occurs.

The fix is to make sure to ack the failed records before calling commitRecord(). The failed records can be acked and committed as they won't be retried. Metrics also need to be updated as well.
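For illustration, here is a minimal, self-contained sketch of the behavior described above (this is not the actual {{SubmittedRecords}} code; the class, method, and field names are made up): removal of acked records stops at the first unacked record at the head of the deque, so a failed-and-skipped record that is never acked blocks all later offset commits, while acking the skipped record lets both the commit and the removal from tracking proceed.

{code:java}
// Simplified sketch only; does not match the real SubmittedRecords API.
import java.util.ArrayDeque;
import java.util.Deque;

public class SubmittedRecordsSketch {

    static class TrackedRecord {
        final long offset;
        boolean acked;
        TrackedRecord(long offset) { this.offset = offset; }
        void ack() { this.acked = true; }
    }

    private final Deque<TrackedRecord> deque = new ArrayDeque<>();
    private long committableOffset = -1L;

    TrackedRecord submit(long offset) {
        TrackedRecord record = new TrackedRecord(offset);
        deque.add(record);
        return record;
    }

    // Mirrors the algorithm described in the report: walk from the head of the
    // deque and stop at the first unacked record. An unacked record at the head
    // therefore blocks every later commit, and the deque grows without bound.
    long committableOffset() {
        while (!deque.isEmpty() && deque.peek().acked) {
            committableOffset = deque.poll().offset;
        }
        return committableOffset;
    }

    public static void main(String[] args) {
        SubmittedRecordsSketch records = new SubmittedRecordsSketch();

        TrackedRecord failed = records.submit(0); // producer send fails, error.tolerance=all skips it
        TrackedRecord ok = records.submit(1);     // later record is produced successfully
        ok.ack();

        // Without acking the failed record, nothing becomes committable and
        // both records stay in the deque indefinitely.
        System.out.println(records.committableOffset()); // -1

        // The fix described above: ack the skipped record too, so its offset
        // can be committed and it is removed from tracking.
        failed.ack();
        System.out.println(records.committableOffset()); // 1
    }
}
{code}

In the sketch, acking the skipped record is what allows the later, successfully produced record's offset to ever become committable.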
> Source task will not commit offsets and develops memory leak if "error.tolerance" is set to "all"
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14079
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14079
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.2.0
>            Reporter: Christopher L. Shannon
>            Priority: Major
>             Fix For: 3.2.1
>
>
> KAFKA-13348 added the ability to ignore producer exceptions by setting
> {{error.tolerance}} to {{all}}. When this is set to all, a null record
> metadata is passed to commitRecord() and the task continues.
> The issue is that records are tracked by {{SubmittedRecords}}, and the first
> time an error happens the code does not ack the failed record and simply
> skips it, so the record is never removed from SubmittedRecords and its offset
> is never committed before commitRecord() is called.
> This leads to a bug where future offsets are no longer committed, and also to
> a memory leak: the algorithm that removes acked records from the internal map
> to commit offsets [looks|https://github.com/apache/kafka/blob/3.2.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java#L177]
> at the head of the Deque in which the records are tracked, and if the head
> record is unacked it will not process any more removals. As a result, every
> new record that goes through the task keeps being added but never has its
> offset committed and is never removed from tracking, until an OOM error
> occurs.
> The fix is to ack the failed records so their offsets can be committed and
> they can be removed from tracking. This is safe because the records are
> intended to be skipped and not reprocessed. Metrics also need to be updated
> accordingly.
>
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)