Hi Matthias,
Thanks for your comments. This KIP is clearly quite a big piece of work and 
it’s not complete at this stage. I think it’s a good
principle to develop it in the community. I’m prototyping the code and will 
soon start filling in some of the missing details
with concrete proposals for changes to the protocol and so on.

(1) The “fetch” request from a share-group consumer will be recognisable as 
such. The broker will know not to offer a preferred
replica for fetch-from-follower.

(2) Pattern subscription is not included in this initial KIP to reduce 
complexity. A pattern subscription has the characteristic
that the set of matching topics can change after the subscription has already 
been established.

(3) I think that “auto.offset.reset” should not be supported for share groups. 
Because each consumer gets its own configuration,
the initial offset is essentially unpredictable and I prefer to be explicit in 
this. If you want the share-group to start
at the latest offset, you need do nothing other than start your consumers. If 
you want the share-group to start at the earliest
offset, you need to use an Admin API or kafka-share-groups.sh. If you decide 
upon the earliest offset and use a topic which
has copious amounts of tiered data, there’s quite a performance implication to 
the decision. That’s why I have done it like this.

(3B) If the data is purged broker-side, the SPSO will leap forwards over the 
gap in a similar way as a consumer group
leaps over records which are purged.

(3C) Good question. The purged records logically become ARCHIVED.

(3D) Most messaging systems have the idea of message expiration. There’s an API 
option to say “this message expires after
X hours” and the messaging system will silently discard expired messages when 
their expiration intervals have elapsed.
Kafka does offer a kind of expiration in the form of retention, but it’s done 
in terms of the age of log segments rather than
the individual records within. There’s also the option of retention based on 
size.

In this environment, the question is how best to offer queuing semantics while 
working with the retention and log cleaning
behaviour that Kafka has. Personally, I think that time-based log retention 
offers an approximate equivalence of message
expiration in other systems. You typically do not know which unconsumed 
messages were expired. I can see that it would
perhaps be interesting to have metrics for this, but knowing precisely which 
records were archived without being acked
seems tricky.

Using a share-group with a compacted topic would give quite an unusual 
behaviour for a queue, but then it’s also an
unusual behaviour for a topic. I personally would avoid this combination.

(4) The design is per-message, but the code will reflect the fact that Kafka 
itself is more per-batch. Here’s an example of
what I mean.

When the first share-group consumer fetches records to process, the records 
will be fetched using the replica
manager. When reading from the log, it’s much cheaper to deal in batches of 
records rather than iterating over the
individual records. If a new batch of records is added to the in-flight records 
(so none of them has yet been delivered
for this share-partition), the batch of records will all be returned to one 
consumer. If the consumer processes and acknowledges
all of the records in the batch, this is more efficient than a mixture of 
acknowledgements and rejections.

In more complicated situations where perhaps one record in a batch was released 
but the rest were acknowledged,
we now have to re-deliver that individual record. A share-consumer’s fetch 
response will be able to contain a record which
is no longer being delivered with the rest of its original batch, but it’s less 
efficient.

This is what I mean by “prefer”. When it’s possible to deliver and acknowledge 
batches in their entirety, that’s what the
code will do and it will be at its most efficient. When batches become split, 
it will just be a bit more work.

I think this covers (4B), (4D) and (4E).

(4A) We keep record states per record. We could use any number of encoding 
formats to optimise the space this requires.

(4C) My current thinking is that each consumer will be allowed to express a 
preference for how much data can be returned
on each fetch request.

(5A) When I was writing the KIP, I played around with a few different ways of 
acknowledging the records and the API in the
KIP is the result of that thinking. Since most applications will simply iterate 
along the ConsumerRecords, saying that the
acknowledgements have to be in the same order doesn’t seem that burdensome.

(5B) There’s no shortcut way to release or reject all records in a batch at 
once. If you want to continue processing afterwards,
you must reject or release each record. If you want to give up processing, just 
close the consumer which releases the records.

(5C) ConsumerRecords could indeed include records from different partitions.

(6) Heartbeating and group membership will be built upon the new KIP-848 
protocol. This is the next piece I’m going
to work on.

(7A) We could do at-most-once. This KIP does not offer that.

(7B) I see from my inbox there’s been a bit of discussion about EOS. I need to 
write down with more precision
how it will work and then we can have a proper discussion. In short, it will be 
made to work in a way that fits
naturally with Kafka EOS. It’s not going to do anything wilfully wrong.

