Hi Andrew,
Thanks for the KIP, it is definitely an interesting read. I have a few
questions.

As the KIP proposes extending `AdminClient.incrementalAlterConfigs` to add an
explicit `group.type`, what would this mean for the DR feature that MM2 offers?
Right now MM2 syncs consumer group offsets from the source to the destination
cluster. It also offers ACL syncing, which contributes to the DR feature. Would
this KIP mean MM2 also needs to sync the group types to the destination?
Setting `group.type` through `AdminClient.incrementalAlterConfigs` means "when
a new group is created with this name, it must have this type". What will
happen if the clusters on both ends of MM2 have the same group ID but with
different types?
If this concern is out of scope, we might need to call this out somewhere in
the KIP.
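
To make the conflicting-type scenario concrete, here is a minimal Python sketch. It is not MM2 code and uses no Kafka API: the function name and the plain dictionaries mapping group ID to group type are hypothetical, standing in for whatever each cluster would report.

```python
def find_group_type_conflicts(source_groups, destination_groups):
    """Return group IDs that exist on both clusters with different types.

    Both arguments are hypothetical {group_id: group_type} mappings,
    not the output of any real Kafka or MM2 call.
    """
    conflicts = {}
    for group_id, source_type in source_groups.items():
        destination_type = destination_groups.get(group_id)
        # Only a group ID present on both sides can conflict.
        if destination_type is not None and destination_type != source_type:
            conflicts[group_id] = (source_type, destination_type)
    return conflicts


# Illustrative data only: "orders" is a share group on the source cluster
# but a consumer group on the destination cluster.
source = {"orders": "share", "billing": "consumer"}
destination = {"orders": "consumer", "billing": "consumer", "audit": "share"}
conflicts = find_group_type_conflicts(source, destination)
print(conflicts)
```

In this made-up example only "orders" is reported, which is the case I am asking about: it is unclear what offset syncing for that group should do.
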

While the number of share groups and the number of consumers in a share group
are limited by `group.share.max.groups` and `group.share.max.size`, the total
number of share-group state records that might need to be loaded in memory has
another factor, which is the number of partitions. In cases where a group is
consuming from a large number of topics with a large number of partitions, what
will be the impact on coordinator memory?
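
As a rough back-of-envelope illustration of that extra factor, the sketch below multiplies groups by partitions. It assumes, purely for illustration, one state entry per (share group, topic-partition) pair and that every group subscribes to every topic; the function name and all the numbers are hypothetical and are not taken from the KIP.

```python
def estimate_share_partition_states(num_groups, partitions_per_topic):
    """Upper bound on (share group, topic-partition) state entries.

    Assumes every group consumes from every topic, which is a worst-case
    simplification rather than anything the KIP states.
    """
    total_partitions = sum(partitions_per_topic)
    return num_groups * total_partitions


# Hypothetical workload: 10 share groups, 50 topics of 100 partitions each.
groups = 10
topics = [100] * 50
total_states = estimate_share_partition_states(groups, topics)
print(total_states)  # 10 * 5000 = 50000
```

The group limit bounds only the first factor, so the product can still grow with the partition count.
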

Thanks 
Omnia


> On 25 Mar 2024, at 10:23, Andrew Schofield 
> <andrew_schofield_j...@outlook.com> wrote:
> 
> Hi Justine,
> Thanks for your questions.
> 
> There are several limits in this KIP. With consumer groups, we see problems
> where there are huge numbers of consumer groups, and we also see problems
> when there are huge numbers of members in a consumer group.
> 
> There’s a limit on the number of members in a share group. When the limit is
> reached, additional members are not admitted to the group. The members
> heartbeat to remain in the group and that enables timely expiration.
> 
> There’s also a limit on the number of share groups in a cluster. Initially,
> this limit has been set low. As a result, it would be possible to create
> sufficient groups to reach the limit, and then creating additional groups
> will fail. It will be possible to delete a share group administratively,
> but share groups do not automatically expire (just like topics do not
> expire and queues in message-queuing systems do not expire).
> 
> The `kafka-console-share-consumer.sh` tool in the KIP defaults the group
> name to “share”. This has two benefits. First, it means that the trivial
> exploratory use of running multiple concurrent copies of it will naturally
> get sharing of the records consumed. Second, it means that only one share
> group is being created, rather than generating another group ID for each
> execution.
> 
> Please do re-read the read-committed section. I’ll be grateful for all the
> thoughtful reviews that the community is able to provide. The KIP says that
> broker-side filtering removes the records for aborted transactions. This is
> obviously quite a difference compared with consumers in consumer groups. I
> think it would also be possible to do it client-side, but the records
> fetched from the replica manager are distributed among the consumers, and
> I’m concerned that it would be difficult to distribute the list of aborted
> transactions relevant to the records each consumer receives. I’m
> considering prototyping client-side filtering to see how well it works in
> practice.
> 
> I am definitely mindful of the inter-broker hops needed to persist the
> share-group state. Originally, I did look at writing the state directly
> into the user’s topic-partitions because this means the share-partition
> leader would be able to write directly. This has downsides as documented in
> the “Rejected Alternatives” section of the KIP.
> 
> We do have opportunities for pipelining and batching which I expect we will
> exploit in order to improve the performance.
> 
> This KIP is only the beginning. I expect a future KIP will address storage
> of metadata in a more performant way.
> 
> Thanks,
> Andrew
> 
>> On 21 Mar 2024, at 15:40, Justine Olshan <jols...@confluent.io.INVALID> 
>> wrote:
>> 
>> Thanks Andrew,
>> 
>> That answers some of the questions I have.
>> 
>> With respect to the limits -- how will this be implemented? One issue we
>> saw with producers is "short-lived" producers that send one message and
>> disconnect.
>> Due to how expiration works for producer state, if we have a simple limit
>> for producer IDs, all new producers are blocked until the old ones expire.
>> Will we block new group members as well if we reach our limit?
>> 
>> In the consumer case, we have a heartbeat which can be used for expiration
>> behavior and to avoid the headache we see on the producer side, but I can
>> imagine a case where misuse of the groups themselves could occur -- i.e.
>> creating a short-lived share group that I believe will take some time to
>> expire. Do we have considerations for this case?
>> 
>> I also plan to re-read the read-committed section and may have further
>> questions there.
>> 
>> You also mentioned in the KIP how there are a few inter-broker hops to the
>> share coordinator, etc for a given read operation of a partition. Are we
>> concerned about performance here? My work in transactions and trying to
>> optimize performance made me realize how expensive these inter-broker hops
>> can be.
>> 
>> Justine
>> 
>> On Thu, Mar 21, 2024 at 7:37 AM Andrew Schofield <
>> andrew_schofield_j...@outlook.com> wrote:
>> 
>>> Hi Justine,
>>> Thanks for your comment. Sorry for the delay responding.
>>> 
>>> It was not my intent to leave a query unanswered. I have modified the KIP
>>> as a result of the discussion and I think perhaps I didn’t neatly close
>>> off the email thread.
>>> 
>>> In summary:
>>> 
>>> * The share-partition leader does not maintain an explicit cache of
>>> records that it has fetched. When it fetches records, it does “shallow”
>>> iteration to look at the batch headers only so that it understands at
>>> least the base/last offset of the records within. It is left to the
>>> consumers to do the “deep” iteration of the record batches they fetch.
>>> 
>>> * It may sometimes be necessary to re-fetch records for redelivery. This
>>> is essentially analogous to two consumer groups independently fetching
>>> the same records today. We will be relying on the efficiency of the page
>>> cache.
>>> 
>>> * The zero-copy optimisation is not possible for records fetched for
>>> consumers in share groups. The KIP does not affect the use of the
>>> zero-copy optimisation for any scenarios in which it currently applies
>>> (this was not true in one earlier version of the KIP).
>>> 
>>> * I share your concern about memory usage, partly because of the producer
>>> state management area. To keep a lid on memory use, the number of share
>>> groups, the number of members of each share group, and the number of
>>> in-flight messages per partition in a share group are all limited. The
>>> aim is to get the in-memory state to be nice and compact, probably at the
>>> expense of throughput. Over time, I’m sure we’ll optimise and get a bit
>>> more headroom. Limits like these cannot easily be applied
>>> retrospectively, so the limits are there right at the start.
>>> 
>>> * I have reworked the section on read-committed isolation level, and the
>>> complexity and memory usage of the approach are significantly improved.
>>> 
>>> I hope this answers your question.
>>> 
>>> Thanks,
>>> Andrew
>>> 
>>> 
>>>> On 18 Mar 2024, at 20:47, Justine Olshan <jols...@confluent.io.INVALID>
>>>> wrote:
>>>> 
>>>> Hey Andrew,
>>>> 
>>>> I noticed you started the voting thread, but there seem to be a few
>>>> questions that were not answered. One was Jun's about memory usage:
>>>>> How much additional heap memory will the server use? Do we need to cache
>>>>> records in heap? If so, is the cache bounded?
>>>> 
>>>> Your response was:
>>>>> This area needs more work. Using a share group surely gets the broker to
>>>>> do more manipulation of the data that it fetches than a regular
>>>>> consumer. I want to minimise this and need to research before providing
>>>>> a comprehensive answer. I suspect zero-copy is lost and that we do not
>>>>> cache records in heap. I will confirm later on.
>>>> 
>>>> I am also concerned about memory usage from my producer state management
>>>> work, so I want to make sure we have thought about it here -- not just in
>>>> the case Jun mentioned but generally.
>>>> 
>>>> Likewise, we have seen issues with large consumer groups and too many
>>>> producer IDs. Are there any concerns with an analogous situation with too
>>>> many share group members or share groups? Are there any ways we try to
>>>> handle this or mitigate risks with respect to memory usage and client
>>>> connections (with respect to rebalances, for example)?
>>>> 
>>>> Thanks,
>>>> 
>>>> Justine
>>>> 
>>>> On Fri, Mar 8, 2024 at 12:51 AM Andrew Schofield <
>>>> andrew_schofield_j...@outlook.com> wrote:
>>>> 
>>>>> Hi Manikumar,
>>>>> Thanks for your queries.
>>>>> 
>>>>> 1) Delivery count is added to the ConsumerRecord class so that a
>>>>> consumer can tell how often a record has been processed. I imagine that
>>>>> some applications might want to take different actions based on whether
>>>>> a record has previously failed. This enables richer error handling for
>>>>> bad records. In the future, I plan another KIP to enhance error
>>>>> handling.
>>>>> 
>>>>> 2) It is only possible to delete a share group which is empty. As a
>>>>> result, all well-behaved consumers will have closed their share
>>>>> sessions. After a short while, the share-partition leaders will discard
>>>>> the share-partition information from memory.
>>>>> 
>>>>> In the presence of badly behaved consumers, a consumer would have to
>>>>> pretend to be a member of a share group. There are several cases:
>>>>> 
>>>>> a) If the share-partition leader still has in-memory state for the
>>>>> deleted share-group, it will continue to fetch records but it will be
>>>>> fenced by the share coordinator when it attempts to write its
>>>>> persistent state.
>>>>> 
>>>>> b) If the share-partition leader does not have in-memory state, it will
>>>>> attempt to read it from the share coordinator and this will fail.
>>>>> 
>>>>> 3) I will add metrics for the share coordinator today. This was an
>>>>> omission. Thanks for catching it.
>>>>> 
>>>>> Thanks,
>>>>> Andrew
>>>>> 
>>>>>> On 6 Mar 2024, at 17:53, Manikumar <manikumar.re...@gmail.com> wrote:
>>>>>> 
>>>>>> Hi Andrew,
>>>>>> 
>>>>>> Thanks for the updated KIP. A few queries below:
>>>>>> 
>>>>>> 1. What is the use-case of deliveryCount in ShareFetchResponse?
>>>>>> 2. When deleting share groups, do we need to clean any in-memory state
>>>>>> from share-partition leaders?
>>>>>> 3. Any metrics for the share-coordinator?
>>>>>> 
>>>>>> Thanks
>>>>>> Manikumar
>>>>>> 
>>>>>> On Wed, Feb 21, 2024 at 12:11 AM Andrew Schofield <
>>>>>> andrew_schofield_j...@outlook.com> wrote:
>>>>>> 
>>>>>>> Hi Manikumar,
>>>>>>> Thanks for your comments.
>>>>>>> 
>>>>>>> 1. I believe that in general, there are not situations in which a
>>>>>>> dynamic config change is prevented because of the existence of a
>>>>>>> resource. So, if we prevented setting config `group.type=consumer` on
>>>>>>> resource G of GROUP type if there was a share group G in existence,
>>>>>>> it would be a bit weird.
>>>>>>> 
>>>>>>> I wonder whether changing the config name to `new.group.type` would
>>>>>>> help. It ensures the type of any newly created group.
>>>>>>> 
>>>>>>> 2. The behaviour for a DEAD share group is intended to be the same as
>>>>>>> a DEAD consumer group. The group cannot be “reused” again as such, but
>>>>>>> the group ID can be used by a new group.
>>>>>>> 
>>>>>>> 3. Yes. AlterShareGroupOffsets will cause a new SHARE_CHECKPOINT.
>>>>>>> 
>>>>>>> 4. In common with Admin.deleteConsumerGroups, the underlying Kafka RPC
>>>>>>> for Admin.deleteShareGroups is DeleteGroups. This is handled by the
>>>>>>> group coordinator and it does this by writing control records (a
>>>>>>> tombstone in this case). The KIP doesn’t say anything about this
>>>>>>> because it’s the same as consumer groups. Perhaps it would be sensible
>>>>>>> to add a GroupType to DeleteGroupsRequest so we can make sure we are
>>>>>>> deleting the correct type of group. The fact that there is not a
>>>>>>> specific RPC for DeleteShareGroups seems correct to me.
>>>>>>> 
>>>>>>> 5. I prefer using “o.a.k.clients.consumer” because it’s already a
>>>>>>> public package and many of the classes and interfaces such as
>>>>>>> ConsumerRecord are in that package.
>>>>>>> 
>>>>>>> I definitely need to add more information about how the Admin
>>>>>>> operations work. I will add a section to the KIP in the next version,
>>>>>>> later today. This will fill in details for your questions (3) and (4).
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Andrew
>>>>>>> 
>>>>>>>> On 14 Feb 2024, at 18:04, Manikumar <manikumar.re...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>> Hi Andrew,
>>>>>>>> 
>>>>>>>> Thanks for the KIP. A few comments below.
>>>>>>>> 
>>>>>>>> 1. kafka-configs.sh (incrementalAlterConfigs) allows you to
>>>>>>>> dynamically change the configs. Maybe in this case, we should not
>>>>>>>> allow the user to change `group.type` if it's already set.
>>>>>>>> 2. What's the behaviour after a group transitions into DEAD state? Do
>>>>>>>> we add new control records to reset the state? Can we reuse the group
>>>>>>>> again?
>>>>>>>> 3. Are we going to write new control records after the
>>>>>>>> AlterShareGroupOffsets API to reset the state?
>>>>>>>> 4. Is there any API for DeleteShareGroups? I assume the group
>>>>>>>> coordinator is going to handle the API. If so, does this mean the
>>>>>>>> group coordinator also needs to write control records?
>>>>>>>> 5. How about using the "org.apache.kafka.clients.consumer.share"
>>>>>>>> package for new interfaces/classes?
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Manikumar
>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>> 
>>> 
> 
