Hi Andrew,

Thank you for the KIP, it is a great read! I just have a small question.
28. I noticed that the "--state" and "--timeout" options are not mentioned for the kafka-share-groups.sh tool. Was this omission intentional, or is it possibly an oversight in the KIP?

Thanks,
Chirag

On Mon, Feb 12, 2024 at 5:25 PM Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:

> Hi Jun
> Thanks for your comments.
>
> 10. For read-uncommitted isolation level, the consumer just reads all records. For read-committed isolation level, the share-partition leader does the filtering to enable correct skipping of aborted records. The consumers in a share group are not aware of the filtering, unlike consumers in consumer groups.
>
> 11. The “classic” type is the pre-KIP 848 consumer groups.
>
> 12. By setting the configuration for a group resource, you are saying “when a new group is created with this name, it must have this type”. It’s not changing the type of an existing group.
>
> 13. Good catch. The Server Assignor should be at group level. I will change it.
>
> 14. That is true. I have maintained it to keep similarity with consumer groups, but it is not currently exposed to clients. It might be best to remove it.
>
> 15. I had intended that SimpleAssignor implements org.apache.kafka.clients.consumer.ConsumerPartitionAssignor. Actually, I think there’s benefit to using a new interface so that someone doesn’t inadvertently configure something like the RoundRobinAssignor for a share group. It wouldn’t go well. I will add a new interface to the KIP.
>
> 16. When an existing member issues a ShareGroupHeartbeatRequest to the new coordinator, the coordinator returns UNKNOWN_MEMBER_ID. The client then sends another ShareGroupHeartbeatRequest containing no member ID and epoch 0. The coordinator then returns the member ID.
>
> 17. I don’t think so. What is the client going to do with the exception? Share groups are intentionally removing some of the details of using Kafka offsets from the consumers. If the SPSO needs to be reset due to retention, it just does that automatically.
>
> 18. The proposed use of control records needs some careful thought.
> 18.1. They’re written by the share-partition leader, not the coordinator.
> 18.2. If the client commits the acknowledgement, it is only confirmed to the client once it has been replicated to the other replica brokers. So, committing an acknowledgement is very similar to sending a record to a topic in terms of the behaviour.
>
> 19. You are correct. The possibility of record duplication exists in failure scenarios. A future KIP will add EOS support for share groups.
>
> 20.1. Yes, an exception. I was thinking InvalidOffsetException. I will update the KIP with more detail about protocol error codes and API exceptions.
> 20.2. I think that’s a mistake. I’ll rectify it.
>
> 21. The message sets for the new control records would be filtered out for all consumers.
>
> 22. Fetch from follower is not supported. I will update the KIP.
>
> 23.1. I am not quite happy with the explanation of the checkpoint and delta records. Essentially, there needs to be one checkpoint and then any number of deltas. Then another checkpoint supersedes the previous records, and can have its own sequence of deltas, and so on. Because recovery requires the leader to read the latest checkpoint and all subsequent deltas, you want to take checkpoints frequently enough to speed up recovery, but infrequently enough to minimise the performance impact of reserializing all the state.
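The recovery flow in 23.1 might be easier to see as code. A minimal sketch, assuming simplified placeholder types rather than the control-record schemas actually proposed in the KIP: a new leader loads the most recent SHARE_CHECKPOINT and then replays every later SHARE_DELTA in log order, so the latest state written for an offset wins.

    import java.util.List;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    // Illustrative placeholder types; the real control-record schemas are defined in the KIP.
    record StateBatch(long baseOffset, long lastOffset, byte state, short deliveryCount) {}
    record ShareCheckpoint(long startOffset, List<StateBatch> states) {}
    record ShareDelta(List<StateBatch> states) {}

    class SharePartitionRecovery {
        // Rebuild per-offset delivery state from the latest checkpoint plus all later deltas.
        static NavigableMap<Long, StateBatch> recover(ShareCheckpoint checkpoint,
                                                      List<ShareDelta> deltasAfterCheckpoint) {
            NavigableMap<Long, StateBatch> stateByOffset = new TreeMap<>();
            for (StateBatch batch : checkpoint.states()) {
                apply(stateByOffset, batch);
            }
            // Deltas are applied in log order so later acknowledgements override earlier state.
            for (ShareDelta delta : deltasAfterCheckpoint) {
                for (StateBatch batch : delta.states()) {
                    apply(stateByOffset, batch);
                }
            }
            return stateByOffset;
        }

        private static void apply(NavigableMap<Long, StateBatch> stateByOffset, StateBatch batch) {
            // Expand the batch per offset so overlapping updates from later deltas replace earlier entries.
            for (long offset = batch.baseOffset(); offset <= batch.lastOffset(); offset++) {
                stateByOffset.put(offset, new StateBatch(offset, offset, batch.state(), batch.deliveryCount()));
            }
        }
    }

This also illustrates the trade-off described in 23.1: the more deltas written since the last checkpoint, the more records a new leader has to replay during recovery.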
> 23.2. I’ll check the document again carefully, but the SHARE_DELTA should always contain DeliveryCount for every member of the States array.
>
> 24. I was anticipating that this would be added to the index files which are part of each log segment.
>
> 25. The acknowledgements for each topic-partition are atomic. All this really means is that we perform the state checking and the state persistence atomically (one control record). The callback tells you whether the acknowledgements for the entire topic-partition succeeded or failed, rather than each record individually. I could have gone with a callback with a record-based interface. Would that be preferable, do you think? For one thing, that does give more flexibility for optimisations such as fetch pipelining in the future.
>
> 26. The metadata is unused. This is re-using an existing class (OffsetAndMetadata). Perhaps it would be better not to.
>
> 27. Yes, agreed. I will add it.
>
> Thanks,
> Andrew
>
> > On 9 Feb 2024, at 23:14, Jun Rao <j...@confluent.io.INVALID> wrote:
> >
> > Hi, Andrew,
> >
> > Thanks for the KIP. A few comments below.
> >
> > 10. ShareFetchResponse: To consume transactional data, currently FetchResponse includes the AbortedTransactions fields for the client to properly skip aborted records. ShareFetchResponse doesn't include that. How do we prevent the consumer from reading aborted records in a share group?
> >
> > 11. "adding "share" to the existing group types of "consumer" and "classic""
> > What's the "classic" type?
> >
> > 12. bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type group --entity-name G1 --alter --add-config group.type=share
> > So, one could change the group type? What happens to the states associated with the group (members, epoch, offsets, etc)?
> >
> > 13. Why is Server Assignor at member level, instead of group level?
> >
> > 14. Member.metadata: How is that being used? It isn't exposed to the client.
> >
> > 15. What public interface does SimpleAssignor implement?
> >
> > 16. "This means that existing members will have to rejoin the share group following a coordinator failover."
> > When an existing member issues a ShareGroupHeartbeatRequest to the new coordinator, does the coordinator return UNKNOWN_MEMBER_ID and a new memberId?
> >
> > 17. auto.offset.reset has the option to throw an exception to the client if the current offset does not exist any more on the server (e.g. due to retention). Should group.share.auto.offset.reset support that too?
> >
> > 18. SHARE_CHECKPOINT and SHARE_DELTA records:
> > 18.1 When does the coordinator write them?
> > 18.2 If the client commits the acknowledgement successfully, could the acknowledgement be lost on the broker if the coordinator fails over?
> >
> > 19. In the current consumer model, coordinator failover doesn't cause duplicate records in consumers. In the share group model, I guess this is no longer true since we are not persisting the acquired state?
> >
> > 20. "The calls to KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType) must be issued in the order in which the records appear in the ConsumerRecords object, which will be in order of increasing offset for each share-partition."
> > 20.1 What happens if the acknowledge() call doesn't follow this? Does the caller get an exception?
> > 20.2 The example with Acknowledge 119. It seems the acknowledgement is out of order since records at offset 111-118 haven't been acknowledged?
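To make the ordering rule in question 20 concrete, here is the acknowledgement pattern the quoted sentence implies, written against the KafkaShareConsumer API as proposed in the KIP (class and method names are the KIP's proposals, not released API; the topic, properties and process() step are placeholders):

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    // Proposed in the KIP, not yet released API:
    import org.apache.kafka.clients.consumer.AcknowledgeType;
    import org.apache.kafka.clients.consumer.KafkaShareConsumer;

    public class ShareConsumeLoop {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "G1");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
                consumer.subscribe(List.of("orders"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    // Acknowledge in the order the records appear, i.e. in increasing offset
                    // order within each share-partition, as the KIP requires.
                    for (ConsumerRecord<String, String> record : records) {
                        try {
                            process(record);
                            consumer.acknowledge(record, AcknowledgeType.ACCEPT);
                        } catch (Exception e) {
                            // RELEASE makes the record available for another delivery attempt.
                            consumer.acknowledge(record, AcknowledgeType.RELEASE);
                        }
                    }
                    consumer.commitSync();
                }
            }
        }

        private static void process(ConsumerRecord<String, String> record) {
            // Application-specific processing goes here.
        }
    }

Acknowledging record 119 before 111-118, as in the example Jun cites, would break this increasing-offset rule, which per Andrew's answer to 20.1 above would surface as an InvalidOffsetException.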
> > 21. "Indeed, these message sets are not returned to consumer". Are we excluding those control records for non-shared consumers too?
> >
> > 22. The design doesn't seem to support fetching from the followers. This might be ok, but it will be useful to explicitly mention this.
> >
> > 23. Examples with control records for SHARE_DELTA:
> > 23.1 Some of the state changes contain cumulative state instead of delta. For example, "record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged" for "Acknowledge 119".
> > 23.2 SHARE_DELTA sometimes includes available records with DeliveryCount of 0. But we don't do that for every record. What's the convention?
> > {
> >   "BaseOffset": 111,
> >   "LastOffset": 118,
> >   "State": 0 (Available),
> >   "DeliveryCount": 0
> > }
> >
> > 24. "when a broker becomes the leader of a share-partition, it must read the most recent SHARE_CHECKPOINT": How does a broker find this efficiently on restart?
> >
> > 25. AcknowledgeCommitCallback: How would an application use it? It doesn't indicate which record's acknowledgement has failed.
> >
> > 26. AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets): How is the metadata used? It doesn't seem there is an API to use it in either the client application or the broker.
> >
> > 27. It would be useful to add a section on downgradability since the KIP changes the record format in the internal offset topic.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Oct 11, 2023 at 8:25 AM Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
> >
> >> Hi Jack,
> >> Thanks for your comments.
> >>
> >> I have added a new section on Log Retention which describes the behaviour of the SPSO as the LSO advances. That makes total sense and was an omission from the KIP.
> >>
> >> I have added the other ideas as potential future work. I do like the idea of having the SPSO influence the advancements of the LSO for topics which are primarily being used with share groups.
> >>
> >> I have published an updated version of the KIP.
> >>
> >> Thanks,
> >> Andrew
> >>
> >>> On 4 Oct 2023, at 10:09, Jack Vanlightly <vanligh...@apache.org> wrote:
> >>>
> >>> I would like to see more explicit discussion of topic retention and share groups. There are a few options here from simple to more sophisticated. There are also topic-level and share-group level options.
> >>>
> >>> The simple thing would be to ensure that the SPSO of each share group is bounded by the Log Start Offset (LSO) of each partition which itself is managed by the retention policy. This is a topic-level control which applies to all share groups. I would say that this shared retention is the largest drawback of modeling queues on shared logs and this is worth noting.
> >>>
> >>> More sophisticated approaches can be to allow the LSO to advance not (only) by retention policy but by the advancement of the lowest SPSO. This can keep the amount of data lower by garbage collecting messages that have been acknowledged by all share groups. Some people may like that behaviour on those topics where share groups are the only consumption model and no replay is needed.
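A tiny sketch of the idea in the preceding paragraph, using illustrative names that are not from the KIP: the partition's log start offset could be allowed to advance to the slowest share group's SPSO, so records that every group has finished with become eligible for deletion, while retention can still advance it independently.

    import java.util.Map;

    class LsoAdvancement {
        // Candidate new log start offset: whichever is further ahead, the retention-driven
        // LSO or the lowest SPSO across all share groups consuming the partition.
        static long candidateLogStartOffset(long retentionBasedLso, Map<String, Long> spsoByShareGroup) {
            long lowestSpso = spsoByShareGroup.values().stream()
                    .mapToLong(Long::longValue)
                    .min()
                    .orElse(retentionBasedLso);
            return Math.max(retentionBasedLso, lowestSpso);
        }
    }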
> >>> There are per-share-group possibilities such as share-group TTLs where messages can be archived on a per-share-group basis.
> >>>
> >>> Thanks
> >>> Jack

--
Chirag Wadhwa
Software Engineer Intern, Confluent