This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7b5d640cc65 KAFKA-14987; Implement Group/Offset expiration in the new 
coordinator (#14467)
7b5d640cc65 is described below

commit 7b5d640cc656443a078bda096d01910b3edfdb37
Author: Jeff Kim <kimkb2...@gmail.com>
AuthorDate: Thu Oct 12 02:45:13 2023 -0400

    KAFKA-14987; Implement Group/Offset expiration in the new coordinator 
(#14467)
    
    This patch implements the groups and offsets expiration in the new group 
coordinator.
    
    Reviewers: Ritika Reddy <rre...@confluent.io>, David Jacot 
<dja...@confluent.io>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |   4 +-
 .../org/apache/kafka/coordinator/group/Group.java  |  16 +-
 .../coordinator/group/GroupCoordinatorConfig.java  |  27 +++-
 .../coordinator/group/GroupCoordinatorShard.java   |  69 +++++++-
 .../coordinator/group/GroupMetadataManager.java    |  20 +++
 .../group/OffsetExpirationCondition.java           |  36 +++++
 .../group/OffsetExpirationConditionImpl.java       |  62 +++++++
 .../coordinator/group/OffsetMetadataManager.java   |  95 +++++++++--
 .../coordinator/group/consumer/ConsumerGroup.java  |  23 ++-
 .../coordinator/group/generic/GenericGroup.java    |  54 ++++++-
 .../group/GroupCoordinatorConfigTest.java          |  29 +++-
 .../group/GroupCoordinatorServiceTest.java         |   4 +-
 .../group/GroupCoordinatorShardTest.java           | 178 ++++++++++++++++++---
 .../group/GroupMetadataManagerTest.java            |  47 ++++++
 .../group/OffsetExpirationConditionImplTest.java   |  70 ++++++++
 .../group/OffsetMetadataManagerTest.java           | 173 ++++++++++++++++++--
 .../group/consumer/ConsumerGroupTest.java          |  67 ++++++++
 .../group/generic/GenericGroupTest.java            | 151 +++++++++++++++++
 19 files changed, 1063 insertions(+), 64 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index c9601753c1e..678765aa6bf 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -328,7 +328,7 @@
     <suppress checks="ClassFanOutComplexity"
               
files="(GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorService|GroupCoordinatorServiceTest).java"/>
     <suppress checks="ParameterNumber"
-              files="(ConsumerGroupMember|GroupMetadataManager).java"/>
+              
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorConfig).java"/>
     <suppress checks="ClassDataAbstractionCouplingCheck"
               
files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest).java"/>
     <suppress checks="JavaNCSS"
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 96e8dc8a5a1..daec8f73089 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -522,7 +522,9 @@ class BrokerServer(
         config.groupInitialRebalanceDelay,
         GroupCoordinatorConfig.GENERIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS,
         config.groupMinSessionTimeoutMs,
-        config.groupMaxSessionTimeoutMs
+        config.groupMaxSessionTimeoutMs,
+        config.offsetsRetentionCheckIntervalMs,
+        config.offsetsRetentionMinutes * 60 * 1000L
       )
       val timer = new SystemTimerReaper(
         "group-coordinator-reaper",
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index 29a252e47bf..0cb10b12a51 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 
 import java.util.List;
+import java.util.Optional;
 
 /**
  * Interface common for all groups.
@@ -106,7 +107,8 @@ public interface Group {
     /**
      * Returns true if the group is actively subscribed to the topic.
      *
-     * @param topic The topic name.
+     * @param topic  The topic name.
+     *
      * @return Whether the group is subscribed to the topic.
      */
     boolean isSubscribedToTopic(String topic);
@@ -117,4 +119,16 @@ public interface Group {
      * @param records The list of records.
      */
     void createGroupTombstoneRecords(List<Record> records);
+
+    /**
+     * @return Whether the group is in Empty state.
+     */
+    boolean isEmpty();
+
+    /**
+     * See {@link OffsetExpirationCondition}
+     *
+     * @return The offset expiration condition for the group or Empty if no 
such condition exists.
+     */
+    Optional<OffsetExpirationCondition> offsetExpirationCondition();
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index fe31a24524f..330813ee912 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -91,6 +91,27 @@ public class GroupCoordinatorConfig {
      */
     public final int genericGroupMaxSessionTimeoutMs;
 
+    /**
+     * Frequency at which to check for expired offsets.
+     */
+    public final long offsetsRetentionCheckIntervalMs;
+
+    /**
+     * For subscribed consumers, committed offset of a specific partition will 
be expired and discarded when:
+     *     1) This retention period has elapsed after the consumer group loses 
all its consumers (i.e. becomes empty);
+     *     2) This retention period has elapsed since the last time an offset 
is committed for the partition AND
+     *        the group is no longer subscribed to the corresponding topic.
+     *
+     * For standalone consumers (using manual assignment), offsets will be 
expired after this retention period has
+     * elapsed since the time of last commit.
+     *
+     * Note that when a group is deleted via the DeleteGroups request, its 
committed offsets will also be deleted immediately.
+     *
+     * Also, when a topic is deleted via the delete-topic request, upon 
propagated metadata update any group's
+     *     committed offsets for that topic will also be deleted without extra 
retention period.
+     */
+    public final long offsetsRetentionMs;
+
     public GroupCoordinatorConfig(
         int numThreads,
         int consumerGroupSessionTimeoutMs,
@@ -103,7 +124,9 @@ public class GroupCoordinatorConfig {
         int genericGroupInitialRebalanceDelayMs,
         int genericGroupNewMemberJoinTimeoutMs,
         int genericGroupMinSessionTimeoutMs,
-        int genericGroupMaxSessionTimeoutMs
+        int genericGroupMaxSessionTimeoutMs,
+        long offsetsRetentionCheckIntervalMs,
+        long offsetsRetentionMs
     ) {
         this.numThreads = numThreads;
         this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs;
@@ -117,5 +140,7 @@ public class GroupCoordinatorConfig {
         this.genericGroupNewMemberJoinTimeoutMs = 
genericGroupNewMemberJoinTimeoutMs;
         this.genericGroupMinSessionTimeoutMs = genericGroupMinSessionTimeoutMs;
         this.genericGroupMaxSessionTimeoutMs = genericGroupMaxSessionTimeoutMs;
+        this.offsetsRetentionCheckIntervalMs = offsetsRetentionCheckIntervalMs;
+        this.offsetsRetentionMs = offsetsRetentionMs;
     }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index c30d348336b..d0e67c1e693 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -70,6 +70,7 @@ import org.slf4j.Logger;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * The group coordinator shard is a replicated state machine that manages the 
metadata of all
@@ -159,17 +160,26 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<Record> {
                 .withSnapshotRegistry(snapshotRegistry)
                 .withTime(time)
                 .withGroupMetadataManager(groupMetadataManager)
-                .withOffsetMetadataMaxSize(config.offsetMetadataMaxSize)
+                .withGroupCoordinatorConfig(config)
                 .build();
 
             return new GroupCoordinatorShard(
                 logContext,
                 groupMetadataManager,
-                offsetMetadataManager
+                offsetMetadataManager,
+                timer,
+                config
             );
         }
     }
 
+    /**
+     * The group/offsets expiration key to schedule a timer task.
+     *
+     * Visible for testing.
+     */
+    static final String GROUP_EXPIRATION_KEY = "expire-group-metadata";
+
     /**
      * The logger.
      */
@@ -185,6 +195,16 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<Record> {
      */
     private final OffsetMetadataManager offsetMetadataManager;
 
+    /**
+     * The coordinator timer.
+     */
+    private final CoordinatorTimer<Void, Record> timer;
+
+    /**
+     * The group coordinator config.
+     */
+    private final GroupCoordinatorConfig config;
+
     /**
      * Constructor.
      *
@@ -195,11 +215,15 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<Record> {
     GroupCoordinatorShard(
         LogContext logContext,
         GroupMetadataManager groupMetadataManager,
-        OffsetMetadataManager offsetMetadataManager
+        OffsetMetadataManager offsetMetadataManager,
+        CoordinatorTimer<Void, Record> timer,
+        GroupCoordinatorConfig config
     ) {
         this.log = logContext.logger(GroupCoordinatorShard.class);
         this.groupMetadataManager = groupMetadataManager;
         this.offsetMetadataManager = offsetMetadataManager;
+        this.timer = timer;
+        this.config = config;
     }
 
     /**
@@ -435,6 +459,39 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<Record> {
         return offsetMetadataManager.deleteOffsets(request);
     }
 
+    /**
+     * For each group, remove all expired offsets. If all offsets for the 
group are removed and the group is eligible
+     * for deletion, delete the group.
+     *
+     * @return The list of tombstones (offset commit and group metadata) to 
append.
+     */
+    public CoordinatorResult<Void, Record> cleanupGroupMetadata() {
+        List<Record> records = new ArrayList<>();
+        groupMetadataManager.groupIds().forEach(groupId -> {
+            if (offsetMetadataManager.cleanupExpiredOffsets(groupId, records)) 
{
+                groupMetadataManager.maybeDeleteGroup(groupId, records);
+            }
+        });
+
+        // Reschedule the next cycle.
+        scheduleGroupMetadataExpiration();
+        return new CoordinatorResult<>(records);
+    }
+
+    /**
+     * Schedule the group/offsets expiration job. If any exceptions are thrown 
above, the timer will retry.
+     */
+    private void scheduleGroupMetadataExpiration() {
+        timer.schedule(
+            GROUP_EXPIRATION_KEY,
+            config.offsetsRetentionCheckIntervalMs,
+            TimeUnit.MILLISECONDS,
+            true,
+            this::cleanupGroupMetadata
+        );
+    }
+
+
     /**
      * The coordinator has been loaded. This is used to apply any
      * post loading operations (e.g. registering timers).
@@ -448,6 +505,12 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<Record> {
         offsetMetadataManager.onNewMetadataImage(newImage, emptyDelta);
 
         groupMetadataManager.onLoaded();
+        scheduleGroupMetadataExpiration();
+    }
+
+    @Override
+    public void onUnloaded() {
+        timer.cancel(GROUP_EXPIRATION_KEY);
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 7bfe243d026..5974aabbe4c 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -3166,6 +3166,19 @@ public class GroupMetadataManager {
         group.validateDeleteGroup();
     }
 
+    /**
+     * Delete the group if it exists and is in Empty state.
+     *
+     * @param groupId The group id.
+     * @param records The list of records to append the group metadata 
tombstone records.
+     */
+    public void maybeDeleteGroup(String groupId, List<Record> records) {
+        Group group = groups.get(groupId);
+        if (group != null && group.isEmpty()) {
+            deleteGroup(groupId, records);
+        }
+    }
+
     /**
      * Checks whether the given protocol type or name in the request is 
inconsistent with the group's.
      *
@@ -3183,6 +3196,13 @@ public class GroupMetadataManager {
             && !groupProtocolTypeOrName.equals(protocolTypeOrName);
     }
 
+    /**
+     * @return The set of all groups' ids.
+     */
+    public Set<String> groupIds() {
+        return Collections.unmodifiableSet(this.groups.keySet());
+    }
+
     /**
      * Generate a generic group heartbeat key for the timer.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java
new file mode 100644
index 00000000000..ce3f299b40a
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+/**
+ * An offset is considered expired based on different factors, such as the 
state of the group
+ * and/or the GroupMetadata record version (for generic groups). This class is 
used to check
+ * how offsets for the group should be expired.
+ */
+public interface OffsetExpirationCondition {
+
+    /**
+     * Given an offset metadata and offsets retention, return whether the 
offset is expired or not.
+     *
+     * @param offset               The offset metadata.
+     * @param currentTimestampMs   The current timestamp.
+     * @param offsetsRetentionMs   The offset retention.
+     *
+     * @return Whether the offset is considered expired or not.
+     */
+    boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestampMs, 
long offsetsRetentionMs);
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImpl.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImpl.java
new file mode 100644
index 00000000000..cd823108ef3
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import java.util.function.Function;
+
+public class OffsetExpirationConditionImpl implements 
OffsetExpirationCondition {
+
+    /**
+     * Given an offset and metadata, obtain the base timestamp that should be 
used
+     * as the start of the offsets retention period.
+     */
+    private final Function<OffsetAndMetadata, Long> baseTimestamp;
+
+    public OffsetExpirationConditionImpl(Function<OffsetAndMetadata, Long> 
baseTimestamp) {
+        this.baseTimestamp = baseTimestamp;
+    }
+
+    /**
+     * Determine whether an offset is expired. Older versions have an expire 
timestamp per partition. If this
+     * exists, compare against the current timestamp. Otherwise, use the base 
timestamp (either commit timestamp
+     * or current state timestamp if group is empty for generic groups) and 
check whether the offset has
+     * exceeded the offset retention.
+     *
+     * @param offset              The offset and metadata.
+     * @param currentTimestampMs  The current timestamp.
+     * @param offsetsRetentionMs  The offsets retention in milliseconds.
+     *
+     * @return Whether the given offset is expired or not.
+     */
+    @Override
+    public boolean isOffsetExpired(OffsetAndMetadata offset, long 
currentTimestampMs, long offsetsRetentionMs) {
+        if (offset.expireTimestampMs.isPresent()) {
+            // Older versions with explicit expire_timestamp field => old 
expiration semantics are used
+            return currentTimestampMs >= offset.expireTimestampMs.getAsLong();
+        } else {
+            // Current version with no per partition retention
+            return currentTimestampMs - baseTimestamp.apply(offset) >= 
offsetsRetentionMs;
+        }
+    }
+
+    /**
+     * @return The base timestamp.
+     */
+    public Function<OffsetAndMetadata, Long> baseTimestamp() {
+        return this.baseTimestamp;
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 9744e492f49..076ec24476f 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.StaleMemberEpochException;
@@ -45,9 +46,13 @@ import org.slf4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static 
org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
 
@@ -61,13 +66,14 @@ import static 
org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSE
  *    handling as well as during the initial loading of the records from the 
partitions.
  */
 public class OffsetMetadataManager {
+
     public static class Builder {
         private LogContext logContext = null;
         private SnapshotRegistry snapshotRegistry = null;
         private Time time = null;
         private GroupMetadataManager groupMetadataManager = null;
-        private int offsetMetadataMaxSize = 4096;
         private MetadataImage metadataImage = null;
+        private GroupCoordinatorConfig config = null;
 
         Builder withLogContext(LogContext logContext) {
             this.logContext = logContext;
@@ -89,8 +95,8 @@ public class OffsetMetadataManager {
             return this;
         }
 
-        Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
-            this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+        Builder withGroupCoordinatorConfig(GroupCoordinatorConfig config) {
+            this.config = config;
             return this;
         }
 
@@ -115,7 +121,7 @@ public class OffsetMetadataManager {
                 time,
                 metadataImage,
                 groupMetadataManager,
-                offsetMetadataMaxSize
+                config
             );
         }
     }
@@ -146,9 +152,9 @@ public class OffsetMetadataManager {
     private final GroupMetadataManager groupMetadataManager;
 
     /**
-     * The maximum allowed metadata for any offset commit.
+     * The group coordinator config.
      */
-    private final int offsetMetadataMaxSize;
+    private final GroupCoordinatorConfig config;
 
     /**
      * The offsets keyed by group id, topic name and partition id.
@@ -161,14 +167,14 @@ public class OffsetMetadataManager {
         Time time,
         MetadataImage metadataImage,
         GroupMetadataManager groupMetadataManager,
-        int offsetMetadataMaxSize
+        GroupCoordinatorConfig config
     ) {
         this.snapshotRegistry = snapshotRegistry;
         this.log = logContext.logger(OffsetMetadataManager.class);
         this.time = time;
         this.metadataImage = metadataImage;
         this.groupMetadataManager = groupMetadataManager;
-        this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+        this.config = config;
         this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
     }
 
@@ -316,7 +322,7 @@ public class OffsetMetadataManager {
             response.topics().add(topicResponse);
 
             topic.partitions().forEach(partition -> {
-                if (partition.committedMetadata() != null && 
partition.committedMetadata().length() > offsetMetadataMaxSize) {
+                if (partition.committedMetadata() != null && 
partition.committedMetadata().length() > config.offsetMetadataMaxSize) {
                     topicResponse.partitions().add(new 
OffsetCommitResponsePartition()
                         .setPartitionIndex(partition.partitionIndex())
                         
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
@@ -544,6 +550,77 @@ public class OffsetMetadataManager {
             .setTopics(topicResponses);
     }
 
+    /**
+     * Remove expired offsets for the given group.
+     *
+     * @param groupId The group id.
+     * @param records The list of records to populate with offset commit 
tombstone records.
+     *
+     * @return True if no offsets exist or if all offsets expired, false 
otherwise.
+     */
+    public boolean cleanupExpiredOffsets(String groupId, List<Record> records) 
{
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> 
offsetsByTopic = offsetsByGroup.get(groupId);
+        if (offsetsByTopic == null) {
+            return true;
+        }
+
+        // We expect the group to exist.
+        Group group = groupMetadataManager.group(groupId);
+        Set<String> expiredPartitions = new HashSet<>();
+        long currentTimestampMs = time.milliseconds();
+        Optional<OffsetExpirationCondition> offsetExpirationCondition = 
group.offsetExpirationCondition();
+
+        if (!offsetExpirationCondition.isPresent()) {
+            return false;
+        }
+
+        AtomicBoolean allOffsetsExpired = new AtomicBoolean(true);
+        OffsetExpirationCondition condition = offsetExpirationCondition.get();
+
+        offsetsByTopic.forEach((topic, partitions) -> {
+            if (!group.isSubscribedToTopic(topic)) {
+                partitions.forEach((partition, offsetAndMetadata) -> {
+                    if (condition.isOffsetExpired(offsetAndMetadata, 
currentTimestampMs, config.offsetsRetentionMs)) {
+                        
expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, 
records).toString());
+                    } else {
+                        allOffsetsExpired.set(false);
+                    }
+                });
+            } else {
+                allOffsetsExpired.set(false);
+            }
+        });
+
+        if (!expiredPartitions.isEmpty()) {
+            log.info("[GroupId {}] Expiring offsets of partitions 
(allOffsetsExpired={}): {}",
+                groupId, allOffsetsExpired, String.join(", ", 
expiredPartitions));
+        }
+
+        return allOffsetsExpired.get();
+    }
+
+    /**
+     * Add an offset commit tombstone record for the group.
+     *
+     * @param groupId   The group id.
+     * @param topic     The topic name.
+     * @param partition The partition.
+     * @param records   The list of records to append the tombstone.
+     *
+     * @return The topic partition of the corresponding tombstone.
+     */
+    private TopicPartition appendOffsetCommitTombstone(
+        String groupId,
+        String topic,
+        int partition, 
+        List<Record> records
+    ) {
+        records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
topic, partition));
+        TopicPartition tp = new TopicPartition(topic, partition);
+        log.trace("[GroupId {}] Removing expired offset and metadata for {}", 
groupId, tp);
+        return tp;
+    }
+
     /**
      * Replays OffsetCommitKey/Value to update or delete the corresponding 
offsets.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index 21f6f8124cd..e5144953a97 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -24,6 +24,8 @@ import 
org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
 import org.apache.kafka.coordinator.group.Record;
 import org.apache.kafka.coordinator.group.RecordHelpers;
 import org.apache.kafka.image.ClusterImage;
@@ -349,9 +351,11 @@ public class ConsumerGroup implements Group {
     /**
      * Returns true if the consumer group is actively subscribed to the topic.
      *
-     * @param topic The topic name.
-     * @return whether the group is subscribed to the topic.
+     * @param topic  The topic name.
+     *
+     * @return Whether the group is subscribed to the topic.
      */
+    @Override
     public boolean isSubscribedToTopic(String topic) {
         return subscribedTopicNames.containsKey(topic);
     }
@@ -635,6 +639,21 @@ public class ConsumerGroup implements Group {
         records.add(RecordHelpers.newGroupEpochTombstoneRecord(groupId()));
     }
 
+    @Override
+    public boolean isEmpty() {
+        return state() == ConsumerGroupState.EMPTY;
+    }
+
+    /**
+     * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+     *
+     * @return The offset expiration condition for the group or Empty if no 
such condition exists.
+     */
+    @Override
+    public Optional<OffsetExpirationCondition> offsetExpirationCondition() {
+        return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata 
-> offsetAndMetadata.commitTimestampMs));
+    }
+
     /**
      * Throws a StaleMemberEpochException if the received member epoch does 
not match
      * the expected member epoch.
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
index 88ec52727fa..4b408e0484a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
@@ -33,6 +33,8 @@ import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
 import org.apache.kafka.coordinator.group.Record;
 import org.apache.kafka.coordinator.group.RecordHelpers;
 import org.slf4j.Logger;
@@ -898,6 +900,46 @@ public class GenericGroup implements Group {
         records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
     }
 
+    @Override
+    public boolean isEmpty() {
+        return isInState(EMPTY);
+    }
+
+    /**
+     * Return the offset expiration condition to be used for this group. This 
is based on several factors
+     * such as the group state, the protocol type, and the GroupMetadata 
record version.
+     *
+     * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+     *
+     * @return The offset expiration condition for the group or Empty if no 
such condition exists.
+     */
+    @Override
+    public Optional<OffsetExpirationCondition> offsetExpirationCondition() {
+        if (protocolType.isPresent()) {
+            if (isInState(EMPTY)) {
+                // No members exist in the group =>
+                // - If current state timestamp exists and retention period 
has passed since group became Empty,
+                //   expire all offsets with no pending offset commit;
+                // - If there is no current state timestamp (old group 
metadata schema) and retention period has passed
+                //   since the last commit timestamp, expire the offset
+                return Optional.of(new OffsetExpirationConditionImpl(
+                    offsetAndMetadata -> 
currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs))
+                );
+            } else if (usesConsumerGroupProtocol() && 
subscribedTopics.isPresent() && isInState(STABLE)) {
+                // Consumers exist in the group and group is Stable =>
+                // - If the group is aware of the subscribed topics and 
retention period has passed since the
+                //   last commit timestamp, expire the offset.
+                return Optional.of(new 
OffsetExpirationConditionImpl(offsetAndMetadata -> 
offsetAndMetadata.commitTimestampMs));
+            }
+        } else {
+            // protocolType is None => standalone (simple) consumer, that uses 
Kafka for offset storage. Only
+            // expire offsets where retention period has passed since their 
last commit.
+            return Optional.of(new 
OffsetExpirationConditionImpl(offsetAndMetadata -> 
offsetAndMetadata.commitTimestampMs));
+        }
+        // If none of the conditions above are met, do not expire any offsets.
+        return Optional.empty();
+    }
+
     /**
      * Verify the member id is up to date for static members. Return true if 
both conditions met:
      *   1. given member is a known static member to group
@@ -1063,16 +1105,18 @@ public class GenericGroup implements Group {
     }
 
     /**
-     * Returns true if the consumer group is actively subscribed to the topic. 
When the consumer
-     * group does not know, because the information is not available yet or 
because it has
-     * failed to parse the Consumer Protocol, it returns true to be safe.
+     * Returns true if the generic group is actively subscribed to the topic. 
When the generic group does not know,
+     * because the information is not available yet or because it has failed 
to parse the Consumer Protocol, we
+     * consider the group not subscribed to the topic if the group is not 
using any protocol or not using the
+     * consumer group protocol.
+     *
+     * @param topic  The topic name.
      *
-     * @param topic The topic name.
      * @return whether the group is subscribed to the topic.
      */
     public boolean isSubscribedToTopic(String topic) {
         return subscribedTopics.map(topics -> topics.contains(topic))
-            .orElse(true);
+            .orElse(usesConsumerGroupProtocol());
     }
 
     /**
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 1d31bd50845..f26b5ce6306 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -40,7 +40,9 @@ public class GroupCoordinatorConfigTest {
             3000,
             5 * 60 * 1000,
             120,
-            10 * 60 * 1000
+            10 * 60 * 1000,
+            600000L,
+            24 * 60 * 60 * 1000L
         );
 
         assertEquals(10, config.numThreads);
@@ -55,5 +57,30 @@ public class GroupCoordinatorConfigTest {
         assertEquals(5 * 60 * 1000, config.genericGroupNewMemberJoinTimeoutMs);
         assertEquals(120, config.genericGroupMinSessionTimeoutMs);
         assertEquals(10 * 60 * 1000, config.genericGroupMaxSessionTimeoutMs);
+        assertEquals(10 * 60 * 1000, config.offsetsRetentionCheckIntervalMs);
+        assertEquals(24 * 60 * 60 * 1000L, config.offsetsRetentionMs);
+    }
+
    /**
     * Creates a {@link GroupCoordinatorConfig} for tests, with the given offset-related
     * settings and fixed defaults for every other field.
     *
     * @param offsetMetadataMaxSize            The maximum size of committed offset metadata.
     * @param offsetsRetentionCheckIntervalMs  The interval between offset expiration checks.
     * @param offsetsRetentionMs               The offsets retention period.
     *
     * @return A new GroupCoordinatorConfig.
     */
    public static GroupCoordinatorConfig createGroupCoordinatorConfig(
        int offsetMetadataMaxSize,
        long offsetsRetentionCheckIntervalMs,
        long offsetsRetentionMs
    ) {
        return new GroupCoordinatorConfig(
            1,
            45,
            5,
            Integer.MAX_VALUE,
            Collections.singletonList(new RangeAssignor()),
            1000,
            offsetMetadataMaxSize,
            Integer.MAX_VALUE,
            3000,
            5 * 60 * 1000,
            120,
            10 * 5 * 1000,
            offsetsRetentionCheckIntervalMs,
            offsetsRetentionMs
        );
    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index d54d574f9e5..9a18db8f202 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -117,7 +117,9 @@ public class GroupCoordinatorServiceTest {
             3000,
             5 * 60 * 1000,
             120,
-            10 * 5 * 1000
+            10 * 5 * 1000,
+            600000L,
+            24 * 60 * 1000L
         );
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 1ed931c029d..c95e9459285 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -25,6 +25,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
@@ -45,6 +47,7 @@ import 
org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatchers;
 
 import java.util.ArrayList;
@@ -52,9 +55,14 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_EXPIRATION_KEY;
 import static org.apache.kafka.coordinator.group.TestUtil.requestContext;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -75,7 +83,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         RequestContext context = 
requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT);
@@ -100,7 +110,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         RequestContext context = requestContext(ApiKeys.OFFSET_COMMIT);
@@ -125,7 +137,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
@@ -177,7 +191,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
@@ -240,7 +256,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         OffsetCommitKey key = new OffsetCommitKey();
@@ -266,7 +284,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         OffsetCommitKey key = new OffsetCommitKey();
@@ -291,7 +311,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey();
@@ -312,7 +334,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey();
@@ -332,7 +356,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         ConsumerGroupPartitionMetadataKey key = new 
ConsumerGroupPartitionMetadataKey();
@@ -353,7 +379,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         ConsumerGroupPartitionMetadataKey key = new 
ConsumerGroupPartitionMetadataKey();
@@ -373,7 +401,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         ConsumerGroupMemberMetadataKey key = new 
ConsumerGroupMemberMetadataKey();
@@ -394,7 +424,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         ConsumerGroupMemberMetadataKey key = new 
ConsumerGroupMemberMetadataKey();
@@ -414,7 +446,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         ConsumerGroupTargetAssignmentMetadataKey key = new 
ConsumerGroupTargetAssignmentMetadataKey();
@@ -435,7 +469,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         ConsumerGroupTargetAssignmentMetadataKey key = new 
ConsumerGroupTargetAssignmentMetadataKey();
@@ -455,7 +491,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         ConsumerGroupTargetAssignmentMemberKey key = new 
ConsumerGroupTargetAssignmentMemberKey();
@@ -476,7 +514,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         ConsumerGroupTargetAssignmentMemberKey key = new 
ConsumerGroupTargetAssignmentMemberKey();
@@ -496,7 +536,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         ConsumerGroupCurrentMemberAssignmentKey key = new 
ConsumerGroupCurrentMemberAssignmentKey();
@@ -517,7 +559,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         ConsumerGroupCurrentMemberAssignmentKey key = new 
ConsumerGroupCurrentMemberAssignmentKey();
@@ -537,7 +581,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         assertThrows(NullPointerException.class, () -> coordinator.replay(new 
Record(null, null)));
@@ -550,7 +596,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         ConsumerGroupCurrentMemberAssignmentKey key = new 
ConsumerGroupCurrentMemberAssignmentKey();
@@ -570,7 +618,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         coordinator.onLoaded(image);
@@ -590,7 +640,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         GroupMetadataKey key = new GroupMetadataKey();
@@ -611,7 +663,9 @@ public class GroupCoordinatorShardTest {
         GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
             new LogContext(),
             groupMetadataManager,
-            offsetMetadataManager
+            offsetMetadataManager,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class)
         );
 
         GroupMetadataKey key = new GroupMetadataKey();
