rreddy-22 commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1353832984
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ########## @@ -898,6 +900,47 @@ public void createGroupTombstoneRecords(List<Record> records) { records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId())); } + @Override + public boolean isEmpty() { + return isInState(EMPTY); + } + + /** + * Return the offset expiration condition to be used for this group. This is based on several factors + * such as the group state, the protocol type, and the GroupMetadata record version. + * + * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition} + * + * @return The offset expiration condition for the group or Empty if no such condition exists. + */ + @Override + public Optional<OffsetExpirationCondition> offsetExpirationCondition() { + if (protocolType.isPresent()) { + if (isInState(EMPTY)) { + // No consumer exists in the group => + // - If current state timestamp exists and retention period has passed since group became Empty, + // expire all offsets with no pending offset commit; + // - If there is no current state timestamp (old group metadata schema) and retention period has passed + // since the last commit timestamp, expire the offset + return Optional.of(new OffsetExpirationConditionImpl( + offsetAndMetadata -> currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs)) + ); + } else if (usesConsumerGroupProtocol() && subscribedTopics.isPresent() && isInState(STABLE)) { + // Consumers exist in the group and group is Stable => + // - If the group is aware of the subscribed topics and retention period had passed since the + // last commit timestamp, expire the offset. offset with pending offset commit are not Review Comment: nit: the last sentence of this comment should be capitalized and completed, i.e. "Offsets with pending offset commit are not expired." -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org