[ https://issues.apache.org/jira/browse/SAMZA-635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Navina Ramesh updated SAMZA-635:
--------------------------------
    Fix Version/s:     (was: 0.10.0)

> KafkaSystemProducer may get exceptions out-of-order
> ---------------------------------------------------
>
>                 Key: SAMZA-635
>                 URL: https://issues.apache.org/jira/browse/SAMZA-635
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.9.0
>            Reporter: Yi Pan (Data Infrastructure)
>
> In the current KafkaSystemProducer design, a non-retriable exception can be
> thrown from the Kafka producer send thread, which creates race conditions in
> the following code blocks:
> {code}
> 82    sendFailed.set(false)
> 83
> 84    retryBackoff.run(
> 85      loop => {
> 86        if (sendFailed.get()) {
> 87          throw exceptionThrown.get()
> 88        }
> {code}
> And 
> {code}
> 91            def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
> 92              if (exception == null) {
> 93                //send was successful. Don't retry
> 94                metrics.sendSuccess.inc
> 95              } else {
> 96                //If there is an exception in the callback, it means that the Kafka producer has exhausted the max-retries
> 97                //Hence, fail container!
> 98                exceptionThrown.compareAndSet(null, exception)
> 99                sendFailed.set(true)
> 100              }
> 101            }
> {code}
> The main thread sets and reads _sendFailed_ in lines 82 and 86, while the Kafka
> send thread sets it in line 99.
> There could be two race conditions here:
> 1) the Kafka send thread completes line 99, and then the main thread executes
> line 82 and resets the flag, so the exception is missed;
> 2) the main thread finishes line 82 for the current message, and then the Kafka
> send thread executes line 99 for the previous message. In this case, the main
> thread gets an exception that belongs to the previous message, not the current one.
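The two interleavings can be replayed deterministically with a minimal sketch in Scala, like the quoted code. It runs both orderings on a single thread instead of relying on real thread timing. Only sendFailed and exceptionThrown come from the quoted code; the class SharedSendState, its method names, and the message labels are hypothetical, and retryBackoff and the real Kafka callback wiring are left out.

{code}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Hypothetical model of the shared state in the quoted code. Only the field
// names sendFailed and exceptionThrown come from the report.
class SharedSendState {
  val sendFailed = new AtomicBoolean(false)
  val exceptionThrown = new AtomicReference[Exception](null)

  // Main thread, line 82: reset the flag before sending the current message.
  def resetBeforeSend(): Unit = sendFailed.set(false)

  // Main thread, lines 86-87: the exception the retry loop would throw, if any.
  def failedException: Option[Exception] =
    if (sendFailed.get()) Some(exceptionThrown.get()) else None

  // Kafka send thread, lines 98-99: record a failure reported to the callback.
  def onCompletionFailure(e: Exception): Unit = {
    exceptionThrown.compareAndSet(null, e)
    sendFailed.set(true)
  }
}

object RaceReplay {
  // Race 1: the callback for message A runs line 99 before the main thread
  // runs line 82 for message B, so the reset hides A's failure.
  def race1(): Option[Exception] = {
    val s = new SharedSendState
    s.onCompletionFailure(new Exception("failure of message A"))
    s.resetBeforeSend() // main thread starts sending message B
    s.failedException
  }

  // Race 2: the main thread runs line 82 for message B, then the callback for
  // the earlier message A runs line 99, so A's failure surfaces during B's send.
  def race2(): Option[Exception] = {
    val s = new SharedSendState
    s.resetBeforeSend() // main thread starts sending message B
    s.onCompletionFailure(new Exception("failure of message A"))
    s.failedException
  }

  def main(args: Array[String]): Unit = {
    println(race1())                   // None: the failure is lost
    println(race2().map(_.getMessage)) // Some(failure of message A), seen while sending B
  }
}
{code}

Replaying the orderings sequentially keeps the outcome repeatable; with real threads, either ordering can occur depending on when the send thread invokes the callback.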
> The following configuration can trigger this:
> {code}
> systems.kafka.producer.max.request.size=102400
> {code}
> Broker side:
> {code}
> message.max.bytes=10240
> {code}
> And inside task.process(), we send a 16KB message first, then a small message.
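Why this configuration triggers the problem can be sketched with a hypothetical model that compares only payload sizes against the two limits. Real Kafka size checks also count record and request overhead, which this ignores. A 16 KB (16,384-byte) message is under the producer's 102,400-byte max.request.size, so the producer accepts and sends it. It is over the broker's 10,240-byte message.max.bytes, though, so the rejection only comes back later through the asynchronous callback, by which time the main thread may already be sending the small message. The object and outcome names, and the 100-byte size of the small message, are assumptions for illustration.

{code}
// Hypothetical model of the two size limits from the reproduction config.
// Compares payload sizes only; real Kafka checks include per-record overhead.
object SizeLimits {
  val producerMaxRequestSize = 102400 // systems.kafka.producer.max.request.size
  val brokerMessageMaxBytes = 10240   // broker-side message.max.bytes

  sealed trait Outcome
  case object RejectedByProducer extends Outcome // fails at send time
  case object RejectedByBroker extends Outcome   // fails later, in the callback
  case object Accepted extends Outcome

  def outcome(payloadBytes: Int): Outcome =
    if (payloadBytes > producerMaxRequestSize) RejectedByProducer
    else if (payloadBytes > brokerMessageMaxBytes) RejectedByBroker
    else Accepted

  def main(args: Array[String]): Unit = {
    val large = 16 * 1024 // the 16 KB message from the report
    val small = 100       // assumed size of the "small message"
    println(s"16 KB message: ${outcome(large)}")
    println(s"small message: ${outcome(small)}")
  }
}
{code}

Here main prints "16 KB message: RejectedByBroker" and "small message: Accepted".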



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
