Hi Jun
Thanks for your comments.

10. For read-uncommitted isolation level, the consumer just reads all records.
For read-committed isolation level, the share-partition leader does the
filtering to enable correct skipping of aborted records. The consumers in a
share group are not aware of the filtering, unlike consumers in consumer
groups.
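
To illustrate point 10 only: a minimal sketch of leader-side filtering, not the
KIP's implementation. The record layout, the function name and the use of an
explicit last offset for each aborted transaction are all assumptions made for
the example.

```python
def filter_read_committed(records, aborted_transactions):
    """Return only the records a read-committed consumer should see.

    records: list of dicts with "offset", "producer_id" and "transactional".
    aborted_transactions: list of (producer_id, first_offset, last_offset).
    Both shapes are hypothetical simplifications for this sketch.
    """
    def is_aborted(record):
        # Non-transactional records are never part of an aborted transaction.
        if not record["transactional"]:
            return False
        return any(
            record["producer_id"] == producer_id
            and first <= record["offset"] <= last
            for producer_id, first, last in aborted_transactions
        )

    # The filtering happens before anything is handed to the consumer, so the
    # consumer never needs to know which transactions were aborted.
    return [record for record in records if not is_aborted(record)]
```

Because the leader drops the aborted records itself, nothing like the
AbortedTransactions field has to travel to the share consumer in this model.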

11. The “classic” type refers to the consumer groups that predate KIP-848.

12. By setting the configuration for a group resource, you are saying “when a
new group is created with this name, it must have this type”. It’s not changing
the type of an existing group.

13. Good catch. The Server Assignor should be at group level. I will change it.

14. That is true. I have maintained it to keep similarity with consumer groups,
but it is not currently exposed to clients. It might be best to remove it.

15. I had intended that SimpleAssignor implements
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.
Actually, I think there’s benefit to using a new interface so that someone
doesn’t inadvertently configure something like the RoundRobinAssignor for a
share group. It wouldn’t go well. I will add a new interface to the KIP.

16. When an existing member issues a ShareGroupHeartbeatRequest to the new
coordinator, the coordinator returns UNKNOWN_MEMBER_ID. The client then sends
another ShareGroupHeartbeatRequest containing no member ID and epoch 0. The
coordinator then returns the member ID.
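
As an illustration of the exchange in point 16, here is a toy model of the
rejoin after a coordinator failover. The Coordinator and Member classes, the
dict-shaped responses and the epoch value of 1 after joining are hypothetical;
only the UNKNOWN_MEMBER_ID error and the "no member ID, epoch 0" rejoin come
from the description above.

```python
import uuid

UNKNOWN_MEMBER_ID = "UNKNOWN_MEMBER_ID"
NONE = "NONE"


class Coordinator:
    """Toy share-group coordinator that only tracks member IDs."""

    def __init__(self):
        self.members = {}  # member_id -> member epoch

    def heartbeat(self, member_id, epoch):
        # No member ID and epoch 0 is a join: allocate a fresh member ID.
        if member_id is None and epoch == 0:
            new_id = str(uuid.uuid4())
            self.members[new_id] = 1  # illustrative epoch value
            return {"error": NONE, "member_id": new_id, "epoch": 1}
        # A member ID this coordinator has never seen is rejected.
        if member_id not in self.members:
            return {"error": UNKNOWN_MEMBER_ID, "member_id": None, "epoch": 0}
        return {"error": NONE, "member_id": member_id,
                "epoch": self.members[member_id]}


class Member:
    def __init__(self):
        self.member_id = None
        self.epoch = 0

    def heartbeat(self, coordinator):
        response = coordinator.heartbeat(self.member_id, self.epoch)
        if response["error"] == UNKNOWN_MEMBER_ID:
            # Drop the old identity and rejoin with no member ID and epoch 0.
            self.member_id, self.epoch = None, 0
            response = coordinator.heartbeat(self.member_id, self.epoch)
        self.member_id, self.epoch = response["member_id"], response["epoch"]
        return response
```

A member that joined through one Coordinator instance and then heartbeats to a
fresh instance (standing in for the new coordinator) ends up with a new member
ID after one extra round trip.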

17. I don’t think so. What is the client going to do with the exception? Share
groups are intentionally removing some of the details of using Kafka offsets
from the consumers. If the SPSO needs to be reset due to retention, it just
does that automatically.

18. The proposed use of control records needs some careful thought.
18.1. They’re written by the share-partition leader, not the coordinator.
18.2. If the client commits the acknowledgement, it is only confirmed to the
client once it has been replicated to the other replica brokers. So, committing
an acknowledgement is very similar to sending a record to a topic in terms of
the behaviour.

19. You are correct. The possibility of record duplication exists in failure
scenarios. A future KIP will add EOS support for share groups.

20.1. Yes, an exception. I was thinking InvalidOffsetException. I will update
the KIP with more detail about protocol error codes and API exceptions.
20.2. I think that’s a mistake. I’ll rectify it.

21. The message sets for the new control records would be filtered out for all 
consumers.

22. Fetch from follower is not supported. I will update the KIP.

23.1. I am not quite happy with the explanation of the checkpoint and delta
records. Essentially, there needs to be one checkpoint and then any number of
deltas. Then another checkpoint supersedes the previous records, and can have
its own sequence of deltas, and so on. Because recovery requires the leader to
read the latest checkpoint and all subsequent deltas, you want to take
checkpoints frequently enough to speed up recovery, but infrequently enough to
minimise the performance impact of reserializing all the state.
23.2. I’ll check the document again carefully, but the SHARE_DELTA should
always contain DeliveryCount for every member of the States array.
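
The recovery rule in 23.1 can be sketched as follows. This is a simplified
model, not the KIP's record format: the (type, payload) tuples, the state names
as strings and the behaviour when no checkpoint exists are assumptions for the
example. The BaseOffset, LastOffset, State and DeliveryCount keys follow the
States entries quoted in the discussion.

```python
def recover_state(records):
    """Rebuild share-partition state from a list of (type, payload) records.

    Each payload is a list of dicts with BaseOffset, LastOffset, State and
    DeliveryCount. Returns a dict of offset -> (state, delivery_count).
    """
    # Find the most recent checkpoint; everything before it is superseded.
    start = None
    for index, (record_type, _) in enumerate(records):
        if record_type == "SHARE_CHECKPOINT":
            start = index
    if start is None:
        return {}  # assumption: no checkpoint means no recoverable state

    state = {}
    # Apply the checkpoint, then every later delta in log order.
    for _, payload in records[start:]:
        for entry in payload:
            for offset in range(entry["BaseOffset"], entry["LastOffset"] + 1):
                state[offset] = (entry["State"], entry["DeliveryCount"])
    return state
```

The cost of recovery in this model is the size of the latest checkpoint plus
the number of deltas written after it, which is the trade-off described above:
more frequent checkpoints shorten the replay but cost more serialization.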

24. I was anticipating adding to the index files which are part of each log
segment.

25. The acknowledgements for each topic-partition are atomic. All this really
means is that we perform the state checking and the state persistence
atomically (one control record). The callback tells you whether the
acknowledgements for the entire topic-partition succeeded or failed, rather
than each record individually. I could have gone with a callback with a
record-based interface. Would that be preferable, do you think? For one thing,
that does give more flexibility for optimisations such as fetch pipelining in
the future.
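
A rough sketch of the per-partition atomicity in point 25, under stated
assumptions: the function names, the string state names and the callback
signature are hypothetical, and the single dict update merely stands in for
writing one control record.

```python
def acknowledge_partition(state, acks):
    """Apply all acknowledgements for one topic-partition, or none of them.

    state maps offset -> state name; acks maps offset -> new state name.
    Returns (new_state, error); error is None on success.
    """
    # State checking: every record being acknowledged must be acquired.
    for offset in acks:
        if state.get(offset) != "Acquired":
            return state, f"record {offset} is not acquired"
    # State persistence: one combined update for the whole partition.
    new_state = dict(state)
    new_state.update(acks)
    return new_state, None


def commit(states_by_partition, acks_by_partition, callback):
    """Invoke callback once per topic-partition with that partition's outcome."""
    for partition, acks in acks_by_partition.items():
        new_state, error = acknowledge_partition(
            states_by_partition[partition], acks)
        states_by_partition[partition] = new_state
        callback(partition, sorted(acks), error)
```

In this model one bad acknowledgement fails the whole partition and leaves its
state untouched, while other partitions in the same commit are unaffected. That
is why the callback reports an outcome per topic-partition, not per record.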

26. The metadata is unused. This is re-using an existing class
(OffsetAndMetadata). Perhaps it would be better not to.

27. Yes, agreed. I will add it.

Thanks,
Andrew

> On 9 Feb 2024, at 23:14, Jun Rao <[email protected]> wrote:
> 
> Hi, Andrew,
> 
> Thanks for the KIP. A few comments below.
> 
> 10. ShareFetchResponse: To consume transactional data, currently
> FetchResponse includes the AbortedTransactions fields for the client to
> properly skip aborted records. ShareFetchResponse doesn't include that. How
> do we prevent the consumer from reading aborted records in a share group?
> 
> 11. "adding "share"  to the existing group types of "consumer"  and
> "classic" "
> What's the "classic" type?
> 
> 12. bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-name
> group --entity-name G1 --alter --add-config group.type=share
> So, one could change the group type? What happens to the states associated
> with the group (members, epoch, offsets, etc)?
> 
> 13. Why is Server Assignor at member level, instead of group level?
> 
> 14. Member.metadata: How is that being used? It isn't exposed to the client.
> 
> 15. What public interface does SimpleAssignor implement?
> 
> 16. "This means that existing members will have to rejoin the share group
> following a coordinator failover."
> When an existing member issues a ShareGroupHeartbeatRequest to the new
> coordinator, does the coordinator return UNKNOWN_MEMBER_ID and a new
> memberId?
> 
> 17. auto.offset.reset has the option to throw an exception to the client if
> the current offset does not exist any more on the server (e.g. due to
> retention). Should group.share.auto.offset.reset support  that too?
> 
> 18. SHARE_CHECKPOINT and SHARE_DELTA records:
> 18.1 When does the coordinator write them?
> 18.2 If the client commits the acknowledgement successfully, could
> the acknowledgement be lost on the broker if the coordinator fails over?
> 
> 19. In the current consumer model, coordinator failover doesn't cause
> duplicate records in consumers. In the share group model, I guess this is
> no longer true since we are not persisting the acquired state?
> 
> 20. "The calls to KafkaShareConsumer.acknowledge(ConsumerRecord,
> AcknowledgeType) must be issued in the order in which the records appear in
> the ConsumerRecords object, which will be in order of increasing offset for
> each share-partition."
> 20.1  What happens if the acknowledge() call doesn't follow this? Does
> the caller get an exception?
> 20.2 The example with Acknowledge 119. It seems the acknowledgement is out
> of order since records at offset 111-118 haven't been acknowledged?
> 
> 21. "Indeed, these message sets are not returned to consumer". Are we
> excluding those control records for non-shared consumers too?
> 
> 22. The design doesn't seem to support fetching from the followers. This
> might be ok, but it will be useful to explicitly mention this.
> 
> 23. Examples with control records for SHARE_DELTA:
> 23.1 Some of the state changes contain cumulative state instead of delta.
> For example, "record 110 (available, delivery count 1), records 111-118
> acquired, record 119 acknowledged" for "Acknowledge 119".
> 23.2 SHARE_DELTA sometimes include available records with DeliveryCount of
> 0. But we don't do that for every record. What's the convention?
>    {
>      "BaseOffset": 111,
>      "LastOffset": 118,
>      "State": 0 (Available),
>      "DeliveryCount": 0
>    }
> 
> 24. "when a broker becomes the leader of a share-partition, it must read
> the most recent SHARE_CHECKPOINT": How does a broker find this efficiently
> on restart?
> 
> 25. AcknowledgeCommitCallback: How would an application use it? It doesn't
> indicate which record's acknowledgement has failed.
> 
> 26. AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId,
> Map<TopicPartition, OffsetAndMetadata> offsets): How is the metadata used?
> It doesn't seem there is an API to use it in either the client application
> or the broker.
> 
> 27. It would be useful to add a section on downgradability since the KIP
> changes the record format in the internal offset topic.
> 
> Thanks,
> 
> Jun
> 
> On Wed, Oct 11, 2023 at 8:25 AM Andrew Schofield <
> [email protected]> wrote:
> 
>> Hi Jack,
>> Thanks for your comments.
>> 
>> I have added a new section on Log Retention which describes the behaviour
>> of the SPSO as the LSO advances. That makes total sense
>> and was an omission from the KIP.
>> 
>> I have added the other ideas as potential future work. I do like the idea
>> of having the SPSO influence the advancements of the LSO
>> for topics which are primarily being used with share groups.
>> 
>> I have published an updated version of the KIP.
>> 
>> Thanks,
>> Andrew
>> 
>>> On 4 Oct 2023, at 10:09, Jack Vanlightly <[email protected]> wrote:
>>> 
>>> I would like to see more explicit discussion of topic retention and
>> share groups. There are a few options here from simple to more
>> sophisticated. There are also topic-level and share-group level options.
>>> 
>>> The simple thing would be to ensure that the SPSO of each share group is
>> bounded by the Log Start Offset (LSO) of each partition which itself is
>> managed by the retention policy. This is a topic-level control which
>> applies to all share-groups. I would say that this shared retention is the
>> largest drawback of modeling queues on shared logs and this is worth noting.
>>> 
>>> More sophisticated approaches can be to allow the LSO to advance not
>> (only) by retention policy but by the advancement of the lowest SPSO. This
>> can keep the amount of data lower by garbage collecting messages that have
>> been acknowledged by all share groups. Some people may like that behaviour
>> on those topics where share groups are the only consumption model and no
>> replay is needed.
>>> 
>>> There are per-share-group possibilities such as share-group TTLs where
>> messages can be archived on a per share group basis.
>>> 
>>> Thanks
>>> Jack
>> 
>> 
