So I had another look at the 'Idempotent Producer' proposal this
afternoon, and made a few notes on how I think they compare; if I've
made any mistakes, I'd be delighted if someone with more context on
the idempotent producer design would correct me.

As a first intuition, you can think of the 'conditional publish'
proposal as the special case of the 'idempotent producer' idea, where
there's just a single producer per-partition. The key observation here
is: if there's only one producer, you can conflate the 'sequence
number' and the expected offset. The conditional publish proposal uses
existing Kafka offset APIs for roughly the same things as the
idempotent producer proposal uses sequence numbers for -- eg. instead
of having a "lease PID" API that returns the current sequence number,
we can use the existing 'offset API' to retrieve the upcoming offset.

Both proposals attempt to deal with the situation where there are
transiently multiple publishers for the same partition (and PID). The
idempotent producer setup tracks a generation id for each pid, and
discards any writes with a generation id smaller than the latest
value. Conditional publish is 'first write wins' -- and instead of
dropping duplicates on the server, it returns an error to the client.
The duplicate-handling behaviour (dropping vs. erroring) has some
interesting consequences:

- If all producers are producing the same stream of messages, silently
dropping duplicates on the server is more convenient. (Suppose we have
a batch of messages 0-9, and the high-water mark on the server is 7.
Idempotent producer, as I read it, would append 7-9 to the partition
and return success; meanwhile, conditional publish would fail the
entire batch.)

- If producers might be writing different streams of messages, the
proposed behaviour of the idempotent producer is probably worse --
since it can silently interleave messages from two different
producers. This can be a problem for some commit-log style use-cases,
since it can transform a valid series of operations into an invalid
one.

- Given the error-on-duplicate behaviour, it's possible to implement
deduplication on the client. (Sketch: if a publish returns an error
for some partition, fetch the upcoming offset / sequence number for
that partition, and discard all messages with a smaller offset on the
client before republishing.)

I think this makes the erroring behaviour more general, though
deduplicating saves a roundtrip or two at conflict time.

I'm less clear about the behaviour of the generation id, or what
happens when (say) two producers with the same generation id are spun
up at the same time. I'd be interested in hearing other folks'
comments on this.

Ewen: I'm not sure I understand the questions well enough to answer
properly, but some quick notes:
- I don't think it makes sense to assign an expected offset without
already having assigned a partition. If the producer code is doing the
partition assignment, it should probably do the offset assignment
too... or we could just let application code handle both.
- I'm not aware of any case where reassigning offsets to messages
automatically after an offset mismatch makes sense: in the cases we've
discussed, it seems like either it's safe to drop duplicates, or we
want to handle the error at the application level.

I'm going to try and come with an idempotent-producer-type example
that works with the draft patch in the next few days, so hopefully
we'll have something more concrete to discuss. Otherwise -- if you
have a clear idea of how eg. sequence number assignment would work in
the idempotent-producer proposal, we could probably translate that
over to get the equivalent for the conditional publish API.



