Hi Andrew,

Thank you for the KIP, it is a great read! I just have a small question.

28. I noticed that the "--state" and "--timeout" options are not
mentioned for the kafka-share-groups.sh tool. Was this omission
intentional, or is it possibly an oversight in the KIP?
Thanks,
Chirag

On Mon, Feb 12, 2024 at 5:25 PM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

> Hi Jun
> Thanks for your comments.
>
> 10. For read-uncommitted isolation level, the consumer just reads all
> records.
> For read-committed isolation level, the share-partition leader does the
> filtering to
> enable correct skipping of aborted records. The consumers in a share group
> are not
> aware of the filtering, unlike consumers in consumer groups.
>
> 11. The “classic” type is the pre-KIP 848 consumer groups.
>
> 12. By setting the configuration for a group resource, you are saying
> “when a new group is
> created with this name, it must have this type”. It’s not changing the
> type of an existing
> group.
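>
> For example, pre-declaring the type for a group named G1 might look like
> this (assuming the CLI flags are as drafted in the KIP):
>
>   $ bin/kafka-configs.sh --bootstrap-server localhost:9092 \
>       --entity-type group --entity-name G1 \
>       --alter --add-config group.type=share
>
> After this, the first client to use the group name G1 creates it as a
> share group; the command does not convert an existing group.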
>
> 13. Good catch. The Server Assignor should be at group level. I will
> change it.
>
> 14. That is true. I have maintained it to keep similarity with consumer
> groups,
> but it is not currently exposed to clients. It might be best to remove it.
>
> 15. I had intended that SimpleAssignor implements
> org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.
> Actually, I think there’s benefit to using a new interface so that someone
> doesn’t inadvertently
> configure something like the RoundRobinAssignor for a share group. It
> wouldn’t go well. I will
> add a new interface to the KIP.
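>
> As a first sketch, the new interface might look something like this. This
> is hypothetical - GroupSpec, TopicMetadata and Assignment are placeholder
> names, and the real interface will be defined in the KIP:
>
>   // An assignor interface specific to share groups, so that a
>   // consumer-group assignor such as RoundRobinAssignor cannot be
>   // configured for a share group by mistake.
>   public interface SharePartitionAssignor {
>       String name();
>       Assignment assign(GroupSpec groupSpec, TopicMetadata metadata);
>   }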
>
> 16. When an existing member issues a ShareGroupHeartbeatRequest to the new
> coordinator,
> the coordinator returns UNKNOWN_MEMBER_ID. The client then sends another
> ShareGroupHeartbeatRequest
> containing no member ID and epoch 0. The coordinator then returns the
> member ID.
>
> 17. I don’t think so. What is the client going to do with the exception?
> Share groups are
> intentionally removing some of the details of using Kafka offsets from the
> consumers. If the
> SPSO needs to be reset due to retention, it just does that automatically.
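>
> So group.share.auto.offset.reset would support values such as "earliest"
> and "latest" but no "none"/exception option (assuming the value set
> mirrors auto.offset.reset), for example:
>
>   $ bin/kafka-configs.sh --bootstrap-server localhost:9092 \
>       --entity-type group --entity-name G1 \
>       --alter --add-config group.share.auto.offset.reset=earliest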
>
> 18. The proposed use of control records needs some careful thought.
> 18.1. They’re written by the share-partition leader, not the coordinator.
> 18.2. If the client commits the acknowledgement, it is only confirmed to
> the client
> once it has been replicated to the other replica brokers. So, committing
> an acknowledgement
> is very similar to sending a record to a topic in terms of the behaviour.
>
> 19. You are correct. The possibility of record duplication exists in
> failure scenarios. A future KIP
> will add EOS support for share groups.
>
> 20.1. Yes, an exception. I was thinking InvalidOffsetException. I will
> update the KIP with more
> detail about protocol error codes and API exceptions.
> 20.2. I think that’s a mistake. I’ll rectify it.
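>
> To illustrate 20.1, here is a sketch of the intended calling pattern. The
> code is hypothetical - the method shapes are paraphrased from the KIP and
> may change:
>
>   ConsumerRecords<String, String> records =
>       consumer.poll(Duration.ofSeconds(1));
>   for (ConsumerRecord<String, String> record : records) {
>       // Acknowledge in the order the records appear, which is
>       // increasing offset order within each share-partition.
>       consumer.acknowledge(record, AcknowledgeType.ACCEPT);
>   }
>   consumer.commitSync();
>
> Acknowledging out of order, such as offset 119 before 111-118 have been
> dealt with, would throw the exception.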
>
> 21. The message sets for the new control records would be filtered out for
> all consumers.
>
> 22. Fetch from follower is not supported. I will update the KIP.
>
> 23.1. I am not quite happy with the explanation of the checkpoint and
> delta records. Essentially,
> there needs to be one checkpoint and then any number of deltas. Then
> another checkpoint supersedes
> the previous records, and can have its own sequence of deltas, and so on.
> Because recovery requires the
> leader to read the latest checkpoint and all subsequent deltas, you want
> to take checkpoints frequently
> enough to speed up recovery, but infrequently enough to minimise the
> performance impact of reserializing all the state. (The recovery rule is
> sketched below.)
> 23.2. I’ll check the document again carefully, but the SHARE_DELTA should
> always contain DeliveryCount
> for every member of the States array.
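>
> Regarding 23.1, the recovery rule can be sketched in a few lines of Java
> (the types here are illustrative, not the KIP's actual classes; imports
> from java.util assumed):
>
>   record Entry(long offset, int state, int deliveryCount) {}
>
>   // The latest checkpoint is the base state; each subsequent delta,
>   // applied in log order, overrides the state of the offsets it mentions.
>   Map<Long, Entry> recover(List<Entry> checkpoint, List<List<Entry>> deltas) {
>       Map<Long, Entry> state = new TreeMap<>();
>       for (Entry e : checkpoint) state.put(e.offset(), e);
>       for (List<Entry> delta : deltas)
>           for (Entry e : delta) state.put(e.offset(), e);
>       return state;
>   }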
>
> 24. I was anticipating adding this to the index files which are part of
> each log segment.
>
> 25. The acknowledgements for each topic-partition are atomic. All this
> really means is that we perform the
> state checking and the state persistence atomically (one control record).
> The callback tells you whether the
> acknowledgements for the entire topic-partition succeeded or failed,
> rather than each record individually.
> I could have gone with a callback with a record-based interface. Would
> that be preferable, do you think?
> For one thing, that does give more flexibility for optimisations such as
> fetch pipelining in the future.
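>
> For illustration, usage might look like this (hypothetical - the callback
> signature is paraphrased and could change):
>
>   consumer.setAcknowledgeCommitCallback((partition, exception) -> {
>       if (exception != null) {
>           // Every acknowledgement in this topic-partition's batch failed;
>           // the records will be redelivered, so there is nothing to do
>           // per record.
>           System.err.println("Acks failed for " + partition + ": " + exception);
>       }
>   });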
>
> 26. The metadata is unused. This is re-using an existing class
> (OffsetAndMetadata). Perhaps it would be better
> not to.
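>
> With the signature as quoted below, a caller has to construct metadata
> that is never used (hypothetical usage):
>
>   admin.alterShareGroupOffsets("G1",
>       Map.of(new TopicPartition("T1", 0), new OffsetAndMetadata(0L)));
>
> An overload taking plain offsets, Map<TopicPartition, Long>, may be the
> cleaner shape.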
>
> 27. Yes, agreed. I will add it.
>
> Thanks,
> Andrew
>
> > On 9 Feb 2024, at 23:14, Jun Rao <j...@confluent.io.INVALID> wrote:
> >
> > Hi, Andrew,
> >
> > Thanks for the KIP. A few comments below.
> >
> > 10. ShareFetchResponse: To consume transactional data, currently
> > FetchResponse includes the AbortedTransactions fields for the client to
> > properly skip aborted records. ShareFetchResponse doesn't include that.
> > How do we prevent the consumer from reading aborted records in a share
> > group?
> >
> > 11. "adding "share"  to the existing group types of "consumer"  and
> > "classic" "
> > What's the "classic" type?
> >
> > 12. bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type
> > group --entity-name G1 --alter --add-config group.type=share
> > So, one could change the group type? What happens to the states
> > associated with the group (members, epoch, offsets, etc)?
> >
> > 13. Why is Server Assignor at member level, instead of group level?
> >
> > 14. Member.metadata: How is that being used? It isn't exposed to the
> > client.
> >
> > 15. What public interface does SimpleAssignor implement?
> >
> > 16. "This means that existing members will have to rejoin the share group
> > following a coordinator failover."
> > When an existing member issues a ShareGroupHeartbeatRequest to the new
> > coordinator, does the coordinator return UNKNOWN_MEMBER_ID and a new
> > memberId?
> >
> > 17. auto.offset.reset has the option to throw an exception to the client
> > if the current offset does not exist any more on the server (e.g. due to
> > retention). Should group.share.auto.offset.reset support that too?
> >
> > 18. SHARE_CHECKPOINT and SHARE_DELTA records:
> > 18.1 When does the coordinator write them?
> > 18.2 If the client commits the acknowledgement successfully, could
> > the acknowledgement be lost on the broker if the coordinator fails over?
> >
> > 19. In the current consumer model, coordinator failover doesn't cause
> > duplicate records in consumers. In the share group model, I guess this is
> > no longer true since we are not persisting the acquired state?
> >
> > 20. "The calls to KafkaShareConsumer.acknowledge(ConsumerRecord,
> > AcknowledgeType) must be issued in the order in which the records appear
> in
> > the ConsumerRecords object, which will be in order of increasing offset
> for
> > each share-partition."
> > 20.1  What happens if the acknowledge() call doesn't follow this? Does
> > the caller get an exception?
> > 20.2 The example with Acknowledge 119. It seems the acknowledgement is
> > out of order since records at offset 111-118 haven't been acknowledged?
> >
> > 21. "Indeed, these message sets are not returned to consumer". Are we
> > excluding those control records for non-shared consumers too?
> >
> > 22. The design doesn't seem to support fetching from the followers. This
> > might be ok, but it will be useful to explicitly mention this.
> >
> > 23. Examples with control records for SHARE_DELTA:
> > 23.1 Some of the state changes contain cumulative state instead of delta.
> > For example, "record 110 (available, delivery count 1), records 111-118
> > acquired, record 119 acknowledged" for "Acknowledge 119".
> > 23.2 SHARE_DELTA sometimes includes available records with DeliveryCount
> > of 0. But we don't do that for every record. What's the convention?
> >    {
> >      "BaseOffset": 111,
> >      "LastOffset": 118,
> >      "State": 0 (Available),
> >      "DeliveryCount": 0
> >    }
> >
> > 24. "when a broker becomes the leader of a share-partition, it must read
> > the most recent SHARE_CHECKPOINT": How does a broker find this
> > efficiently on restart?
> >
> > 25. AcknowledgeCommitCallback: How would an application use it? It
> > doesn't indicate which record's acknowledgement has failed.
> >
> > 26. AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId,
> > Map<TopicPartition, OffsetAndMetadata> offsets): How is the metadata
> > used? It doesn't seem there is an API to use it in either the client
> > application or the broker.
> >
> > 27. It would be useful to add a section on downgradability since the KIP
> > changes the record format in the internal offset topic.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Oct 11, 2023 at 8:25 AM Andrew Schofield <
> > andrew_schofield_j...@outlook.com> wrote:
> >
> >> Hi Jack,
> >> Thanks for your comments.
> >>
> >> I have added a new section on Log Retention which describes the
> >> behaviour of the SPSO as the LSO advances. That makes total sense
> >> and was an omission from the KIP.
> >>
> >> I have added the other ideas as potential future work. I do like the
> >> idea of having the SPSO influence the advancement of the LSO
> >> for topics which are primarily being used with share groups.
> >>
> >> I have published an updated version of the KIP.
> >>
> >> Thanks,
> >> Andrew
> >>
> >>> On 4 Oct 2023, at 10:09, Jack Vanlightly <vanligh...@apache.org> wrote:
> >>>
> >>> I would like to see more explicit discussion of topic retention and
> >>> share groups. There are a few options here from simple to more
> >>> sophisticated. There are also topic-level and share-group level options.
> >>>
> >>> The simple thing would be to ensure that the SPSO of each share group
> >>> is bounded by the Log Start Offset (LSO) of each partition which itself
> >>> is managed by the retention policy. This is a topic-level control which
> >>> applies to all share-groups. I would say that this shared retention is
> >>> the largest drawback of modeling queues on shared logs and this is
> >>> worth noting.
> >>>
> >>> More sophisticated approaches can be to allow the LSO to advance not
> >>> (only) by retention policy but by the advancement of the lowest SPSO.
> >>> This can keep the amount of data lower by garbage collecting messages
> >>> that have been acknowledged by all share groups. Some people may like
> >>> that behaviour on those topics where share groups are the only
> >>> consumption model and no replay is needed.
> >>>
> >>> There are per-share-group possibilities such as share-group TTLs where
> >>> messages can be archived on a per share group basis.
> >>>
> >>> Thanks
> >>> Jack
> >>
> >>
>
>
