Bump up this thread.

Just to recap, the last proposal Jay made (with some implementation details
added) was:

   1. Allow user to stamp the message when produce
   2. When broker receives a message it take a look at the difference
   between its local time and the timestamp in the message.
      - If the time difference is within a configurable
      max.message.time.difference.ms, the server will accept it and append
      it to the log.
      - If the time difference is beyond the configured
      max.message.time.difference.ms, the server will override the
      timestamp with its current local time and append the message to the log.
      - The default value of max.message.time.difference would be set to
      Long.MaxValue.
      3. The configurable time difference threshold
   max.message.time.difference.ms will be a per topic configuration.
   4. The indexed will be built so it has the following guarantee.
      - If user search by time stamp:
   - all the messages after that timestamp will be consumed.
      - user might see earlier messages.
      - The log retention will take a look at the last time index entry in
      the time index file. Because the last entry will be the latest
timestamp in
      the entire log segment. If that entry expires, the log segment will be
      deleted.
      - The log rolling has to depend on the earliest timestamp. In this
      case we may need to keep a in memory timestamp only for the
current active
      log. On recover, we will need to read the active log segment to get this
      timestamp of the earliest messages.
   5. The downside of this proposal are:
      - The timestamp might not be monotonically increasing.
      - The log retention might become non-deterministic. i.e. When a
      message will be deleted now depends on the timestamp of the
other messages
      in the same log segment. And those timestamps are provided by
user within a
      range depending on what the time difference threshold configuration is.
      - The semantic meaning of the timestamp in the messages could be a
      little bit vague because some of them come from the producer and some of
      them are overwritten by brokers.
      6. Although the proposal has some downsides, it gives user the
   flexibility to use the timestamp.
   - If the threshold is set to Long.MaxValue. The timestamp in the message
      is equivalent to CreateTime.
      - If the threshold is set to 0. The timestamp in the message is
      equivalent to LogAppendTime.

This proposal actually allows user to use either CreateTime or
LogAppendTime without introducing two timestamp concept at the same time. I
have updated the wiki for KIP-32 and KIP-33 with this proposal.

One thing I am thinking is that instead of having a time difference
threshold, should we simply set have a TimestampType configuration? Because
in most cases, people will either set the threshold to 0 or Long.MaxValue.
Setting anything in between will make the timestamp in the message
meaningless to user - user don't know if the timestamp has been overwritten
by the brokers.

Any thoughts?

Thanks,
Jiangjie (Becket) Qin

On Mon, Oct 26, 2015 at 1:23 PM, Jiangjie Qin <j...@linkedin.com> wrote:

