[ 
https://issues.apache.org/jira/browse/SAMZA-458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14368250#comment-14368250
 ] 

Navina Ramesh commented on SAMZA-458:
-------------------------------------

Yeah. The callback associated with the new producer's send method is actually 
executed in a separate I/O thread in Kafka. 
In the current code (without your patch), lines 138-143 do get called. 

In an execution that looks like this:
Samza Thread                      I/O Thread in Kafka

send(msg1)                          
send(msg2)      
                                            callback for msg1 - successful      
              
                                            callback for msg2 - failed
send(msg3)
flush()

In the above case, when callback for msg2 fails, it marks sendFailed as true. 
The samza thread that is currently blocked in the flush() method will detect 
this flag as true and throw an exception. This is the logic in Lines 138 to 
143. 

Does this example make it clear?  I think you missed the fact that the callback 
and container are on different thread. It isn't very obvious.


> Close in KafkaSystemProducer should flush all source buffers
> ------------------------------------------------------------
>
>                 Key: SAMZA-458
>                 URL: https://issues.apache.org/jira/browse/SAMZA-458
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>            Reporter: Chris Riccomini
>            Assignee: Yan Fang
>              Labels: newbie
>             Fix For: 0.9.0
>
>         Attachments: SAMZA-458.patch
>
>
> I noticed that calling KafkaSystemProducer.stop does not flush any 
> outstanding messages in the sourceBuffers array. Calling close should flush 
> all buffers. Without this, shutting down can cause messages to be dropped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to