Bill,

That's a good question. I am thinking of the following approach for implementing trim():

(1) the client issues a metadata request to the broker to determine the leader of each topic/partition and groups the topic/partitions by leader broker;
(2) the client sends a TrimRequest to each broker for the partitions whose leader is on that broker;
(3) the leader trims the corresponding segments locally;
(4) we extend the fetch request/response protocol so that the leader propagates a firstOffset (the first available offset) for each partition in the follower fetch response;
(5) the follower trims its local segments according to firstOffset in the fetch response.

We have been talking about adding an admin client as part of KIP-4. We could add such a trim() method to the admin client.
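To make steps (1) and (2) concrete, here is a rough, hypothetical Java sketch of the client-side grouping and dispatch. TrimClientSketch, MetadataView, and TrimRequestSender are illustrative stand-ins for whatever the admin client would actually use, not existing Kafka classes; only TopicPartition and Node are real kafka-clients types, and the broker-side steps (3)-(5) are not shown.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

public class TrimClientSketch {

    /** Hypothetical stand-in for a metadata lookup: which broker leads each partition. */
    public interface MetadataView {
        Node leaderFor(TopicPartition partition);
    }

    /** Hypothetical stand-in for the wire call that carries one TrimRequest to one broker. */
    public interface TrimRequestSender {
        void sendTrimRequest(Node broker, Map<TopicPartition, Long> offsets);
    }

    private final MetadataView metadata;
    private final TrimRequestSender sender;

    public TrimClientSketch(MetadataView metadata, TrimRequestSender sender) {
        this.metadata = metadata;
        this.sender = sender;
    }

    /** Trim each partition's log up to the given offset. */
    public void trim(Map<TopicPartition, Long> offsetsToTruncate) {
        // Step (1): group the partitions to trim by their leader broker.
        Map<Node, Map<TopicPartition, Long>> byLeader = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : offsetsToTruncate.entrySet()) {
            Node leader = metadata.leaderFor(entry.getKey());
            byLeader.computeIfAbsent(leader, n -> new HashMap<>())
                    .put(entry.getKey(), entry.getValue());
        }
        // Step (2): send one TrimRequest per broker, containing only its partitions.
        for (Map.Entry<Node, Map<TopicPartition, Long>> entry : byLeader.entrySet()) {
            sender.sendTrimRequest(entry.getKey(), entry.getValue());
        }
        // Steps (3)-(5) (leader trims locally, firstOffset propagates to followers
        // via the fetch response) happen broker-side and are not shown here.
    }
}

An application that has checkpointed its state up to some offsets would then call trim() with those offsets, matching the void trim(Map<TopicPartition, Long> offsetsToTruncate) signature discussed below.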
Thanks,

Jun

On Mon, Oct 24, 2016 at 5:29 PM, Bill Warshaw <wdwars...@gmail.com> wrote:

> Hi Jun,
>
> Those are valid concerns. For our particular use case, application events
> triggering the timestamp update will never occur more than once an hour,
> and we maintain a sliding window so that we don't delete messages too
> close to what our consumers may be reading. For more general use cases,
> developers will need to be aware of these issues, and would need to write
> their application code with that in mind.
>
> To your second point: I initially wanted to just have a trim() admin API.
> I started implementing it, but ran into difficulties with synchronously
> acknowledging to the calling code that all brokers had truncated the given
> partitions. It seemed like we would have to do something similar to how
> topic deletion is implemented, where the initial broker uses Zookeeper to
> coordinate the deletion on the other brokers. If you have a simpler idea
> in mind, I'd be happy to update this KIP to provide a trim() API instead.
>
> On Mon, Oct 24, 2016 at 8:15 PM Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Bill,
> >
> > Thanks for the proposal. Sorry for the late reply.
> >
> > The motivation of the proposal makes sense: don't delete the messages
> > until the application tells you so.
> >
> > I am wondering if the current proposal is the best way to address the
> > need, though. There are a couple of issues that I saw with the proposal.
> > (1) Messages in the log may not always be stored in increasing timestamp
> > order. Suppose that the application sets log.retention.min.timestamp to
> > T and after that messages with a timestamp older than T (either due to
> > delay or reprocessing) are published to that topic. Those newly
> > published messages are likely going to be deleted immediately, before
> > the application gets a chance to read them, which is probably not what
> > the application wants. (2) The configuration for the topic has to be
> > changed continuously to implement the use case. Intuitively, one
> > probably shouldn't be changing a configuration all the time.
> >
> > Another way to achieve the goal is what Jay mentioned earlier. We could
> > add a trim() API like the following that will trim the log up to the
> > specified offsets. This addresses both of the above issues that I
> > mentioned. Will that work for you?
> >
> > void trim(Map<TopicPartition, Long> offsetsToTruncate)
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Oct 5, 2016 at 1:55 PM, Bill Warshaw <wdwars...@gmail.com> wrote:
> >
> > > Bumping for visibility. KIP is here:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-47+-+Add+timestamp-based+log+deletion+policy
> > >
> > > On Wed, Aug 24, 2016 at 2:32 PM Bill Warshaw <wdwars...@gmail.com> wrote:
> > >
> > > > Hello Guozhang,
> > > >
> > > > KIP-71 seems unrelated to this KIP. KIP-47 is just adding a new
> > > > deletion policy (minimum timestamp), while KIP-71 is allowing
> > > > deletion and compaction to coexist.
> > > >
> > > > They both will touch LogManager, but the change for KIP-47 is very
> > > > isolated.
> > > >
> > > > On Wed, Aug 24, 2016 at 2:21 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > Hi Bill,
> > > >
> > > > I would like to reason if there is any correlation between this KIP
> > > > and KIP-71:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist
> > > >
> > > > I feel they are orthogonal but would like to double check with you.
> > > >
> > > > Guozhang
> > > >
> > > > On Wed, Aug 24, 2016 at 11:05 AM, Bill Warshaw <wdwars...@gmail.com> wrote:
> > > >
> > > > > I'd like to re-awaken this voting thread now that KIP-33 has
> > > > > merged. This KIP is now completely unblocked. I have a working
> > > > > branch off of trunk with my proposed fix, including testing.
> > > > >
> > > > > On Mon, May 9, 2016 at 8:30 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > > Jay, Bill:
> > > > > >
> > > > > > I'm thinking of one general use case of using timestamp rather
> > > > > > than offset for log deletion, which is expiration handling in
> > > > > > data replication: when the source data store decides to expire
> > > > > > some data records based on their timestamps, today we need to
> > > > > > configure the corresponding Kafka changelog topic for
> > > > > > compaction, and actively send a tombstone for each expired
> > > > > > record. Since expiration usually happens with a bunch of
> > > > > > records, this could generate large tombstone traffic. For
> > > > > > example, I think LI's data replication for Espresso is seeing
> > > > > > similar issues and they are just not sending tombstones at all.
> > > > > >
> > > > > > With a timestamp-based log deletion policy, this can be easily
> > > > > > handled by simply setting the current expiration timestamp; but
> > > > > > ideally one would prefer to configure this topic to be both log
> > > > > > compaction enabled as well as log deletion enabled. From that
> > > > > > point of view, I feel that the current KIP still has value to be
> > > > > > accepted.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Mon, May 2, 2016 at 2:37 PM, Bill Warshaw <wdwars...@gmail.com> wrote:
> > > > > >
> > > > > > > Yes, I'd agree that offset is a more precise configuration
> > > > > > > than timestamp. If there was a way to set a partition-level
> > > > > > > configuration, I would rather use log.retention.min.offset
> > > > > > > than timestamp. If you have an approach in mind I'd be open to
> > > > > > > investigating it.
> > > > > > >
> > > > > > > On Mon, May 2, 2016 at 5:33 PM, Jay Kreps <j...@confluent.io> wrote:
> > > > > > >
> > > > > > > > Gotcha, good point. But barring that limitation, you agree
> > > > > > > > that that makes more sense?
> > > > > > > >
> > > > > > > > -Jay
> > > > > > > >
> > > > > > > > On Mon, May 2, 2016 at 2:29 PM, Bill Warshaw <wdwars...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > The problem with offset as a config option is that offsets
> > > > > > > > > are partition-specific, so we'd need a per-partition
> > > > > > > > > config. This would work for our particular use case, where
> > > > > > > > > we have single-partition topics, but for multiple-partition
> > > > > > > > > topics it would delete from all partitions based on a
> > > > > > > > > global topic-level offset.
> > > > > > > > >
> > > > > > > > > On Mon, May 2, 2016 at 4:32 PM, Jay Kreps <j...@confluent.io> wrote:
> > > > > > > > >
> > > > > > > > > > I think you are saying you considered a kind of trim()
> > > > > > > > > > API that would synchronously chop off the tail of the log
> > > > > > > > > > starting from a given offset. That would be one option,
> > > > > > > > > > but what I was saying was slightly different: in the
> > > > > > > > > > proposal you have, where there is a config that controls
> > > > > > > > > > retention that the user would update, wouldn't it make
> > > > > > > > > > more sense for this config to be based on offset rather
> > > > > > > > > > than timestamp?
> > > > > > > > > >
> > > > > > > > > > -Jay
> > > > > > > > > >
> > > > > > > > > > On Mon, May 2, 2016 at 12:53 PM, Bill Warshaw <wdwars...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > 1. Initially I looked at using the actual offset, by
> > > > > > > > > > > adding a call to AdminUtils to just delete anything in
> > > > > > > > > > > a given topic/partition up to a given offset. I ran
> > > > > > > > > > > into a lot of trouble here trying to work out how the
> > > > > > > > > > > system would recognize that every broker had
> > > > > > > > > > > successfully deleted that range from the partition
> > > > > > > > > > > before returning to the client. If we were OK treating
> > > > > > > > > > > this as a completely asynchronous operation I would be
> > > > > > > > > > > open to revisiting this approach.
> > > > > > > > > > >
> > > > > > > > > > > 2. For our use case, we would be updating the config
> > > > > > > > > > > every few hours for a given topic, and there would not
> > > > > > > > > > > be a sizable number of consumers. I imagine that this
> > > > > > > > > > > would not scale well if someone was adjusting this
> > > > > > > > > > > config very frequently on a large system, but I don't
> > > > > > > > > > > know if there are any use cases where that would occur.
> > > > > > > > > > > I imagine most use cases would involve truncating the
> > > > > > > > > > > log after taking a snapshot or doing some other
> > > > > > > > > > > expensive operation that didn't occur very frequently.
> > > > > > > > > > >
> > > > > > > > > > > On Mon, May 2, 2016 at 2:23 PM, Jay Kreps <j...@confluent.io> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Two comments:
> > > > > > > > > > > >
> > > > > > > > > > > > 1. Is there a reason to use physical time rather than
> > > > > > > > > > > > offset? The idea is for the consumer to say when it
> > > > > > > > > > > > has consumed something so it can be deleted, right?
> > > > > > > > > > > > It seems like offset would be a much more precise way
> > > > > > > > > > > > to do this--i.e. the consumer says "I have
> > > > > > > > > > > > checkpointed state up to offset X, you can get rid of
> > > > > > > > > > > > anything prior to that". Doing this by timestamp
> > > > > > > > > > > > seems like it is just more error prone...
> > > > > > > > > > > >
> > > > > > > > > > > > 2. Is this mechanism practical to use at scale? It
> > > > > > > > > > > > requires several ZK writes per config change, so I
> > > > > > > > > > > > guess that depends on how frequently the consumers
> > > > > > > > > > > > would update the value and how many consumers there
> > > > > > > > > > > > are...any thoughts on this?
> > > > > > > > > > > >
> > > > > > > > > > > > -Jay
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Apr 28, 2016 at 8:28 AM, Bill Warshaw <wdwars...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > I'd like to re-initiate the vote for KIP-47 now that
> > > > > > > > > > > > > KIP-33 has been accepted and is in-progress. I've
> > > > > > > > > > > > > updated the KIP
> > > > > > > > > > > > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-47+-+Add+timestamp-based+log+deletion+policy).
> > > > > > > > > > > > > I have a commit with the functionality for KIP-47
> > > > > > > > > > > > > ready to go once KIP-33 is complete; it's a fairly
> > > > > > > > > > > > > minor change.
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Mar 9, 2016 at 8:42 PM, Gwen Shapira <g...@confluent.io> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > For convenience, the KIP is here:
> > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-47+-+Add+timestamp-based+log+deletion+policy
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Do you mind updating the KIP with the time formats
> > > > > > > > > > > > > > we plan on supporting in the configuration?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Mar 9, 2016 at 11:44 AM, Bill Warshaw <wdwars...@gmail.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hello,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I'd like to initiate the vote for KIP-47.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Bill Warshaw
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > >
> > > > --
> > > > -- Guozhang