@@ -623,4 +677,82 @@ public class GroupCoordinatorShardTest {
 
         verify(groupMetadataManager, times(1)).replay(key, null);
     }
+
    /**
     * Verifies the lifecycle of the group-metadata cleanup timer task: it is scheduled
     * when the shard is loaded, rescheduled after it fires, and cancelled when the shard
     * is unloaded.
     */
    @Test
    public void testScheduleCleanupGroupMetadata() {
        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
        Time mockTime = new MockTime();
        MockCoordinatorTimer<Void, Record> timer = new MockCoordinatorTimer<>(mockTime);
        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
            new LogContext(),
            groupMetadataManager,
            offsetMetadataManager,
            timer,
            // The 1000ms retention check interval drives when the task re-fires below.
            GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 1000L, 24 * 60)
        );
        MetadataImage image = MetadataImage.EMPTY;

        // Confirm the cleanup is scheduled when the coordinator is initially loaded.
        coordinator.onLoaded(image);
        assertTrue(timer.contains(GROUP_EXPIRATION_KEY));

        // Confirm that it is rescheduled after completion.
        mockTime.sleep(1000L);
        List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> tasks = timer.poll();
        assertEquals(1, tasks.size());
        assertTrue(timer.contains(GROUP_EXPIRATION_KEY));

        // Unloading the shard must cancel the pending cleanup task.
        coordinator.onUnloaded();
        assertFalse(timer.contains(GROUP_EXPIRATION_KEY));
    }
