Hey Jae,

Yeah, I've been thinking about this a bit. There's no good way to do this right now, but it seems like a reasonable request. The goal would be to trigger a commit every time flush is called. This would MINIMIZE but not ELIMINATE the potential for duplicate messages. There could be a performance trade-off here, since commits can be a bit expensive (they're sync writes with acks=-1 when writing to Kafka), but it should still work OK.
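To make that concrete, here's a rough sketch of what commit-on-flush could look like (purely illustrative; the commitRequest hook below is made up, since SystemProducer currently has no way to ask the container for a commit):

    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.system.SystemProducer;

    // Hypothetical wrapper: delegates everything to a real SystemProducer,
    // but asks the container to checkpoint immediately after each flush.
    public class CommitOnFlushProducer implements SystemProducer {
      private final SystemProducer delegate;
      private final Runnable commitRequest; // made-up hook; imagine it wired to the container's commit path

      public CommitOnFlushProducer(SystemProducer delegate, Runnable commitRequest) {
        this.delegate = delegate;
        this.commitRequest = commitRequest;
      }

      public void start() { delegate.start(); }
      public void stop() { delegate.stop(); }
      public void register(String source) { delegate.register(source); }

      public void send(String source, OutgoingMessageEnvelope envelope) {
        delegate.send(source, envelope);
      }

      public void flush(String source) {
        delegate.flush(source);  // everything sent so far is now durable
        commitRequest.run();     // checkpoint right away, shrinking (not eliminating) the duplicate window
      }
    }

The interesting part is flush(): committing immediately afterward shrinks the duplicate window to the small gap between those two calls.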
Can you open a JIRA up for this? My initial inclination is that we might be able to pass a TaskCoordinator into the SystemProducer API. This would allow the SystemProducer implementation to request a commit when it's flushing.

The problem with this approach is that we just upgraded Samza 0.9.0 to the new Kafka producer, which eliminates the concept of batching. Any time you call send, the producer now immediately sends. This means there's really no "flush"; it's always flushing. There's only "sync", which is "wait for all outstanding sends to return". Without the concept of a "flush" in the new producer, having a coordinator doesn't really help, since it means you'd have to commit after every message (every message is immediately flushed).

Cheers,
Chris

On Thu, Jan 29, 2015 at 9:29 AM, Bae, Jae Hyeon <metac...@gmail.com> wrote:

> Hi Chris
>
> On Thu, Jan 29, 2015 at 9:10 AM, Chris Riccomini <criccom...@apache.org>
> wrote:
>
> > Hey Jae,
> >
> > If I understand you correctly, your concern is that there could be
> > flushes in-between commits. For example:
> >
> > T=30s; flush
> > T=45s; flush
> > T=60s; flush && commit
> > T=65s; flush
> >
> > Your concern here is that if there's a failure before 60s, the
> > messages that were flushed at 30s and 45s will be duplicated when the
> > container reprocesses, right?
>
> Correct.
>
> > > Never mind. I found a solution. Flush should be synced with commit.
>
> Last night I was sleepy and struggling to find a solution, and this
> morning it turned out to be wrong :( My idea was that send() would not
> call flush() even when the buffer is full, but that's risky.
>
> Actually, I was writing our internal data pipeline component as a
> StreamTask, but I switched it to a SystemProducer, as Metamx Druid
> Tranquility did. I overlooked the duplicate data that a flush/commit
> mismatch can cause.
>
> Do you have any idea?
>
> > Could you elaborate on this?
> >
> > Cheers,
> > Chris
> >
> > On Thu, Jan 29, 2015 at 12:27 AM, Bae, Jae Hyeon <metac...@gmail.com>
> > wrote:
> >
> > > Never mind. I found a solution. Flush should be synced with commit.
> > >
> > > On Thu, Jan 29, 2015 at 12:15 AM, Bae, Jae Hyeon <metac...@gmail.com>
> > > wrote:
> > >
> > > > Hi Samza Devs
> > > >
> > > > A StreamTask can control SamzaContainer.commit() through the task
> > > > coordinator. Can we make SystemProducer trigger a commit after a
> > > > flush? With this feature, we could prevent duplicate data on
> > > > SamzaContainer failure.
> > > >
> > > > For example, say we set the commit interval to 2 minutes. Before
> > > > the commit interval expires, whenever its buffer size exceeds the
> > > > batch size, the SystemProducer will flush the data in its buffer.
> > > > If the container dies right after a flush, another container will
> > > > start from the previous commit, and we will have duplicate data.
> > > >
> > > > The longer the commit interval, the more duplicate data we get. I
> > > > know this is not a big deal, since container failure should be a
> > > > rare case and only a few minutes of data would be duplicated, but
> > > > I would be happy if we could clear up this little concern.
> > > >
> > > > Any idea?
> > > >
> > > > Thank you
> > > > Best, Jae
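P.S. To illustrate the "sync" point, here's roughly what "wait for all outstanding sends to return" looks like against the new (0.8.2-era) producer API. The SyncingSender class and its bookkeeping are made up for illustration; only KafkaProducer, ProducerRecord, and the Future returned by send() are the real API:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Illustrative only: with the new producer there's no flush() to call,
    // so "sync" means remembering every Future and blocking on all of them.
    public class SyncingSender {
      private final KafkaProducer<byte[], byte[]> producer;
      private final List<Future<RecordMetadata>> outstanding = new ArrayList<>();

      public SyncingSender(KafkaProducer<byte[], byte[]> producer) {
        this.producer = producer;
      }

      public void send(ProducerRecord<byte[], byte[]> record) {
        outstanding.add(producer.send(record)); // hands the record off immediately; no batch to wait for
      }

      public void sync() throws Exception {
        for (Future<RecordMetadata> f : outstanding) {
          f.get(); // block until the broker acks this send
        }
        outstanding.clear();
      }
    }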