> Hi Jay,
>
> Thanks for such detailed explanation. I think we both are trying to make
> CreateTime work for us if possible. To me by "work" it means clear
> guarantees on:
> 1. Log Retention Time enforcement.
> 2. Log Rolling time enforcement (This might be less a concern as you
> pointed out)
> 3. Application search message by time.
>
> WRT (1), I agree the expectation for log retention might be different
> depending on who we ask. But my concern is about the level of guarantee we
> give to user. My observation is that a clear guarantee to user is critical
> regardless of the mechanism we choose. And this is the subtle but important
> difference between using LogAppendTime and CreateTime.
>
> Let's say user asks this question: How long will my message stay in Kafka?
>
> If we use LogAppendTime for log retention, the answer is message will stay
> in Kafka for retention time after the message is produced (to be more
> precise, upper bounded by log.rolling.ms + log.retention.ms). User has a
> clear guarantee and they may decide whether or not to put the message into
> Kafka. Or how to adjust the retention time according to their requirements.
> If we use create time for log retention, the answer would be it depends.
> The best answer we can give is at least retention.ms because there is no
> guarantee when the messages will be deleted after that. If a message sits
> somewhere behind a larger create time, the message might stay longer than
> expected. But we don't know how longer it would be because it depends on
> the create time. In this case, it is hard for user to decide what to do.
>
> I am worrying about this because a blurring guarantee has bitten us
> before, e.g. Topic creation. We have received many questions like "why my
> topic is not there after I created it". I can imagine we receive similar
> question asking "why my message is still there after retention time has
> reached". So my understanding is that a clear and solid guarantee is better
> than having a mechanism that works in most cases but occasionally does not
> work.
>
> If we think of the retention guarantee we provide with LogAppendTime, it
> is not broken as you said, because we are telling user the log retention is
> NOT based on create time at the first place.
>
> WRT (3), no matter whether we index on LogAppendTime or CreateTime, the
> best guarantee we can provide with user is "not missing message after a
> certain timestamp". Therefore I actually really like to index on CreateTime
> because that is the timestamp we provide to user, and we can have the solid
> guarantee.
> On the other hand, indexing on LogAppendTime and giving user CreateTime
> does not provide solid guarantee when user do search based on timestamp. It
> only works when LogAppendTime is always no earlier than CreateTime. This is
> a reasonable assumption and we can easily enforce it.
>
> With above, I am not sure if we can avoid server timestamp to make log
> retention work with a clear guarantee. For searching by timestamp use case,
> I really want to have the index built on CreateTime. But with a reasonable
> assumption and timestamp enforcement, a LogAppendTime index would also work.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Thu, Oct 22, 2015 at 10:48 AM, Jay Kreps <j...@confluent.io> wrote:
>
>> Hey Becket,
>>
>> Let me see if I can address your concerns:
>>
>> 1. Let's say we have two source clusters that are mirrored to the same
>> > target cluster. For some reason one of the mirror maker from a cluster
>> dies
>> > and after fix the issue we want to resume mirroring. In this case it is
>> > possible that when the mirror maker resumes mirroring, the timestamp of
>> the
>> > messages have already gone beyond the acceptable timestamp range on
>> broker.
>> > In order to let those messages go through, we have to bump up the
>> > *max.append.delay
>> > *for all the topics on the target broker. This could be painful.
>>
>>
>> Actually what I was suggesting was different. Here is my observation:
>> clusters/topics directly produced to by applications have a valid
>> assertion
>> that log append time and create time are similar (let's call these
>> "unbuffered"); other cluster/topic such as those that receive data from a
>> database, a log file, or another kafka cluster don't have that assertion,
>> for these "buffered" clusters data can be arbitrarily late. This means any
>> use of log append time on these buffered clusters is not very meaningful,
>> and create time and log append time "should" be similar on unbuffered
>> clusters so you can probably use either.
>>
>> Using log append time on buffered clusters actually results in bad things.
>> If you request the offset for a given time you get don't end up getting
>> data for that time but rather data that showed up at that time. If you try
>> to retain 7 days of data it may mostly work but any kind of bootstrapping
>> will result in retaining much more (potentially the whole database
>> contents!).
>>
>> So what I am suggesting in terms of the use of the max.append.delay is
>> that
>> unbuffered clusters would have this set and buffered clusters would not.
>> In
>> other words, in LI terminology, tracking and metrics clusters would have
>> this enforced, aggregate and replica clusters wouldn't.
>>
>> So you DO have the issue of potentially maintaining more data than you
>> need
>> to on aggregate clusters if your mirroring skews, but you DON'T need to
>> tweak the setting as you described.
>>
>> 2. Let's say in the above scenario we let the messages in, at that point
>> > some log segments in the target cluster might have a wide range of
>> > timestamps, like Guozhang mentioned the log rolling could be tricky
>> because
>> > the first time index entry does not necessarily have the smallest
>> timestamp
>> > of all the messages in the log segment. Instead, it is the largest
>> > timestamp ever seen. We have to scan the entire log to find the message
>> > with smallest offset to see if we should roll.
>>
>>
>> I think there are two uses for time-based log rolling:
>> 1. Making the offset lookup by timestamp work
>> 2. Ensuring we don't retain data indefinitely if it is supposed to get
>> purged after 7 days
>>
>> But think about these two use cases. (1) is totally obviated by the
>> time=>offset index we are adding which yields much more granular offset
>> lookups. (2) Is actually totally broken if you switch to append time,
>> right? If you want to be sure for security/privacy reasons you only retain
>> 7 days of data then if the log append and create time diverge you actually
>> violate this requirement.
>>
>> I think 95% of people care about (1) which is solved in the proposal and
>> (2) is actually broken today as well as in both proposals.
>>
>> 3. Theoretically it is possible that an older log segment contains
>> > timestamps that are older than all the messages in a newer log segment.
>> It
>> > would be weird that we are supposed to delete the newer log segment
>> before
>> > we delete the older log segment.
>>
>>
>> The index timestamps would always be a lower bound (i.e. the maximum at
>> that time) so I don't think that is possible.
>>
>>  4. In bootstrap case, if we reload the data to a Kafka cluster, we have
>> to
>> > make sure we configure the topic correctly before we load the data.
>> > Otherwise the message might either be rejected because the timestamp is
>> too
>> > old, or it might be deleted immediately because the retention time has
>> > reached.
>>
>>
>> See (1).
>>
>> -Jay
>>
>> On Tue, Oct 13, 2015 at 7:30 PM, Jiangjie Qin <j...@linkedin.com.invalid>
>> wrote:
>>
>> > Hey Jay and Guozhang,
>> >
>> > Thanks a lot for the reply. So if I understand correctly, Jay's proposal
>> > is:
>> >
>> > 1. Let client stamp the message create time.
>> > 2. Broker build index based on client-stamped message create time.
>> > 3. Broker only takes message whose create time is withing current time
>> > plus/minus T (T is a configuration *max.append.delay*, could be topic
>> level
>> > configuration), if the timestamp is out of this range, broker rejects
>> the
>> > message.
>> > 4. Because the create time of messages can be out of order, when broker
>> > builds the time based index it only provides the guarantee that if a
>> > consumer starts consuming from the offset returned by searching by
>> > timestamp t, they will not miss any message created after t, but might
>> see
>> > some messages created before t.
>> >
>> > To build the time based index, every time when a broker needs to insert
>> a
>> > new time index entry, the entry would be {Largest_Timestamp_Ever_Seen ->
>> > Current_Offset}. This basically means any timestamp larger than the
>> > Largest_Timestamp_Ever_Seen must come after this offset because it never
>> > saw them before. So we don't miss any message with larger timestamp.
>> >
>> > (@Guozhang, in this case, for log retention we only need to take a look
>> at
>> > the last time index entry, because it must be the largest timestamp
>> ever,
>> > if that timestamp is overdue, we can safely delete any log segment
>> before
>> > that. So we don't need to scan the log segment file for log retention)
>> >
>> > I assume that we are still going to have the new FetchRequest to allow
>> the
>> > time index replication for replicas.
>> >
>> > I think Jay's main point here is that we don't want to have two
>> timestamp
>> > concepts in Kafka, which I agree is a reasonable concern. And I also
>> agree
>> > that create time is more meaningful than LogAppendTime for users. But I
>> am
>> > not sure if making everything base on Create Time would work in all
>> cases.
>> > Here are my questions about this approach:
>> >
>> > 1. Let's say we have two source clusters that are mirrored to the same
>> > target cluster. For some reason one of the mirror maker from a cluster
>> dies
>> > and after fix the issue we want to resume mirroring. In this case it is
>> > possible that when the mirror maker resumes mirroring, the timestamp of
>> the
>> > messages have already gone beyond the acceptable timestamp range on
>> broker.
>> > In order to let those messages go through, we have to bump up the
>> > *max.append.delay
>> > *for all the topics on the target broker. This could be painful.
>> >
>> > 2. Let's say in the above scenario we let the messages in, at that point
>> > some log segments in the target cluster might have a wide range of
>> > timestamps, like Guozhang mentioned the log rolling could be tricky
>> because
>> > the first time index entry does not necessarily have the smallest
>> timestamp
>> > of all the messages in the log segment. Instead, it is the largest
>> > timestamp ever seen. We have to scan the entire log to find the message
>> > with smallest offset to see if we should roll.
>> >
>> > 3. Theoretically it is possible that an older log segment contains
>> > timestamps that are older than all the messages in a newer log segment.
>> It
>> > would be weird that we are supposed to delete the newer log segment
>> before
>> > we delete the older log segment.
>> >
>> > 4. In bootstrap case, if we reload the data to a Kafka cluster, we have
>> to
>> > make sure we configure the topic correctly before we load the data.
>> > Otherwise the message might either be rejected because the timestamp is
>> too
>> > old, or it might be deleted immediately because the retention time has
>> > reached.
>> >
>> > I am very concerned about the operational overhead and the ambiguity of
>> > guarantees we introduce if we purely rely on CreateTime.
>> >
>> > It looks to me that the biggest issue of adopting CreateTime everywhere
>> is
>> > CreateTime can have big gaps. These gaps could be caused by several
>> cases:
>> > [1]. Faulty clients
>> > [2]. Natural delays from different source
>> > [3]. Bootstrap
>> > [4]. Failure recovery
>> >
>> > Jay's alternative proposal solves [1], perhaps solve [2] as well if we
>> are
>> > able to set a reasonable max.append.delay. But it does not seem address
>> [3]
>> > and [4]. I actually doubt if [3] and [4] are solvable because it looks
>> the
>> > CreateTime gap is unavoidable in those two cases.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> >
>> > On Tue, Oct 13, 2015 at 3:23 PM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>> >
>> > > Just to complete Jay's option, here is my understanding:
>> > >
>> > > 1. For log retention: if we want to remove data before time t, we look
>> > into
>> > > the index file of each segment and find the largest timestamp t' < t,
>> > find
>> > > the corresponding timestamp and start scanning to the end of the
>> segment,
>> > > if there is no entry with timestamp >= t, we can delete this segment;
>> if
>> > a
>> > > segment's index smallest timestamp is larger than t, we can skip that
>> > > segment.
>> > >
>> > > 2. For log rolling: if we want to start a new segment after time t, we
>> > look
>> > > into the active segment's index file, if the largest timestamp is
>> > already >
>> > > t, we can roll a new segment immediately; if it is < t, we read its
>> > > corresponding offset and start scanning to the end of the segment, if
>> we
>> > > find a record whose timestamp > t, we can roll a new segment.
>> > >
>> > > For log rolling we only need to possibly scan a small portion the
>> active
>> > > segment, which should be fine; for log retention we may in the worst
>> case
>> > > end up scanning all segments, but in practice we may skip most of them
>> > > since their smallest timestamp in the index file is larger than t.
>> > >
>> > > Guozhang
>> > >
>> > >
>> > > On Tue, Oct 13, 2015 at 12:52 AM, Jay Kreps <j...@confluent.io> wrote:
>> > >
>> > > > I think it should be possible to index out-of-order timestamps. The
>> > > > timestamp index would be similar to the offset index, a memory
>> mapped
>> > > file
>> > > > appended to as part of the log append, but would have the format
>> > > >   timestamp offset
>> > > > The timestamp entries would be monotonic and as with the offset
>> index
>> > > would
>> > > > be no more often then every 4k (or some configurable threshold to
>> keep
>> > > the
>> > > > index small--actually for timestamp it could probably be much more
>> > sparse
>> > > > than 4k).
>> > > >
>> > > > A search for a timestamp t yields an offset o before which no prior
>> > > message
>> > > > has a timestamp >= t. In other words if you read the log starting
>> with
>> > o
>> > > > you are guaranteed not to miss any messages occurring at t or later
>> > > though
>> > > > you may get many before t (due to out-of-orderness). Unlike the
>> offset
>> > > > index this bound doesn't really have to be tight (i.e. probably no
>> need
>> > > to
>> > > > go search the log itself, though you could).
>> > > >
>> > > > -Jay
>> > > >
>> > > > On Tue, Oct 13, 2015 at 12:32 AM, Jay Kreps <j...@confluent.io>
>> wrote:
>> > > >
>> > > > > Here's my basic take:
>> > > > > - I agree it would be nice to have a notion of time baked in if it
>> > were
>> > > > > done right
>> > > > > - All the proposals so far seem pretty complex--I think they might
>> > make
>> > > > > things worse rather than better overall
>> > > > > - I think adding 2x8 byte timestamps to the message is probably a
>> > > > > non-starter from a size perspective
>> > > > > - Even if it isn't in the message, having two notions of time that
>> > > > control
>> > > > > different things is a bit confusing
>> > > > > - The mechanics of basing retention etc on log append time when
>> > that's
>> > > > not
>> > > > > in the log seem complicated
>> > > > >
>> > > > > To that end here is a possible 4th option. Let me know what you
>> > think.
>> > > > >
>> > > > > The basic idea is that the message creation time is closest to
>> what
>> > the
>> > > > > user actually cares about but is dangerous if set wrong. So rather
>> > than
>> > > > > substitute another notion of time, let's try to ensure the
>> > correctness
>> > > of
>> > > > > message creation time by preventing arbitrarily bad message
>> creation
>> > > > times.
>> > > > >
>> > > > > First, let's see if we can agree that log append time is not
>> > something
>> > > > > anyone really cares about but rather an implementation detail. The
>> > > > > timestamp that matters to the user is when the message occurred
>> (the
>> > > > > creation time). The log append time is basically just an
>> > approximation
>> > > to
>> > > > > this on the assumption that the message creation and the message
>> > > receive
>> > > > on
>> > > > > the server occur pretty close together and the reason to prefer .
>> > > > >
>> > > > > But as these values diverge the issue starts to become apparent.
>> Say
>> > > you
>> > > > > set the retention to one week and then mirror data from a topic
>> > > > containing
>> > > > > two years of retention. Your intention is clearly to keep the last
>> > > week,
>> > > > > but because the mirroring is appending right now you will keep two
>> > > years.
>> > > > >
>> > > > > The reason we are liking log append time is because we are
>> > > (justifiably)
>> > > > > concerned that in certain situations the creation time may not be
>> > > > > trustworthy. This same problem exists on the servers but there are
>> > > fewer
>> > > > > servers and they just run the kafka code so it is less of an
>> issue.
>> > > > >
>> > > > > There are two possible ways to handle this:
>> > > > >
>> > > > >    1. Just tell people to add size based retention. I think this
>> is
>> > not
>> > > > >    entirely unreasonable, we're basically saying we retain data
>> based
>> > > on
>> > > > the
>> > > > >    timestamp you give us in the data. If you give us bad data we
>> will
>> > > > retain
>> > > > >    it for a bad amount of time. If you want to ensure we don't
>> retain
>> > > > "too
>> > > > >    much" data, define "too much" by setting a time-based retention
>> > > > setting.
>> > > > >    This is not entirely unreasonable but kind of suffers from a
>> "one
>> > > bad
>> > > > >    apple" problem in a very large environment.
>> > > > >    2. Prevent bad timestamps. In general we can't say a timestamp
>> is
>> > > bad.
>> > > > >    However the definition we're implicitly using is that we think
>> > there
>> > > > are a
>> > > > >    set of topics/clusters where the creation timestamp should
>> always
>> > be
>> > > > "very
>> > > > >    close" to the log append timestamp. This is true for data
>> sources
>> > > > that have
>> > > > >    no buffering capability (which at LinkedIn is very common, but
>> is
>> > > > more rare
>> > > > >    elsewhere). The solution in this case would be to allow a
>> setting
>> > > > along the
>> > > > >    lines of max.append.delay which checks the creation timestamp
>> > > against
>> > > > the
>> > > > >    server time to look for too large a divergence. The solution
>> would
>> > > > either
>> > > > >    be to reject the message or to override it with the server
>> time.
>> > > > >
>> > > > > So in LI's environment you would configure the clusters used for
>> > > direct,
>> > > > > unbuffered, message production (e.g. tracking and metrics local)
>> to
>> > > > enforce
>> > > > > a reasonably aggressive timestamp bound (say 10 mins), and all
>> other
>> > > > > clusters would just inherent these.
>> > > > >
>> > > > > The downside of this approach is requiring the special
>> configuration.
>> > > > > However I think in 99% of environments this could be skipped
>> > entirely,
>> > > > it's
>> > > > > only when the ratio of clients to servers gets so massive that you
>> > need
>> > > > to
>> > > > > do this. The primary upside is that you have a single
>> authoritative
>> > > > notion
>> > > > > of time which is closest to what a user would want and is stored
>> > > directly
>> > > > > in the message.
>> > > > >
>> > > > > I'm also assuming there is a workable approach for indexing
>> > > non-monotonic
>> > > > > timestamps, though I haven't actually worked that out.
>> > > > >
>> > > > > -Jay
>> > > > >
>> > > > > On Mon, Oct 5, 2015 at 8:52 PM, Jiangjie Qin
>> > <j...@linkedin.com.invalid
>> > > >
>> > > > > wrote:
>> > > > >
>> > > > >> Bumping up this thread although most of the discussion were on
>> the
>> > > > >> discussion thread of KIP-31 :)
>> > > > >>
>> > > > >> I just updated the KIP page to add detailed solution for the
>> option
>> > > > >> (Option
>> > > > >> 3) that does not expose the LogAppendTime to user.
>> > > > >>
>> > > > >>
>> > > > >>
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+CreateTime+and+LogAppendTime+to+Kafka+message
>> > > > >>
>> > > > >> The option has a minor change to the fetch request to allow
>> fetching
>> > > > time
>> > > > >> index entry as well. I kind of like this solution because its
>> just
>> > > doing
>> > > > >> what we need without introducing other things.
>> > > > >>
>> > > > >> It will be great to see what are the feedback. I can explain more
>> > > during
>> > > > >> tomorrow's KIP hangout.
>> > > > >>
>> > > > >> Thanks,
>> > > > >>
>> > > > >> Jiangjie (Becket) Qin
>> > > > >>
>> > > > >> On Thu, Sep 10, 2015 at 2:47 PM, Jiangjie Qin <j...@linkedin.com
>> >
>> > > > wrote:
>> > > > >>
>> > > > >> > Hi Jay,
>> > > > >> >
>> > > > >> > I just copy/pastes here your feedback on the timestamp proposal
>> > that
>> > > > was
>> > > > >> > in the discussion thread of KIP-31. Please see the replies
>> inline.
>> > > > >> > The main change I made compared with previous proposal is to
>> add
>> > > both
>> > > > >> > CreateTime and LogAppendTime to the message.
>> > > > >> >
>> > > > >> > On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <j...@confluent.io>
>> > > wrote:
>> > > > >> >
>> > > > >> > > Hey Beckett,
>> > > > >> > >
>> > > > >> > > I was proposing splitting up the KIP just for simplicity of
>> > > > >> discussion.
>> > > > >> > You
>> > > > >> > > can still implement them in one patch. I think otherwise it
>> will
>> > > be
>> > > > >> hard
>> > > > >> > to
>> > > > >> > > discuss/vote on them since if you like the offset proposal
>> but
>> > not
>> > > > the
>> > > > >> > time
>> > > > >> > > proposal what do you do?
>> > > > >> > >
>> > > > >> > > Introducing a second notion of time into Kafka is a pretty
>> > massive
>> > > > >> > > philosophical change so it kind of warrants it's own KIP I
>> think
>> > > it
>> > > > >> > isn't
>> > > > >> > > just "Change message format".
>> > > > >> > >
>> > > > >> > > WRT time I think one thing to clarify in the proposal is how
>> MM
>> > > will
>> > > > >> have
>> > > > >> > > access to set the timestamp? Presumably this will be a new
>> field
>> > > in
>> > > > >> > > ProducerRecord, right? If so then any user can set the
>> > timestamp,
>> > > > >> right?
>> > > > >> > > I'm not sure you answered the questions around how this will
>> > work
>> > > > for
>> > > > >> MM
>> > > > >> > > since when MM retains timestamps from multiple partitions
>> they
>> > > will
>> > > > >> then
>> > > > >> > be
>> > > > >> > > out of order and in the past (so the
>> max(lastAppendedTimestamp,
>> > > > >> > > currentTimeMillis) override you proposed will not work,
>> right?).
>> > > If
>> > > > we
>> > > > >> > > don't do this then when you set up mirroring the data will
>> all
>> > be
>> > > > new
>> > > > >> and
>> > > > >> > > you have the same retention problem you described. Maybe I
>> > missed
>> > > > >> > > something...?
>> > > > >> > lastAppendedTimestamp means the timestamp of the message that
>> last
>> > > > >> > appended to the log.
>> > > > >> > If a broker is a leader, since it will assign the timestamp by
>> > > itself,
>> > > > >> the
>> > > > >> > lastAppenedTimestamp will be its local clock when append the
>> last
>> > > > >> message.
>> > > > >> > So if there is no leader migration, max(lastAppendedTimestamp,
>> > > > >> > currentTimeMillis) = currentTimeMillis.
>> > > > >> > If a broker is a follower, because it will keep the leader's
>> > > timestamp
>> > > > >> > unchanged, the lastAppendedTime would be the leader's clock
>> when
>> > it
>> > > > >> appends
>> > > > >> > that message message. It keeps track of the lastAppendedTime
>> only
>> > in
>> > > > >> case
>> > > > >> > it becomes leader later on. At that point, it is possible that
>> the
>> > > > >> > timestamp of the last appended message was stamped by old
>> leader,
>> > > but
>> > > > >> the
>> > > > >> > new leader's currentTimeMillis < lastAppendedTime. If a new
>> > message
>> > > > >> comes,
>> > > > >> > instead of stamp it with new leader's currentTimeMillis, we
>> have
>> > to
>> > > > >> stamp
>> > > > >> > it to lastAppendedTime to avoid the timestamp in the log going
>> > > > backward.
>> > > > >> > The max(lastAppendedTimestamp, currentTimeMillis) is purely
>> based
>> > on
>> > > > the
>> > > > >> > broker side clock. If MM produces message with different
>> > > LogAppendTime
>> > > > >> in
>> > > > >> > source clusters to the same target cluster, the LogAppendTime
>> will
>> > > be
>> > > > >> > ignored  re-stamped by target cluster.
>> > > > >> > I added a use case example for mirror maker in KIP-32. Also
>> there
>> > > is a
>> > > > >> > corner case discussion about when we need the
>> > max(lastAppendedTime,
>> > > > >> > currentTimeMillis) trick. Could you take a look and see if that
>> > > > answers
>> > > > >> > your question?
>> > > > >> >
>> > > > >> > >
>> > > > >> > > My main motivation is that given that both Samza and Kafka
>> > streams
>> > > > are
>> > > > >> > > doing work that implies a mandatory client-defined notion of
>> > > time, I
>> > > > >> > really
>> > > > >> > > think introducing a different mandatory notion of time in
>> Kafka
>> > is
>> > > > >> going
>> > > > >> > to
>> > > > >> > > be quite odd. We should think hard about how client-defined
>> time
>> > > > could
>> > > > >> > > work. I'm not sure if it can, but I'm also not sure that it
>> > can't.
>> > > > >> Having
>> > > > >> > > both will be odd. Did you chat about this with Yi/Kartik on
>> the
>> > > > Samza
>> > > > >> > side?
>> > > > >> > I talked with Kartik and realized that it would be useful to
>> have
>> > a
>> > > > >> client
>> > > > >> > timestamp to facilitate use cases like stream processing.
>> > > > >> > I was trying to figure out if we can simply use client
>> timestamp
>> > > > without
>> > > > >> > introducing the server time. There are some discussion in the
>> KIP.
>> > > > >> > The key problem we want to solve here is
>> > > > >> > 1. We want log retention and rolling to depend on server clock.
>> > > > >> > 2. We want to make sure the log-assiciated timestamp to be
>> > retained
>> > > > when
>> > > > >> > replicas moves.
>> > > > >> > 3. We want to use the timestamp in some way that can allow
>> > searching
>> > > > by
>> > > > >> > timestamp.
>> > > > >> > For 1 and 2, an alternative is to pass the log-associated
>> > timestamp
>> > > > >> > through replication, that means we need to have a different
>> > protocol
>> > > > for
>> > > > >> > replica fetching to pass log-associated timestamp. It is
>> actually
>> > > > >> > complicated and there could be a lot of corner cases to handle.
>> > e.g.
>> > > > >> what
>> > > > >> > if an old leader started to fetch from the new leader, should
>> it
>> > > also
>> > > > >> > update all of its old log segment timestamp?
>> > > > >> > I think actually client side timestamp would be better for 3
>> if we
>> > > can
>> > > > >> > find a way to make it work.
>> > > > >> > So far I am not able to convince myself that only having client
>> > side
>> > > > >> > timestamp would work mainly because 1 and 2. There are a few
>> > > > situations
>> > > > >> I
>> > > > >> > mentioned in the KIP.
>> > > > >> > >
>> > > > >> > > When you are saying it won't work you are assuming some
>> > particular
>> > > > >> > > implementation? Maybe that the index is a monotonically
>> > increasing
>> > > > >> set of
>> > > > >> > > pointers to the least record with a timestamp larger than the
>> > > index
>> > > > >> time?
>> > > > >> > > In other words a search for time X gives the largest offset
>> at
>> > > which
>> > > > >> all
>> > > > >> > > records are <= X?
>> > > > >> > It is a promising idea. We probably can have an in-memory index
>> > like
>> > > > >> that,
>> > > > >> > but might be complicated to have a file on disk like that.
>> Imagine
>> > > > there
>> > > > >> > are two timestamps T0 < T1. We see message Y created at T1 and
>> > > created
>> > > > >> > index like [T1->Y], then we see message created at T1,
>> supposedly
>> > we
>> > > > >> should
>> > > > >> > have index look like [T0->X, T1->Y], it is easy to do in
>> memory,
>> > but
>> > > > we
>> > > > >> > might have to rewrite the index file completely. Maybe we can
>> have
>> > > the
>> > > > >> > first entry with timestamp to 0, and only update the first
>> pointer
>> > > for
>> > > > >> any
>> > > > >> > out of range timestamp, so the index will be [0->X, T1->Y].
>> Also,
>> > > the
>> > > > >> range
>> > > > >> > of timestamps in the log segments can overlap with each other.
>> > That
>> > > > >> means
>> > > > >> > we either need to keep a cross segments index file or we need
>> to
>> > > check
>> > > > >> all
>> > > > >> > the index file for each log segment.
>> > > > >> > I separated out the time based log index to KIP-33 because it
>> can
>> > be
>> > > > an
>> > > > >> > independent follow up feature as Neha suggested. I will try to
>> > make
>> > > > the
>> > > > >> > time based index work with client side timestamp.
>> > > > >> > >
>> > > > >> > > For retention, I agree with the problem you point out, but I
>> > think
>> > > > >> what
>> > > > >> > you
>> > > > >> > > are saying in that case is that you want a size limit too. If
>> > you
>> > > > use
>> > > > >> > > system time you actually hit the same problem: say you do a
>> full
>> > > > dump
>> > > > >> of
>> > > > >> > a
>> > > > >> > > DB table with a setting of 7 days retention, your retention
>> will
>> > > > >> actually
>> > > > >> > > not get enforced for the first 7 days because the data is
>> "new
>> > to
>> > > > >> Kafka".
>> > > > >> > I kind of think the size limit here is orthogonal. It is a
>> valid
>> > use
>> > > > >> case
>> > > > >> > where people only want to use time based retention only. In
>> your
>> > > > >> example,
>> > > > >> > depending on client timestamp might break the functionality -
>> say
>> > it
>> > > > is
>> > > > >> a
>> > > > >> > bootstrap case people actually need to read all the data. If we
>> > > depend
>> > > > >> on
>> > > > >> > the client timestamp, the data might be deleted instantly when
>> > they
>> > > > >> come to
>> > > > >> > the broker. It might be too demanding to expect the broker to
>> > > > understand
>> > > > >> > what people actually want to do with the data coming in. So the
>> > > > >> guarantee
>> > > > >> > of using server side timestamp is that "after appended to the
>> log,
>> > > all
>> > > > >> > messages will be available on broker for retention time",
>> which is
>> > > not
>> > > > >> > changeable by clients.
>> > > > >> > >
>> > > > >> > > -Jay
>> > > > >> >
>> > > > >> > On Thu, Sep 10, 2015 at 12:55 PM, Jiangjie Qin <
>> j...@linkedin.com
>> > >
>> > > > >> wrote:
>> > > > >> >
>> > > > >> >> Hi folks,
>> > > > >> >>
>> > > > >> >> This proposal was previously in KIP-31 and we separated it to
>> > > KIP-32
>> > > > >> per
>> > > > >> >> Neha and Jay's suggestion.
>> > > > >> >>
>> > > > >> >> The proposal is to add the following two timestamps to Kafka
>> > > message.
>> > > > >> >> - CreateTime
>> > > > >> >> - LogAppendTime
>> > > > >> >>
>> > > > >> >> The CreateTime will be set by the producer and will change
>> after
>> > > > that.
>> > > > >> >> The LogAppendTime will be set by broker for purpose such as
>> > enforce
>> > > > log
>> > > > >> >> retention and log rolling.
>> > > > >> >>
>> > > > >> >> Thanks,
>> > > > >> >>
>> > > > >> >> Jiangjie (Becket) Qin
>> > > > >> >>
>> > > > >> >>
>> > > > >> >
>> > > > >>
>> > > > >
>> > > > >
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>>
>
>

Reply via email to