Hello all, I just came across the recent discussion on 'Trying to achieve deterministic behavior on recovery/rewind'. I've been thinking about this myself quite a bit recently, so I thought I'd pass along a few observations in case anyone else finds them useful. (I've just joined the mailing list, so I can't reply to that conversation directly -- sorry! Apologies as well if this isn't the appropriate spot for this.)
Jay Kreps mentioned a few extensions to Kafka that should support exactly-once semantics. In many circumstances, it's *also* possible to do this entirely in-band, without making more assumptions about the underlying transport. (In other words, it can be implemented on top of the current Kafka release.) I think, but am not certain, that Samza would need some minimal changes to expose this to the user.

Some context: I've been experimenting with the design of a high-level, dataflowy streaming framework built around Kafka's partitioned logs. I've been looking at using Samza as a backend for this, where the dataflow graph gets compiled down to a set of Samza jobs -- so if my priorities seem strange, it's because I'm motivated by this particular use case. For example, I sometimes rely on the idea of a producer 'owning' a partition. This is similar to the way a consumer might be the only one in its group to read from a partition: if a producer owns a partition, it's the only producer *anywhere* that ever writes to it. (For example, a Samza task owns its checkpoint stream.) This turns out to simplify a lot of reasoning, and it's often not too hard to arrange if you're looking out for it, but a lot of jobs in the wild aren't set up this way.

Assuming (for a moment) that our tasks are deterministic, there are a couple of strategies that give you exactly-once behaviour.

If a task owns the partition it's writing to, this behaviour is pretty easy to get. Periodically, as the task is running, we can 'checkpoint' the offset of the last message we wrote *out*, along with the corresponding input offset. If the task fails, we can just look at the output partition to see what the last offset written was, then rewind to the most recent checkpoint at or before that and continue from there. If the last offset in our output partition is `n`, and the last output offset in our checkpoint is `m`, we know that the first `n-m` messages we produce on replay are duplicates -- so we can just drop them entirely.
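To make that first strategy concrete, here's a minimal sketch. Everything in it is invented for illustration: `Partition` stands in for a Kafka partition (an append-only list whose offsets are just list indices), and `recover` is not a real Samza or Kafka API.

```python
class Partition:
    """Toy stand-in for a Kafka partition: an append-only message list."""

    def __init__(self):
        self.messages = []

    def append(self, msg):
        self.messages.append(msg)
        return len(self.messages) - 1  # offset the message was written at

    def last_offset(self):
        return len(self.messages) - 1  # -1 when the partition is empty


def recover(checkpoints, output):
    """Work out where to resume, and how many replayed writes to drop.

    `checkpoints` is a list of (input_offset, output_offset) pairs written
    periodically while the task runs. On restart, if the output tail is `n`
    and the most recent checkpointed output offset at or before it is `m`,
    the first `n - m` messages produced on replay are duplicates.
    """
    n = output.last_offset()
    input_off, m = max(
        ((i, o) for i, o in checkpoints if o <= n),
        key=lambda pair: pair[1],
        default=(-1, -1),
    )
    return input_off + 1, n - m  # (resume input offset, duplicates to drop)
```

Note that this only works because the task owns the output partition: the tail offset it reads back is guaranteed to be its own last write, not some other producer's.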
Even if our task doesn't own its output partition, you can still get exactly-once semantics with a variant on the 'Idempotent Producer' proposal described here:

https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer

To make this work without upstream Kafka changes, you need to:

- Stick the pid / sequence number info in the message itself
- Do the deduplication on the consumer side

Notice that, if you want a clean topic with no duplicates or metadata, you can follow this with a second job that uses the first strategy to dedup it and write the results into a new topic.

The easiest way to extend this to nondeterministic streams is to make those streams deterministic: you just need to log all your 'choices' before making them visible to the rest of the system, so you can replay them if something goes wrong. For example, if the nondeterminism comes from the message arrival order, it's enough to take Chris Riccomini's suggestion and log whenever the MessageChooser chooses a stream. In the worst case -- say, if you have some nondeterministic code that both sends messages and mutates state -- you need to capture all output and state changes, and make sure they're all logged together before continuing.

Compared to the 'Transactional Messaging' proposal for Kafka, I think this approach:

- has similar latency and correctness guarantees,
- is equally amenable to batching if necessary,
- sends more data over the wire,
- requires no new extensions to the messaging layer, and
- is 'pay-what-you-use', in the sense that if only part of your task is nondeterministic, you only have to capture that particular bit in the log.

I understand that many people are not going to care about this kind of fine-grained control, and that a switch to flip that says 'make this arbitrary code I wrote deterministic' is very useful.
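Here's a sketch of the in-band idempotent-producer variant: stamp each message with the producer's id and a per-producer sequence number, and deduplicate on the consumer side. The names (`stamp`, `DedupingConsumer`) are made up, and this assumes each producer's messages land in a given partition in sequence order.

```python
def stamp(pid, seq, payload):
    """Embed the idempotence metadata in the message body itself."""
    return {"pid": pid, "seq": seq, "payload": payload}


class DedupingConsumer:
    """Drops any message whose (pid, seq) has already been seen."""

    def __init__(self):
        self.high_water = {}  # pid -> highest sequence number seen

    def accept(self, msg):
        """Return the payload, or None if this message is a duplicate."""
        pid, seq = msg["pid"], msg["seq"]
        if seq <= self.high_water.get(pid, -1):
            return None  # duplicate from a producer retry: drop it
        self.high_water[pid] = seq
        return msg["payload"]
```

The high-water map is small (one entry per producer), and a consumer can rebuild it on restart just by replaying the topic from wherever its own checkpoint says to start.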
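And here's a sketch of the 'log your choices before acting on them' idea, with message-arrival order as the source of nondeterminism. `choice_log` plays the role of a durable log (in practice, a Kafka partition the task owns); the function name and data shapes are invented for illustration.

```python
import random


def choose(queues, choice_log, replaying=False):
    """Pick which input stream to consume from next.

    On a normal run, make an arbitrary choice and append it to the log
    *before* consuming the message. On replay, take choices back off the
    log, so the interleaving is exactly the same as last time.
    """
    if replaying and choice_log:
        stream = choice_log.pop(0)
    else:
        # Arbitrary (here: random) choice among streams with pending input.
        stream = random.choice(sorted(s for s, q in queues.items() if q))
        choice_log.append(stream)
    return stream, queues[stream].pop(0)
```

The important ordering constraint is that the choice is logged before its effects become visible downstream; once the log runs out during a replay, the task just falls back to making (and logging) fresh choices.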
But it would be cool if the kind of control I'm talking about were possible with Samza, both for people like me and for folks who are willing to work a little harder to get low latency without sacrificing correctness.

For me, I think the only thing missing right now is a little more control over the incoming messages. For example, the first exactly-once strategy I outlined above needs to bootstrap itself from a stream of (input offset, output offset) pairs, then take the most recent input offset and start consuming from *there* -- instead of from whatever's in Samza's checkpoint stream. I can't find a way to do this in Samza with acceptable performance. On the other hand, I'm pretty sure a lower-level mechanism that allowed you to 'pull' or 'request' particular input could support both my unusual requirements and the existing checkpoint / message chooser / bootstrap stream machinery on top.

I'd be happy to flesh any of this out a bit more, if anyone is interested. If I'm misunderstanding something, I'd also be very happy for any corrections, etc.

Oh, also: I'm very impressed with all the work that's gone into Samza, and how seriously it takes fault tolerance and state management -- thanks!

-- Ben
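P.S. To make the 'pull' idea a bit more concrete, here's a purely hypothetical sketch of the kind of lower-level interface I mean -- nothing like `PullConsumer` exists in Samza today, and all the names are mine. The point is just that if a task could seek and poll individual input streams directly, the (input offset, output offset) bootstrap I described could be layered on top.

```python
class PullConsumer:
    """Hypothetical low-level consumer: per-stream seek and poll."""

    def __init__(self, partitions):
        self.partitions = partitions  # stream name -> list of messages
        self.positions = {s: 0 for s in partitions}

    def seek(self, stream, offset):
        """Point the next poll of `stream` at `offset` directly,
        ignoring whatever Samza's checkpoint stream would say."""
        self.positions[stream] = offset

    def poll(self, stream):
        """Return the next (offset, message) pair, or None at the tail."""
        pos = self.positions[stream]
        if pos >= len(self.partitions[stream]):
            return None
        self.positions[stream] = pos + 1
        return pos, self.partitions[stream][pos]


def bootstrap(consumer, checkpoint_stream, input_stream):
    """Scan the whole (input offset, output offset) stream, then seek the
    input to just past the last recorded input offset."""
    last_input = -1
    while True:
        rec = consumer.poll(checkpoint_stream)
        if rec is None:
            break
        _, (input_off, _output_off) = rec
        last_input = input_off
    consumer.seek(input_stream, last_input + 1)
```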