+
    /**
     * Verifies that cleanupGroupMetadata() expires offsets for every group, deletes a group
     * once all of its offsets are gone, collects the resulting tombstone records in order,
     * and reschedules itself.
     */
    @Test
    public void testCleanupGroupMetadata() {
        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
        MockCoordinatorTimer<Void, Record> timer = new MockCoordinatorTimer<>(new MockTime());
        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
            new LogContext(),
            groupMetadataManager,
            offsetMetadataManager,
            timer,
            mock(GroupCoordinatorConfig.class)
        );

        Record offsetCommitTombstone = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "topic", 0);
        Record groupMetadataTombstone = RecordHelpers.newGroupMetadataTombstoneRecord("group-id");

        @SuppressWarnings("unchecked")
        ArgumentCaptor<List<Record>> recordsCapture = ArgumentCaptor.forClass(List.class);

        when(groupMetadataManager.groupIds()).thenReturn(mkSet("group-id", "other-group-id"));
        // "group-id": all offsets expired -> tombstone appended, returns true so the
        // group itself becomes eligible for deletion below.
        when(offsetMetadataManager.cleanupExpiredOffsets(eq("group-id"), recordsCapture.capture()))
            .thenAnswer(invocation -> {
                List<Record> records = recordsCapture.getValue();
                records.add(offsetCommitTombstone);
                return true;
            });
        // NOTE(review): this stub matches on an *empty* list by equality, but if the shard
        // passes one shared records list, it already contains offsetCommitTombstone when
        // "other-group-id" is processed, so the stub never matches and Mockito's default
        // `false` is returned instead — the test still passes, but the stub may be
        // ineffective; confirm against the cleanupGroupMetadata implementation.
        when(offsetMetadataManager.cleanupExpiredOffsets("other-group-id", Collections.emptyList())).thenReturn(false);
        when(groupMetadataManager.groupIds()).thenReturn(mkSet("group-id", "other-group-id"));
        doAnswer(invocation -> {
            List<Record> records = recordsCapture.getValue();
            records.add(groupMetadataTombstone);
            return null;
        }).when(groupMetadataManager).maybeDeleteGroup(eq("group-id"), recordsCapture.capture());

        assertFalse(timer.contains(GROUP_EXPIRATION_KEY));
        CoordinatorResult<Void, Record> result = coordinator.cleanupGroupMetadata();
        // The cleanup reschedules itself after running.
        assertTrue(timer.contains(GROUP_EXPIRATION_KEY));

        // Offset tombstone first, then the group metadata tombstone.
        List<Record> expectedRecords = Arrays.asList(offsetCommitTombstone, groupMetadataTombstone);
        assertEquals(expectedRecords, result.records());
        assertNull(result.response());
        assertNull(result.appendFuture());

        verify(groupMetadataManager, times(1)).groupIds();
        verify(offsetMetadataManager, times(1)).cleanupExpiredOffsets(eq("group-id"), any());
        verify(offsetMetadataManager, times(1)).cleanupExpiredOffsets(eq("other-group-id"), any());
        // Only the group whose offsets were fully expired is considered for deletion.
        verify(groupMetadataManager, times(1)).maybeDeleteGroup(eq("group-id"), any());
        verify(groupMetadataManager, times(0)).maybeDeleteGroup(eq("other-group-id"), any());
    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index f299705016d..ee44304072e 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -9494,6 +9494,27 @@ public class GroupMetadataManagerTest {
         assertEquals(expectedRecords, records);
     }
 
    /**
     * Verifies that maybeDeleteGroup() writes a group metadata tombstone for an empty
     * generic group, and does nothing for a non-empty group or an unknown group id.
     */
    @Test
    public void testGenericGroupMaybeDelete() {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .build();
        GenericGroup group = context.createGenericGroup("group-id");

        // A freshly created (empty) group is deletable: a tombstone is appended.
        List<Record> expectedRecords = Collections.singletonList(RecordHelpers.newGroupMetadataTombstoneRecord("group-id"));
        List<Record> records = new ArrayList<>();
        context.groupMetadataManager.maybeDeleteGroup("group-id", records);
        assertEquals(expectedRecords, records);

        // Once the group leaves the Empty state, it must not be deleted.
        records = new ArrayList<>();
        group.transitionTo(PREPARING_REBALANCE);
        context.groupMetadataManager.maybeDeleteGroup("group-id", records);
        assertEquals(Collections.emptyList(), records);

        // An unknown group id is a no-op.
        records = new ArrayList<>();
        context.groupMetadataManager.maybeDeleteGroup("invalid-group-id", records);
        assertEquals(Collections.emptyList(), records);
    }
