Quick note:  I renamed the example code.  It is now at
https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManagerCache.java

On Thu, May 2, 2024 at 10:47 AM Claude Warren, Jr <claude.war...@aiven.io>
wrote:

> Igor,  thanks for taking the time to look and to review the code.  I
> regret that I have not pushed the latest code, but I will do so and will
> see what I can do about answering your Bloom filter related questions here.
>
>  How would an operator know or decide to change the configuration
>> for the number of layers – producer.id.quota.cache.layer.count –
>> e.g. increasing from 4 to 5; and why?
>> Do we need a new metric to indicate that change could be useful?
>
>
> In our usage the layered Bloom filter [6] retains the record of a PID for
> producer.id.quota.window.size.seconds.  It breaks that window down into
> multiple fragments, one per layer, so with the default one-hour window 4
> layers yields 15-minute fragments.  It "forgets" a fragment worth of data
> when that fragment has been around for window.size.seconds, so the layer
> count determines how big a chunk of time is deleted at once.  Changing the
> layer count to 10 will yield 6-minute fragments, 60 will yield 1-minute
> fragments, and so on.  There are other idiosyncrasies that I will get into
> later.  I would not set the value lower than 3.  If you see repeated reports
> of "new" PIDs from principals that on average only ping every 50 minutes or
> so, it may be because the PID was recorded late in a fragment and expired
> with that fragment's layer; in that case it makes sense to use more layers.
> If you use too many layers there may be only one PID in each layer, at which
> point a simple list of filters would be faster to search, so that does not
> make sense in practice.  If you have two layers then recurring PIDs
> will be recorded in both layers.
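>
> To make the fragment arithmetic concrete, here is a small sketch (assuming
> the default one-hour window; the variable names are illustrative):
>
>     long windowSeconds = 3600;   // producer.id.quota.window.size.seconds
>     for (int layers : new int[] {4, 10, 60}) {
>         long fragmentSeconds = windowSeconds / layers;
>         System.out.println(layers + " layers -> " + fragmentSeconds + "s fragments");
>     }
>     // prints 900s, 360s and 60s fragments respectively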
>
>  Is producer.id.quota.cache.cleanup.scheduler.interval.ms a
>> guaranteed interval, or rather simply a delay between cleanups?
>> How did you decide on the default value of 10ms?
>
>
> In the code this property is not used: layer cleanups are amortized across
> inserts to keep the layers balanced.  There is a separate thread that runs
> every producer.id.quota.window.size.seconds /
> producer.id.quota.cache.layer.count seconds to detect principals that are
> no longer sending data.  That is a reasonable frequency as it aligns well
> with when the layers actually expire.
>
>  Under "New ProducerIdQuotaManagerCache", the documentation for
>> the constructor params for ProducerIDQuotaManagerCache does not
>> match the constructor signature.
>
>
> Sorry, this is because I did not push the changes.  The constructor is
> ProducerIDQuotaManagerCache(Double falsePositiveRate, long ttl, int
> layerCount), where falsePositiveRate is the Bloom filter false positive
> rate, ttl is producer.id.quota.window.size.seconds expressed in
> milliseconds, and layerCount is the desired number of layers.
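>
> As an illustration, constructing the cache would look roughly like this (a
> sketch only; the 0.001 false positive rate is an example value, not
> necessarily the KIP default):
>
>     import java.util.concurrent.TimeUnit;
>
>     // falsePositiveRate, ttl (window size in milliseconds), layerCount
>     ProducerIDQuotaManagerCache cache =
>         new ProducerIDQuotaManagerCache(0.001, TimeUnit.HOURS.toMillis(1), 4);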
>
> Under "New ProducerIdQuotaManagerCache":
>>   public boolean track(KafkaPrincipal principal, int producerIdRate, long
>> pid)
>> How is producerIdRate used? The reference implementation Claude shared
>> does not use it.
>>
>> https://github.com/Claudenw/kafka/blob/49b6eb0fb5cfaf19b072fd87986072a683ab976c/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java
>
>
> Again, sorry for not updating the code.  The producer rate is used to
> build a Bloom filter of the proper size.  The producer rate is the number
> of PIDs per hour expected to be created by the principal.  The Bloom filter
> shape [1] is determined by the expected number of PIDs per layer
> (producerIdRate * producer.id.quota.window.size.seconds / seconds_per_hour /
> producer.id.quota.cache.layer.count) and the falsePositiveRate from the
> constructor.  These values are passed to Shape.fromNP(), which yields the
> Shape of the Bloom filters in the layers.  The producer rate is only used
> when the principal is not already in the cache.  Thomas Hurst has provided
> a web page [5] where you can explore the interaction between the number of
> items and the false positive rate.
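>
> In code the sizing works out to roughly the following (a sketch of the
> arithmetic only, not the KIP implementation; Shape.fromNP() is the real
> commons-collections factory, the rest of the names are illustrative):
>
>     import org.apache.commons.collections4.bloomfilter.Shape;
>
>     long windowSeconds = 3600;         // producer.id.quota.window.size.seconds
>     int layerCount = 4;                // producer.id.quota.cache.layer.count
>     double falsePositiveRate = 0.001;  // producer.id.quota.cache.false.positive.rate
>     int producerIdRate = 100;          // expected PIDs per hour for this principal
>
>     // expected PIDs per layer: per-hour rate scaled to the window, split across layers
>     int expectedPidsPerLayer = Math.max(1,
>         (int) Math.ceil(producerIdRate * (windowSeconds / 3600.0) / layerCount));
>     Shape shape = Shape.fromNP(expectedPidsPerLayer, falsePositiveRate);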
>
> I could not find a description or definition for
>> TimestampedBloomFilter, could we add that to the KIP?
>
>
> I will add it.  It is simply an implementation of WrappedBloomFilter [2]
> that adds the timestamp for when the filter was created.
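>
> Roughly, it looks like the following (a sketch only; the real class extends
> WrappedBloomFilter [2], but plain composition is shown here to keep the
> example self-contained, and the field names are hypothetical):
>
>     import org.apache.commons.collections4.bloomfilter.BloomFilter;
>
>     // A Bloom filter layer that remembers when it should expire.
>     class TimestampedBloomFilter {
>         final BloomFilter filter;  // the wrapped layer filter
>         final long expires;        // creation time + ttl, in milliseconds
>
>         TimestampedBloomFilter(BloomFilter filter, long ttlMillis) {
>             this.filter = filter;
>             this.expires = System.currentTimeMillis() + ttlMillis;
>         }
>
>         boolean isExpired(long nowMillis) {
>             return nowMillis >= expires;
>         }
>     }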
>
> LayeredBloomFilter will have a fixed size (right?), but some
>> users (KafkaPrincipal) might only use a small number of PIDs.
>> Is it worth having a dual strategy, where we simply keep a Set of
>> PIDs until we reach a certain size where it pays off to use
>> the LayeredBloomFilter?
>
>
> Each principal has its own Layered Bloom filter.
>
> Here come the idiosyncrasies and benefits of the layered Bloom filter.
> The layered Bloom filter can be thought of as a collection of Bloom filters
> that are queried to see whether the item being searched for (the target) has
> been seen.  There are many ways the layered filter could be used; you could
> have a layer for each storage location in a multi-location storage engine,
> for example.  But in our case a layer represents a fragment of time.  That
> fragment will be at most
> producer.id.quota.window.size.seconds / producer.id.quota.cache.layer.count
> seconds long.  The earliest layers are at the lower indices, the latest one
> at the highest.
>
> In general, when an item is added to the Layered Bloom filter the
> following processes take place:
>
>    - old layer filters are removed using the LayerManager.Cleanup [3]
>    instance.
>    - a new layer is added if necessary / requested using
>    the LayerManager.ExtendCheck [4] and a Supplier<BloomFilter>.
>    - The target is added to the last layer.
>
> In our case, the Cleanup starts at layer 0 and removes any layer whose
> timestamp has expired.
> The ExtendCheck adds a layer if the current layer is full or if
> producer.id.quota.window.size.seconds / producer.id.quota.cache.layer.count
> seconds have elapsed.
> When a new layer is created it is given the timestamp of now + ttl
> (the ttl from the constructor).
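>
> Putting the three steps together, the add path can be sketched like this
> (logic only, using plain Java collections instead of the commons-collections
> LayerManager API; class and field names here are hypothetical, and a HashSet
> stands in for the per-layer Bloom filter):
>
>     import java.util.ArrayDeque;
>     import java.util.Deque;
>     import java.util.HashSet;
>     import java.util.Set;
>
>     // One layer: a stand-in for a TimestampedBloomFilter.
>     class Layer {
>         final Set<Long> pids = new HashSet<>();  // stand-in for the Bloom filter
>         final long createdMillis;
>         final long expiresMillis;                // created + ttl, as described above
>         Layer(long nowMillis, long ttlMillis) {
>             this.createdMillis = nowMillis;
>             this.expiresMillis = nowMillis + ttlMillis;
>         }
>     }
>
>     class LayeredPidTracker {
>         private final Deque<Layer> layers = new ArrayDeque<>();
>         private final long ttlMillis;       // producer.id.quota.window.size.seconds in ms
>         private final long quantaMillis;    // ttl / producer.id.quota.cache.layer.count
>         private final int maxPidsPerLayer;  // expected PIDs per layer, from producerIdRate
>
>         LayeredPidTracker(long ttlMillis, int layerCount, int maxPidsPerLayer) {
>             this.ttlMillis = ttlMillis;
>             this.quantaMillis = ttlMillis / layerCount;
>             this.maxPidsPerLayer = maxPidsPerLayer;
>         }
>
>         void add(long pid, long now) {
>             // 1. Cleanup: starting at layer 0, drop layers whose timestamp has expired.
>             while (!layers.isEmpty() && layers.peekFirst().expiresMillis <= now) {
>                 layers.pollFirst();
>             }
>             // 2. ExtendCheck: start a new layer if there is none, the current one is
>             //    full, or its logical time quantum has elapsed.
>             Layer last = layers.peekLast();
>             if (last == null || last.pids.size() >= maxPidsPerLayer
>                     || now - last.createdMillis >= quantaMillis) {
>                 last = new Layer(now, ttlMillis);
>                 layers.addLast(last);
>             }
>             // 3. Add the target to the last (newest) layer.
>             last.pids.add(pid);
>         }
>     }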
>
> So principals that are expected to produce fewer PIDs have smaller Bloom
> filters in the layers than principals that are expected to produce more
> PIDs.
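>
> For example (a sketch; Shape.fromNP() and Shape.getNumberOfBits() are the
> real commons-collections methods, the rates are illustrative):
>
>     Shape lowRate  = Shape.fromNP(10, 0.001);    // principal expected to create few PIDs
>     Shape highRate = Shape.fromNP(1000, 0.001);  // principal expected to create many PIDs
>     // highRate.getNumberOfBits() is much larger than lowRate.getNumberOfBits()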
>
> Since the producerIdRate is an estimation we need to look at 3 cases:
>
>    1. The number of PIDs is equal to the producerIdRate.  The layered
>    Bloom filter works as explained above.
>    2. The number of PIDs is smaller than the producerIdRate.  The number
>    of layers may decrease depending on the frequency of PIDs.  If there is one
>    PID every 15 minutes (assuming the default settings) then there would be 4
>    layers, each having one PID stored in it.  If the production was bursty and
>    there were 4 PIDs and then nothing for 50 minutes there would only be 2
>    layers (the intervening layers would have been disposed of).
>    3. The number of PIDs is larger than the producerIdRate.  Then the
>    ExtendCheck will detect when the last layer has recorded producerIdRate
>    entries and create a new layer.  This has two effects.  First, it means
>    that the false positive rate is not exceeded.  Second, the number of layers
>    will grow as needed and will shrink back to the 4-layer size when the
>    excess PIDs are producer.id.quota.window.size.seconds old.
>
> So in summary:
>
> producer.id.quota.window.size.seconds determines how long the record of a
> PID being used is retained and how long it takes for bursts of excessive
> PIDs (extra layers) to be removed from the system.  It influences the number
> of bits in the Bloom filters.
>
> producer.id.quota.cache.layer.count identifies the ideal number of
> layers.  It influences the number of bits in the Bloom filters.
>
> producerIdRate strongly influences the number of bits in the Bloom filters
> but is only used when the principal is not already active in the cache.
>
> producer.id.quota.cache.false.positive.rate influences the number of bits
> in the Bloom filters.
>
> My statement "If you have two layers then recurring PIDs will be recorded
> in both layers." needs some explanation.  Let's look at the normal
> processing of our default configuration.
>
> A PID is passed to the track() method.
> If the PID is not found in the Bloom filter it is added to the last layer
> and the caller is signaled that the PID is new.
> If the PID is found in the Bloom filter the caller is signaled that the
> PID has been seen.
>
> In an hour the layer with the PID will be cleared from the layered Bloom
> filter.
> If the PID is reported again it will be seen as a new PID.
>
> However, if the PID is seen again more than 15 minutes after the original
> recording, that sighting is not recorded (because the layered Bloom filter
> reports that the PID is already present).  This would mean that when the
> layer holding the original recording is removed, the PID would be considered
> new even though it was seen within the last hour.  To solve this problem,
> when a PID is seen we verify that it is recorded in a layer that falls
> within the last logical layer time window; if it is not, it is recorded
> again in the latest layer.  A logical layer time window is the expected
> length of time for a layer (producer.id.quota.window.size.seconds /
> producer.id.quota.cache.layer.count seconds); phrasing it this way just
> means that we are accounting for the cases where the number of layers has
> increased due to the volume of PIDs.
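>
> In code the track() decision described above looks roughly like this (a
> sketch of the logic, not the KIP implementation; the KIP's signature is
> track(KafkaPrincipal principal, int producerIdRate, long pid), which first
> looks up or creates the per-principal filter, the hypothetical PidFilter
> interface and its method names are illustrative, and the actual return
> convention may differ):
>
>     // Hypothetical view of one principal's layered filter.
>     interface PidFilter {
>         boolean contains(long pid);
>         // recorded in a layer within the last ttl / layerCount milliseconds?
>         boolean isInLastLogicalWindow(long pid, long now);
>         void addToLastLayer(long pid, long now);
>     }
>
>     boolean track(PidFilter filterForPrincipal, long pid, long now) {
>         if (!filterForPrincipal.contains(pid)) {
>             filterForPrincipal.addToLastLayer(pid, now);  // first sighting in the window
>             return true;                                  // caller treats the PID as new
>         }
>         // Already present: re-record unless the sighting is already in the last
>         // logical layer time window, so the PID survives the next layer expiry.
>         if (!filterForPrincipal.isInLastLogicalWindow(pid, now)) {
>             filterForPrincipal.addToLastLayer(pid, now);
>         }
>         return false;                                     // caller treats the PID as seen
>     }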
>
> I hope this answers some questions and doesn't open up too many more,
> Claude
>
> [1]
> https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/Shape.html
> [2]
> https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/WrappedBloomFilter.html
> [3]
> https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/LayerManager.Cleanup.html
> [4]
> https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/LayerManager.ExtendCheck.html
> [5] https://hur.st/bloomfilter/
> [6]
> https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/LayeredBloomFilter.html
>
> On Wed, May 1, 2024 at 4:42 PM Igor Soarez <i...@soarez.me> wrote:
>
>> Hi Omnia, Hi Claude,
>>
>> Thanks for putting this KIP together.
>> This is an important unresolved issue in Kafka,
>> which I have witnessed several times in production.
>>
>> Please see my questions below:
>>
>> 10. Given the goal is to prevent OOMs, do we also need to
>> limit the number of KafkaPrincipals in use?
>>
>> 11. How would an operator know or decide to change the configuration
>> for the number of layers – producer.id.quota.cache.layer.count –
>> e.g. increasing from 4 to 5; and why?
>> Do we need a new metric to indicate that change could be useful?
>>
>> 12. Is producer.id.quota.cache.cleanup.scheduler.interval.ms a
>> guaranteed interval, or rather simply a delay between cleanups?
>> How did you decide on the default value of 10ms?
>>
>> 13. Under "New ProducerIdQuotaManagerCache", the documentation for
>> the constructor params for ProducerIDQuotaManagerCache does not
>> match the constructor signature.
>>
>> 14. Under "New ProducerIdQuotaManagerCache":
>>   public boolean track(KafkaPrincipal principal, int producerIdRate, long
>> pid)
>> How is producerIdRate used? The reference implementation Claude shared
>> does not use it.
>>
>> https://github.com/Claudenw/kafka/blob/49b6eb0fb5cfaf19b072fd87986072a683ab976c/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java
>>
>> 15. I could not find a description or definition for
>> TimestampedBloomFilter, could we add that to the KIP?
>>
>> 16. LayeredBloomFilter will have a fixed size (right?), but some
>> users (KafkaPrincipal) might only use a small number of PIDs.
>> Is it worth having a dual strategy, where we simply keep a Set of
>> PIDs until we reach a certain size where it pays off to use
>> the LayeredBloomFilter?
>>
>> 17. Under "Rejected Alternatives" > "4. Throttle INIT_PRODUCER_ID
>> requests",
>> the KIP states:
>>
>>   a. INIT_PRODUCER_ID for idempotent producer request PIDs from
>>   random controller every time so if a client got throttled on
>>   one controller doesn't guarantee it will not go through on next
>>   controller causing OOM at the leader later.
>>
>> Is the INIT_PRODUCER_ID request really sent to a "random controller"?
>> From a quick look at Sender.maybeSendAndPollTransactionalRequest,
>> for an idempotent producer, targetNode is set to the broker with
>> fewest outstanding requests. Am I looking at the wrong place?
>>
>> 18. Under "Rejected Alternatives" > "4. Throttle INIT_PRODUCER_ID
>> requests",
>> the KIP states:
>>
>>   This solution might look simple however throttling the INIT_PRODUCER_ID
>>   doesn't guarantee the OOM wouldn't happened as
>>   (...)
>>   b. The problem happened on the activation of the PID when it
>>   produce and not at the initialisation. Which means Kafka wouldn't
>>   have OOM problem if the producer got assigned PID but crashed before
>>   producing anything.
>>
>> Point b. does not seem to support the claim above?
>>
>> 19. Under "Rejected Alternatives" > "4. Throttle INIT_PRODUCER_ID
>> requests",
>> the KIP states:
>>
>>   c. Throttling producers that crash between initialisation and
>>   producing could slow them down when they recover/fix the
>>   problem that caused them to crash right after initialising PID.
>>
>> Doesn't it depend on the back-off time or how quotas are enforced?
>> I’m not sure this would necessarily be a problem?
>>
>> 20. If the allocation of PIDs for idempotent producers was
>> centralized, or otherwise the targetNode for that request
>> was predictable, would that make throttling INIT_PRODUCER_ID
>> a viable solution?
>>
>>
>> Best,
>>
>> --
>> Igor
>>
>>
>>
