[
https://issues.apache.org/jira/browse/SAMZA-541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14299198#comment-14299198
]
Chris Riccomini commented on SAMZA-541:
---------------------------------------
I *think* that this should work. You're going to have to make the
ReadableCoordinator long-lived, and then pass it into TaskInstanceCollector, so
that the coordinator can be injected into the producer.send method behind the
scenes (without the end-user having to pass it in through collector.send).
# Add a reset method to ReadableCoordinator
# Create a single ReadableCoordinator in the SamzaContainer.apply method.
# Pass ReadableCoordinator into RunLoop.
# Use the reusable ReadableCoordinator in RunLoop, and call ReadableCoordinator.reset in RunLoop.checkCoordinator.
# Pass ReadableCoordinator into TaskInstanceCollector in SamzaContainer.apply.
# Pass ReadableCoordinator into SystemProducers.send.
# Pass ReadableCoordinator into SystemProducer.send.
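The steps above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not Samza code: `ReadableCoordinator`, `RunLoop`, `TaskInstanceCollector` and `SystemProducer` here are simplified, hypothetical stand-ins that only model a "commit requested" flag, and envelopes are plain strings. The point it shows is that one coordinator instance lives for the whole container, the collector injects it into every `send`, and the run loop resets it after each check.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-ins for the classes named in the steps above.
// They are NOT Samza's real ReadableCoordinator, RunLoop or TaskInstanceCollector;
// envelopes are plain strings to keep the sketch self-contained.
public class CoordinatorReuseSketch {

    // Step 1: a coordinator with a reset() method so one instance can be reused.
    static class ReadableCoordinator {
        private boolean commitRequested = false;

        void commit() { commitRequested = true; }              // requested by the StreamTask
        boolean requestedCommit() { return commitRequested; }
        void reset() { commitRequested = false; }              // hypothetical reset()
    }

    // Steps 6-7: the producer receives the coordinator on every send.
    interface SystemProducer {
        void send(String source, String envelope, ReadableCoordinator coordinator);
    }

    // Step 5: the collector holds the long-lived coordinator and injects it, so
    // user code still calls send(envelope) without passing a coordinator.
    static class TaskInstanceCollector {
        private final String source;
        private final SystemProducer producer;
        private final ReadableCoordinator coordinator;

        TaskInstanceCollector(String source, SystemProducer producer, ReadableCoordinator coordinator) {
            this.source = source;
            this.producer = producer;
            this.coordinator = coordinator;
        }

        void send(String envelope) {
            producer.send(source, envelope, coordinator);
        }
    }

    // Steps 3-4: the run loop reuses the same coordinator and resets it after each check.
    static class RunLoop {
        private final ReadableCoordinator coordinator;
        private final List<String> log;

        RunLoop(ReadableCoordinator coordinator, List<String> log) {
            this.coordinator = coordinator;
            this.log = log;
        }

        void checkCoordinator() {
            if (coordinator.requestedCommit()) {
                log.add("commit");                             // stand-in for the real commit
            }
            coordinator.reset();
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        // Step 2: one coordinator created once (SamzaContainer.apply in the real container).
        ReadableCoordinator coordinator = new ReadableCoordinator();
        SystemProducer producer = (source, envelope, coord) ->
                log.add(envelope + ":" + coord.requestedCommit());
        TaskInstanceCollector collector = new TaskInstanceCollector("task-0", producer, coordinator);
        RunLoop loop = new RunLoop(coordinator, log);

        collector.send("a");          // iteration 1: no commit requested
        loop.checkCoordinator();
        coordinator.commit();         // iteration 2: the task asks for a commit
        collector.send("b");
        loop.checkCoordinator();
        collector.send("c");          // iteration 3: reset() cleared the request
        loop.checkCoordinator();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());   // [a:false, b:true, commit, c:false]
    }
}
```

Because the coordinator is reset rather than recreated, the producer always sees the current iteration's commit request without the user passing anything through `collector.send`.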
> Passing coordinator to producer chains
> --------------------------------------
>
> Key: SAMZA-541
> URL: https://issues.apache.org/jira/browse/SAMZA-541
> Project: Samza
> Issue Type: Improvement
> Components: container
> Reporter: Jae Hyeon Bae
> Assignee: Jae Hyeon Bae
>
> StreamTask can control SamzaContainer.commit() through the task coordinator, but
> a SystemProducer can call flush() without commit(), which can create duplicate
> data on container failure. For example:
> T=30s; flush
> T=45s; flush
> T=60s; flush && commit
> T=65s; flush
> "If there's a failure before 60s, the messages that were flushed at 30s and
> 45s will be duplicated when the container reprocesses" (quoted from a
> response by [~criccomini]).
> If a SystemProducer can access the TaskCoordinator created by RunLoop in
> SamzaContainer, it gains the flexibility to control 'exactly-once delivery'.
> The following interfaces should be changed:
> {code}
> MessageCollector.send(envelope, coordinator)
> SystemProducers.send(source, envelope, coordinator)
> SystemProducer.send(source, envelope, coordinator)
> {code}
> The new Java producer in Kafka 0.8 cannot be synchronized with TaskInstance.commit
> because it does not do batch flushes. Depending on the SystemProducer
> implementation, this change can guarantee 'exactly-once delivery'.
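A producer written against the proposed `send(source, envelope, coordinator)` signature could, for instance, hold buffered output until the task requests a commit. The sketch below is hypothetical: `Coordinator`, `CommitAlignedProducer` and the in-memory `downstream` list are illustrative stand-ins, not Samza or Kafka APIs. It replays the T=30/45/60/65 timeline from the issue with a commit requested at T=60, so a failure before 60s would leave nothing from 30s or 45s written downstream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a producer written against the proposed
// send(source, envelope, coordinator) signature. Coordinator and the in-memory
// "downstream" list are illustrative stand-ins, not Samza or Kafka APIs.
public class CommitAlignedProducerSketch {

    static class Coordinator {
        private boolean commitRequested = false;

        void commit() { commitRequested = true; }
        boolean requestedCommit() { return commitRequested; }
        void reset() { commitRequested = false; }
    }

    static class CommitAlignedProducer {
        private final List<String> buffer = new ArrayList<>();
        final List<String> downstream = new ArrayList<>();   // messages the external system has received

        void send(String source, String envelope, Coordinator coordinator) {
            buffer.add(source + "/" + envelope);
            // Defer flushing until the task has requested a commit, so buffered
            // output is not written downstream ahead of the checkpoint covering it.
            if (coordinator.requestedCommit()) {
                downstream.addAll(buffer);
                buffer.clear();
            }
        }

        int pending() { return buffer.size(); }
    }

    // Replays the T=30/45/60/65 timeline from the issue, with a commit requested at T=60.
    static CommitAlignedProducer replayTimeline() {
        Coordinator coordinator = new Coordinator();
        CommitAlignedProducer producer = new CommitAlignedProducer();
        for (int t : new int[] {30, 45, 60, 65}) {
            if (t == 60) {
                coordinator.commit();
            }
            producer.send("task-0", "msg@" + t, coordinator);
            coordinator.reset();                               // reset between iterations
        }
        return producer;
    }

    public static void main(String[] args) {
        CommitAlignedProducer p = replayTimeline();
        System.out.println(p.downstream);   // [task-0/msg@30, task-0/msg@45, task-0/msg@60]
        System.out.println(p.pending());    // 1 (msg@65 waits for the next commit)
    }
}
```

This only aligns flushes with commits. A crash between the flush and the checkpoint write could still duplicate output, so whether a real SystemProducer reaches exactly-once delivery depends on how it makes those two steps atomic.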
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)