+
     @Test
     public void testConsumerGroupDelete() {
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
@@ -9510,6 +9531,32 @@ public class GroupMetadataManagerTest {
         assertEquals(expectedRecords, records);
     }
 
    /**
     * Verifies that maybeDeleteGroup() writes the three consumer-group tombstones for an
     * empty consumer group, and does nothing once the group has a member.
     */
    @Test
    public void testConsumerGroupMaybeDelete() {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .build();
        ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group-id", true);

        // An empty consumer group is deletable: target assignment epoch, subscription
        // metadata and group epoch tombstones are appended, in that order.
        List<Record> expectedRecords = Arrays.asList(
            RecordHelpers.newTargetAssignmentEpochTombstoneRecord("group-id"),
            RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord("group-id"),
            RecordHelpers.newGroupEpochTombstoneRecord("group-id")
        );
        List<Record> records = new ArrayList<>();
        context.groupMetadataManager.maybeDeleteGroup("group-id", records);
        assertEquals(expectedRecords, records);

        // With a member present, the group must not be deleted.
        records = new ArrayList<>();
        group.updateMember(new ConsumerGroupMember.Builder("member")
            .setMemberEpoch(10)
            .setTargetMemberEpoch(10)
            .setPreviousMemberEpoch(10)
            .build()
        );
        context.groupMetadataManager.maybeDeleteGroup("group-id", records);
        assertEquals(Collections.emptyList(), records);
    }
+
     private static void assertNoOrEmptyResult(List<ExpiredTimeout<Void, 
Record>> timeouts) {
         assertTrue(timeouts.size() <= 1);
         timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT, 
timeout.result));
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java
new file mode 100644
index 00000000000..2f1cb354a5a
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
/**
 * Unit tests for {@link OffsetExpirationConditionImpl#isOffsetExpired}: covers both the
 * legacy per-partition expire timestamp path and the current retention-based path, where
 * the base timestamp is supplied by the condition's extractor function.
 */