On Fri, Jul 24, 2015 at 2:16 AM, Ewen Cheslack-Postava
<e...@confluent.io> wrote:
> @Becket - for compressed batches, I think this just works out given the KIP
> as described. Without the change you're referring to, it still only makes
> sense to batch messages with this KIP if all the expected offsets are
> sequential (else some messages are guaranteed to fail). I think that
> probably just works out, but raises an issue I brought up on the KIP call.
>
> Batching can be a bit weird with this proposal. If you try to write key A
> and key B, the second operation is dependent on the first. Which means to
> make an effective client for this, we need to keep track of per-partition
> offsets so we can set expected offsets properly. For example, if A was
> expected to publish at offset 10, then if B was published to the same
> partition, we need to make sure it's marked as expected offset 11 (assuming
> no subpartition high water marks). We either need to have the application
> keep track of this itself and set the offsets, which requires that it know
> about how keys map to partitions, or the client needs to manage this
> process. But if the client manages it, I think the client gets quite a bit
> more complicated. If the produce request containing A fails, what happens
> to B? Are there retries that somehow update the expected offset, or do we
> just give up since we know it's always going to fail with the expected
> offset that was automatically assigned to it?
>
> One way to handle this is to use Yasuhiro's idea of increasing the
> granularity of high watermarks using subpartitions. But I guess my question
> is: if one producer client is writing many keys, and some of those keys are
> produced to the same partition, and those messages are batched, what
> happens? Do we end up with lots of failed messages? Or do we have
> complicated logic in the producer to figure out what the right expected
> offset for each message is? Or do they all share the same base expected
> offset as in the compressed case, in which case they all share the same
> fate and subpartitioning doesn't help? Or is there a simpler solution I'm
> just not seeing? Maybe this just disables batching entirely and throughput
> isn't an issue in these cases?
>
> Sorry, I know that's probably not entirely clear, but that's because I'm
> very uncertain of how batching works with this KIP.
>
>
> On how this relates to other proposals: I think it might also be helpful to
> get an overview of all the proposals for relevant modifications to
> producers/produce requests since many of these proposals are possibly
> alternatives (though some may not be mutually exclusive). Many people don't
> have all the context from the past couple of years of the project. Are
> there any other relevant wikis or docs besides the following?
>
> https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
>
> -Ewen
>
> On Wed, Jul 22, 2015 at 11:18 AM, Gwen Shapira <gshap...@cloudera.com>
> wrote:
>
>> Tangent: I think we should complete the move of Produce / Fetch RPC to
>> the client libraries before we add more revisions to this protocol.
>>
>> On Wed, Jul 22, 2015 at 11:02 AM, Jiangjie Qin
>> <j...@linkedin.com.invalid> wrote:
>> > I missed yesterday's KIP hangout. I'm currently working on another KIP
>> for
>> > enriched metadata of messages. Guozhang has already created a wiki page
>> > before (
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
>> ).
>> > We plan to fill the relative offset to the offset field in the batch sent
>> > by producer to avoid broker side re-compression. The message offset would
>> > become batch base offset + relative offset. I guess maybe the expected
>> > offset in KIP-27 can be only set for base offset? Would that affect
>> certain
>> > use cases?
>> >
>> > For Jun's comments, I am not sure I completely get it. I think the
>> producer
>> > only sends one batch per partition in a request. So either that batch is
>> > appended or not. Why a batch would be partially committed?
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin <b...@kirw.in> wrote:
>> >
>> >> That's a fair point. I've added some imagined job logic to the KIP, so
>> >> we can make sure the proposal stays in sync with the usages we're
>> >> discussing. (The logic is just a quick sketch for now -- I expect I'll
>> >> need to elaborate it as we get into more detail, or to address other
>> >> concerns...)
>> >>
>> >> On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao <j...@confluent.io> wrote:
>> >> > For 1, yes, when there is a transient leader change, it's guaranteed
>> >> that a
>> >> > prefix of the messages in a request will be committed. However, it
>> seems
>> >> > that the client needs to know what subset of messages are committed in
>> >> > order to resume the sending. Then the question is how.
>> >> >
>> >> > As Flavio indicated, for the use cases that you listed, it would be
>> >> useful
>> >> > to figure out the exact logic by using this feature. For example, in
>> the
>> >> > partition K/V store example, when we fail over to a new writer to the
>> >> > commit log, the zombie writer can publish new messages to the log
>> after
>> >> the
>> >> > new writer takes over, but before it publishes any message. We
>> probably
>> >> > need to outline how this case can be handled properly.
>> >> >
>> >> > Thanks,
>> >> >
>> >> > Jun
>> >> >
>> >> > On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin <b...@kirw.in> wrote:
>> >> >
>> >> >> Hi Jun,
>> >> >>
>> >> >> Thanks for the close reading! Responses inline.
>> >> >>
>> >> >> > Thanks for the write-up. The single producer use case you mentioned
>> >> makes
>> >> >> > sense. It would be useful to include that in the KIP wiki.
>> >> >>
>> >> >> Great -- I'll make sure that the wiki is clear about this.
>> >> >>
>> >> >> > 1. What happens when the leader of the partition changes in the
>> middle
>> >> >> of a
>> >> >> > produce request? In this case, the producer client is not sure
>> whether
>> >> >> the
>> >> >> > request succeeds or not. If there is only a single message in the
>> >> >> request,
>> >> >> > the producer can just resend the request. If it sees an
>> OffsetMismatch
>> >> >> > error, it knows that the previous send actually succeeded and can
>> >> proceed
>> >> >> > with the next write. This is nice since it not only allows the
>> >> producer
>> >> >> to
>> >> >> > proceed during transient failures in the broker, it also avoids
>> >> >> duplicates
>> >> >> > during producer resend. One caveat is when there are multiple
>> >> messages in
>> >> >> > the same partition in a produce request. The issue is that in our
>> >> current
>> >> >> > replication protocol, it's possible for some, but not all messages
>> in
>> >> the
>> >> >> > request to be committed. This makes resend a bit harder to deal
>> with
>> >> >> since
>> >> >> > on receiving an OffsetMismatch error, it's not clear which messages
>> >> have
>> >> >> > been committed. One possibility is to expect that compression is
>> >> enabled,
>> >> >> > in which case multiple messages are compressed into a single
>> message.
>> >> I
>> >> >> was
>> >> >> > thinking that another possibility is for the broker to return the
>> >> current
>> >> >> > high watermark when sending an OffsetMismatch error. Based on this
>> >> info,
>> >> >> > the producer can resend the subset of messages that have not been
>> >> >> > committed. However, this may not work in a compacted topic since
>> there
>> >> >> can
>> >> >> > be holes in the offset.
>> >> >>
>> >> >> This is a excellent question. It's my understanding that at least a
>> >> >> *prefix* of messages will be committed (right?) -- which seems to be
>> >> >> enough for many cases. I'll try and come up with a more concrete
>> >> >> answer here.
>> >> >>
>> >> >> > 2. Is this feature only intended to be used with ack = all? The
>> client
>> >> >> > doesn't get the offset with ack = 0. With ack = 1, it's possible
>> for a
>> >> >> > previously acked message to be lost during leader transition, which
>> >> will
>> >> >> > make the client logic more complicated.
>> >> >>
>> >> >> It's true that acks = 0 doesn't seem to be particularly useful; in
>> all
>> >> >> the cases I've come across, the client eventually wants to know about
>> >> >> the mismatch error. However, it seems like there are some cases where
>> >> >> acks = 1 would be fine -- eg. in a bulk load of a fixed dataset,
>> >> >> losing messages during a leader transition just means you need to
>> >> >> rewind / restart the load, which is not especially catastrophic. For
>> >> >> many other interesting cases, acks = all is probably preferable.
>> >> >>
>> >> >> > 3. How does the producer client know the offset to send the first
>> >> >> message?
>> >> >> > Do we need to expose an API in producer to get the current high
>> >> >> watermark?
>> >> >>
>> >> >> You're right, it might be irritating to have to go through the
>> >> >> consumer API just for this. There are some cases where the offsets
>> are
>> >> >> already available -- like the commit-log-for-KV-store example -- but
>> >> >> in general, being able to get the offsets from the producer interface
>> >> >> does sound convenient.
>> >> >>
>> >> >> > We plan to have a KIP discussion meeting tomorrow at 11am PST.
>> Perhaps
>> >> >> you
>> >> >> > can describe this KIP a bit then?
>> >> >>
>> >> >> Sure, happy to join.
>> >> >>
>> >> >> > Thanks,
>> >> >> >
>> >> >> > Jun
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin <b...@kirw.in> wrote:
>> >> >> >
>> >> >> >> Just wanted to flag a little discussion that happened on the
>> ticket:
>> >> >> >>
>> >> >> >>
>> >> >>
>> >>
>> https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259
>> >> >> >>
>> >> >> >> In particular, Yasuhiro Matsuda proposed an interesting variant on
>> >> >> >> this that performs the offset check on the message key (instead of
>> >> >> >> just the partition), with bounded space requirements, at the cost
>> of
>> >> >> >> potentially some spurious failures. (ie. the produce request may
>> fail
>> >> >> >> even if that particular key hasn't been updated recently.) This
>> >> >> >> addresses a couple of the drawbacks of the per-key approach
>> mentioned
>> >> >> >> at the bottom of the KIP.
>> >> >> >>
>> >> >> >> On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin <b...@kirw.in> wrote:
>> >> >> >> > Hi all,
>> >> >> >> >
>> >> >> >> > So, perhaps it's worth adding a couple specific examples of
>> where
>> >> this
>> >> >> >> > feature is useful, to make this a bit more concrete:
>> >> >> >> >
>> >> >> >> > - Suppose I'm using Kafka as a commit log for a partitioned KV
>> >> store,
>> >> >> >> > like Samza or Pistachio (?) do. We bootstrap the process state
>> by
>> >> >> >> > reading from that partition, and log all state updates to that
>> >> >> >> > partition when we're running. Now imagine that one of my
>> processes
>> >> >> >> > locks up -- GC or similar -- and the system transitions that
>> >> partition
>> >> >> >> > over to another node. When the GC is finished, the old 'owner'
>> of
>> >> that
>> >> >> >> > partition might still be trying to write to the commit log at
>> the
>> >> same
>> >> >> >> > as the new one is. A process might detect this by noticing that
>> the
>> >> >> >> > offset of the published message is bigger than it thought the
>> >> upcoming
>> >> >> >> > offset was, which implies someone else has been writing to the
>> >> log...
>> >> >> >> > but by then it's too late, and the commit log is already
>> corrupt.
>> >> With
>> >> >> >> > a 'conditional produce', one of those processes will have it's
>> >> publish
>> >> >> >> > request refused -- so we've avoided corrupting the state.
>> >> >> >> >
>> >> >> >> > - Envision some copycat-like system, where we have some sharded
>> >> >> >> > postgres setup and we're tailing each shard into its own
>> partition.
>> >> >> >> > Normally, it's fairly easy to avoid duplicates here: we can
>> track
>> >> >> >> > which offset in the WAL corresponds to which offset in Kafka,
>> and
>> >> we
>> >> >> >> > know how many messages we've written to Kafka already, so the
>> >> state is
>> >> >> >> > very simple. However, it is possible that for a moment -- due to
>> >> >> >> > rebalancing or operator error or some other thing -- two
>> different
>> >> >> >> > nodes are tailing the same postgres shard at once! Normally this
>> >> would
>> >> >> >> > introduce duplicate messages, but by specifying the expected
>> >> offset,
>> >> >> >> > we can avoid this.
>> >> >> >> >
>> >> >> >> > So perhaps it's better to say that this is useful when a single
>> >> >> >> > producer is *expected*, but multiple producers are *possible*?
>> (In
>> >> the
>> >> >> >> > same way that the high-level consumer normally has 1 consumer
>> in a
>> >> >> >> > group reading from a partition, but there are small windows
>> where
>> >> more
>> >> >> >> > than one might be reading at the same time.) This is also the
>> >> spirit
>> >> >> >> > of the 'runtime cost' comment -- in the common case, where
>> there is
>> >> >> >> > little to no contention, there's no performance overhead
>> either. I
>> >> >> >> > mentioned this a little in the Motivation section -- maybe I
>> should
>> >> >> >> > flesh that out a little bit?
>> >> >> >> >
>> >> >> >> > For me, the motivation to work this up was that I kept running
>> into
>> >> >> >> > cases, like the above, where the existing API was
>> >> almost-but-not-quite
>> >> >> >> > enough to give the guarantees I was looking for -- and the
>> >> extension
>> >> >> >> > needed to handle those cases too was pretty small and
>> >> natural-feeling.
>> >> >> >> >
>> >> >> >> > On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh <
>> asi...@cloudera.com
>> >> >
>> >> >> >> wrote:
>> >> >> >> >> Good concept. I have a question though.
>> >> >> >> >>
>> >> >> >> >> Say there are two producers A and B. Both producers are
>> producing
>> >> to
>> >> >> >> same
>> >> >> >> >> partition.
>> >> >> >> >> - A sends a message with expected offset, x1
>> >> >> >> >> - Broker accepts is and sends an Ack
>> >> >> >> >> - B sends a message with expected offset, x1
>> >> >> >> >> - Broker rejects it, sends nack
>> >> >> >> >> - B sends message again with expected offset, x1+1
>> >> >> >> >> - Broker accepts it and sends Ack
>> >> >> >> >> I guess this is what this KIP suggests, right? If yes, then how
>> >> does
>> >> >> >> this
>> >> >> >> >> ensure that same message will not be written twice when two
>> >> producers
>> >> >> >> are
>> >> >> >> >> producing to same partition? Producer on receiving a nack will
>> try
>> >> >> again
>> >> >> >> >> with next offset and will keep doing so till the message is
>> >> accepted.
>> >> >> >> Am I
>> >> >> >> >> missing something?
>> >> >> >> >>
>> >> >> >> >> Also, you have mentioned on KIP, "it imposes little to no
>> runtime
>> >> >> cost
>> >> >> >> in
>> >> >> >> >> memory or time", I think that is not true for time. With this
>> >> >> approach
>> >> >> >> >> producers' performance will reduce proportionally to number of
>> >> >> producers
>> >> >> >> >> writing to same partition. Please correct me if I am missing
>> out
>> >> >> >> something.
>> >> >> >> >>
>> >> >> >> >>
>> >> >> >> >> On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat <
>> >> >> >> >> gharatmayures...@gmail.com> wrote:
>> >> >> >> >>
>> >> >> >> >>> If we have 2 producers producing to a partition, they can be
>> out
>> >> of
>> >> >> >> order,
>> >> >> >> >>> then how does one producer know what offset to expect as it
>> does
>> >> not
>> >> >> >> >>> interact with other producer?
>> >> >> >> >>>
>> >> >> >> >>> Can you give an example flow that explains how it works with
>> >> single
>> >> >> >> >>> producer and with multiple producers?
>> >> >> >> >>>
>> >> >> >> >>>
>> >> >> >> >>> Thanks,
>> >> >> >> >>>
>> >> >> >> >>> Mayuresh
>> >> >> >> >>>
>> >> >> >> >>> On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira <
>> >> >> >> >>> fpjunque...@yahoo.com.invalid> wrote:
>> >> >> >> >>>
>> >> >> >> >>> > I like this feature, it reminds me of conditional updates in
>> >> >> >> zookeeper.
>> >> >> >> >>> > I'm not sure if it'd be best to have some mechanism for
>> fencing
>> >> >> >> rather
>> >> >> >> >>> than
>> >> >> >> >>> > a conditional write like you're proposing. The reason I'm
>> >> saying
>> >> >> >> this is
>> >> >> >> >>> > that the conditional write applies to requests individually,
>> >> >> while it
>> >> >> >> >>> > sounds like you want to make sure that there is a single
>> client
>> >> >> >> writing
>> >> >> >> >>> so
>> >> >> >> >>> > over multiple requests.
>> >> >> >> >>> >
>> >> >> >> >>> > -Flavio
>> >> >> >> >>> >
>> >> >> >> >>> > > On 17 Jul 2015, at 07:30, Ben Kirwin <b...@kirw.in> wrote:
>> >> >> >> >>> > >
>> >> >> >> >>> > > Hi there,
>> >> >> >> >>> > >
>> >> >> >> >>> > > I just added a KIP for a 'conditional publish' operation:
>> a
>> >> >> simple
>> >> >> >> >>> > > CAS-like mechanism for the Kafka producer. The wiki page
>> is
>> >> >> here:
>> >> >> >> >>> > >
>> >> >> >> >>> > >
>> >> >> >> >>> >
>> >> >> >> >>>
>> >> >> >>
>> >> >>
>> >>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish
>> >> >> >> >>> > >
>> >> >> >> >>> > > And there's some previous discussion on the ticket and the
>> >> users
>> >> >> >> list:
>> >> >> >> >>> > >
>> >> >> >> >>> > > https://issues.apache.org/jira/browse/KAFKA-2260
>> >> >> >> >>> > >
>> >> >> >> >>> > >
>> >> >> >> >>> >
>> >> >> >> >>>
>> >> >> >>
>> >> >>
>> >>
>> https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E
>> >> >> >> >>> > >
>> >> >> >> >>> > > As always, comments and suggestions are very welcome.
>> >> >> >> >>> > >
>> >> >> >> >>> > > Thanks,
>> >> >> >> >>> > > Ben
>> >> >> >> >>> >
>> >> >> >> >>> >
>> >> >> >> >>>
>> >> >> >> >>>
>> >> >> >> >>> --
>> >> >> >> >>> -Regards,
>> >> >> >> >>> Mayuresh R. Gharat
>> >> >> >> >>> (862) 250-7125
>> >> >> >> >>>
>> >> >> >> >>
>> >> >> >> >>
>> >> >> >> >>
>> >> >> >> >> --
>> >> >> >> >>
>> >> >> >> >> Regards,
>> >> >> >> >> Ashish
>> >> >> >>
>> >> >>
>> >>
>>
>
>
>
> --
> Thanks,
> Ewen

Reply via email to