Hi Andrew (and Omnia),

Thanks for the KIP. I hope to provide some feedback on this KIP soon, but I
had a thought on the specific subject of group configs and MM2. If brokers
validate known group configs, doesn't this induce an ordering requirement
on upgrading clusters? You would have to upgrade the destination cluster
first, so that it knows about `group.type`; otherwise it would reject
attempts to configure an unknown group config parameter. A similar issue
arises with topic configs, but this is the first instance (that I'm aware
of) of a config being added during the MM2 era, so perhaps this is a minor
problem worth thinking about.
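
To make the concern concrete, here is a rough sketch of what an MM2-style
sync of that config to the destination cluster would look like. It assumes
the GROUP config resource type from KIP-848 and the `group.type` config from
KIP-932; the destination address and group name are made up. On a
destination broker that has not yet been upgraded to know `group.type`, I
would expect the alter call to be rejected:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;

public class SyncGroupTypeSketch {
    public static void main(String[] args) throws Exception {
        // Connect to the *destination* cluster of the MM2 flow.
        try (Admin admin = Admin.create(
                Map.<String, Object>of("bootstrap.servers", "destination:9092"))) {
            // GROUP config resources come from KIP-848.
            ConfigResource group =
                new ConfigResource(ConfigResource.Type.GROUP, "my-group");
            AlterConfigOp setType = new AlterConfigOp(
                new ConfigEntry("group.type", "share"), AlterConfigOp.OpType.SET);
            // An older destination broker that does not know `group.type`
            // should fail validation here, hence the upgrade-ordering question.
            admin.incrementalAlterConfigs(Map.of(group, List.of(setType))).all().get();
        }
    }
}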

Cheers,

Tom

On Wed, 3 Apr 2024 at 05:31, Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

> Hi Omnia,
> Thanks for your questions.
>
> The DR angle on `group.type` is interesting and I had not considered it.
> The namespace of groups contains both consumer groups and share groups,
> so I was trying to ensure that the group type used was deterministic,
> rather than the result of a race to create the first member. There are
> already other uses of the group protocol, such as Kafka Connect, so it’s
> all a bit confusing even today.
>
> It is actually KIP-848 which introduces configurations for group resources
> and KIP-932 is just building on
> the idea. I think that MM2 will need to sync these configurations. The
> question of whether `group.type` is
> a sensible configuration is, I think, separate.
>
> Imagine that we do have `group.type` as a group configuration. How would
> we end up with groups with
> the same ID but different types on the two ends of MM2? Assuming that both
> ends have KIP-932 enabled,
> either the configuration was not set, and a consumer group was made on one
> end while a share group was
> made on the other, or the configuration was set but its value changed,
> and again we get a divergence.
>
> I think that on balance, having `group.type` as a configuration does at
> least mean there’s a better chance that
> the two ends of MM2 do agree on the type of group. I’m happy to consider
> other ways to do this better. The
> fact that we have different kinds of group in the same namespace is the
> tricky thing. I think this was possible
> before this KIP, but it’s much more likely now.
>
>
> Onto the question of memory. There are several different parts to this,
> all of which are distributed across
> the cluster.
>
> * For the group coordinator, the memory consumption will be affected by
> the number of groups,
> the number of members and the number of topic-partitions to be assigned to
> the members. The
> group coordinator is concerned with membership and assignment, so the
> memory per topic-partition
> will be small.
> * For the share coordinator, the memory consumption will be affected by
> the number of groups, the
> number of topic-partitions being consumed in the group, and the number of
> in-flight records, but not
> the number of members. We should be talking about no more than kilobytes
> per topic-partition.
> * For the share-partition leader, the memory consumption will be affected
> by the number of share group
> members assigned the topic-partition and the number of in-flight records.
> Again, we should be talking
> about no more than kilobytes per topic-partition.
>
> Of these, the factor that is not directly under control is the number of
> topic-partitions. The reason is that
> I wanted to avoid a situation where the number of partitions in a topic
> was increased and suddenly
> consumption in a share group hit a limit that was not anticipated.
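>
> (Purely as an illustration of the arithmetic, and not a number from the
> KIP: a share group subscribed to 10 topics of 100 partitions each covers
> 1,000 topic-partitions, so at a few kilobytes per topic-partition the state
> for that group would be on the order of a few megabytes in total, spread
> across the share coordinators and share-partition leaders in the cluster.)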
>
> I could introduce a configuration for controlling the number of topics
> allowed to be subscribed in a share
> group. Personally, I think 1 would be a good starting point.
>
> Let me know what you think.
>
> Thanks,
> Andrew
>
>
> > On 2 Apr 2024, at 15:39, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
> >
> > Hi Andrew,
> > Thanks for the KIP, it is definitely an interesting read. I have a few
> questions.
> > As the KIP proposes extending `AdminClient.incrementalAlterConfigs` to
> add an explicit `group.type`, what would this mean for the DR feature that
> MM2 offers?
> > Right now MM2 syncs consumer group offsets from the source to the
> destination cluster. It also offers ACL syncing, which contributes to the
> DR feature. Would this KIP mean MM2 needs to also sync the type of groups
> to the destination?
> > As `AdminClient.incrementalAlterConfigs` means "when a new group is
> created with this name, it must have this type”, what will happen if the
> clusters on both ends of MM2 have the same group ID but with different
> types?
> > If this concern is out of scope, we might need to call this out
> somewhere in the KIP.
> > While the number of share groups and the number of consumers in a share
> group are limited by `group.share.max.groups` and `group.share.max.size`,
> the total number of share-group state records that might need to be loaded
> in memory has another factor, which is the number of partitions. In cases
> where a group is consuming from a large number of topics with a large
> number of partitions, what will be the impact on coordinator memory?
> >
> > Thanks
> > Omnia
> >
> >
> >> On 25 Mar 2024, at 10:23, Andrew Schofield <
> andrew_schofield_j...@outlook.com> wrote:
> >>
> >> Hi Justine,
> >> Thanks for your questions.
> >>
> >> There are several limits in this KIP. With consumer groups, we see
> problems
> >> where there are huge numbers of consumer groups, and we also see
> problems
> >> when there are huge numbers of members in a consumer group.
> >>
> >> There’s a limit on the number of members in a share group. When the limit
> is reached,
> >> additional members are not admitted to the group. The members heartbeat
> to remain
> >> in the group and that enables timely expiration.
> >>
> >> There’s also a limit on the number of share groups in a cluster.
> Initially, this limit has been
> >> set low. As a result, it would be possible to create sufficient groups
> to reach the limit,
> >> and then creating additional groups would fail. It will be possible to
> delete a share group
> >> administratively, but share groups do not automatically expire (just
> like topics do not
> >> expire and queues in message-queuing systems do not expire).
> >>
> >> The `kafka-console-share-consumer.sh` tool in the KIP defaults the
> group name to
> >> “share”. This has two benefits. First, it means that trivial exploratory
> use of it, running
> >> multiple concurrent copies, will naturally get sharing of the records
> consumed.
> >> Second, it means that only one share group is created, rather than
> generating another
> >> group ID for each execution.
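> >>
> >> As a concrete illustration, here is a rough sketch of the equivalent code
> >> using the KafkaShareConsumer proposed in the KIP; running two copies of it
> >> shares the topic’s records between them. The class and method names are as
> >> proposed in the KIP and could still change, the default implicit
> >> acknowledgement is assumed, and the broker address and topic name are
> >> made up:
> >>
> >> import org.apache.kafka.clients.consumer.ConsumerRecords;
> >> import org.apache.kafka.clients.consumer.KafkaShareConsumer;
> >> import org.apache.kafka.common.serialization.StringDeserializer;
> >>
> >> import java.time.Duration;
> >> import java.util.List;
> >> import java.util.Properties;
> >>
> >> public class ShareConsumeSketch {
> >>     public static void main(String[] args) {
> >>         Properties props = new Properties();
> >>         props.put("bootstrap.servers", "localhost:9092"); // made-up cluster
> >>         props.put("group.id", "share");                   // same default as the tool
> >>         props.put("key.deserializer", StringDeserializer.class.getName());
> >>         props.put("value.deserializer", StringDeserializer.class.getName());
> >>
> >>         // Run two copies of this and the topic's records are shared between
> >>         // them, rather than each copy receiving every record.
> >>         try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
> >>             consumer.subscribe(List.of("my-topic"));
> >>             while (true) {
> >>                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
> >>                 records.forEach(r -> System.out.println(r.value()));
> >>                 // Acknowledgement is implicit: the next poll() marks these
> >>                 // records as successfully processed.
> >>             }
> >>         }
> >>     }
> >> }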
> >>
> >> Please do re-read the read-committed section. I’ll be grateful for all the
> thoughtful reviews
> >> that the community is able to provide. The KIP says that broker-side
> filtering
> >> removes the records for aborted transactions. This is obviously quite a
> difference compared
> >> with consumers in consumer groups. I think it would also be possible
> to do it client-side
> >> but the records fetched from the replica manager are distributed among
> the consumers,
> >> and I’m concerned that it would be difficult to distribute the list of
> aborted transactions
> >> relevant to the records each consumer receives. I’m considering
> prototyping client-side
> >> filtering to see how well it works in practice.
> >>
> >> I am definitely mindful of the inter-broker hops needed to
> persist the share-group
> >> state. Originally, I did look at writing the state directly into the
> user’s topic-partitions
> >> because this means the share-partition leader would be able to write
> directly.
> >> This has downsides as documented in the “Rejected Alternatives” section
> of the KIP.
> >>
> >> We do have opportunities for pipelining and batching which I expect we
> will exploit
> >> in order to improve the performance.
> >>
> >> This KIP is only the beginning. I expect a future KIP will address
> storage of metadata
> >> in a more performant way.
> >>
> >> Thanks,
> >> Andrew
> >>
> >>> On 21 Mar 2024, at 15:40, Justine Olshan <jols...@confluent.io.INVALID>
> wrote:
> >>>
> >>> Thanks Andrew,
> >>>
> >>> That answers some of the questions I have.
> >>>
> >>> With respect to the limits -- how will this be implemented? One issue
> we
> >>> saw with producers is "short-lived" producers that send one message and
> >>> disconnect.
> >>> Due to how expiration works for producer state, if we have a simple
> limit
> >>> for producer IDs, all new producers are blocked until the old ones
> expire.
> >>> Will we block new group members as well if we reach our limit?
> >>>
> >>> In the consumer case, we have a heartbeat which can be used for
> expiration
> >>> behavior and avoid the headache we see on the producer side, but I can
> >>> imagine a case where misuse of the groups themselves could occur -- i.e.
> >>> creating a short-lived share group that I believe will take some time
> to
> >>> expire. Do we have considerations for this case?
> >>>
> >>> I also plan to re-read the read-committed section and may have further
> >>> questions there.
> >>>
> >>> You also mentioned in the KIP how there are a few inter-broker hops to
> the
> >>> share coordinator, etc for a given read operation of a partition. Are
> we
> >>> concerned about performance here? My work in transactions and trying to
> >>> optimize performance made me realize how expensive these inter-broker
> hops
> >>> can be.
> >>>
> >>> Justine
> >>>
> >>> On Thu, Mar 21, 2024 at 7:37 AM Andrew Schofield <
> >>> andrew_schofield_j...@outlook.com> wrote:
> >>>
> >>>> Hi Justine,
> >>>> Thanks for your comment. Sorry for the delay responding.
> >>>>
> >>>> It was not my intent to leave a query unanswered. I have modified the
> KIP
> >>>> as a result
> >>>> of the discussion and I think perhaps I didn’t neatly close off the
> email
> >>>> thread.
> >>>>
> >>>> In summary:
> >>>>
> >>>> * The share-partition leader does not maintain an explicit cache of
> >>>> records that it has
> >>>> fetched. When it fetches records, it does “shallow” iteration to look
> at
> >>>> the batch
> >>>> headers only so that it understands at least the base/last offset of
> the
> >>>> records within.
> >>>> It is left to the consumers to do the “deep” iteration of the record
> >>>> batches they fetch (a rough sketch of the idea follows this list).
> >>>>
> >>>> * It may sometimes be necessary to re-fetch records for redelivery.
> This
> >>>> is essentially
> >>>> analogous to two consumer groups independently fetching the same
> records
> >>>> today.
> >>>> We will be relying on the efficiency of the page cache.
> >>>>
> >>>> * The zero-copy optimisation is not possible for records fetched for
> >>>> consumers in
> >>>> share groups. The KIP does not affect the use of the zero-copy
> >>>> optimisation for any
> >>>> scenarios in which it currently applies (this was not true in one
> earlier
> >>>> version of the KIP).
> >>>>
> >>>> * I share concern about memory usage, partly because of the producer
> state
> >>>> management
> >>>> area. To keep a lid on memory use, the number of share groups, the
> number
> >>>> of members
> >>>> of each share group, and the number of in-flight messages per
> partition in
> >>>> a share group
> >>>> are all limited. The aim is to get the in-memory state to be nice and
> >>>> compact, probably
> >>>> at the expense of throughput. Over time, I’m sure we’ll optimise and
> get a
> >>>> bit more
> >>>> headroom. Limits like these cannot easily be applied retrospectively,
> so
> >>>> the limits are
> >>>> there right at the start.
> >>>>
> >>>> * I have reworked the section on read-committed isolation level, and
> the
> >>>> complexity
> >>>> and memory usage of the approach is significantly improved.
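> >>>>
> >>>> To make the shallow/deep distinction in the first point concrete, the
> >>>> sketch below shows the idea using the broker’s existing record classes.
> >>>> It is purely illustrative and not the actual share-partition leader
> >>>> code; the segment file name is made up:
> >>>>
> >>>> import org.apache.kafka.common.record.FileRecords;
> >>>> import org.apache.kafka.common.record.RecordBatch;
> >>>>
> >>>> import java.io.File;
> >>>>
> >>>> public class ShallowIterationSketch {
> >>>>     public static void main(String[] args) throws Exception {
> >>>>         try (FileRecords segment =
> >>>>                 FileRecords.open(new File("00000000000000000000.log"))) {
> >>>>             // "Shallow" iteration: only the batch headers are read, which
> >>>>             // is enough to know the base/last offset of each batch.
> >>>>             for (RecordBatch batch : segment.batches()) {
> >>>>                 System.out.println("batch " + batch.baseOffset()
> >>>>                     + ".." + batch.lastOffset());
> >>>>                 // Not iterating the batch here: decompressing and walking
> >>>>                 // the individual records is the "deep" iteration left to
> >>>>                 // the consumer.
> >>>>             }
> >>>>         }
> >>>>     }
> >>>> }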
> >>>>
> >>>> I hope this answers your question.
> >>>>
> >>>> Thanks,
> >>>> Andrew
> >>>>
> >>>>
> >>>>> On 18 Mar 2024, at 20:47, Justine Olshan
> <jols...@confluent.io.INVALID>
> >>>> wrote:
> >>>>>
> >>>>> Hey Andrew,
> >>>>>
> >>>>> I noticed you started the voting thread, but there seem to be a few
> >>>>> questions that were not answered. One was Jun's about memory usage:
> >>>>>> How much additional heap memory will the server use? Do we need to
> cache
> >>>>> records in heap? If so, is the cache bounded?
> >>>>>
> >>>>> Your response was
> >>>>>> This area needs more work. Using a share group surely gets the
> broker to
> >>>>> do
> >>>>> more manipulation of the data that it fetches than a regular
> consumer. I
> >>>>> want to minimise
> >>>>> this and need to research before providing a comprehensive answer. I
> >>>>> suspect zero-copy
> >>>>> is lost and that we do not cache records in heap. I will confirm
> later
> >>>> on.
> >>>>>
> >>>>> I am also concerned about memory usage from my producer state
> management
> >>>>> work, so I want to make sure we have thought about it here -- not
> just in
> >>>>> the case Jun mentioned but generally.
> >>>>>
> >>>>> Likewise, we have seen issues with large consumer groups and too many
> >>>>> producer IDs. Are there any concerns with an analogous situation
> with too
> >>>>> many share group members or share groups? Are there any ways we try to
> >>>>> handle this or mitigate risks with respect to memory usage and client
> >>>>> connections (with respect to rebalances, for example)?
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Justine
> >>>>>
> >>>>> On Fri, Mar 8, 2024 at 12:51 AM Andrew Schofield <
> >>>>> andrew_schofield_j...@outlook.com> wrote:
> >>>>>
> >>>>>> Hi Manikumar,
> >>>>>> Thanks for your queries.
> >>>>>>
> >>>>>> 1) Delivery count is added to the ConsumerRecord class so that a
> >>>> consumer
> >>>>>> can
> >>>>>> tell how often a record has been processed. I imagine that some
> >>>>>> applications might
> >>>>>> want to take different actions based on whether a record has
> previously
> >>>>>> failed. This
> >>>>>> enables richer error handling for bad records. In the future, I plan
> >>>>>> another KIP to
> >>>>>> enhance error handling.
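> >>>>>>
> >>>>>> As a sketch of the kind of error handling I mean, assuming the
> >>>>>> Optional<Short> deliveryCount() accessor that the KIP adds to
> >>>>>> ConsumerRecord (the threshold and the dead-letter helper are the
> >>>>>> application’s own choices, not part of the KIP):
> >>>>>>
> >>>>>> import org.apache.kafka.clients.consumer.ConsumerRecord;
> >>>>>>
> >>>>>> public class DeliveryCountSketch {
> >>>>>>     static final short MAX_ATTEMPTS = 3;
> >>>>>>
> >>>>>>     static void handle(ConsumerRecord<String, String> record) {
> >>>>>>         // The delivery count tells how often this record has been delivered.
> >>>>>>         short attempts = record.deliveryCount().orElse((short) 1);
> >>>>>>         if (attempts >= MAX_ATTEMPTS) {
> >>>>>>             // Give up on the poison record rather than retrying forever.
> >>>>>>             sendToDeadLetterTopic(record);   // hypothetical helper
> >>>>>>             return;
> >>>>>>         }
> >>>>>>         process(record);   // application logic; a failure leads to redelivery
> >>>>>>     }
> >>>>>>
> >>>>>>     static void sendToDeadLetterTopic(ConsumerRecord<String, String> r) { }
> >>>>>>     static void process(ConsumerRecord<String, String> r) { }
> >>>>>> }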
> >>>>>>
> >>>>>> 2) It is only possible to delete a share group which is empty. As a
> >>>>>> result, all
> >>>>>> well-behaved consumers will have closed their share sessions. After
> a
> >>>>>> short while,
> >>>>>> the share-partition leaders will discard the share-partition
> information
> >>>>>> from memory.
> >>>>>>
> >>>>>> In the presence of badly behaved consumers, a consumer would have to
> >>>>>> pretend to
> >>>>>> be a member of a share group. There are several cases:
> >>>>>>
> >>>>>> a) If the share-partition leader still has in-memory state for the
> >>>> deleted
> >>>>>> share-group, it will
> >>>>>> continue to fetch records but it will be fenced by the share
> coordinator
> >>>>>> when it attempts to
> >>>>>> write its persistent state.
> >>>>>>
> >>>>>> b) If the share-partition leader does not have in-memory state, it
> will
> >>>>>> attempt to read it
> >>>>>> from the share coordinator and this will fail.
> >>>>>>
> >>>>>> 3) I will add metrics for the share coordinator today. This was an
> >>>>>> omission. Thanks for catching it.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Andrew
> >>>>>>
> >>>>>>> On 6 Mar 2024, at 17:53, Manikumar <manikumar.re...@gmail.com>
> wrote:
> >>>>>>>
> >>>>>>> Hi Andrew,
> >>>>>>>
> >>>>>>> Thanks for the updated KIP. A few queries below:
> >>>>>>>
> >>>>>>> 1. What is the use-case of deliveryCount in ShareFetchResponse?
> >>>>>>> 2. During share group deletion, do we need to clean any in-memory
> state
> >>>>>> from
> >>>>>>> share-partition leaders?
> >>>>>>> 3. Any metrics for the share-coordinator?
> >>>>>>>
> >>>>>>> Thanks
> >>>>>>> Manikumar
> >>>>>>>
> >>>>>>> On Wed, Feb 21, 2024 at 12:11 AM Andrew Schofield <
> >>>>>>> andrew_schofield_j...@outlook.com> wrote:
> >>>>>>>
> >>>>>>>> Hi Manikumar,
> >>>>>>>> Thanks for your comments.
> >>>>>>>>
> >>>>>>>> 1. I believe that in general, there are no situations in which a
> >>>>>> dynamic
> >>>>>>>> config
> >>>>>>>> change is prevented because of the existence of a resource. So,
> if we
> >>>>>>>> prevented
> >>>>>>>> setting config `group.type=consumer` on resource G of GROUP type
> >>>>>>>> if there was a share group G in existence, it would be a bit
> weird.
> >>>>>>>>
> >>>>>>>> I wonder whether changing the config name to `new.group.type`
> would
> >>>>>> help.
> >>>>>>>> It’s ensuring the type of a newly created group.
> >>>>>>>>
> >>>>>>>> 2. The behaviour for a DEAD share group is intended to be the
> same as
> >>>> a
> >>>>>>>> DEAD
> >>>>>>>> consumer group. The group cannot be “reused” again as such, but
> the
> >>>>>> group
> >>>>>>>> ID
> >>>>>>>> can be used by a new group.
> >>>>>>>>
> >>>>>>>> 3. Yes. AlterShareGroupOffsets will cause a new SHARE_CHECKPOINT.
> >>>>>>>>
> >>>>>>>> 4. In common with Admin.deleteConsumerGroups, the underlying
> Kafka RPC
> >>>>>>>> for Admin.deleteShareGroups is DeleteGroups. This is handled by
> the
> >>>>>> group
> >>>>>>>> coordinator and it does this by writing control records (a
> tombstone
> >>>> in
> >>>>>>>> this case).
> >>>>>>>> The KIP doesn’t say anything about this because it’s the same as
> >>>>>> consumer
> >>>>>>>> groups.
> >>>>>>>> Perhaps it would be sensible to add a GroupType to
> DeleteGroupsRequest
> >>>>>> so
> >>>>>>>> we can
> >>>>>>>> make sure we are deleting the correct type of group. The fact that
> >>>> there
> >>>>>>>> is not a specific
> >>>>>>>> RPC for DeleteShareGroups seems correct to me.
> >>>>>>>>
> >>>>>>>> 5. I prefer using “o.a.k.clients.consumer” because it’s already a
> >>>> public
> >>>>>>>> package and
> >>>>>>>> many of the classes and interfaces such as ConsumerRecord are in
> that
> >>>>>>>> package.
> >>>>>>>>
> >>>>>>>> I definitely need to add more information about how the Admin
> >>>> operations
> >>>>>>>> work.
> >>>>>>>> I will add a section to the KIP in the next version, later today.
> This
> >>>>>>>> will fill in details for
> >>>>>>>> your questions (3) and (4).
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Andrew
> >>>>>>>>
> >>>>>>>>> On 14 Feb 2024, at 18:04, Manikumar <manikumar.re...@gmail.com>
> >>>> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Andrew,
> >>>>>>>>>
> >>>>>>>>> Thanks for the KIP. A few comments below.
> >>>>>>>>>
> >>>>>>>>> 1. kafka-configs.sh (incrementalAlterConfigs) allows you to
> >>>> dynamically
> >>>>>>>>> change the configs. Maybe in this case, we should not allow the
> user
> >>>> to
> >>>>>>>>> change `group.type` if it's already set.
> >>>>>>>>> 2. What's the behaviour after a group transitions into the DEAD
> state? Do
> >>>>>> we
> >>>>>>>>> add new control records to reset the state? Can we reuse the
> group
> >>>>>> again?
> >>>>>>>>> 3. Are we going to write new control records after the
> >>>>>>>>> AlterShareGroupOffsets API to reset the state?
> >>>>>>>>> 4. Is there any API for DeleteShareGroups? I assume the group
> >>>> co-ordinator
> >>>>>>>> is
> >>>>>>>>> going to handle the API. If so, does this mean the group
> co-ordinator
> >>>>>>>> also
> >>>>>>>>> needs to write control records?
> >>>>>>>>> 5. How about using "org.apache.kafka.clients.consumer.share"
> package
> >>>>>> for
> >>>>>>>>> new interfaces/classes?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Manikumar
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>>
> >>>>
> >>>>
> >>
> >
>
>