public class OffsetExpirationConditionImplTest {

    @Test
    public void testIsOffsetExpired() {
        long currentTimestamp = 1500L;
        long commitTimestamp = 500L;
        OptionalLong expireTimestampMs = OptionalLong.of(1500);
        long offsetsRetentionMs = 500L;

        // The condition's base-timestamp extractor always returns the commit timestamp here.
        OffsetExpirationConditionImpl condition = new OffsetExpirationConditionImpl(__ -> commitTimestamp);
        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(
            100,
            OptionalInt.of(1),
            "metadata",
            commitTimestamp,
            expireTimestampMs
        );

        // Test when expire timestamp exists (older versions with per partition retention)
        // 1. Current timestamp >= expire timestamp => should expire
        assertTrue(condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, offsetsRetentionMs));

        // 2. Current timestamp < expire timestamp => should not expire
        currentTimestamp = 499;
        assertFalse(condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, offsetsRetentionMs));

        // Test when expire timestamp does not exist (current version with no per partition retention)
        offsetAndMetadata = new OffsetAndMetadata(
            100,
            OptionalInt.of(1),
            "metadata",
            commitTimestamp,
            OptionalLong.empty()
        );

        // 3. Current timestamp - base timestamp >= offsets retention => should expire
        currentTimestamp = 1000L;
        assertTrue(condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, offsetsRetentionMs));

        // 4. Current timestamp - base timestamp < offsets retention => should not expire
        currentTimestamp = 999L;
        assertFalse(condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, offsetsRetentionMs));
    }
}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 5598a74e50e..82fca8b0e25 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -76,33 +76,51 @@ import static 
org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class OffsetMetadataManagerTest {
     static class OffsetMetadataManagerTestContext {
         public static class Builder {
-            final private MockTime time = new MockTime();
-            final private MockCoordinatorTimer<Void, Record> timer = new 
MockCoordinatorTimer<>(time);
-            final private LogContext logContext = new LogContext();
-            final private SnapshotRegistry snapshotRegistry = new 
SnapshotRegistry(logContext);
+            private final MockTime time = new MockTime();
+            private final MockCoordinatorTimer<Void, Record> timer = new 
MockCoordinatorTimer<>(time);
+            private final LogContext logContext = new LogContext();
+            private final SnapshotRegistry snapshotRegistry = new 
SnapshotRegistry(logContext);
+            private GroupMetadataManager groupMetadataManager = null;
             private MetadataImage metadataImage = null;
-            private int offsetMetadataMaxSize = 4096;
+            private GroupCoordinatorConfig config = null;
 
            /**
             * Sets the offset metadata max size on the test config.
             *
             * NOTE(review): this replaces the whole config object, so calling it after
             * withOffsetsRetentionMs() resets the retention values to the defaults used
             * here — confirm no test chains both setters.
             */
            Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
                config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(offsetMetadataMaxSize, 60000L, 24 * 60 * 1000);
                return this;
            }
+
            /**
             * Sets the offsets retention period on the test config.
             *
             * NOTE(review): this replaces the whole config object, so calling it after
             * withOffsetMetadataMaxSize() resets the metadata max size to the default
             * (4096) used here — confirm no test chains both setters.
             */
            Builder withOffsetsRetentionMs(long offsetsRetentionMs) {
                config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, offsetsRetentionMs);
                return this;
            }
+
            /**
             * Injects a (typically mocked) GroupMetadataManager; when unset, build()
             * constructs a real one.
             */
            Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
                this.groupMetadataManager = groupMetadataManager;
                return this;
            }
 
             OffsetMetadataManagerTestContext build() {
                 if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+                if (config == null) {
+                    config = 
GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, 24 * 60 * 
1000);
+                }
 
