Hi,
77. I’ve updated the KIP to use log retention rather than log compaction.
The basic ideas of what to persist are unchanged. It makes a few changes:

* It changes the record names: ShareCheckpoint -> ShareSnapshot and
  ShareDelta -> ShareUpdate. They’re equivalent, but renaming makes it
  simple to check I did an atomic change to the new proposal.
* It uses log retention and explicit pruning of elderly records using
  ReplicaManager.deleteRecords.
* It gets rid of the nasty DeltaIndex scheme because we don’t need to worry
  about the log compactor and key uniqueness.
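
To make the shape of the retention-based scheme concrete, here is a minimal
in-memory sketch. It is only an illustration under my own assumptions: the
class, method and field names are hypothetical, it does not use the KIP's
actual record schemas, and the pruning method merely stands in for a
delete-records-before-offset call such as ReplicaManager.deleteRecords.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory model only: hypothetical names, not the KIP's record schemas.
class ShareStateLogSketch {

    // One persisted record. snapshot=true models ShareSnapshot (full state),
    // snapshot=false models ShareUpdate (changes since the previous record).
    static final class StateRecord {
        final boolean snapshot;
        final Map<Long, String> deliveryStates; // record offset -> DeliveryState

        StateRecord(boolean snapshot, Map<Long, String> deliveryStates) {
            this.snapshot = snapshot;
            this.deliveryStates = new TreeMap<>(deliveryStates);
        }
    }

    private final List<StateRecord> records = new ArrayList<>();
    private long logStartOffset = 0L; // log offset of records.get(0)

    static StateRecord snapshot(Map<Long, String> fullState) {
        return new StateRecord(true, fullState);
    }

    static StateRecord update(Map<Long, String> changes) {
        return new StateRecord(false, changes);
    }

    // Appends a record and returns the log offset it was written at.
    long append(StateRecord record) {
        records.add(record);
        return logStartOffset + records.size() - 1;
    }

    // Drops every record before the latest snapshot and returns the new log
    // start offset. This stands in for a delete-records-before-offset call.
    long pruneBeforeLatestSnapshot() {
        for (int i = records.size() - 1; i >= 0; i--) {
            if (records.get(i).snapshot) {
                records.subList(0, i).clear();
                logStartOffset += i;
                break;
            }
        }
        return logStartOffset;
    }

    // Rebuilds the current state: a snapshot resets it, an update overlays it.
    Map<Long, String> replay() {
        Map<Long, String> state = new TreeMap<>();
        for (StateRecord record : records) {
            if (record.snapshot) {
                state.clear();
            }
            state.putAll(record.deliveryStates);
        }
        return state;
    }

    int size() {
        return records.size();
    }
}
```

Because nothing is compacted by key, any number of updates can follow a
snapshot without needing distinct keys, and replaying the log after pruning
gives the same state as replaying it before.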

I have also changed the ambiguous “State” to “DeliveryState” in RPCs
and records.

And I added a clarification about how the “group.type” configuration should
be used.

Thanks,
Andrew

> On 10 Apr 2024, at 15:33, Andrew Schofield <andrew_schofield_j...@live.com> wrote:
> 
> Hi Jun,
> Thanks for your questions.
> 
> 41.
> 41.1. The partition leader obtains the state epoch in the response from
> ReadShareGroupState. When it becomes a share-partition leader,
> it reads the share-group state and one of the things it learns is the
> current state epoch. Then it uses the state epoch in all subsequent
> calls to WriteShareGroupState. The fencing is to prevent writes for
> a previous state epoch, which are very unlikely but which would mean
> that a leader was using an out-of-date epoch and was likely no longer
> the current leader at all, perhaps due to a long pause for some reason.
> 
> 41.2. If the group coordinator were to set the SPSO, wouldn’t it need
> to discover the initial offset? I’m trying to avoid yet another inter-broker
> hop.
> 
> 42.
> 42.1. I think I’ve confused things. When the share group offset is altered
> using AdminClient.alterShareGroupOffsets, the group coordinator WILL
> update the state epoch. I don’t think it needs to update the group epoch
> at the same time (although it could) because the group epoch will have
> been bumped when the group became empty. If the share group offset
> is altered multiple times when the group remains empty, it would be
> harmless if the same state epoch was reused to initialize the state.
> 
> When the share-partition leader updates the SPSO as a result of
> the usual flow of record delivery, it does not update the state epoch.
> 
> 42.2. The share-partition leader will notice the alteration because,
> when it issues WriteShareGroupState, the response will contain the
> error code FENCED_STATE_EPOCH. This is supposed to be the
> last-resort way of catching this.
> 
> When the share-partition leader handles its first ShareFetch request,
> it learns the state epoch from the response to ReadShareGroupState.
> 
> In normal running, the state epoch will remain constant, but, when there
> are no consumers and the group is empty, it might change. As a result,
> I think it would be sensible when the set of share sessions transitions
> from 0 to 1, which is a reasonable proxy for the share group transitioning
> from empty to non-empty, for the share-partition leader to issue
> ReadShareGroupOffsetsState to validate the state epoch. If its state
> epoch is out of date, it can then ReadShareGroupState to re-initialize.
> 
> I’ve changed the KIP accordingly.
> 
> 47, 56. If I am to change BaseOffset to FirstOffset, we need to have
> a clear view of which is the correct term. Having reviewed all of the
> instances, my view is that BaseOffset should become FirstOffset in
> ALL schemas defined in the KIP. Then, BaseOffset is just used in
> record batches, which is already a known concept.
> 
> Please let me know if you agree.
> 
> 60. I’ve added FindCoordinator to the top level index for protocol changes.
> 
> 61. OK. I expect you are correct about how users will be using the
> console share consumer. When I use the console consumer, I always get
> a new consumer group. I have changed the default group ID for console
> share consumer to “console-share-consumer” to match the console consumer
> better and give more of an idea where this mysterious group has come from.
> 
> 77. I will work on a proposal that does not use compaction and we can
> make a judgement about whether it’s a better course for KIP-932. Personally,
> until I’ve written it down and lived with the ideas for a few days, I won’t be
> able to choose which I prefer.
> 
> I should be able to get the proposal written by the end of this week.
> 
> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs matches
> ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs from KIP-848.
> I prefer to maintain the consistency.
> 
> 101. Thanks for catching this. The ShareGroupHeartbeatResponse was originally
> created from KIP-848. This part of the schema does not apply and I have
> removed it. I have also renamed AssignedTopicPartitions to simply
> TopicPartitions, which aligns with the actual definition of
> ConsumerGroupHeartbeatResponse.
> 
> 102. No, I don’t think we do. Removed.
> 
> 103. I’ve changed the description for the error codes for ShareFetchResponse.
> 
> 104. Interesting. I have added ErrorMessages to these RPCs as you suggest.
> It’s a good improvement for problem determination.
> 
> 105. The values are reserved in my brain. Actually, 1 is Acquired which
> is not persisted, and I have another non-terminal state in mind for 3.
> 
> 106. A few people have raised the question of whether OffsetAndMetadata
> is sensible in this KIP, given that the optional Metadata part comes from when
> a regular consumer commits offsets. However, it is correct that there will
> never be metadata with a share group. I have changed
> the KIP to replace OffsetAndMetadata with Long.
> 
> 107. Yes, you are right. I have learnt during this process that a version bump
> can be a logical not just a physical change to the schema. KIP updated.
> 
> 108. I would prefer not to extend this RPC for all of the states at this
> point. I think there is definitely scope for another KIP focused on
> administration of share groups that might want this information so someone
> could build a UI and other tools on top. Doing that well is quite a lot of
> work in its own right so I would prefer not to do that now.
> 
> 109. Yes, they’re inconsistent and follow the consumer group equivalents,
> which are also inconsistent. In general, the KafkaAdmin.delete… methods
> use the Map<XXX, KafkaFuture<YYY>> pattern like DeleteShareGroupsResult.
> 
> Would you prefer that I do that, or remain consistent with consumer groups?
> Happy to change it.
> 
> 110. Again, consumer groups don’t yield that level of detail about epochs.
> The MemberDescription does include the assignment, but not the list of
> subscribed topic names.
> 
> 111. I didn’t include GroupState in GroupListing because there’s no
> single class that includes the states of all group types.
> 
> 112. I think it’s good practice for the API to have ListShareGroupOffsetSpec.
> It makes evolution and extension of the API much easier. Also, it matches
> the precedent set by ListConsumerGroupOffsetSpec.
> 
> 113. ListConsumerGroupsResults.errors() is the same. I think you just have
> to look in the exception details and the same pattern is being followed here.
> 
> 
> Over the next few days, I have committed to writing a proposal for how to
> persist share-group state that doesn’t use log compaction.
> 
> I am also aware that discussion with Justine Olshan on read-committed
> isolation level is not yet quite complete.
> 
> Thanks for the detailed review.
> 
> Andrew
> 
>> On 9 Apr 2024, at 23:58, Jun Rao <j...@confluent.io.INVALID> wrote:
>> 
>> Hi, Andrew,
>> 
>> Thanks for the reply. A few more comments.
>> 
>> 41.
>> 41.1 How does the partition leader obtain the group epoch to set
>> WriteShareGroupStateRequest.StateEpoch?
>> 41.2 What's the benefit of having the group coordinator initialize the
>> state and the partition leader set the SPSO? It seems simpler to have the
>> partition leader initialize both the state and the SPSO together?
>> 
>> 42.
>> 42.1 "I don’t think the group epoch needs to be bumped when the share group
>> offset is altered."
>> But the part on handling Alter share group offsets says "The share
>> coordinator writes a ShareCheckpoint record with the new state epoch to the
>> __share_group_state  topic." So, which is correct? We have two paths to
>> update the state in the share coordinator, one from the group coordinator
>> and another from the partition leader. I thought the benefit of bumping up
>> the epoch is to fence off a late request in the previous epoch from another
>> path.
>> 42.2 When the group coordinator alters the share group offset in share
>> coordinator, how does the partition leader know the share group state has
>> been altered so that it could clear its in-memory state?
>> 
>> 47. 56. BaseOffset typically refers to the base offset for the batch and
>> can be confusing. FirstOffset is clearer and matches LastOffset.
>> 
>> 60. Could we include FindCoordinatorRequest in the top level index for
>> Kafka protocol changes?
>> 
>> 61. I think it's probably ok to add time-based expiration later. But using
>> a default group in console-share-consumer probably won't help reduce the
>> garbage. In the common case, the user of the console consumer likely wants
>> to see the recently produced records for verification. If the default group
>> doesn't provide that (because of the stale state), the user will likely
>> just use a new group. It's true that we don't garbage collect idle topics.
>> However, the share groups are similar to consumers, which do support
>> garbage collection. Typically, we read topics more than creating them.
>> 
>> 77. If the generic compaction is inconvenient, we could use customized
>> logic. If we go with that route, option (b) seems cleaner and more
>> optimized. Since the share states for all groups fit in memory, we could
>> generate snapshots more efficiently than going through compaction. Having a
>> separate log per share partition is probably too much overhead. It's more
>> efficient to put the state changes for multiple share partitions in a
>> single log.
>> 
>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs: Should we name it
>> SessionTimeoutMs?
>> 
>> 101. ShareGroupHeartbeatResponse.Assignment.Error: What kind of error could
>> we have when assigning partitions? What are the corresponding error codes?
>> 
>> 102. Do we still need
>> ShareGroupDescribeResponse.Members.Assignment.{MetadataVersion,MetadataBytes}?
>> 
>> 103. Could we list the error codes separately for
>> ShareFetchResponse.Responses.Partitions.ErrorCode and
>> ShareFetchResponse.Responses.Partitions.AcknowledgeErrorCode?
>> 
>> 104. Should we add error message for the errorCode in ShareFetchResponse,
>> ShareAcknowledgeResponse, ReadShareGroupStateResponse,
>> WriteShareGroupStateResponse, DeleteShareGroupStateResponse,
>> ReadShareGroupOffsetsStateResponse and InitializeShareGroupStateResponse?
>> 
>> 105. "about": "The state - 0:Available,2:Acked,4:Archived.": What about 1
>> and 3? Are we leaving them out intentionally?
>> 
>> 106. Do we have usage of metadata in OffsetAndMetadata? If not, could we
>> remove it from AdminClient and KafkaShareConsumer?
>> 
>> 107. ListGroupsRequest: Should we bump up the version since it now supports
>> a new group type "share"?
>> 
>> 108. AdminClient.listShareGroupOffsets: Should it expose all the states
>> from ReadShareGroupStateResponse, instead of just SPSO?
>> 
>> 109. DeleteShareGroupOffsetsResult exposes
>> public KafkaFuture<Void> partitionResult(final TopicPartition partition)
>> DeleteShareGroupsResult exposes
>> public Map<String, KafkaFuture<Void>> deletedGroups()
>> Should we make them more consistent?
>> 
>> 110. Should ShareGroupDescription include fields like GroupEpoch,
>> AssignmentEpoch, MemberEpoch, and SubscribedTopicNames?
>> 
>> 111. Should GroupListing include GroupState?
>> 
>> 112. Do we need ListShareGroupOffsetsSpec? Could we just use
>> Set<TopicPartition> directly?
>> 
>> 113. ListShareGroupsResult.errors(): How do we know which group has an
>> error?
>> 
>> Jun
>> 
>> On Mon, Apr 8, 2024 at 9:32 AM Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
>> 
>>> Hi David,
>>> Thanks for your questions.
>>> 
>>> 70. The Group Coordinator communicates with the Share Coordinator over
>>> RPCs. In the general case, it’s an inter-broker call. It is probably
>>> possible to optimise for the situation in which the appropriate GC and SC
>>> shards are co-located, but the KIP does not delve that deep into potential
>>> performance optimisations.
>>> 
>>> 71. Avoiding collisions would be a good idea, but I do worry about
>>> retrospectively introducing a naming convention for groups. I feel that
>>> naming conventions will typically be the responsibility of the cluster
>>> administrators based on organizational factors, such as the name of an
>>> application.
>>> 
>>> 72. Personally, I don’t like INVALID_GROUP_ID because the group ID is
>>> correct but the group is the wrong type. The nearest existing error code
>>> that gets that across is INCONSISTENT_GROUP_PROTOCOL. Perhaps this is
>>> really showing that a new error code would be better.
>>> 
>>> 73. The Metadata fields are not used. I have removed them.
>>> 
>>> 74. The metadata is re-evaluated on every change, but only a subset is
>>> relevant for rebalancing. A check is done against the names of the
>>> subscribed topics to see if any relevant changes may have occurred. Then
>>> the changes which trigger a rebalance are topic creation, deletion, change
>>> in partitions, or rack IDs for the replicas. I have updated the KIP to
>>> make this more clear.
>>> 
>>> 75. The assignment is not persisted because it is much less important that
>>> the assignment survives a GC change. There’s no need to transfer
>>> partitions safely from member to member in the way that is required for
>>> consumer groups, so as an optimisation, the assignments for a share group
>>> are not persisted. It wouldn’t do any harm, but it just seems unnecessary.
>>> 
>>> 76. In the event that a consumer tries to acknowledge a record that it no
>>> longer has the right to acknowledge, the INVALID_RECORD_STATE error code
>>> is used.
>>> 
>>> If the application uses the KafkaShareConsumer.commitSync method, it will
>>> see an InvalidRecordState exception returned. Alternatively, the
>>> application can register an acknowledgement commit callback which will be
>>> called with the status of the acknowledgements that have succeeded or
>>> failed.
>>> 
>>> 77. I have tried to tread a careful path with the durable share-partition
>>> state in this KIP. The significant choices I made are that:
>>> * Topics are used so that the state is replicated between brokers.
>>> * Log compaction is used to keep a lid on the storage.
>>> * Only one topic is required.
>>> 
>>> Log compaction as it stands is not ideal for this kind of data, as
>>> evidenced by the DeltaIndex technique I employed.
>>> 
>>> I can think of a few relatively simple ways to improve upon it.
>>> 
>>> a) We could use a customised version of the log compactor for this topic
>>> that understands the rules for ShareCheckpoint and ShareDelta records.
>>> Essentially, for each share-partition, the latest ShareCheckpoint and any
>>> subsequent ShareDelta records must not be cleaned. Anything else can be
>>> cleaned. We could then be sure that multiple ShareDelta records with the
>>> same key would survive cleaning and we could abandon the DeltaIndex
>>> technique.
>>> 
>>> b) Actually what is required is a log per share-partition. Let’s imagine
>>> that we had a share-state topic per topic being consumed in a share group,
>>> with the same number of partitions as the topic being consumed. We could
>>> write many more deltas between checkpoints, and just take periodic
>>> checkpoints to keep control of the storage used. Once a checkpoint has
>>> been taken, we could use KafkaAdmin.deleteRecords() to prune all of the
>>> older records.
>>> 
>>> The share-state topics would be more numerous, but we are talking one per
>>> topic per share group that it’s being consumed in. These topics would not
>>> be compacted.
>>> 
>>> As you’ll see in the KIP, the Persister interface is intended to be
>>> pluggable one day. I know the scheme in the KIP is not ideal. It seems
>>> likely to me that future KIPs will improve upon it.
>>> 
>>> If I can get buy-in for option (b), I’m happy to change this KIP. While
>>> option (a) is probably workable, it does seem a bit of a hack to have a
>>> customised log compactor just for this topic.
>>> 
>>> 78. How about DeliveryState? I agree that State is overloaded.
>>> 
>>> 79. See (77).
>>> 
>>> Thanks,
>>> Andrew
>>> 
>>> 
>>>> On 5 Apr 2024, at 05:07, David Arthur <david.art...@confluent.io.INVALID> wrote:
>>>> 
>>>> Andrew, thanks for the KIP! This is a pretty exciting effort.
>>>> 
>>>> I've finally made it through the KIP, still trying to grok the whole
>>>> thing. Sorry if some of my questions are basic :)
>>>> 
>>>> 
>>>> Concepts:
>>>> 
>>>> 70. Does the Group Coordinator communicate with the Share Coordinator
>>>> over RPC or directly in-process?
>>>> 
>>>> 71. For preventing name collisions with regular consumer groups, could we
>>>> define a reserved share group prefix? E.g., the operator defines "sg_" as
>>>> a prefix for share groups only, and if a regular consumer group tries to
>>>> use that name it fails.
>>>> 
>>>> 72. When a consumer tries to use a share group, or a share consumer tries
>>>> to use a regular group, would INVALID_GROUP_ID make more sense
>>>> than INCONSISTENT_GROUP_PROTOCOL?
>>>> 
>>>> --------
>>>> 
>>>> Share Group Membership:
>>>> 
>>>> 73. What goes in the Metadata field for TargetAssignment#Member and
>>>> Assignment?
>>>> 
>>>> 74. Under Trigger a rebalance, it says we rebalance when the partition
>>>> metadata changes. Would this be for any change, or just certain ones? For
>>>> example, if a follower drops out of the ISR and comes back, we probably
>>>> don't need to rebalance.
>>>> 
>>>> 75. "For a share group, the group coordinator does *not* persist the
>>>> assignment" Can you explain why this is not needed?
>>>> 
>>>> 76. "If the consumer just failed to heartbeat due to a temporary pause, it
>>>> could in theory continue to fetch and acknowledge records. When it
>>>> finally sends a heartbeat and realises it’s been kicked out of the group,
>>>> it should stop fetching records because its assignment has been revoked,
>>>> and rejoin the group."
>>>> 
>>>> A consumer with a long pause might still deliver some buffered records,
>>>> but if the share group coordinator has expired its session, it wouldn't
>>>> accept acknowledgments for that share consumer. In such a case, is any
>>>> kind of error raised to the application like "hey, I know we gave you
>>>> these records, but really we shouldn't have"?
>>>> 
>>>> 
>>>> -----
>>>> 
>>>> Record Delivery and acknowledgement
>>>> 
>>>> 77. If we guarantee that a ShareCheckpoint is written at least every so
>>>> often, could we add a new log compactor that avoids compacting
>>>> ShareDelta-s that are still "active" (i.e., not yet superseded by a new
>>>> ShareCheckpoint)? Mechanically, this could be done by keeping the LSO no
>>>> greater than the oldest "active" ShareCheckpoint. This might let us
>>>> remove the DeltaIndex thing.
>>>> 
>>>> 78. Instead of the State in the ShareDelta/Checkpoint records, how about
>>>> MessageState? (State is kind of overloaded/ambiguous)
>>>> 
>>>> 79. One possible limitation with the current persistence model is that
>>>> all the share state is stored in one topic. It seems like we are going to
>>>> be storing a lot more state than we do in __consumer_offsets since we're
>>>> dealing with message-level acks. With aggressive checkpointing and
>>>> compaction, we can mitigate the storage requirements, but the throughput
>>>> could be a limiting factor. Have we considered other possibilities for
>>>> persistence?
>>>> 
>>>> 
>>>> Cheers,
>>>> David
>>> 
>>> 
> 