(7C) Salty?! Exactly-once delivery with acknowledgement inside Kafka 
transactions and acknowledgement
state written as control records to topics is entirely possible. It is 
definitely very precise work. It is beyond the scope
of this KIP.

(8) I am not entirely happy with the extent of the overloading in the API at 
the moment. While it seems like a nice
idea to use the existing KafkaConsumer and just configure it to use a share 
group, there are some downsides.

a) There are a lot of methods which do not apply to share groups.
b) If we enable `group.type=share` for any old KafkaConsumer, embedded ones 
such as exist in Kafka Connect would
get surprising behaviour.

I am going to put an alternative API into the next update to the KIP which 
doesn’t use the KafkaConsumer interface
but has its own interface with the same shape, just omitting all of the baggage 
of unimplemented methods. Then you
choose between a consumer group and a share group by your choice of consumer 
class. The code snippets in the KIP
would be almost identical.

Actually, I was talking to Tom Bentley about this subject at Kafka Summit, so 
thanks to Tom.

(9)

(9A) I would look to expose the SPSO and SPEO in the 
Admin.listShareGroupOffsets. These are not committed offsets,
but they are the equivalent for a share group.

(9B) Share groups do not support commit. They support acknowledgement. I do not 
think auto-acknowledgement is
sensible with share groups. So, ‘enable.auto.commit’ is definitely a no-no, and 
I didn’t create `enable.auto,acknowledge`.

(9C) I did try to follow the existing Kafka config naming conventions.

`share.group.enable` matches the existing `delete.topic.enable`.

`share.delivery.count.limit` should not be a client configuration. I want to 
avoid one client’s configuration influencing the
experience of the other clients. If one client uses 
`share.delivery.count.limit=1`, it would mean that single client only permits
a single delivery attempt, which prevents redelivery in the other clients too.

`share.record.lock.duration.ms` is the server default config for the client’s 
`record.lock.duration.ms`. This naming is similar
to the broker’s `log.retention.ms` which is the server default of the topic 
config `retention.ms`.

(9D) It is important for maintaining progress of delivery to have a relatively 
small number of delivery attempts. This is why
I have specified a maximum of 10.

(9E) The only new consumer configuration `record.lock.duration.ms` has a broker 
equivalent.

(10) The content of the consumer buffer is a detail of the implementation. The 
KIP is supposed to be the specification
of the interface.

I do want to ensure atomicity. Basically, if a KafkaConsumer.poll() delivers a 
bunch of records which are acknowledged
by the client, either the acknowledgements of all of them are recorded durably 
or none of them.

The risk of “excessive” redelivery is mitigated by restricting the number of 
in-flight records and limiting the number of
delivery attempts.


My immediate plan is to make a significant update to the KIP in the next few 
days that addresses:

* Write down how the KIP-848 RPCs are used by share groups.
* Describe an alternative to KafkaConsumer that is very similar in style but 
does not have all of the unimplemented
methods which are needed because of reusing KafkaConsumer for share groups. 
I’ll put both in the KIP and see what
people think.
* Document the consumer configurations.

Thanks,
Andrew


