Hi Justine,

Thanks for having a look.

> One question I have is how will we handle a scenario where potentially each new client has a new Kafka Principal? Is that simply not covered by throttling?

If a new client sets up a new principal, it will be throttled based on the quota for `/config/users/<default>` or `/config/users/<the_new_principal>`.
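For illustration only (the paths exist in Kafka's quota configuration, but the function, values, and principal names here are hypothetical): the fallback described above, where a principal without its own entry inherits the `/config/users/<default>` quota, could be sketched as:

```python
# Hypothetical sketch of the quota fallback described above: a principal
# with its own /config/users/<principal> entry gets that quota; any
# brand-new principal falls back to /config/users/<default>.
# The numbers and names are illustrative, not Kafka's real configuration.
quotas = {
    "/config/users/<default>": 100,   # cluster-wide default quota
    "/config/users/alice": 500,       # explicit override for one principal
}

def resolve_quota(principal: str) -> int:
    """Return the quota for a principal, falling back to <default>."""
    override = quotas.get(f"/config/users/{principal}")
    return override if override is not None else quotas["/config/users/<default>"]
```

So a client that shows up under a never-before-seen principal is still bounded by the default entry rather than escaping throttling entirely.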
On Wed, May 31, 2023 at 6:50 PM Justine Olshan <jols...@confluent.io> wrote:
> Hey Omnia,
>
> I was doing a bit of snooping (I actually just get updates for the KIP page) and I saw this draft was in progress. I shared it with some of my colleagues as well, who I previously discussed the issue with.
>
> The initial look was pretty promising to me. I appreciate the detailing of the rejected options since we had quite a few we worked through :)
>
> One question I have is how will we handle a scenario where potentially each new client has a new Kafka Principal? Is that simply not covered by throttling?
>
> Thanks,
> Justine
>
> On Wed, May 31, 2023 at 10:08 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>
>> Hi Justine and Luke,
>>
>> I started a KIP draft here
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs
>> for a proposal and would appreciate it if you could provide any initial feedback before opening a broader discussion.
>>
>> Thanks
>>
>> On Wed, Feb 22, 2023 at 4:35 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>
>>> *Hi Justine,*
>>>
>>> *My initial thought with throttling initProducerId was to cut the problem off at the source (the client creating too many PIDs) and fail faster, but if having this at the produce-request level is easier, that should be fine. I am guessing it will go in the same direction as ClientQuotaManager for Produce throttling, with a different quota window than `quota.window.size.seconds`.*
>>>
>>> *If this sounds good as an initial solution, I can start a KIP and see what the wider community feels about this.*
>>>
>>> *Also, I noticed that at some point one of us hit "Reply" instead of "Reply to All" :) So here are the previous conversations.*
>>>
>>> *On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan <jols...@confluent.io> wrote:*
>>>
>>>> Hey Omnia,
>>>>
>>>> Thanks for the response.
>>>> I think I understand your explanations here with respect to principal and clientId usage.
>>>>
>>>> For the throttling -- handleInitProducerIdRequest will allocate the ID to the producer, but we don't actually store it on the broker or increment our metric until the first produce request for that producer is sent (or sent again after previously expiring). Would you consider throttling the produce request instead? It may be hard to get any metrics from the transaction coordinator where the initProducerId request is handled.
>>>>
>>>> Justine
>>>
>>> *On Tue, Feb 14, 2023 at 9:29 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:*
>>>
>>>> Hey Justine,
>>>> > If I understand your message correctly, there are issues with identifying the source of the rogue clients? So you propose to add a new metric for that?
>>>> > And also proposing to throttle based on clientId as a potential follow up?
>>>> I want to identify rogue clients by KafkaPrincipal (and/or clientId), similarly to how we identify clients in the Fetch/Produce/Request quota managers. Using KafkaPrincipal should give cluster admins the ability to throttle later based on principal, which is most likely a smaller set than clientIds. My initial thought was to add a metric that represents how many InitProducerIdRequests are sent per KafkaPrincipal (and/or clientId), similar to the Fetch/Produce quota managers.
>>>> Then as a follow-up, we can throttle based on either KafkaPrincipal or clientId (maybe default as well, to align this with other quota managers in Kafka).
>>>>
>>>> > 1. Do we rely on the client using the same ID? What if there are many clients that all use different client IDs?
>>>> This is why I want to use the combination of KafkaPrincipal or clientId, similar to some other quotas we have in Kafka already.
>>>> This will be a similar risk to the Fetch/Produce quotas in Kafka, which also rely on the client using the same clientId and KafkaPrincipal.
>>>>
>>>> > 2. Are there places where high cardinality of this metric is a concern? I can imagine many client IDs in the system. Would we treat this as a rate metric (ie, when we get an init producer ID and return a new producer ID we emit a count for that client id?) Or something else?
>>>> My initial thought here was to follow the steps of ClientQuotaManager and ClientRequestQuotaManager and use a rate metric. However, I think we can emit it either
>>>>
>>>> 1. when we return the new PID. However, I have concerns that we may circle back to the previous OOM concerns, due to keeping track of ACTIVE PIDs per KafkaPrincipal (and/or) clientId in the future. Also, this would be the first time Kafka throttles IDs for any client.
>>>> 2. or once we receive the InitProducerIdRequest and throttle before even hitting `handleInitProducerIdRequest`. Going this direction we may need to throttle it within a different quota window than `quota.window.size.seconds`, as throttling INIT_PRODUCER_ID requests per second wouldn't be efficient for most cases. I personally think this direction is easier, as it seems more consistent with the existing quota implementation. Especially since Kafka already has the concept of throttling a subset of requests (in ControllerMutationQuotaManager) but never had any concept of throttling active IDs for any client.
>>>>
>>>> What do you think?
>>>>
>>>> Thanks
>>>> Omnia
>>>
>>> *On Thu, Feb 2, 2023 at 5:39 PM Justine Olshan <jols...@confluent.io> wrote:*
>>>
>>>> Hey Omnia,
>>>> Sorry for losing track of this.
>>>>
>>>> If I understand your message correctly, there are issues with identifying the source of the rogue clients? So you propose to add a new metric for that?
>>>> And also proposing to throttle based on clientId as a potential follow up?
>>>>
>>>> I think both of these make sense. The only things I can think of for the metric are:
>>>> 1. Do we rely on the client using the same ID? What if there are many clients that all use different client IDs?
>>>> 2. Are there places where high cardinality of this metric is a concern? I can imagine many client IDs in the system. Would we treat this as a rate metric (ie, when we get an init producer ID and return a new producer ID we emit a count for that client id?) Or something else?
>>>>
>>>> Thanks,
>>>> Justine
>>>
>>> Thanks
>>> Omnia
>>>
>>> On Thu, Feb 2, 2023 at 4:44 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>
>>>> Hi Luke and Justine,
>>>> Are there any thoughts or updates on this? I would love to help with this as we are hitting this more frequently now.
>>>>
>>>> best,
>>>>
>>>> On Mon, Oct 31, 2022 at 6:15 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>
>>>>> Hi Luke and Justine,
>>>>>
>>>>>> For (3), you said:
>>>>>> > - I have some concerns about the impact of this option on the transactional producers, for example, what will happen to an ongoing transaction associated with an expired PID? Would this leave the transactions in a "hanging" state?
>>>>>>
>>>>>> - How will we notify the client that the transaction can't continue due to an expired PID?
>>>>>>
>>>>>> - If PID got marked as `expired` this will mean that `admin.DescribeProducers` will not list them, which will make *`kafka-transactions.sh --list`* a bit tricky as we can't identify if there are transactions linked to this expired PID or not. The same concern applies to *`kafka-transactions.sh --find-hanging`*.
>>>>>>
>>>>>> --> Yes, you're right. Those are also concerns for this solution.
>>>>>> Currently, there's no way to notify clients about the expiration.
>>>>>> Also, the ongoing transactions will be hanging. For the admin cli, we've never thought about it. Good point.
>>>>>> In summary, to adopt this solution, there are many issues that need to get fixed.
>>>>>
>>>>> Justine already clarified that if a PID is attached to a transaction it will not expire, so identifying the transactions shouldn't be a concern anymore.
>>>>> The only concern here will be that this solution will not solve the problem if the rogue producer is a transactional producer with hanging transactions.
>>>>> If anyone faces this situation, they will need to abort the hanging transactions manually, and then the solution to expire a PID can work.
>>>>>
>>>>>> --> Yes, I mean KafkaPrincipal (sorry, I didn't make it clear)
>>>>>> Yes, we were thinking about throttling by KafkaPrincipal. Client Id is also workable.
>>>>>> It's just that these 2 attributes are not required.
>>>>>> That is, it's possible we take all clients as the same one: {default KafkaPrincipal + default clientID}, and apply throttling on it.
>>>>>> Do you have any thoughts about it?
>>>>>> Maybe skip throttling for {default KafkaPrincipal + default clientID}
>>>>>
>>>>> Throttling for the default KafkaPrincipal and default clientID is useful when we need a hard limit on the whole cluster and whoever is running the cluster doesn't know the clients' IDs, or when a KafkaPrincipal is reused between different producer applications.
>>>>> I usually find it helpful to have a way to apply throttling only to the rogue clients once I identify them, without punishing everyone on the cluster. However, there are two problems with this:
>>>>> - There's no easy way at the moment to link PIDs to a clientId or KafkaPrincipal. This needs to be addressed first.
>>>>> - Justine's comment on throttling: it will mean we either block all requests or have to store the request in memory, which in both cases has downsides for the producer experience.
>>>>>
>>>>>> I recently had another discussion with my team and it does seem like there should be a way to make it more clear to the clients what is going on. A lot of this protocol is implicit. I'm wondering if maybe there is a way to improve the story for newer clients. (Ie if we choose to expire based on a size limit, we should include a response indicating the ID has expired.) We also discussed ways to redefine the guarantees so that users who have stronger idempotency requirements can ensure them (over availability/memory concerns). Let me know if you have any ideas here.
>>>>>
>>>>> It may be easier to improve the experience for new clients. However, if we improve only the new clients, we may need a way to help teams who run Kafka with rogue clients on old versions by at least giving them an easy way to identify the clientId or KafkaPrincipal that generated these PIDs.
>>>>>
>>>>> For context, it's very tricky to even identify which clientId is creating all these PIDs that caused OOM, which is a contributing part of the issue at the moment. So maybe one option here could be adding a new metric that tracks the number of generated PIDs per clientId. This will help the team who runs the Kafka cluster to
>>>>> - contact these rogue clients and ask them to fix their clients, or upgrade to a new client if the new client version has a better experience,
>>>>> - or, if we end up with a throttling solution, this may help identify which clientId needs to be throttled.
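A minimal sketch of the per-clientId metric idea above (illustrative only; a real broker would register this through Kafka's Sensor/Rate metrics machinery, and the class and method names here are hypothetical):

```python
from collections import defaultdict

# Hypothetical sketch of the "generated PIDs per clientId" metric idea
# discussed above. A real implementation would go through Kafka's
# metrics (Sensor/Rate) machinery; this only shows the bookkeeping.
class PidMetrics:
    def __init__(self):
        self.pids_per_client = defaultdict(set)

    def record_pid(self, client_id: str, pid: int) -> None:
        """Record that `pid` was allocated for `client_id`."""
        self.pids_per_client[client_id].add(pid)

    def pid_count(self, client_id: str) -> int:
        return len(self.pids_per_client[client_id])

    def top_offenders(self, n: int = 3):
        """ClientIds sorted by how many distinct PIDs they generated."""
        return sorted(self.pids_per_client,
                      key=lambda c: len(self.pids_per_client[c]),
                      reverse=True)[:n]
```

An operator could then inspect the top offenders to find the rogue clientId before deciding whom to contact or throttle.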
>>>>> Maybe we can start with a solution for identifying the rogue clients first and keep looking for a solution to limit them, what do you think?
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Tue, Oct 18, 2022 at 5:24 PM Justine Olshan <jols...@confluent.io.invalid> wrote:
>>>>>
>>>>>> Oops. I realized I just replied to Omnia 🤦♀️
>>>>>>
>>>>>> Here was my response for the mailing thread:
>>>>>>
>>>>>> Hey Omnia,
>>>>>> Sorry to hear this is a problem for you as well. :(
>>>>>> > *I have some concerns about the impact of this option on the transactional producers, for example, what will happen to an ongoing transaction associated with an expired PID? Would this leave the transactions in a "hanging" state?*
>>>>>> We currently check if a transaction is ongoing and do not expire the producer ID if it has an ongoing transaction. I suspect we will continue to do this with any solution we pick.
>>>>>>
>>>>>> My team members and I looked a bit into the throttling case and it can get a bit tricky since it means we need to throttle the produce request before it is processed. This means we either block all requests or have to store the request in memory (which is not great if we are trying to save memory).
>>>>>>
>>>>>> I recently had another discussion with my team and it does seem like there should be a way to make it more clear to the clients what is going on. A lot of this protocol is implicit. I'm wondering if maybe there is a way to improve the story for newer clients. (Ie if we choose to expire based on a size limit, we should include a response indicating the ID has expired.) We also discussed ways to redefine the guarantees so that users who have stronger idempotency requirements can ensure them (over availability/memory concerns). Let me know if you have any ideas here.
>>>>>> Thanks again for commenting here, hopefully we can come to a good solution.
>>>>>>
>>>>>> On Tue, Oct 18, 2022 at 1:11 AM Luke Chen <show...@gmail.com> wrote:
>>>>>>
>>>>>> > Hi Omnia,
>>>>>> >
>>>>>> > Thanks for your reply.
>>>>>> >
>>>>>> > For (3), you said:
>>>>>> > > - I have some concerns about the impact of this option on the transactional producers, for example, what will happen to an ongoing transaction associated with an expired PID? Would this leave the transactions in a "hanging" state?
>>>>>> >
>>>>>> > - How will we notify the client that the transaction can't continue due to an expired PID?
>>>>>> >
>>>>>> > - If PID got marked as `expired` this will mean that `admin.DescribeProducers` will not list them, which will make *`kafka-transactions.sh --list`* a bit tricky as we can't identify if there are transactions linked to this expired PID or not. The same concern applies to *`kafka-transactions.sh --find-hanging`*.
>>>>>> >
>>>>>> > --> Yes, you're right. Those are also concerns for this solution.
>>>>>> > Currently, there's no way to notify clients about the expiration.
>>>>>> > Also, the ongoing transactions will be hanging. For the admin cli, we've never thought about it. Good point.
>>>>>> > In summary, to adopt this solution, there are many issues that need to get fixed.
>>>>>> >
>>>>>> > For (5), you said:
>>>>>> > > I am assuming you mean KafkaPrincipal here! If so is your concern here that those good clients that use the same principal as a rogue one will get throttled?
>>>>>> >
>>>>>> > If this is the case, then I believe it should be okay as other throttling in Kafka on *`/config/users/<user>`* has the same behaviour.
>>>>>> >
>>>>>> > What about applying limit/throttling to *`/config/users/<user>/clients/<client-id>`* similar to what we have with client quotas? This should reduce the concern about throttling good clients, right?
>>>>>> >
>>>>>> > --> Yes, I mean KafkaPrincipal (sorry, I didn't make it clear)
>>>>>> > Yes, we were thinking about throttling by KafkaPrincipal. Client Id is also workable.
>>>>>> > It's just that these 2 attributes are not required.
>>>>>> > That is, it's possible we take all clients as the same one: {default KafkaPrincipal + default clientID}, and apply throttling on it.
>>>>>> > Do you have any thoughts about it?
>>>>>> > Maybe skip throttling for {default KafkaPrincipal + default clientID}?
>>>>>> >
>>>>>> > Luke
>>>>>> >
>>>>>> > On Sat, Oct 15, 2022 at 2:33 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>>> >
>>>>>> >> Hi Luke & Justine,
>>>>>> >> Thanks for looking into this issue, we have been experiencing this because of rogue clients as well.
>>>>>> >>
>>>>>> >> > 3. Having a limit to the number of active producer IDs (sort of like an LRU cache)
>>>>>> >> > -> The idea here is that if we hit a misconfigured client, we will expire the older entries. The concern here is we have risks to lose idempotency guarantees, and currently, we don't have a way to notify clients about losing idempotency guarantees. Besides, the least recently used entries that got removed are not always from the "bad" clients.
>>>>>> >>
>>>>>> >> - I have some concerns about the impact of this option on the transactional producers, for example, what will happen to an ongoing transaction associated with an expired PID? Would this leave the transactions in a "hanging" state?
>>>>>> >>
>>>>>> >> - How will we notify the client that the transaction can't continue due to an expired PID?
>>>>>> >>
>>>>>> >> - If PID got marked as `expired` this will mean that `admin.DescribeProducers` will not list them, which will make *`kafka-transactions.sh --list`* a bit tricky as we can't identify if there are transactions linked to this expired PID or not. The same concern applies to *`kafka-transactions.sh --find-hanging`*.
>>>>>> >>
>>>>>> >> > 5. limit/throttling the producer id based on the principle
>>>>>> >> > -> Although we can limit the impact to a certain principle with this idea, same concern still exists as solution #1 #2.
>>>>>> >>
>>>>>> >> I am assuming you mean KafkaPrincipal here! If so, is your concern here that those good clients that use the same principal as a rogue one will get throttled?
>>>>>> >>
>>>>>> >> If this is the case, then I believe it should be okay as other throttling in Kafka on *`/config/users/<user>`* has the same behaviour.
>>>>>> >>
>>>>>> >> What about applying limit/throttling to *`/config/users/<user>/clients/<client-id>`* similar to what we have with client quota? This should reduce the concern about throttling good clients, right?
>>>>>> >>
>>>>>> >> best,
>>>>>> >>
>>>>>> >> Omnia
>>>>>> >>
>>>>>> >> On Tue, Oct 11, 2022 at 4:18 AM Luke Chen <show...@gmail.com> wrote:
>>>>>> >>
>>>>>> >> > Bump this thread to see if there are any comments/thoughts.
>>>>>> >> > Thanks.
>>>>>> >> >
>>>>>> >> > Luke
>>>>>> >> >
>>>>>> >> > On Mon, Sep 26, 2022 at 11:06 AM Luke Chen <show...@gmail.com> wrote:
>>>>>> >> >
>>>>>> >> > > Hi devs,
>>>>>> >> > >
>>>>>> >> > > As stated in the motivation section in KIP-854
>>>>>> >> > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry>:
>>>>>> >> > >
>>>>>> >> > > With idempotent producers becoming the default in Kafka, this means that unless otherwise specified, all new producers will be given producer IDs. Some (inefficient) applications may now create many non-transactional idempotent producers. Each of these producers will be assigned a producer ID, and these IDs and their metadata are stored in broker memory, which might cause brokers to run out of memory.
>>>>>> >> > >
>>>>>> >> > > Justine (in cc.) and I and some other team members are working on solutions for this issue, but none of them solves it completely without side effects. Among them, "availability" vs "idempotency guarantees" is the trade-off we can't decide how to make. Some of these solutions sacrifice availability of produce (1, 2, 5) and others sacrifice idempotency guarantees (3). It could be useful to know if people generally have a preference one way or the other, or what other better solutions there might be.
>>>>>> >> > >
>>>>>> >> > > Here are the proposals we came up with:
>>>>>> >> > >
>>>>>> >> > > 1. Limit the total active producer ID allocation number.
>>>>>> >> > > -> This is the simplest solution, but since the OOM issue is usually caused by a rogue or misconfigured client, this solution might "punish" good clients by blocking them from sending messages.
>>>>>> >> > >
>>>>>> >> > > 2. Throttling the producer ID allocation rate
>>>>>> >> > > -> Same concern as solution #1.
>>>>>> >> > >
>>>>>> >> > > 3. Having a limit to the number of active producer IDs (sort of like an LRU cache)
>>>>>> >> > > -> The idea here is that if we hit a misconfigured client, we will expire the older entries. The concern here is that we risk losing idempotency guarantees, and currently we don't have a way to notify clients about losing idempotency guarantees. Besides, the least recently used entries that get removed are not always from the "bad" clients.
>>>>>> >> > >
>>>>>> >> > > 4. Allow clients to "close" the producer ID usage
>>>>>> >> > > -> We can provide a way for a producer to "close" producer ID usage. Currently, we only have INIT_PRODUCER_ID requests to allocate one; after that, we keep the producer ID metadata in the broker even if the producer is "closed". With a close API (ex: END_PRODUCER_ID), we can remove the entry on the broker side; on the client side, we can send it when the producer closes. The concern is that old clients (including non-java clients) will still suffer from the OOM issue.
>>>>>> >> > >
>>>>>> >> > > 5. limit/throttling the producer id based on the principle
>>>>>> >> > > -> Although we can limit the impact to a certain principle with this idea, the same concern still exists as with solutions #1 and #2.
>>>>>> >> > >
>>>>>> >> > > Any thoughts/feedback are welcomed.
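Proposal 3 in the list above (capping active producer IDs LRU-style) can be sketched as follows. This is an illustrative toy, not the broker's actual producer-state code, and the class and method names are hypothetical:

```python
from collections import OrderedDict

# Toy sketch of proposal 3 above: cap the number of active producer IDs
# and evict the least recently used entry when the cap is exceeded.
# Illustrative only -- not Kafka's actual producer-state management.
class ActivePidCache:
    def __init__(self, max_pids: int):
        self.max_pids = max_pids
        self.entries = OrderedDict()  # pid -> metadata, oldest first

    def touch(self, pid: int, metadata: dict) -> None:
        """Record activity for a PID, evicting the LRU entry if full."""
        if pid in self.entries:
            self.entries.move_to_end(pid)  # mark as most recently used
        self.entries[pid] = metadata
        if len(self.entries) > self.max_pids:
            # This eviction is exactly the concern raised above: the
            # dropped entry is not necessarily from the "bad" client,
            # and that producer silently loses its idempotency state.
            self.entries.popitem(last=False)
```

The eviction branch is where the idempotency concern lives: a well-behaved but idle producer can lose its state to a rogue neighbor filling the cache.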
>>>>>> >> > > >>>>>> >> > > Thank you. >>>>>> >> > > Luke >>>>>> >> > > >>>>>> >> > >>>>>> >> >>>>>> > >>>>>> >>>>> >>> On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan <jols...@confluent.io> >>> wrote: >>> >>>> Hey Omnia, >>>> >>>> Thanks for the response. I think I understand your explanations here >>>> with respect to principal and clientId usage. >>>> >>>> For the throttling -- handleInitProducerIdRequest will allocate the ID >>>> to the producer, but we don't actually store it on the broker or increment >>>> our metric until the first produce request for that producer is sent (or >>>> sent again after previously expiring). Would you consider throttling the >>>> produce request instead? It may be hard to get any metrics from the >>>> transaction coordinator where the initProducerId request is handled. >>>> >>>> Justine >>>> >>>> On Tue, Feb 14, 2023 at 9:29 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> >>>> wrote: >>>> >>>>> Hey Justine, >>>>> > If I understand your message correctly, there are issues with >>>>> identifying the source of the rogue clients? So you propose to add a new >>>>> metric for that? >>>>> > And also proposing to throttle based on clientId as a potential >>>>> follow up? >>>>> I want to identify rogue clients by KafkaPrincipal (and/or clientId) >>>>> similarly to how we identify clients in Fetch/Produce/Request >>>>> QuotaManagers. Using KafkaPrincipal should give cluster admin the ability >>>>> to throttle later based on principal which is most likely to be a smaller >>>>> set than clientIds. My initial thought was to add a metrics that represent >>>>> how many InitProducerIDRequest are sent by KafkaPrincipal (and/or >>>>> clientId) >>>>> similar to Fetch/Produce QuotaManagers. >>>>> Then as a follow-up, we can throttle based on either KafkaPrinciple or >>>>> clientId (maybe default as well to align this with other QuotaManagers in >>>>> Kafka). >>>>> >>>>> >1. Does we rely on the client using the same ID? 
What if there are >>>>> many clients that all use different client IDs? >>>>> This is why I want to use the combination of KafkaPrincipal or >>>>> clientId similar to some other quotas we have in Kafka already. This will >>>>> be a similar risk to Fetch/Produce quota in Kafka which also relay on the >>>>> client to use the same clientId and KafkaPrincipal. >>>>> >>>>> >2. Are there places where high cardinality of this metric is a >>>>> concern? I can imagine many client IDs in the system. Would we treat this >>>>> as a rate metric (ie, when we get an init producer ID and return a new >>>>> producer ID we emit a count for that client id?) Or something else? >>>>> My initial thought here was to follow the steps of ClientQuotaManager >>>>> and ClientRequestQuotaManager and use a rate metric. However, I think we >>>>> can emit it either >>>>> >>>>> 1. when we return the new PID. However, I have concerns that we >>>>> may circle back to the previous concerns with OMM due to keeping track >>>>> of >>>>> ACTIVE PIDs per KafkaPrincipal(and/or) clientId in the future. Also >>>>> this >>>>> would be the first time Kafka throttle IDs for any client. >>>>> 2. or once we recieve initProducerIDRequest and throttle before >>>>> even hitting `handleInitProducerIdRequest`. Going this direction we may >>>>> need to throttle it within a different quota window than ` >>>>> quota.window.size.seconds ` as throttling INIT_PRODUCER_ID request >>>>> per second wouldn't be efficient for most cases. I personally think >>>>> this >>>>> direction is easier as it seems more consistent with the existing quota >>>>> implementation. Specially that Kafka has already the concept of >>>>> throttling >>>>> subset of requests (in ControllerMutationQuotaManager) but never had >>>>> any >>>>> concept of throttling active IDs for any client. >>>>> >>>>> >>>>> What do you think? 
>>>>> >>>>> Thanks >>>>> Omnia >>>>> >>>>> On Thu, Feb 2, 2023 at 5:39 PM Justine Olshan <jols...@confluent.io> >>>>> wrote: >>>>> >>>>>> Hey Omnia, >>>>>> Sorry for losing track of this. >>>>>> >>>>>> If I understand your message correctly, there are issues with >>>>>> identifying the source of the rogue clients? So you propose to add a new >>>>>> metric for that? >>>>>> And also proposing to throttle based on clientId as a potential >>>>>> follow up? >>>>>> >>>>>> I think both of these make sense. The only things I can think of for >>>>>> the metric are: >>>>>> 1. Does we rely on the client using the same ID? What if there are >>>>>> many clients that all use different client IDs? >>>>>> 2. Are there places where high cardinality of this metric is a >>>>>> concern? I can imagine many client IDs in the system. Would we treat this >>>>>> as a rate metric (ie, when we get an init producer ID and return a new >>>>>> producer ID we emit a count for that client id?) Or something else? >>>>>> >>>>>> Thanks, >>>>>> Justine >>>>>> >>>>>> On Thu, Feb 2, 2023 at 8:44 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> Hi Luke and Justine, >>>>>>> Are there any thoughts or updates on this? I would love to help with >>>>>>> this as we are hitting this more frequently now. >>>>>>> >>>>>>> best, >>>>>>> >>>>>>> On Mon, Oct 31, 2022 at 6:15 PM Omnia Ibrahim < >>>>>>> o.g.h.ibra...@gmail.com> wrote: >>>>>>> >>>>>>>> Hi Luke and Justine, >>>>>>>> >>>>>>>>> For (3), you said: >>>>>>>>> > - I have some concerns about the impact of this option on the >>>>>>>>> transactional >>>>>>>>> producers, for example, what will happen to an ongoing transaction >>>>>>>>> associated with an expired PID? Would this leave the transactions >>>>>>>>> in a >>>>>>>>> "hanging" state? >>>>>>>>> >>>>>>>>> - How will we notify the client that the transaction can't >>>>>>>>> continue due to >>>>>>>>> an expired PID? 
>>>>>>>>> >>>>>>>>> - If PID got marked as `expired` this will mean that >>>>>>>>> `admin.DescribeProducers` will not list them which will make >>>>>>>>> *`kafka-transactions.sh >>>>>>>>> --list`* a bit tricky as we can't identify if there are >>>>>>>>> transactions linked >>>>>>>>> to this expired PID or not. The same concern applies to >>>>>>>>> *`kafka-transactions.sh >>>>>>>>> --find-hanging`*. >>>>>>>>> >>>>>>>>> --> Yes, you're right. Those are also concerns for this solution. >>>>>>>>> Currently, there's no way to notify clients about the expiration. >>>>>>>>> Also, the ongoing transactions will be hanging. For the admin cli, >>>>>>>>> we've >>>>>>>>> never thought about it. Good point. >>>>>>>>> In summary, to adopt this solution, there are many issues needed >>>>>>>>> to get >>>>>>>>> fixed. >>>>>>>>> >>>>>>>> >>>>>>>> Justin already clarified that if PID is attached to a transaction >>>>>>>> it will not expire so identifying the transactions shouldn't be a >>>>>>>> concern >>>>>>>> anymore. >>>>>>>> The only concern here will be that this solution will not solve the >>>>>>>> problem if the rouge producer is a transactional producer with hanging >>>>>>>> transactions. >>>>>>>> If anyone faced this situation they will need to abort the hanging >>>>>>>> transactions manually and then the solution to expire a PID can then >>>>>>>> work. >>>>>>>> >>>>>>>> --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear) >>>>>>>>> Yes, We were thinking about throttling by KafkaPrinciple. Client >>>>>>>>> Id is also >>>>>>>>> workable. >>>>>>>>> It's just these 2 attributes are not required. >>>>>>>>> That is, it's possible we take all clients as the same one: >>>>>>>>> {default >>>>>>>>> KafkaPrinciple + default clientID}, and apply throttling on it. >>>>>>>>> Do you have any thoughts about it? 
>>>>>>>>> Maybe skip throttling for {default KafkaPrinciple + default >>>>>>>>> clientID} >>>>>>>>> >>>>>>>> >>>>>>>> Throttling for default KafkaPrinciple and default ClientID is >>>>>>>> useful when we need to have a hard limit on the whole cluster and >>>>>>>> whoever >>>>>>>> is running the cluster doesn't knowclientsntIDs or if a KafkaPrinciple >>>>>>>> is >>>>>>>> reused between different producer applications. >>>>>>>> I usually find it helpful to have a way to apply throttling only on >>>>>>>> the rough clients only once I identify them without punishing everyone >>>>>>>> on >>>>>>>> the cluster. However, there are two problems with this >>>>>>>> - There's no easy way at the moment to link PIDs to clientId or >>>>>>>> KafkaPrinciple. This need to be addressed first. >>>>>>>> - Is Justin's comment on the throttling, and the fact that will >>>>>>>> mean we either block all requests or have to store the request in >>>>>>>> memory >>>>>>>> which in both cases has side downs on the producer experince. >>>>>>>> >>>>>>>> I recently had another discussion with my team and it does seem >>>>>>>>> like there >>>>>>>>> should be a way to make it more clear to the clients what is going >>>>>>>>> on. A >>>>>>>>> lot of this protocol is implicit. I'm wondering if maybe there is >>>>>>>>> a way to >>>>>>>>> improve the story for newer clients. (Ie if we choose to expire >>>>>>>>> based on a >>>>>>>>> size limit, we should include a response indicating the ID has >>>>>>>>> expired.) We >>>>>>>>> also discussed ways to redefine the guarantees so that users who >>>>>>>>> have >>>>>>>>> stronger idempotency requirements can ensure them (over >>>>>>>>> availability/memory >>>>>>>>> concerns). Let me know if you have any ideas here. >>>>>>>>> >>>>>>>> >>>>>>>> It may be easier to improve the experience for new clients. 
>>>>>>>> However, if we improved only the new clients, we may need a way to help >>>>>>>> teams who run Kafka with rogue clients on old versions by at least >>>>>>>> giving >>>>>>>> them an easy way to identify the clientId or KafkaPrinciple that >>>>>>>> generated >>>>>>>> these PIDs. >>>>>>>> >>>>>>>> For context, it's very tricky to even identify which clientId is >>>>>>>> creating all these PIDs that caused OOM, which is a contributing part >>>>>>>> of >>>>>>>> the issue at the moment. So maybe one option here could be adding a new >>>>>>>> metric that tracks the number of generated PIDs per clientId. This will >>>>>>>> help the team who runs the Kafka cluster to >>>>>>>> - contact these rogue clients and ask them to fix their clients or >>>>>>>> upgrade to a new client if the new client version has a better >>>>>>>> experience. >>>>>>>> - or, if we end up with a throttling solution, this may help identify >>>>>>>> which clientId needs to be throttled. >>>>>>>> >>>>>>>> Maybe we can start with a solution for identifying the rogue >>>>>>>> clients first and keep looking for a solution to limit them, what do >>>>>>>> you >>>>>>>> think? >>>>>>>> >>>>>>>> Thanks >>>>>>>> >>>>>>>> On Tue, Oct 18, 2022 at 5:24 PM Justine Olshan >>>>>>>> <jols...@confluent.io.invalid> wrote: >>>>>>>> >>>>>>>>> Oops. I realized I just replied to Omnia 🤦♀️ >>>>>>>>> >>>>>>>>> Here was my response for the mailing thread: >>>>>>>>> >>>>>>>>> Hey Omnia, >>>>>>>>> Sorry to hear this is a problem for you as well. :( >>>>>>>>> > * I have some concerns about the impact of this option on the >>>>>>>>> transactional producers, for example, what will happen to an >>>>>>>>> ongoing >>>>>>>>> transaction associated with an expired PID? Would this leave the >>>>>>>>> transactions in a "hanging" state?* >>>>>>>>> We currently check if a transaction is ongoing and do not expire >>>>>>>>> the >>>>>>>>> producer ID if it has an ongoing transaction. 
I suspect we will >>>>>>>>> continue to >>>>>>>>> do this with any solution we pick. >>>>>>>>> >>>>>>>>> My team members and I looked a bit into the throttling case and it >>>>>>>>> can get >>>>>>>>> a bit tricky since it means we need to throttle the produce >>>>>>>>> request before >>>>>>>>> it is processed. This means we either block all requests or have >>>>>>>>> to store >>>>>>>>> the request in memory (which is not great if we are trying to save >>>>>>>>> memory). >>>>>>>>> >>>>>>>>> I recently had another discussion with my team and it does seem >>>>>>>>> like there >>>>>>>>> should be a way to make it more clear to the clients what is going >>>>>>>>> on. A >>>>>>>>> lot of this protocol is implicit. I'm wondering if maybe there is >>>>>>>>> a way to >>>>>>>>> improve the story for newer clients. (Ie if we choose to expire >>>>>>>>> based on a >>>>>>>>> size limit, we should include a response indicating the ID has >>>>>>>>> expired.) We >>>>>>>>> also discussed ways to redefine the guarantees so that users who >>>>>>>>> have >>>>>>>>> stronger idempotency requirements can ensure them (over >>>>>>>>> availability/memory >>>>>>>>> concerns). Let me know if you have any ideas here. >>>>>>>>> >>>>>>>>> Thanks again for commenting here, hopefully we can come to a good >>>>>>>>> solution. >>>>>>>>> >>>>>>>>> On Tue, Oct 18, 2022 at 1:11 AM Luke Chen <show...@gmail.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>> > Hi Omnia, >>>>>>>>> > >>>>>>>>> > Thanks for your reply. >>>>>>>>> > >>>>>>>>> > For (3), you said: >>>>>>>>> > > - I have some concerns about the impact of this option on the >>>>>>>>> > transactional >>>>>>>>> > producers, for example, what will happen to an ongoing >>>>>>>>> transaction >>>>>>>>> > associated with an expired PID? Would this leave the >>>>>>>>> transactions in a >>>>>>>>> > "hanging" state? >>>>>>>>> > >>>>>>>>> > - How will we notify the client that the transaction can't >>>>>>>>> continue due to >>>>>>>>> > an expired PID? 
>>>>>>>>> > >>>>>>>>> > - If PID got marked as `expired` this will mean that >>>>>>>>> > `admin.DescribeProducers` will not list them which will make >>>>>>>>> > *`kafka-transactions.sh >>>>>>>>> > --list`* a bit tricky as we can't identify if there are >>>>>>>>> transactions linked >>>>>>>>> > to this expired PID or not. The same concern applies to >>>>>>>>> > *`kafka-transactions.sh >>>>>>>>> > --find-hanging`*. >>>>>>>>> > >>>>>>>>> > --> Yes, you're right. Those are also concerns for this solution. >>>>>>>>> > Currently, there's no way to notify clients about the expiration. >>>>>>>>> > Also, the ongoing transactions will be hanging. For the admin >>>>>>>>> cli, we've >>>>>>>>> > never thought about it. Good point. >>>>>>>>> > In summary, to adopt this solution, there are many issues needed >>>>>>>>> to get >>>>>>>>> > fixed. >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > For (5), you said: >>>>>>>>> > > I am assuming you mean KafkaPrincipal here! If so is your >>>>>>>>> concern here >>>>>>>>> > that >>>>>>>>> > those good clients that use the same principal as a rogue one >>>>>>>>> will get >>>>>>>>> > throttled? >>>>>>>>> > >>>>>>>>> > If this is the case, then I believe it should be okay as other >>>>>>>>> throttling >>>>>>>>> > in Kafka on *`/config/users/<user>`* has the same behaviour. >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > What about applying limit/throttling to >>>>>>>>> > *`/config/users/<user>/clients/<client-id>` >>>>>>>>> > *similar to what we have with client quota? This should reduce >>>>>>>>> the concern >>>>>>>>> > about throttling good clients, right? >>>>>>>>> > >>>>>>>>> > --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear) >>>>>>>>> > Yes, We were thinking about throttling by KafkaPrinciple. Client >>>>>>>>> Id is >>>>>>>>> > also workable. >>>>>>>>> > It's just these 2 attributes are not required. 
>>>>>>>>> > That is, it's possible we take all clients as the same one: >>>>>>>>> {default >>>>>>>>> > KafkaPrinciple + default clientID}, and apply throttling on it. >>>>>>>>> > Do you have any thoughts about it? >>>>>>>>> > Maybe skip throttling for {default KafkaPrinciple + default >>>>>>>>> clientID} ? >>>>>>>>> > >>>>>>>>> > Luke >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > On Sat, Oct 15, 2022 at 2:33 AM Omnia Ibrahim < >>>>>>>>> o.g.h.ibra...@gmail.com> >>>>>>>>> > wrote: >>>>>>>>> > >>>>>>>>> >> Hi Luke & Justine, >>>>>>>>> >> Thanks for looking into this issue, we have been experiencing >>>>>>>>> this because >>>>>>>>> >> of rogue clients as well. >>>>>>>>> >> >>>>>>>>> >> > 3. Having a limit to the number of active producer IDs (sort >>>>>>>>> of like an >>>>>>>>> >> LRU >>>>>>>>> >> >cache) >>>>>>>>> >> >-> The idea here is that if we hit a misconfigured client, we >>>>>>>>> will expire >>>>>>>>> >> >the older entries. The concern here is we have risks to lose >>>>>>>>> idempotency >>>>>>>>> >> >guarantees, and currently, we don't have a way to notify >>>>>>>>> clients about >>>>>>>>> >> >losing idempotency guarantees. Besides, the least recently >>>>>>>>> used entries >>>>>>>>> >> >got removed are not always from the "bad" clients. >>>>>>>>> >> >>>>>>>>> >> - I have some concerns about the impact of this option on the >>>>>>>>> >> transactional >>>>>>>>> >> producers, for example, what will happen to an ongoing >>>>>>>>> transaction >>>>>>>>> >> associated with an expired PID? Would this leave the >>>>>>>>> transactions in a >>>>>>>>> >> "hanging" state? >>>>>>>>> >> >>>>>>>>> >> - How will we notify the client that the transaction can't >>>>>>>>> continue due to >>>>>>>>> >> an expired PID? 
>>>>>>>>> >> >>>>>>>>> >> - If PID got marked as `expired` this will mean that >>>>>>>>> >> `admin.DescribeProducers` will not list them which will make >>>>>>>>> >> *`kafka-transactions.sh >>>>>>>>> >> --list`* a bit tricky as we can't identify if there are >>>>>>>>> transactions >>>>>>>>> >> linked >>>>>>>>> >> to this expired PID or not. The same concern applies to >>>>>>>>> >> *`kafka-transactions.sh >>>>>>>>> >> --find-hanging`*. >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> >> >5. limit/throttling the producer id based on the principle >>>>>>>>> >> >-> Although we can limit the impact to a certain principle >>>>>>>>> with this >>>>>>>>> >> idea, >>>>>>>>> >> >same concern still exists as solution #1 #2. >>>>>>>>> >> >>>>>>>>> >> I am assuming you mean KafkaPrincipal here! If so is your >>>>>>>>> concern here >>>>>>>>> >> that >>>>>>>>> >> those good clients that use the same principal as a rogue one >>>>>>>>> will get >>>>>>>>> >> throttled? >>>>>>>>> >> >>>>>>>>> >> If this is the case, then I believe it should be okay as other >>>>>>>>> throttling >>>>>>>>> >> in Kafka on *`/config/users/<user>`* has the same behaviour. >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> >> What about applying limit/throttling to >>>>>>>>> >> *`/config/users/<user>/clients/<client-id>` >>>>>>>>> >> *similar to what we have with client quota? This should reduce >>>>>>>>> the concern >>>>>>>>> >> about throttling good clients, right? >>>>>>>>> >> >>>>>>>>> >> best, >>>>>>>>> >> >>>>>>>>> >> Omnia >>>>>>>>> >> >>>>>>>>> >> On Tue, Oct 11, 2022 at 4:18 AM Luke Chen <show...@gmail.com> >>>>>>>>> wrote: >>>>>>>>> >> >>>>>>>>> >> > Bump this thread to see if there are any comments/thoughts. >>>>>>>>> >> > Thanks. 
>>>>>>>>> >> > >>>>>>>>> >> > Luke >>>>>>>>> >> > >>>>>>>>> >> > On Mon, Sep 26, 2022 at 11:06 AM Luke Chen <show...@gmail.com> >>>>>>>>> wrote: >>>>>>>>> >> > >>>>>>>>> >> > > Hi devs, >>>>>>>>> >> > > >>>>>>>>> >> > > As stated in the motivation section in KIP-854 >>>>>>>>> >> > > < >>>>>>>>> >> > >>>>>>>>> >> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry >>>>>>>>> >> > >: >>>>>>>>> >> > > >>>>>>>>> >> > > With idempotent producers becoming the default in Kafka, >>>>>>>>> this means >>>>>>>>> >> that >>>>>>>>> >> > > unless otherwise specified, all new producers will be given >>>>>>>>> producer >>>>>>>>> >> IDs. >>>>>>>>> >> > > Some (inefficient) applications may now create many >>>>>>>>> non-transactional >>>>>>>>> >> > > idempotent producers. Each of these producers will be >>>>>>>>> assigned a >>>>>>>>> >> producer >>>>>>>>> >> > > ID and these IDs and their metadata are stored in the >>>>>>>>> broker memory, >>>>>>>>> >> > which >>>>>>>>> >> > > might cause brokers to run out of memory. >>>>>>>>> >> > > >>>>>>>>> >> > > Justine (in cc.) and I and some other team members are >>>>>>>>> working on the >>>>>>>>> >> > > solutions for this issue. But none of them solves it >>>>>>>>> completely >>>>>>>>> >> without >>>>>>>>> >> > > side effects. Among them, "availability" vs. "idempotency >>>>>>>>> guarantees" >>>>>>>>> >> is >>>>>>>>> >> > > the trade-off we can't decide on. Some of these >>>>>>>>> solutions >>>>>>>>> >> > sacrifice >>>>>>>>> >> > > availability of produce (1,2,5) and others sacrifice >>>>>>>>> idempotency >>>>>>>>> >> > guarantees >>>>>>>>> >> > > (3). It could be useful to know if people generally have a >>>>>>>>> preference >>>>>>>>> >> one >>>>>>>>> >> > > way or the other. Or what other better solutions there >>>>>>>>> might be. >>>>>>>>> >> > > >>>>>>>>> >> > > Here are the proposals we came up with: >>>>>>>>> >> > > >>>>>>>>> >> > > 1. 
Limit the total active producer ID allocation number. >>>>>>>>> >> > > -> This is the simplest solution. But since the OOM issue >>>>>>>>> is usually >>>>>>>>> >> > > caused by a rogue or misconfigured client, this >>>>>>>>> solution might >>>>>>>>> >> > "punish" >>>>>>>>> >> > > good clients and block them from sending messages. >>>>>>>>> >> > > >>>>>>>>> >> > > 2. Throttling the producer ID allocation rate >>>>>>>>> >> > > -> Same concern as solution #1. >>>>>>>>> >> > > >>>>>>>>> >> > > 3. Having a limit to the number of active producer IDs >>>>>>>>> (sort of like >>>>>>>>> >> an >>>>>>>>> >> > > LRU cache) >>>>>>>>> >> > > -> The idea here is that if we hit a misconfigured client, >>>>>>>>> we will >>>>>>>>> >> expire >>>>>>>>> >> > > the older entries. The concern here is that we risk losing >>>>>>>>> >> idempotency >>>>>>>>> >> > > guarantees, and currently, we don't have a way to notify >>>>>>>>> clients about >>>>>>>>> >> > > losing idempotency guarantees. Besides, the least recently >>>>>>>>> used >>>>>>>>> >> entries >>>>>>>>> >> > > that get removed are not always from the "bad" clients. >>>>>>>>> >> > > >>>>>>>>> >> > > 4. Allow clients to "close" the producer ID usage >>>>>>>>> >> > > -> We can provide a way for the producer to "close" producer ID >>>>>>>>> usage. >>>>>>>>> >> > > Currently, we only have the INIT_PRODUCER_ID request >>>>>>>>> to >>>>>>>>> >> allocate >>>>>>>>> >> > > one. After that, we'll keep the producer ID metadata in the >>>>>>>>> broker even if >>>>>>>>> >> > the >>>>>>>>> >> > > producer is "closed". Having a close API (e.g., >>>>>>>>> END_PRODUCER_ID), we >>>>>>>>> >> can >>>>>>>>> >> > > remove the entry from the broker side. On the client side, we can >>>>>>>>> send it when the >>>>>>>>> >> > > producer is closing. The concern is that old clients >>>>>>>>> (including non-Java >>>>>>>>> >> > > clients) will still suffer from the OOM issue. >>>>>>>>> >> > > >>>>>>>>> >> > > 5. 
limit/throttling the producer id based on the principle >>>>>>>>> >> > > -> Although we can limit the impact to a certain principle >>>>>>>>> with this >>>>>>>>> >> > idea, >>>>>>>>> >> > > the same concern still exists as for solutions #1 and #2. >>>>>>>>> >> > > >>>>>>>>> >> > > Any thoughts/feedback are welcome. >>>>>>>>> >> > > >>>>>>>>> >> > > Thank you. >>>>>>>>> >> > > Luke >>>>>>>>> >> > > >>>>>>>>> >> > >>>>>>>>> >> >>>>>>>>> > >>>>>>>>
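
To make the eviction concern in option 3 concrete, here is a minimal sketch (in Python, with made-up names; Kafka's broker-side producer-state cache is not actually implemented this way): an LRU cap on active producer IDs can evict a well-behaved client's entry when a rogue client churns through PIDs.

```python
from collections import OrderedDict

# Hypothetical sketch of option 3: cap active producer IDs with LRU eviction.
# Class and parameter names are illustrative, not Kafka internals.
class PidCache:
    def __init__(self, max_pids):
        self.max_pids = max_pids
        self.entries = OrderedDict()  # pid -> owning client id, oldest first

    def touch(self, pid, client_id):
        # A produce request refreshes the PID's recency.
        if pid in self.entries:
            self.entries.move_to_end(pid)
        self.entries[pid] = client_id
        evicted = []
        while len(self.entries) > self.max_pids:
            # Oldest entry goes first -- regardless of which client owns it.
            old_pid, old_client = self.entries.popitem(last=False)
            evicted.append((old_pid, old_client))
        return evicted

cache = PidCache(max_pids=3)
cache.touch(1, "good-client")        # well-behaved client with a single PID
for pid in range(100, 103):          # rogue client churning through new PIDs
    evicted = cache.touch(pid, "rogue-client")

print(evicted)  # [(1, 'good-client')]
```

The evicted PID belongs to the good client: its next produce request would be treated as a brand-new producer, silently dropping the idempotency guarantee, which is exactly the "least recently used entries got removed are not always from the bad clients" concern raised in the thread.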