[ https://issues.apache.org/jira/browse/SAMZA-458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14368250#comment-14368250 ]
Navina Ramesh commented on SAMZA-458:
-------------------------------------

Yeah. The callback associated with the new producer's send method is actually executed in a separate I/O thread in Kafka. In the current code (without your patch), lines 138-143 do get called. Consider an execution that looks like this:

Samza Thread          I/O Thread in Kafka
send(msg1)
send(msg2)
                      callback for msg1 - successful
                      callback for msg2 - failed
send(msg3)
flush()

In the above case, when the callback for msg2 fails, it sets sendFailed to true. The Samza thread, which is blocked in the flush() method at that point, will see that the flag is true and throw an exception. This is the logic in lines 138-143.

Does this example make it clear? I think you missed the fact that the callback and the container run on different threads. It isn't very obvious.

> Close in KafkaSystemProducer should flush all source buffers
> ------------------------------------------------------------
>
>                 Key: SAMZA-458
>                 URL: https://issues.apache.org/jira/browse/SAMZA-458
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>            Reporter: Chris Riccomini
>            Assignee: Yan Fang
>              Labels: newbie
>             Fix For: 0.9.0
>
>         Attachments: SAMZA-458.patch
>
>
> I noticed that calling KafkaSystemProducer.stop does not flush any
> outstanding messages in the sourceBuffers array. Calling close should flush
> all buffers. Without this, shutting down can cause messages to be dropped.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
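
To make the threading in the comment above concrete, here is a minimal sketch of the pattern: a callback running on a separate I/O thread sets a shared sendFailed flag, and the calling thread only notices it once it blocks in flush(). This is not the KafkaSystemProducer code and it does not use the Kafka client. The class CallbackFlagSketch, its methods, and the simulateFailure parameter are all hypothetical, and a single-thread executor stands in for Kafka's I/O thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a producer; not Samza's or Kafka's actual code.
class CallbackFlagSketch {
    // One thread playing the role of Kafka's I/O thread (assumption).
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
    // Written by callbacks on the I/O thread, read by the caller in flush().
    private final AtomicBoolean sendFailed = new AtomicBoolean(false);
    // Outstanding sends; touched only by the calling thread.
    private final List<Future<?>> pending = new ArrayList<>();

    // Returns immediately; the "callback" runs later on the I/O thread.
    void send(String msg, boolean simulateFailure) {
        pending.add(ioThread.submit(() -> {
            if (simulateFailure) {
                sendFailed.set(true); // callback reports the failure via the flag
            }
        }));
    }

    // Blocks until every outstanding callback has run, then checks the flag.
    void flush() throws Exception {
        for (Future<?> f : pending) {
            f.get();
        }
        pending.clear();
        if (sendFailed.get()) {
            throw new IllegalStateException("a send failed before flush completed");
        }
    }

    void close() {
        ioThread.shutdown();
    }

    // Sends one message per entry in failures, then reports whether flush() threw.
    static boolean flushThrows(boolean... failures) throws Exception {
        CallbackFlagSketch producer = new CallbackFlagSketch();
        try {
            for (int i = 0; i < failures.length; i++) {
                producer.send("msg" + (i + 1), failures[i]);
            }
            producer.flush();
            return false;
        } catch (IllegalStateException e) {
            return true;
        } finally {
            producer.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // msg1 succeeds, msg2 fails, msg3 succeeds: the scenario from the comment.
        System.out.println(flushThrows(false, true, false)); // prints "true"
    }
}
```

Note that send(msg3) still returns normally even though msg2 has already failed (or is about to). In this sketch the failure only surfaces on the calling thread when flush() checks the flag.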