Hi Jay,

I just changed the KIP title and updated the KIP page.

And yes, we are working on a general version control proposal to make the
protocol migration like this more smooth. I will also create a KIP for that
soon.

Thanks,

Jiangjie (Becket) Qin


On Thu, Sep 10, 2015 at 2:21 PM, Jay Kreps <j...@confluent.io> wrote:

> Great, can we change the name to something related to the change--"KIP-31:
> Move to relative offsets in compressed message sets".
>
> Also you had mentioned before you were going to expand on the mechanics of
> handling these log format changes, right?
>
> -Jay
>
> On Thu, Sep 10, 2015 at 12:42 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> wrote:
>
> > Neha and Jay,
> >
> > Thanks a lot for the feedback. Good point about splitting the
> discussion. I
> > have split the proposal to three KIPs and it does make each discussion
> more
> > clear:
> > KIP-31 - Message format change (Use relative offset)
> > KIP-32 - Add CreateTime and LogAppendTime to Kafka message
> > KIP-33 - Build a time-based log index
> >
> > KIP-33 can be a follow up KIP for KIP-32, so we can discuss about KIP-31
> > and KIP-32 first for now. I will create a separate discussion thread for
> > KIP-32 and reply the concerns you raised regarding the timestamp.
> >
> > So far it looks there is no objection to KIP-31. Since I removed a few
> part
> > from previous KIP and only left the relative offset proposal, it would be
> > great if people can take another look to see if there is any concerns.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Tue, Sep 8, 2015 at 1:28 PM, Neha Narkhede <n...@confluent.io> wrote:
> >
> > > Becket,
> > >
> > > Nice write-up. Few thoughts -
> > >
> > > I'd split up the discussion for simplicity. Note that you can always
> > group
> > > several of these in one patch to reduce the protocol changes people
> have
> > to
> > > deal with.This is just a suggestion, but I think the following split
> > might
> > > make it easier to tackle the changes being proposed -
> > >
> > >    - Relative offsets
> > >    - Introducing the concept of time
> > >    - Time-based indexing (separate the usage of the timestamp field
> from
> > >    how/whether we want to include a timestamp in the message)
> > >
> > > I'm a +1 on relative offsets, we should've done it back when we
> > introduced
> > > it. Other than reducing the CPU overhead, this will also reduce the
> > garbage
> > > collection overhead on the brokers.
> > >
> > > On the timestamp field, I generally agree that we should add a
> timestamp
> > to
> > > a Kafka message but I'm not quite sold on how this KIP suggests the
> > > timestamp be set. Will avoid repeating the downsides of a broker side
> > > timestamp mentioned previously in this thread. I think the topic of
> > > including a timestamp in a Kafka message requires a lot more thought
> and
> > > details than what's in this KIP. I'd suggest we make it a separate KIP
> > that
> > > includes a list of all the different use cases for the timestamp
> (beyond
> > > log retention) including stream processing and discuss tradeoffs of
> > > including client and broker side timestamps.
> > >
> > > Agree with the benefit of time-based indexing, but haven't had a chance
> > to
> > > dive into the design details yet.
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <j...@confluent.io> wrote:
> > >
> > > > Hey Beckett,
> > > >
> > > > I was proposing splitting up the KIP just for simplicity of
> discussion.
> > > You
> > > > can still implement them in one patch. I think otherwise it will be
> > hard
> > > to
> > > > discuss/vote on them since if you like the offset proposal but not
> the
> > > time
> > > > proposal what do you do?
> > > >
> > > > Introducing a second notion of time into Kafka is a pretty massive
> > > > philosophical change so it kind of warrants it's own KIP I think it
> > isn't
> > > > just "Change message format".
> > > >
> > > > WRT time I think one thing to clarify in the proposal is how MM will
> > have
> > > > access to set the timestamp? Presumably this will be a new field in
> > > > ProducerRecord, right? If so then any user can set the timestamp,
> > right?
> > > > I'm not sure you answered the questions around how this will work for
> > MM
> > > > since when MM retains timestamps from multiple partitions they will
> > then
> > > be
> > > > out of order and in the past (so the max(lastAppendedTimestamp,
> > > > currentTimeMillis) override you proposed will not work, right?). If
> we
> > > > don't do this then when you set up mirroring the data will all be new
> > and
> > > > you have the same retention problem you described. Maybe I missed
> > > > something...?
> > > >
> > > > My main motivation is that given that both Samza and Kafka streams
> are
> > > > doing work that implies a mandatory client-defined notion of time, I
> > > really
> > > > think introducing a different mandatory notion of time in Kafka is
> > going
> > > to
> > > > be quite odd. We should think hard about how client-defined time
> could
> > > > work. I'm not sure if it can, but I'm also not sure that it can't.
> > Having
> > > > both will be odd. Did you chat about this with Yi/Kartik on the Samza
> > > side?
> > > >
> > > > When you are saying it won't work you are assuming some particular
> > > > implementation? Maybe that the index is a monotonically increasing
> set
> > of
> > > > pointers to the least record with a timestamp larger than the index
> > time?
> > > > In other words a search for time X gives the largest offset at which
> > all
> > > > records are <= X?
> > > >
> > > > For retention, I agree with the problem you point out, but I think
> what
> > > you
> > > > are saying in that case is that you want a size limit too. If you use
> > > > system time you actually hit the same problem: say you do a full dump
> > of
> > > a
> > > > DB table with a setting of 7 days retention, your retention will
> > actually
> > > > not get enforced for the first 7 days because the data is "new to
> > Kafka".
> > > >
> > > > -Jay
> > > >
> > > >
> > > > On Mon, Sep 7, 2015 at 10:44 AM, Jiangjie Qin
> > <j...@linkedin.com.invalid
> > > >
> > > > wrote:
> > > >
> > > > > Jay,
> > > > >
> > > > > Thanks for the comments. Yes, there are actually three proposals as
> > you
> > > > > pointed out.
> > > > >
> > > > > We will have a separate proposal for (1) - version control
> mechanism.
> > > We
> > > > > actually thought about whether we want to separate 2 and 3
> internally
> > > > > before creating the KIP. The reason we put 2 and 3 together is it
> > will
> > > > > saves us another cross board wire protocol change. Like you said,
> we
> > > have
> > > > > to migrate all the clients in all languages. To some extent, the
> > effort
> > > > to
> > > > > spend on upgrading the clients can be even bigger than implementing
> > the
> > > > new
> > > > > feature itself. So there are some attractions if we can do 2 and 3
> > > > together
> > > > > instead of separately. Maybe after (1) is done it will be easier to
> > do
> > > > > protocol migration. But if we are able to come to an agreement on
> the
> > > > > timestamp solution, I would prefer to have it together with
> relative
> > > > offset
> > > > > in the interest of avoiding another wire protocol change (the
> process
> > > to
> > > > > migrate to relative offset is exactly the same as migrate to
> message
> > > with
> > > > > timestamp).
> > > > >
> > > > > In terms of timestamp. I completely agree that having client
> > timestamp
> > > is
> > > > > more useful if we can make sure the timestamp is good. But in
> reality
> > > > that
> > > > > can be a really big *IF*. I think the problem is exactly as Ewen
> > > > mentioned,
> > > > > if we let the client to set the timestamp, it would be very hard
> for
> > > the
> > > > > broker to utilize it. If broker apply retention policy based on the
> > > > client
> > > > > timestamp. One misbehave producer can potentially completely mess
> up
> > > the
> > > > > retention policy on the broker. Although people don't care about
> > server
> > > > > side timestamp. People do care a lot when timestamp breaks.
> Searching
> > > by
> > > > > timestamp is a really important use case even though it is not used
> > as
> > > > > often as searching by offset. It has significant direct impact on
> RTO
> > > > when
> > > > > there is a cross cluster failover as Todd mentioned.
> > > > >
> > > > > The trick using max(lastAppendedTimestamp, currentTimeMillis) is to
> > > > > guarantee monotonic increase of the timestamp. Many commercial
> system
> > > > > actually do something similar to this to solve the time skew. About
> > > > > changing the time, I am not sure if people use NTP like using a
> watch
> > > to
> > > > > just set it forward/backward by an hour or so. The time adjustment
> I
> > > used
> > > > > to do is typically to adjust something like a minute  / week. So
> for
> > > each
> > > > > second, there might be a few mircoseconds slower/faster but should
> > not
> > > > > break the clock completely to make sure all the time-based
> > transactions
> > > > are
> > > > > not affected. The one minute change will be done within a week but
> > not
> > > > > instantly.
> > > > >
> > > > > Personally, I think having client side timestamp will be useful if
> we
> > > > don't
> > > > > need to put the broker and data integrity under risk. If we have to
> > > > choose
> > > > > from one of them but not both. I would prefer server side timestamp
> > > > because
> > > > > for client side timestamp there is always a plan B which is putting
> > the
> > > > > timestamp into payload.
> > > > >
> > > > > Another reason I am reluctant to use the client side timestamp is
> > that
> > > it
> > > > > is always dangerous to mix the control plane with data plane. IP
> did
> > > this
> > > > > and it has caused so many different breaches so people are
> migrating
> > to
> > > > > something like MPLS. An example in Kafka is that any client can
> > > > construct a
> > > > > LeaderAndIsrRequest/UpdateMetadataRequest/ContorlledShutdownRequest
> > > (you
> > > > > name it) and send it to the broker to mess up the entire cluster,
> > also
> > > as
> > > > > we already noticed a busy cluster can respond quite slow to
> > controller
> > > > > messages. So it would really be nice if we can avoid giving the
> power
> > > to
> > > > > clients to control the log retention.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > >
> > > > > On Sun, Sep 6, 2015 at 9:54 PM, Todd Palino <tpal...@gmail.com>
> > wrote:
> > > > >
> > > > > > So, with regards to why you want to search by timestamp, the
> > biggest
> > > > > > problem I've seen is with consumers who want to reset their
> > > timestamps
> > > > > to a
> > > > > > specific point, whether it is to replay a certain amount of
> > messages,
> > > > or
> > > > > to
> > > > > > rewind to before some problem state existed. This happens more
> > often
> > > > than
> > > > > > anyone would like.
> > > > > >
> > > > > > To handle this now we need to constantly export the broker's
> offset
> > > for
> > > > > > every partition to a time-series database and then use external
> > > > processes
> > > > > > to query this. I know we're not the only ones doing this. The way
> > the
> > > > > > broker handles requests for offsets by timestamp is a little
> obtuse
> > > > > > (explain it to anyone without intimate knowledge of the internal
> > > > workings
> > > > > > of the broker - every time I do I see this). In addition, as
> Becket
> > > > > pointed
> > > > > > out, it causes problems specifically with retention of messages
> by
> > > time
> > > > > > when you move partitions around.
> > > > > >
> > > > > > I'm deliberately avoiding the discussion of what timestamp to
> use.
> > I
> > > > can
> > > > > > see the argument either way, though I tend to lean towards the
> idea
> > > > that
> > > > > > the broker timestamp is the only viable source of truth in this
> > > > > situation.
> > > > > >
> > > > > > -Todd
> > > > > >
> > > > > >
> > > > > > On Sun, Sep 6, 2015 at 7:08 PM, Ewen Cheslack-Postava <
> > > > e...@confluent.io
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps <j...@confluent.io>
> > > wrote:
> > > > > > >
> > > > > > > >
> > > > > > > > 2. Nobody cares what time it is on the server.
> > > > > > > >
> > > > > > >
> > > > > > > This is a good way of summarizing the issue I was trying to get
> > at,
> > > > > from
> > > > > > an
> > > > > > > app's perspective. Of the 3 stated goals of the KIP, #2 (lot
> > > > retention)
> > > > > > is
> > > > > > > reasonably handled by a server-side timestamp. I really just
> care
> > > > that
> > > > > a
> > > > > > > message is there long enough that I have a chance to process
> it.
> > #3
> > > > > > > (searching by timestamp) only seems useful if we can guarantee
> > the
> > > > > > > server-side timestamp is close enough to the original
> client-side
> > > > > > > timestamp, and any mirror maker step seems to break that (even
> > > > ignoring
> > > > > > any
> > > > > > > issues with broker availability).
> > > > > > >
> > > > > > > I'm also wondering whether optimizing for search-by-timestamp
> on
> > > the
> > > > > > broker
> > > > > > > is really something we want to do given that messages aren't
> > really
> > > > > > > guaranteed to be ordered by application-level timestamps on the
> > > > broker.
> > > > > > Is
> > > > > > > part of the need for this just due to the current consumer APIs
> > > being
> > > > > > > difficult to work with? For example, could you implement this
> > > pretty
> > > > > > easily
> > > > > > > client side just the way you would broker-side? I'd imagine a
> > > couple
> > > > of
> > > > > > > random seeks + reads during very rare occasions (i.e. when the
> > app
> > > > > starts
> > > > > > > up) wouldn't be a problem performance-wise. Or is it also that
> > you
> > > > need
> > > > > > the
> > > > > > > broker to enforce things like monotonically increasing
> timestamps
> > > > since
> > > > > > you
> > > > > > > can't do the query properly and efficiently without that
> > guarantee,
> > > > and
> > > > > > > therefore what applications are actually looking for *is*
> > > broker-side
> > > > > > > timestamps?
> > > > > > >
> > > > > > > -Ewen
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > > Consider cases where data is being copied from a database or
> > from
> > > > log
> > > > > > > > files. In steady-state the server time is very close to the
> > > client
> > > > > time
> > > > > > > if
> > > > > > > > their clocks are sync'd (see 1) but there will be times of
> > large
> > > > > > > divergence
> > > > > > > > when the copying process is stopped or falls behind. When
> this
> > > > occurs
> > > > > > it
> > > > > > > is
> > > > > > > > clear that the time the data arrived on the server is
> > irrelevant,
> > > > it
> > > > > is
> > > > > > > the
> > > > > > > > source timestamp that matters. This is the problem you are
> > trying
> > > > to
> > > > > > fix
> > > > > > > by
> > > > > > > > retaining the mm timestamp but really the client should
> always
> > > set
> > > > > the
> > > > > > > time
> > > > > > > > with the use of server-side time as a fallback. It would be
> > worth
> > > > > > talking
> > > > > > > > to the Samza folks and reading through this blog post (
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html
> > > > > > > > )
> > > > > > > > on this subject since we went through similar learnings on
> the
> > > > stream
> > > > > > > > processing side.
> > > > > > > >
> > > > > > > > I think the implication of these two is that we need a
> proposal
> > > > that
> > > > > > > > handles potentially very out-of-order timestamps in some kind
> > of
> > > > > sanish
> > > > > > > way
> > > > > > > > (buggy clients will set something totally wrong as the time).
> > > > > > > >
> > > > > > > > -Jay
> > > > > > > >
> > > > > > > > On Sun, Sep 6, 2015 at 4:22 PM, Jay Kreps <j...@confluent.io>
> > > > wrote:
> > > > > > > >
> > > > > > > > > The magic byte is used to version message format so we'll
> > need
> > > to
> > > > > > make
> > > > > > > > > sure that check is in place--I actually don't see it in the
> > > > current
> > > > > > > > > consumer code which I think is a bug we should fix for the
> > next
> > > > > > release
> > > > > > > > > (filed KAFKA-2523). The purpose of that field is so there
> is
> > a
> > > > > clear
> > > > > > > > check
> > > > > > > > > on the format rather than the scrambled scenarios Becket
> > > > describes.
> > > > > > > > >
> > > > > > > > > Also, Becket, I don't think just fixing the java client is
> > > > > sufficient
> > > > > > > as
> > > > > > > > > that would break other clients--i.e. if anyone writes a v1
> > > > > messages,
> > > > > > > even
> > > > > > > > > by accident, any non-v1-capable consumer will break. I
> think
> > we
> > > > > > > probably
> > > > > > > > > need a way to have the server ensure a particular message
> > > format
> > > > > > either
> > > > > > > > at
> > > > > > > > > read or write time.
> > > > > > > > >
> > > > > > > > > -Jay
> > > > > > > > >
> > > > > > > > > On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin
> > > > > > <j...@linkedin.com.invalid
> > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> Hi Guozhang,
> > > > > > > > >>
> > > > > > > > >> I checked the code again. Actually CRC check probably
> won't
> > > > fail.
> > > > > > The
> > > > > > > > >> newly
> > > > > > > > >> added timestamp field might be treated as keyLength
> instead,
> > > so
> > > > we
> > > > > > are
> > > > > > > > >> likely to receive an IllegalArgumentException when try to
> > read
> > > > the
> > > > > > > key.
> > > > > > > > >> I'll update the KIP.
> > > > > > > > >>
> > > > > > > > >> Thanks,
> > > > > > > > >>
> > > > > > > > >> Jiangjie (Becket) Qin
> > > > > > > > >>
> > > > > > > > >> On Thu, Sep 3, 2015 at 12:48 PM, Jiangjie Qin <
> > > > j...@linkedin.com>
> > > > > > > > wrote:
> > > > > > > > >>
> > > > > > > > >> > Hi, Guozhang,
> > > > > > > > >> >
> > > > > > > > >> > Thanks for reading the KIP. By "old consumer", I meant
> the
> > > > > > > > >> > ZookeeperConsumerConnector in trunk now, i.e. without
> this
> > > bug
> > > > > > > fixed.
> > > > > > > > >> If we
> > > > > > > > >> > fix the ZookeeperConsumerConnector then it will throw
> > > > exception
> > > > > > > > >> complaining
> > > > > > > > >> > about the unsupported version when it sees message
> format
> > > V1.
> > > > > > What I
> > > > > > > > was
> > > > > > > > >> > trying to say is that if we have some
> > > > ZookeeperConsumerConnector
> > > > > > > > running
> > > > > > > > >> > without the fix, the consumer will complain about CRC
> > > mismatch
> > > > > > > instead
> > > > > > > > >> of
> > > > > > > > >> > unsupported version.
> > > > > > > > >> >
> > > > > > > > >> > Thanks,
> > > > > > > > >> >
> > > > > > > > >> > Jiangjie (Becket) Qin
> > > > > > > > >> >
> > > > > > > > >> > On Thu, Sep 3, 2015 at 12:15 PM, Guozhang Wang <
> > > > > > wangg...@gmail.com>
> > > > > > > > >> wrote:
> > > > > > > > >> >
> > > > > > > > >> >> Thanks for the write-up Jiangjie.
> > > > > > > > >> >>
> > > > > > > > >> >> One comment about migration plan: "For old consumers,
> if
> > > they
> > > > > see
> > > > > > > the
> > > > > > > > >> new
> > > > > > > > >> >> protocol the CRC check will fail"..
> > > > > > > > >> >>
> > > > > > > > >> >> Do you mean this bug in the old consumer cannot be
> fixed
> > > in a
> > > > > > > > >> >> backward-compatible way?
> > > > > > > > >> >>
> > > > > > > > >> >> Guozhang
> > > > > > > > >> >>
> > > > > > > > >> >>
> > > > > > > > >> >> On Thu, Sep 3, 2015 at 8:35 AM, Jiangjie Qin
> > > > > > > > <j...@linkedin.com.invalid
> > > > > > > > >> >
> > > > > > > > >> >> wrote:
> > > > > > > > >> >>
> > > > > > > > >> >> > Hi,
> > > > > > > > >> >> >
> > > > > > > > >> >> > We just created KIP-31 to propose a message format
> > change
> > > > in
> > > > > > > Kafka.
> > > > > > > > >> >> >
> > > > > > > > >> >> >
> > > > > > > > >> >> >
> > > > > > > > >> >>
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal
> > > > > > > > >> >> >
> > > > > > > > >> >> > As a summary, the motivations are:
> > > > > > > > >> >> > 1. Avoid server side message re-compression
> > > > > > > > >> >> > 2. Honor time-based log roll and retention
> > > > > > > > >> >> > 3. Enable offset search by timestamp at a finer
> > > > granularity.
> > > > > > > > >> >> >
> > > > > > > > >> >> > Feedback and comments are welcome!
> > > > > > > > >> >> >
> > > > > > > > >> >> > Thanks,
> > > > > > > > >> >> >
> > > > > > > > >> >> > Jiangjie (Becket) Qin
> > > > > > > > >> >> >
> > > > > > > > >> >>
> > > > > > > > >> >>
> > > > > > > > >> >>
> > > > > > > > >> >> --
> > > > > > > > >> >> -- Guozhang
> > > > > > > > >> >>
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Thanks,
> > > > > > > Ewen
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Neha
> > >
> >
>

Reply via email to