Hi Daniel,
Thanks for your questions.

1) Yes, read_committed fetch will still be possible.

2) You weren’t wrong that this is a broad question :)

Broadly speaking, I can see two ways of managing the in-flight records:
the share-partition leader does it, or the share-group coordinator does it.
I want to choose what works best, and I happen to have started with trying
the share-partition leader doing it. This is just a whiteboard exercise at the
moment, looking at the potential protocol flows and how well it all hangs
together. When I have something coherent and understandable and worth
reviewing, I’ll update the KIP with a proposal.

I think it’s probably worth doing a similar exercise for the share-group
coordinator way too. There are bound to be pros and cons, and I don’t really
mind which way prevails.

If the share-group coordinator does it, I already have experience of storing
in-flight record state in a way that scales and is space-efficient.
If the share-partition leader does it, storage of in-flight state is a bit more
tricky.

I think it’s worth thinking ahead to how EOS will work, and also to a
couple of other enhancements (key-based ordering and acquisition lock
extension), so that the design is somewhat future-proof.

Thanks,
Andrew

> On 1 Jun 2023, at 11:51, Dániel Urbán <urb.dani...@gmail.com> wrote:
>
> Hi Andrew,
>
> Thank you for the KIP, exciting work you are doing :)
> I have 2 questions:
> 1. I understand that EOS won't be supported for share-groups (yet), but
> read_committed fetch will still be possible, correct?
>
> 2. I have a very broad question about the proposed solution: why not let
> the share-group coordinator manage the states of the in-flight records?
> I'm asking this because it seems to me that using the same pattern as the
> existing group coordinator would
> a) solve the durability of the message state storage (same method as the
> one used by the current group coordinator)
> b) pave the way for EOS with share-groups (same method as the one used by
> the current group coordinator)
> c) allow follower-fetching
> I saw your point about this: "FFF gives freedom to fetch records from a
> nearby broker, but it does not also give the ability to commit offsets to a
> nearby broker"
> But does it matter if message acknowledgement is not "local"? Supposedly,
> fetching is the actual hard work which benefits from follower fetching, not
> the group related requests.
>
> The only problem I see with the share-group coordinator managing the
> in-flight message state is that the coordinator is not aware of the exact
> available offsets of a partition, nor how the messages are batched. For
> this problem, maybe the share group coordinator could use some form of
> "logical" addresses, such as "the next 2 batches after offset X", or "after
> offset X, skip 2 batches, fetch next 2". Acknowledgements always contain
> the exact offset, but for the "unknown" sections of a partition, these
> logical addresses would be used. The coordinator could keep track of
> message states with a mix of offsets and these batch based addresses. The
> partition leader could support "skip X, fetch Y batches" fetch requests.
> This solution would need changes in the Fetch API to allow such batch based
> addresses, but I assume that fetch protocol changes will be needed
> regardless of the specific solution.
>
> Thanks,
> Daniel
>
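
The "skip X, fetch Y batches" addressing suggested in the message above can be
illustrated with a minimal sketch. Everything here is an assumption made for
illustration only: the names Batch, LogicalAddress and resolve are
hypothetical, they are not part of KIP-932 or of any Kafka API, and "after
offset X" is read as "batches whose base offset is greater than X".

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Batch:
    """Hypothetical view of a record batch as the partition leader sees it."""
    base_offset: int   # offset of the first record in the batch
    record_count: int  # number of records in the batch

    @property
    def last_offset(self) -> int:
        return self.base_offset + self.record_count - 1


@dataclass(frozen=True)
class LogicalAddress:
    """Hypothetical batch-based address: after an offset, skip N batches, fetch M."""
    after_offset: int
    skip: int
    fetch: int


def resolve(log: List[Batch], addr: LogicalAddress) -> List[Batch]:
    """Turn a logical address into concrete batches.

    Only the partition leader knows the batch boundaries, so in this sketch
    the leader is the one that performs the resolution.
    """
    # Assumption: "after offset X" means batches starting strictly after X.
    candidates = [b for b in log if b.base_offset > addr.after_offset]
    return candidates[addr.skip:addr.skip + addr.fetch]


# A partition log with five batches covering offsets 0..19.
log = [Batch(0, 5), Batch(5, 3), Batch(8, 4), Batch(12, 2), Batch(14, 6)]

# "The next 2 batches after offset 4"
first = resolve(log, LogicalAddress(after_offset=4, skip=0, fetch=2))

# "After offset 4, skip 2 batches, fetch next 2"
second = resolve(log, LogicalAddress(after_offset=4, skip=2, fetch=2))

print([(b.base_offset, b.last_offset) for b in first])
print([(b.base_offset, b.last_offset) for b in second])
```

In this sketch the coordinator would only need to remember the logical
address it handed out; the exact offsets become known once the consumer
acknowledges, as described in the message above.
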
> Andrew Schofield <andrew_schofi...@live.com> wrote (on Tue, 30 May 2023,
> 18:15):
>
>> Yes, that’s it. I imagine something similar to KIP-848 for managing the
>> share group membership, and consumers that fetch records from their
>> assigned partitions and acknowledge when delivery completes.
>>
>> Thanks,
>> Andrew
>>
>>> On 30 May 2023, at 16:52, Adam Warski <a...@warski.org> wrote:
>>>
>>> Thanks for the explanation!
>>>
>>> So effectively, a share group is subscribed to each partition - but the
>>> data is not pushed to the consumer, but only sent on demand. And when
>>> demand is signalled, a batch of messages is sent?
>>> Hence it would be up to the consumer to prefetch a sufficient number of
>>> batches to ensure that it will never be "bored"?
>>>
>>> Adam
>>>
>>>> On 30 May 2023, at 15:25, Andrew Schofield <andrew_schofi...@live.com> wrote:
>>>>
>>>> Hi Adam,
>>>> Thanks for your question.
>>>>
>>>> With a share group, each fetch is able to grab available records from
>>>> any partition. So, it alleviates the “head-of-line” blocking problem
>>>> where a slow consumer gets in the way. There’s no actual stealing from
>>>> a slow consumer, but it can be overtaken and must complete its
>>>> processing within the timeout.
>>>>
>>>> The way I see this working is that when a consumer joins a share group,
>>>> it receives a set of assigned share-partitions. To start with, every
>>>> consumer will be assigned all partitions. We can be smarter than that,
>>>> but I think that’s really a question of writing a smarter assignor
>>>> just as has occurred over the years with consumer groups.
>>>>
>>>> Only a small proportion of Kafka workloads are super high throughput.
>>>> Share groups would struggle with those I’m sure. Share groups do not
>>>> diminish the value of consumer groups for streaming. They just give
>>>> another option for situations where a different style of consumption
>>>> is more appropriate.
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>>> On 29 May 2023, at 17:18, Adam Warski <a...@warski.org> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> thank you for the proposal! A very interesting read.
>>>>>
>>>>> I do have one question, though. When you subscribe to a topic using
>>>>> consumer groups, it might happen that one consumer has processed all
>>>>> messages from its partitions, while another one still has a lot of work to
>>>>> do (this might be due to unbalanced partitioning, long processing times
>>>>> etc.). In a message-queue approach, it would be great to solve this problem
>>>>> - so that a consumer that is free can steal work from other consumers. Is
>>>>> this somehow covered by share groups?
>>>>>
>>>>> Maybe this is planned as "further work", as indicated here:
>>>>>
>>>>> "
>>>>> It manages the topic-partition assignments for the share-group
>>>>> members. An initial, trivial implementation would be to give each member
>>>>> the list of all topic-partitions which matches its subscriptions and then
>>>>> use the pull-based protocol to fetch records from all partitions. A more
>>>>> sophisticated implementation could use topic-partition load and lag metrics
>>>>> to distribute partitions among the consumers as a kind of autonomous,
>>>>> self-balancing partition assignment, steering more consumers to busier
>>>>> partitions, for example. Alternatively, a push-based fetching scheme could
>>>>> be used. Protocol details will follow later.
>>>>> "
>>>>>
>>>>> but I’m not sure if I understand this correctly. A fully-connected
>>>>> graph seems like a lot of connections, and I’m not sure if this would play
>>>>> well with streaming.
>>>>>
>>>>> This also seems to be one of the central problems - a key differentiator
>>>>> between share and consumer groups (the other one being persisting state of
>>>>> messages). And maybe the exact way we’d want to approach this would, to a
>>>>> certain degree, dictate the design of the queueing system?
>>>>>
>>>>> Best,
>>>>> Adam Warski
>>>>>
>>>>> On 2023/05/15 11:55:14 Andrew Schofield wrote:
>>>>>> Hi,
>>>>>> I would like to start a discussion thread on KIP-932: Queues for
>>>>>> Kafka. This KIP proposes an alternative to consumer groups to enable
>>>>>> cooperative consumption by consumers without partition assignment. You end
>>>>>> up with queue semantics on top of regular Kafka topics, with per-message
>>>>>> acknowledgement and automatic handling of messages which repeatedly fail to
>>>>>> be processed.
>>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
>>>>>>
>>>>>> Please take a look and let me know what you think.
>>>>>>
>>>>>> Thanks.
>>>>>> Andrew
>>>>>
>>>>
>>>
>>> --
>>> Adam Warski
>>>
>>> https://www.softwaremill.com/
>>> https://twitter.com/adamwarski

