In the KV store use case, all instances are writers, aren't they? There is
no leader or master, so there is no failover. The offset-based CAS ensures
that an update is based on the latest value, regardless of who is writing
the new value.
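
To make that concrete, here is a minimal sketch of the offset-based CAS as I
understand it. The `PartitionLog` class, `OffsetMismatch` exception and
`try_put` helper are hypothetical names for an in-memory stand-in, not part of
KIP-27 or the Kafka API, and I am assuming the expected offset is the offset
the next message would be assigned.

```python
class OffsetMismatch(Exception):
    """Raised when the expected offset does not match the log's next offset."""


class PartitionLog:
    """Hypothetical in-memory stand-in for a single partition."""

    def __init__(self):
        self.messages = []

    def next_offset(self):
        return len(self.messages)

    def conditional_append(self, expected_offset, message):
        # Accept the write only if the caller has seen the whole log so far.
        if expected_offset != self.next_offset():
            raise OffsetMismatch(
                f"expected {expected_offset}, log is at {self.next_offset()}"
            )
        self.messages.append(message)
        return expected_offset


def try_put(log, writer_name, expected_offset, key, value):
    """Attempt one KV update; return the assigned offset or None if refused."""
    try:
        return log.conditional_append(expected_offset, (writer_name, key, value))
    except OffsetMismatch:
        return None


log = PartitionLog()
# Two instances both believe the log is empty and race to write.
first = try_put(log, "instance-a", 0, "k", "v1")
second = try_put(log, "instance-b", 0, "k", "v2")
# instance-b must catch up to the latest offset before retrying.
retry = try_put(log, "instance-b", log.next_offset(), "k", "v2")
```

Neither instance is special here: whichever write arrives first with the
right offset wins, and the other has to re-read before it can try again.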

I think the idea of the offset-based CAS is great, and it works very well
with Event Sourcing. It may be a bit weak for ensuring a single writer,
though.
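
As a rough illustration of why it may be weak for that purpose, the sketch
below replays the zombie-writer case Jun describes in the quoted message.
`CasLog` and `append_if` are hypothetical names for an in-memory model, not
anything from the KIP or the Kafka API.

```python
class CasLog:
    """Hypothetical in-memory log with an offset-based CAS append."""

    def __init__(self):
        self.entries = []

    def append_if(self, expected_offset, entry):
        # Succeeds only when expected_offset equals the next offset to assign.
        if expected_offset != len(self.entries):
            return False
        self.entries.append(entry)
        return True


log = CasLog()
log.append_if(0, "old-writer: update 1")

# The old writer stalls (e.g. a long GC pause). A new writer takes over and
# bootstraps by reading the log, so it expects offset 1 next, but it has not
# published anything yet.
new_writer_expected = len(log.entries)

# The old writer wakes up. Its view of the log is still current, so the CAS
# passes even though it is no longer the owner.
zombie_ok = log.append_if(1, "old-writer: update 2 (zombie)")

# The new writer's first write is now refused; it must re-read and retry.
new_ok = log.append_if(new_writer_expected, "new-writer: update 1")
```

The log stays linear in this model, but the check by itself does not identify
the old writer as a non-owner; that would presumably need some extra fencing
along the lines Flavio suggested.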


On Tue, Jul 21, 2015 at 8:45 AM, Jun Rao <j...@confluent.io> wrote:

> For 1, yes, when there is a transient leader change, it's guaranteed that a
> prefix of the messages in a request will be committed. However, it seems
> that the client needs to know what subset of messages are committed in
> order to resume the sending. Then the question is how.
>
> As Flavio indicated, for the use cases that you listed, it would be useful
> to figure out the exact logic by using this feature. For example, in the
> partition K/V store example, when we fail over to a new writer to the
> commit log, the zombie writer can publish new messages to the log after the
> new writer takes over, but before it publishes any message. We probably
> need to outline how this case can be handled properly.
>
> Thanks,
>
> Jun
>
> On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin <b...@kirw.in> wrote:
>
> > Hi Jun,
> >
> > Thanks for the close reading! Responses inline.
> >
> > > Thanks for the write-up. The single producer use case you mentioned
> makes
> > > sense. It would be useful to include that in the KIP wiki.
> >
> > Great -- I'll make sure that the wiki is clear about this.
> >
> > > 1. What happens when the leader of the partition changes in the middle
> > of a
> > > produce request? In this case, the producer client is not sure whether
> > the
> > > request succeeds or not. If there is only a single message in the
> > request,
> > > the producer can just resend the request. If it sees an OffsetMismatch
> > > error, it knows that the previous send actually succeeded and can
> proceed
> > > with the next write. This is nice since it not only allows the producer
> > to
> > > proceed during transient failures in the broker, it also avoids
> > duplicates
> > > during producer resend. One caveat is when there are multiple messages
> in
> > > the same partition in a produce request. The issue is that in our
> current
> > > replication protocol, it's possible for some, but not all messages in
> the
> > > request to be committed. This makes resend a bit harder to deal with
> > since
> > > on receiving an OffsetMismatch error, it's not clear which messages
> have
> > > been committed. One possibility is to expect that compression is
> enabled,
> > > in which case multiple messages are compressed into a single message. I
> > was
> > > thinking that another possibility is for the broker to return the
> current
> > > high watermark when sending an OffsetMismatch error. Based on this
> info,
> > > the producer can resend the subset of messages that have not been
> > > committed. However, this may not work in a compacted topic since there
> > can
> > > be holes in the offset.
> >
> > This is an excellent question. It's my understanding that at least a
> > *prefix* of messages will be committed (right?) -- which seems to be
> > enough for many cases. I'll try and come up with a more concrete
> > answer here.
> >
> > > 2. Is this feature only intended to be used with ack = all? The client
> > > doesn't get the offset with ack = 0. With ack = 1, it's possible for a
> > > previously acked message to be lost during leader transition, which
> will
> > > make the client logic more complicated.
> >
> > It's true that acks = 0 doesn't seem to be particularly useful; in all
> > the cases I've come across, the client eventually wants to know about
> > the mismatch error. However, it seems like there are some cases where
> > acks = 1 would be fine -- eg. in a bulk load of a fixed dataset,
> > losing messages during a leader transition just means you need to
> > rewind / restart the load, which is not especially catastrophic. For
> > many other interesting cases, acks = all is probably preferable.
> >
> > > 3. How does the producer client know the offset to send the first
> > message?
> > > Do we need to expose an API in producer to get the current high
> > watermark?
> >
> > You're right, it might be irritating to have to go through the
> > consumer API just for this. There are some cases where the offsets are
> > already available -- like the commit-log-for-KV-store example -- but
> > in general, being able to get the offsets from the producer interface
> > does sound convenient.
> >
> > > We plan to have a KIP discussion meeting tomorrow at 11am PST. Perhaps
> > you
> > > can describe this KIP a bit then?
> >
> > Sure, happy to join.
> >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > >
> > > On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin <b...@kirw.in> wrote:
> > >
> > >> Just wanted to flag a little discussion that happened on the ticket:
> > >>
> > >>
> >
> https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259
> > >>
> > >> In particular, Yasuhiro Matsuda proposed an interesting variant on
> > >> this that performs the offset check on the message key (instead of
> > >> just the partition), with bounded space requirements, at the cost of
> > >> potentially some spurious failures. (ie. the produce request may fail
> > >> even if that particular key hasn't been updated recently.) This
> > >> addresses a couple of the drawbacks of the per-key approach mentioned
> > >> at the bottom of the KIP.
> > >>
> > >> On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin <b...@kirw.in> wrote:
> > >> > Hi all,
> > >> >
> > >> > So, perhaps it's worth adding a couple specific examples of where
> this
> > >> > feature is useful, to make this a bit more concrete:
> > >> >
> > >> > - Suppose I'm using Kafka as a commit log for a partitioned KV
> store,
> > >> > like Samza or Pistachio (?) do. We bootstrap the process state by
> > >> > reading from that partition, and log all state updates to that
> > >> > partition when we're running. Now imagine that one of my processes
> > >> > locks up -- GC or similar -- and the system transitions that
> partition
> > >> > over to another node. When the GC is finished, the old 'owner' of
> that
> > >> > partition might still be trying to write to the commit log at the
> same time
> > >> > as the new one is. A process might detect this by noticing that the
> > >> > offset of the published message is bigger than it thought the
> upcoming
> > >> > offset was, which implies someone else has been writing to the
> log...
> > >> > but by then it's too late, and the commit log is already corrupt.
> With
> > >> > a 'conditional produce', one of those processes will have its
> publish
> > >> > request refused -- so we've avoided corrupting the state.
> > >> >
> > >> > - Envision some copycat-like system, where we have some sharded
> > >> > postgres setup and we're tailing each shard into its own partition.
> > >> > Normally, it's fairly easy to avoid duplicates here: we can track
> > >> > which offset in the WAL corresponds to which offset in Kafka, and we
> > >> > know how many messages we've written to Kafka already, so the state
> is
> > >> > very simple. However, it is possible that for a moment -- due to
> > >> > rebalancing or operator error or some other thing -- two different
> > >> > nodes are tailing the same postgres shard at once! Normally this
> would
> > >> > introduce duplicate messages, but by specifying the expected offset,
> > >> > we can avoid this.
> > >> >
> > >> > So perhaps it's better to say that this is useful when a single
> > >> > producer is *expected*, but multiple producers are *possible*? (In
> the
> > >> > same way that the high-level consumer normally has 1 consumer in a
> > >> > group reading from a partition, but there are small windows where
> more
> > >> > than one might be reading at the same time.) This is also the spirit
> > >> > of the 'runtime cost' comment -- in the common case, where there is
> > >> > little to no contention, there's no performance overhead either. I
> > >> > mentioned this a little in the Motivation section -- maybe I should
> > >> > flesh that out a little bit?
> > >> >
> > >> > For me, the motivation to work this up was that I kept running into
> > >> > cases, like the above, where the existing API was
> almost-but-not-quite
> > >> > enough to give the guarantees I was looking for -- and the extension
> > >> > needed to handle those cases too was pretty small and
> natural-feeling.
> > >> >
> > >> > On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh <asi...@cloudera.com>
> > >> wrote:
> > >> >> Good concept. I have a question though.
> > >> >>
> > >> >> Say there are two producers A and B. Both producers are producing
> to
> > >> same
> > >> >> partition.
> > >> >> - A sends a message with expected offset, x1
> > >> >> - Broker accepts it and sends an Ack
> > >> >> - B sends a message with expected offset, x1
> > >> >> - Broker rejects it, sends nack
> > >> >> - B sends message again with expected offset, x1+1
> > >> >> - Broker accepts it and sends Ack
> > >> >> I guess this is what this KIP suggests, right? If yes, then how
> does
> > >> this
> > >> >> ensure that same message will not be written twice when two
> producers
> > >> are
> > >> >> producing to same partition? Producer on receiving a nack will try
> > again
> > >> >> with next offset and will keep doing so till the message is
> accepted.
> > >> Am I
> > >> >> missing something?
> > >> >>
> > >> >> Also, you have mentioned on KIP, "it imposes little to no runtime
> > cost
> > >> in
> > >> >> memory or time", I think that is not true for time. With this
> > approach
> > >> >> producers' performance will reduce proportionally to number of
> > producers
> > >> >> writing to same partition. Please correct me if I am missing out
> > >> something.
> > >> >>
> > >> >>
> > >> >> On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat <
> > >> >> gharatmayures...@gmail.com> wrote:
> > >> >>
> > >> >>> If we have 2 producers producing to a partition, they can be out
> of
> > >> order,
> > >> >>> then how does one producer know what offset to expect as it does
> not
> > >> >>> interact with other producer?
> > >> >>>
> > >> >>> Can you give an example flow that explains how it works with
> single
> > >> >>> producer and with multiple producers?
> > >> >>>
> > >> >>>
> > >> >>> Thanks,
> > >> >>>
> > >> >>> Mayuresh
> > >> >>>
> > >> >>> On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira <
> > >> >>> fpjunque...@yahoo.com.invalid> wrote:
> > >> >>>
> > >> >>> > I like this feature, it reminds me of conditional updates in
> > >> zookeeper.
> > >> >>> > I'm not sure if it'd be best to have some mechanism for fencing
> > >> rather
> > >> >>> than
> > >> >>> > a conditional write like you're proposing. The reason I'm saying
> > >> this is
> > >> >>> > that the conditional write applies to requests individually,
> > while it
> > >> >>> > sounds like you want to make sure that there is a single client
> > >> writing
> > >> >>> so
> > >> >>> > over multiple requests.
> > >> >>> >
> > >> >>> > -Flavio
> > >> >>> >
> > >> >>> > > On 17 Jul 2015, at 07:30, Ben Kirwin <b...@kirw.in> wrote:
> > >> >>> > >
> > >> >>> > > Hi there,
> > >> >>> > >
> > >> >>> > > I just added a KIP for a 'conditional publish' operation: a
> > simple
> > >> >>> > > CAS-like mechanism for the Kafka producer. The wiki page is
> > here:
> > >> >>> > >
> > >> >>> > >
> > >> >>> >
> > >> >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish
> > >> >>> > >
> > >> >>> > > And there's some previous discussion on the ticket and the
> users
> > >> list:
> > >> >>> > >
> > >> >>> > > https://issues.apache.org/jira/browse/KAFKA-2260
> > >> >>> > >
> > >> >>> > >
> > >> >>> >
> > >> >>>
> > >>
> >
> https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E
> > >> >>> > >
> > >> >>> > > As always, comments and suggestions are very welcome.
> > >> >>> > >
> > >> >>> > > Thanks,
> > >> >>> > > Ben
> > >> >>> >
> > >> >>> >
> > >> >>>
> > >> >>>
> > >> >>> --
> > >> >>> -Regards,
> > >> >>> Mayuresh R. Gharat
> > >> >>> (862) 250-7125
> > >> >>>
> > >> >>
> > >> >>
> > >> >>
> > >> >> --
> > >> >>
> > >> >> Regards,
> > >> >> Ashish
> > >>
> >
>