-                GroupMetadataManager groupMetadataManager = new 
GroupMetadataManager.Builder()
-                    .withTime(time)
-                    .withTimer(timer)
-                    .withSnapshotRegistry(snapshotRegistry)
-                    .withLogContext(logContext)
-                    .withMetadataImage(metadataImage)
-                    .withConsumerGroupAssignors(Collections.singletonList(new 
RangeAssignor()))
-                    .build();
+                if (groupMetadataManager == null) {
+                    groupMetadataManager = new GroupMetadataManager.Builder()
+                        .withTime(time)
+                        .withTimer(timer)
+                        .withSnapshotRegistry(snapshotRegistry)
+                        .withLogContext(logContext)
+                        .withMetadataImage(metadataImage)
+                        
.withConsumerGroupAssignors(Collections.singletonList(new RangeAssignor()))
+                        .build();
+                }
 
                 OffsetMetadataManager offsetMetadataManager = new 
OffsetMetadataManager.Builder()
                     .withTime(time)
@@ -110,7 +128,7 @@ public class OffsetMetadataManagerTest {
                     .withSnapshotRegistry(snapshotRegistry)
                     .withMetadataImage(metadataImage)
                     .withGroupMetadataManager(groupMetadataManager)
-                    .withOffsetMetadataMaxSize(offsetMetadataMaxSize)
+                    .withGroupCoordinatorConfig(config)
                     .build();
 
                 return new OffsetMetadataManagerTestContext(
@@ -203,6 +221,15 @@ public class OffsetMetadataManagerTest {
             return numDeletedOffsets;
         }
 
+        public boolean cleanupExpiredOffsets(String groupId, List<Record> 
records) {
+            List<Record> addedRecords = new ArrayList<>();
+            boolean isOffsetsEmptyForGroup = 
offsetMetadataManager.cleanupExpiredOffsets(groupId, addedRecords);
+            addedRecords.forEach(this::replay);
+
+            records.addAll(addedRecords);
+            return isOffsetsEmptyForGroup;
+        }
+
         public List<OffsetFetchResponseData.OffsetFetchResponseTopics> 
fetchOffsets(
             String groupId,
             List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
@@ -282,6 +309,18 @@ public class OffsetMetadataManagerTest {
             int partition,
             long offset,
             int leaderEpoch
+        ) {
+            commitOffset(groupId, topic, partition, offset, leaderEpoch, 
time.milliseconds());
+
+        }
+
+        public void commitOffset(
+            String groupId,
+            String topic,
+            int partition,
+            long offset,
+            int leaderEpoch,
+            long commitTimestamp
         ) {
             replay(RecordHelpers.newOffsetCommitRecord(
                 groupId,
@@ -291,7 +330,7 @@ public class OffsetMetadataManagerTest {
                     offset,
                     OptionalInt.of(leaderEpoch),
                     "metadata",
-                    time.milliseconds(),
+                    commitTimestamp,
                     OptionalLong.empty()
                 ),
                 MetadataVersion.latest()
@@ -1763,6 +1802,108 @@ public class OffsetMetadataManagerTest {
         assertEquals(3, numDeleteOffsets);
     }
 
+    @Test
+    public void testCleanupExpiredOffsetsGroupHasNoOffsets() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder()
+            .build();
+
+        List<Record> records = new ArrayList<>();
+        assertTrue(context.cleanupExpiredOffsets("unknown-group-id", records));
+        assertEquals(Collections.emptyList(), records);
+    }
+
+    @Test
+    public void testCleanupExpiredOffsetsGroupDoesNotExist() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder()
+            .withGroupMetadataManager(groupMetadataManager)
+            .build();
+
+        
when(groupMetadataManager.group("unknown-group-id")).thenThrow(GroupIdNotFoundException.class);
+        context.commitOffset("unknown-group-id", "topic", 0, 100L, 0);
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.cleanupExpiredOffsets("unknown-group-id", new ArrayList<>()));
+    }
+
+    @Test
+    public void testCleanupExpiredOffsetsEmptyOffsetExpirationCondition() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        Group group = mock(Group.class);
+
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder()
+            .withGroupMetadataManager(groupMetadataManager)
+            .build();
+
+        context.commitOffset("group-id", "topic", 0, 100L, 0);
+
+        when(groupMetadataManager.group("group-id")).thenReturn(group);
+        when(group.offsetExpirationCondition()).thenReturn(Optional.empty());
+
+        List<Record> records = new ArrayList<>();
+        assertFalse(context.cleanupExpiredOffsets("group-id", records));
+        assertEquals(Collections.emptyList(), records);
+    }
+
    /**
     * End-to-end expiration walk: with a 1000 ms retention, offsets expire once
     * their base timestamp (here the commit timestamp) is older than retention,
     * unless the group is still subscribed to the offset's topic. The final
     * cleanup that removes every remaining offset must report the group as empty.
     */
    @Test
    public void testCleanupExpiredOffsets() {
        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
        Group group = mock(Group.class);

        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
            .withGroupMetadataManager(groupMetadataManager)
            .withOffsetsRetentionMs(1000)
            .build();

        long commitTimestamp = context.time.milliseconds();

        // secondTopic-1 is committed 500 ms later than the other two, so it
        // crosses the retention boundary 500 ms after them.
        context.commitOffset("group-id", "firstTopic", 0, 100L, 0, commitTimestamp);
        context.commitOffset("group-id", "secondTopic", 0, 100L, 0, commitTimestamp);
        context.commitOffset("group-id", "secondTopic", 1, 100L, 0, commitTimestamp + 500);

        // Advance exactly one retention period past the first two commits.
        context.time.sleep(1000);

        // firstTopic-0: group is still subscribed to firstTopic. Do not expire.
        // secondTopic-0: should expire as offset retention has passed.
        // secondTopic-1: has not passed offset retention. Do not expire.
        List<Record> expectedRecords = Collections.singletonList(
            RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "secondTopic", 0)
        );

        // Expiration condition keys off the commit timestamp of each offset.
        when(groupMetadataManager.group("group-id")).thenReturn(group);
        when(group.offsetExpirationCondition()).thenReturn(Optional.of(
            new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)));
        when(group.isSubscribedToTopic("firstTopic")).thenReturn(true);
        when(group.isSubscribedToTopic("secondTopic")).thenReturn(false);

        List<Record> records = new ArrayList<>();
        assertFalse(context.cleanupExpiredOffsets("group-id", records));
        assertEquals(expectedRecords, records);

        // Expire secondTopic-1.
        context.time.sleep(500);
        expectedRecords = Collections.singletonList(
            RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "secondTopic", 1)
        );

        records = new ArrayList<>();
        assertFalse(context.cleanupExpiredOffsets("group-id", records));
        assertEquals(expectedRecords, records);

        // Add 2 more commits, then expire all.
        // The group drops its firstTopic subscription, so firstTopic offsets
        // become eligible too; both new commits are already past retention.
        when(group.isSubscribedToTopic("firstTopic")).thenReturn(false);
        context.commitOffset("group-id", "firstTopic", 1, 100L, 0, commitTimestamp + 500);
        context.commitOffset("group-id", "secondTopic", 0, 101L, 0, commitTimestamp + 500);

        expectedRecords = Arrays.asList(
            RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "firstTopic", 0),
            RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "firstTopic", 1),
            RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "secondTopic", 0)
        );

        // Everything expired, so the group is now reported as offset-free.
        records = new ArrayList<>();
        assertTrue(context.cleanupExpiredOffsets("group-id", records));
        assertEquals(expectedRecords, records);
    }
