Hi Jay, I just changed the KIP title and updated the KIP page.
And yes, we are working on a general version control proposal to make protocol migrations like this one smoother. I will also create a KIP for that soon.

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 10, 2015 at 2:21 PM, Jay Kreps <j...@confluent.io> wrote:

Great, can we change the name to something related to the change -- "KIP-31: Move to relative offsets in compressed message sets"?

Also, you had mentioned before that you were going to expand on the mechanics of handling these log format changes, right?

-Jay

On Thu, Sep 10, 2015 at 12:42 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Neha and Jay,

Thanks a lot for the feedback. Good point about splitting the discussion. I have split the proposal into three KIPs, and it does make each discussion clearer:

KIP-31 - Message format change (use relative offsets)
KIP-32 - Add CreateTime and LogAppendTime to Kafka messages
KIP-33 - Build a time-based log index

KIP-33 can be a follow-up to KIP-32, so we can discuss KIP-31 and KIP-32 first for now. I will create a separate discussion thread for KIP-32 and reply there to the concerns you raised regarding the timestamp.

So far it looks like there is no objection to KIP-31. Since I removed a few parts from the previous KIP and left only the relative offset proposal, it would be great if people could take another look to see if there are any remaining concerns.

Thanks,

Jiangjie (Becket) Qin

On Tue, Sep 8, 2015 at 1:28 PM, Neha Narkhede <n...@confluent.io> wrote:

Becket,

Nice write-up. A few thoughts -

I'd split up the discussion for simplicity. Note that you can always group several of these in one patch to reduce the protocol changes people have to deal with. This is just a suggestion, but I think the following split might make it easier to tackle the changes being proposed -

- Relative offsets
- Introducing the concept of time
- Time-based indexing (separate the usage of the timestamp field from how/whether we want to include a timestamp in the message)

I'm a +1 on relative offsets; we should've done it back when we introduced it. Other than reducing the CPU overhead, this will also reduce the garbage collection overhead on the brokers.

On the timestamp field, I generally agree that we should add a timestamp to a Kafka message, but I'm not quite sold on how this KIP suggests the timestamp be set. I will avoid repeating the downsides of a broker-side timestamp mentioned previously in this thread. I think the topic of including a timestamp in a Kafka message requires a lot more thought and detail than what's in this KIP. I'd suggest we make it a separate KIP that includes a list of all the different use cases for the timestamp (beyond log retention), including stream processing, and discuss the tradeoffs of client-side and broker-side timestamps.

I agree with the benefit of time-based indexing, but I haven't had a chance to dive into the design details yet.

Thanks,
Neha
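[Editorial sketch] The relative-offset idea in KIP-31 can be illustrated with a few lines of Java. This is only a sketch of the arithmetic: it assumes, for illustration, that inner messages carry relative offsets 0..N-1 and that the wrapper message carries the absolute offset of the last inner message; the class and method names are made up and the actual wire layout is defined in the KIP itself.

    // Sketch: relative offsets inside a compressed message set.
    public final class RelativeOffsets {

        /** Broker side: stamp only the wrapper with the offset of the *last*
         *  inner message; the compressed payload itself is left untouched. */
        static long wrapperOffsetFor(long nextOffsetToAssign, int innerMessageCount) {
            return nextOffsetToAssign + innerMessageCount - 1;
        }

        /** Consumer side: reconstruct the absolute offset of inner message i. */
        static long absoluteOffset(long wrapperOffset, int innerMessageCount, int relativeOffset) {
            long baseOffset = wrapperOffset - (innerMessageCount - 1);
            return baseOffset + relativeOffset;
        }

        public static void main(String[] args) {
            long wrapper = wrapperOffsetFor(100L, 3);                 // -> 102
            for (int rel = 0; rel < 3; rel++) {
                System.out.println(absoluteOffset(wrapper, 3, rel));  // 100, 101, 102
            }
        }
    }

The point of the arithmetic is that the broker only needs to rewrite a single offset field on the wrapper when it assigns offsets, so it no longer has to decompress and recompress the batch.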
On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <j...@confluent.io> wrote:

Hey Becket,

I was proposing splitting up the KIP just for simplicity of discussion. You can still implement them in one patch. I think otherwise it will be hard to discuss/vote on them, since if you like the offset proposal but not the time proposal, what do you do?

Introducing a second notion of time into Kafka is a pretty massive philosophical change, so it kind of warrants its own KIP; I think it isn't just "change the message format".

WRT time, I think one thing to clarify in the proposal is how MM will have access to set the timestamp. Presumably this will be a new field in ProducerRecord, right? If so, then any user can set the timestamp, right? I'm not sure you answered the questions around how this will work for MM, since when MM retains timestamps from multiple partitions they will then be out of order and in the past (so the max(lastAppendedTimestamp, currentTimeMillis) override you proposed will not work, right?). If we don't do this, then when you set up mirroring the data will all be new and you have the same retention problem you described. Maybe I missed something...?

My main motivation is that, given that both Samza and Kafka Streams are doing work that implies a mandatory client-defined notion of time, I really think introducing a different mandatory notion of time in Kafka is going to be quite odd. We should think hard about how client-defined time could work. I'm not sure if it can, but I'm also not sure that it can't. Having both will be odd. Did you chat about this with Yi/Kartik on the Samza side?

When you say it won't work, are you assuming some particular implementation? Maybe that the index is a monotonically increasing set of pointers to the least record with a timestamp larger than the index time? In other words, a search for time X gives the largest offset at which all records are <= X?

For retention, I agree with the problem you point out, but I think what you are saying in that case is that you want a size limit too. If you use system time you actually hit the same problem: say you do a full dump of a DB table with a setting of 7 days retention; your retention will actually not get enforced for the first 7 days because the data is "new to Kafka".

-Jay
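[Editorial sketch] The index semantics Jay describes ("a search for time X gives the largest offset at which all records are <= X") can be illustrated with an in-memory map, assuming timestamps are appended in non-decreasing order. This shows only the query semantics; it is not the on-disk index format later proposed in KIP-33, and the names are illustrative.

    import java.util.Map;
    import java.util.TreeMap;

    /** In-memory illustration of the time index query semantics. */
    public final class TimeIndexSketch {
        private final TreeMap<Long, Long> timestampToOffset = new TreeMap<>();

        /** Record an index entry when a message is appended. */
        void maybeAppend(long timestamp, long offset) {
            // With non-decreasing timestamps, later entries always map to larger offsets.
            timestampToOffset.put(timestamp, offset);
        }

        /** Search for time X: the largest indexed offset whose records all have timestamp <= X. */
        long lookup(long targetTimestamp) {
            Map.Entry<Long, Long> floor = timestampToOffset.floorEntry(targetTimestamp);
            return floor == null ? -1L : floor.getValue();
        }
    }

Note that the floorEntry lookup only gives a meaningful answer if timestamps never go backwards, which is exactly why the monotonicity guarantee keeps coming up in this thread.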
On Mon, Sep 7, 2015 at 10:44 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Jay,

Thanks for the comments. Yes, there are actually three proposals, as you pointed out.

We will have a separate proposal for (1), the version control mechanism. We actually thought about whether we want to separate 2 and 3 internally before creating the KIP. The reason we put 2 and 3 together is that it saves us another cross-the-board wire protocol change. Like you said, we have to migrate all the clients in all languages. To some extent, the effort spent on upgrading the clients can be even bigger than implementing the new feature itself. So there is some attraction to doing 2 and 3 together instead of separately. Maybe after (1) is done it will be easier to do protocol migrations. But if we are able to come to an agreement on the timestamp solution, I would prefer to have it together with the relative offset change, in the interest of avoiding another wire protocol change (the process to migrate to relative offsets is exactly the same as migrating to messages with a timestamp).

In terms of the timestamp, I completely agree that having a client timestamp is more useful if we can make sure the timestamp is good. But in reality that can be a really big *IF*. I think the problem is exactly as Ewen mentioned: if we let the client set the timestamp, it would be very hard for the broker to utilize it. If the broker applies the retention policy based on the client timestamp, one misbehaving producer can potentially completely mess up the retention policy on the broker. Although people don't care about the server-side timestamp, people do care a lot when the timestamp breaks. Searching by timestamp is a really important use case, even though it is not used as often as searching by offset. It has a significant direct impact on RTO when there is a cross-cluster failover, as Todd mentioned.

The trick of using max(lastAppendedTimestamp, currentTimeMillis) is to guarantee monotonic increase of the timestamp. Many commercial systems actually do something similar to solve time skew. About changing the time: I am not sure people use NTP like a watch, just setting it forward or backward by an hour or so. The time adjustment I used to do is typically something like a minute per week. So in each second the clock might run a few microseconds slower or faster, but it does not break the clock completely, so that all the time-based transactions are not affected. The one-minute change is spread over a week rather than applied instantly.

Personally, I think having a client-side timestamp would be useful if we don't need to put the broker and data integrity at risk. If we have to choose one of them but not both, I would prefer the server-side timestamp, because for the client-side timestamp there is always a plan B, which is putting the timestamp into the payload.

Another reason I am reluctant to use the client-side timestamp is that it is always dangerous to mix the control plane with the data plane. IP did this and it has caused so many different breaches that people are migrating to something like MPLS. An example in Kafka is that any client can construct a LeaderAndIsrRequest/UpdateMetadataRequest/ControlledShutdownRequest (you name it) and send it to the broker to mess up the entire cluster; also, as we have already noticed, a busy cluster can respond quite slowly to controller messages. So it would really be nice if we can avoid giving clients the power to control log retention.

Thanks,

Jiangjie (Becket) Qin
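[Editorial sketch] The max(lastAppendedTimestamp, currentTimeMillis) trick Becket describes boils down to something like the following on the broker. The class and method names are illustrative, not actual Kafka broker code.

    /** Sketch of monotonic broker-side timestamp assignment. */
    public final class LogAppendClock {
        private long lastAppendedTimestamp = -1L;

        /** Timestamp to stamp on the next appended message. */
        synchronized long nextTimestamp() {
            // Never go backwards, even if the wall clock is adjusted backwards (e.g. by NTP).
            lastAppendedTimestamp = Math.max(lastAppendedTimestamp, System.currentTimeMillis());
            return lastAppendedTimestamp;
        }
    }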
On Sun, Sep 6, 2015 at 9:54 PM, Todd Palino <tpal...@gmail.com> wrote:

So, with regard to why you would want to search by timestamp: the biggest problem I've seen is with consumers who want to reset to a specific point in time, whether it is to replay a certain amount of messages or to rewind to before some problem state existed. This happens more often than anyone would like.

To handle this now, we need to constantly export the broker's offsets for every partition to a time-series database and then use external processes to query it. I know we're not the only ones doing this. The way the broker handles requests for offsets by timestamp is a little obtuse (try explaining it to anyone without intimate knowledge of the internal workings of the broker -- every time I do, I see this). In addition, as Becket pointed out, it causes problems specifically with time-based retention of messages when you move partitions around.

I'm deliberately avoiding the discussion of which timestamp to use. I can see the argument either way, though I tend to lean towards the idea that the broker timestamp is the only viable source of truth in this situation.

-Todd

On Sun, Sep 6, 2015 at 7:08 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps <j...@confluent.io> wrote:

> 2. Nobody cares what time it is on the server.

This is a good way of summarizing the issue I was trying to get at, from an app's perspective. Of the 3 stated goals of the KIP, #2 (log retention) is reasonably handled by a server-side timestamp. I really just care that a message is there long enough that I have a chance to process it. #3 (searching by timestamp) only seems useful if we can guarantee the server-side timestamp is close enough to the original client-side timestamp, and any mirror maker step seems to break that (even ignoring any issues with broker availability).

I'm also wondering whether optimizing for search-by-timestamp on the broker is really something we want to do, given that messages aren't really guaranteed to be ordered by application-level timestamps on the broker. Is part of the need for this just due to the current consumer APIs being difficult to work with? For example, could you implement this pretty easily client side, just the way you would broker-side? I'd imagine a couple of random seeks + reads during very rare occasions (i.e. when the app starts up) wouldn't be a problem performance-wise. Or is it also that you need the broker to enforce things like monotonically increasing timestamps, since you can't do the query properly and efficiently without that guarantee, and therefore what applications are actually looking for *is* broker-side timestamps?

-Ewen
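[Editorial sketch] Ewen's client-side alternative, a handful of seeks and reads at startup, could look roughly like a binary search over the partition's offset range. This assumes the application can recover a timestamp for the record at a given offset (for example, one it embedded in the payload and fetches with a seek plus a single poll); the readTimestampAt parameter stands in for that fetch and is purely illustrative.

    import java.util.function.LongUnaryOperator;

    /** Sketch of a client-side "seek by timestamp" via binary search. */
    public final class ClientSideTimeSearch {

        /**
         * Returns the smallest offset in [beginOffset, endOffset) whose record
         * timestamp is >= targetTimestamp, assuming timestamps are (roughly)
         * non-decreasing by offset. Costs O(log n) seeks + reads.
         */
        static long offsetForTimestamp(long beginOffset, long endOffset,
                                       long targetTimestamp,
                                       LongUnaryOperator readTimestampAt) {
            long lo = beginOffset, hi = endOffset;
            while (lo < hi) {
                long mid = lo + (hi - lo) / 2;
                if (readTimestampAt.applyAsLong(mid) < targetTimestamp) {
                    lo = mid + 1;
                } else {
                    hi = mid;
                }
            }
            return lo; // first offset at or after the target time
        }
    }

As Ewen notes, this only works cleanly if timestamps are at least roughly ordered by offset, which is exactly the guarantee question being debated in this thread.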
On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps <j...@confluent.io> wrote:

Consider cases where data is being copied from a database or from log files. In steady state the server time is very close to the client time if their clocks are sync'd (see 1), but there will be times of large divergence when the copying process is stopped or falls behind. When this occurs, it is clear that the time the data arrived on the server is irrelevant; it is the source timestamp that matters. This is the problem you are trying to fix by retaining the MM timestamp, but really the client should always set the time, with server-side time as a fallback. It would be worth talking to the Samza folks and reading through this blog post (http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html) on this subject, since we went through similar learnings on the stream processing side.

I think the implication of these two is that we need a proposal that handles potentially very out-of-order timestamps in some kind of sane-ish way (buggy clients will set something totally wrong as the time).

-Jay

On Sun, Sep 6, 2015 at 4:22 PM, Jay Kreps <j...@confluent.io> wrote:

The magic byte is used to version the message format, so we'll need to make sure that check is in place -- I actually don't see it in the current consumer code, which I think is a bug we should fix for the next release (filed KAFKA-2523). The purpose of that field is to provide a clear check on the format rather than the scrambled scenarios Becket describes.

Also, Becket, I don't think just fixing the Java client is sufficient, as that would break other clients -- i.e. if anyone writes v1 messages, even by accident, any non-v1-capable consumer will break. I think we probably need a way to have the server ensure a particular message format either at read or write time.

-Jay
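[Editorial sketch] The magic-byte check Jay refers to amounts, in spirit, to something like the following on the consumer's decode path. The constants and class name here are illustrative; the real parsing lives in the message/record classes, and KAFKA-2523 tracks the actual fix.

    import java.nio.ByteBuffer;

    /** Sketch of a version check before parsing a message. */
    final class MessageFormatCheck {
        // In the classic message format the magic byte follows the 4-byte CRC.
        private static final int MAGIC_OFFSET = 4;
        private static final byte HIGHEST_SUPPORTED_MAGIC = 0;

        static void ensureSupported(ByteBuffer message) {
            byte magic = message.get(message.position() + MAGIC_OFFSET);
            if (magic > HIGHEST_SUPPORTED_MAGIC) {
                // Fail fast with a clear error instead of misparsing the new fields.
                throw new IllegalStateException("Unsupported message format version: " + magic);
            }
        }
    }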
On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Hi Guozhang,

I checked the code again. Actually, the CRC check probably won't fail. The newly added timestamp field might be treated as keyLength instead, so we are likely to get an IllegalArgumentException when trying to read the key. I'll update the KIP.

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 3, 2015 at 12:48 PM, Jiangjie Qin <j...@linkedin.com> wrote:

Hi Guozhang,

Thanks for reading the KIP. By "old consumer", I meant the ZookeeperConsumerConnector in trunk now, i.e. without this bug fixed. If we fix the ZookeeperConsumerConnector, then it will throw an exception complaining about the unsupported version when it sees message format V1. What I was trying to say is that if we have some ZookeeperConsumerConnector running without the fix, the consumer will complain about a CRC mismatch instead of an unsupported version.

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 3, 2015 at 12:15 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Thanks for the write-up Jiangjie.

One comment about the migration plan: "For old consumers, if they see the new protocol the CRC check will fail"...

Do you mean this bug in the old consumer cannot be fixed in a backward-compatible way?

Guozhang
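[Editorial sketch] The failure mode Becket describes above can be made concrete: without a magic-byte check, a v0-style parser reads whatever bytes sit where it expects the key length, and in a v1 message those bytes would belong to the new timestamp field. This is not the actual Kafka record layout or consumer code, just a simplified illustration of the mis-parse.

    import java.nio.ByteBuffer;

    /** Simplified illustration of a v0-style parser applied to a v1-style message. */
    final class V0ParserOnV1Message {
        static void parseV0Style(ByteBuffer buf) {
            buf.getInt();                  // crc
            buf.get();                     // magic (unchecked -- the bug)
            buf.get();                     // attributes
            int keyLength = buf.getInt();  // in a v1 message these would be timestamp bytes
            if (keyLength < -1 || keyLength > buf.remaining()) {
                // Roughly where the old consumer blows up, instead of
                // reporting an unsupported message format version.
                throw new IllegalArgumentException("Invalid key size: " + keyLength);
            }
            // ... read key/value using the bogus length ...
        }
    }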
On Thu, Sep 3, 2015 at 8:35 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Hi,

We just created KIP-31 to propose a message format change in Kafka:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal

As a summary, the motivations are:
1. Avoid server-side message re-compression
2. Honor time-based log roll and retention
3. Enable offset search by timestamp at a finer granularity

Feedback and comments are welcome!

Thanks,

Jiangjie (Becket) Qin