Igor, thanks for taking the time to look and to review the code. I regret that I have not yet pushed the latest code; I will do so, and in the meantime I will see what I can do about answering your Bloom filter related questions here.
> 11. How would an operator know or decide to change the configuration
> for the number layers – producer.id.quota.cache.layer.count –
> e.g. increasing from 4 to 5; and why?
> Do we need a new metric to indicate that change could be useful?

In our usage the layered Bloom filter [6] retains the record of a PID for producer.id.quota.window.size.seconds. It breaks that window down into multiple fragments, so with the default one-hour window, 4 layers = 15-minute fragments. A fragment's worth of data is "forgotten" once that fragment has been around for producer.id.quota.window.size.seconds. The layer count therefore determines how big a chunk of time is deleted at once: 10 layers yield 6-minute fragments, 60 layers yield 1-minute fragments, and so on. There are other idiosyncrasies that I will get into later.

I would not set the value lower than 3. If you find that there are multiple reports of new PIDs because on average they only ping every 50 minutes, it might make sense to use more layers. If you use too many layers, each layer will hold only one PID, at which point a simple list of filters would be faster to search, though in reality that configuration does not make sense. If you have only two layers, recurring PIDs will be recorded in both layers.

> 12. Is producer.id.quota.cache.cleanup.scheduler.interval.ms a
> guaranteed interval, or rather simply a delay between cleanups?
> How did you decide on the default value of 10ms?

In the code this is not used. Cleanups are amortized across inserts to keep the layers balanced. There is also a thread that runs a cleanup every producer.id.quota.window.size.seconds / producer.id.quota.cache.layer.count seconds to detect principals that are no longer sending data. This is a reasonable frequency, as it aligns well with when the layers actually expire.

> 13. Under "New ProducerIdQuotaManagerCache", the documentation for
> the constructor params for ProducerIDQuotaManagerCache does not
> match the constructor signature.

Sorry, this is because I did not push the changes.
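To make the fragment arithmetic concrete, here is a tiny sketch (the class and method names are mine, not from the KIP):

```java
public class LayerFragments {
    /** Length in seconds of one layer's time fragment. */
    static long fragmentSeconds(long windowSizeSeconds, int layerCount) {
        return windowSizeSeconds / layerCount;
    }

    public static void main(String[] args) {
        // Default: a one-hour window split across 4 layers -> 15-minute fragments.
        System.out.println(fragmentSeconds(3600, 4));   // 900
        // More layers mean finer-grained expiry.
        System.out.println(fragmentSeconds(3600, 10));  // 360
        System.out.println(fragmentSeconds(3600, 60));  // 60
    }
}
```

Note that the same value is the interval at which the background cleanup thread runs, which is why the two line up.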
The constructor is ProducerIDQuotaManagerCache(Double falsePositiveRate, long ttl, int layerCount), where falsePositiveRate is the Bloom filter false positive rate, ttl is producer.id.quota.window.size.seconds expressed in milliseconds, and layerCount is the desired number of layers.

> 14. Under "New ProducerIdQuotaManagerCache":
> public boolean track(KafkaPrincipal principal, int producerIdRate, long pid)
> How is producerIdRate used? The reference implementation Claude shared
> does not use it.
> https://github.com/Claudenw/kafka/blob/49b6eb0fb5cfaf19b072fd87986072a683ab976c/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java

Again, sorry for not updating the code. producerIdRate is used to build a Bloom filter of the proper size. It is the number of PIDs per hour expected to be created by the principal. The Bloom filter shape [1] is determined by the expected number of PIDs per layer (producerIdRate scaled to the window length, i.e. producerIdRate * producer.id.quota.window.size.seconds / seconds_per_hour, divided by producer.id.quota.cache.layer.count) and the falsePositiveRate from the constructor. These values are passed to the Shape.fromNP() method, which yields the Shape of the Bloom filters in the layers. producerIdRate is only used when the principal is not found in the cache. Thomas Hurst has provided a web page [5] where you can explore the interaction between the number of items and the false positive rate.

> 15. I could not find a description or definition for
> TimestampedBloomFilter, could we add that to the KIP?

I will add it. It is simply an implementation of WrappedBloomFilter [2] that adds the timestamp for when the filter was created.

> 16. LayeredBloomFilter will have a fixed size (right?), but some
> users (KafkaPrincipal) might only use a small number of PIDs.
> It it worth having a dual strategy, where we simply keep a Set of
> PIDs until we reach certain size where it pays off to use
> the LayeredBloomFilter?

Each principal has its own layered Bloom filter.
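As an illustration of the sizing, here is a standalone sketch. It assumes Shape.fromNP() derives the optimal bit and hash counts from the textbook Bloom filter formulas; the helper names are mine, and the snippet avoids the commons-collections dependency so it runs on its own:

```java
public class ShapeSketch {
    /** Expected PIDs in one layer, given PIDs/hour and the window/layer config. */
    static int itemsPerLayer(int producerIdRate, long windowSizeSeconds, int layerCount) {
        // Rate is per hour; scale it to the window, then split across the layers.
        return Math.max(1, (int) Math.ceil(
                producerIdRate * (windowSizeSeconds / 3600.0) / layerCount));
    }

    /** Optimal number of bits m = ceil(-n * ln(p) / (ln 2)^2), the classic formula. */
    static int numberOfBits(int n, double p) {
        return (int) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    /** Optimal number of hash functions k = round(m/n * ln 2). */
    static int numberOfHashFunctions(int n, int m) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }

    public static void main(String[] args) {
        // 100 PIDs/hour, one-hour window, 4 layers -> 25 PIDs expected per layer.
        int n = itemsPerLayer(100, 3600, 4);
        int m = numberOfBits(n, 0.001);          // 360 bits
        int k = numberOfHashFunctions(n, m);     // 10 hash functions
        System.out.println("n=" + n + " m=" + m + " k=" + k);
    }
}
```

This is also why a principal with a low producerIdRate ends up with much smaller per-layer filters than a heavy producer.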
Here come the idiosyncrasies and benefits of the layered Bloom filter.

The layered Bloom filter can be thought of as a collection of Bloom filters that are queried to see if the item being searched for (the target) has been seen. There are many ways the layered filter could be used; you could, for example, have a layer for each storage location in a multi-location storage engine. In our case a layer signifies a starting time fragment. That fragment will be at most producer.id.quota.window.size.seconds / producer.id.quota.cache.layer.count seconds long. The earliest layers are at the lower indices, the latest at the highest.

In general, when an item is added to the layered Bloom filter the following processes take place:

- Old layers are removed using the LayerManager.Cleanup [3] instance.
- A new layer is added if necessary / requested, using the LayerManager.ExtendCheck [4] and a Supplier<BloomFilter>.
- The target is added to the last layer.

In our case, the Cleanup starts at layer 0 and removes any layer whose timestamp has expired. The ExtendCheck adds a layer if the current layer is full or if producer.id.quota.window.size.seconds / producer.id.quota.cache.layer.count seconds have elapsed. When a new layer is created, its timestamp is set to now + ttl (the ttl from the constructor).

So principals that are expected to produce fewer PIDs have smaller Bloom filters in their layers than principals that are expected to produce more PIDs. Since producerIdRate is an estimate, we need to look at three cases:

1. The number of PIDs equals producerIdRate. The layered Bloom filter works as explained above.
2. The number of PIDs is smaller than producerIdRate. The number of layers may decrease depending on the frequency of PIDs. If there is one PID every 15 minutes (assuming the default settings), there would be 4 layers, each with one PID stored in it.
If production was bursty (say 4 PIDs and then nothing for 50 minutes) there would only be 2 layers; the intervening, empty layers would have been disposed of.
3. The number of PIDs is larger than producerIdRate. The ExtendCheck will detect when the last layer has recorded producerIdRate entries and create a new layer. This has two effects: first, the false positive rate is not exceeded; second, the number of layers grows as needed and shrinks back to the 4-layer size once the excess PIDs are producer.id.quota.window.size.seconds old.

So in summary:

- producer.id.quota.window.size.seconds determines how long the record of a PID is retained, and how long it takes for bursts of excessive PIDs (extra layers) to be removed from the system. It influences the number of bits in the Bloom filters.
- producer.id.quota.cache.layer.count identifies the ideal number of layers. It influences the number of bits in the Bloom filters.
- producerIdRate strongly influences the number of bits in the Bloom filters, but is only used when the principal is not already active in the cache.
- producer.id.quota.cache.false.positive.rate influences the number of bits in the Bloom filters.

My statement "If you have two layers then recurring PIDs will be recorded in both layers" needs some explanation. Consider the normal processing under our default configuration. A PID is passed to the track() method. If the PID is not found in the Bloom filter, it is added to the last layer and the caller is signaled that the PID is new. If the PID is found, the caller is signaled that the PID has been seen. An hour later the layer containing the PID will be cleared from the layered Bloom filter, and if the PID is reported again it will be treated as new. However, if the PID is seen again more than 15 minutes after the original recording, that sighting is not recorded (because the layered Bloom filter reports that the PID is already present).
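The expiry behaviour above, and the re-recording rule that addresses it, can be sketched with a toy model. This is only an illustration under assumed defaults (one-hour window, 4 layers); plain HashSets stand in for the Bloom filters, and all class and method names here are mine, not the KIP's:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/** Toy model of the layered PID cache; HashSets stand in for Bloom filters. */
public class LayeredPidCacheToy {
    private static final long WINDOW_MS = 3_600_000;  // producer.id.quota.window.size.seconds
    private static final int LAYER_COUNT = 4;         // producer.id.quota.cache.layer.count
    private static final long LAYER_MS = WINDOW_MS / LAYER_COUNT;

    /** One layer: the PIDs recorded in it plus its creation time. */
    private record Layer(Set<Long> pids, long createdMs) {}

    private final Deque<Layer> layers = new ArrayDeque<>();

    /** Returns true if the PID was already known within the window. */
    public boolean track(long pid, long nowMs) {
        // Cleanup: drop layers older than the full window.
        while (!layers.isEmpty() && layers.peekFirst().createdMs() + WINDOW_MS <= nowMs) {
            layers.removeFirst();
        }
        // ExtendCheck: start a new layer once the fragment time has elapsed.
        if (layers.isEmpty() || layers.peekLast().createdMs() + LAYER_MS <= nowMs) {
            layers.addLast(new Layer(new HashSet<>(), nowMs));
        }
        boolean seen = layers.stream().anyMatch(l -> l.pids().contains(pid));
        // Re-record unless the PID already sits in a layer from the last
        // logical layer time window; otherwise an active PID would silently
        // expire once its original layer is removed.
        boolean recent = layers.stream().anyMatch(
                l -> l.createdMs() + LAYER_MS > nowMs && l.pids().contains(pid));
        if (!recent) {
            layers.peekLast().pids().add(pid);
        }
        return seen;
    }

    public static void main(String[] args) {
        LayeredPidCacheToy cache = new LayeredPidCacheToy();
        long min = 60_000L;
        System.out.println(cache.track(42L, 0));         // false: first sighting
        System.out.println(cache.track(42L, 20 * min));  // true: found, and re-recorded
        // 70 minutes in: the t=0 layer has expired, but the 20-minute
        // re-recording keeps the PID known within the window.
        System.out.println(cache.track(42L, 70 * min));  // true
    }
}
```

With the expiry check and the re-record step in one place, a PID that keeps reappearing is carried forward into fresh layers and is only forgotten after a full window of silence.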
This means that when the layer holding the PID's recording is removed, the PID will be considered new even though it was seen within the last hour. To solve this problem, when a PID is seen we verify that it is recorded in a layer that falls within the last logical layer time window. A logical layer time window is the expected length of time for a layer (producer.id.quota.window.size.seconds / producer.id.quota.cache.layer.count seconds); this simply accounts for the cases where the number of layers has increased due to the volume of PIDs.

I hope this answers some questions and doesn't open up too many more,

Claude

[1] https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/Shape.html
[2] https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/WrappedBloomFilter.html
[3] https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/LayerManager.Cleanup.html
[4] https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/LayerManager.ExtendCheck.html
[5] https://hur.st/bloomfilter/
[6] https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/LayeredBloomFilter.html

On Wed, May 1, 2024 at 4:42 PM Igor Soarez <i...@soarez.me> wrote:

> Hi Omnia, Hi Claude,
>
> Thanks for putting this KIP together.
> This is an important unresolved issue in Kafka,
> which I have witnessed several times in production.
>
> Please see my questions below:
>
> 10. Given the goal is to prevent OOMs, do we also need to
> limit the number of KafkaPrincipals in use?
>
> 11. How would an operator know or decide to change the configuration
> for the number layers – producer.id.quota.cache.layer.count –
> e.g. increasing from 4 to 5; and why?
> Do we need a new metric to indicate that change could be useful?
>
> 12.
> Is producer.id.quota.cache.cleanup.scheduler.interval.ms a
> guaranteed interval, or rather simply a delay between cleanups?
> How did you decide on the default value of 10ms?
>
> 13. Under "New ProducerIdQuotaManagerCache", the documentation for
> the constructor params for ProducerIDQuotaManagerCache does not
> match the constructor signature.
>
> 14. Under "New ProducerIdQuotaManagerCache":
> public boolean track(KafkaPrincipal principal, int producerIdRate, long pid)
> How is producerIdRate used? The reference implementation Claude shared
> does not use it.
>
> https://github.com/Claudenw/kafka/blob/49b6eb0fb5cfaf19b072fd87986072a683ab976c/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java
>
> 15. I could not find a description or definition for
> TimestampedBloomFilter, could we add that to the KIP?
>
> 16. LayeredBloomFilter will have a fixed size (right?), but some
> users (KafkaPrincipal) might only use a small number of PIDs.
> It it worth having a dual strategy, where we simply keep a Set of
> PIDs until we reach certain size where it pays off to use
> the LayeredBloomFilter?
>
> 17. Under "Rejected Alternatives" > "4. Throttle INIT_PRODUCER_ID
> requests", the KIP states:
>
> a. INIT_PRODUCER_ID for idempotent producer request PIDs from
> random controller every time so if a client got throttled on
> one controller doesn't guarantee it will not go through on next
> controller causing OOM at the leader later.
>
> Is the INIT_PRODUCER_ID request really sent to a "random controller"?
> From a quick look at Sender.maybeSendAndPollTransactionalRequest,
> for an idempotent producer, targetNode is set to the broker with
> fewest outstanding requests. Am I looking at the wrong place?
>
> 18. Under "Rejected Alternatives" > "4. Throttle INIT_PRODUCER_ID
> requests", the KIP states:
>
> This solution might look simple however throttling the INIT_PRODUCER_ID
> doesn't guarantee the OOM wouldn't happened as
> (...)
> b.
> The problem happened on the activation of the PID when it
> produce and not at the initialisation. Which means Kafka wouldn't
> have OOM problem if the producer got assigned PID but crashed before
> producing anything.
>
> Point b. does not seem to support the claim above?
>
> 19. Under "Rejected Alternatives" > "4. Throttle INIT_PRODUCER_ID
> requests", the KIP states:
>
> c. Throttling producers that crash between initialisation and
> producing could slow them down when they recover/fix the
> problem that caused them to crash right after initialising PID.
>
> Doesn't it depend on the back-off time or how quotas are enforced?
> I’m not sure this would necessarily be a problem?
>
> 20. If the allocation of PIDs for idempotent producers was
> centralized, or otherwise the the targetNode for that request
> was predictable, would that make throttling INIT_PRODUCER_ID
> a viable solution?
>
> Best,
>
> --
> Igor