Hi Andrew (and Omnia),

Thanks for the KIP. I hope to provide some feedback on it soon, but I had a thought on the specific subject of group configs and MM2.

If brokers validate against the set of known group configs, doesn't this induce an ordering requirement on upgrading clusters? Wouldn't you have to upgrade a destination cluster first, so that it knows about `group.type`, otherwise it would reject attempts to configure an unknown group config parameter? A similar issue arises with topic configs, but this is the first instance (that I'm aware of) of a config being added during the MM2 era, so perhaps this is a minor problem, but one worth thinking about.
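To make the ordering concern concrete, a sync tool could probe the destination with a validate-only alter before copying the config. The sketch below is purely illustrative: it assumes the GROUP config resource type from KIP-848 and the proposed `group.type` key, neither of which an un-upgraded destination broker would recognise.

    import java.util.Collection;
    import java.util.Collections;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.AlterConfigsOptions;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class GroupConfigProbe {
        // Returns true if the destination broker accepts the proposed `group.type` group config.
        // validateOnly(true) means nothing is actually changed on the destination.
        public static boolean destinationSupportsGroupType(Admin destAdmin, String groupId) {
            ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, groupId);
            AlterConfigOp setType = new AlterConfigOp(
                    new ConfigEntry("group.type", "share"), AlterConfigOp.OpType.SET);
            Collection<AlterConfigOp> ops = Collections.singleton(setType);
            try {
                destAdmin.incrementalAlterConfigs(
                        Collections.singletonMap(group, ops),
                        new AlterConfigsOptions().validateOnly(true)).all().get();
                return true;
            } catch (Exception e) {
                // An older destination rejects the unknown config (or the GROUP resource type),
                // so a sync tool would have to skip this config rather than fail the whole sync.
                return false;
            }
        }
    }

Something like this would let a tool degrade gracefully, but it doesn't remove the underlying ordering requirement.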
Cheers,
Tom

On Wed, 3 Apr 2024 at 05:31, Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
> Hi Omnia,
> Thanks for your questions.
>
> The DR angle on `group.type` is interesting and I had not considered it. The namespace of groups contains both consumer groups and share groups, so I was trying to ensure that the group type used was deterministic, rather than a race to create the first member. There are already other uses of the group protocol, such as Kafka Connect, so it's all a bit confusing even today.
>
> It is actually KIP-848 which introduces configurations for group resources, and KIP-932 is just building on the idea. I think that MM2 will need to sync these configurations. The question of whether `group.type` is a sensible configuration is, I think, separate.
>
> Imagine that we do have `group.type` as a group configuration. How would we end up with groups with the same ID but different types on the two ends of MM2? Assuming that both ends have KIP-932 enabled, either the configuration was not set, and a consumer group was made on one end while a share group was made on the other, OR the configuration was set but its value changed, and again we get a divergence.
>
> I think that, on balance, having `group.type` as a configuration does at least mean there's a better chance that the two ends of MM2 agree on the type of group. I'm happy to consider other ways to do this better. The fact that we have different kinds of group in the same namespace is the tricky thing. I think this was possible before this KIP, but it's much more likely now.
>
> Onto the question of memory. There are several different parts to this, all of which are distributed across the cluster.
>
> * For the group coordinator, the memory consumption will be affected by the number of groups, the number of members and the number of topic-partitions to be assigned to the members. The group coordinator is concerned with membership and assignment, so the memory per topic-partition will be small.
>
> * For the share coordinator, the memory consumption will be affected by the number of groups, the number of topic-partitions being consumed in the group, and the number of in-flight records, but not the number of members. We should be talking about no more than kilobytes per topic-partition.
>
> * For the share-partition leader, the memory consumption will be affected by the number of share group members assigned the topic-partition and the number of in-flight records. Again, we should be talking about no more than kilobytes per topic-partition.
>
> Of these, the factor that is not directly under control is the number of topic-partitions. The reason is that I wanted to avoid a situation where the number of partitions in a topic was increased and suddenly consumption in a share group hit a limit that was not anticipated.
>
> I could introduce a configuration for controlling the number of topics allowed to be subscribed in a share group. Personally, I think 1 would be a good starting point.
>
> Let me know what you think.
>
> Thanks,
> Andrew
>
> > On 2 Apr 2024, at 15:39, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
> >
> > Hi Andrew,
> > Thanks for the KIP; it is definitely an interesting read. I have a few questions.
> >
> > As the KIP proposes extending `AdminClient.incrementalAlterConfigs` to add an explicit `group.type`, what would this mean for the DR feature in the MM2 offering?
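> > To make the question concrete, I imagine the config being applied with something like the sketch below (purely illustrative, assuming the GROUP config resource type that KIP-848 introduces and the `group.type` key proposed here):
> >
> >     import java.util.Collection;
> >     import java.util.Collections;
> >     import org.apache.kafka.clients.admin.Admin;
> >     import org.apache.kafka.clients.admin.AlterConfigOp;
> >     import org.apache.kafka.clients.admin.ConfigEntry;
> >     import org.apache.kafka.common.config.ConfigResource;
> >
> >     public class GroupTypeExample {
> >         // Declares that any group later created with this ID must be a share group.
> >         public static void preSetShareGroupType(Admin admin, String groupId) throws Exception {
> >             ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, groupId);
> >             AlterConfigOp setType = new AlterConfigOp(
> >                     new ConfigEntry("group.type", "share"), AlterConfigOp.OpType.SET);
> >             Collection<AlterConfigOp> ops = Collections.singleton(setType);
> >             admin.incrementalAlterConfigs(Collections.singletonMap(group, ops)).all().get();
> >         }
> >     }
> >
> > If MM2 were to sync this, it would presumably issue the same alter against the destination cluster, which is where the question of diverging types comes in.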
> > Right now MM2 syncs consumer group offsets from the source to the destination cluster. It also offers ACL syncing, which contributes to the DR feature. Would this KIP mean MM2 needs to also sync the type of groups to the destination?
> >
> > As `AdminClient.incrementalAlterConfigs` means "when a new group is created with this name, it must have this type", what will happen if the clusters on both ends of MM2 have the same group ID but with different types?
> >
> > If this concern is out of scope, we might need to call this out somewhere in the KIP.
> >
> > While the number of share groups and the number of consumers in a share group are limited by `group.share.max.groups` and `group.share.max.size`, the total number of share-group state records that might need to be loaded in memory has another factor, which is the number of partitions. In cases where a group is consuming from a large number of topics with a large number of partitions, what will be the impact on coordinator memory?
> >
> > Thanks
> > Omnia
> >
> >> On 25 Mar 2024, at 10:23, Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
> >>
> >> Hi Justine,
> >> Thanks for your questions.
> >>
> >> There are several limits in this KIP. With consumer groups, we see problems where there are huge numbers of consumer groups, and we also see problems when there are huge numbers of members in a consumer group.
> >>
> >> There's a limit on the number of members in a share group. When the limit is reached, additional members are not admitted to the group. The members heartbeat to remain in the group, and that enables timely expiration.
> >>
> >> There's also a limit on the number of share groups in a cluster. Initially, this limit has been set low. As a result, it would be possible to create enough groups to reach the limit, and then creating additional groups would fail. It will be possible to delete a share group administratively, but share groups do not automatically expire (just like topics do not expire, and queues in message-queuing systems do not expire).
> >>
> >> The `kafka-console-share-consumer.sh` tool in the KIP defaults the group name to "share". This has two benefits. First, it means that the trivial exploratory use of running multiple concurrent copies will naturally get sharing of the records consumed. Second, it means that only one share group is being created, rather than generating another group ID for each execution.
> >>
> >> Please do re-read the read-committed section. I'll be grateful for all the thoughtful reviews that the community is able to provide. The KIP says that broker-side filtering removes the records for aborted transactions. This is obviously quite a difference compared with consumers in consumer groups. I think it would also be possible to do it client-side, but the records fetched from the replica manager are distributed among the consumers, and I'm concerned that it would be difficult to distribute the list of aborted transactions relevant to the records each consumer receives. I'm considering prototyping client-side filtering to see how well it works in practice.
> >>
> >> I am definitely thoughtful about the inter-broker hops needed to persist the share-group state. Originally, I did look at writing the state directly into the user's topic-partitions, because this means the share-partition leader would be able to write directly.
> >> This has downsides as documented in the "Rejected Alternatives" section of the KIP.
> >>
> >> We do have opportunities for pipelining and batching which I expect we will exploit in order to improve the performance.
> >>
> >> This KIP is only the beginning. I expect a future KIP will address storage of metadata in a more performant way.
> >>
> >> Thanks,
> >> Andrew
> >>
> >>> On 21 Mar 2024, at 15:40, Justine Olshan <jols...@confluent.io.INVALID> wrote:
> >>>
> >>> Thanks Andrew,
> >>>
> >>> That answers some of the questions I have.
> >>>
> >>> With respect to the limits -- how will this be implemented? One issue we saw with producers is "short-lived" producers that send one message and disconnect. Due to how expiration works for producer state, if we have a simple limit for producer IDs, all new producers are blocked until the old ones expire. Will we block new group members as well if we reach our limit?
> >>>
> >>> In the consumer case, we have a heartbeat which can be used for expiration behavior and avoid the headache we see on the producer side, but I can imagine a case where misuse of the groups themselves could occur -- i.e. creating a short-lived share group that I believe will take some time to expire. Do we have considerations for this case?
> >>>
> >>> I also plan to re-read the read-committed section and may have further questions there.
> >>>
> >>> You also mentioned in the KIP how there are a few inter-broker hops to the share coordinator, etc. for a given read operation of a partition. Are we concerned about performance here? My work in transactions and trying to optimize performance made me realize how expensive these inter-broker hops can be.
> >>>
> >>> Justine
> >>>
> >>> On Thu, Mar 21, 2024 at 7:37 AM Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
> >>>
> >>>> Hi Justine,
> >>>> Thanks for your comment. Sorry for the delay responding.
> >>>>
> >>>> It was not my intent to leave a query unanswered. I have modified the KIP as a result of the discussion, and I think perhaps I didn't neatly close off the email thread.
> >>>>
> >>>> In summary:
> >>>>
> >>>> * The share-partition leader does not maintain an explicit cache of records that it has fetched. When it fetches records, it does "shallow" iteration to look at the batch headers only, so that it understands at least the base/last offset of the records within. It is left to the consumers to do the "deep" iteration of the record batches they fetch.
> >>>>
> >>>> * It may sometimes be necessary to re-fetch records for redelivery. This is essentially analogous to two consumer groups independently fetching the same records today. We will be relying on the efficiency of the page cache.
> >>>>
> >>>> * The zero-copy optimisation is not possible for records fetched for consumers in share groups. The KIP does not affect the use of the zero-copy optimisation for any scenarios in which it currently applies (this was not true in one earlier version of the KIP).
> >>>>
> >>>> * I share concern about memory usage, partly because of the producer state management area.
> >>>> To keep a lid on memory use, the number of share groups, the number of members of each share group, and the number of in-flight messages per partition in a share group are all limited. The aim is to get the in-memory state to be nice and compact, probably at the expense of throughput. Over time, I'm sure we'll optimise and get a bit more headroom. Limits like these cannot easily be applied retrospectively, so the limits are there right at the start.
> >>>>
> >>>> * I have reworked the section on read-committed isolation level, and the complexity and memory usage of the approach are significantly improved.
> >>>>
> >>>> I hope this answers your question.
> >>>>
> >>>> Thanks,
> >>>> Andrew
> >>>>
> >>>>> On 18 Mar 2024, at 20:47, Justine Olshan <jols...@confluent.io.INVALID> wrote:
> >>>>>
> >>>>> Hey Andrew,
> >>>>>
> >>>>> I noticed you started the voting thread, but there seem to be a few questions that were not answered. One was Jun's about memory usage:
> >>>>>> How much additional heap memory will the server use? Do we need to cache records in heap? If so, is the cache bounded?
> >>>>>
> >>>>> Your response was:
> >>>>>> This area needs more work. Using a share group surely gets the broker to do more manipulation of the data that it fetches than a regular consumer. I want to minimise this and need to research before providing a comprehensive answer. I suspect zero-copy is lost and that we do not cache records in heap. I will confirm later on.
> >>>>>
> >>>>> I am also concerned about memory usage from my producer state management work, so I want to make sure we have thought about it here -- not just in the case Jun mentioned but generally.
> >>>>>
> >>>>> Likewise, we have seen issues with large consumer groups and too many producer IDs. Are there any concerns with an analogous situation with too many share group members or share groups? Are there any ways we try to handle this or mitigate risks with respect to memory usage and client connections (wrt rebalances, for example)?
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Justine
> >>>>>
> >>>>> On Fri, Mar 8, 2024 at 12:51 AM Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
> >>>>>
> >>>>>> Hi Manikumar,
> >>>>>> Thanks for your queries.
> >>>>>>
> >>>>>> 1) Delivery count is added to the ConsumerRecord class so that a consumer can tell how often a record has been processed. I imagine that some applications might want to take different actions based on whether a record has previously failed. This enables richer error handling for bad records. In the future, I plan another KIP to enhance error handling.
> >>>>>>
> >>>>>> 2) It is only possible to delete a share group which is empty. As a result, all well-behaved consumers will have closed their share sessions. After a short while, the share-partition leaders will discard the share-partition information from memory.
> >>>>>>
> >>>>>> In the presence of badly behaved consumers, a consumer would have to pretend to be a member of a share group.
> >>>>>> There are several cases:
> >>>>>>
> >>>>>> a) If the share-partition leader still has in-memory state for the deleted share group, it will continue to fetch records, but it will be fenced by the share coordinator when it attempts to write its persistent state.
> >>>>>>
> >>>>>> b) If the share-partition leader does not have in-memory state, it will attempt to read it from the share coordinator and this will fail.
> >>>>>>
> >>>>>> 3) I will add metrics for the share coordinator today. This was an omission. Thanks for catching it.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Andrew
> >>>>>>
> >>>>>>> On 6 Mar 2024, at 17:53, Manikumar <manikumar.re...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Hi Andrew,
> >>>>>>>
> >>>>>>> Thanks for the updated KIP. A few queries below:
> >>>>>>>
> >>>>>>> 1. What is the use-case of deliveryCount in ShareFetchResponse?
> >>>>>>> 2. When deleting share groups, do we need to clean up any in-memory state from share-partition leaders?
> >>>>>>> 3. Any metrics for the share-coordinator?
> >>>>>>>
> >>>>>>> Thanks
> >>>>>>> Manikumar
> >>>>>>>
> >>>>>>> On Wed, Feb 21, 2024 at 12:11 AM Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
> >>>>>>>
> >>>>>>>> Hi Manikumar,
> >>>>>>>> Thanks for your comments.
> >>>>>>>>
> >>>>>>>> 1. I believe that, in general, there are not situations in which a dynamic config change is prevented because of the existence of a resource. So, if we prevented setting the config `group.type=consumer` on a resource G of GROUP type just because a share group G was in existence, it would be a bit weird.
> >>>>>>>>
> >>>>>>>> I wonder whether changing the config name to `new.group.type` would help. It's about ensuring the type of a newly created group.
> >>>>>>>>
> >>>>>>>> 2. The behaviour for a DEAD share group is intended to be the same as a DEAD consumer group. The group cannot be "reused" again as such, but the group ID can be used by a new group.
> >>>>>>>>
> >>>>>>>> 3. Yes. AlterShareGroupOffsets will cause a new SHARE_CHECKPOINT.
> >>>>>>>>
> >>>>>>>> 4. In common with Admin.deleteConsumerGroups, the underlying Kafka RPC for Admin.deleteShareGroups is DeleteGroups. This is handled by the group coordinator, and it does this by writing control records (a tombstone in this case). The KIP doesn't say anything about this because it's the same as consumer groups. Perhaps it would be sensible to add a GroupType to DeleteGroupsRequest so we can make sure we are deleting the correct type of group. The fact that there is not a specific RPC for DeleteShareGroups seems correct to me.
> >>>>>>>>
> >>>>>>>> 5. I prefer using "o.a.k.clients.consumer" because it's already a public package and many of the classes and interfaces, such as ConsumerRecord, are in that package.
> >>>>>>>>
> >>>>>>>> I definitely need to add more information about how the Admin operations work. I will add a section to the KIP in the next version, later today. This will fill in details for your questions (3) and (4).
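> >>>>>>>> To illustrate (5), the sketch below is roughly how I picture a share consumer sitting alongside the existing classes in "o.a.k.clients.consumer". The class and method names are provisional and purely illustrative at this stage:
> >>>>>>>>
> >>>>>>>>     import java.time.Duration;
> >>>>>>>>     import java.util.Collections;
> >>>>>>>>     import java.util.Properties;
> >>>>>>>>     import org.apache.kafka.clients.consumer.ConsumerRecord;
> >>>>>>>>     import org.apache.kafka.clients.consumer.KafkaShareConsumer;  // provisional class name
> >>>>>>>>
> >>>>>>>>     public class ShareConsumeLoop {
> >>>>>>>>         // Provisional sketch: the share consumer lives in the same public package
> >>>>>>>>         // as KafkaConsumer and reuses ConsumerRecord as-is.
> >>>>>>>>         public static void consume(Properties props, String topic) {
> >>>>>>>>             try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
> >>>>>>>>                 consumer.subscribe(Collections.singleton(topic));
> >>>>>>>>                 while (true) {
> >>>>>>>>                     for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
> >>>>>>>>                         System.out.println(record.key() + " -> " + record.value());
> >>>>>>>>                     }
> >>>>>>>>                     consumer.commitSync();  // commits acknowledgements for the records just delivered
> >>>>>>>>                 }
> >>>>>>>>             }
> >>>>>>>>         }
> >>>>>>>>     }
> >>>>>>>>
> >>>>>>>> The point is that ConsumerRecord, the deserializers and so on come straight from the existing package rather than being duplicated under a new one.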
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Andrew
> >>>>>>>>
> >>>>>>>>> On 14 Feb 2024, at 18:04, Manikumar <manikumar.re...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Andrew,
> >>>>>>>>>
> >>>>>>>>> Thanks for the KIP. A few comments below.
> >>>>>>>>>
> >>>>>>>>> 1. kafka-configs.sh (incrementalAlterConfigs) allows you to dynamically change the configs. Maybe in this case, we should not allow the user to change `group.type` if it's already set.
> >>>>>>>>> 2. What's the behaviour after a group transitions into the DEAD state? Do we add new control records to reset the state? Can we reuse the group again?
> >>>>>>>>> 3. Are we going to write new control records after the AlterShareGroupOffsets API to reset the state?
> >>>>>>>>> 4. Is there any API for DeleteShareGroups? I assume the group co-ordinator is going to handle the API. If so, does this mean the group co-ordinator also needs to write control records?
> >>>>>>>>> 5. How about using the "org.apache.kafka.clients.consumer.share" package for new interfaces/classes?
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Manikumar