This is an automated email from the ASF dual-hosted git repository. dajac pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 7b5d640cc65 KAFKA-14987; Implement Group/Offset expiration in the new coordinator (#14467) 7b5d640cc65 is described below commit 7b5d640cc656443a078bda096d01910b3edfdb37 Author: Jeff Kim <kimkb2...@gmail.com> AuthorDate: Thu Oct 12 02:45:13 2023 -0400 KAFKA-14987; Implement Group/Offset expiration in the new coordinator (#14467) This patch implements the groups and offsets expiration in the new group coordinator. Reviewers: Ritika Reddy <rre...@confluent.io>, David Jacot <dja...@confluent.io> --- checkstyle/suppressions.xml | 2 +- .../src/main/scala/kafka/server/BrokerServer.scala | 4 +- .../org/apache/kafka/coordinator/group/Group.java | 16 +- .../coordinator/group/GroupCoordinatorConfig.java | 27 +++- .../coordinator/group/GroupCoordinatorShard.java | 69 +++++++- .../coordinator/group/GroupMetadataManager.java | 20 +++ .../group/OffsetExpirationCondition.java | 36 +++++ .../group/OffsetExpirationConditionImpl.java | 62 +++++++ .../coordinator/group/OffsetMetadataManager.java | 95 +++++++++-- .../coordinator/group/consumer/ConsumerGroup.java | 23 ++- .../coordinator/group/generic/GenericGroup.java | 54 ++++++- .../group/GroupCoordinatorConfigTest.java | 29 +++- .../group/GroupCoordinatorServiceTest.java | 4 +- .../group/GroupCoordinatorShardTest.java | 178 ++++++++++++++++++--- .../group/GroupMetadataManagerTest.java | 47 ++++++ .../group/OffsetExpirationConditionImplTest.java | 70 ++++++++ .../group/OffsetMetadataManagerTest.java | 173 ++++++++++++++++++-- .../group/consumer/ConsumerGroupTest.java | 67 ++++++++ .../group/generic/GenericGroupTest.java | 151 +++++++++++++++++ 19 files changed, 1063 insertions(+), 64 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index c9601753c1e..678765aa6bf 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -328,7 +328,7 @@ <suppress checks="ClassFanOutComplexity" files="(GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorService|GroupCoordinatorServiceTest).java"/> <suppress checks="ParameterNumber" - files="(ConsumerGroupMember|GroupMetadataManager).java"/> + files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorConfig).java"/> <suppress checks="ClassDataAbstractionCouplingCheck" files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest).java"/> <suppress checks="JavaNCSS" diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 96e8dc8a5a1..daec8f73089 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -522,7 +522,9 @@ class BrokerServer( config.groupInitialRebalanceDelay, GroupCoordinatorConfig.GENERIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS, config.groupMinSessionTimeoutMs, - config.groupMaxSessionTimeoutMs + config.groupMaxSessionTimeoutMs, + config.offsetsRetentionCheckIntervalMs, + config.offsetsRetentionMinutes * 60 * 1000L ) val timer = new SystemTimerReaper( "group-coordinator-reaper", diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java index 29a252e47bf..0cb10b12a51 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.message.ListGroupsResponseData; import java.util.List; +import java.util.Optional; /** * Interface common for all groups. @@ -106,7 +107,8 @@ public interface Group { /** * Returns true if the group is actively subscribed to the topic. * - * @param topic The topic name. + * @param topic The topic name. + * * @return Whether the group is subscribed to the topic. */ boolean isSubscribedToTopic(String topic); @@ -117,4 +119,16 @@ public interface Group { * @param records The list of records. */ void createGroupTombstoneRecords(List<Record> records); + + /** + * @return Whether the group is in Empty state. + */ + boolean isEmpty(); + + /** + * See {@link OffsetExpirationCondition} + * + * @return The offset expiration condition for the group or Empty if no such condition exists. + */ + Optional<OffsetExpirationCondition> offsetExpirationCondition(); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index fe31a24524f..330813ee912 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -91,6 +91,27 @@ public class GroupCoordinatorConfig { */ public final int genericGroupMaxSessionTimeoutMs; + /** + * Frequency at which to check for expired offsets. + */ + public final long offsetsRetentionCheckIntervalMs; + + /** + * For subscribed consumers, committed offset of a specific partition will be expired and discarded when: + * 1) This retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); + * 2) This retention period has elapsed since the last time an offset is committed for the partition AND + * the group is no longer subscribed to the corresponding topic. + * + * For standalone consumers (using manual assignment), offsets will be expired after this retention period has + * elapsed since the time of last commit. + * + * Note that when a group is deleted via the DeleteGroups request, its committed offsets will also be deleted immediately; + * + * Also, when a topic is deleted via the delete-topic request, upon propagated metadata update any group's + * committed offsets for that topic will also be deleted without extra retention period. + */ + public final long offsetsRetentionMs; + public GroupCoordinatorConfig( int numThreads, int consumerGroupSessionTimeoutMs, @@ -103,7 +124,9 @@ public class GroupCoordinatorConfig { int genericGroupInitialRebalanceDelayMs, int genericGroupNewMemberJoinTimeoutMs, int genericGroupMinSessionTimeoutMs, - int genericGroupMaxSessionTimeoutMs + int genericGroupMaxSessionTimeoutMs, + long offsetsRetentionCheckIntervalMs, + long offsetsRetentionMs ) { this.numThreads = numThreads; this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs; @@ -117,5 +140,7 @@ public class GroupCoordinatorConfig { this.genericGroupNewMemberJoinTimeoutMs = genericGroupNewMemberJoinTimeoutMs; this.genericGroupMinSessionTimeoutMs = genericGroupMinSessionTimeoutMs; this.genericGroupMaxSessionTimeoutMs = genericGroupMaxSessionTimeoutMs; + this.offsetsRetentionCheckIntervalMs = offsetsRetentionCheckIntervalMs; + this.offsetsRetentionMs = offsetsRetentionMs; } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index c30d348336b..d0e67c1e693 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -70,6 +70,7 @@ import org.slf4j.Logger; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; /** * The group coordinator shard is a replicated state machine that manages the metadata of all @@ -159,17 +160,26 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> { .withSnapshotRegistry(snapshotRegistry) .withTime(time) .withGroupMetadataManager(groupMetadataManager) - .withOffsetMetadataMaxSize(config.offsetMetadataMaxSize) + .withGroupCoordinatorConfig(config) .build(); return new GroupCoordinatorShard( logContext, groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + timer, + config ); } } + /** + * The group/offsets expiration key to schedule a timer task. + * + * Visible for testing. + */ + static final String GROUP_EXPIRATION_KEY = "expire-group-metadata"; + /** * The logger. */ @@ -185,6 +195,16 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> { */ private final OffsetMetadataManager offsetMetadataManager; + /** + * The coordinator timer. + */ + private final CoordinatorTimer<Void, Record> timer; + + /** + * The group coordinator config. + */ + private final GroupCoordinatorConfig config; + /** * Constructor. * @@ -195,11 +215,15 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> { GroupCoordinatorShard( LogContext logContext, GroupMetadataManager groupMetadataManager, - OffsetMetadataManager offsetMetadataManager + OffsetMetadataManager offsetMetadataManager, + CoordinatorTimer<Void, Record> timer, + GroupCoordinatorConfig config ) { this.log = logContext.logger(GroupCoordinatorShard.class); this.groupMetadataManager = groupMetadataManager; this.offsetMetadataManager = offsetMetadataManager; + this.timer = timer; + this.config = config; } /** @@ -435,6 +459,39 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> { return offsetMetadataManager.deleteOffsets(request); } + /** + * For each group, remove all expired offsets. If all offsets for the group are removed and the group is eligible + * for deletion, delete the group. + * + * @return The list of tombstones (offset commit and group metadata) to append. + */ + public CoordinatorResult<Void, Record> cleanupGroupMetadata() { + List<Record> records = new ArrayList<>(); + groupMetadataManager.groupIds().forEach(groupId -> { + if (offsetMetadataManager.cleanupExpiredOffsets(groupId, records)) { + groupMetadataManager.maybeDeleteGroup(groupId, records); + } + }); + + // Reschedule the next cycle. + scheduleGroupMetadataExpiration(); + return new CoordinatorResult<>(records); + } + + /** + * Schedule the group/offsets expiration job. If any exceptions are thrown above, the timer will retry. + */ + private void scheduleGroupMetadataExpiration() { + timer.schedule( + GROUP_EXPIRATION_KEY, + config.offsetsRetentionCheckIntervalMs, + TimeUnit.MILLISECONDS, + true, + this::cleanupGroupMetadata + ); + } + + /** * The coordinator has been loaded. This is used to apply any * post loading operations (e.g. registering timers). @@ -448,6 +505,12 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> { offsetMetadataManager.onNewMetadataImage(newImage, emptyDelta); groupMetadataManager.onLoaded(); + scheduleGroupMetadataExpiration(); + } + + @Override + public void onUnloaded() { + timer.cancel(GROUP_EXPIRATION_KEY); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 7bfe243d026..5974aabbe4c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -3166,6 +3166,19 @@ public class GroupMetadataManager { group.validateDeleteGroup(); } + /** + * Delete the group if it exists and is in Empty state. + * + * @param groupId The group id. + * @param records The list of records to append the group metadata tombstone records. + */ + public void maybeDeleteGroup(String groupId, List<Record> records) { + Group group = groups.get(groupId); + if (group != null && group.isEmpty()) { + deleteGroup(groupId, records); + } + } + /** * Checks whether the given protocol type or name in the request is inconsistent with the group's. * @@ -3183,6 +3196,13 @@ public class GroupMetadataManager { && !groupProtocolTypeOrName.equals(protocolTypeOrName); } + /** + * @return The set of all groups' ids. + */ + public Set<String> groupIds() { + return Collections.unmodifiableSet(this.groups.keySet()); + } + /** * Generate a generic group heartbeat key for the timer. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java new file mode 100644 index 00000000000..ce3f299b40a --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +/** + * An offset is considered expired based on different factors, such as the state of the group + * and/or the GroupMetadata record version (for generic groups). This class is used to check + * how offsets for the group should be expired. + */ +public interface OffsetExpirationCondition { + + /** + * Given an offset metadata and offsets retention, return whether the offset is expired or not. + * + * @param offset The offset metadata. + * @param currentTimestampMs The current timestamp. + * @param offsetsRetentionMs The offset retention. + * + * @return Whether the offset is considered expired or not. + */ + boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestampMs, long offsetsRetentionMs); +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImpl.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImpl.java new file mode 100644 index 00000000000..cd823108ef3 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImpl.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import java.util.function.Function; + +public class OffsetExpirationConditionImpl implements OffsetExpirationCondition { + + /** + * Given an offset and metadata, obtain the base timestamp that should be used + * as the start of the offsets retention period. + */ + private final Function<OffsetAndMetadata, Long> baseTimestamp; + + public OffsetExpirationConditionImpl(Function<OffsetAndMetadata, Long> baseTimestamp) { + this.baseTimestamp = baseTimestamp; + } + + /** + * Determine whether an offset is expired. Older versions have an expire timestamp per partition. If this + * exists, compare against the current timestamp. Otherwise, use the base timestamp (either commit timestamp + * or current state timestamp if group is empty for generic groups) and check whether the offset has + * exceeded the offset retention. + * + * @param offset The offset and metadata. + * @param currentTimestampMs The current timestamp. + * @param offsetsRetentionMs The offsets retention in milliseconds. + * + * @return Whether the given offset is expired or not. + */ + @Override + public boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestampMs, long offsetsRetentionMs) { + if (offset.expireTimestampMs.isPresent()) { + // Older versions with explicit expire_timestamp field => old expiration semantics is used + return currentTimestampMs >= offset.expireTimestampMs.getAsLong(); + } else { + // Current version with no per partition retention + return currentTimestampMs - baseTimestamp.apply(offset) >= offsetsRetentionMs; + } + } + + /** + * @return The base timestamp. + */ + public Function<OffsetAndMetadata, Long> baseTimestamp() { + return this.baseTimestamp; + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index 9744e492f49..076ec24476f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.coordinator.group; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.StaleMemberEpochException; @@ -45,9 +46,13 @@ import org.slf4j.Logger; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.OptionalLong; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET; @@ -61,13 +66,14 @@ import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSE * handling as well as during the initial loading of the records from the partitions. */ public class OffsetMetadataManager { + public static class Builder { private LogContext logContext = null; private SnapshotRegistry snapshotRegistry = null; private Time time = null; private GroupMetadataManager groupMetadataManager = null; - private int offsetMetadataMaxSize = 4096; private MetadataImage metadataImage = null; + private GroupCoordinatorConfig config = null; Builder withLogContext(LogContext logContext) { this.logContext = logContext; @@ -89,8 +95,8 @@ public class OffsetMetadataManager { return this; } - Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) { - this.offsetMetadataMaxSize = offsetMetadataMaxSize; + Builder withGroupCoordinatorConfig(GroupCoordinatorConfig config) { + this.config = config; return this; } @@ -115,7 +121,7 @@ public class OffsetMetadataManager { time, metadataImage, groupMetadataManager, - offsetMetadataMaxSize + config ); } } @@ -146,9 +152,9 @@ public class OffsetMetadataManager { private final GroupMetadataManager groupMetadataManager; /** - * The maximum allowed metadata for any offset commit. + * The group coordinator config. */ - private final int offsetMetadataMaxSize; + private final GroupCoordinatorConfig config; /** * The offsets keyed by group id, topic name and partition id. @@ -161,14 +167,14 @@ public class OffsetMetadataManager { Time time, MetadataImage metadataImage, GroupMetadataManager groupMetadataManager, - int offsetMetadataMaxSize + GroupCoordinatorConfig config ) { this.snapshotRegistry = snapshotRegistry; this.log = logContext.logger(OffsetMetadataManager.class); this.time = time; this.metadataImage = metadataImage; this.groupMetadataManager = groupMetadataManager; - this.offsetMetadataMaxSize = offsetMetadataMaxSize; + this.config = config; this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); } @@ -316,7 +322,7 @@ public class OffsetMetadataManager { response.topics().add(topicResponse); topic.partitions().forEach(partition -> { - if (partition.committedMetadata() != null && partition.committedMetadata().length() > offsetMetadataMaxSize) { + if (partition.committedMetadata() != null && partition.committedMetadata().length() > config.offsetMetadataMaxSize) { topicResponse.partitions().add(new OffsetCommitResponsePartition() .setPartitionIndex(partition.partitionIndex()) .setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code())); @@ -544,6 +550,77 @@ public class OffsetMetadataManager { .setTopics(topicResponses); } + /** + * Remove expired offsets for the given group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * + * @return True if no offsets exist or if all offsets expired, false otherwise. + */ + public boolean cleanupExpiredOffsets(String groupId, List<Record> records) { + TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId); + if (offsetsByTopic == null) { + return true; + } + + // We expect the group to exist. + Group group = groupMetadataManager.group(groupId); + Set<String> expiredPartitions = new HashSet<>(); + long currentTimestampMs = time.milliseconds(); + Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition(); + + if (!offsetExpirationCondition.isPresent()) { + return false; + } + + AtomicBoolean allOffsetsExpired = new AtomicBoolean(true); + OffsetExpirationCondition condition = offsetExpirationCondition.get(); + + offsetsByTopic.forEach((topic, partitions) -> { + if (!group.isSubscribedToTopic(topic)) { + partitions.forEach((partition, offsetAndMetadata) -> { + if (condition.isOffsetExpired(offsetAndMetadata, currentTimestampMs, config.offsetsRetentionMs)) { + expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, records).toString()); + } else { + allOffsetsExpired.set(false); + } + }); + } else { + allOffsetsExpired.set(false); + } + }); + + if (!expiredPartitions.isEmpty()) { + log.info("[GroupId {}] Expiring offsets of partitions (allOffsetsExpired={}): {}", + groupId, allOffsetsExpired, String.join(", ", expiredPartitions)); + } + + return allOffsetsExpired.get(); + } + + /** + * Add an offset commit tombstone record for the group. + * + * @param groupId The group id. + * @param topic The topic name. + * @param partition The partition. + * @param records The list of records to append the tombstone. + * + * @return The topic partition of the corresponding tombstone. + */ + private TopicPartition appendOffsetCommitTombstone( + String groupId, + String topic, + int partition, + List<Record> records + ) { + records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition)); + TopicPartition tp = new TopicPartition(topic, partition); + log.trace("[GroupId {}] Removing expired offset and metadata for {}", groupId, tp); + return tp; + } + /** * Replays OffsetCommitKey/Value to update or delete the corresponding offsets. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index 21f6f8124cd..e5144953a97 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -24,6 +24,8 @@ import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.coordinator.group.OffsetExpirationCondition; +import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; import org.apache.kafka.coordinator.group.Record; import org.apache.kafka.coordinator.group.RecordHelpers; import org.apache.kafka.image.ClusterImage; @@ -349,9 +351,11 @@ public class ConsumerGroup implements Group { /** * Returns true if the consumer group is actively subscribed to the topic. * - * @param topic The topic name. - * @return whether the group is subscribed to the topic. + * @param topic The topic name. + * + * @return Whether the group is subscribed to the topic. */ + @Override public boolean isSubscribedToTopic(String topic) { return subscribedTopicNames.containsKey(topic); } @@ -635,6 +639,21 @@ public class ConsumerGroup implements Group { records.add(RecordHelpers.newGroupEpochTombstoneRecord(groupId())); } + @Override + public boolean isEmpty() { + return state() == ConsumerGroupState.EMPTY; + } + + /** + * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition} + * + * @return The offset expiration condition for the group or Empty if no such condition exists. + */ + @Override + public Optional<OffsetExpirationCondition> offsetExpirationCondition() { + return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)); + } + /** * Throws a StaleMemberEpochException if the received member epoch does not match * the expected member epoch. diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java index 88ec52727fa..4b408e0484a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java @@ -33,6 +33,8 @@ import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.coordinator.group.OffsetExpirationCondition; +import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; import org.apache.kafka.coordinator.group.Record; import org.apache.kafka.coordinator.group.RecordHelpers; import org.slf4j.Logger; @@ -898,6 +900,46 @@ public class GenericGroup implements Group { records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId())); } + @Override + public boolean isEmpty() { + return isInState(EMPTY); + } + + /** + * Return the offset expiration condition to be used for this group. This is based on several factors + * such as the group state, the protocol type, and the GroupMetadata record version. + * + * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition} + * + * @return The offset expiration condition for the group or Empty if no such condition exists. + */ + @Override + public Optional<OffsetExpirationCondition> offsetExpirationCondition() { + if (protocolType.isPresent()) { + if (isInState(EMPTY)) { + // No members exist in the group => + // - If current state timestamp exists and retention period has passed since group became Empty, + // expire all offsets with no pending offset commit; + // - If there is no current state timestamp (old group metadata schema) and retention period has passed + // since the last commit timestamp, expire the offset + return Optional.of(new OffsetExpirationConditionImpl( + offsetAndMetadata -> currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs)) + ); + } else if (usesConsumerGroupProtocol() && subscribedTopics.isPresent() && isInState(STABLE)) { + // Consumers exist in the group and group is Stable => + // - If the group is aware of the subscribed topics and retention period has passed since the + // last commit timestamp, expire the offset. + return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)); + } + } else { + // protocolType is None => standalone (simple) consumer, that uses Kafka for offset storage. Only + // expire offsets where retention period has passed since their last commit. + return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)); + } + // If none of the conditions above are met, do not expire any offsets. + return Optional.empty(); + } + /** * Verify the member id is up to date for static members. Return true if both conditions met: * 1. given member is a known static member to group @@ -1063,16 +1105,18 @@ public class GenericGroup implements Group { } /** - * Returns true if the consumer group is actively subscribed to the topic. When the consumer - * group does not know, because the information is not available yet or because it has - * failed to parse the Consumer Protocol, it returns true to be safe. + * Returns true if the generic group is actively subscribed to the topic. When the generic group does not know, + * because the information is not available yet or because it has failed to parse the Consumer Protocol, we + * consider the group not subscribed to the topic if the group is not using any protocol or not using the + * consumer group protocol. + * + * @param topic The topic name. * - * @param topic The topic name. * @return whether the group is subscribed to the topic. */ public boolean isSubscribedToTopic(String topic) { return subscribedTopics.map(topics -> topics.contains(topic)) - .orElse(true); + .orElse(usesConsumerGroupProtocol()); } /** diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 1d31bd50845..f26b5ce6306 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -40,7 +40,9 @@ public class GroupCoordinatorConfigTest { 3000, 5 * 60 * 1000, 120, - 10 * 60 * 1000 + 10 * 60 * 1000, + 600000L, + 24 * 60 * 60 * 1000L ); assertEquals(10, config.numThreads); @@ -55,5 +57,30 @@ public class GroupCoordinatorConfigTest { assertEquals(5 * 60 * 1000, config.genericGroupNewMemberJoinTimeoutMs); assertEquals(120, config.genericGroupMinSessionTimeoutMs); assertEquals(10 * 60 * 1000, config.genericGroupMaxSessionTimeoutMs); + assertEquals(10 * 60 * 1000, config.offsetsRetentionCheckIntervalMs); + assertEquals(24 * 60 * 60 * 1000L, config.offsetsRetentionMs); + } + + public static GroupCoordinatorConfig createGroupCoordinatorConfig( + int offsetMetadataMaxSize, + long offsetsRetentionCheckIntervalMs, + long offsetsRetentionMs + ) { + return new GroupCoordinatorConfig( + 1, + 45, + 5, + Integer.MAX_VALUE, + Collections.singletonList(new RangeAssignor()), + 1000, + offsetMetadataMaxSize, + Integer.MAX_VALUE, + 3000, + 5 * 60 * 1000, + 120, + 10 * 5 * 1000, + offsetsRetentionCheckIntervalMs, + offsetsRetentionMs + ); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index d54d574f9e5..9a18db8f202 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -117,7 +117,9 @@ public class GroupCoordinatorServiceTest { 3000, 5 * 60 * 1000, 120, - 10 * 5 * 1000 + 10 * 5 * 1000, + 600000L, + 24 * 60 * 1000L ); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index 1ed931c029d..c95e9459285 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -25,6 +25,8 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; @@ -45,6 +47,7 @@ import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import java.util.ArrayList; @@ -52,9 +55,14 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_EXPIRATION_KEY; import static org.apache.kafka.coordinator.group.TestUtil.requestContext; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; @@ -75,7 +83,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); RequestContext context = requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT); @@ -100,7 +110,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); RequestContext context = requestContext(ApiKeys.OFFSET_COMMIT); @@ -125,7 +137,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); RequestContext context = requestContext(ApiKeys.DELETE_GROUPS); @@ -177,7 +191,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); RequestContext context = requestContext(ApiKeys.DELETE_GROUPS); @@ -240,7 +256,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); OffsetCommitKey key = new OffsetCommitKey(); @@ -266,7 +284,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); OffsetCommitKey key = new OffsetCommitKey(); @@ -291,7 +311,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey(); @@ -312,7 +334,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey(); @@ -332,7 +356,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); ConsumerGroupPartitionMetadataKey key = new ConsumerGroupPartitionMetadataKey(); @@ -353,7 +379,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); ConsumerGroupPartitionMetadataKey key = new ConsumerGroupPartitionMetadataKey(); @@ -373,7 +401,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); ConsumerGroupMemberMetadataKey key = new ConsumerGroupMemberMetadataKey(); @@ -394,7 +424,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); ConsumerGroupMemberMetadataKey key = new ConsumerGroupMemberMetadataKey(); @@ -414,7 +446,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); ConsumerGroupTargetAssignmentMetadataKey key = new ConsumerGroupTargetAssignmentMetadataKey(); @@ -435,7 +469,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); ConsumerGroupTargetAssignmentMetadataKey key = new ConsumerGroupTargetAssignmentMetadataKey(); @@ -455,7 +491,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); ConsumerGroupTargetAssignmentMemberKey key = new ConsumerGroupTargetAssignmentMemberKey(); @@ -476,7 +514,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); ConsumerGroupTargetAssignmentMemberKey key = new ConsumerGroupTargetAssignmentMemberKey(); @@ -496,7 +536,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey(); @@ -517,7 +559,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey(); @@ -537,7 +581,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); assertThrows(NullPointerException.class, () -> coordinator.replay(new Record(null, null))); @@ -550,7 +596,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey(); @@ -570,7 +618,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); coordinator.onLoaded(image); @@ -590,7 +640,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); GroupMetadataKey key = new GroupMetadataKey(); @@ -611,7 +663,9 @@ public class GroupCoordinatorShardTest { GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, - offsetMetadataManager + offsetMetadataManager, + new MockCoordinatorTimer<>(new MockTime()), + mock(GroupCoordinatorConfig.class) ); GroupMetadataKey key = new GroupMetadataKey(); @@ -623,4 +677,82 @@ public class GroupCoordinatorShardTest { verify(groupMetadataManager, times(1)).replay(key, null); } + + @Test + public void testScheduleCleanupGroupMetadata() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + Time mockTime = new MockTime(); + MockCoordinatorTimer<Void, Record> timer = new MockCoordinatorTimer<>(mockTime); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager, + timer, + GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 1000L, 24 * 60) + ); + MetadataImage image = MetadataImage.EMPTY; + + // Confirm the cleanup is scheduled when the coordinator is initially loaded. + coordinator.onLoaded(image); + assertTrue(timer.contains(GROUP_EXPIRATION_KEY)); + + // Confirm that it is rescheduled after completion. + mockTime.sleep(1000L); + List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> tasks = timer.poll(); + assertEquals(1, tasks.size()); + assertTrue(timer.contains(GROUP_EXPIRATION_KEY)); + + coordinator.onUnloaded(); + assertFalse(timer.contains(GROUP_EXPIRATION_KEY)); + } + + @Test + public void testCleanupGroupMetadata() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + MockCoordinatorTimer<Void, Record> timer = new MockCoordinatorTimer<>(new MockTime()); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager, + timer, + mock(GroupCoordinatorConfig.class) + ); + + Record offsetCommitTombstone = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "topic", 0); + Record groupMetadataTombstone = RecordHelpers.newGroupMetadataTombstoneRecord("group-id"); + + @SuppressWarnings("unchecked") + ArgumentCaptor<List<Record>> recordsCapture = ArgumentCaptor.forClass(List.class); + + when(groupMetadataManager.groupIds()).thenReturn(mkSet("group-id", "other-group-id")); + when(offsetMetadataManager.cleanupExpiredOffsets(eq("group-id"), recordsCapture.capture())) + .thenAnswer(invocation -> { + List<Record> records = recordsCapture.getValue(); + records.add(offsetCommitTombstone); + return true; + }); + when(offsetMetadataManager.cleanupExpiredOffsets("other-group-id", Collections.emptyList())).thenReturn(false); + doAnswer(invocation -> { + List<Record> records = recordsCapture.getValue(); + records.add(groupMetadataTombstone); + return null; + }).when(groupMetadataManager).maybeDeleteGroup(eq("group-id"), recordsCapture.capture()); + + assertFalse(timer.contains(GROUP_EXPIRATION_KEY)); + CoordinatorResult<Void, Record> result = coordinator.cleanupGroupMetadata(); + assertTrue(timer.contains(GROUP_EXPIRATION_KEY)); + + List<Record> expectedRecords = Arrays.asList(offsetCommitTombstone, groupMetadataTombstone); + assertEquals(expectedRecords, result.records()); + assertNull(result.response()); + assertNull(result.appendFuture()); + + verify(groupMetadataManager, times(1)).groupIds(); + verify(offsetMetadataManager, times(1)).cleanupExpiredOffsets(eq("group-id"), any()); + verify(offsetMetadataManager, times(1)).cleanupExpiredOffsets(eq("other-group-id"), any()); + verify(groupMetadataManager, times(1)).maybeDeleteGroup(eq("group-id"), any()); + verify(groupMetadataManager, times(0)).maybeDeleteGroup(eq("other-group-id"), any()); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index f299705016d..ee44304072e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -9494,6 +9494,27 @@ public class GroupMetadataManagerTest { assertEquals(expectedRecords, records); } + @Test + public void testGenericGroupMaybeDelete() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + List<Record> expectedRecords = Collections.singletonList(RecordHelpers.newGroupMetadataTombstoneRecord("group-id")); + List<Record> records = new ArrayList<>(); + context.groupMetadataManager.maybeDeleteGroup("group-id", records); + assertEquals(expectedRecords, records); + + records = new ArrayList<>(); + group.transitionTo(PREPARING_REBALANCE); + context.groupMetadataManager.maybeDeleteGroup("group-id", records); + assertEquals(Collections.emptyList(), records); + + records = new ArrayList<>(); + context.groupMetadataManager.maybeDeleteGroup("invalid-group-id", records); + assertEquals(Collections.emptyList(), records); + } + @Test public void testConsumerGroupDelete() { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() @@ -9510,6 +9531,32 @@ public class GroupMetadataManagerTest { assertEquals(expectedRecords, records); } + @Test + public void testConsumerGroupMaybeDelete() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group-id", true); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newTargetAssignmentEpochTombstoneRecord("group-id"), + RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord("group-id"), + RecordHelpers.newGroupEpochTombstoneRecord("group-id") + ); + List<Record> records = new ArrayList<>(); + context.groupMetadataManager.maybeDeleteGroup("group-id", records); + assertEquals(expectedRecords, records); + + records = new ArrayList<>(); + group.updateMember(new ConsumerGroupMember.Builder("member") + .setMemberEpoch(10) + .setTargetMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build() + ); + context.groupMetadataManager.maybeDeleteGroup("group-id", records); + assertEquals(Collections.emptyList(), records); + } + private static void assertNoOrEmptyResult(List<ExpiredTimeout<Void, Record>> timeouts) { assertTrue(timeouts.size() <= 1); timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT, timeout.result)); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java new file mode 100644 index 00000000000..2f1cb354a5a --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.junit.jupiter.api.Test; + +import java.util.OptionalInt; +import java.util.OptionalLong; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class OffsetExpirationConditionImplTest { + + @Test + public void testIsOffsetExpired() { + long currentTimestamp = 1500L; + long commitTimestamp = 500L; + OptionalLong expireTimestampMs = OptionalLong.of(1500); + long offsetsRetentionMs = 500L; + + OffsetExpirationConditionImpl condition = new OffsetExpirationConditionImpl(__ -> commitTimestamp); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata( + 100, + OptionalInt.of(1), + "metadata", + commitTimestamp, + expireTimestampMs + ); + + // Test when expire timestamp exists (older versions with per partition retention) + // 1. Current timestamp >= expire timestamp => should expire + assertTrue(condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, offsetsRetentionMs)); + + // 2. Current timestamp < expire timestamp => should not expire + currentTimestamp = 499; + assertFalse(condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, offsetsRetentionMs)); + + // Test when expire timestamp does not exist (current version with no per partition retention) + offsetAndMetadata = new OffsetAndMetadata( + 100, + OptionalInt.of(1), + "metadata", + commitTimestamp, + OptionalLong.empty() + ); + + // 3. Current timestamp - base timestamp >= offsets retention => should expire + currentTimestamp = 1000L; + assertTrue(condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, offsetsRetentionMs)); + + // 4. Current timestamp - base timestamp < offsets retention => should not expire + currentTimestamp = 999L; + assertFalse(condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, offsetsRetentionMs)); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index 5598a74e50e..82fca8b0e25 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -76,33 +76,51 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class OffsetMetadataManagerTest { static class OffsetMetadataManagerTestContext { public static class Builder { - final private MockTime time = new MockTime(); - final private MockCoordinatorTimer<Void, Record> timer = new MockCoordinatorTimer<>(time); - final private LogContext logContext = new LogContext(); - final private SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + private final MockTime time = new MockTime(); + private final MockCoordinatorTimer<Void, Record> timer = new MockCoordinatorTimer<>(time); + private final LogContext logContext = new LogContext(); + private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + private GroupMetadataManager groupMetadataManager = null; private MetadataImage metadataImage = null; - private int offsetMetadataMaxSize = 4096; + private GroupCoordinatorConfig config = null; Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) { - this.offsetMetadataMaxSize = offsetMetadataMaxSize; + config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(offsetMetadataMaxSize, 60000L, 24 * 60 * 1000); + return this; + } + + Builder withOffsetsRetentionMs(long offsetsRetentionMs) { + config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, offsetsRetentionMs); + return this; + } + + Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) { + this.groupMetadataManager = groupMetadataManager; return this; } OffsetMetadataManagerTestContext build() { if (metadataImage == null) metadataImage = MetadataImage.EMPTY; + if (config == null) { + config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, 24 * 60 * 1000); + } - GroupMetadataManager groupMetadataManager = new GroupMetadataManager.Builder() - .withTime(time) - .withTimer(timer) - .withSnapshotRegistry(snapshotRegistry) - .withLogContext(logContext) - .withMetadataImage(metadataImage) - .withConsumerGroupAssignors(Collections.singletonList(new RangeAssignor())) - .build(); + if (groupMetadataManager == null) { + groupMetadataManager = new GroupMetadataManager.Builder() + .withTime(time) + .withTimer(timer) + .withSnapshotRegistry(snapshotRegistry) + .withLogContext(logContext) + .withMetadataImage(metadataImage) + .withConsumerGroupAssignors(Collections.singletonList(new RangeAssignor())) + .build(); + } OffsetMetadataManager offsetMetadataManager = new OffsetMetadataManager.Builder() .withTime(time) @@ -110,7 +128,7 @@ public class OffsetMetadataManagerTest { .withSnapshotRegistry(snapshotRegistry) .withMetadataImage(metadataImage) .withGroupMetadataManager(groupMetadataManager) - .withOffsetMetadataMaxSize(offsetMetadataMaxSize) + .withGroupCoordinatorConfig(config) .build(); return new OffsetMetadataManagerTestContext( @@ -203,6 +221,15 @@ public class OffsetMetadataManagerTest { return numDeletedOffsets; } + public boolean cleanupExpiredOffsets(String groupId, List<Record> records) { + List<Record> addedRecords = new ArrayList<>(); + boolean isOffsetsEmptyForGroup = offsetMetadataManager.cleanupExpiredOffsets(groupId, addedRecords); + addedRecords.forEach(this::replay); + + records.addAll(addedRecords); + return isOffsetsEmptyForGroup; + } + public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchOffsets( String groupId, List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics, @@ -282,6 +309,18 @@ public class OffsetMetadataManagerTest { int partition, long offset, int leaderEpoch + ) { + commitOffset(groupId, topic, partition, offset, leaderEpoch, time.milliseconds()); + + } + + public void commitOffset( + String groupId, + String topic, + int partition, + long offset, + int leaderEpoch, + long commitTimestamp ) { replay(RecordHelpers.newOffsetCommitRecord( groupId, @@ -291,7 +330,7 @@ public class OffsetMetadataManagerTest { offset, OptionalInt.of(leaderEpoch), "metadata", - time.milliseconds(), + commitTimestamp, OptionalLong.empty() ), MetadataVersion.latest() @@ -1763,6 +1802,108 @@ public class OffsetMetadataManagerTest { assertEquals(3, numDeleteOffsets); } + @Test + public void testCleanupExpiredOffsetsGroupHasNoOffsets() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() + .build(); + + List<Record> records = new ArrayList<>(); + assertTrue(context.cleanupExpiredOffsets("unknown-group-id", records)); + assertEquals(Collections.emptyList(), records); + } + + @Test + public void testCleanupExpiredOffsetsGroupDoesNotExist() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() + .withGroupMetadataManager(groupMetadataManager) + .build(); + + when(groupMetadataManager.group("unknown-group-id")).thenThrow(GroupIdNotFoundException.class); + context.commitOffset("unknown-group-id", "topic", 0, 100L, 0); + assertThrows(GroupIdNotFoundException.class, () -> context.cleanupExpiredOffsets("unknown-group-id", new ArrayList<>())); + } + + @Test + public void testCleanupExpiredOffsetsEmptyOffsetExpirationCondition() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + Group group = mock(Group.class); + + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() + .withGroupMetadataManager(groupMetadataManager) + .build(); + + context.commitOffset("group-id", "topic", 0, 100L, 0); + + when(groupMetadataManager.group("group-id")).thenReturn(group); + when(group.offsetExpirationCondition()).thenReturn(Optional.empty()); + + List<Record> records = new ArrayList<>(); + assertFalse(context.cleanupExpiredOffsets("group-id", records)); + assertEquals(Collections.emptyList(), records); + } + + @Test + public void testCleanupExpiredOffsets() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + Group group = mock(Group.class); + + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() + .withGroupMetadataManager(groupMetadataManager) + .withOffsetsRetentionMs(1000) + .build(); + + long commitTimestamp = context.time.milliseconds(); + + context.commitOffset("group-id", "firstTopic", 0, 100L, 0, commitTimestamp); + context.commitOffset("group-id", "secondTopic", 0, 100L, 0, commitTimestamp); + context.commitOffset("group-id", "secondTopic", 1, 100L, 0, commitTimestamp + 500); + + context.time.sleep(1000); + + // firstTopic-0: group is still subscribed to firstTopic. Do not expire. + // secondTopic-0: should expire as offset retention has passed. + // secondTopic-1: has not passed offset retention. Do not expire. + List<Record> expectedRecords = Collections.singletonList( + RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "secondTopic", 0) + ); + + when(groupMetadataManager.group("group-id")).thenReturn(group); + when(group.offsetExpirationCondition()).thenReturn(Optional.of( + new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs))); + when(group.isSubscribedToTopic("firstTopic")).thenReturn(true); + when(group.isSubscribedToTopic("secondTopic")).thenReturn(false); + + List<Record> records = new ArrayList<>(); + assertFalse(context.cleanupExpiredOffsets("group-id", records)); + assertEquals(expectedRecords, records); + + // Expire secondTopic-1. + context.time.sleep(500); + expectedRecords = Collections.singletonList( + RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "secondTopic", 1) + ); + + records = new ArrayList<>(); + assertFalse(context.cleanupExpiredOffsets("group-id", records)); + assertEquals(expectedRecords, records); + + // Add 2 more commits, then expire all. + when(group.isSubscribedToTopic("firstTopic")).thenReturn(false); + context.commitOffset("group-id", "firstTopic", 1, 100L, 0, commitTimestamp + 500); + context.commitOffset("group-id", "secondTopic", 0, 101L, 0, commitTimestamp + 500); + + expectedRecords = Arrays.asList( + RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "firstTopic", 0), + RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "firstTopic", 1), + RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "secondTopic", 0) + ); + + records = new ArrayList<>(); + assertTrue(context.cleanupExpiredOffsets("group-id", records)); + assertEquals(expectedRecords, records); + } + static private OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse( int partition, long offset, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java index 210aa8eb901..df643a5d2f0 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java @@ -23,6 +23,9 @@ import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.coordinator.group.GroupMetadataManagerTest; +import org.apache.kafka.coordinator.group.OffsetAndMetadata; +import org.apache.kafka.coordinator.group.OffsetExpirationCondition; +import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.Test; @@ -30,6 +33,8 @@ import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; @@ -703,4 +708,66 @@ public class ConsumerGroupTest { assertEquals(ConsumerGroup.ConsumerGroupState.STABLE, consumerGroup.state()); assertThrows(GroupNotEmptyException.class, consumerGroup::validateDeleteGroup); } + + @Test + public void testOffsetExpirationCondition() { + long currentTimestamp = 30000L; + long commitTimestamp = 20000L; + long offsetsRetentionMs = 10000L; + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty()); + ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new LogContext()), "group-id"); + + Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition(); + assertTrue(offsetExpirationCondition.isPresent()); + + OffsetExpirationConditionImpl condition = (OffsetExpirationConditionImpl) offsetExpirationCondition.get(); + assertEquals(commitTimestamp, condition.baseTimestamp().apply(offsetAndMetadata)); + assertTrue(condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, offsetsRetentionMs)); + } + + @Test + public void testIsSubscribedToTopic() { + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + + MetadataImage image = new GroupMetadataManagerTest.MetadataImageBuilder() + .addTopic(fooTopicId, "foo", 1) + .addTopic(barTopicId, "bar", 2) + .addRacks() + .build(); + + ConsumerGroupMember member1 = new ConsumerGroupMember.Builder("member1") + .setSubscribedTopicNames(Collections.singletonList("foo")) + .build(); + ConsumerGroupMember member2 = new ConsumerGroupMember.Builder("member2") + .setSubscribedTopicNames(Collections.singletonList("bar")) + .build(); + + ConsumerGroup consumerGroup = createConsumerGroup("group-foo"); + + consumerGroup.updateMember(member1); + consumerGroup.updateMember(member2); + + assertEquals( + mkMap( + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) + ), + consumerGroup.computeSubscriptionMetadata( + null, + null, + image.topics(), + image.cluster() + ) + ); + + assertTrue(consumerGroup.isSubscribedToTopic("foo")); + assertTrue(consumerGroup.isSubscribedToTopic("bar")); + + consumerGroup.removeMember("member1"); + assertFalse(consumerGroup.isSubscribedToTopic("foo")); + + consumerGroup.removeMember("member2"); + assertFalse(consumerGroup.isSubscribedToTopic("bar")); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java index 05afdd26edf..156fdf1b507 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java @@ -31,13 +31,19 @@ import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.OffsetAndMetadata; +import org.apache.kafka.coordinator.group.OffsetExpirationCondition; +import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.HashSet; import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -1085,6 +1091,151 @@ public class GenericGroupTest { assertThrows(GroupIdNotFoundException.class, group::validateDeleteGroup); } + @Test + public void testOffsetExpirationCondition() { + long currentTimestamp = 30000L; + long commitTimestamp = 20000L; + long offsetsRetentionMs = 10000L; + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty()); + MockTime time = new MockTime(); + long currentStateTimestamp = time.milliseconds(); + GenericGroup group = new GenericGroup(new LogContext(), "groupId", EMPTY, time); + + // 1. Test no protocol type. Simple consumer case, Base timestamp based off of last commit timestamp. + Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition(); + assertTrue(offsetExpirationCondition.isPresent()); + + OffsetExpirationConditionImpl condition = (OffsetExpirationConditionImpl) offsetExpirationCondition.get(); + assertEquals(commitTimestamp, condition.baseTimestamp().apply(offsetAndMetadata)); + assertTrue(condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, offsetsRetentionMs)); + + // 2. Test non-consumer protocol type + Empty state. Base timestamp based off of current state timestamp. + JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(); + protocols.add(new JoinGroupRequestProtocol() + .setName("range") + .setMetadata(ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic"))).array())); + + GenericGroupMember memberWithNonConsumerProtocol = new GenericGroupMember( + "memberWithNonConsumerProtocol", + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + "My Protocol", + protocols + ); + + group.add(memberWithNonConsumerProtocol); + assertEquals("My Protocol", group.protocolType().get()); + + offsetExpirationCondition = group.offsetExpirationCondition(); + assertTrue(offsetExpirationCondition.isPresent()); + + condition = (OffsetExpirationConditionImpl) offsetExpirationCondition.get(); + assertEquals(currentStateTimestamp, condition.baseTimestamp().apply(offsetAndMetadata)); + assertTrue(condition.isOffsetExpired(offsetAndMetadata, currentStateTimestamp + offsetsRetentionMs, offsetsRetentionMs)); + + // 3. Test non-consumer protocol type + non-Empty state. Do not expire any offsets. + group.transitionTo(PREPARING_REBALANCE); + offsetExpirationCondition = group.offsetExpirationCondition(); + assertFalse(offsetExpirationCondition.isPresent()); + + // 4. Test consumer protocol type + subscribed topics + Stable state. Base timestamp based off of last commit timestamp. + group.remove("memberWithNonConsumerProtocol"); + GenericGroupMember memberWithConsumerProtocol = new GenericGroupMember( + "memberWithConsumerProtocol", + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + "consumer", + protocols + ); + group.add(memberWithConsumerProtocol); + group.initNextGeneration(); + group.transitionTo(STABLE); + assertTrue(group.subscribedTopics().get().contains("topic")); + + offsetExpirationCondition = group.offsetExpirationCondition(); + assertTrue(offsetExpirationCondition.isPresent()); + + condition = (OffsetExpirationConditionImpl) offsetExpirationCondition.get(); + assertEquals(commitTimestamp, condition.baseTimestamp().apply(offsetAndMetadata)); + assertTrue(condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, offsetsRetentionMs)); + + // 5. Test consumer protocol type + subscribed topics + non-Stable state. Do not expire any offsets. + group.transitionTo(PREPARING_REBALANCE); + offsetExpirationCondition = group.offsetExpirationCondition(); + assertFalse(offsetExpirationCondition.isPresent()); + } + + @Test + public void testIsSubscribedToTopic() { + GenericGroup group = new GenericGroup(new LogContext(), "groupId", EMPTY, Time.SYSTEM); + + // 1. group has no protocol type => not subscribed + assertFalse(group.isSubscribedToTopic("topic")); + + // 2. group does not use consumer group protocol type => not subscribed + JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(); + protocols.add(new JoinGroupRequestProtocol() + .setName("range") + .setMetadata(ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic"))).array())); + + GenericGroupMember memberWithNonConsumerProtocol = new GenericGroupMember( + "memberWithNonConsumerProtocol", + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + "My Protocol", + protocols + ); + + group.add(memberWithNonConsumerProtocol); + group.transitionTo(PREPARING_REBALANCE); + group.initNextGeneration(); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + assertEquals(Optional.empty(), group.computeSubscribedTopics()); + assertFalse(group.isSubscribedToTopic("topic")); + + // 3. group uses consumer group protocol type but empty members => not subscribed + group.remove("memberWithNonConsumerProtocol"); + GenericGroupMember memberWithConsumerProtocol = new GenericGroupMember( + "memberWithConsumerProtocol", + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + "consumer", + protocols + ); + + group.add(memberWithConsumerProtocol); + group.remove("memberWithConsumerProtocol"); + group.transitionTo(PREPARING_REBALANCE); + group.initNextGeneration(); + assertTrue(group.isInState(EMPTY)); + assertEquals(Optional.of(Collections.emptySet()), group.computeSubscribedTopics()); + assertTrue(group.usesConsumerGroupProtocol()); + assertFalse(group.isSubscribedToTopic("topic")); + + // 4. group uses consumer group protocol type with member subscription => subscribed + group.add(memberWithConsumerProtocol); + group.transitionTo(PREPARING_REBALANCE); + group.initNextGeneration(); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + assertEquals(Optional.of(Collections.singleton("topic")), group.computeSubscribedTopics()); + assertTrue(group.usesConsumerGroupProtocol()); + assertTrue(group.isSubscribedToTopic("topic")); + } + private void assertState(GenericGroup group, GenericGroupState targetState) { Set<GenericGroupState> otherStates = new HashSet<>(); otherStates.add(STABLE);