[ https://issues.apache.org/jira/browse/KAFKA-6876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Egerton resolved KAFKA-6876.
----------------------------------
Resolution: Duplicate
> Sender exceptions ignored by WorkerSourceTask producer Callback causing data loss
> ---------------------------------------------------------------------------------
>
> Key: KAFKA-6876
> URL: https://issues.apache.org/jira/browse/KAFKA-6876
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 0.11.0.1, 1.1.0, 2.0.1
> Environment: Linux, JDK 8
> Reporter: Paul Davidson
> Priority: Major
>
> The producer callback in "WorkerSourceTask" handles exceptions during a
> send() by logging at ERROR level and continuing. This can lead to offsets
> being committed for records that were never sent correctly. The records are
> effectively skipped, leading to data loss in our use case.
> A comment in the Callback's "onCompletion()" method says this should
> "basically never happen ... callbacks with exceptions should never be invoked
> in practice", but we have seen it happen several times in production,
> especially when the heap is close to exhaustion and the Sender thread
> throws an exception (often caused by KAFKA-6551).
> From WorkerSourceTask line 253:
> {code:java}
> new Callback() {
>     @Override
>     public void onCompletion(RecordMetadata recordMetadata, Exception e) {
>         if (e != null) {
>             // Given the default settings for zero data loss, this should basically never happen --
>             // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
>             // timeouts, callbacks with exceptions should never be invoked in practice. If the
>             // user overrode these settings, the best we can do is notify them of the failure via
>             // logging.
>             log.error("{} failed to send record to {}: {}", this, topic, e);
>             log.debug("{} Failed record: {}", this, preTransformRecord);
>         } else {
>             log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
>                     this,
>                     recordMetadata.topic(), recordMetadata.partition(),
>                     recordMetadata.offset());
>             commitTaskRecord(preTransformRecord);
>         }
>         recordSent(producerRecord);
>         counter.completeRecord();
>     }
> }
> {code}
>
> Example of an exception triggering the bug:
> {code:java}
> 2018-04-27 21:14:25,740 [kafka-producer-network-thread | source-23] ERROR o.a.k.c.runtime.WorkerSourceTask - source-23 failed to send record to topic-name: {}
> java.lang.IllegalStateException: Producer is closed forcefully.
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:610)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:597)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:183)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
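The sketch below is a minimal, self-contained model of the failure mode described in this issue. It assumes a simplified model in which the committed source offset is simply the highest offset whose send was acknowledged. `SendCallback`, `LogAndContinueTracker`, `FailFastTracker` and `replay` are hypothetical names and not part of the Kafka or Kafka Connect API. The fail-fast variant illustrates one possible way to stop committing offsets after a failed send. It is not the fix adopted upstream.

{code:java}
/**
 * Hypothetical, self-contained model (NOT the Kafka API) of source-offset
 * tracking driven by producer send callbacks.
 */
public class SendCallbackSketch {

    /** Hypothetical stand-in for a producer callback: e == null means the send succeeded. */
    interface SendCallback {
        void onCompletion(long sourceOffset, Exception e);
    }

    /** Mirrors the reported behaviour: log the error and keep going. */
    static final class LogAndContinueTracker implements SendCallback {
        private long highestAcked = -1;

        @Override
        public synchronized void onCompletion(long sourceOffset, Exception e) {
            if (e != null) {
                System.err.println("failed to send record at offset " + sourceOffset + ": " + e);
                return; // the failure is forgotten after logging
            }
            highestAcked = Math.max(highestAcked, sourceOffset);
        }

        /** Offset that would be committed; a failed record below it is silently skipped. */
        synchronized long offsetToCommit() {
            return highestAcked;
        }
    }

    /** Alternative: remember the first failure and refuse to commit any offsets. */
    static final class FailFastTracker implements SendCallback {
        private Exception firstFailure;
        private long highestAcked = -1;

        @Override
        public synchronized void onCompletion(long sourceOffset, Exception e) {
            if (e != null) {
                if (firstFailure == null) {
                    firstFailure = e; // keep the first failure; later ones are often follow-on errors
                }
                return;
            }
            highestAcked = Math.max(highestAcked, sourceOffset);
        }

        /** Throws so the caller can fail the task instead of committing offsets. */
        synchronized long offsetToCommit() {
            if (firstFailure != null) {
                throw new IllegalStateException("send failed; refusing to commit source offsets", firstFailure);
            }
            return highestAcked;
        }
    }

    /** Replays three sends where the middle one fails, as in the log above. */
    static void replay(SendCallback cb) {
        cb.onCompletion(0, null);
        cb.onCompletion(1, new IllegalStateException("Producer is closed forcefully."));
        cb.onCompletion(2, null);
    }

    public static void main(String[] args) {
        LogAndContinueTracker lenient = new LogAndContinueTracker();
        replay(lenient);
        System.out.println("log-and-continue commits up to offset " + lenient.offsetToCommit());

        FailFastTracker strict = new FailFastTracker();
        replay(strict);
        try {
            strict.offsetToCommit();
        } catch (IllegalStateException ex) {
            System.out.println("fail-fast refuses to commit: " + ex.getCause().getMessage());
        }
    }
}
{code}

In this model, the log-and-continue tracker would commit up to offset 2 even though the record at offset 1 was never written. The fail-fast tracker instead throws, leaving it to the caller to decide what to do, for example failing the task so it can be restarted.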
--
This message was sent by Atlassian Jira
(v8.3.4#803005)