First, a "mea culpa".   In the KIP I state that several other KIPs are
rejected alternatives.  That is not the case, they are addressing different
issues and would still work with KIP-1044.  I will address this with an
update to the KIP.

Addressing the case where PID has not been seen before:

The KIP adds Bloom filter on disk for each Snapshot file.  This is simply
to quickly load the Bloom filter at startup and is not read again in the
operation once the system is running.

The Snapshot file names and Bloom filters are paired <fileName, Bloom
filter> and placed in a sorted in memory list with the most recent snapshot
listed first: [<fn1,bf1>, <fn2,bf2>,<fn3,bf3>...<fnN,bfN>]

When a PID that has not been seen is presented the following would occur:


   1. The cache is searched and fails.
   2. The cache tries to find the PID in the snapshot files and fails.
   3. the PID is added to the cache.

So in step 2 the actions taken are effectively (some hand waving here to
gloss over some optimizations)

   1. a Bloom filter for the PID is created.
   2. Each of the Bloom filters in the list is checked to see if it
   contains the PID Bloom filter.  This operation is extremely fast (on the
   order of 4 machine instructions per 64 bits).  if we have properly
   configured the Bloom filters there should be a false positive 1 in 1K or 1
   in 10K times (This is adjustable).
   3. If there is a match on a Bloom filter the Snapshot file is scanned
   looking for the PID.  If found PID is returned.
   4. If we get here, none of the filters match, or none of the snapshots
   associated with matching filters contains the PID, so "no match" is
   reported.


Some thoughts:
Using the Commons-Collection bloom filter implementation and a sample of
snapshots I should be able to give you some idea of how fast the solution
is.  However, I have a couple of engagements (Community over Code, EU for
one) in the next few weeks that may delay that a bit.

I do recognize your point that the scanning of the snapshots may be too
slow for the new PID case, but I think with tuning we can adjust that.

Claude


On Tue, May 21, 2024 at 5:50 PM Justine Olshan <jols...@confluent.io.invalid>
wrote:

