Hi Ben,

Thanks for your thoughts! A few quick replies:

On 29 Sep 2014, at 20:41, Ben Kirwin <[email protected]> wrote:
> Some context: I've been experimenting with the design a high-level
> dataflowy streaming framework, built around Kafka's partitioned logs. I've
> been looking at using Samza as a backend for this, where the dataflow graph
> gets compiled down to a set of Samza jobs -- so if my priorities seem
> strange, it's because I'm motivated by this particular use-case.

That sounds fun. Would love to hear more about your framework when you're ready 
to share.

> For example, I sometimes rely on the idea of a producer 'owning' a
> partition. This is a similar idea to the way a consumer might be the only
> one in a group to read from a partition: if a producer owns a partition, it
> means it's the only producer *anywhere* that ever writes to it. (For
> example, a Samza task owns its checkpoint stream.)

In https://issues.apache.org/jira/browse/SAMZA-300 we discussed the idea of 
"write locks" on streams, which could be used to enforce such exclusive 
ownership. Nothing implemented yet, though.

> Assuming (for a moment) that our tasks are deterministic, there's a couple
> strategies that give you exactly-once behaviour:
> 
> If a task owns the partition it's writing to, this behaviour is pretty easy
> to get. Periodically, as the task is running, we can 'checkpoint' the
> offset of the last message we wrote *out*, along with the input offset.

This is quite like what we had in mind with the idempotent producer (before 
that idea got generalised into transactions in Kafka).
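To spell out how I understand the owned-partition strategy, here's a toy 
sketch (plain Python, nothing Samza-specific; process() stands in for any 
deterministic task, and "deterministic" here just means replaying the same 
input regenerates byte-identical output):

```python
def process(msg):
    # Stand-in for a deterministic task: same input => same output.
    return msg.upper()

def resume(checkpoint, input_log, output_log):
    """Resume after a crash. checkpoint is the last durable
    (input_offset, output_offset) pair. Because this task *owns* the
    output partition, every message past output_offset was written by
    an earlier incarnation of this same task, so on replay we can match
    regenerated messages against them instead of writing duplicates."""
    in_off, out_off = checkpoint
    pos = out_off
    tail = len(output_log)            # how far the previous run got
    for msg in input_log[in_off:]:
        result = process(msg)
        if pos < tail:
            # Already written before the crash; determinism guarantees
            # we regenerate it identically, so just skip past it.
            assert output_log[pos] == result
        else:
            output_log.append(result)  # first (and only) time this is produced
        pos += 1
    return output_log
```

So a crash between the checkpoint and the latest writes does no harm: the 
replayed messages are recognised as already present, and output continues 
exactly once from the tail.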

Our tendency has been that if Kafka provides some facility, Samza should use 
that facility rather than implement its own version of it. Of course, that 
runs counter to wanting to support other message brokers, and we're also 
deviating from that principle in some places (even when Kafka adds support for 
tracking consumer offsets, Samza will probably keep its own checkpointing 
implementation).

All of which is to say: it's not obvious whether Samza should implement 
something like idempotency itself, or whether it's best left to the messaging 
layer. There are good arguments in either direction.

> To make this work without upstream Kafka changes, you need to:
> - Stick the pid / sequence number info in the message itself
> - Do the deduplication on the consumer side
> 
> Notice that, if you want a clean topic with no duplicates or metadata, you
> can follow this with a second job that uses the first strategy to dedup it
> and write the results into a new topic.

This is an interesting idea; I need to think about it some more.
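For what it's worth, the consumer-side half of that scheme seems simple enough 
to sketch (field names here are illustrative, not Kafka's actual message 
format):

```python
def dedup(messages):
    """Filter out redelivered messages using the (producer id, sequence
    number) metadata that the producer embedded in each message.
    Duplicates arise when a producer retries after an unacknowledged
    write; they carry a sequence number we've already seen."""
    last_seq = {}                       # highest sequence seen per producer
    for m in messages:
        pid, seq = m["pid"], m["seq"]
        if seq <= last_seq.get(pid, -1):
            continue                    # duplicate from a retry: drop it
        last_seq[pid] = seq
        yield m["payload"]
```

The follow-up job you describe would then just run this filter over the dirty 
topic and write the bare payloads to a clean one.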

> The easiest way to extend this to nondeterministic streams is to just make
> those streams deterministic: you just need to log all your 'choices' before
> making them visible to the rest of the system, so you can replay them if
> something goes wrong. For example, if the nondeterminism is from the
> message arrival order, it's enough to take Chris Riccomini's suggestion and
> log whenever the MessageChooser chooses a stream. In the worst case -- say,
> if you have some nondeterministic code that sends messages and mutates
> state -- you need to capture all output and state changes, and make sure
> they're all logged together before continuing.

I don't quite follow. What would that logging look like (assuming it involves 
requests to external systems like a remote database)? Would all external 
communication have to go through some kind of logging layer?

Nondeterminism can also come from stuff like using the system clock, random 
number generator, order of iteration in a hash table, etc. You can say "just 
don't do that" (and indeed some frameworks do), but it's hard to enforce, and 
it would be hard for users to understand why a single Math.random() throws off 
the correctness of their entire job. Given that the intention of exactly-once 
semantics is to simplify the programming abstraction, I'm not totally convinced 
that's a win.
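The arrival-order case I do follow, though. As I understand it, the choice log 
would work roughly like this (hypothetical names throughout; the real 
MessageChooser API looks different):

```python
def choose(ready_streams, choice_log, replay):
    """Pick which input stream to consume from next. In normal operation
    the (nondeterministic) choice is appended to a durable log *before*
    it takes effect; during recovery the log is replayed, so messages
    are merged in exactly the same order as the original run."""
    if replay:
        return choice_log.pop(0)        # reproduce the original order
    choice = ready_streams[0]           # stands in for arrival order
    choice_log.append(choice)           # must be durable before proceeding
    return choice
```

That makes the stream merge deterministic, but it doesn't obviously help with 
the other sources of nondeterminism above.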

> For me, I think the only thing missing right now is a little more control
> over the incoming messages. For example, the first exactly-once strategy I
> outlined above needed to bootstrap itself from a stream of (input offset,
> output offset) pairs, and then take that most recent input offset and start
> consuming from *there* -- instead of whatever's in Samza's checkpoint
> stream.

Could you perhaps do this by implementing a CheckpointManager (steal the 
implementation from the Kafka checkpoint manager, but change it to suit your 
needs)?
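In other words, the bootstrapping step would reduce to something like this 
(a Python sketch of the logic only; Samza's real CheckpointManager is a Java 
interface, and these names are made up):

```python
class PairStreamCheckpointManager:
    """Bootstraps the starting input offset from a stream of
    (input_offset, output_offset) pairs, instead of reading Samza's own
    checkpoint stream. Illustrative names, not Samza's actual API."""

    def __init__(self, pairs_stream):
        self.pairs = list(pairs_stream)

    def read_last_checkpoint(self):
        # Resume from the input offset of the most recent pair, or from
        # the beginning of the input if no pair has been recorded yet.
        if not self.pairs:
            return 0
        return self.pairs[-1][0]
```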

> On the other hand, I'm pretty sure a lower-level mechanism that allowed you
> to 'pull' or 'request' particular input could support both my unusual
> requirements and the existing checkpoint / message chooser / bootstrap
> stream machinery on top.

Something like this was requested on 
https://issues.apache.org/jira/browse/SAMZA-255 -- if that is what you're 
looking for, could you explain your use case on that issue please? It's always 
possible to add features if there's a compelling use case; we just want to 
make sure they are well thought out.

Best,
Martin
