I suppose the jira link is different. It points to this jira :
https://issues.apache.org/jira/browse/KAFKA-1


Thanks,

Mayuresh

On Mon, Sep 14, 2015 at 5:13 PM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> I just updated the KIP-33 to explain the indexing on CreateTime and
> LogAppendTime respectively. I also used some use case to compare the two
> solutions.
> Although this is for KIP-33, but it does give a some insights on whether it
> makes sense to have a per message LogAppendTime.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
>
> As a short summary of the conclusions we have already reached on timestamp:
> 1. It is good to add a timestamp to the message.
> 2. LogAppendTime should be used for broker policy enforcement (Log
> retention / rolling)
> 3. It is useful to have a CreateTime in message format, which is immutable
> after producer sends the message.
>
> There are following questions still in discussion:
> 1. Should we also add LogAppendTime to message format?
> 2. which timestamp should we use to build the index.
>
> Let's talk about question 1 first because question 2 is actually a follow
> up question for question 1.
> Here are what I think:
> 1a. To enforce broker log policy, theoretically we don't need per-message
> LogAppendTime. If we don't include LogAppendTime in message, we still need
> to implement a separate solution to pass log segment timestamps among
> brokers. That means if we don't include the LogAppendTime in message, there
> will be further complication in replication.
> 1b. LogAppendTime has some advantage over CreateTime (KIP-33 has detail
> comparison)
> 1c. We have already exposed offset, which is essentially an internal
> concept of message in terms of position. Exposing LogAppendTime means we
> expose another internal concept of message in terms of time.
>
> Considering the above reasons, personally I think it worth adding the
> LogAppendTime to each message.
>
> Any thoughts?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Sep 14, 2015 at 11:44 AM, Jiangjie Qin <j...@linkedin.com> wrote:
>
> > I was trying to send last email before KIP hangout so maybe did not think
> > it through completely. By the way, the discussion is actually more
> related
> > to KIP-33, i.e. whether we should index on CreateTime or LogAppendTime.
> > (Although it seems all the discussion are still in this mailing
> thread...)
> > This solution in last email is for indexing on CreateTime. It is
> > essentially what Jay suggested except we use a timestamp map instead of a
> > memory mapped index file. Please ignore the proposal of using a log
> > compacted topic. The solution can be simplified to:
> >
> > Each broker keeps
> > 1. a timestamp index map - Map[TopicPartitionSegment, Map[Timestamp,
> > Offset]]. The timestamp is on minute boundary.
> > 2. A timestamp index file for each segment.
> > When a broker receives a message (both leader or follower), it checks if
> > the timestamp index map contains the timestamp for current segment. The
> > broker add the offset to the map and append an entry to the timestamp
> index
> > if the timestamp does not exist. i.e. we only use the index file as a
> > persistent copy of the index timestamp map.
> >
> > When a log segment is deleted, we need to:
> > 1. delete the TopicPartitionKeySegment key in the timestamp index map.
> > 2. delete the timestamp index file
> >
> > This solution assumes we only keep CreateTime in the message. There are a
> > few trade-offs in this solution:
> > 1. The granularity of search will be per minute.
> > 2. All the timestamp index map has to be in the memory all the time.
> > 3. We need to think about another way to honor log retention time and
> > time-based log rolling.
> > 4. We lose the benefit brought by including LogAppendTime in the message
> > mentioned earlier.
> >
> > I am not sure whether this solution is necessarily better than indexing
> on
> > LogAppendTime.
> >
> > I will update KIP-33 to explain the solution to index on CreateTime and
> > LogAppendTime respectively and put some more concrete use cases as well.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Mon, Sep 14, 2015 at 9:40 AM, Jiangjie Qin <j...@linkedin.com> wrote:
> >
> >> Hi Joel,
> >>
> >> Good point about rebuilding index. I agree that having a per message
> >> LogAppendTime might be necessary. About time adjustment, the solution
> >> sounds promising, but it might be better to make it as a follow up of
> the
> >> KIP because it seems a really rare use case.
> >>
> >> I have another thought on how to manage the out of order timestamps.
> >> Maybe we can do the following:
> >> Create a special log compacted topic __timestamp_index similar to topic,
> >> the key would be (TopicPartition, TimeStamp_Rounded_To_Minute), the
> value
> >> is offset. In memory, we keep a map for each TopicPartition, the value
> is
> >> (timestamp_rounded_to_minute -> smallest_offset_in_the_minute). This
> way we
> >> can search out of order message and make sure no message is missing.
> >>
> >> Thoughts?
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Fri, Sep 11, 2015 at 12:46 PM, Joel Koshy <jjkosh...@gmail.com>
> wrote:
> >>
> >>> Jay had mentioned the scenario of mirror-maker bootstrap which would
> >>> effectively reset the logAppendTimestamps for the bootstrapped data.
> >>> If we don't include logAppendTimestamps in each message there is a
> >>> similar scenario when rebuilding indexes during recovery. So it seems
> >>> it may be worth adding that timestamp to messages. The drawback to
> >>> that is exposing a server-side concept in the protocol (although we
> >>> already do that with offsets). logAppendTimestamp really should be
> >>> decided by the broker so I think the first scenario may have to be
> >>> written off as a gotcha, but the second may be worth addressing (by
> >>> adding it to the message format).
> >>>
> >>> The other point that Jay raised which needs to be addressed (since we
> >>> require monotically increasing timestamps in the index) in the
> >>> proposal is changing time on the server (I'm a little less concerned
> >>> about NTP clock skews than a user explicitly changing the server's
> >>> time - i.e., big clock skews). We would at least want to "set back"
> >>> all the existing timestamps to guarantee non-decreasing timestamps
> >>> with future messages. I'm not sure at this point how best to handle
> >>> that, but we could perhaps have a epoch/base-time (or time-correction)
> >>> stored in the log directories and base all log index timestamps off
> >>> that base-time (or corrected). So if at any time you determine that
> >>> time has changed backwards you can adjust that base-time without
> >>> having to fix up all the entries. Without knowing the exact diff
> >>> between the previous clock and new clock we cannot adjust the times
> >>> exactly, but we can at least ensure increasing timestamps.
> >>>
> >>> On Fri, Sep 11, 2015 at 10:52 AM, Jiangjie Qin
> >>> <j...@linkedin.com.invalid> wrote:
> >>> > Ewen and Jay,
> >>> >
> >>> > They way I see the LogAppendTime is another format of "offset". It
> >>> serves
> >>> > the following purpose:
> >>> > 1. Locate messages not only by position, but also by time. The
> >>> difference
> >>> > from offset is timestamp is not unique for all messags.
> >>> > 2. Allow broker to manage messages based on time, e.g. retention,
> >>> rolling
> >>> > 3. Provide convenience for user to search message not only by offset,
> >>> but
> >>> > also by timestamp.
> >>> >
> >>> > For purpose (2) we don't need per message server timestamp. We only
> >>> need
> >>> > per log segment server timestamp and propagate it among brokers.
> >>> >
> >>> > For (1) and (3), we need per message timestamp. Then the question is
> >>> > whether we should use CreateTime or LogAppendTime?
> >>> >
> >>> > I completely agree that an application timestamp is very useful for
> >>> many
> >>> > use cases. But it seems to me that having Kafka to understand and
> >>> maintain
> >>> > application timestamp is a bit over demanding. So I think there is
> >>> value to
> >>> > pass on CreateTime for application convenience, but I am not sure it
> >>> can
> >>> > replace LogAppendTime. Managing out-of-order CreateTime is equivalent
> >>> to
> >>> > allowing producer to send their own offset and ask broker to manage
> the
> >>> > offset for them, It is going to be very hard to maintain and could
> >>> create
> >>> > huge performance/functional issue because of complicated logic.
> >>> >
> >>> > About whether we should expose LogAppendTime to broker, I agree that
> >>> server
> >>> > timestamp is internal to broker, but isn't offset also an internal
> >>> concept?
> >>> > Arguably it's not provided by producer so consumer application logic
> >>> does
> >>> > not have to know offset. But user needs to know offset because they
> >>> need to
> >>> > know "where is the message" in the log. LogAppendTime provides the
> >>> answer
> >>> > of "When was the message appended" to the log. So personally I think
> >>> it is
> >>> > reasonable to expose the LogAppendTime to consumers.
> >>> >
> >>> > I can see some use cases of exposing the LogAppendTime, to name some:
> >>> > 1. Let's say broker has 7 days of log retention, some application
> >>> wants to
> >>> > reprocess the data in past 3 days. User can simply provide the
> >>> timestamp
> >>> > and start consume.
> >>> > 2. User can easily know lag by time.
> >>> > 3. Cross cluster fail over. This is a more complicated use case,
> there
> >>> are
> >>> > two goals: 1) Not lose message; and 2) do not reconsume tons of
> >>> messages.
> >>> > Only knowing offset of cluster A won't help with finding fail over
> >>> point in
> >>> > cluster B  because an offset of a cluster means nothing to another
> >>> cluster.
> >>> > Timestamp however is a good cross cluster reference in this case.
> >>> >
> >>> > Thanks,
> >>> >
> >>> > Jiangjie (Becket) Qin
> >>> >
> >>> > On Thu, Sep 10, 2015 at 9:28 PM, Ewen Cheslack-Postava <
> >>> e...@confluent.io>
> >>> > wrote:
> >>> >
> >>> >> Re: MM preserving timestamps: Yes, this was how I interpreted the
> >>> point in
> >>> >> the KIP and I only raised the issue because it restricts the
> >>> usefulness of
> >>> >> timestamps anytime MM is involved. I agree it's not a deal breaker,
> >>> but I
> >>> >> wanted to understand exact impact of the change. Some users seem to
> >>> want to
> >>> >> be able to seek by application-defined timestamps (despite the many
> >>> obvious
> >>> >> issues involved), and the proposal clearly would not support that
> >>> unless
> >>> >> the timestamps submitted with the produce requests were respected.
> If
> >>> we
> >>> >> ignore client submitted timestamps, then we probably want to try to
> >>> hide
> >>> >> the timestamps as much as possible in any public interface (e.g.
> never
> >>> >> shows up in any public consumer APIs), but expose it just enough to
> be
> >>> >> useful for operational purposes.
> >>> >>
> >>> >> Sorry if my devil's advocate position / attempt to map the design
> >>> space led
> >>> >> to some confusion!
> >>> >>
> >>> >> -Ewen
> >>> >>
> >>> >>
> >>> >> On Thu, Sep 10, 2015 at 5:48 PM, Jay Kreps <j...@confluent.io>
> wrote:
> >>> >>
> >>> >> > Ah, I see, I think I misunderstood about MM, it was called out in
> >>> the
> >>> >> > proposal and I thought you were saying you'd retain the timestamp
> >>> but I
> >>> >> > think you're calling out that you're not. In that case you do have
> >>> the
> >>> >> > opposite problem, right? When you add mirroring for a topic all
> >>> that data
> >>> >> > will have a timestamp of now and retention won't be right. Not a
> >>> blocker
> >>> >> > but a bit of a gotcha.
> >>> >> >
> >>> >> > -Jay
> >>> >> >
> >>> >> >
> >>> >> >
> >>> >> > On Thu, Sep 10, 2015 at 5:40 PM, Joel Koshy <jjkosh...@gmail.com>
> >>> wrote:
> >>> >> >
> >>> >> > > > Don't you see all the same issues you see with client-defined
> >>> >> > timestamp's
> >>> >> > > > if you let mm control the timestamp as you were proposing?
> That
> >>> means
> >>> >> > > time
> >>> >> > >
> >>> >> > > Actually I don't think that was in the proposal (or was it?).
> >>> i.e., I
> >>> >> > > think it was always supposed to be controlled by the broker (and
> >>> not
> >>> >> > > MM).
> >>> >> > >
> >>> >> > > > Also, Joel, can you just confirm that you guys have talked
> >>> through
> >>> >> the
> >>> >> > > > whole timestamp thing with the Samza folks at LI? The reason I
> >>> ask
> >>> >> > about
> >>> >> > > > this is that Samza and Kafka Streams (KIP-28) are both trying
> >>> to rely
> >>> >> > on
> >>> >> > >
> >>> >> > > We have not. This is a good point - we will follow-up.
> >>> >> > >
> >>> >> > > > WRT your idea of a FollowerFetchRequestI had thought of a
> >>> similar
> >>> >> idea
> >>> >> > > > where we use the leader's timestamps to approximately set the
> >>> >> > follower's
> >>> >> > > > timestamps. I had thought of just adding a partition metadata
> >>> request
> >>> >> > > that
> >>> >> > > > would subsume the current offset/time lookup and could be used
> >>> by the
> >>> >> > > > follower to try to approximately keep their timestamps kosher.
> >>> It's a
> >>> >> > > > little hacky and doesn't help with MM but it is also maybe
> less
> >>> >> > invasive
> >>> >> > > so
> >>> >> > > > that approach could be viable.
> >>> >> > >
> >>> >> > > That would also work, but perhaps responding with the actual
> >>> leader
> >>> >> > > offset-timestamp entries (corresponding to the fetched portion)
> >>> would
> >>> >> > > be exact and it should be small as well. Anyway, the main
> >>> motivation
> >>> >> > > in this was to avoid leaking server-side timestamps to the
> >>> >> > > message-format if people think it is worth it so the
> alternatives
> >>> are
> >>> >> > > implementation details. My original instinct was that it also
> >>> avoids a
> >>> >> > > backwards incompatible change (but it does not because we also
> >>> have
> >>> >> > > the relative offset change).
> >>> >> > >
> >>> >> > > Thanks,
> >>> >> > >
> >>> >> > > Joel
> >>> >> > >
> >>> >> > > >
> >>> >> > > >
> >>> >> > > >
> >>> >> > > > On Thu, Sep 10, 2015 at 3:36 PM, Joel Koshy <
> >>> jjkosh...@gmail.com>
> >>> >> > wrote:
> >>> >> > > >
> >>> >> > > >> I just wanted to comment on a few points made earlier in this
> >>> >> thread:
> >>> >> > > >>
> >>> >> > > >> Concerns on clock skew: at least for the original proposal's
> >>> scope
> >>> >> > > >> (which was more for honoring retention broker-side) this
> would
> >>> only
> >>> >> be
> >>> >> > > >> an issue when spanning leader movements right? i.e., leader
> >>> >> migration
> >>> >> > > >> latency has to be much less than clock skew for this to be a
> >>> real
> >>> >> > > >> issue wouldn’t it?
> >>> >> > > >>
> >>> >> > > >> Client timestamp vs broker timestamp: I’m not sure Kafka
> >>> (brokers)
> >>> >> are
> >>> >> > > >> the right place to reason about client-side timestamps
> >>> precisely due
> >>> >> > > >> to the nuances that have been discussed at length in this
> >>> thread. My
> >>> >> > > >> preference would have been to the timestamp (now called
> >>> >> > > >> LogAppendTimestamp) have nothing to do with the applications.
> >>> Ewen
> >>> >> > > >> raised a valid concern about leaking such
> “private/server-side”
> >>> >> > > >> timestamps into the protocol spec. i.e., it is fine to have
> the
> >>> >> > > >> CreateTime which is expressly client-provided and immutable
> >>> >> > > >> thereafter, but the LogAppendTime is also going part of the
> >>> protocol
> >>> >> > > >> and it would be good to avoid exposure (to client developers)
> >>> if
> >>> >> > > >> possible. Ok, so here is a slightly different approach that I
> >>> was
> >>> >> just
> >>> >> > > >> thinking about (and did not think too far so it may not
> work):
> >>> do
> >>> >> not
> >>> >> > > >> add the LogAppendTime to messages. Instead, build the
> >>> time-based
> >>> >> index
> >>> >> > > >> on the server side on message arrival time alone. Introduce a
> >>> new
> >>> >> > > >> ReplicaFetchRequest/Response pair. ReplicaFetchResponses will
> >>> also
> >>> >> > > >> include the slice of the time-based index for the follower
> >>> broker.
> >>> >> > > >> This way we can at least keep timestamps aligned across
> >>> brokers for
> >>> >> > > >> retention purposes. We do lose the append timestamp for
> >>> mirroring
> >>> >> > > >> pipelines (which appears to be the case in KIP-32 as well).
> >>> >> > > >>
> >>> >> > > >> Configurable index granularity: We can do this but I’m not
> >>> sure it
> >>> >> is
> >>> >> > > >> very useful and as Jay noted, a major change from the old
> >>> proposal
> >>> >> > > >> linked from the KIP is the sparse time-based index which we
> >>> felt was
> >>> >> > > >> essential to bound memory usage (and having timestamps on
> each
> >>> log
> >>> >> > > >> index entry was probably a big waste since in the common case
> >>> >> several
> >>> >> > > >> messages span the same timestamp). BTW another benefit of the
> >>> second
> >>> >> > > >> index is that it makes it easier to roll-back or throw away
> if
> >>> >> > > >> necessary (vs. modifying the existing index format) -
> although
> >>> that
> >>> >> > > >> obviously does not help with rolling back the timestamp
> change
> >>> in
> >>> >> the
> >>> >> > > >> message format, but it is one less thing to worry about.
> >>> >> > > >>
> >>> >> > > >> Versioning: I’m not sure everyone is saying the same thing
> wrt
> >>> the
> >>> >> > > >> scope of this. There is the record format change, but I also
> >>> think
> >>> >> > > >> this ties into all of the API versioning that we already have
> >>> in
> >>> >> > > >> Kafka. The current API versioning approach works fine for
> >>> >> > > >> upgrades/downgrades across official Kafka releases, but not
> so
> >>> well
> >>> >> > > >> between releases. (We almost got bitten by this at LinkedIn
> >>> with the
> >>> >> > > >> recent changes to various requests but were able to work
> around
> >>> >> > > >> these.) We can clarify this in the follow-up KIP.
> >>> >> > > >>
> >>> >> > > >> Thanks,
> >>> >> > > >>
> >>> >> > > >> Joel
> >>> >> > > >>
> >>> >> > > >>
> >>> >> > > >> On Thu, Sep 10, 2015 at 3:00 PM, Jiangjie Qin
> >>> >> > <j...@linkedin.com.invalid
> >>> >> > > >
> >>> >> > > >> wrote:
> >>> >> > > >> > Hi Jay,
> >>> >> > > >> >
> >>> >> > > >> > I just changed the KIP title and updated the KIP page.
> >>> >> > > >> >
> >>> >> > > >> > And yes, we are working on a general version control
> >>> proposal to
> >>> >> > make
> >>> >> > > the
> >>> >> > > >> > protocol migration like this more smooth. I will also
> create
> >>> a KIP
> >>> >> > for
> >>> >> > > >> that
> >>> >> > > >> > soon.
> >>> >> > > >> >
> >>> >> > > >> > Thanks,
> >>> >> > > >> >
> >>> >> > > >> > Jiangjie (Becket) Qin
> >>> >> > > >> >
> >>> >> > > >> >
> >>> >> > > >> > On Thu, Sep 10, 2015 at 2:21 PM, Jay Kreps <
> j...@confluent.io
> >>> >
> >>> >> > wrote:
> >>> >> > > >> >
> >>> >> > > >> >> Great, can we change the name to something related to the
> >>> >> > > >> change--"KIP-31:
> >>> >> > > >> >> Move to relative offsets in compressed message sets".
> >>> >> > > >> >>
> >>> >> > > >> >> Also you had mentioned before you were going to expand on
> >>> the
> >>> >> > > mechanics
> >>> >> > > >> of
> >>> >> > > >> >> handling these log format changes, right?
> >>> >> > > >> >>
> >>> >> > > >> >> -Jay
> >>> >> > > >> >>
> >>> >> > > >> >> On Thu, Sep 10, 2015 at 12:42 PM, Jiangjie Qin
> >>> >> > > >> <j...@linkedin.com.invalid>
> >>> >> > > >> >> wrote:
> >>> >> > > >> >>
> >>> >> > > >> >> > Neha and Jay,
> >>> >> > > >> >> >
> >>> >> > > >> >> > Thanks a lot for the feedback. Good point about
> splitting
> >>> the
> >>> >> > > >> >> discussion. I
> >>> >> > > >> >> > have split the proposal to three KIPs and it does make
> >>> each
> >>> >> > > discussion
> >>> >> > > >> >> more
> >>> >> > > >> >> > clear:
> >>> >> > > >> >> > KIP-31 - Message format change (Use relative offset)
> >>> >> > > >> >> > KIP-32 - Add CreateTime and LogAppendTime to Kafka
> message
> >>> >> > > >> >> > KIP-33 - Build a time-based log index
> >>> >> > > >> >> >
> >>> >> > > >> >> > KIP-33 can be a follow up KIP for KIP-32, so we can
> >>> discuss
> >>> >> about
> >>> >> > > >> KIP-31
> >>> >> > > >> >> > and KIP-32 first for now. I will create a separate
> >>> discussion
> >>> >> > > thread
> >>> >> > > >> for
> >>> >> > > >> >> > KIP-32 and reply the concerns you raised regarding the
> >>> >> timestamp.
> >>> >> > > >> >> >
> >>> >> > > >> >> > So far it looks there is no objection to KIP-31. Since I
> >>> >> removed
> >>> >> > a
> >>> >> > > few
> >>> >> > > >> >> part
> >>> >> > > >> >> > from previous KIP and only left the relative offset
> >>> proposal,
> >>> >> it
> >>> >> > > >> would be
> >>> >> > > >> >> > great if people can take another look to see if there is
> >>> any
> >>> >> > > concerns.
> >>> >> > > >> >> >
> >>> >> > > >> >> > Thanks,
> >>> >> > > >> >> >
> >>> >> > > >> >> > Jiangjie (Becket) Qin
> >>> >> > > >> >> >
> >>> >> > > >> >> >
> >>> >> > > >> >> > On Tue, Sep 8, 2015 at 1:28 PM, Neha Narkhede <
> >>> >> n...@confluent.io
> >>> >> > >
> >>> >> > > >> wrote:
> >>> >> > > >> >> >
> >>> >> > > >> >> > > Becket,
> >>> >> > > >> >> > >
> >>> >> > > >> >> > > Nice write-up. Few thoughts -
> >>> >> > > >> >> > >
> >>> >> > > >> >> > > I'd split up the discussion for simplicity. Note that
> >>> you can
> >>> >> > > always
> >>> >> > > >> >> > group
> >>> >> > > >> >> > > several of these in one patch to reduce the protocol
> >>> changes
> >>> >> > > people
> >>> >> > > >> >> have
> >>> >> > > >> >> > to
> >>> >> > > >> >> > > deal with.This is just a suggestion, but I think the
> >>> >> following
> >>> >> > > split
> >>> >> > > >> >> > might
> >>> >> > > >> >> > > make it easier to tackle the changes being proposed -
> >>> >> > > >> >> > >
> >>> >> > > >> >> > >    - Relative offsets
> >>> >> > > >> >> > >    - Introducing the concept of time
> >>> >> > > >> >> > >    - Time-based indexing (separate the usage of the
> >>> timestamp
> >>> >> > > field
> >>> >> > > >> >> from
> >>> >> > > >> >> > >    how/whether we want to include a timestamp in the
> >>> message)
> >>> >> > > >> >> > >
> >>> >> > > >> >> > > I'm a +1 on relative offsets, we should've done it
> back
> >>> when
> >>> >> we
> >>> >> > > >> >> > introduced
> >>> >> > > >> >> > > it. Other than reducing the CPU overhead, this will
> also
> >>> >> reduce
> >>> >> > > the
> >>> >> > > >> >> > garbage
> >>> >> > > >> >> > > collection overhead on the brokers.
> >>> >> > > >> >> > >
> >>> >> > > >> >> > > On the timestamp field, I generally agree that we
> >>> should add
> >>> >> a
> >>> >> > > >> >> timestamp
> >>> >> > > >> >> > to
> >>> >> > > >> >> > > a Kafka message but I'm not quite sold on how this KIP
> >>> >> suggests
> >>> >> > > the
> >>> >> > > >> >> > > timestamp be set. Will avoid repeating the downsides
> of
> >>> a
> >>> >> > broker
> >>> >> > > >> side
> >>> >> > > >> >> > > timestamp mentioned previously in this thread. I think
> >>> the
> >>> >> > topic
> >>> >> > > of
> >>> >> > > >> >> > > including a timestamp in a Kafka message requires a
> lot
> >>> more
> >>> >> > > thought
> >>> >> > > >> >> and
> >>> >> > > >> >> > > details than what's in this KIP. I'd suggest we make
> it
> >>> a
> >>> >> > > separate
> >>> >> > > >> KIP
> >>> >> > > >> >> > that
> >>> >> > > >> >> > > includes a list of all the different use cases for the
> >>> >> > timestamp
> >>> >> > > >> >> (beyond
> >>> >> > > >> >> > > log retention) including stream processing and discuss
> >>> >> > tradeoffs
> >>> >> > > of
> >>> >> > > >> >> > > including client and broker side timestamps.
> >>> >> > > >> >> > >
> >>> >> > > >> >> > > Agree with the benefit of time-based indexing, but
> >>> haven't
> >>> >> had
> >>> >> > a
> >>> >> > > >> chance
> >>> >> > > >> >> > to
> >>> >> > > >> >> > > dive into the design details yet.
> >>> >> > > >> >> > >
> >>> >> > > >> >> > > Thanks,
> >>> >> > > >> >> > > Neha
> >>> >> > > >> >> > >
> >>> >> > > >> >> > > On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <
> >>> j...@confluent.io
> >>> >> >
> >>> >> > > >> wrote:
> >>> >> > > >> >> > >
> >>> >> > > >> >> > > > Hey Beckett,
> >>> >> > > >> >> > > >
> >>> >> > > >> >> > > > I was proposing splitting up the KIP just for
> >>> simplicity of
> >>> >> > > >> >> discussion.
> >>> >> > > >> >> > > You
> >>> >> > > >> >> > > > can still implement them in one patch. I think
> >>> otherwise it
> >>> >> > > will
> >>> >> > > >> be
> >>> >> > > >> >> > hard
> >>> >> > > >> >> > > to
> >>> >> > > >> >> > > > discuss/vote on them since if you like the offset
> >>> proposal
> >>> >> > but
> >>> >> > > not
> >>> >> > > >> >> the
> >>> >> > > >> >> > > time
> >>> >> > > >> >> > > > proposal what do you do?
> >>> >> > > >> >> > > >
> >>> >> > > >> >> > > > Introducing a second notion of time into Kafka is a
> >>> pretty
> >>> >> > > massive
> >>> >> > > >> >> > > > philosophical change so it kind of warrants it's own
> >>> KIP I
> >>> >> > > think
> >>> >> > > >> it
> >>> >> > > >> >> > isn't
> >>> >> > > >> >> > > > just "Change message format".
> >>> >> > > >> >> > > >
> >>> >> > > >> >> > > > WRT time I think one thing to clarify in the
> proposal
> >>> is
> >>> >> how
> >>> >> > MM
> >>> >> > > >> will
> >>> >> > > >> >> > have
> >>> >> > > >> >> > > > access to set the timestamp? Presumably this will be
> >>> a new
> >>> >> > > field
> >>> >> > > >> in
> >>> >> > > >> >> > > > ProducerRecord, right? If so then any user can set
> the
> >>> >> > > timestamp,
> >>> >> > > >> >> > right?
> >>> >> > > >> >> > > > I'm not sure you answered the questions around how
> >>> this
> >>> >> will
> >>> >> > > work
> >>> >> > > >> for
> >>> >> > > >> >> > MM
> >>> >> > > >> >> > > > since when MM retains timestamps from multiple
> >>> partitions
> >>> >> > they
> >>> >> > > >> will
> >>> >> > > >> >> > then
> >>> >> > > >> >> > > be
> >>> >> > > >> >> > > > out of order and in the past (so the
> >>> >> > max(lastAppendedTimestamp,
> >>> >> > > >> >> > > > currentTimeMillis) override you proposed will not
> >>> work,
> >>> >> > > right?).
> >>> >> > > >> If
> >>> >> > > >> >> we
> >>> >> > > >> >> > > > don't do this then when you set up mirroring the
> data
> >>> will
> >>> >> > all
> >>> >> > > be
> >>> >> > > >> new
> >>> >> > > >> >> > and
> >>> >> > > >> >> > > > you have the same retention problem you described.
> >>> Maybe I
> >>> >> > > missed
> >>> >> > > >> >> > > > something...?
> >>> >> > > >> >> > > >
> >>> >> > > >> >> > > > My main motivation is that given that both Samza and
> >>> Kafka
> >>> >> > > streams
> >>> >> > > >> >> are
> >>> >> > > >> >> > > > doing work that implies a mandatory client-defined
> >>> notion
> >>> >> of
> >>> >> > > >> time, I
> >>> >> > > >> >> > > really
> >>> >> > > >> >> > > > think introducing a different mandatory notion of
> >>> time in
> >>> >> > > Kafka is
> >>> >> > > >> >> > going
> >>> >> > > >> >> > > to
> >>> >> > > >> >> > > > be quite odd. We should think hard about how
> >>> client-defined
> >>> >> > > time
> >>> >> > > >> >> could
> >>> >> > > >> >> > > > work. I'm not sure if it can, but I'm also not sure
> >>> that it
> >>> >> > > can't.
> >>> >> > > >> >> > Having
> >>> >> > > >> >> > > > both will be odd. Did you chat about this with
> >>> Yi/Kartik on
> >>> >> > the
> >>> >> > > >> Samza
> >>> >> > > >> >> > > side?
> >>> >> > > >> >> > > >
> >>> >> > > >> >> > > > When you are saying it won't work you are assuming
> >>> some
> >>> >> > > particular
> >>> >> > > >> >> > > > implementation? Maybe that the index is a
> >>> monotonically
> >>> >> > > increasing
> >>> >> > > >> >> set
> >>> >> > > >> >> > of
> >>> >> > > >> >> > > > pointers to the least record with a timestamp larger
> >>> than
> >>> >> the
> >>> >> > > >> index
> >>> >> > > >> >> > time?
> >>> >> > > >> >> > > > In other words a search for time X gives the largest
> >>> offset
> >>> >> > at
> >>> >> > > >> which
> >>> >> > > >> >> > all
> >>> >> > > >> >> > > > records are <= X?
> >>> >> > > >> >> > > >
> >>> >> > > >> >> > > > For retention, I agree with the problem you point
> >>> out, but
> >>> >> I
> >>> >> > > think
> >>> >> > > >> >> what
> >>> >> > > >> >> > > you
> >>> >> > > >> >> > > > are saying in that case is that you want a size
> limit
> >>> too.
> >>> >> If
> >>> >> > > you
> >>> >> > > >> use
> >>> >> > > >> >> > > > system time you actually hit the same problem: say
> >>> you do a
> >>> >> > > full
> >>> >> > > >> dump
> >>> >> > > >> >> > of
> >>> >> > > >> >> > > a
> >>> >> > > >> >> > > > DB table with a setting of 7 days retention, your
> >>> retention
> >>> >> > > will
> >>> >> > > >> >> > actually
> >>> >> > > >> >> > > > not get enforced for the first 7 days because the
> >>> data is
> >>> >> > "new
> >>> >> > > to
> >>> >> > > >> >> > Kafka".
> >>> >> > > >> >> > > >
> >>> >> > > >> >> > > > -Jay
> >>> >> > > >> >> > > >
> >>> >> > > >> >> > > >
> >>> >> > > >> >> > > > On Mon, Sep 7, 2015 at 10:44 AM, Jiangjie Qin
> >>> >> > > >> >> > <j...@linkedin.com.invalid
> >>> >> > > >> >> > > >
> >>> >> > > >> >> > > > wrote:
> >>> >> > > >> >> > > >
> >>> >> > > >> >> > > > > Jay,
> >>> >> > > >> >> > > > >
> >>> >> > > >> >> > > > > Thanks for the comments. Yes, there are actually
> >>> three
> >>> >> > > >> proposals as
> >>> >> > > >> >> > you
> >>> >> > > >> >> > > > > pointed out.
> >>> >> > > >> >> > > > >
> >>> >> > > >> >> > > > > We will have a separate proposal for (1) - version
> >>> >> control
> >>> >> > > >> >> mechanism.
> >>> >> > > >> >> > > We
> >>> >> > > >> >> > > > > actually thought about whether we want to separate
> >>> 2 and
> >>> >> 3
> >>> >> > > >> >> internally
> >>> >> > > >> >> > > > > before creating the KIP. The reason we put 2 and 3
> >>> >> together
> >>> >> > > is
> >>> >> > > >> it
> >>> >> > > >> >> > will
> >>> >> > > >> >> > > > > saves us another cross board wire protocol change.
> >>> Like
> >>> >> you
> >>> >> > > >> said,
> >>> >> > > >> >> we
> >>> >> > > >> >> > > have
> >>> >> > > >> >> > > > > to migrate all the clients in all languages. To
> some
> >>> >> > extent,
> >>> >> > > the
> >>> >> > > >> >> > effort
> >>> >> > > >> >> > > > to
> >>> >> > > >> >> > > > > spend on upgrading the clients can be even bigger
> >>> than
> >>> >> > > >> implementing
> >>> >> > > >> >> > the
> >>> >> > > >> >> > > > new
> >>> >> > > >> >> > > > > feature itself. So there are some attractions if
> we
> >>> can
> >>> >> do
> >>> >> > 2
> >>> >> > > >> and 3
> >>> >> > > >> >> > > > together
> >>> >> > > >> >> > > > > instead of separately. Maybe after (1) is done it
> >>> will be
> >>> >> > > >> easier to
> >>> >> > > >> >> > do
> >>> >> > > >> >> > > > > protocol migration. But if we are able to come to
> an
> >>> >> > > agreement
> >>> >> > > >> on
> >>> >> > > >> >> the
> >>> >> > > >> >> > > > > timestamp solution, I would prefer to have it
> >>> together
> >>> >> with
> >>> >> > > >> >> relative
> >>> >> > > >> >> > > > offset
> >>> >> > > >> >> > > > > in the interest of avoiding another wire protocol
> >>> change
> >>> >> > (the
> >>> >> > > >> >> process
> >>> >> > > >> >> > > to
> >>> >> > > >> >> > > > > migrate to relative offset is exactly the same as
> >>> migrate
> >>> >> > to
> >>> >> > > >> >> message
> >>> >> > > >> >> > > with
> >>> >> > > >> >> > > > > timestamp).
> >>> >> > > >> >> > > > >
> >>> >> > > >> >> > > > > In terms of timestamp. I completely agree that
> >>> having
> >>> >> > client
> >>> >> > > >> >> > timestamp
> >>> >> > > >> >> > > is
> >>> >> > > >> >> > > > > more useful if we can make sure the timestamp is
> >>> good.
> >>> >> But
> >>> >> > in
> >>> >> > > >> >> reality
> >>> >> > > >> >> > > > that
> >>> >> > > >> >> > > > > can be a really big *IF*. I think the problem is
> >>> exactly
> >>> >> as
> >>> >> > > Ewen
> >>> >> > > >> >> > > > mentioned,
> >>> >> > > >> >> > > > > if we let the client to set the timestamp, it
> would
> >>> be
> >>> >> very
> >>> >> > > hard
> >>> >> > > >> >> for
> >>> >> > > >> >> > > the
> >>> >> > > >> >> > > > > broker to utilize it. If broker apply retention
> >>> policy
> >>> >> > based
> >>> >> > > on
> >>> >> > > >> the
> >>> >> > > >> >> > > > client
> >>> >> > > >> >> > > > > timestamp. One misbehave producer can potentially
> >>> >> > completely
> >>> >> > > >> mess
> >>> >> > > >> >> up
> >>> >> > > >> >> > > the
> >>> >> > > >> >> > > > > retention policy on the broker. Although people
> >>> don't
> >>> >> care
> >>> >> > > about
> >>> >> > > >> >> > server
> >>> >> > > >> >> > > > > side timestamp. People do care a lot when
> timestamp
> >>> >> breaks.
> >>> >> > > >> >> Searching
> >>> >> > > >> >> > > by
> >>> >> > > >> >> > > > > timestamp is a really important use case even
> >>> though it
> >>> >> is
> >>> >> > > not
> >>> >> > > >> used
> >>> >> > > >> >> > as
> >>> >> > > >> >> > > > > often as searching by offset. It has significant
> >>> direct
> >>> >> > > impact
> >>> >> > > >> on
> >>> >> > > >> >> RTO
> >>> >> > > >> >> > > > when
> >>> >> > > >> >> > > > > there is a cross cluster failover as Todd
> mentioned.
> >>> >> > > >> >> > > > >
> >>> >> > > >> >> > > > > The trick using max(lastAppendedTimestamp,
> >>> >> > currentTimeMillis)
> >>> >> > > >> is to
> >>> >> > > >> >> > > > > guarantee monotonic increase of the timestamp.
> Many
> >>> >> > > commercial
> >>> >> > > >> >> system
> >>> >> > > >> >> > > > > actually do something similar to this to solve the
> >>> time
> >>> >> > skew.
> >>> >> > > >> About
> >>> >> > > >> >> > > > > changing the time, I am not sure if people use NTP
> >>> like
> >>> >> > > using a
> >>> >> > > >> >> watch
> >>> >> > > >> >> > > to
> >>> >> > > >> >> > > > > just set it forward/backward by an hour or so. The
> >>> time
> >>> >> > > >> adjustment
> >>> >> > > >> >> I
> >>> >> > > >> >> > > used
> >>> >> > > >> >> > > > > to do is typically to adjust something like a
> >>> minute  /
> >>> >> > > week. So
> >>> >> > > >> >> for
> >>> >> > > >> >> > > each
> >>> >> > > >> >> > > > > second, there might be a few mircoseconds
> >>> slower/faster
> >>> >> but
> >>> >> > > >> should
> >>> >> > > >> >> > not
> >>> >> > > >> >> > > > > break the clock completely to make sure all the
> >>> >> time-based
> >>> >> > > >> >> > transactions
> >>> >> > > >> >> > > > are
> >>> >> > > >> >> > > > > not affected. The one minute change will be done
> >>> within a
> >>> >> > > week
> >>> >> > > >> but
> >>> >> > > >> >> > not
> >>> >> > > >> >> > > > > instantly.
> >>> >> > > >> >> > > > >
> >>> >> > > >> >> > > > > Personally, I think having client side timestamp
> >>> will be
> >>> >> > > useful
> >>> >> > > >> if
> >>> >> > > >> >> we
> >>> >> > > >> >> > > > don't
> >>> >> > > >> >> > > > > need to put the broker and data integrity under
> >>> risk. If
> >>> >> we
> >>> >> > > >> have to
> >>> >> > > >> >> > > > choose
> >>> >> > > >> >> > > > > from one of them but not both. I would prefer
> >>> server side
> >>> >> > > >> timestamp
> >>> >> > > >> >> > > > because
> >>> >> > > >> >> > > > > for client side timestamp there is always a plan B
> >>> which
> >>> >> is
> >>> >> > > >> putting
> >>> >> > > >> >> > the
> >>> >> > > >> >> > > > > timestamp into payload.
> >>> >> > > >> >> > > > >
> >>> >> > > >> >> > > > > Another reason I am reluctant to use the client
> side
> >>> >> > > timestamp
> >>> >> > > >> is
> >>> >> > > >> >> > that
> >>> >> > > >> >> > > it
> >>> >> > > >> >> > > > > is always dangerous to mix the control plane with
> >>> data
> >>> >> > > plane. IP
> >>> >> > > >> >> did
> >>> >> > > >> >> > > this
> >>> >> > > >> >> > > > > and it has caused so many different breaches so
> >>> people
> >>> >> are
> >>> >> > > >> >> migrating
> >>> >> > > >> >> > to
> >>> >> > > >> >> > > > > something like MPLS. An example in Kafka is that
> any
> >>> >> client
> >>> >> > > can
> >>> >> > > >> >> > > > construct a
> >>> >> > > >> >> > > > >
> >>> >> > > >>
> >>> LeaderAndIsrRequest/UpdateMetadataRequest/ContorlledShutdownRequest
> >>> >> > > >> >> > > (you
> >>> >> > > >> >> > > > > name it) and send it to the broker to mess up the
> >>> entire
> >>> >> > > >> cluster,
> >>> >> > > >> >> > also
> >>> >> > > >> >> > > as
> >>> >> > > >> >> > > > > we already noticed a busy cluster can respond
> quite
> >>> slow
> >>> >> to
> >>> >> > > >> >> > controller
> >>> >> > > >> >> > > > > messages. So it would really be nice if we can
> avoid
> >>> >> giving
> >>> >> > > the
> >>> >> > > >> >> power
> >>> >> > > >> >> > > to
> >>> >> > > >> >> > > > > clients to control the log retention.
> >>> >> > > >> >> > > > >
> >>> >> > > >> >> > > > > Thanks,
> >>> >> > > >> >> > > > >
> >>> >> > > >> >> > > > > Jiangjie (Becket) Qin
> >>> >> > > >> >> > > > >
> >>> >> > > >> >> > > > >
> >>> >> > > >> >> > > > > On Sun, Sep 6, 2015 at 9:54 PM, Todd Palino <
> >>> >> > > tpal...@gmail.com>
> >>> >> > > >> >> > wrote:
> >>> >> > > >> >> > > > >
> >>> >> > > >> >> > > > > > So, with regards to why you want to search by
> >>> >> timestamp,
> >>> >> > > the
> >>> >> > > >> >> > biggest
> >>> >> > > >> >> > > > > > problem I've seen is with consumers who want to
> >>> reset
> >>> >> > their
> >>> >> > > >> >> > > timestamps
> >>> >> > > >> >> > > > > to a
> >>> >> > > >> >> > > > > > specific point, whether it is to replay a
> certain
> >>> >> amount
> >>> >> > of
> >>> >> > > >> >> > messages,
> >>> >> > > >> >> > > > or
> >>> >> > > >> >> > > > > to
> >>> >> > > >> >> > > > > > rewind to before some problem state existed.
> This
> >>> >> happens
> >>> >> > > more
> >>> >> > > >> >> > often
> >>> >> > > >> >> > > > than
> >>> >> > > >> >> > > > > > anyone would like.
> >>> >> > > >> >> > > > > >
> >>> >> > > >> >> > > > > > To handle this now we need to constantly export
> >>> the
> >>> >> > > broker's
> >>> >> > > >> >> offset
> >>> >> > > >> >> > > for
> >>> >> > > >> >> > > > > > every partition to a time-series database and
> >>> then use
> >>> >> > > >> external
> >>> >> > > >> >> > > > processes
> >>> >> > > >> >> > > > > > to query this. I know we're not the only ones
> >>> doing
> >>> >> this.
> >>> >> > > The
> >>> >> > > >> way
> >>> >> > > >> >> > the
> >>> >> > > >> >> > > > > > broker handles requests for offsets by timestamp
> >>> is a
> >>> >> > > little
> >>> >> > > >> >> obtuse
> >>> >> > > >> >> > > > > > (explain it to anyone without intimate knowledge
> >>> of the
> >>> >> > > >> internal
> >>> >> > > >> >> > > > workings
> >>> >> > > >> >> > > > > > of the broker - every time I do I see this). In
> >>> >> addition,
> >>> >> > > as
> >>> >> > > >> >> Becket
> >>> >> > > >> >> > > > > pointed
> >>> >> > > >> >> > > > > > out, it causes problems specifically with
> >>> retention of
> >>> >> > > >> messages
> >>> >> > > >> >> by
> >>> >> > > >> >> > > time
> >>> >> > > >> >> > > > > > when you move partitions around.
> >>> >> > > >> >> > > > > >
> >>> >> > > >> >> > > > > > I'm deliberately avoiding the discussion of what
> >>> >> > timestamp
> >>> >> > > to
> >>> >> > > >> >> use.
> >>> >> > > >> >> > I
> >>> >> > > >> >> > > > can
> >>> >> > > >> >> > > > > > see the argument either way, though I tend to
> lean
> >>> >> > towards
> >>> >> > > the
> >>> >> > > >> >> idea
> >>> >> > > >> >> > > > that
> >>> >> > > >> >> > > > > > the broker timestamp is the only viable source
> of
> >>> truth
> >>> >> > in
> >>> >> > > >> this
> >>> >> > > >> >> > > > > situation.
> >>> >> > > >> >> > > > > >
> >>> >> > > >> >> > > > > > -Todd
> >>> >> > > >> >> > > > > >
> >>> >> > > >> >> > > > > >
> >>> >> > > >> >> > > > > > On Sun, Sep 6, 2015 at 7:08 PM, Ewen
> >>> Cheslack-Postava <
> >>> >> > > >> >> > > > e...@confluent.io
> >>> >> > > >> >> > > > > >
> >>> >> > > >> >> > > > > > wrote:
> >>> >> > > >> >> > > > > >
> >>> >> > > >> >> > > > > > > On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps <
> >>> >> > > j...@confluent.io
> >>> >> > > >> >
> >>> >> > > >> >> > > wrote:
> >>> >> > > >> >> > > > > > >
> >>> >> > > >> >> > > > > > > >
> >>> >> > > >> >> > > > > > > > 2. Nobody cares what time it is on the
> server.
> >>> >> > > >> >> > > > > > > >
> >>> >> > > >> >> > > > > > >
> >>> >> > > >> >> > > > > > > This is a good way of summarizing the issue I
> >>> was
> >>> >> > trying
> >>> >> > > to
> >>> >> > > >> get
> >>> >> > > >> >> > at,
> >>> >> > > >> >> > > > > from
> >>> >> > > >> >> > > > > > an
> >>> >> > > >> >> > > > > > > app's perspective. Of the 3 stated goals of
> the
> >>> KIP,
> >>> >> #2
> >>> >> > > (lot
> >>> >> > > >> >> > > > retention)
> >>> >> > > >> >> > > > > > is
> >>> >> > > >> >> > > > > > > reasonably handled by a server-side
> timestamp. I
> >>> >> really
> >>> >> > > just
> >>> >> > > >> >> care
> >>> >> > > >> >> > > > that
> >>> >> > > >> >> > > > > a
> >>> >> > > >> >> > > > > > > message is there long enough that I have a
> >>> chance to
> >>> >> > > process
> >>> >> > > >> >> it.
> >>> >> > > >> >> > #3
> >>> >> > > >> >> > > > > > > (searching by timestamp) only seems useful if
> >>> we can
> >>> >> > > >> guarantee
> >>> >> > > >> >> > the
> >>> >> > > >> >> > > > > > > server-side timestamp is close enough to the
> >>> original
> >>> >> > > >> >> client-side
> >>> >> > > >> >> > > > > > > timestamp, and any mirror maker step seems to
> >>> break
> >>> >> > that
> >>> >> > > >> (even
> >>> >> > > >> >> > > > ignoring
> >>> >> > > >> >> > > > > > any
> >>> >> > > >> >> > > > > > > issues with broker availability).
> >>> >> > > >> >> > > > > > >
> >>> >> > > >> >> > > > > > > I'm also wondering whether optimizing for
> >>> >> > > >> search-by-timestamp
> >>> >> > > >> >> on
> >>> >> > > >> >> > > the
> >>> >> > > >> >> > > > > > broker
> >>> >> > > >> >> > > > > > > is really something we want to do given that
> >>> messages
> >>> >> > > aren't
> >>> >> > > >> >> > really
> >>> >> > > >> >> > > > > > > guaranteed to be ordered by application-level
> >>> >> > timestamps
> >>> >> > > on
> >>> >> > > >> the
> >>> >> > > >> >> > > > broker.
> >>> >> > > >> >> > > > > > Is
> >>> >> > > >> >> > > > > > > part of the need for this just due to the
> >>> current
> >>> >> > > consumer
> >>> >> > > >> APIs
> >>> >> > > >> >> > > being
> >>> >> > > >> >> > > > > > > difficult to work with? For example, could you
> >>> >> > implement
> >>> >> > > >> this
> >>> >> > > >> >> > > pretty
> >>> >> > > >> >> > > > > > easily
> >>> >> > > >> >> > > > > > > client side just the way you would
> broker-side?
> >>> I'd
> >>> >> > > imagine
> >>> >> > > >> a
> >>> >> > > >> >> > > couple
> >>> >> > > >> >> > > > of
> >>> >> > > >> >> > > > > > > random seeks + reads during very rare
> occasions
> >>> (i.e.
> >>> >> > > when
> >>> >> > > >> the
> >>> >> > > >> >> > app
> >>> >> > > >> >> > > > > starts
> >>> >> > > >> >> > > > > > > up) wouldn't be a problem performance-wise. Or
> >>> is it
> >>> >> > also
> >>> >> > > >> that
> >>> >> > > >> >> > you
> >>> >> > > >> >> > > > need
> >>> >> > > >> >> > > > > > the
> >>> >> > > >> >> > > > > > > broker to enforce things like monotonically
> >>> >> increasing
> >>> >> > > >> >> timestamps
> >>> >> > > >> >> > > > since
> >>> >> > > >> >> > > > > > you
> >>> >> > > >> >> > > > > > > can't do the query properly and efficiently
> >>> without
> >>> >> > that
> >>> >> > > >> >> > guarantee,
> >>> >> > > >> >> > > > and
> >>> >> > > >> >> > > > > > > therefore what applications are actually
> >>> looking for
> >>> >> > *is*
> >>> >> > > >> >> > > broker-side
> >>> >> > > >> >> > > > > > > timestamps?
> >>> >> > > >> >> > > > > > >
> >>> >> > > >> >> > > > > > > -Ewen
> >>> >> > > >> >> > > > > > >
> >>> >> > > >> >> > > > > > >
> >>> >> > > >> >> > > > > > >
> >>> >> > > >> >> > > > > > > > Consider cases where data is being copied
> >>> from a
> >>> >> > > database
> >>> >> > > >> or
> >>> >> > > >> >> > from
> >>> >> > > >> >> > > > log
> >>> >> > > >> >> > > > > > > > files. In steady-state the server time is
> very
> >>> >> close
> >>> >> > to
> >>> >> > > >> the
> >>> >> > > >> >> > > client
> >>> >> > > >> >> > > > > time
> >>> >> > > >> >> > > > > > > if
> >>> >> > > >> >> > > > > > > > their clocks are sync'd (see 1) but there
> >>> will be
> >>> >> > > times of
> >>> >> > > >> >> > large
> >>> >> > > >> >> > > > > > > divergence
> >>> >> > > >> >> > > > > > > > when the copying process is stopped or falls
> >>> >> behind.
> >>> >> > > When
> >>> >> > > >> >> this
> >>> >> > > >> >> > > > occurs
> >>> >> > > >> >> > > > > > it
> >>> >> > > >> >> > > > > > > is
> >>> >> > > >> >> > > > > > > > clear that the time the data arrived on the
> >>> server
> >>> >> is
> >>> >> > > >> >> > irrelevant,
> >>> >> > > >> >> > > > it
> >>> >> > > >> >> > > > > is
> >>> >> > > >> >> > > > > > > the
> >>> >> > > >> >> > > > > > > > source timestamp that matters. This is the
> >>> problem
> >>> >> > you
> >>> >> > > are
> >>> >> > > >> >> > trying
> >>> >> > > >> >> > > > to
> >>> >> > > >> >> > > > > > fix
> >>> >> > > >> >> > > > > > > by
> >>> >> > > >> >> > > > > > > > retaining the mm timestamp but really the
> >>> client
> >>> >> > should
> >>> >> > > >> >> always
> >>> >> > > >> >> > > set
> >>> >> > > >> >> > > > > the
> >>> >> > > >> >> > > > > > > time
> >>> >> > > >> >> > > > > > > > with the use of server-side time as a
> >>> fallback. It
> >>> >> > > would
> >>> >> > > >> be
> >>> >> > > >> >> > worth
> >>> >> > > >> >> > > > > > talking
> >>> >> > > >> >> > > > > > > > to the Samza folks and reading through this
> >>> blog
> >>> >> > post (
> >>> >> > > >> >> > > > > > > >
> >>> >> > > >> >> > > > > > >
> >>> >> > > >> >> > > > > >
> >>> >> > > >> >> > > > >
> >>> >> > > >> >> > > >
> >>> >> > > >> >> > >
> >>> >> > > >> >> >
> >>> >> > > >> >>
> >>> >> > > >>
> >>> >> > >
> >>> >> >
> >>> >>
> >>>
> http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html
> >>> >> > > >> >> > > > > > > > )
> >>> >> > > >> >> > > > > > > > on this subject since we went through
> similar
> >>> >> > > learnings on
> >>> >> > > >> >> the
> >>> >> > > >> >> > > > stream
> >>> >> > > >> >> > > > > > > > processing side.
> >>> >> > > >> >> > > > > > > >
> >>> >> > > >> >> > > > > > > > I think the implication of these two is that
> >>> we
> >>> >> need
> >>> >> > a
> >>> >> > > >> >> proposal
> >>> >> > > >> >> > > > that
> >>> >> > > >> >> > > > > > > > handles potentially very out-of-order
> >>> timestamps in
> >>> >> > > some
> >>> >> > > >> kind
> >>> >> > > >> >> > of
> >>> >> > > >> >> > > > > sanish
> >>> >> > > >> >> > > > > > > way
> >>> >> > > >> >> > > > > > > > (buggy clients will set something totally
> >>> wrong as
> >>> >> > the
> >>> >> > > >> time).
> >>> >> > > >> >> > > > > > > >
> >>> >> > > >> >> > > > > > > > -Jay
> >>> >> > > >> >> > > > > > > >
> >>> >> > > >> >> > > > > > > > On Sun, Sep 6, 2015 at 4:22 PM, Jay Kreps <
> >>> >> > > >> j...@confluent.io>
> >>> >> > > >> >> > > > wrote:
> >>> >> > > >> >> > > > > > > >
> >>> >> > > >> >> > > > > > > > > The magic byte is used to version message
> >>> format
> >>> >> so
> >>> >> > > >> we'll
> >>> >> > > >> >> > need
> >>> >> > > >> >> > > to
> >>> >> > > >> >> > > > > > make
> >>> >> > > >> >> > > > > > > > > sure that check is in place--I actually
> >>> don't see
> >>> >> > it
> >>> >> > > in
> >>> >> > > >> the
> >>> >> > > >> >> > > > current
> >>> >> > > >> >> > > > > > > > > consumer code which I think is a bug we
> >>> should
> >>> >> fix
> >>> >> > > for
> >>> >> > > >> the
> >>> >> > > >> >> > next
> >>> >> > > >> >> > > > > > release
> >>> >> > > >> >> > > > > > > > > (filed KAFKA-2523). The purpose of that
> >>> field is
> >>> >> so
> >>> >> > > >> there
> >>> >> > > >> >> is
> >>> >> > > >> >> > a
> >>> >> > > >> >> > > > > clear
> >>> >> > > >> >> > > > > > > > check
> >>> >> > > >> >> > > > > > > > > on the format rather than the scrambled
> >>> scenarios
> >>> >> > > Becket
> >>> >> > > >> >> > > > describes.
> >>> >> > > >> >> > > > > > > > >
> >>> >> > > >> >> > > > > > > > > Also, Becket, I don't think just fixing
> the
> >>> java
> >>> >> > > client
> >>> >> > > >> is
> >>> >> > > >> >> > > > > sufficient
> >>> >> > > >> >> > > > > > > as
> >>> >> > > >> >> > > > > > > > > that would break other clients--i.e. if
> >>> anyone
> >>> >> > > writes a
> >>> >> > > >> v1
> >>> >> > > >> >> > > > > messages,
> >>> >> > > >> >> > > > > > > even
> >>> >> > > >> >> > > > > > > > > by accident, any non-v1-capable consumer
> >>> will
> >>> >> > break.
> >>> >> > > I
> >>> >> > > >> >> think
> >>> >> > > >> >> > we
> >>> >> > > >> >> > > > > > > probably
> >>> >> > > >> >> > > > > > > > > need a way to have the server ensure a
> >>> particular
> >>> >> > > >> message
> >>> >> > > >> >> > > format
> >>> >> > > >> >> > > > > > either
> >>> >> > > >> >> > > > > > > > at
> >>> >> > > >> >> > > > > > > > > read or write time.
> >>> >> > > >> >> > > > > > > > >
> >>> >> > > >> >> > > > > > > > > -Jay
> >>> >> > > >> >> > > > > > > > >
> >>> >> > > >> >> > > > > > > > > On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie
> Qin
> >>> >> > > >> >> > > > > > <j...@linkedin.com.invalid
> >>> >> > > >> >> > > > > > > >
> >>> >> > > >> >> > > > > > > > > wrote:
> >>> >> > > >> >> > > > > > > > >
> >>> >> > > >> >> > > > > > > > >> Hi Guozhang,
> >>> >> > > >> >> > > > > > > > >>
> >>> >> > > >> >> > > > > > > > >> I checked the code again. Actually CRC
> >>> check
> >>> >> > > probably
> >>> >> > > >> >> won't
> >>> >> > > >> >> > > > fail.
> >>> >> > > >> >> > > > > > The
> >>> >> > > >> >> > > > > > > > >> newly
> >>> >> > > >> >> > > > > > > > >> added timestamp field might be treated as
> >>> >> > keyLength
> >>> >> > > >> >> instead,
> >>> >> > > >> >> > > so
> >>> >> > > >> >> > > > we
> >>> >> > > >> >> > > > > > are
> >>> >> > > >> >> > > > > > > > >> likely to receive an
> >>> IllegalArgumentException
> >>> >> when
> >>> >> > > try
> >>> >> > > >> to
> >>> >> > > >> >> > read
> >>> >> > > >> >> > > > the
> >>> >> > > >> >> > > > > > > key.
> >>> >> > > >> >> > > > > > > > >> I'll update the KIP.
> >>> >> > > >> >> > > > > > > > >>
> >>> >> > > >> >> > > > > > > > >> Thanks,
> >>> >> > > >> >> > > > > > > > >>
> >>> >> > > >> >> > > > > > > > >> Jiangjie (Becket) Qin
> >>> >> > > >> >> > > > > > > > >>
> >>> >> > > >> >> > > > > > > > >> On Thu, Sep 3, 2015 at 12:48 PM, Jiangjie
> >>> Qin <
> >>> >> > > >> >> > > > j...@linkedin.com>
> >>> >> > > >> >> > > > > > > > wrote:
> >>> >> > > >> >> > > > > > > > >>
> >>> >> > > >> >> > > > > > > > >> > Hi, Guozhang,
> >>> >> > > >> >> > > > > > > > >> >
> >>> >> > > >> >> > > > > > > > >> > Thanks for reading the KIP. By "old
> >>> >> consumer", I
> >>> >> > > >> meant
> >>> >> > > >> >> the
> >>> >> > > >> >> > > > > > > > >> > ZookeeperConsumerConnector in trunk
> now,
> >>> i.e.
> >>> >> > > without
> >>> >> > > >> >> this
> >>> >> > > >> >> > > bug
> >>> >> > > >> >> > > > > > > fixed.
> >>> >> > > >> >> > > > > > > > >> If we
> >>> >> > > >> >> > > > > > > > >> > fix the ZookeeperConsumerConnector then
> >>> it
> >>> >> will
> >>> >> > > throw
> >>> >> > > >> >> > > > exception
> >>> >> > > >> >> > > > > > > > >> complaining
> >>> >> > > >> >> > > > > > > > >> > about the unsupported version when it
> >>> sees
> >>> >> > message
> >>> >> > > >> >> format
> >>> >> > > >> >> > > V1.
> >>> >> > > >> >> > > > > > What I
> >>> >> > > >> >> > > > > > > > was
> >>> >> > > >> >> > > > > > > > >> > trying to say is that if we have some
> >>> >> > > >> >> > > > ZookeeperConsumerConnector
> >>> >> > > >> >> > > > > > > > running
> >>> >> > > >> >> > > > > > > > >> > without the fix, the consumer will
> >>> complain
> >>> >> > about
> >>> >> > > CRC
> >>> >> > > >> >> > > mismatch
> >>> >> > > >> >> > > > > > > instead
> >>> >> > > >> >> > > > > > > > >> of
> >>> >> > > >> >> > > > > > > > >> > unsupported version.
> >>> >> > > >> >> > > > > > > > >> >
> >>> >> > > >> >> > > > > > > > >> > Thanks,
> >>> >> > > >> >> > > > > > > > >> >
> >>> >> > > >> >> > > > > > > > >> > Jiangjie (Becket) Qin
> >>> >> > > >> >> > > > > > > > >> >
> >>> >> > > >> >> > > > > > > > >> > On Thu, Sep 3, 2015 at 12:15 PM,
> Guozhang
> >>> >> Wang <
> >>> >> > > >> >> > > > > > wangg...@gmail.com>
> >>> >> > > >> >> > > > > > > > >> wrote:
> >>> >> > > >> >> > > > > > > > >> >
> >>> >> > > >> >> > > > > > > > >> >> Thanks for the write-up Jiangjie.
> >>> >> > > >> >> > > > > > > > >> >>
> >>> >> > > >> >> > > > > > > > >> >> One comment about migration plan: "For
> >>> old
> >>> >> > > >> consumers,
> >>> >> > > >> >> if
> >>> >> > > >> >> > > they
> >>> >> > > >> >> > > > > see
> >>> >> > > >> >> > > > > > > the
> >>> >> > > >> >> > > > > > > > >> new
> >>> >> > > >> >> > > > > > > > >> >> protocol the CRC check will fail"..
> >>> >> > > >> >> > > > > > > > >> >>
> >>> >> > > >> >> > > > > > > > >> >> Do you mean this bug in the old
> consumer
> >>> >> cannot
> >>> >> > > be
> >>> >> > > >> >> fixed
> >>> >> > > >> >> > > in a
> >>> >> > > >> >> > > > > > > > >> >> backward-compatible way?
> >>> >> > > >> >> > > > > > > > >> >>
> >>> >> > > >> >> > > > > > > > >> >> Guozhang
> >>> >> > > >> >> > > > > > > > >> >>
> >>> >> > > >> >> > > > > > > > >> >>
> >>> >> > > >> >> > > > > > > > >> >> On Thu, Sep 3, 2015 at 8:35 AM,
> >>> Jiangjie Qin
> >>> >> > > >> >> > > > > > > > <j...@linkedin.com.invalid
> >>> >> > > >> >> > > > > > > > >> >
> >>> >> > > >> >> > > > > > > > >> >> wrote:
> >>> >> > > >> >> > > > > > > > >> >>
> >>> >> > > >> >> > > > > > > > >> >> > Hi,
> >>> >> > > >> >> > > > > > > > >> >> >
> >>> >> > > >> >> > > > > > > > >> >> > We just created KIP-31 to propose a
> >>> message
> >>> >> > > format
> >>> >> > > >> >> > change
> >>> >> > > >> >> > > > in
> >>> >> > > >> >> > > > > > > Kafka.
> >>> >> > > >> >> > > > > > > > >> >> >
> >>> >> > > >> >> > > > > > > > >> >> >
> >>> >> > > >> >> > > > > > > > >> >> >
> >>> >> > > >> >> > > > > > > > >> >>
> >>> >> > > >> >> > > > > > > > >>
> >>> >> > > >> >> > > > > > > >
> >>> >> > > >> >> > > > > > >
> >>> >> > > >> >> > > > > >
> >>> >> > > >> >> > > > >
> >>> >> > > >> >> > > >
> >>> >> > > >> >> > >
> >>> >> > > >> >> >
> >>> >> > > >> >>
> >>> >> > > >>
> >>> >> > >
> >>> >> >
> >>> >>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal
> >>> >> > > >> >> > > > > > > > >> >> >
> >>> >> > > >> >> > > > > > > > >> >> > As a summary, the motivations are:
> >>> >> > > >> >> > > > > > > > >> >> > 1. Avoid server side message
> >>> re-compression
> >>> >> > > >> >> > > > > > > > >> >> > 2. Honor time-based log roll and
> >>> retention
> >>> >> > > >> >> > > > > > > > >> >> > 3. Enable offset search by timestamp
> >>> at a
> >>> >> > finer
> >>> >> > > >> >> > > > granularity.
> >>> >> > > >> >> > > > > > > > >> >> >
> >>> >> > > >> >> > > > > > > > >> >> > Feedback and comments are welcome!
> >>> >> > > >> >> > > > > > > > >> >> >
> >>> >> > > >> >> > > > > > > > >> >> > Thanks,
> >>> >> > > >> >> > > > > > > > >> >> >
> >>> >> > > >> >> > > > > > > > >> >> > Jiangjie (Becket) Qin
> >>> >> > > >> >> > > > > > > > >> >> >
> >>> >> > > >> >> > > > > > > > >> >>
> >>> >> > > >> >> > > > > > > > >> >>
> >>> >> > > >> >> > > > > > > > >> >>
> >>> >> > > >> >> > > > > > > > >> >> --
> >>> >> > > >> >> > > > > > > > >> >> -- Guozhang
> >>> >> > > >> >> > > > > > > > >> >>
> >>> >> > > >> >> > > > > > > > >> >
> >>> >> > > >> >> > > > > > > > >> >
> >>> >> > > >> >> > > > > > > > >>
> >>> >> > > >> >> > > > > > > > >
> >>> >> > > >> >> > > > > > > > >
> >>> >> > > >> >> > > > > > > >
> >>> >> > > >> >> > > > > > >
> >>> >> > > >> >> > > > > > >
> >>> >> > > >> >> > > > > > >
> >>> >> > > >> >> > > > > > > --
> >>> >> > > >> >> > > > > > > Thanks,
> >>> >> > > >> >> > > > > > > Ewen
> >>> >> > > >> >> > > > > > >
> >>> >> > > >> >> > > > > >
> >>> >> > > >> >> > > > >
> >>> >> > > >> >> > > >
> >>> >> > > >> >> > >
> >>> >> > > >> >> > >
> >>> >> > > >> >> > >
> >>> >> > > >> >> > > --
> >>> >> > > >> >> > > Thanks,
> >>> >> > > >> >> > > Neha
> >>> >> > > >> >> > >
> >>> >> > > >> >> >
> >>> >> > > >> >>
> >>> >> > > >>
> >>> >> > >
> >>> >> >
> >>> >>
> >>> >>
> >>> >>
> >>> >> --
> >>> >> Thanks,
> >>> >> Ewen
> >>> >>
> >>>
> >>>
> >>
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125

Reply via email to