[ 
https://issues.apache.org/jira/browse/SAMZA-541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14297399#comment-14297399
 ] 

Chris Riccomini commented on SAMZA-541:
---------------------------------------

Email thread is 
[here|http://mail-archives.apache.org/mod_mbox/samza-dev/201501.mbox/%3CCAKe7ALfkKpmq%2BvzsiWzzxG8wV8Moee3DeVmJFGTHia%3D28qveJw%40mail.gmail.com%3E].

Some notes:

# MessageCollector should remain untouched. TaskInstanceCollector should be 
updated to have a TaskCoordinator, which it can feed in as it forwards send() 
calls to the SystemProducers (producerMultiplexer).
# "it will be flexible to control 'exactly-once-delivery'." It won't quite be 
exactly-once. All this would do is shrink the risk of duplicates. Duplicates 
could still occur, since the flush and commit calls are non-atomic (you have to 
either flush first and then commit, or vice versa).
# We should bug some of the Kafka devs to see if there's a trick to handling 
this (e.g. increasing linger.ms and adding a callback).
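
To make (2) concrete, here is a minimal sketch (hypothetical code, not Samza's 
actual classes) of why flush-then-commit gives at-least-once rather than 
exactly-once delivery: if the container crashes after flush() but before the 
checkpoint is committed, the already-flushed messages are re-sent on restart.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: a crash between flush() and commit() forces replay
 * of already-flushed messages, so duplicates can still reach downstream.
 */
public class FlushCommitGap {
    static List<String> downstream = new ArrayList<>(); // messages visible downstream
    static int checkpoint = 0;                          // last committed input offset

    /** Process inputs [checkpoint, upTo): flush first, then commit; optionally crash. */
    static void run(String[] input, int upTo, boolean crashBeforeCommit) {
        List<String> batch = new ArrayList<>();
        for (int i = checkpoint; i < upTo; i++) {
            batch.add(input[i]);
        }
        downstream.addAll(batch); // flush: output is now durable downstream
        if (crashBeforeCommit) {
            return;               // crash: the checkpoint was never advanced
        }
        checkpoint = upTo;        // commit: record input progress
    }

    public static void main(String[] args) {
        String[] input = {"a", "b", "c"};
        run(input, 2, true);   // flushed "a","b" but crashed before commit
        run(input, 3, false);  // restart replays from checkpoint 0
        System.out.println(downstream); // "a" and "b" appear twice
    }
}
```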

Regarding (3), if we re-introduced some Samza-level buffering on top of the 
underlying Kafka producer, we could get more control by fully flushing a 
batch and then committing. This somewhat defeats the advantage of the new 
Kafka producer, though, since it will increase latency (messages that could be 
sent immediately are batched instead), and might decrease throughput (the new 
producer automatically selects the best possible batch size, whereas the 
old-style producer uses fixed batch sizes).
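
A rough sketch of that Samza-level buffering idea (hypothetical; the class and 
method names are invented, not a Samza API): send() only queues messages, and 
the whole batch is flushed immediately before the checkpoint commit, so nothing 
is in flight at commit time. The cost is the extra latency noted above.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of a Samza-level buffer on top of an underlying
 * producer: flush the full batch, then commit, in one controlled step.
 */
public class BufferedProducer {
    private final List<String> batch = new ArrayList<>();
    private final List<String> sent = new ArrayList<>(); // stands in for Kafka
    private long committedOffset = -1;

    public void send(String msg) {
        batch.add(msg);           // buffer only; no eager send
    }

    public void flushAndCommit(long offset) {
        sent.addAll(batch);       // fully flush the batch...
        batch.clear();
        committedOffset = offset; // ...then commit the checkpoint
    }

    public List<String> sent() { return sent; }
    public long committedOffset() { return committedOffset; }
}
```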

> Passing coordinator to producer chains
> --------------------------------------
>
>                 Key: SAMZA-541
>                 URL: https://issues.apache.org/jira/browse/SAMZA-541
>             Project: Samza
>          Issue Type: Improvement
>          Components: container
>            Reporter: Jae Hyeon Bae
>            Assignee: Jae Hyeon Bae
>
> StreamTask can control SamzaContainer.commit() through task coordinator but 
> SystemProducer can call flush() without commit() which can create duplicate 
> data on container failure. For example, 
> T=30s; flush 
> T=45s; flush
> T=60s; flush && commit
> T=65s; flush
> "If there's a failure before 60s, the messages that were flushed at 30s and 
> 45s will be duplicated when the container reprocesses" (quoted from 
> [~criccomini]'s response).
> If SystemProducer can access the TaskCoordinator created by RunLoop in 
> SamzaContainer, it will be flexible to control 'exactly-once-delivery'.
> The following interface should be changed:
> {code}
> MessageCollector.send(envelope, coordinator)
> SystemProducers.send(source, envelope, coordinator)
> SystemProducer.send(source, envelope, coordinator)
> {code}
> The new Java producer in Kafka 0.8 cannot be synchronized with 
> TaskInstance.commit because it does not do batch flushing. Whether 
> 'exactly-once-delivery' can be guaranteed depends on the SystemProducer 
> implementation.
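
For illustration, the coordinator-threading proposed in the quoted {code} block 
could be sketched like this (hypothetical types and signatures, following the 
quoted names; not the actual Samza interfaces):

```java
/**
 * Hypothetical sketch of the proposed coordinator-passing chain: the
 * collector threads a TaskCoordinator through each send() call so the
 * producer can participate in commit decisions.
 */
public class CoordinatorChain {
    interface TaskCoordinator { void commit(); }

    interface SystemProducer {
        void send(String source, Object envelope, TaskCoordinator coordinator);
    }

    /** Multiplexer that forwards send() to a concrete producer. */
    static class SystemProducers implements SystemProducer {
        private final SystemProducer delegate;
        SystemProducers(SystemProducer delegate) { this.delegate = delegate; }
        public void send(String source, Object envelope, TaskCoordinator coordinator) {
            delegate.send(source, envelope, coordinator);
        }
    }

    /** Collector that passes the coordinator down the producer chain. */
    static class MessageCollector {
        private final SystemProducers producers;
        MessageCollector(SystemProducers producers) { this.producers = producers; }
        void send(Object envelope, TaskCoordinator coordinator) {
            producers.send("task", envelope, coordinator);
        }
    }
}
```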



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
