[
https://issues.apache.org/jira/browse/SAMZA-541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14297399#comment-14297399
]
Chris Riccomini commented on SAMZA-541:
---------------------------------------
Email thread is
[here|http://mail-archives.apache.org/mod_mbox/samza-dev/201501.mbox/%3CCAKe7ALfkKpmq%2BvzsiWzzxG8wV8Moee3DeVmJFGTHia%3D28qveJw%40mail.gmail.com%3E].
Some notes:
# MessageCollector should remain untouched. TaskInstanceCollector should be
updated to have a TaskCoordinator, which it can feed in as it forwards send()
calls to the SystemProducers (producerMultiplexer).
# "it will be flexible to control 'exactly-once-delivery'." It won't quite be
exactly-once; at best this shrinks the risk of duplicates. Duplicates can
still occur, since the flush and commit calls are non-atomic (you have to
either flush first and then commit, or vice-versa).
# We should bug some of the Kafka devs to see if there's a trick to handling
this (e.g. increasing linger.ms + adding a callback).
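To make the non-atomicity in (2) concrete, here is a hedged sketch (all class and method names are hypothetical, not Samza's actual API) of how a flush without a matching commit leads to duplicates after a container failure:

```java
// Hedged sketch, not Samza's real API: TaskSim stands in for a task whose
// output is flushed to a downstream system and whose input position is
// checkpointed by commit(). flush and commit are separate, non-atomic steps.
import java.util.ArrayList;
import java.util.List;

class TaskSim {
    final List<String> downstream = new ArrayList<>(); // flushed output, visible to consumers
    final List<String> pending = new ArrayList<>();    // produced but not yet flushed
    int committedOffset = 0;                           // last checkpointed input offset

    void process(String msg) { pending.add(msg); }

    void flush() { downstream.addAll(pending); pending.clear(); }

    // Flush first, then checkpoint; a crash between the two calls means the
    // flushed messages are replayed when input restarts from committedOffset.
    void commit(int inputOffset) { flush(); committedOffset = inputOffset; }
}
```

Flushing at T=30s without committing, then crashing before the T=60s commit, replays the input from the last checkpoint and re-flushes the same messages, which is exactly the duplicate scenario quoted in the issue description.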
Regarding (3), if we re-introduced some Samza-level buffering on top of the
underlying Kafka producer, we could get more control by fully flushing a
batch and then committing. This somewhat defeats the purpose of the new Kafka
producer, though, since it will increase latency (messages that could be sent
immediately are batched instead), and might decrease throughput (the new
producer automatically selects the best possible batch size, whereas the
old-style producer uses fixed batch sizes).
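A minimal sketch of the Samza-level buffering idea above (names are illustrative, not a real Samza or Kafka API): sends accumulate locally and are only handed to the underlying producer during commit, so the checkpoint covers exactly what was flushed, at the latency cost noted above:

```java
// Hedged sketch of Samza-level buffering over an async producer; names are
// illustrative, not Kafka's or Samza's API. Messages are held locally and
// only handed to the underlying producer when commit() runs, so the
// checkpoint always covers exactly the messages just flushed.
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BufferedProducer {
    private final List<String> batch = new ArrayList<>();
    private final Consumer<String> underlyingSend; // e.g. the real producer's send

    BufferedProducer(Consumer<String> underlyingSend) {
        this.underlyingSend = underlyingSend;
    }

    void send(String msg) { batch.add(msg); } // buffered, not sent yet (added latency)

    // Fully drain the batch into the producer, then run the checkpoint.
    void commit(Runnable checkpoint) {
        for (String m : batch) underlyingSend.accept(m);
        batch.clear();
        checkpoint.run();
    }
}
```

Note the window is smaller but not gone: a crash between the drain and `checkpoint.run()` still duplicates the batch, which is why this only shrinks the risk rather than giving exactly-once.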
> Passing coordinator to producer chains
> --------------------------------------
>
> Key: SAMZA-541
> URL: https://issues.apache.org/jira/browse/SAMZA-541
> Project: Samza
> Issue Type: Improvement
> Components: container
> Reporter: Jae Hyeon Bae
> Assignee: Jae Hyeon Bae
>
> StreamTask can control SamzaContainer.commit() through task coordinator but
> SystemProducer can call flush() without commit() which can create duplicate
> data on container failure. For example,
> T=30s; flush
> T=45s; flush
> T=60s; flush && commit
> T=65s; flush
> "If there's a failure before 60s, the messages that were flushed at 30s and
> 45s will be duplicated when the container reprocesses" (quoted from
> [~criccomini] response).
> If SystemProducer can access the TaskCoordinator created by the RunLoop in
> SamzaContainer, it will be flexible enough to control 'exactly-once delivery'.
> The following interface should be changed:
> {code}
> MessageCollector.send(envelope, coordinator)
> SystemProducers.send(source, envelope, coordinator)
> SystemProducer.send(source, envelope, coordinator)
> {code}
> The new Kafka 0.8 Java producer cannot be synchronized with
> TaskInstance.commit because it does not expose a batch flush. Whether
> 'exactly-once delivery' can be guaranteed depends on the SystemProducer
> implementation.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)