> On 6 Jul 2023, at 18:26, Matthias J. Sax <mj...@apache.org> wrote:
>
> Thanks for the KIP.
>
> It seems we are in very early stage, and some very important sections in the 
> KIP are still marked as TODO. In particular, I am curious about the protocol 
> changes, how the "queuing state" will be represented and made durable, and 
> all the error edge case / fail-over / fencing (broker/clients) that we need 
> to put in place.
>
>
> A few other comments/question from my side:
>
> (1) Fetch from follower: this was already touched on, but the point is really 
> that the consumer does not decide about it, but the broker does. When a 
> consumer sends it's first fetch request it will always go to the leader, and 
> the broker would reply to the consumer "go and fetch from this other broker". 
> -- I think it's ok to exclude fetch from follower in the first version of the 
> KIP, but it would need a broker change such that the broker knows it's a 
> "queue fetch" request. -- It would also be worth to explore how fetch from 
> follow could work in the future and ensure that our initial design allows for 
> it and is future proof.
>
>
> (2) Why do we not allow pattern subscription and what happens if different 
> consumers subscribe to different topics? It's not fully explained in the KIP.
>
>
> (3) auto.offset.reset and SPSO/SPSE -- I don't understand why we would not 
> allow auto.offset.reset? In the discussion, you mentioned that "first 
> consumer would win, if two consumers have a different config" -- while this 
> is correct, it's the same for a consumer group right now. Maybe we should not 
> try to solve a "non problem"? -- In general, my impression is that we are 
> going to do Kafkaeque Queuing, what is fine, but it might be to our advantage 
> to carry over as many established concepts as we can? And if not, have a very 
> good reason not to.
>
> In the end, it find if very clumsy to only have an admin API to change the 
> starting point of a consumer.
>
> (3B) What happens if lag grows and data is purged broker side?
>
> (3C) What happens if the broker released records (based on "timeout / 
> exceeding deliver count), and the "ack/reject" comes afterwards?
>
> (3D) How to find out what records got archived but where not acked (ie, lost) 
> for re-processing/debugging purpose? The question was already asked and the 
> answer was "not supported", but I think it would be must-have before the 
> feature is usable in production? We can of course also only do it in a future 
> release and not the first "MVP" implementation, but the KIP should address 
> it. In the end, the overall group monitoring story is missing.
>
>
> (4) I am also wondering about the overall design with regard to "per record" 
> vs "per batch" granularity. In the end, queuing usually aims for "per 
> records" semantics, but "per record" implies to keep track of a lot of 
> metadata. Kafka is designed on a "per batch" granularity, and it's unclear to 
> me how both will go together?
>
> (4A) Do we keep "ack/reject/..." state per-record, or per batch? It seems per 
> record, but it would require to hold a lot of meta-data. Also, how does it 
> work for the current protocol, is a batch is partially acked and we need to 
> re-deliver? Would we add metadata and the let client filter acked messages 
> (similar to how "read-committed" mode works)?
>
> (4B) What does "the share-partition leader prefers to return complete
> record batches." exactly mean? "Prefers" is a fuzzy word. What happens if we 
> cannot return a complete record batch?
>
> (4C) What happens if different consumer of the same group configure different 
> batch sizes for fetching records? How do we track the corresponding meta-data?
>
> (4D)
>
>> In the situation where some records in a batch have been released or 
>> rejected separately, subsequent fetches of those records are more likely to 
>> have gaps.
>
> What does this mean?
>
> (4E)
>
>> For efficiency, the consumer preferentially returns complete record sets 
>> with no gaps
>
> Can you elaborate on the details?
>
>
> API contract:
>
> (5A)
>> acks must be issued in the order in which the records appear
>
> Why is this the case? Sounds like an arbitrary restriction to me? Can you 
> share your reasoning?
>
>
> (5B) How to "reject" (or just "release") all records of a batch at once? It 
> seem the API only allows to "ack" all record of a batch at once.
>
> (5C) Currently, `ConsumerRecords` object may contain records from different 
> partitions? Would this still be the case?
>
>
> (6) Group management / re-balancing:
>
> (6A) The KIP should explain better how heart-beating works (was already 
> partially discussed). How does `max.poll.interval.ms` interact? Would it 
> trigger a "release" of records if violated?
>
> (6B) You mentioned that a consumer that does not heartbeat would just be 
> removed from the group with a rebalance: Given the current design to assign 
> all partitions to every consumer in the group, that would be ok. But as you 
> mentioned on the KIP, we might want to be more clever with regard to 
> assigning partitions in the future, and I think we would actually need to 
> trigger a rebalance to avoid a later protocol change: otherwise, partition X 
> could be assigned to a single consumer and could become an offline partitions 
> if we don't rebalance and re-assign it if its current consumer stops 
> heartbeating.
>
>
> (7) Delivery Semantics
>
> (7A) You state that we aim for "at-least-once delivery". But why could we not 
> also provide "at-most-once delivery" (ie, fire an forget)?
>
> (7B) There was also a discussion about "read-commmitted" and reading beyond 
> the LSO and what happens if consumer have different configs. First, you need 
> to understand that aborted records are filtered client side, and thus for 
> "read-committed" we can never read beyond the LSO, and the same seems to 
> apply for queuing. Second, if different client have different config, it 
> seems "ok" -- of course you can consider it a miss-configuration, but the 
> same issue applies to Kafka now. I am not sure if we want to try fixing a 
> non-issue? In the end, the user should be able to configure the client 
> however they want, and we should not artificially restrict it IMHO.
>
> Btw: "read-committed" mode does NOT give you "exaclty-once"!
>
> (7C)
>> Finally, this KIP does not include support for acknowledging delivery using 
>> transactions for exactly-once semantics. Conceptually, this is quite 
>> straightforward but would take changes to the API.
>
> Doing exactly-once delivery is not possible. For a "consumer only" 
> application, talking about transactions does not make any sense, and 
> "conceptually, this is quite straightforward" is (sorry for being salty) just 
> a non-sense statement.
>
>
> (8) API design:
>
> (8A) Are we sure we want to overload the semantics of 
> `beginningOffsets/endOffsets` ?
>
>
> (8B) Why does `close()` not commit?
>
> (8C) Why do the following methods throw?
> - enforceRebalance
> - offsetsForTimes
> - pause / paused / resume
>
>
> (9) Configs. In general I am wondering how existing consumer config would 
> work, or to what extend configs would "disable" each other, and how 
> established mechanism align.
>
> (9A) Committing (auto-commit configs?)? Is there a consumer group "status / 
> lag" (in the end if we don't commit an offset, how can users monitor the 
> progress of the group)? We could for example commit the SPSO to give an upper 
> bound on lag, without breaking existing tooling?
> Or we commit both SPSO and SPSE and change consumer-group tool to show both?
>
> (9B) Your example code shows "enable.auto.commit=false" -- is this a 
> requirement? Overall, it seem we have a "auto commit" mechanism, so should 
> this config be `true` -- or why do we not actually allow for both auto-commit 
> and manual commit? The KIP does not really discuss tradeoffs / reasoning 
> about it.
>
> (9C) Config names:
>
> `share.group.enable` -> `shared.group.enabled` ?
>
> `share.delivery.count.limit` -> Should this be a client config? And should we 
> add a broker config `max.share.delivery.count.limit`? This would follow 
> established Kafka patterns.
>
> `share.record.lock.duration.ms` -> `default.share.record.lock.duration.ms` ?
>
> (9D) Why do we have hard limits broker side (eg maximum of 10 for 
> `share.delivery.count.limit`). Seems rather arbitrary. If there is no good 
> reason, should we ship with a good default but no hard-coded limit what the 
> user can config?
>
> (9E) Should we have a client and broker config for all new configs (ie, 
> broker set a max/min, but client can pick within those bounds)
>
>
>
> (10) Couple of question about certain statements:
>
>> If any records in the batch were not acknowledged, they remain acquired and 
>> will be presented to the application in response to a future poll.
>
> Do the un-acked records stay in the consumer buffer? Or would 
> `ConsumerRecords` be "purged" for this case?
>
>
>> the share-partition leader guarantees that acknowledgements for the records 
>> in a batch are performed atomically.
>
> How are ackds for partial batches handled? And do you want to ensure 
> atomicity?
>
>
>> and the Acquired state is not persisted. This minimises the amount of 
>> share-partition state that has to be logged.
>
> Is there any concerned about excessive re-delivery in case of error and a 
> large "window"? What are the tradeoffs with regard to re-delivery vs 
> maintaining too much meta-data?
>
>
> Thanks for reading all this...
>
>
>
> -Matthias
>
>
>
> On 7/1/23 4:42 AM, Kamal Chandraprakash wrote:
>> Hi Andrew,
>> Thank you for the KIP -- interesting read. I have some questions:
>> 101. "The calls to KafkaConsumer.acknowledge(ConsumerRecord,
>> AcknowledgeType) must be
>> issued in the order in which the records appear in the ConsumerRecords
>> object, which will
>> be in order of increasing offset for each share-partition"
>> If the share-consumer uses thread pool internally and acknowledges the
>> records in out-of-order fashion.
>> Will this use case be supported? The "Managing durable share-partition
>> state" have transitions where the
>> records are ack'ed in out-of-order fashion so want to confirm this.
>> 102. Will the configs be maintained in fine-grain per topic-to-share-group?
>> Some share-consumer groups
>> may want to increase the "record.lock.duration.ms" dynamically if record
>> processing is taking longer time
>> than usual during external system outage/downtime.
>> 103. Can we also define whether all the consumer configs are eligible for
>> share-consumer-group. (eg)
>> `max.poll.interval.ms` default is 5 mins. Will this config have any effect
>> on the share consumers?
>> 104. How will the consumer quota work? Will it be similar to the existing
>> consumer quota mechanism?
>> --
>> Kamal
>> On Wed, Jun 7, 2023 at 9:17 PM Andrew Schofield <
>> andrew_schofield_j...@outlook.com> wrote:
>>> Hi Daniel,
>>> True, I see your point. It’s analogous to a KafkaConsumer fetching
>>> uncommitted records but not delivering them to the application.
>>>
>>> Thanks,
>>> Andrew
>>>
>>>> On 7 Jun 2023, at 16:38, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>>
>>>> Hi Andrew,
>>>>
>>>> I think the "pending" state could be the solution for reading beyond the
>>>> LSO. Pending could indicate that a message is not yet available for
>>>> consumption (so they won't be offered for consumers), but with
>>> transactions
>>>> ending, they can become "available". With a pending state, records
>>> wouldn't
>>>> "disappear", they would simply not show up until they become available on
>>>> commit, or archived on abort.
>>>>
>>>> Regardless, I understand that this might be some extra, unwanted
>>>> complexity, I just thought that with the message ordering guarantee gone,
>>>> it would be a cool feature for share-groups. I've seen use-cases where
>>> the
>>>> LSO being blocked for an extended period of time caused huge lag for
>>>> traditional read_committed consumers, which could be completely avoided
>>> by
>>>> share-groups.
>>>>
>>>> Thanks,
>>>> Daniel
>>>>
>>>> Andrew Schofield <andrew_schofield_j...@outlook.com> ezt írta (időpont:
>>>> 2023. jún. 7., Sze, 17:28):
>>>>
>>>>> Hi Daniel,
>>>>> Kind of. I don’t want a transaction abort to cause disappearance of
>>>>> records which are already in-flight. A “pending” state doesn’t seem
>>>>> helpful for read_committed. There’s no such disappearance problem
>>>>> for read_uncommitted.
>>>>>
>>>>> Thanks,
>>>>> Andrew
>>>>>
>>>>>> On 7 Jun 2023, at 16:19, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Andrew,
>>>>>>
>>>>>> I agree with having a single isolation.level for the whole group, it
>>>>> makes
>>>>>> sense.
>>>>>> As for:
>>>>>> "b) The default isolation level for a share group is read_committed, in
>>>>>> which case
>>>>>> the SPSO and SPEO cannot move past the LSO."
>>>>>>
>>>>>> With this limitation (SPEO not moving beyond LSO), are you trying to
>>>>> avoid
>>>>>> handling the complexity of some kind of a "pending" state for the
>>>>>> uncommitted in-flight messages?
>>>>>>
>>>>>> Thanks,
>>>>>> Daniel
>>>>>>
>>>>>> Andrew Schofield <andrew_schofield_j...@outlook.com> ezt írta
>>> (időpont:
>>>>>> 2023. jún. 7., Sze, 16:52):
>>>>>>
>>>>>>> HI Daniel,
>>>>>>> I’ve been thinking about this question and I think this area is a bit
>>>>>>> tricky.
>>>>>>>
>>>>>>> If there are some consumers in a share group with isolation level
>>>>>>> read_uncommitted
>>>>>>> and other consumers with read_committed, they have different
>>>>> expectations
>>>>>>> with
>>>>>>> regards to which messages are visible when EOS comes into the picture.
>>>>>>> It seems to me that this is not necessarily a good thing.
>>>>>>>
>>>>>>> One option would be to support just read_committed in KIP-932. This
>>>>> means
>>>>>>> it is unambiguous which records are in-flight, because they’re only
>>>>>>> committed
>>>>>>> ones.
>>>>>>>
>>>>>>> Another option would be to have the entire share group have an
>>> isolation
>>>>>>> level,
>>>>>>> which again gives an unambiguous set of in-flight records but without
>>>>> the
>>>>>>> restriction of permitting just read_committed behaviour.
>>>>>>>
>>>>>>> So, my preference is for the following:
>>>>>>> a) A share group has an isolation level that applies to all consumers
>>> in
>>>>>>> the group.
>>>>>>> b) The default isolation level for a share group is read_committed, in
>>>>>>> which case
>>>>>>> the SPSO and SPEO cannot move past the LSO.
>>>>>>> c) For a share group with read_uncommited isolation level, the SPSO
>>> and
>>>>>>> SPEO
>>>>>>> can move past the LSO.
>>>>>>> d) The kafka_configs.sh tool or the AdminClient can be used to set a
>>>>>>> non-default
>>>>>>> value for the isolation level for a share group. The value is applied
>>>>> when
>>>>>>> the first
>>>>>>> member joins.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Andrew
>>>>>>>
>>>>>>>> On 2 Jun 2023, at 10:02, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi Andrew,
>>>>>>>> Thank you for the clarification. One follow-up to read_committed
>>> mode:
>>>>>>>> Taking the change in message ordering guarantees into account, does
>>>>> this
>>>>>>>> mean that in queues, share-group consumers will be able to consume
>>>>>>>> committed records AFTER the LSO?
>>>>>>>> Thanks,
>>>>>>>> Daniel
>>>>>>>>
>>>>>>>> Andrew Schofield <andrew_schofield_j...@outlook.com> ezt írta
>>>>> (időpont:
>>>>>>>> 2023. jún. 2., P, 10:39):
>>>>>>>>
>>>>>>>>> Hi Daniel,
>>>>>>>>> Thanks for your questions.
>>>>>>>>>
>>>>>>>>> 1) Yes, read_committed fetch will still be possible.
>>>>>>>>>
>>>>>>>>> 2) You weren’t wrong that this is a broad question :)
>>>>>>>>>
>>>>>>>>> Broadly speaking, I can see two ways of managing the in-flight
>>>>> records:
>>>>>>>>> the share-partition leader does it, or the share-group coordinator
>>>>> does
>>>>>>> it.
>>>>>>>>> I want to choose what works best, and I happen to have started with
>>>>>>> trying
>>>>>>>>> the share-partition leader doing it. This is just a whiteboard
>>>>> exercise
>>>>>>> at
>>>>>>>>> the
>>>>>>>>> moment, looking at the potential protocol flows and how well it all
>>>>>>> hangs
>>>>>>>>> together. When I have something coherent and understandable and
>>> worth
>>>>>>>>> reviewing, I’ll update the KIP with a proposal.
>>>>>>>>>
>>>>>>>>> I think it’s probably worth doing a similar exercise for the
>>>>> share-group
>>>>>>>>> coordinator way too. There are bound to be pros and cons, and I
>>> don’t
>>>>>>>>> really
>>>>>>>>> mind which way prevails.
>>>>>>>>>
>>>>>>>>> If the share-group coordinator does it, I already have experience of
>>>>>>>>> efficient
>>>>>>>>> storage of in-flight record state in a way that scales and is
>>>>>>>>> space-efficient.
>>>>>>>>> If the share-partition leader does it, storage of in-flight state
>>> is a
>>>>>>> bit
>>>>>>>>> more
>>>>>>>>> tricky.
>>>>>>>>>
>>>>>>>>> I think it’s worth thinking ahead to how EOS will work and also
>>>>> another
>>>>>>>>> couple of enhancements (key-based ordering and acquisition lock
>>>>>>>>> extension) so it’s somewhat future-proof.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Andrew
>>>>>>>>>
>>>>>>>>>> On 1 Jun 2023, at 11:51, Dániel Urbán <urb.dani...@gmail.com>
>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Andrew,
>>>>>>>>>>
>>>>>>>>>> Thank you for the KIP, exciting work you are doing :)
>>>>>>>>>> I have 2 questions:
>>>>>>>>>> 1. I understand that EOS won't be supported for share-groups (yet),
>>>>> but
>>>>>>>>>> read_committed fetch will still be possible, correct?
>>>>>>>>>>
>>>>>>>>>> 2. I have a very broad question about the proposed solution: why
>>> not
>>>>>>> let
>>>>>>>>>> the share-group coordinator manage the states of the in-flight
>>>>> records?
>>>>>>>>>> I'm asking this because it seems to me that using the same pattern
>>> as
>>>>>>> the
>>>>>>>>>> existing group coordinator would
>>>>>>>>>> a, solve the durability of the message state storage (same method
>>> as
>>>>>>> the
>>>>>>>>>> one used by the current group coordinator)
>>>>>>>>>>
>>>>>>>>>> b, pave the way for EOS with share-groups (same method as the one
>>>>> used
>>>>>>> by
>>>>>>>>>> the current group coordinator)
>>>>>>>>>>
>>>>>>>>>> c, allow follower-fetching
>>>>>>>>>> I saw your point about this: "FFF gives freedom to fetch records
>>>>> from a
>>>>>>>>>> nearby broker, but it does not also give the ability to commit
>>>>> offsets
>>>>>>>>> to a
>>>>>>>>>> nearby broker"
>>>>>>>>>> But does it matter if message acknowledgement is not "local"?
>>>>>>> Supposedly,
>>>>>>>>>> fetching is the actual hard work which benefits from follower
>>>>> fetching,
>>>>>>>>> not
>>>>>>>>>> the group related requests.
>>>>>>>>>>
>>>>>>>>>> The only problem I see with the share-group coordinator managing
>>> the
>>>>>>>>>> in-flight message state is that the coordinator is not aware of the
>>>>>>> exact
>>>>>>>>>> available offsets of a partition, nor how the messages are batched.
>>>>> For
>>>>>>>>>> this problem, maybe the share group coordinator could use some form
>>>>> of
>>>>>>>>>> "logical" addresses, such as "the next 2 batches after offset X",
>>> or
>>>>>>>>> "after
>>>>>>>>>> offset X, skip 2 batches, fetch next 2". Acknowledgements always
>>>>>>> contain
>>>>>>>>>> the exact offset, but for the "unknown" sections of a partition,
>>>>> these
>>>>>>>>>> logical addresses would be used. The coordinator could keep track
>>> of
>>>>>>>>>> message states with a mix of offsets and these batch based
>>> addresses.
>>>>>>> The
>>>>>>>>>> partition leader could support "skip X, fetch Y batches" fetch
>>>>>>> requests.
>>>>>>>>>> This solution would need changes in the Fetch API to allow such
>>> batch
>>>>>>>>> based
>>>>>>>>>> addresses, but I assume that fetch protocol changes will be needed
>>>>>>>>>> regardless of the specific solution.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Daniel
>>>>>>>>>>
>>>>>>>>>> Andrew Schofield <andrew_schofi...@live.com> ezt írta (időpont:
>>>>> 2023.
>>>>>>>>> máj.
>>>>>>>>>> 30., K, 18:15):
>>>>>>>>>>
>>>>>>>>>>> Yes, that’s it. I imagine something similar to KIP-848 for
>>> managing
>>>>>>> the
>>>>>>>>>>> share group
>>>>>>>>>>> membership, and consumers that fetch records from their assigned
>>>>>>>>>>> partitions and
>>>>>>>>>>> acknowledge when delivery completes.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Andrew
>>>>>>>>>>>
>>>>>>>>>>>> On 30 May 2023, at 16:52, Adam Warski <a...@warski.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the explanation!
>>>>>>>>>>>>
>>>>>>>>>>>> So effectively, a share group is subscribed to each partition -
>>> but
>>>>>>> the
>>>>>>>>>>> data is not pushed to the consumer, but only sent on demand. And
>>>>> when
>>>>>>>>>>> demand is signalled, a batch of messages is sent?
>>>>>>>>>>>> Hence it would be up to the consumer to prefetch a sufficient
>>>>> number
>>>>>>> of
>>>>>>>>>>> batches to ensure, that it will never be "bored"?
>>>>>>>>>>>>
>>>>>>>>>>>> Adam
>>>>>>>>>>>>
>>>>>>>>>>>>> On 30 May 2023, at 15:25, Andrew Schofield <
>>>>>>> andrew_schofi...@live.com
>>>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Adam,
>>>>>>>>>>>>> Thanks for your question.
>>>>>>>>>>>>>
>>>>>>>>>>>>> With a share group, each fetch is able to grab available records
>>>>>>> from
>>>>>>>>>>> any partition. So, it alleviates
>>>>>>>>>>>>> the “head-of-line” blocking problem where a slow consumer gets
>>> in
>>>>>>> the
>>>>>>>>>>> way. There’s no actual
>>>>>>>>>>>>> stealing from a slow consumer, but it can be overtaken and must
>>>>>>>>>>> complete its processing within
>>>>>>>>>>>>> the timeout.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The way I see this working is that when a consumer joins a share
>>>>>>>>> group,
>>>>>>>>>>> it receives a set of
>>>>>>>>>>>>> assigned share-partitions. To start with, every consumer will be
>>>>>>>>>>> assigned all partitions. We
>>>>>>>>>>>>> can be smarter than that, but I think that’s really a question
>>> of
>>>>>>>>>>> writing a smarter assignor
>>>>>>>>>>>>> just as has occurred over the years with consumer groups.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Only a small proportion of Kafka workloads are super high
>>>>>>> throughput.
>>>>>>>>>>> Share groups would
>>>>>>>>>>>>> struggle with those I’m sure. Share groups do not diminish the
>>>>> value
>>>>>>>>> of
>>>>>>>>>>> consumer groups
>>>>>>>>>>>>> for streaming. They just give another option for situations
>>> where
>>>>> a
>>>>>>>>>>> different style of
>>>>>>>>>>>>> consumption is more appropriate.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 29 May 2023, at 17:18, Adam Warski <a...@warski.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> thank you for the proposal! A very interesting read.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I do have one question, though. When you subscribe to a topic
>>>>> using
>>>>>>>>>>> consumer groups, it might happen that one consumer has processed
>>> all
>>>>>>>>>>> messages from its partitions, while another one still has a lot of
>>>>>>> work
>>>>>>>>> to
>>>>>>>>>>> do (this might be due to unbalanced partitioning, long processing
>>>>>>> times
>>>>>>>>>>> etc.). In a message-queue approach, it would be great to solve
>>> this
>>>>>>>>> problem
>>>>>>>>>>> - so that a consumer that is free can steal work from other
>>>>> consumers.
>>>>>>>>> Is
>>>>>>>>>>> this somehow covered by share groups?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Maybe this is planned as "further work", as indicated here:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> "
>>>>>>>>>>>>>> It manages the topic-partition assignments for the share-group
>>>>>>>>>>> members. An initial, trivial implementation would be to give each
>>>>>>> member
>>>>>>>>>>> the list of all topic-partitions which matches its subscriptions
>>> and
>>>>>>>>> then
>>>>>>>>>>> use the pull-based protocol to fetch records from all partitions.
>>> A
>>>>>>> more
>>>>>>>>>>> sophisticated implementation could use topic-partition load and
>>> lag
>>>>>>>>> metrics
>>>>>>>>>>> to distribute partitions among the consumers as a kind of
>>>>> autonomous,
>>>>>>>>>>> self-balancing partition assignment, steering more consumers to
>>>>> busier
>>>>>>>>>>> partitions, for example. Alternatively, a push-based fetching
>>> scheme
>>>>>>>>> could
>>>>>>>>>>> be used. Protocol details will follow later.
>>>>>>>>>>>>>> "
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> but I’m not sure if I understand this correctly. A
>>>>> fully-connected
>>>>>>>>>>> graph seems like a lot of connections, and I’m not sure if this
>>>>> would
>>>>>>>>> play
>>>>>>>>>>> well with streaming.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This also seems as one of the central problems - a key
>>>>>>> differentiator
>>>>>>>>>>> between share and consumer groups (the other one being persisting
>>>>>>> state
>>>>>>>>> of
>>>>>>>>>>> messages). And maybe the exact way we’d want to approach this
>>> would,
>>>>>>> to
>>>>>>>>> a
>>>>>>>>>>> certain degree, dictate the design of the queueing system?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Adam Warski
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 2023/05/15 11:55:14 Andrew Schofield wrote:
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> I would like to start a discussion thread on KIP-932: Queues
>>> for
>>>>>>>>>>> Kafka. This KIP proposes an alternative to consumer groups to
>>> enable
>>>>>>>>>>> cooperative consumption by consumers without partition assignment.
>>>>> You
>>>>>>>>> end
>>>>>>>>>>> up with queue semantics on top of regular Kafka topics, with
>>>>>>> per-message
>>>>>>>>>>> acknowledgement and automatic handling of messages which
>>> repeatedly
>>>>>>>>> fail to
>>>>>>>>>>> be processed.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please take a look and let me know what you think.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Adam Warski
>>>>>>>>>>>>
>>>>>>>>>>>> https://www.softwaremill.com/
>>>>>>>>>>>> https://twitter.com/adamwarski
>>>
>>>
>>>

Reply via email to