[ https://issues.apache.org/jira/browse/SAMZA-458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364836#comment-14364836 ]
Yan Fang commented on SAMZA-458: -------------------------------- RB: https://reviews.apache.org/r/32155/ 1. There are some code that is never called. before patch - {code} if (sendFailed.get()) { logger.error("Unable to send message from %s to system %s" format(source, systemName)) //Close producer stop() producer = null metrics.flushFailed.inc throw new SamzaException("Unable to send message from %s to system %s" format(source, systemName)) } {code} line 138-143 is never called. Because when the sendFailed.get() is true, meaning the exception is nonretriable (line 97, 98), {code} throw new SamzaException("Failed to send message. Exception:\n %s".format(exception)) {code} line 118 already throws the exception and so will not call the flush. after patch (see RB), {code} producer = null //Mark loop as done as we are not going to retry loop.done metrics.sendFailed.inc throw new SamzaException("Failed to send message. Exception:\n %s".format(exception)) {code} line 121-125 will not be called because after calling stop(), the flush will throw the exception. So maybe we should combine them together ? Any thoughts? 2. stop() and flush() are calling each other. So have to add two variables to make it work. Any better ways to do this? > Close in KafkaSystemProducer should flush all source buffers > ------------------------------------------------------------ > > Key: SAMZA-458 > URL: https://issues.apache.org/jira/browse/SAMZA-458 > Project: Samza > Issue Type: Bug > Components: kafka > Reporter: Chris Riccomini > Assignee: Yan Fang > Labels: newbie > Fix For: 0.9.0 > > Attachments: SAMZA-458.patch > > > I noticed that calling KafkaSystemProducer.stop does not flush any > outstanding messages in the sourceBuffers array. Calling close should flush > all buffers. Without this, shutting down can cause messages to be dropped. -- This message was sent by Atlassian JIRA (v6.3.4#6332)