+
     static private OffsetFetchResponseData.OffsetFetchResponsePartitions 
mkOffsetPartitionResponse(
         int partition,
         long offset,
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index 210aa8eb901..df643a5d2f0 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -23,6 +23,9 @@ import 
org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.coordinator.group.GroupMetadataManagerTest;
+import org.apache.kafka.coordinator.group.OffsetAndMetadata;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
@@ -30,6 +33,8 @@ import org.junit.jupiter.api.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -703,4 +708,66 @@ public class ConsumerGroupTest {
         assertEquals(ConsumerGroup.ConsumerGroupState.STABLE, 
consumerGroup.state());
         assertThrows(GroupNotEmptyException.class, 
consumerGroup::validateDeleteGroup);
     }
+
+    @Test
+    public void testOffsetExpirationCondition() {
+        long currentTimestamp = 30000L;
+        long commitTimestamp = 20000L;
+        long offsetsRetentionMs = 10000L;
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, 
OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty());
+        ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new 
LogContext()), "group-id");
+
+        Optional<OffsetExpirationCondition> offsetExpirationCondition = 
group.offsetExpirationCondition();
+        assertTrue(offsetExpirationCondition.isPresent());
+
+        OffsetExpirationConditionImpl condition = 
(OffsetExpirationConditionImpl) offsetExpirationCondition.get();
+        assertEquals(commitTimestamp, 
condition.baseTimestamp().apply(offsetAndMetadata));
+        assertTrue(condition.isOffsetExpired(offsetAndMetadata, 
currentTimestamp, offsetsRetentionMs));
+    }
+
+    @Test
+    public void testIsSubscribedToTopic() {
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+
+        MetadataImage image = new 
GroupMetadataManagerTest.MetadataImageBuilder()
+            .addTopic(fooTopicId, "foo", 1)
+            .addTopic(barTopicId, "bar", 2)
+            .addRacks()
+            .build();
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder("member1")
+            .setSubscribedTopicNames(Collections.singletonList("foo"))
+            .build();
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder("member2")
+            .setSubscribedTopicNames(Collections.singletonList("bar"))
+            .build();
+
+        ConsumerGroup consumerGroup = createConsumerGroup("group-foo");
+
+        consumerGroup.updateMember(member1);
+        consumerGroup.updateMember(member2);
+
+        assertEquals(
+            mkMap(
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1))),
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2)))
+            ),
+            consumerGroup.computeSubscriptionMetadata(
+                null,
+                null,
+                image.topics(),
+                image.cluster()
+            )
+        );
+
+        assertTrue(consumerGroup.isSubscribedToTopic("foo"));
+        assertTrue(consumerGroup.isSubscribedToTopic("bar"));
+
+        consumerGroup.removeMember("member1");
+        assertFalse(consumerGroup.isSubscribedToTopic("foo"));
+
+        consumerGroup.removeMember("member2");
+        assertFalse(consumerGroup.isSubscribedToTopic("bar"));
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
index 05afdd26edf..156fdf1b507 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
@@ -31,13 +31,19 @@ import 
org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.OffsetAndMetadata;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
@@ -1085,6 +1091,151 @@ public class GenericGroupTest {
         assertThrows(GroupIdNotFoundException.class, 
group::validateDeleteGroup);
     }
 
    /**
     * Walks the generic group's expiration-condition matrix:
     * 1. no protocol type (simple consumer) -> expire from commit timestamp;
     * 2. non-consumer protocol + Empty state -> expire from current state timestamp;
     * 3. non-consumer protocol + non-Empty state -> no expiration at all;
     * 4. consumer protocol + Stable state -> expire from commit timestamp;
     * 5. consumer protocol + non-Stable state -> no expiration at all.
     */
    @Test
    public void testOffsetExpirationCondition() {
        long currentTimestamp = 30000L;
        long commitTimestamp = 20000L;
        long offsetsRetentionMs = 10000L;
        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty());
        MockTime time = new MockTime();
        // Captured before any state transitions; used as the base in case 2.
        long currentStateTimestamp = time.milliseconds();
        GenericGroup group = new GenericGroup(new LogContext(), "groupId", EMPTY, time);

        // 1. Test no protocol type. Simple consumer case, Base timestamp based off of last commit timestamp.
        Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();
        assertTrue(offsetExpirationCondition.isPresent());

        OffsetExpirationConditionImpl condition = (OffsetExpirationConditionImpl) offsetExpirationCondition.get();
        assertEquals(commitTimestamp, condition.baseTimestamp().apply(offsetAndMetadata));
        assertTrue(condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, offsetsRetentionMs));

        // 2. Test non-consumer protocol type + Empty state. Base timestamp based off of current state timestamp.
        JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection();
        protocols.add(new JoinGroupRequestProtocol()
            .setName("range")
            .setMetadata(ConsumerProtocol.serializeSubscription(
                new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic"))).array()));

        GenericGroupMember memberWithNonConsumerProtocol = new GenericGroupMember(
            "memberWithNonConsumerProtocol",
            Optional.empty(),
            clientId,
            clientHost,
            rebalanceTimeoutMs,
            sessionTimeoutMs,
            "My Protocol",
            protocols
        );

        group.add(memberWithNonConsumerProtocol);
        assertEquals("My Protocol", group.protocolType().get());

        offsetExpirationCondition = group.offsetExpirationCondition();
        assertTrue(offsetExpirationCondition.isPresent());

        condition = (OffsetExpirationConditionImpl) offsetExpirationCondition.get();
        assertEquals(currentStateTimestamp, condition.baseTimestamp().apply(offsetAndMetadata));
        // Exactly one retention period past the state timestamp => expired.
        assertTrue(condition.isOffsetExpired(offsetAndMetadata, currentStateTimestamp + offsetsRetentionMs, offsetsRetentionMs));

        // 3. Test non-consumer protocol type + non-Empty state. Do not expire any offsets.
        group.transitionTo(PREPARING_REBALANCE);
        offsetExpirationCondition = group.offsetExpirationCondition();
        assertFalse(offsetExpirationCondition.isPresent());

        // 4. Test consumer protocol type + subscribed topics + Stable state. Base timestamp based off of last commit timestamp.
        group.remove("memberWithNonConsumerProtocol");
        GenericGroupMember memberWithConsumerProtocol = new GenericGroupMember(
            "memberWithConsumerProtocol",
            Optional.empty(),
            clientId,
            clientHost,
            rebalanceTimeoutMs,
            sessionTimeoutMs,
            "consumer",
            protocols
        );
        group.add(memberWithConsumerProtocol);
        group.initNextGeneration();
        group.transitionTo(STABLE);
        assertTrue(group.subscribedTopics().get().contains("topic"));

        offsetExpirationCondition = group.offsetExpirationCondition();
        assertTrue(offsetExpirationCondition.isPresent());

        condition = (OffsetExpirationConditionImpl) offsetExpirationCondition.get();
        assertEquals(commitTimestamp, condition.baseTimestamp().apply(offsetAndMetadata));
        assertTrue(condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, offsetsRetentionMs));

        // 5. Test consumer protocol type + subscribed topics + non-Stable state. Do not expire any offsets.
        group.transitionTo(PREPARING_REBALANCE);
        offsetExpirationCondition = group.offsetExpirationCondition();
        assertFalse(offsetExpirationCondition.isPresent());
    }
+
    /**
     * Verifies {@code isSubscribedToTopic} across the generic group's cases:
     * no protocol type, non-consumer protocol type, consumer protocol type with
     * no members, and consumer protocol type with a subscribing member. Only
     * the last case reports the group as subscribed.
     */
    @Test
    public void testIsSubscribedToTopic() {
        GenericGroup group = new GenericGroup(new LogContext(), "groupId", EMPTY, Time.SYSTEM);

        // 1. group has no protocol type => not subscribed
        assertFalse(group.isSubscribedToTopic("topic"));

        // 2. group does not use consumer group protocol type => not subscribed
        JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection();
        protocols.add(new JoinGroupRequestProtocol()
            .setName("range")
            .setMetadata(ConsumerProtocol.serializeSubscription(
                new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic"))).array()));

        GenericGroupMember memberWithNonConsumerProtocol = new GenericGroupMember(
            "memberWithNonConsumerProtocol",
            Optional.empty(),
            clientId,
            clientHost,
            rebalanceTimeoutMs,
            sessionTimeoutMs,
            "My Protocol",
            protocols
        );

        group.add(memberWithNonConsumerProtocol);
        group.transitionTo(PREPARING_REBALANCE);
        group.initNextGeneration();
        assertTrue(group.isInState(COMPLETING_REBALANCE));
        // Non-consumer protocol => subscribed topics cannot be computed.
        assertEquals(Optional.empty(), group.computeSubscribedTopics());
        assertFalse(group.isSubscribedToTopic("topic"));

        // 3. group uses consumer group protocol type but empty members => not subscribed
        group.remove("memberWithNonConsumerProtocol");
        GenericGroupMember memberWithConsumerProtocol = new GenericGroupMember(
            "memberWithConsumerProtocol",
            Optional.empty(),
            clientId,
            clientHost,
            rebalanceTimeoutMs,
            sessionTimeoutMs,
            "consumer",
            protocols
        );

        // Add then immediately remove: the group keeps the consumer protocol
        // type but has no members when the next generation is initialized.
        group.add(memberWithConsumerProtocol);
        group.remove("memberWithConsumerProtocol");
        group.transitionTo(PREPARING_REBALANCE);
        group.initNextGeneration();
        assertTrue(group.isInState(EMPTY));
        // An empty consumer group yields an empty (but present) topic set.
        assertEquals(Optional.of(Collections.emptySet()), group.computeSubscribedTopics());
        assertTrue(group.usesConsumerGroupProtocol());
        assertFalse(group.isSubscribedToTopic("topic"));

        // 4. group uses consumer group protocol type with member subscription => subscribed
        group.add(memberWithConsumerProtocol);
        group.transitionTo(PREPARING_REBALANCE);
        group.initNextGeneration();
        assertTrue(group.isInState(COMPLETING_REBALANCE));
        assertEquals(Optional.of(Collections.singleton("topic")), group.computeSubscribedTopics());
        assertTrue(group.usesConsumerGroupProtocol());
        assertTrue(group.isSubscribedToTopic("topic"));
    }
+
     private void assertState(GenericGroup group, GenericGroupState 
targetState) {
         Set<GenericGroupState> otherStates = new HashSet<>();
         otherStates.add(STABLE);

Reply via email to