[
https://issues.apache.org/jira/browse/SAMZA-635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Navina Ramesh updated SAMZA-635:
--------------------------------
Fix Version/s: (was: 0.10.0)
> KafkaSystemProducer may get exceptions out-of-order
> ---------------------------------------------------
>
> Key: SAMZA-635
> URL: https://issues.apache.org/jira/browse/SAMZA-635
> Project: Samza
> Issue Type: Bug
> Components: kafka
> Affects Versions: 0.9.0
> Reporter: Yi Pan (Data Infrastructure)
>
> In the current KafkaSystemProducer design, a non-retriable exception can be
> thrown from the Kafka producer send thread, which creates race conditions in
> the following code blocks:
> {code}
> 82 sendFailed.set(false)
> 83
> 84 retryBackoff.run(
> 85 loop => {
> 86 if (sendFailed.get()) {
> 87 throw exceptionThrown.get()
> 88 }
> {code}
> And
> {code}
> 91 def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
> 92 if (exception == null) {
> 93 //send was successful. Don't retry
> 94 metrics.sendSuccess.inc
> 95 } else {
> 96 //If there is an exception in the callback, it means that the Kafka producer has exhausted the max-retries
> 97 //Hence, fail container!
> 98 exceptionThrown.compareAndSet(null, exception)
> 99 sendFailed.set(true)
> 100 }
> 101 }
> {code}
> The main thread sets and gets _sendFailed_ on lines 82 and 86, and the Kafka
> send thread sets it on line 99.
> There could be two race conditions here:
> 1) The Kafka send thread completes line 99 and then the main thread executes
> line 82, in which case the exception is missed.
> 2) The main thread finishes line 82 for the current message, and then the Kafka
> send thread executes line 99 for the previous message. In this case, the main
> thread gets an exception that belongs to the previous message, not the current one.
> A configuration that can trigger this is, on the Samza job (producer) side:
> {code}
> systems.kafka.producer.max.request.size=102400
> {code}
> Broker side:
> {code}
> message.max.bytes=10240
> {code}
> And inside task.process(), we send a 16KB message first, then a small message.
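
With the settings above, a 16 KB (16384-byte) message is below the producer's max.request.size of 102400 bytes but above the broker's message.max.bytes of 10240 bytes, so the client presumably accepts it and the failure only surfaces later, in the send callback. The two interleavings described in the issue can be sketched as follows. This is a minimal illustration, not the real KafkaSystemProducer: SharedSendState, RaceSketch, and their methods are hypothetical names, and the interleavings are replayed step by step on a single thread so that the outcome is deterministic.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Hypothetical stand-in for the state shared between the main thread and
// the Kafka send thread. It is NOT the real KafkaSystemProducer.
class SharedSendState {
  val sendFailed = new AtomicBoolean(false)
  val exceptionThrown = new AtomicReference[Exception](null)

  // Mirrors line 82: the main thread resets the flag before each send.
  def beginSend(): Unit = sendFailed.set(false)

  // Mirrors lines 86-88, but returns the failure instead of throwing it.
  def checkFailure(): Option[Exception] =
    if (sendFailed.get()) Option(exceptionThrown.get()) else None

  // Mirrors lines 98-99: the send callback records a failure.
  def onFailure(e: Exception): Unit = {
    exceptionThrown.compareAndSet(null, e)
    sendFailed.set(true)
  }
}

object RaceSketch {
  // Race 1: the callback for message A fires, then the main thread
  // resets the flag for message B, so A's failure is never observed.
  def lostException(): Option[Exception] = {
    val state = new SharedSendState
    state.beginSend()                                      // main: send A
    state.onFailure(new RuntimeException("A too large"))   // send thread: A failed
    state.beginSend()                                      // main: send B, flag cleared
    state.checkFailure()                                   // main: sees no failure
  }

  // Race 2: the main thread has already reset the flag for message B
  // when the late callback for message A fires, so B is blamed for A.
  def misattributedException(): Option[Exception] = {
    val state = new SharedSendState
    state.beginSend()                                      // main: send A
    state.beginSend()                                      // main: send B
    state.onFailure(new RuntimeException("A too large"))   // send thread: A failed late
    state.checkFailure()                                   // main: B sees A's exception
  }

  def main(args: Array[String]): Unit = {
    println(s"race 1 observed: ${lostException().map(_.getMessage)}")
    println(s"race 2 observed: ${misattributedException().map(_.getMessage)}")
  }
}
```

In the first interleaving the check returns no failure even though message A failed; in the second, the check made while sending message B returns the exception recorded for message A.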