Bill,

That's a good question. I am thinking of the following approach for
implementing trim(): (1) the client issues a metadata request to a broker
to determine the leader of each topic/partition and groups the
topic/partitions by leader broker; (2) the client sends a TrimRequest to
each broker for the partitions whose leader is on that broker; (3) each
leader trims the corresponding segments locally; (4) the fetch
request/response protocol is extended so that the leader propagates a
firstOffset (first available offset) for each partition in the follower
fetch response; (5) each follower trims its local segments according to
the firstOffset in the fetch response. We have been talking about adding
an admin client as part of KIP-4; we could add such a trim() method to
that admin client.
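
To make the shape of the api concrete, here is a rough sketch of how an
application might call such a trim() method, using the signature I
mentioned earlier in the thread. The TrimClient interface and the
topic/offset values below are purely illustrative; the actual KIP-4 admin
client api may end up looking different.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Illustrative interface only; it just mirrors the proposed signature.
interface TrimClient {
    void trim(Map<TopicPartition, Long> offsetsToTruncate);
}

public class TrimUsageSketch {
    static void checkpointAndTrim(TrimClient admin) {
        // The application has checkpointed its state up to these offsets,
        // so everything before them is no longer needed.
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("app-events", 0), 4200L);
        offsets.put(new TopicPartition("app-events", 1), 3900L);

        // Steps (1)-(5) above would happen behind this call: group the
        // partitions by leader, send a TrimRequest to each leader broker,
        // and let the followers pick up the new firstOffset through their
        // fetch responses.
        admin.trim(offsets);
    }
}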

Thanks,

Jun


On Mon, Oct 24, 2016 at 5:29 PM, Bill Warshaw <wdwars...@gmail.com> wrote:

> Hi Jun,
>
> Those are valid concerns.  For our particular use case, application events
> triggering the timestamp update will never occur more than once an hour,
> and we maintain a sliding window so that we don't delete messages too close
> to what our consumers may be reading.
> For more general use cases, developers will need to be aware of these
> issues, and will need to write their application code with that in mind.
>
>
> To your second point: I initially wanted to just have a trim() admin api.
> I started implementing it, but ran into difficulties with synchronously
> acknowledging to the calling code that all brokers had truncated the given
> partitions.  It seemed like we would have to do something similar to how
> topic deletion is implemented, where the initial broker uses Zookeeper to
> coordinate the deletion on the other brokers.  If you have a simpler idea
> in mind, I'd be happy to update this KIP to provide a trim() api instead.
>
> On Mon, Oct 24, 2016 at 8:15 PM Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Bill,
> >
> > Thanks for the proposal. Sorry for the late reply.
> >
> > The motivation of the proposal makes sense: don't delete the messages
> until
> > the application tells you so.
> >
> > I am wondering if the current proposal is the best way to address the
> need
> > though. There are a couple of issues that I saw with the proposal. (1)
> > Messages in the log may not always be stored in increasing timestamp
> order.
> > Suppose that the application sets log.retention.min.timestamp to T and
> > after that messages with timestamps older than T (either due to delay or
> > reprocessing) are published to that topic. Those newly published messages
> > are likely going to be deleted immediately before the application gets a
> > chance to read them, which is probably not what the application wants.
> (2)
> > The configuration for the topic has to be changed continuously to
> implement
> > the use case. Intuitively, one probably shouldn't be changing a
> > configuration all the time.
> >
> > Another way to achieve the goal is what Jay mentioned earlier. We could
> add
> > a trim() api like the following that will trim the log up to the
> specified
> > offsets. This addresses both of the above issues that I mentioned. Will
> > that work for you?
> >
> > void trim(Map<TopicPartition, Long> offsetsToTruncate)
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Oct 5, 2016 at 1:55 PM, Bill Warshaw <wdwars...@gmail.com>
> wrote:
> >
> > > Bumping for visibility.  KIP is here:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-47+-+Add+timestamp-based+log+deletion+policy
> > >
> > > On Wed, Aug 24, 2016 at 2:32 PM Bill Warshaw <wdwars...@gmail.com>
> > wrote:
> > >
> > > > Hello Guozhang,
> > > >
> > > > KIP-71 seems unrelated to this KIP.  KIP-47 is just adding a new
> > deletion
> > > > policy (minimum timestamp), while KIP-71 is allowing deletion and
> > > > compaction to coexist.
> > > >
> > > > They both will touch LogManager, but the change for KIP-47 is very
> > > > isolated.
> > > >
> > > > On Wed, Aug 24, 2016 at 2:21 PM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > Hi Bill,
> > > >
> > > > I would like to check whether there is any correlation between this KIP
> and
> > > > KIP-71
> > > >
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist
> > > >
> > > > I feel they are orthogonal but would like to double check with you.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Wed, Aug 24, 2016 at 11:05 AM, Bill Warshaw <wdwars...@gmail.com>
> > > > wrote:
> > > >
> > > > > I'd like to re-awaken this voting thread now that KIP-33 has
> merged.
> > > > This
> > > > > KIP is now completely unblocked.  I have a working branch off of
> > trunk
> > > > with
> > > > > my proposed fix, including testing.
> > > > >
> > > > > On Mon, May 9, 2016 at 8:30 PM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Jay, Bill:
> > > > > >
> > > > > > I'm thinking of one general use case of using timestamp rather
> than
> > > > > offset
> > > > > > for log deletion, which is that for expiration handling in data
> > > > > > replication, when the source data store decides to expire some
> data
> > > > > records
> > > > > > based on their timestamps, today we need to configure the
> > > corresponding
> > > > > > Kafka changelog topic for compaction, and actively send a
> tombstone
> > > for
> > > > > > each expired record. Since expiration usually happens with a
> bunch
> > of
> > > > > > records, this could generate large tombstone traffic. For
> example I
> > > > think
> > > > > > LI's data replication for Espresso is seeing similar issues and
> > they
> > > > are
> > > > > > just not sending tombstones at all.
> > > > > >
> > > > > > With a timestamp-based log deletion policy, this can be easily
> > handled
> > > by
> > > > > > simply setting the current expiration timestamp; but ideally one
> > > would
> > > > > > prefer to configure this topic to have both log compaction and
> > > > > > log deletion enabled. From that point of view, I feel that the
> > > > > > current KIP still has value and is worth accepting.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Mon, May 2, 2016 at 2:37 PM, Bill Warshaw <
> wdwars...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Yes, I'd agree that offset is a more precise configuration than
> > > > > > timestamp.
> > > > > > > If there was a way to set a partition-level configuration, I
> > would
> > > > > rather
> > > > > > > use log.retention.min.offset than timestamp.  If you have an
> > > approach
> > > > > in
> > > > > > > mind I'd be open to investigating it.
> > > > > > >
> > > > > > > On Mon, May 2, 2016 at 5:33 PM, Jay Kreps <j...@confluent.io>
> > > wrote:
> > > > > > >
> > > > > > > > Gotcha, good point. But barring that limitation, you agree
> that
> > > > that
> > > > > > > makes
> > > > > > > > more sense?
> > > > > > > >
> > > > > > > > -Jay
> > > > > > > >
> > > > > > > > On Mon, May 2, 2016 at 2:29 PM, Bill Warshaw <
> > > wdwars...@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > The problem with offset as a config option is that offsets
> > are
> > > > > > > > > partition-specific, so we'd need a per-partition config.
> > This
> > > > > would
> > > > > > > work
> > > > > > > > > for our particular use case, where we have single-partition
> > > > topics,
> > > > > > but
> > > > > > > > for
> > > > > > > > > multiple-partition topics it would delete from all
> partitions
> > > > based
> > > > > > on
> > > > > > > a
> > > > > > > > > global topic-level offset.
> > > > > > > > >
> > > > > > > > > On Mon, May 2, 2016 at 4:32 PM, Jay Kreps <
> j...@confluent.io>
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > > I think you are saying you considered a kind of trim()
> api
> > > that
> > > > > > would
> > > > > > > > > > synchronously chop off the tail of the log starting from
> a
> > > > given
> > > > > > > > offset.
> > > > > > > > > > That would be one option, but what I was saying was
> > slightly
> > > > > > > different:
> > > > > > > > > in
> > > > > > > > > > the proposal you have where there is a config that
> controls
> > > > > > retention
> > > > > > > > > that
> > > > > > > > > > the user would update, wouldn't it make more sense for
> this
> > > > > config
> > > > > > to
> > > > > > > > be
> > > > > > > > > > based on offset rather than timestamp?
> > > > > > > > > >
> > > > > > > > > > -Jay
> > > > > > > > > >
> > > > > > > > > > On Mon, May 2, 2016 at 12:53 PM, Bill Warshaw <
> > > > > wdwars...@gmail.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > 1.  Initially I looked at using the actual offset, by
> > > adding
> > > > a
> > > > > > call
> > > > > > > > to
> > > > > > > > > > > AdminUtils to just delete anything in a given
> > > topic/partition
> > > > > to
> > > > > > a
> > > > > > > > > given
> > > > > > > > > > > offset.  I ran into a lot of trouble here trying to
> work
> > > out
> > > > > how
> > > > > > > the
> > > > > > > > > > system
> > > > > > > > > > > would recognize that every broker had successfully
> > deleted
> > > > that
> > > > > > > range
> > > > > > > > > > from
> > > > > > > > > > > the partition before returning to the client.  If we
> were
> > > ok
> > > > > > > treating
> > > > > > > > > > this
> > > > > > > > > > > as a completely asynchronous operation I would be open
> to
> > > > > > > revisiting
> > > > > > > > > this
> > > > > > > > > > > approach.
> > > > > > > > > > >
> > > > > > > > > > > 2.  For our use case, we would be updating the config
> > every
> > > > few
> > > > > > > hours
> > > > > > > > > > for a
> > > > > > > given topic, and there would not be a sizable number
> of
> > > > > > > > consumers.  I
> > > > > > > > > > > imagine that this would not scale well if someone was
> > > > adjusting
> > > > > > > this
> > > > > > > > > > config
> > > > > > > > > > > very frequently on a large system, but I don't know if
> > > there
> > > > > are
> > > > > > > any
> > > > > > > > > use
> > > > > > > > > > > cases where that would occur.  I imagine most use cases
> > > would
> > > > > > > involve
> > > > > > > > > > > truncating the log after taking a snapshot or doing
> some
> > > > other
> > > > > > > > > expensive
> > > > > > > > > > > operation that didn't occur very frequently.
> > > > > > > > > > >
> > > > > > > > > > > On Mon, May 2, 2016 at 2:23 PM, Jay Kreps <
> > > j...@confluent.io>
> > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Two comments:
> > > > > > > > > > > >
> > > > > > > > > > > >    1. Is there a reason to use physical time rather
> > than
> > > > > > offset?
> > > > > > > > The
> > > > > > > > > > idea
> > > > > > > > > > > >    is for the consumer to say when it has consumed
> > > > something
> > > > > so
> > > > > > > it
> > > > > > > > > can
> > > > > > > > > > be
> > > > > > > > > > > >    deleted, right? It seems like offset would be a
> much
> > > > more
> > > > > > > > precise
> > > > > > > > > > way
> > > > > > > > > > > > to do
> > > > > > > > > > > >    this--i.e. the consumer says "I have checkpointed
> > > state
> > > > up
> > > > > > to
> > > > > > > > > > offset X
> > > > > > > > > > > > you
> > > > > > > > > > > >    can get rid of anything prior to that". Doing this
> > by
> > > > > > > timestamp
> > > > > > > > > > seems
> > > > > > > > > > > > like
> > > > > > > > > > > >    it is just more error prone...
> > > > > > > > > > > >    2. Is this mechanism practical to use at scale? It
> > > > > requires
> > > > > > > > > several
> > > > > > > > > > ZK
> > > > > > > > > > > >    writes per config change, so I guess that depends
> on
> > > how
> > > > > > > > > frequently
> > > > > > > > > > > the
> > > > > > > > > > > >    consumers would update the value and how many
> > > consumers
> > > > > > there
> > > > > > > > > > > are...any
> > > > > > > > > > > >    thoughts on this?
> > > > > > > > > > > >
> > > > > > > > > > > > -Jay
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Apr 28, 2016 at 8:28 AM, Bill Warshaw <
> > > > > > > wdwars...@gmail.com
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > I'd like to re-initiate the vote for KIP-47 now
> that
> > > > KIP-33
> > > > > > has
> > > > > > > > > been
> > > > > > > > > > > > > accepted and is in-progress.  I've updated the KIP
> (
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-47+-+Add+timestamp-based+log+deletion+policy
> > > > > > > > > > > > > ).
> > > > > > > > > > > > > I have a commit with the functionality for KIP-47
> > ready
> > > > to
> > > > > go
> > > > > > > > once
> > > > > > > > > > > KIP-33
> > > > > > > > > > > > > is complete; it's a fairly minor change.
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Mar 9, 2016 at 8:42 PM, Gwen Shapira <
> > > > > > > g...@confluent.io>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > For convenience, the KIP is here:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-47+-+Add+timestamp-based+log+deletion+policy
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Do you mind updating the KIP with the time formats
> we
> > > plan
> > > > > on
> > > > > > > > > > supporting
> > > > > > > > > > > > > > in the configuration?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Mar 9, 2016 at 11:44 AM, Bill Warshaw <
> > > > > > > > > wdwars...@gmail.com
> > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > Hello,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I'd like to initiate the vote for KIP-47.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Bill Warshaw
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > > >
> > >
> >
>
