Igor, thanks for taking the time to look at this and to review the code.  I
regret that I have not pushed the latest code, but I will do so, and in the
meantime I will see what I can do about answering your Bloom filter related
questions here.

> How would an operator know or decide to change the configuration
> for the number layers – producer.id.quota.cache.layer.count –
> e.g. increasing from 4 to 5; and why?
> Do we need a new metric to indicate that change could be useful?


In our usage the layered Bloom filter [6] retains the record of a PID for
producer.id.quota.window.size.seconds.  It breaks that window down into
multiple fragments, so 4 layers = 15 minute fragments (with the default one
hour window).  It "forgets" a fragment's worth of data when that fragment has
been around for producer.id.quota.window.size.seconds.  The layer count
therefore determines how big a chunk of time is deleted at once.  Changing the
layers to 10 will yield 6 minute fragments, 60 will yield 1 minute fragments,
and so on.  There are other idiosyncrasies that I will get into later.  I
would not set the value lower than 3.  If you find that there are multiple
reports of new PIDs because on average they only ping every 50 minutes, it
might make sense to use more layers.  If you use too many layers then there
will only be one PID in each layer, and at that point a simple list of filters
would be faster to search, but in practice that configuration does not make
sense.  If you have two layers then recurring PIDs will be recorded in both
layers.
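
To make the arithmetic concrete, here is a tiny illustrative snippet (it
assumes the default one hour window; the class name is just for the example):

    public class FragmentLengthExample {
        public static void main(String[] args) {
            long windowSizeSeconds = 3600;   // producer.id.quota.window.size.seconds (default assumed)
            int[] layerCounts = {4, 10, 60}; // candidate producer.id.quota.cache.layer.count values

            for (int layers : layerCounts) {
                long fragmentSeconds = windowSizeSeconds / layers;
                System.out.printf("%d layers -> each layer covers %d seconds (%.1f minutes)%n",
                        layers, fragmentSeconds, fragmentSeconds / 60.0);
            }
        }
    }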

> Is producer.id.quota.cache.cleanup.scheduler.interval.ms a
> guaranteed interval, or rather simply a delay between cleanups?
> How did you decide on the default value of 10ms?


In the current code this setting is not used.  Cleanups are amortized across
inserts to keep the layers balanced.  There is a thread that performs a
cleanup every producer.id.quota.window.size.seconds /
producer.id.quota.cache.layer.count seconds to detect principals that are no
longer sending data.  This is a reasonable frequency because it aligns with
when the layers actually expire.
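
As a rough sketch of what that background scan looks like (the class, the
principal-to-timestamp map, and the method names here are hypothetical, not
the actual implementation):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class PrincipalExpiryScanSketch {
        // Hypothetical bookkeeping: principal name -> time the last PID was tracked.
        private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();

        public void start(long windowSizeSeconds, int layerCount) {
            // Scan once per logical layer period: window.size.seconds / layer.count.
            long periodSeconds = windowSizeSeconds / layerCount;
            scheduler.scheduleAtFixedRate(() -> {
                long cutoff = System.currentTimeMillis()
                        - TimeUnit.SECONDS.toMillis(windowSizeSeconds);
                // Drop principals with no PIDs tracked inside the window; their
                // layered filters have fully expired.
                lastSeen.entrySet().removeIf(e -> e.getValue() < cutoff);
            }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
        }
    }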

 Under "New ProducerIdQuotaManagerCache", the documentation for
> the constructor params for ProducerIDQuotaManagerCache does not
> match the constructor signature.


Sorry, this is because I did not push the changes.  The constructor is
ProducerIDQuotaManagerCache(Double falsePositiveRate, long ttl, int
layerCount), where falsePositiveRate is the Bloom filter false positive rate,
ttl is producer.id.quota.window.size.seconds expressed in milliseconds, and
layerCount is the desired number of layers.
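
For illustration, constructing the cache would look something like this (the
0.001 false positive rate is just a placeholder value for the example, not a
statement about the KIP's default):

    double falsePositiveRate = 0.001;  // producer.id.quota.cache.false.positive.rate (placeholder)
    long ttlMs = java.util.concurrent.TimeUnit.HOURS.toMillis(1);  // window size in milliseconds
    int layerCount = 4;                // producer.id.quota.cache.layer.count

    ProducerIDQuotaManagerCache cache =
            new ProducerIDQuotaManagerCache(falsePositiveRate, ttlMs, layerCount);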

Under "New ProducerIdQuotaManagerCache":
>   public boolean track(KafkaPrincipal principal, int producerIdRate, long
> pid)
> How is producerIdRate used? The reference implementation Claude shared
> does not use it.
>
> https://github.com/Claudenw/kafka/blob/49b6eb0fb5cfaf19b072fd87986072a683ab976c/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java


Again, sorry for not updating the code.  The producer rate is used to build a
Bloom filter of the proper size.  The producer rate is the number of PIDs per
hour expected to be created by the principal.  The Bloom filter Shape [1] is
determined by the expected number of PIDs per layer (producerIdRate *
seconds_per_hour / producer.id.quota.window.size.seconds /
producer.id.quota.cache.layer.count) and the falsePositiveRate from the
constructor.  These values are passed to the Shape.fromNP() method, and the
resulting Shape is used for the Bloom filters in the layers.  The
producerIdRate is only used when the principal is not already found in the
cache.  Thomas Hurst has provided a web page [5] where you can explore the
interaction between the number of items and the false positive rate.
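
A small sketch of that shape calculation, using the formula above with
illustrative numbers:

    import org.apache.commons.collections4.bloomfilter.Shape;

    public class ShapeExample {
        public static void main(String[] args) {
            int producerIdRate = 100;         // expected PIDs per hour for this principal
            long windowSizeSeconds = 3600;    // producer.id.quota.window.size.seconds
            int layerCount = 4;               // producer.id.quota.cache.layer.count
            double falsePositiveRate = 0.001; // placeholder, from the constructor

            // Expected PIDs per layer, as described above.
            int pidsPerLayer = (int) Math.ceil(
                    producerIdRate * 3600.0 / windowSizeSeconds / layerCount);

            // Shape of every Bloom filter layer for this principal.
            Shape shape = Shape.fromNP(pidsPerLayer, falsePositiveRate);
            System.out.println(shape);
        }
    }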

> I could not find a description or definition for
> TimestampedBloomFilter, could we add that to the KIP?


I will add it.  It is simply an implementation of WrappedBloomFilter [2]
that adds the timestamp for when the filter was created.
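
A minimal sketch of the idea (using plain composition instead of the actual
WrappedBloomFilter base class, just to keep the example short):

    import org.apache.commons.collections4.bloomfilter.BloomFilter;

    // Sketch only: a Bloom filter paired with the time its layer was created.
    public class TimestampedBloomFilterSketch {
        private final BloomFilter wrapped;
        private final long created; // millis when this layer's filter was created

        public TimestampedBloomFilterSketch(BloomFilter wrapped) {
            this.wrapped = wrapped;
            this.created = System.currentTimeMillis();
        }

        public BloomFilter filter() {
            return wrapped;
        }

        public boolean hasExpired(long ttlMillis, long nowMillis) {
            // A layer is discarded once it has been around for the ttl
            // (producer.id.quota.window.size.seconds).
            return nowMillis - created >= ttlMillis;
        }
    }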

> LayeredBloomFilter will have a fixed size (right?), but some
> users (KafkaPrincipal) might only use a small number of PIDs.
> It it worth having a dual strategy, where we simply keep a Set of
> PIDs until we reach certain size where it pays off to use
> the LayeredBloomFilter?


Each principal has its own Layered Bloom filter.

Here come the idiosyncrasies and benefits of the layered Bloom filter.  The
layered Bloom filter can be thought of as a collection of Bloom filters that
are queried to see whether the item being searched for (the target) has been
seen.  There are a number of ways the layered filter could be used; you could,
for example, have a layer for each storage location in a multi-location
storage engine.  But in our case a layer signifies a starting time fragment.
That fragment will be at most producer.id.quota.window.size.seconds /
producer.id.quota.cache.layer.count seconds long.  The earliest layers are at
the lower indices, the latest one at the highest.

In general, when an item is added to the layered Bloom filter the following
processes take place:

   - old layer filters are removed using the LayerManager.Cleanup [3]
   instance.
   - a new layer is added, if necessary or requested, using
   the LayerManager.ExtendCheck [4] and a Supplier<BloomFilter>.
   - the target is added to the last layer.

In our case, the Cleanup starts at layer 0 and removes any layer for which the
timestamp has expired.  The ExtendCheck adds a layer if the current layer is
full or producer.id.quota.window.size.seconds /
producer.id.quota.cache.layer.count seconds have elapsed.  When a new layer is
created, it is created with a timestamp of now + ttl (the ttl from the
constructor).
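
To illustrate that Cleanup and ExtendCheck behaviour without pulling in the
full LayerManager API, here is a self-contained sketch of the same logic over
a deque of timestamped layers, with a HashSet standing in for each layer's
Bloom filter:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.Set;

    public class LayerMaintenanceSketch {
        // One layer: a set of PIDs standing in for a Bloom filter, plus its
        // expiry time (creation time + ttl).
        static final class Layer {
            final Set<Long> pids = new HashSet<>();
            final long expires;
            Layer(long now, long ttlMillis) { this.expires = now + ttlMillis; }
        }

        private final Deque<Layer> layers = new ArrayDeque<>();
        private final long ttlMillis;      // producer.id.quota.window.size.seconds in millis
        private final long layerMillis;    // ttlMillis / producer.id.quota.cache.layer.count
        private final int maxPidsPerLayer; // derived from producerIdRate, as described above
        private long lastLayerCreated;

        LayerMaintenanceSketch(long ttlMillis, int layerCount, int maxPidsPerLayer) {
            this.ttlMillis = ttlMillis;
            this.layerMillis = ttlMillis / layerCount;
            this.maxPidsPerLayer = maxPidsPerLayer;
        }

        void add(long pid, long now) {
            // Cleanup: starting from the oldest layer, drop layers whose timestamp has expired.
            while (!layers.isEmpty() && layers.peekFirst().expires <= now) {
                layers.removeFirst();
            }
            // ExtendCheck: add a layer if there is none, the current one is full,
            // or a logical layer period has elapsed since it was created.
            if (layers.isEmpty()
                    || layers.peekLast().pids.size() >= maxPidsPerLayer
                    || now - lastLayerCreated >= layerMillis) {
                layers.addLast(new Layer(now, ttlMillis));
                lastLayerCreated = now;
            }
            // The target is always added to the last (newest) layer.
            layers.peekLast().pids.add(pid);
        }
    }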

So principals that are expected to produce fewer PIDs have smaller Bloom
filters in the layers than principals that are expected to produce more
PIDs.

Since the producerIdRate is an estimate, we need to look at 3 cases:

   1. The number of PIDs is equal to the producerIdRate.  The layered
   Bloom filter works as explained above.
   2. The number of PIDs is smaller than the producerIdRate.  The number of
   layers may decrease depending on the frequency of PIDs.  If there is one
   PID every 15 minutes (assuming the default settings) then there would be 4
   layers, each having one PID stored in it.  If the production was bursty and
   there were 4 PIDs and then nothing for 50 minutes, there would only be 2
   layers (the intervening layers would have been disposed of).
   3. The number of PIDs is larger than the producerIdRate.  Then the
   ExtendCheck will detect when the last layer has recorded producerIdRate
   entries and create a new layer.  This has two effects.  First, it means
   that the false positive rate is not exceeded.  Second, the number of layers
   will grow as needed and will shrink back to the 4-layer size when the
   excess PIDs are producer.id.quota.window.size.seconds old.

So in summary:

producer.id.quota.window.size.seconds determines how long the record of a PID
being used is retained and how long it takes for bursts of excessive PIDs
(extra layers) to be removed from the system.  It influences the number of
bits in the Bloom filters.

producer.id.quota.cache.layer.count identifies the ideal number of layers.
It influences the number of bits in the Bloom filters.

producerIdRate strongly influences the number of bits in the Bloom filters
but is only used when the principal is not already active in the cache.

producer.id.quota.cache.false.positive.rate influences the number of bits
in the Bloom filters.

My statement "If you have two layers then recurring PIDs will be recorded
in both layers." needs some explanation.  Let's look at the normal
processing of our default configuration.

A PID is passed to the track() method.
If the PID is not found in the Bloom filter, it is added to the last layer
and the caller is signaled that the PID is new.
If the PID is found in the Bloom filter, the caller is signaled that the PID
has been seen.

In an hour the layer with the PID will be cleared from the layered Bloom
filter.
If the PID is then reported again, it will be seen as a new PID.

However, if the PID is seen again more than 15 minutes after the original
recording, that sighting is not recorded (because the layered Bloom filter
reports that the PID is already present).  This means that when the layer
holding the original recording is removed, the PID will be considered new even
though it was seen within the last hour.  To solve this problem, when a PID is
seen we verify that it is recorded in a layer that falls within the last
logical layer time window; if it is not, we record it again in the newest
layer.  A logical layer time window is the expected length of time for a layer
(producer.id.quota.window.size.seconds / producer.id.quota.cache.layer.count
seconds); this just means that we are accounting for the cases where the
number of layers has increased due to the volume of PIDs.
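
Here is a sketch of that check, again with a HashSet standing in for each
layer's Bloom filter (the layer maintenance from the earlier sketch is
omitted, and the helper names are hypothetical):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.Set;

    public class TrackCheckSketch {
        static final class Layer {
            final Set<Long> pids = new HashSet<>();
            final long created;
            Layer(long created) { this.created = created; }
        }

        private final Deque<Layer> layers = new ArrayDeque<>();
        private final long layerMillis; // window.size.seconds / layer.count, in millis

        TrackCheckSketch(long layerMillis) {
            this.layerMillis = layerMillis;
        }

        /** Returns true if the PID was already being tracked for this principal. */
        boolean track(long pid, long now) {
            boolean seen = false;
            boolean seenRecently = false;
            for (Layer layer : layers) {
                if (layer.pids.contains(pid)) {
                    seen = true;
                    // "Recently" = recorded within the last logical layer time window.
                    if (now - layer.created < layerMillis) {
                        seenRecently = true;
                    }
                }
            }
            // New PIDs, and PIDs found only in older layers, are (re)recorded in the
            // newest layer so the record survives when the old layer is removed.
            if (!seenRecently) {
                if (layers.isEmpty()) {
                    layers.addLast(new Layer(now));
                }
                layers.peekLast().pids.add(pid);
            }
            return seen;
        }
    }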

I hope this answers some questions and doesn't open up too many more,
Claude

[1]
https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/Shape.html
[2]
https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/WrappedBloomFilter.html
[3]
https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/LayerManager.Cleanup.html
[4]
https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/LayerManager.ExtendCheck.html
[5] https://hur.st/bloomfilter/
[6]
https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/LayeredBloomFilter.html

On Wed, May 1, 2024 at 4:42 PM Igor Soarez <i...@soarez.me> wrote:

> Hi Omnia, Hi Claude,
>
> Thanks for putting this KIP together.
> This is an important unresolved issue in Kafka,
> which I have witnessed several times in production.
>
> Please see my questions below:
>
> 10 Given the goal is to prevent OOMs, do we also need to
> limit the number of KafkaPrincipals in use?
>
> 11. How would an operator know or decide to change the configuration
> for the number layers – producer.id.quota.cache.layer.count –
> e.g. increasing from 4 to 5; and why?
> Do we need a new metric to indicate that change could be useful?
>
> 12. Is producer.id.quota.cache.cleanup.scheduler.interval.ms a
> guaranteed interval, or rather simply a delay between cleanups?
> How did you decide on the default value of 10ms?
>
> 13. Under "New ProducerIdQuotaManagerCache", the documentation for
> the constructor params for ProducerIDQuotaManagerCache does not
> match the constructor signature.
>
> 14. Under "New ProducerIdQuotaManagerCache":
>   public boolean track(KafkaPrincipal principal, int producerIdRate, long
> pid)
> How is producerIdRate used? The reference implementation Claude shared
> does not use it.
>
> https://github.com/Claudenw/kafka/blob/49b6eb0fb5cfaf19b072fd87986072a683ab976c/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java
>
> 15. I could not find a description or definition for
> TimestampedBloomFilter, could we add that to the KIP?
>
> 16. LayeredBloomFilter will have a fixed size (right?), but some
> users (KafkaPrincipal) might only use a small number of PIDs.
> It it worth having a dual strategy, where we simply keep a Set of
> PIDs until we reach certain size where it pays off to use
> the LayeredBloomFilter?
>
> 17. Under "Rejected Alternatives" > "4. Throttle INIT_PRODUCER_ID
> requests",
> the KIP states:
>
>   a. INIT_PRODUCER_ID for idempotent producer request PIDs from
>   random controller every time so if a client got throttled on
>   one controller doesn't guarantee it will not go through on next
>   controller causing OOM at the leader later.
>
> Is the INIT_PRODUCER_ID request really sent to a "random controller"?
> From a quick look at Sender.maybeSendAndPollTransactionalRequest,
> for an idempotent producer, targetNode is set to the broker with
> fewest outstanding requests. Am I looking at the wrong place?
>
> 18. Under "Rejected Alternatives" > "4. Throttle INIT_PRODUCER_ID
> requests",
> the KIP states:
>
>   This solution might look simple however throttling the INIT_PRODUCER_ID
>   doesn't guarantee the OOM wouldn't happened as
>   (...)
>   b. The problem happened on the activation of the PID when it
>   produce and not at the initialisation. Which means Kafka wouldn't
>   have OOM problem if the producer got assigned PID but crashed before
>   producing anything.
>
> Point b. does not seem to support the claim above?
>
> 19. Under "Rejected Alternatives" > "4. Throttle INIT_PRODUCER_ID
> requests",
> the KIP states:
>
>   c. Throttling producers that crash between initialisation and
>   producing could slow them down when they recover/fix the
>   problem that caused them to crash right after initialising PID.
>
> Doesn't it depend on the back-off time or how quotas are enforced?
> I’m not sure this would necessarily be a problem?
>
> 20. If the allocation of PIDs for idempotent producers was
> centralized, or otherwise the the targetNode for that request
> was predictable, would that make throttling INIT_PRODUCER_ID
> a viable solution?
>
>
> Best,
>
> --
> Igor
>
>
>
