[ 
https://issues.apache.org/jira/browse/SAMZA-458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364836#comment-14364836
 ] 

Yan Fang commented on SAMZA-458:
--------------------------------

RB: https://reviews.apache.org/r/32155/

1. Some of the code is never called.
  Before the patch:
{code}
        if (sendFailed.get()) {
          logger.error("Unable to send message from %s to system %s" format(source, systemName))
          //Close producer
          stop()
          producer = null
          metrics.flushFailed.inc
          throw new SamzaException("Unable to send message from %s to system %s" format(source, systemName))
        }
{code}
Lines 138-143 are never reached. sendFailed.get() is true only when the 
exception is non-retriable (lines 97-98), and in that case line 118
{code}
throw new SamzaException("Failed to send message. Exception:\n %s".format(exception))
{code}
has already thrown the exception, so flush is never called.

After the patch (see RB):
{code}
          producer = null
          //Mark loop as done as we are not going to retry
          loop.done
          metrics.sendFailed.inc
          throw new SamzaException("Failed to send message. Exception:\n %s".format(exception))
{code}
Lines 121-125 will not be reached, because stop() is called first and the 
flush inside it throws the exception.

So maybe we should combine the two? Any thoughts?

2. stop() and flush() call each other, so I had to add two variables to 
make it work. Is there a better way to do this?
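
One possible shape for this, as a minimal sketch only and not the code in the 
patch: move the cleanup into a private helper that neither flushes nor calls 
stop(), so that stop() calls flush() but flush() never calls back into stop(). 
SketchProducer, closeQuietly, failOnFlush and the in-memory buffers are all 
hypothetical names that stand in for the real KafkaSystemProducer internals.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the real producer; all names are illustrative.
class SketchProducer(failOnFlush: Boolean = false) {
  private val buffer = ArrayBuffer[String]()   // messages not yet flushed
  val sent = ArrayBuffer[String]()             // messages "delivered" by flush()
  private val closed = new AtomicBoolean(false)

  def isClosed: Boolean = closed.get()

  def send(msg: String): Unit = {
    if (closed.get()) throw new IllegalStateException("producer is closed")
    buffer += msg
  }

  // Cleanup only: never calls flush() or stop(), so there is no cycle.
  private def closeQuietly(): Unit = {
    closed.set(true)
    buffer.clear()
  }

  def flush(): Unit = {
    if (failOnFlush) {
      // On failure, clean up through the helper instead of calling stop().
      closeQuietly()
      throw new RuntimeException("Unable to flush buffered messages")
    }
    sent ++= buffer
    buffer.clear()
  }

  def stop(): Unit = {
    if (!closed.get()) {
      // Flush outstanding messages first, then always release resources.
      try flush() finally closeQuietly()
    }
  }
}
```

With this layout the call graph is one-directional (stop -> flush -> 
closeQuietly), so no extra flags are needed to break a stop/flush cycle. 
Whether it fits the real producer depends on details that are not shown here.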

> Close in KafkaSystemProducer should flush all source buffers
> ------------------------------------------------------------
>
>                 Key: SAMZA-458
>                 URL: https://issues.apache.org/jira/browse/SAMZA-458
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>            Reporter: Chris Riccomini
>            Assignee: Yan Fang
>              Labels: newbie
>             Fix For: 0.9.0
>
>         Attachments: SAMZA-458.patch
>
>
> I noticed that calling KafkaSystemProducer.stop does not flush any 
> outstanding messages in the sourceBuffers array. Calling close should flush 
> all buffers. Without this, shutting down can cause messages to be dropped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
