Tangent: I think we should complete the move of Produce / Fetch RPC to
the client libraries before we add more revisions to this protocol.

On Wed, Jul 22, 2015 at 11:02 AM, Jiangjie Qin
<j...@linkedin.com.invalid> wrote:
> I missed yesterday's KIP hangout. I'm currently working on another KIP for
> enriched metadata of messages. Guozhang has already created a wiki page (
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata).
> We plan to fill in the relative offset in the offset field of the batch sent
> by the producer, to avoid broker-side re-compression. The message offset
> would then be the batch base offset plus the relative offset. I guess the
> expected offset in KIP-27 could then only be set for the base offset? Would
> that affect certain use cases?
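
(A minimal sketch of the offset arithmetic described above, in Java; the
variable names are illustrative and not the actual wire-format fields.)

    // Illustrative only: not the actual wire-format field names.
    public class RelativeOffsetExample {
        public static void main(String[] args) {
            long baseOffset = 500L;             // assigned by the broker for the whole batch
            int[] relativeOffsets = {0, 1, 2};  // filled in by the producer, per message
            for (int rel : relativeOffsets) {
                // A consumer sees: batch base offset + relative offset
                System.out.println("message offset = " + (baseOffset + rel));  // 500, 501, 502
            }
        }
    }
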
>
> For Jun's comments, I am not sure I completely get it. I think the producer
> only sends one batch per partition in a request. So either that batch is
> appended or not. Why would a batch be partially committed?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin <b...@kirw.in> wrote:
>
>> That's a fair point. I've added some imagined job logic to the KIP, so
>> we can make sure the proposal stays in sync with the usages we're
>> discussing. (The logic is just a quick sketch for now -- I expect I'll
>> need to elaborate it as we get into more detail, or to address other
>> concerns...)
>>
>> On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao <j...@confluent.io> wrote:
>> > For 1, yes, when there is a transient leader change, it's guaranteed that
>> > a prefix of the messages in a request will be committed. However, it seems
>> > that the client needs to know what subset of messages are committed in
>> > order to resume the sending. Then the question is how.
>> >
>> > As Flavio indicated, for the use cases that you listed, it would be useful
>> > to figure out the exact logic by using this feature. For example, in the
>> > partitioned K/V store example, when we fail over to a new writer to the
>> > commit log, the zombie writer can publish new messages to the log after
>> > the new writer takes over, but before it publishes any message. We
>> > probably need to outline how this case can be handled properly.
>> >
>> > Thanks,
>> >
>> > Jun
>> >
>> > On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin <b...@kirw.in> wrote:
>> >
>> >> Hi Jun,
>> >>
>> >> Thanks for the close reading! Responses inline.
>> >>
>> >> > Thanks for the write-up. The single producer use case you mentioned
>> >> > makes sense. It would be useful to include that in the KIP wiki.
>> >>
>> >> Great -- I'll make sure that the wiki is clear about this.
>> >>
>> >> > 1. What happens when the leader of the partition changes in the middle
>> >> > of a produce request? In this case, the producer client is not sure
>> >> > whether the request succeeds or not. If there is only a single message
>> >> > in the request, the producer can just resend the request. If it sees an
>> >> > OffsetMismatch error, it knows that the previous send actually
>> >> > succeeded and can proceed with the next write. This is nice since it
>> >> > not only allows the producer to proceed during transient failures in
>> >> > the broker, it also avoids duplicates during producer resend. One
>> >> > caveat is when there are multiple messages in the same partition in a
>> >> > produce request. The issue is that in our current replication protocol,
>> >> > it's possible for some, but not all, messages in the request to be
>> >> > committed. This makes resend a bit harder to deal with since on
>> >> > receiving an OffsetMismatch error, it's not clear which messages have
>> >> > been committed. One possibility is to expect that compression is
>> >> > enabled, in which case multiple messages are compressed into a single
>> >> > message. I was thinking that another possibility is for the broker to
>> >> > return the current high watermark when sending an OffsetMismatch error.
>> >> > Based on this info, the producer can resend the subset of messages that
>> >> > have not been committed. However, this may not work in a compacted
>> >> > topic since there can be holes in the offsets.
>> >>
>> >> This is an excellent question. It's my understanding that at least a
>> >> *prefix* of messages will be committed (right?) -- which seems to be
>> >> enough for many cases. I'll try and come up with a more concrete
>> >> answer here.
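
A rough sketch of the resend logic discussed above, to make it concrete.
Everything here is hypothetical: KIP-27 is not part of the producer API, so
the ConditionalProducer type, the per-message expected-offset argument, and
an OffsetMismatchException carrying the broker's high watermark are
illustrative names only.

    import java.util.List;

    // Hypothetical stubs for the conditional-produce API discussed in KIP-27.
    interface ConditionalProducer {
        // Append 'value' to (topic, partition) only if the log end offset
        // there is exactly 'expectedOffset'.
        void send(String topic, int partition, byte[] value, long expectedOffset)
                throws OffsetMismatchException;
    }

    class OffsetMismatchException extends Exception {
        private final long highWatermark;
        OffsetMismatchException(long highWatermark) { this.highWatermark = highWatermark; }
        // The broker's current high watermark, returned with the error as
        // suggested above.
        long highWatermark() { return highWatermark; }
    }

    class ResendExample {
        // Resend a batch after a timed-out produce request. Assumes a single
        // producer, so any committed offsets at or past baseOffset must be
        // this batch's own messages from the earlier attempt.
        static void resendBatch(ConditionalProducer producer, String topic, int partition,
                                List<byte[]> batch, long baseOffset) {
            long next = baseOffset;
            while (next < baseOffset + batch.size()) {
                byte[] message = batch.get((int) (next - baseOffset));
                try {
                    producer.send(topic, partition, message, next);
                    next += 1;
                } catch (OffsetMismatchException e) {
                    // Messages below the returned high watermark are already
                    // committed; skip them and resend only the remainder.
                    long hw = e.highWatermark();
                    if (hw <= next) {
                        // Not our own earlier write: another producer is active
                        // on this partition, so stop rather than loop.
                        throw new IllegalStateException(
                                "unexpected writer on partition " + partition, e);
                    }
                    next = hw;
                }
            }
        }
    }
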
>> >>
>> >> > 2. Is this feature only intended to be used with ack = all? The client
>> >> > doesn't get the offset with ack = 0. With ack = 1, it's possible for a
>> >> > previously acked message to be lost during leader transition, which
>> >> > will make the client logic more complicated.
>> >>
>> >> It's true that acks = 0 doesn't seem to be particularly useful; in all
>> >> the cases I've come across, the client eventually wants to know about
>> >> the mismatch error. However, it seems like there are some cases where
>> >> acks = 1 would be fine -- eg. in a bulk load of a fixed dataset,
>> >> losing messages during a leader transition just means you need to
>> >> rewind / restart the load, which is not especially catastrophic. For
>> >> many other interesting cases, acks = all is probably preferable.
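
For reference, the acks setting lives in the ordinary producer configuration;
a minimal example with the Java client (broker address and serializers are
placeholders for whatever the application uses):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    public class AcksExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder address
            props.put("key.serializer",
                      "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer",
                      "org.apache.kafka.common.serialization.ByteArraySerializer");
            // acks=all: the leader acknowledges only after the write is committed
            // to the in-sync replicas, so an acked offset survives a leader
            // transition; acks=1 acks after the leader write alone and can lose
            // an acked message on failover, as discussed above.
            props.put("acks", "all");
            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                // conditional sends would go here
            }
        }
    }
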
>> >>
>> >> > 3. How does the producer client know the offset to send the first
>> >> > message? Do we need to expose an API in the producer to get the current
>> >> > high watermark?
>> >>
>> >> You're right, it might be irritating to have to go through the
>> >> consumer API just for this. There are some cases where the offsets are
>> >> already available -- like the commit-log-for-KV-store example -- but
>> >> in general, being able to get the offsets from the producer interface
>> >> does sound convenient.
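
For what it's worth, a sketch of the consumer-side workaround mentioned
above, assuming a recent Java consumer client's assign/seekToEnd/position
calls; the topic name and broker address are placeholders. Exposing the same
information on the producer would avoid needing a second client.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class NextOffsetExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder address
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("commit-log", 0);  // placeholder topic
                consumer.assign(Collections.singletonList(tp));
                consumer.seekToEnd(Collections.singletonList(tp));
                // After seeking to the end, the position is the offset the next
                // conditional publish should expect.
                long nextExpectedOffset = consumer.position(tp);
                System.out.println("next expected offset: " + nextExpectedOffset);
            }
        }
    }
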
>> >>
>> >> > We plan to have a KIP discussion meeting tomorrow at 11am PST. Perhaps
>> >> > you can describe this KIP a bit then?
>> >>
>> >> Sure, happy to join.
>> >>
>> >> > Thanks,
>> >> >
>> >> > Jun
>> >> >
>> >> >
>> >> >
>> >> > On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin <b...@kirw.in> wrote:
>> >> >
>> >> >> Just wanted to flag a little discussion that happened on the ticket:
>> >> >>
>> >> >> https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259
>> >> >>
>> >> >> In particular, Yasuhiro Matsuda proposed an interesting variant on
>> >> >> this that performs the offset check on the message key (instead of
>> >> >> just the partition), with bounded space requirements, at the cost of
>> >> >> potentially some spurious failures. (ie. the produce request may fail
>> >> >> even if that particular key hasn't been updated recently.) This
>> >> >> addresses a couple of the drawbacks of the per-key approach mentioned
>> >> >> at the bottom of the KIP.
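
To give a rough idea of the shape of that variant (the actual proposal is in
the JIRA comment linked above; this is only one way such a scheme could
work): the broker could track the last committed offset per hash bucket
rather than per key, so the check needs only fixed memory, and a hash
collision can reject a produce even though that key was not recently updated.

    import java.util.Arrays;

    // Hypothetical illustration only; the real proposal is in the JIRA comment
    // linked above. Tracking the last committed offset per hash bucket (rather
    // than per key) bounds the memory used, at the cost of occasional spurious
    // rejections when two keys share a bucket.
    class PerKeyOffsetCheck {
        private final long[] lastOffsetByBucket;

        PerKeyOffsetCheck(int buckets) {
            lastOffsetByBucket = new long[buckets];
            Arrays.fill(lastOffsetByBucket, -1L);
        }

        private int bucket(byte[] key) {
            return Math.floorMod(Arrays.hashCode(key), lastOffsetByBucket.length);
        }

        // Accepts the append only if nothing sharing this key's bucket has been
        // committed past expectedOffset; otherwise rejects, possibly spuriously.
        synchronized boolean tryAppend(byte[] key, long expectedOffset, long committedOffset) {
            int b = bucket(key);
            if (lastOffsetByBucket[b] > expectedOffset) {
                return false;
            }
            lastOffsetByBucket[b] = committedOffset;
            return true;
        }
    }
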
>> >> >>
>> >> >> On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin <b...@kirw.in> wrote:
>> >> >> > Hi all,
>> >> >> >
>> >> >> > So, perhaps it's worth adding a couple specific examples of where
>> >> >> > this feature is useful, to make this a bit more concrete:
>> >> >> >
>> >> >> > - Suppose I'm using Kafka as a commit log for a partitioned KV
>> >> >> > store, like Samza or Pistachio (?) do. We bootstrap the process state
>> >> >> > by reading from that partition, and log all state updates to that
>> >> >> > partition when we're running. Now imagine that one of my processes
>> >> >> > locks up -- GC or similar -- and the system transitions that
>> >> >> > partition over to another node. When the GC is finished, the old
>> >> >> > 'owner' of that partition might still be trying to write to the
>> >> >> > commit log at the same time as the new one is. A process might detect
>> >> >> > this by noticing that the offset of the published message is bigger
>> >> >> > than it thought the upcoming offset was, which implies someone else
>> >> >> > has been writing to the log... but by then it's too late, and the
>> >> >> > commit log is already corrupt. With a 'conditional produce', one of
>> >> >> > those processes will have its publish request refused -- so we've
>> >> >> > avoided corrupting the state. (A sketch of this flow follows after
>> >> >> > these examples.)
>> >> >> >
>> >> >> > - Envision some copycat-like system, where we have some sharded
>> >> >> > postgres setup and we're tailing each shard into its own partition.
>> >> >> > Normally, it's fairly easy to avoid duplicates here: we can track
>> >> >> > which offset in the WAL corresponds to which offset in Kafka, and we
>> >> >> > know how many messages we've written to Kafka already, so the state
>> >> >> > is very simple. However, it is possible that for a moment -- due to
>> >> >> > rebalancing or operator error or some other thing -- two different
>> >> >> > nodes are tailing the same postgres shard at once! Normally this
>> >> >> > would introduce duplicate messages, but by specifying the expected
>> >> >> > offset, we can avoid this.
>> >> >> >
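
To make the first example above concrete, here is a hypothetical sketch of
the fencing flow, reusing the illustrative ConditionalProducer and
OffsetMismatchException stubs from the resend sketch earlier on this page;
none of this is a real producer API.

    // Hypothetical sketch; reuses the illustrative ConditionalProducer and
    // OffsetMismatchException stubs from the resend sketch above.
    class StateWriter {
        private final ConditionalProducer producer;
        private final String topic;
        private final int partition;
        private long nextOffset;  // learned while bootstrapping state from the partition

        StateWriter(ConditionalProducer producer, String topic, int partition, long nextOffset) {
            this.producer = producer;
            this.topic = topic;
            this.partition = partition;
            this.nextOffset = nextOffset;
        }

        void logUpdate(byte[] update) {
            try {
                // Append only if the partition's log end offset is still nextOffset.
                producer.send(topic, partition, update, nextOffset);
                nextOffset += 1;
            } catch (OffsetMismatchException e) {
                // Someone else has appended to the commit log: a new owner has
                // taken over this partition, so this (possibly zombie) writer
                // stops instead of corrupting the shared state.
                throw new IllegalStateException("lost ownership of partition " + partition, e);
            }
        }
    }
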
>> >> >> > So perhaps it's better to say that this is useful when a single
>> >> >> > producer is *expected*, but multiple producers are *possible*? (In
>> >> >> > the same way that the high-level consumer normally has 1 consumer in
>> >> >> > a group reading from a partition, but there are small windows where
>> >> >> > more than one might be reading at the same time.) This is also the
>> >> >> > spirit of the 'runtime cost' comment -- in the common case, where
>> >> >> > there is little to no contention, there's no performance overhead
>> >> >> > either. I mentioned this a little in the Motivation section -- maybe
>> >> >> > I should flesh that out a little bit?
>> >> >> >
>> >> >> > For me, the motivation to work this up was that I kept running into
>> >> >> > cases, like the above, where the existing API was
>> >> >> > almost-but-not-quite enough to give the guarantees I was looking for
>> >> >> > -- and the extension needed to handle those cases too was pretty
>> >> >> > small and natural-feeling.
>> >> >> >
>> >> >> > On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh <asi...@cloudera.com>
>> >> >> > wrote:
>> >> >> >> Good concept. I have a question though.
>> >> >> >>
>> >> >> >> Say there are two producers A and B. Both producers are producing to
>> >> >> >> the same partition.
>> >> >> >> - A sends a message with expected offset, x1
>> >> >> >> - Broker accepts it and sends an Ack
>> >> >> >> - B sends a message with expected offset, x1
>> >> >> >> - Broker rejects it, sends nack
>> >> >> >> - B sends the message again with expected offset, x1+1
>> >> >> >> - Broker accepts it and sends Ack
>> >> >> >> I guess this is what this KIP suggests, right? If yes, then how does
>> >> >> >> this ensure that the same message will not be written twice when two
>> >> >> >> producers are producing to the same partition? A producer, on
>> >> >> >> receiving a nack, will try again with the next offset and will keep
>> >> >> >> doing so till the message is accepted. Am I missing something?
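
A hypothetical sketch of producer B's retry loop as described above (again
using the illustrative ConditionalProducer and OffsetMismatchException names
from the resend sketch earlier on this page): blindly bumping the expected
offset on a nack reintroduces the duplicate, so the deduplication only holds
if the rejected producer re-reads the log, or stops, instead of retrying.

    // Hypothetical sketch of producer B's retry loop from the flow above;
    // reuses the illustrative ConditionalProducer and OffsetMismatchException
    // stubs from the resend sketch above.
    class BlindRetry {
        static void sendWithBlindRetry(ConditionalProducer producer, String topic,
                                       int partition, byte[] message, long expectedOffset) {
            long next = expectedOffset;
            while (true) {
                try {
                    producer.send(topic, partition, message, next);
                    return;  // accepted
                } catch (OffsetMismatchException e) {
                    // Retrying with the next offset eventually succeeds, but if A
                    // already wrote the same record it is now in the log twice --
                    // exactly the duplicate the offset check was meant to prevent.
                    // The application would need to re-read the log, or stop, here.
                    next += 1;
                }
            }
        }
    }
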
>> >> >> >>
>> >> >> >> Also, you have mentioned in the KIP that "it imposes little to no
>> >> >> >> runtime cost in memory or time"; I think that is not true for time.
>> >> >> >> With this approach, a producer's performance will degrade in
>> >> >> >> proportion to the number of producers writing to the same partition.
>> >> >> >> Please correct me if I am missing something.
>> >> >> >>
>> >> >> >>
>> >> >> >> On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat <
>> >> >> >> gharatmayures...@gmail.com> wrote:
>> >> >> >>
>> >> >> >>> If we have 2 producers producing to a partition, they can be out
>> >> >> >>> of order; then how does one producer know what offset to expect, as
>> >> >> >>> it does not interact with the other producer?
>> >> >> >>>
>> >> >> >>> Can you give an example flow that explains how it works with a
>> >> >> >>> single producer and with multiple producers?
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> Thanks,
>> >> >> >>>
>> >> >> >>> Mayuresh
>> >> >> >>>
>> >> >> >>> On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira <
>> >> >> >>> fpjunque...@yahoo.com.invalid> wrote:
>> >> >> >>>
>> >> >> >>> > I like this feature, it reminds me of conditional updates in
>> >> >> >>> > zookeeper. I'm not sure if it'd be best to have some mechanism for
>> >> >> >>> > fencing rather than a conditional write like you're proposing. The
>> >> >> >>> > reason I'm saying this is that the conditional write applies to
>> >> >> >>> > requests individually, while it sounds like you want to make sure
>> >> >> >>> > that there is a single client writing over multiple requests.
>> >> >> >>> >
>> >> >> >>> > -Flavio
>> >> >> >>> >
>> >> >> >>> > > On 17 Jul 2015, at 07:30, Ben Kirwin <b...@kirw.in> wrote:
>> >> >> >>> > >
>> >> >> >>> > > Hi there,
>> >> >> >>> > >
>> >> >> >>> > > I just added a KIP for a 'conditional publish' operation: a
>> >> >> >>> > > simple CAS-like mechanism for the Kafka producer. The wiki page
>> >> >> >>> > > is here:
>> >> >> >>> > >
>> >> >> >>> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish
>> >> >> >>> > >
>> >> >> >>> > > And there's some previous discussion on the ticket and the
>> >> >> >>> > > users list:
>> >> >> >>> > >
>> >> >> >>> > > https://issues.apache.org/jira/browse/KAFKA-2260
>> >> >> >>> > >
>> >> >> >>> > > https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E
>> >> >> >>> > >
>> >> >> >>> > > As always, comments and suggestions are very welcome.
>> >> >> >>> > >
>> >> >> >>> > > Thanks,
>> >> >> >>> > > Ben
>> >> >> >>> >
>> >> >> >>> >
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> --
>> >> >> >>> -Regards,
>> >> >> >>> Mayuresh R. Gharat
>> >> >> >>> (862) 250-7125
>> >> >> >>>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >> --
>> >> >> >>
>> >> >> >> Regards,
>> >> >> >> Ashish
>> >> >>
>> >>
>>