> Can you clarify the intended behavior? If we encounter a producer ID we've
> not seen before, we are supposed to read from disk and try to find it? I
> see the proposal mentions bloom filters, but it seems like it would not be
> cheap to search for the producer ID. I would expect the typical case to be
> that there is a new producer and we don't need to search state.
>
> And we intend to keep all producers we've ever seen on the cluster? I
> didn't see a mechanism to delete any of the information in the snapshots.
> Currently the snapshot logic is decoupled from the log retention as of
> KIP-360.
>
> Justine
>
> On Mon, May 20, 2024 at 11:20 PM Claude Warren <cla...@xenei.com> wrote:
>
> > The LRU cache is just that: a cache, so yes things expire from the cache
> > but they are not gone.  As long as a snapshot containing the PID is
> > available the PID can be found and reloaded into the cache (which is
> > exactly what I would expect it to do).
> >
> > The question of how long a PID is resolvable then becomes a question of
> how
> > long are snapshots retained.
> >
> > There are, in my mind, several advantages:
> >
> >    1. The in-memory cache can be smaller, reducing the memory footprint.
> >    This is not required but is possible.
> >    2. PIDs are never discarded because they are produced by slow
> >    producers.  They are discarded when the snapshots containing them
> > expire.
> >    3. The length of time between when a PID is received by the server and
> >    when it is recorded to a snapshot is significantly reduced.
> > Significantly
> >    reducing the window where PIDs can be lost.
> >    4. Throttling and other changes you wish to make to the cache are
> still
> >    possible.
> >
> >
> > On Mon, May 20, 2024 at 7:32 PM Justine Olshan
> > <jols...@confluent.io.invalid>
> > wrote:
> >
> > > My team has looked at it from a high level, but we haven't had the time
> > to
> > > come up with a full proposal.
> > >
> > > I'm not aware if others have worked on it.
> > >
> > > Justine
> > >
> > > On Mon, May 20, 2024 at 10:21 AM Omnia Ibrahim <
> o.g.h.ibra...@gmail.com>
> > > wrote:
> > >
> > > > Hi Justine are you aware of anyone looking into such new protocol at
> > the
> > > > moment?
> > > >
> > > > > On 20 May 2024, at 18:00, Justine Olshan
> > <jols...@confluent.io.INVALID
> > > >
> > > > wrote:
> > > > >
> > > > > I would say I have first hand knowledge of this issue as someone
> who
> > > > > responds to such incidents as part of my work at Confluent over the
> > > past
> > > > > couple years. :)
> > > > >
> > > > >> We only persist the information for the length of time we retain
> > > > > snapshots.
> > > > > This seems a bit contradictory to me. We are going to persist
> > > > (potentially)
> > > > > useless information if we have no signal if the producer is still
> > > active.
> > > > > This is the problem we have with old clients. We are always going
> to
> > > have
> > > > > to draw the line for how long we allow a producer to have a gap in
> > > > > producing vs how long we allow filling up with short-lived
> producers
> > > that
> > > > > risk OOM.
> > > > >
> > > > > With an LRU cache, we run into the same problem, as we will expire
> > all
> > > > > "well-behaved" infrequent producers that last produced before the
> > burst
> > > > of
> > > > > short-lived clients. The benefit is that we don't have a solid line
> > in
> > > > the
> > > > > sand and we only expire when we need to, but we will still risk
> > > expiring
> > > > > active producers.
> > > > >
> > > > > I am willing to discuss some solutions that work with older
> clients,
> > > but
> > > > my
> > > > > concern is spending too much time on a complicated solution and not
> > > > > encouraging movement to newer and better clients.
> > > > >
> > > > > Justine
> > > > >
> > > > > On Mon, May 20, 2024 at 9:35 AM Claude Warren <cla...@xenei.com>
> > > wrote:
> > > > >
> > > > >>>
> > > > >>> Why should we persist useless information
> > > > >>> for clients that are long gone and will never use it?
> > > > >>
> > > > >>
> > > > >> We are not.  We only persist the information for the length of
> time
> > we
> > > > >> retain snapshots.   The change here is to make the snapshots work
> as
> > > > longer
> > > > >> term storage for infrequent producers and others would would be
> > > > negatively
> > > > >> affected by some of the solutions proposed.
> > > > >>
> > > > >> Your changes require changes in the clients.   Older clients will
> > not
> > > be
> > > > >> able to participate.  My change does not require client change.
> > > > >> There are issues outside of the ones discussed.  I was told of
> this
> > > late
> > > > >> last week.  I will endeavor to find someone with first hand
> > knowledge
> > > of
> > > > >> the issue and have them report on this thread.
> > > > >>
> > > > >> In addition, the use of an LRU amortizes the cache cleanup so we
> > don't
> > > > need
> > > > >> a thread to expire things.  You still have the cache, the point is
> > > that
> > > > it
> > > > >> really is a cache, there is storage behind it.  Let the cache be a
> > > > cache,
> > > > >> let the snapshots be the storage backing behind the cache.
> > > > >>
> > > > >> On Fri, May 17, 2024 at 5:26 PM Justine Olshan
> > > > >> <jols...@confluent.io.invalid>
> > > > >> wrote:
> > > > >>
> > > > >>> Respectfully, I don't agree. Why should we persist useless
> > > information
> > > > >>> for clients that are long gone and will never use it?
> > > > >>> This is why I'm suggesting we do something smarter when it comes
> to
> > > > >> storing
> > > > >>> data and only store data we actually need and have a use for.
> > > > >>>
> > > > >>> This is why I suggest the heartbeat. It gives us clear
> information
> > > (up
> > > > to
> > > > >>> the heartbeat interval) of which producers are worth keeping and
> > > which
> > > > >> that
> > > > >>> are not.
> > > > >>> I'm not in favor of building a new and complicated system to try
> to
> > > > guess
> > > > >>> which information is needed. In my mind, if we have a ton of
> > > > legitimately
> > > > >>> active producers, we should scale up memory. If we don't there is
> > no
> > > > >> reason
> > > > >>> to have high memory usage.
> > > > >>>
> > > > >>> Fixing the client also allows us to fix some of the other issues
> we
> > > > have
> > > > >>> with idempotent producers.
> > > > >>>
> > > > >>> Justine
> > > > >>>
> > > > >>> On Fri, May 17, 2024 at 12:46 AM Claude Warren <cla...@xenei.com
> >
> > > > wrote:
> > > > >>>
> > > > >>>> I think that the point here is that the design that assumes that
> > you
> > > > >> can
> > > > >>>> keep all the PIDs in memory for all server configurations and
> all
> > > > >> usages
> > > > >>>> and all client implementations is fraught with danger.
> > > > >>>>
> > > > >>>> Yes, there are solutions already in place (KIP-854) that attempt
> > to
> > > > >>> address
> > > > >>>> this problem, and other proposed solutions to remove that have
> > > > >>> undesirable
> > > > >>>> side effects (e.g. Heartbeat interrupted by IP failure for a
> slow
> > > > >>> producer
> > > > >>>> with a long delay between posts).  KAFKA-16229 (Slow expiration
> of
> > > > >>> Producer
> > > > >>>> IDs leading to high CPU usage) dealt with how to expire data
> from
> > > the
> > > > >>> cache
> > > > >>>> so that there was minimal lag time.
> > > > >>>>
> > > > >>>> But the net issue is still the underlying design/architecture.
> > > > >>>>
> > > > >>>> There are a  couple of salient points here:
> > > > >>>>
> > > > >>>>   - The state of a state machine is only a view on its
> > transactions.
> > > > >>> This
> > > > >>>>   is the classic stream / table dichotomy.
> > > > >>>>   - What the "cache" is trying to do is create that view.
> > > > >>>>   - In some cases the size of the state exceeds the storage of
> the
> > > > >> cache
> > > > >>>>   and the systems fail.
> > > > >>>>   - The current solutions have attempted to place limits on the
> > size
> > > > >> of
> > > > >>>>   the state.
> > > > >>>>   - Errors in implementation and or configuration will
> eventually
> > > lead
> > > > >>> to
> > > > >>>>   "problem producers"
> > > > >>>>   - Under the adopted fixes and current slate of proposals, the
> > > > >> "problem
> > > > >>>>   producers" solutions have cascading side effects on properly
> > > behaved
> > > > >>>>   producers. (e.g. dropping long running, slow producing
> > producers)
> > > > >>>>
> > > > >>>> For decades (at least since the 1980's and anecdotally since the
> > > > >> 1960's)
> > > > >>>> there has been a solution to processing state where the size of
> > the
> > > > >> state
> > > > >>>> exceeded the memory available.  It is the solution that drove
> the
> > > idea
> > > > >>> that
> > > > >>>> you could have tables in Kafka.  The idea that we can store the
> > hot
> > > > >> PIDs
> > > > >>> in
> > > > >>>> memory using an LRU and write data to storage so that we can
> > quickly
> > > > >> find
> > > > >>>> things not in the cache is not new.  It has been proven.
> > > > >>>>
> > > > >>>> I am arguing that we should not throw away state data because we
> > are
> > > > >>>> running out of memory.  We should persist that data to disk and
> > > > >> consider
> > > > >>>> the disk as the source of truth for state.
> > > > >>>>
> > > > >>>> Claude
> > > > >>>>
> > > > >>>>
> > > > >>>> On Wed, May 15, 2024 at 7:42 PM Justine Olshan
> > > > >>>> <jols...@confluent.io.invalid>
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>>> +1 to the comment.
> > > > >>>>>
> > > > >>>>>> I still feel we are doing all of this only because of a few
> > > > >>>> anti-pattern
> > > > >>>>> or misconfigured producers and not because we have “too many
> > > > >> Producer”.
> > > > >>>> I
> > > > >>>>> believe that implementing Producer heartbeat and remove
> > short-lived
> > > > >>> PIDs
> > > > >>>>> from the cache if we didn’t receive heartbeat will be more
> > simpler
> > > > >> and
> > > > >>>> step
> > > > >>>>> on right direction  to improve idempotent logic and maybe try
> to
> > > make
> > > > >>> PID
> > > > >>>>> get reused between session which will implement a real
> idempotent
> > > > >>>> producer
> > > > >>>>> instead of idempotent session.  I admit this wouldn’t help with
> > old
> > > > >>>> clients
> > > > >>>>> but it will put us on the right path.
> > > > >>>>>
> > > > >>>>> This issue is very complicated and I appreciate the attention
> on
> > > it.
> > > > >>>>> Hopefully we can find a good solution working together :)
> > > > >>>>>
> > > > >>>>> Justine
> > > > >>>>>
> > > > >>>>> On Wed, May 15, 2024 at 8:36 AM Omnia Ibrahim <
> > > > >> o.g.h.ibra...@gmail.com
> > > > >>>>
> > > > >>>>> wrote:
> > > > >>>>>
> > > > >>>>>> Also in the rejection alternatives you listed an approved KIP
> > > which
> > > > >>> is
> > > > >>>> a
> > > > >>>>>> bit confusing can you move this to motivations instead
> > > > >>>>>>
> > > > >>>>>>> On 15 May 2024, at 14:35, Claude Warren <cla...@apache.org>
> > > > >> wrote:
> > > > >>>>>>>
> > > > >>>>>>> This is a proposal that should solve the OOM problem on the
> > > > >> servers
> > > > >>>>>> without
> > > > >>>>>>> some of the other proposed KIPs being active.
> > > > >>>>>>>
> > > > >>>>>>> Full details in
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1044%3A+A+proposal+to+change+idempotent+producer+--+server+implementation
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> --
> > > > >>>> LinkedIn: http://www.linkedin.com/in/claudewarren
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >>
> > > > >> --
> > > > >> LinkedIn: http://www.linkedin.com/in/claudewarren
> > > > >>
> > > >
> > > >
> > >
> >
> >
> > --
> > LinkedIn: http://www.linkedin.com/in/claudewarren
> >
>

Reply via email to