This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7b5d640cc65 KAFKA-14987; Implement Group/Offset expiration in the new
coordinator (#14467)
7b5d640cc65 is described below
commit 7b5d640cc656443a078bda096d01910b3edfdb37
Author: Jeff Kim <[email protected]>
AuthorDate: Thu Oct 12 02:45:13 2023 -0400
KAFKA-14987; Implement Group/Offset expiration in the new coordinator
(#14467)
This patch implements group and offset expiration in the new group
coordinator.
Reviewers: Ritika Reddy <[email protected]>, David Jacot
<[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 4 +-
.../org/apache/kafka/coordinator/group/Group.java | 16 +-
.../coordinator/group/GroupCoordinatorConfig.java | 27 +++-
.../coordinator/group/GroupCoordinatorShard.java | 69 +++++++-
.../coordinator/group/GroupMetadataManager.java | 20 +++
.../group/OffsetExpirationCondition.java | 36 +++++
.../group/OffsetExpirationConditionImpl.java | 62 +++++++
.../coordinator/group/OffsetMetadataManager.java | 95 +++++++++--
.../coordinator/group/consumer/ConsumerGroup.java | 23 ++-
.../coordinator/group/generic/GenericGroup.java | 54 ++++++-
.../group/GroupCoordinatorConfigTest.java | 29 +++-
.../group/GroupCoordinatorServiceTest.java | 4 +-
.../group/GroupCoordinatorShardTest.java | 178 ++++++++++++++++++---
.../group/GroupMetadataManagerTest.java | 47 ++++++
.../group/OffsetExpirationConditionImplTest.java | 70 ++++++++
.../group/OffsetMetadataManagerTest.java | 173 ++++++++++++++++++--
.../group/consumer/ConsumerGroupTest.java | 67 ++++++++
.../group/generic/GenericGroupTest.java | 151 +++++++++++++++++
19 files changed, 1063 insertions(+), 64 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index c9601753c1e..678765aa6bf 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -328,7 +328,7 @@
<suppress checks="ClassFanOutComplexity"
files="(GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorService|GroupCoordinatorServiceTest).java"/>
<suppress checks="ParameterNumber"
- files="(ConsumerGroupMember|GroupMetadataManager).java"/>
+
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorConfig).java"/>
<suppress checks="ClassDataAbstractionCouplingCheck"
files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest).java"/>
<suppress checks="JavaNCSS"
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 96e8dc8a5a1..daec8f73089 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -522,7 +522,9 @@ class BrokerServer(
config.groupInitialRebalanceDelay,
GroupCoordinatorConfig.GENERIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS,
config.groupMinSessionTimeoutMs,
- config.groupMaxSessionTimeoutMs
+ config.groupMaxSessionTimeoutMs,
+ config.offsetsRetentionCheckIntervalMs,
+ config.offsetsRetentionMinutes * 60 * 1000L
)
val timer = new SystemTimerReaper(
"group-coordinator-reaper",
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index 29a252e47bf..0cb10b12a51 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import java.util.List;
+import java.util.Optional;
/**
* Interface common for all groups.
@@ -106,7 +107,8 @@ public interface Group {
/**
* Returns true if the group is actively subscribed to the topic.
*
- * @param topic The topic name.
+ * @param topic The topic name.
+ *
* @return Whether the group is subscribed to the topic.
*/
boolean isSubscribedToTopic(String topic);
@@ -117,4 +119,16 @@ public interface Group {
* @param records The list of records.
*/
void createGroupTombstoneRecords(List<Record> records);
+
+ /**
+ * @return Whether the group is in Empty state.
+ */
+ boolean isEmpty();
+
+ /**
+ * See {@link OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty if no
such condition exists.
+ */
+ Optional<OffsetExpirationCondition> offsetExpirationCondition();
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index fe31a24524f..330813ee912 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -91,6 +91,27 @@ public class GroupCoordinatorConfig {
*/
public final int genericGroupMaxSessionTimeoutMs;
+ /**
+ * Frequency at which to check for expired offsets.
+ */
+ public final long offsetsRetentionCheckIntervalMs;
+
+ /**
+ * For subscribed consumers, committed offset of a specific partition will
be expired and discarded when:
+ * 1) This retention period has elapsed after the consumer group loses
all its consumers (i.e. becomes empty);
+ * 2) This retention period has elapsed since the last time an offset
is committed for the partition AND
+ * the group is no longer subscribed to the corresponding topic.
+ *
+ * For standalone consumers (using manual assignment), offsets will be
expired after this retention period has
+ * elapsed since the time of last commit.
+ *
+ * Note that when a group is deleted via the DeleteGroups request, its
committed offsets will also be deleted immediately;
+ *
+ * Also, when a topic is deleted via the delete-topic request, upon
propagated metadata update any group's
+ * committed offsets for that topic will also be deleted without extra
retention period.
+ */
+ public final long offsetsRetentionMs;
+
public GroupCoordinatorConfig(
int numThreads,
int consumerGroupSessionTimeoutMs,
@@ -103,7 +124,9 @@ public class GroupCoordinatorConfig {
int genericGroupInitialRebalanceDelayMs,
int genericGroupNewMemberJoinTimeoutMs,
int genericGroupMinSessionTimeoutMs,
- int genericGroupMaxSessionTimeoutMs
+ int genericGroupMaxSessionTimeoutMs,
+ long offsetsRetentionCheckIntervalMs,
+ long offsetsRetentionMs
) {
this.numThreads = numThreads;
this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs;
@@ -117,5 +140,7 @@ public class GroupCoordinatorConfig {
this.genericGroupNewMemberJoinTimeoutMs =
genericGroupNewMemberJoinTimeoutMs;
this.genericGroupMinSessionTimeoutMs = genericGroupMinSessionTimeoutMs;
this.genericGroupMaxSessionTimeoutMs = genericGroupMaxSessionTimeoutMs;
+ this.offsetsRetentionCheckIntervalMs = offsetsRetentionCheckIntervalMs;
+ this.offsetsRetentionMs = offsetsRetentionMs;
}
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index c30d348336b..d0e67c1e693 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -70,6 +70,7 @@ import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
/**
* The group coordinator shard is a replicated state machine that manages the
metadata of all
@@ -159,17 +160,26 @@ public class GroupCoordinatorShard implements
CoordinatorShard<Record> {
.withSnapshotRegistry(snapshotRegistry)
.withTime(time)
.withGroupMetadataManager(groupMetadataManager)
- .withOffsetMetadataMaxSize(config.offsetMetadataMaxSize)
+ .withGroupCoordinatorConfig(config)
.build();
return new GroupCoordinatorShard(
logContext,
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ timer,
+ config
);
}
}
+ /**
+ * The group/offsets expiration key to schedule a timer task.
+ *
+ * Visible for testing.
+ */
+ static final String GROUP_EXPIRATION_KEY = "expire-group-metadata";
+
/**
* The logger.
*/
@@ -185,6 +195,16 @@ public class GroupCoordinatorShard implements
CoordinatorShard<Record> {
*/
private final OffsetMetadataManager offsetMetadataManager;
+ /**
+ * The coordinator timer.
+ */
+ private final CoordinatorTimer<Void, Record> timer;
+
+ /**
+ * The group coordinator config.
+ */
+ private final GroupCoordinatorConfig config;
+
/**
* Constructor.
*
@@ -195,11 +215,15 @@ public class GroupCoordinatorShard implements
CoordinatorShard<Record> {
GroupCoordinatorShard(
LogContext logContext,
GroupMetadataManager groupMetadataManager,
- OffsetMetadataManager offsetMetadataManager
+ OffsetMetadataManager offsetMetadataManager,
+ CoordinatorTimer<Void, Record> timer,
+ GroupCoordinatorConfig config
) {
this.log = logContext.logger(GroupCoordinatorShard.class);
this.groupMetadataManager = groupMetadataManager;
this.offsetMetadataManager = offsetMetadataManager;
+ this.timer = timer;
+ this.config = config;
}
/**
@@ -435,6 +459,39 @@ public class GroupCoordinatorShard implements
CoordinatorShard<Record> {
return offsetMetadataManager.deleteOffsets(request);
}
+ /**
+ * For each group, remove all expired offsets. If all offsets for the
group are removed and the group is eligible
+ * for deletion, delete the group.
+ *
+ * @return The list of tombstones (offset commit and group metadata) to
append.
+ */
+ public CoordinatorResult<Void, Record> cleanupGroupMetadata() {
+ List<Record> records = new ArrayList<>();
+ groupMetadataManager.groupIds().forEach(groupId -> {
+ if (offsetMetadataManager.cleanupExpiredOffsets(groupId, records))
{
+ groupMetadataManager.maybeDeleteGroup(groupId, records);
+ }
+ });
+
+ // Reschedule the next cycle.
+ scheduleGroupMetadataExpiration();
+ return new CoordinatorResult<>(records);
+ }
+
+ /**
+ * Schedule the group/offsets expiration job. If any exceptions are thrown
above, the timer will retry.
+ */
+ private void scheduleGroupMetadataExpiration() {
+ timer.schedule(
+ GROUP_EXPIRATION_KEY,
+ config.offsetsRetentionCheckIntervalMs,
+ TimeUnit.MILLISECONDS,
+ true,
+ this::cleanupGroupMetadata
+ );
+ }
+
+
/**
* The coordinator has been loaded. This is used to apply any
* post loading operations (e.g. registering timers).
@@ -448,6 +505,12 @@ public class GroupCoordinatorShard implements
CoordinatorShard<Record> {
offsetMetadataManager.onNewMetadataImage(newImage, emptyDelta);
groupMetadataManager.onLoaded();
+ scheduleGroupMetadataExpiration();
+ }
+
+ @Override
+ public void onUnloaded() {
+ timer.cancel(GROUP_EXPIRATION_KEY);
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 7bfe243d026..5974aabbe4c 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -3166,6 +3166,19 @@ public class GroupMetadataManager {
group.validateDeleteGroup();
}
+ /**
+ * Delete the group if it exists and is in Empty state.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to append the group metadata
tombstone records.
+ */
+ public void maybeDeleteGroup(String groupId, List<Record> records) {
+ Group group = groups.get(groupId);
+ if (group != null && group.isEmpty()) {
+ deleteGroup(groupId, records);
+ }
+ }
+
/**
* Checks whether the given protocol type or name in the request is
inconsistent with the group's.
*
@@ -3183,6 +3196,13 @@ public class GroupMetadataManager {
&& !groupProtocolTypeOrName.equals(protocolTypeOrName);
}
+ /**
+ * @return The set of all groups' ids.
+ */
+ public Set<String> groupIds() {
+ return Collections.unmodifiableSet(this.groups.keySet());
+ }
+
/**
* Generate a generic group heartbeat key for the timer.
*
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java
new file mode 100644
index 00000000000..ce3f299b40a
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+/**
+ * An offset is considered expired based on different factors, such as the
state of the group
+ * and/or the GroupMetadata record version (for generic groups). This class is
used to check
+ * how offsets for the group should be expired.
+ */
+public interface OffsetExpirationCondition {
+
+ /**
+ * Given an offset metadata and offsets retention, return whether the
offset is expired or not.
+ *
+ * @param offset The offset metadata.
+ * @param currentTimestampMs The current timestamp.
+ * @param offsetsRetentionMs The offset retention.
+ *
+ * @return Whether the offset is considered expired or not.
+ */
+ boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestampMs,
long offsetsRetentionMs);
+}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImpl.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImpl.java
new file mode 100644
index 00000000000..cd823108ef3
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import java.util.function.Function;
+
+public class OffsetExpirationConditionImpl implements
OffsetExpirationCondition {
+
+ /**
+ * Given an offset and metadata, obtain the base timestamp that should be
used
+ * as the start of the offsets retention period.
+ */
+ private final Function<OffsetAndMetadata, Long> baseTimestamp;
+
+ public OffsetExpirationConditionImpl(Function<OffsetAndMetadata, Long>
baseTimestamp) {
+ this.baseTimestamp = baseTimestamp;
+ }
+
+ /**
+ * Determine whether an offset is expired. Older versions have an expire
timestamp per partition. If this
+ * exists, compare against the current timestamp. Otherwise, use the base
timestamp (either commit timestamp
+ * or current state timestamp if group is empty for generic groups) and
check whether the offset has
+ * exceeded the offset retention.
+ *
+ * @param offset The offset and metadata.
+ * @param currentTimestampMs The current timestamp.
+ * @param offsetsRetentionMs The offsets retention in milliseconds.
+ *
+ * @return Whether the given offset is expired or not.
+ */
+ @Override
+ public boolean isOffsetExpired(OffsetAndMetadata offset, long
currentTimestampMs, long offsetsRetentionMs) {
+ if (offset.expireTimestampMs.isPresent()) {
+ // Older versions with explicit expire_timestamp field => old
expiration semantics is used
+ return currentTimestampMs >= offset.expireTimestampMs.getAsLong();
+ } else {
+ // Current version with no per partition retention
+ return currentTimestampMs - baseTimestamp.apply(offset) >=
offsetsRetentionMs;
+ }
+ }
+
+ /**
+ * @return The base timestamp.
+ */
+ public Function<OffsetAndMetadata, Long> baseTimestamp() {
+ return this.baseTimestamp;
+ }
+}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 9744e492f49..076ec24476f 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
@@ -45,9 +46,13 @@ import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.OptionalLong;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
import static
org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
@@ -61,13 +66,14 @@ import static
org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSE
* handling as well as during the initial loading of the records from the
partitions.
*/
public class OffsetMetadataManager {
+
public static class Builder {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
private Time time = null;
private GroupMetadataManager groupMetadataManager = null;
- private int offsetMetadataMaxSize = 4096;
private MetadataImage metadataImage = null;
+ private GroupCoordinatorConfig config = null;
Builder withLogContext(LogContext logContext) {
this.logContext = logContext;
@@ -89,8 +95,8 @@ public class OffsetMetadataManager {
return this;
}
- Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
- this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+ Builder withGroupCoordinatorConfig(GroupCoordinatorConfig config) {
+ this.config = config;
return this;
}
@@ -115,7 +121,7 @@ public class OffsetMetadataManager {
time,
metadataImage,
groupMetadataManager,
- offsetMetadataMaxSize
+ config
);
}
}
@@ -146,9 +152,9 @@ public class OffsetMetadataManager {
private final GroupMetadataManager groupMetadataManager;
/**
- * The maximum allowed metadata for any offset commit.
+ * The group coordinator config.
*/
- private final int offsetMetadataMaxSize;
+ private final GroupCoordinatorConfig config;
/**
* The offsets keyed by group id, topic name and partition id.
@@ -161,14 +167,14 @@ public class OffsetMetadataManager {
Time time,
MetadataImage metadataImage,
GroupMetadataManager groupMetadataManager,
- int offsetMetadataMaxSize
+ GroupCoordinatorConfig config
) {
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(OffsetMetadataManager.class);
this.time = time;
this.metadataImage = metadataImage;
this.groupMetadataManager = groupMetadataManager;
- this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+ this.config = config;
this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
}
@@ -316,7 +322,7 @@ public class OffsetMetadataManager {
response.topics().add(topicResponse);
topic.partitions().forEach(partition -> {
- if (partition.committedMetadata() != null &&
partition.committedMetadata().length() > offsetMetadataMaxSize) {
+ if (partition.committedMetadata() != null &&
partition.committedMetadata().length() > config.offsetMetadataMaxSize) {
topicResponse.partitions().add(new
OffsetCommitResponsePartition()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
@@ -544,6 +550,77 @@ public class OffsetMetadataManager {
.setTopics(topicResponses);
}
+ /**
+ * Remove expired offsets for the given group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit
tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false
otherwise.
+ */
+ public boolean cleanupExpiredOffsets(String groupId, List<Record> records)
{
+ TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>
offsetsByTopic = offsetsByGroup.get(groupId);
+ if (offsetsByTopic == null) {
+ return true;
+ }
+
+ // We expect the group to exist.
+ Group group = groupMetadataManager.group(groupId);
+ Set<String> expiredPartitions = new HashSet<>();
+ long currentTimestampMs = time.milliseconds();
+ Optional<OffsetExpirationCondition> offsetExpirationCondition =
group.offsetExpirationCondition();
+
+ if (!offsetExpirationCondition.isPresent()) {
+ return false;
+ }
+
+ AtomicBoolean allOffsetsExpired = new AtomicBoolean(true);
+ OffsetExpirationCondition condition = offsetExpirationCondition.get();
+
+ offsetsByTopic.forEach((topic, partitions) -> {
+ if (!group.isSubscribedToTopic(topic)) {
+ partitions.forEach((partition, offsetAndMetadata) -> {
+ if (condition.isOffsetExpired(offsetAndMetadata,
currentTimestampMs, config.offsetsRetentionMs)) {
+
expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition,
records).toString());
+ } else {
+ allOffsetsExpired.set(false);
+ }
+ });
+ } else {
+ allOffsetsExpired.set(false);
+ }
+ });
+
+ if (!expiredPartitions.isEmpty()) {
+ log.info("[GroupId {}] Expiring offsets of partitions
(allOffsetsExpired={}): {}",
+ groupId, allOffsetsExpired, String.join(", ",
expiredPartitions));
+ }
+
+ return allOffsetsExpired.get();
+ }
+
+ /**
+ * Add an offset commit tombstone record for the group.
+ *
+ * @param groupId The group id.
+ * @param topic The topic name.
+ * @param partition The partition.
+ * @param records The list of records to append the tombstone.
+ *
+ * @return The topic partition of the corresponding tombstone.
+ */
+ private TopicPartition appendOffsetCommitTombstone(
+ String groupId,
+ String topic,
+ int partition,
+ List<Record> records
+ ) {
+ records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
topic, partition));
+ TopicPartition tp = new TopicPartition(topic, partition);
+ log.trace("[GroupId {}] Removing expired offset and metadata for {}",
groupId, tp);
+ return tp;
+ }
+
/**
* Replays OffsetCommitKey/Value to update or delete the corresponding
offsets.
*
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index 21f6f8124cd..e5144953a97 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -24,6 +24,8 @@ import
org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.Record;
import org.apache.kafka.coordinator.group.RecordHelpers;
import org.apache.kafka.image.ClusterImage;
@@ -349,9 +351,11 @@ public class ConsumerGroup implements Group {
/**
* Returns true if the consumer group is actively subscribed to the topic.
*
- * @param topic The topic name.
- * @return whether the group is subscribed to the topic.
+ * @param topic The topic name.
+ *
+ * @return Whether the group is subscribed to the topic.
*/
+ @Override
public boolean isSubscribedToTopic(String topic) {
return subscribedTopicNames.containsKey(topic);
}
@@ -635,6 +639,21 @@ public class ConsumerGroup implements Group {
records.add(RecordHelpers.newGroupEpochTombstoneRecord(groupId()));
}
+ @Override
+ public boolean isEmpty() {
+ return state() == ConsumerGroupState.EMPTY;
+ }
+
+ /**
+ * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty if no
such condition exists.
+ */
+ @Override
+ public Optional<OffsetExpirationCondition> offsetExpirationCondition() {
+ return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata
-> offsetAndMetadata.commitTimestampMs));
+ }
+
/**
* Throws a StaleMemberEpochException if the received member epoch does
not match
* the expected member epoch.
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
index 88ec52727fa..4b408e0484a 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
@@ -33,6 +33,8 @@ import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.Record;
import org.apache.kafka.coordinator.group.RecordHelpers;
import org.slf4j.Logger;
@@ -898,6 +900,46 @@ public class GenericGroup implements Group {
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
}
+ @Override
+ public boolean isEmpty() {
+ return isInState(EMPTY);
+ }
+
+ /**
+ * Return the offset expiration condition to be used for this group. This
is based on several factors
+ * such as the group state, the protocol type, and the GroupMetadata
record version.
+ *
+ * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty if no
such condition exists.
+ */
+ @Override
+ public Optional<OffsetExpirationCondition> offsetExpirationCondition() {
+ if (protocolType.isPresent()) {
+ if (isInState(EMPTY)) {
+ // No members exist in the group =>
+ // - If current state timestamp exists and retention period
has passed since group became Empty,
+ // expire all offsets with no pending offset commit;
+ // - If there is no current state timestamp (old group
metadata schema) and retention period has passed
+ // since the last commit timestamp, expire the offset
+ return Optional.of(new OffsetExpirationConditionImpl(
+ offsetAndMetadata ->
currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs))
+ );
+ } else if (usesConsumerGroupProtocol() &&
subscribedTopics.isPresent() && isInState(STABLE)) {
+ // Consumers exist in the group and group is Stable =>
+ // - If the group is aware of the subscribed topics and
retention period has passed since the
+ // last commit timestamp, expire the offset.
+ return Optional.of(new
OffsetExpirationConditionImpl(offsetAndMetadata ->
offsetAndMetadata.commitTimestampMs));
+ }
+ } else {
+ // protocolType is None => standalone (simple) consumer, that uses
Kafka for offset storage. Only
+ // expire offsets where retention period has passed since their
last commit.
+ return Optional.of(new
OffsetExpirationConditionImpl(offsetAndMetadata ->
offsetAndMetadata.commitTimestampMs));
+ }
+ // If none of the conditions above are met, do not expire any offsets.
+ return Optional.empty();
+ }
+
/**
* Verify the member id is up to date for static members. Return true if
both conditions met:
* 1. given member is a known static member to group
@@ -1063,16 +1105,18 @@ public class GenericGroup implements Group {
}
/**
- * Returns true if the consumer group is actively subscribed to the topic.
When the consumer
- * group does not know, because the information is not available yet or
because it has
- * failed to parse the Consumer Protocol, it returns true to be safe.
+ * Returns true if the generic group is actively subscribed to the topic.
When the generic group does not know,
+ * because the information is not available yet or because it has failed
to parse the Consumer Protocol, we
+ * consider the group not subscribed to the topic if the group is not
using any protocol or not using the
+ * consumer group protocol.
+ *
+ * @param topic The topic name.
*
- * @param topic The topic name.
* @return whether the group is subscribed to the topic.
*/
public boolean isSubscribedToTopic(String topic) {
return subscribedTopics.map(topics -> topics.contains(topic))
- .orElse(true);
+ .orElse(usesConsumerGroupProtocol());
}
/**
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 1d31bd50845..f26b5ce6306 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -40,7 +40,9 @@ public class GroupCoordinatorConfigTest {
3000,
5 * 60 * 1000,
120,
- 10 * 60 * 1000
+ 10 * 60 * 1000,
+ 600000L,
+ 24 * 60 * 60 * 1000L
);
assertEquals(10, config.numThreads);
@@ -55,5 +57,30 @@ public class GroupCoordinatorConfigTest {
assertEquals(5 * 60 * 1000, config.genericGroupNewMemberJoinTimeoutMs);
assertEquals(120, config.genericGroupMinSessionTimeoutMs);
assertEquals(10 * 60 * 1000, config.genericGroupMaxSessionTimeoutMs);
+ assertEquals(10 * 60 * 1000, config.offsetsRetentionCheckIntervalMs);
+ assertEquals(24 * 60 * 60 * 1000L, config.offsetsRetentionMs);
+ }
+
+ public static GroupCoordinatorConfig createGroupCoordinatorConfig(
+ int offsetMetadataMaxSize,
+ long offsetsRetentionCheckIntervalMs,
+ long offsetsRetentionMs
+ ) {
+ return new GroupCoordinatorConfig(
+ 1,
+ 45,
+ 5,
+ Integer.MAX_VALUE,
+ Collections.singletonList(new RangeAssignor()),
+ 1000,
+ offsetMetadataMaxSize,
+ Integer.MAX_VALUE,
+ 3000,
+ 5 * 60 * 1000,
+ 120,
+ 10 * 5 * 1000,
+ offsetsRetentionCheckIntervalMs,
+ offsetsRetentionMs
+ );
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index d54d574f9e5..9a18db8f202 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -117,7 +117,9 @@ public class GroupCoordinatorServiceTest {
3000,
5 * 60 * 1000,
120,
- 10 * 5 * 1000
+ 10 * 5 * 1000,
+ 600000L,
+ 24 * 60 * 1000L
);
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 1ed931c029d..c95e9459285 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -25,6 +25,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
@@ -45,6 +47,7 @@ import
org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import java.util.ArrayList;
@@ -52,9 +55,14 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_EXPIRATION_KEY;
import static org.apache.kafka.coordinator.group.TestUtil.requestContext;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
@@ -75,7 +83,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
RequestContext context =
requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT);
@@ -100,7 +110,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
RequestContext context = requestContext(ApiKeys.OFFSET_COMMIT);
@@ -125,7 +137,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
@@ -177,7 +191,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
@@ -240,7 +256,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
OffsetCommitKey key = new OffsetCommitKey();
@@ -266,7 +284,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
OffsetCommitKey key = new OffsetCommitKey();
@@ -291,7 +311,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey();
@@ -312,7 +334,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey();
@@ -332,7 +356,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
ConsumerGroupPartitionMetadataKey key = new
ConsumerGroupPartitionMetadataKey();
@@ -353,7 +379,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
ConsumerGroupPartitionMetadataKey key = new
ConsumerGroupPartitionMetadataKey();
@@ -373,7 +401,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
ConsumerGroupMemberMetadataKey key = new
ConsumerGroupMemberMetadataKey();
@@ -394,7 +424,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
ConsumerGroupMemberMetadataKey key = new
ConsumerGroupMemberMetadataKey();
@@ -414,7 +446,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
ConsumerGroupTargetAssignmentMetadataKey key = new
ConsumerGroupTargetAssignmentMetadataKey();
@@ -435,7 +469,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
ConsumerGroupTargetAssignmentMetadataKey key = new
ConsumerGroupTargetAssignmentMetadataKey();
@@ -455,7 +491,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
ConsumerGroupTargetAssignmentMemberKey key = new
ConsumerGroupTargetAssignmentMemberKey();
@@ -476,7 +514,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
ConsumerGroupTargetAssignmentMemberKey key = new
ConsumerGroupTargetAssignmentMemberKey();
@@ -496,7 +536,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
ConsumerGroupCurrentMemberAssignmentKey key = new
ConsumerGroupCurrentMemberAssignmentKey();
@@ -517,7 +559,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
ConsumerGroupCurrentMemberAssignmentKey key = new
ConsumerGroupCurrentMemberAssignmentKey();
@@ -537,7 +581,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
assertThrows(NullPointerException.class, () -> coordinator.replay(new
Record(null, null)));
@@ -550,7 +596,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
ConsumerGroupCurrentMemberAssignmentKey key = new
ConsumerGroupCurrentMemberAssignmentKey();
@@ -570,7 +618,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
coordinator.onLoaded(image);
@@ -590,7 +640,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
GroupMetadataKey key = new GroupMetadataKey();
@@ -611,7 +663,9 @@ public class GroupCoordinatorShardTest {
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
- offsetMetadataManager
+ offsetMetadataManager,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class)
);
GroupMetadataKey key = new GroupMetadataKey();
@@ -623,4 +677,82 @@ public class GroupCoordinatorShardTest {
verify(groupMetadataManager, times(1)).replay(key, null);
}
+
+ @Test
+ public void testScheduleCleanupGroupMetadata() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ Time mockTime = new MockTime();
+ MockCoordinatorTimer<Void, Record> timer = new
MockCoordinatorTimer<>(mockTime);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager,
+ timer,
+ GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096,
1000L, 24 * 60)
+ );
+ MetadataImage image = MetadataImage.EMPTY;
+
+ // Confirm the cleanup is scheduled when the coordinator is initially
loaded.
+ coordinator.onLoaded(image);
+ assertTrue(timer.contains(GROUP_EXPIRATION_KEY));
+
+ // Confirm that it is rescheduled after completion.
+ mockTime.sleep(1000L);
+ List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> tasks =
timer.poll();
+ assertEquals(1, tasks.size());
+ assertTrue(timer.contains(GROUP_EXPIRATION_KEY));
+
+ coordinator.onUnloaded();
+ assertFalse(timer.contains(GROUP_EXPIRATION_KEY));
+ }
+
+ @Test
+ public void testCleanupGroupMetadata() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ MockCoordinatorTimer<Void, Record> timer = new
MockCoordinatorTimer<>(new MockTime());
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager,
+ timer,
+ mock(GroupCoordinatorConfig.class)
+ );
+
+ Record offsetCommitTombstone =
RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "topic", 0);
+ Record groupMetadataTombstone =
RecordHelpers.newGroupMetadataTombstoneRecord("group-id");
+
+ @SuppressWarnings("unchecked")
+ ArgumentCaptor<List<Record>> recordsCapture =
ArgumentCaptor.forClass(List.class);
+
+ when(groupMetadataManager.groupIds()).thenReturn(mkSet("group-id",
"other-group-id"));
+ when(offsetMetadataManager.cleanupExpiredOffsets(eq("group-id"),
recordsCapture.capture()))
+ .thenAnswer(invocation -> {
+ List<Record> records = recordsCapture.getValue();
+ records.add(offsetCommitTombstone);
+ return true;
+ });
+ when(offsetMetadataManager.cleanupExpiredOffsets("other-group-id",
Collections.emptyList())).thenReturn(false);
+ doAnswer(invocation -> {
+ List<Record> records = recordsCapture.getValue();
+ records.add(groupMetadataTombstone);
+ return null;
+ }).when(groupMetadataManager).maybeDeleteGroup(eq("group-id"),
recordsCapture.capture());
+
+ assertFalse(timer.contains(GROUP_EXPIRATION_KEY));
+ CoordinatorResult<Void, Record> result =
coordinator.cleanupGroupMetadata();
+ assertTrue(timer.contains(GROUP_EXPIRATION_KEY));
+
+ List<Record> expectedRecords = Arrays.asList(offsetCommitTombstone,
groupMetadataTombstone);
+ assertEquals(expectedRecords, result.records());
+ assertNull(result.response());
+ assertNull(result.appendFuture());
+
+ verify(groupMetadataManager, times(1)).groupIds();
+ verify(offsetMetadataManager,
times(1)).cleanupExpiredOffsets(eq("group-id"), any());
+ verify(offsetMetadataManager,
times(1)).cleanupExpiredOffsets(eq("other-group-id"), any());
+ verify(groupMetadataManager,
times(1)).maybeDeleteGroup(eq("group-id"), any());
+ verify(groupMetadataManager,
times(0)).maybeDeleteGroup(eq("other-group-id"), any());
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index f299705016d..ee44304072e 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -9494,6 +9494,27 @@ public class GroupMetadataManagerTest {
assertEquals(expectedRecords, records);
}
+ @Test
+ public void testGenericGroupMaybeDelete() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+
+ List<Record> expectedRecords =
Collections.singletonList(RecordHelpers.newGroupMetadataTombstoneRecord("group-id"));
+ List<Record> records = new ArrayList<>();
+ context.groupMetadataManager.maybeDeleteGroup("group-id", records);
+ assertEquals(expectedRecords, records);
+
+ records = new ArrayList<>();
+ group.transitionTo(PREPARING_REBALANCE);
+ context.groupMetadataManager.maybeDeleteGroup("group-id", records);
+ assertEquals(Collections.emptyList(), records);
+
+ records = new ArrayList<>();
+ context.groupMetadataManager.maybeDeleteGroup("invalid-group-id",
records);
+ assertEquals(Collections.emptyList(), records);
+ }
+
@Test
public void testConsumerGroupDelete() {
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
@@ -9510,6 +9531,32 @@ public class GroupMetadataManagerTest {
assertEquals(expectedRecords, records);
}
+ @Test
+ public void testConsumerGroupMaybeDelete() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group-id", true);
+
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newTargetAssignmentEpochTombstoneRecord("group-id"),
+
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord("group-id"),
+ RecordHelpers.newGroupEpochTombstoneRecord("group-id")
+ );
+ List<Record> records = new ArrayList<>();
+ context.groupMetadataManager.maybeDeleteGroup("group-id", records);
+ assertEquals(expectedRecords, records);
+
+ records = new ArrayList<>();
+ group.updateMember(new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setTargetMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .build()
+ );
+ context.groupMetadataManager.maybeDeleteGroup("group-id", records);
+ assertEquals(Collections.emptyList(), records);
+ }
+
private static void assertNoOrEmptyResult(List<ExpiredTimeout<Void,
Record>> timeouts) {
assertTrue(timeouts.size() <= 1);
timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT,
timeout.result));
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java
new file mode 100644
index 00000000000..2f1cb354a5a
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class OffsetExpirationConditionImplTest {
+
+ @Test
+ public void testIsOffsetExpired() {
+ long currentTimestamp = 1500L;
+ long commitTimestamp = 500L;
+ OptionalLong expireTimestampMs = OptionalLong.of(1500);
+ long offsetsRetentionMs = 500L;
+
+ OffsetExpirationConditionImpl condition = new
OffsetExpirationConditionImpl(__ -> commitTimestamp);
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(
+ 100,
+ OptionalInt.of(1),
+ "metadata",
+ commitTimestamp,
+ expireTimestampMs
+ );
+
+ // Test when expire timestamp exists (older versions with per
partition retention)
+ // 1. Current timestamp >= expire timestamp => should expire
+ assertTrue(condition.isOffsetExpired(offsetAndMetadata,
currentTimestamp, offsetsRetentionMs));
+
+ // 2. Current timestamp < expire timestamp => should not expire
+ currentTimestamp = 499;
+ assertFalse(condition.isOffsetExpired(offsetAndMetadata,
currentTimestamp, offsetsRetentionMs));
+
+ // Test when expire timestamp does not exist (current version with no
per partition retention)
+ offsetAndMetadata = new OffsetAndMetadata(
+ 100,
+ OptionalInt.of(1),
+ "metadata",
+ commitTimestamp,
+ OptionalLong.empty()
+ );
+
+ // 3. Current timestamp - base timestamp >= offsets retention =>
should expire
+ currentTimestamp = 1000L;
+ assertTrue(condition.isOffsetExpired(offsetAndMetadata,
currentTimestamp, offsetsRetentionMs));
+
+ // 4. Current timestamp - base timestamp < offsets retention => should
not expire
+ currentTimestamp = 999L;
+ assertFalse(condition.isOffsetExpired(offsetAndMetadata,
currentTimestamp, offsetsRetentionMs));
+ }
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 5598a74e50e..82fca8b0e25 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -76,33 +76,51 @@ import static
org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class OffsetMetadataManagerTest {
static class OffsetMetadataManagerTestContext {
public static class Builder {
- final private MockTime time = new MockTime();
- final private MockCoordinatorTimer<Void, Record> timer = new
MockCoordinatorTimer<>(time);
- final private LogContext logContext = new LogContext();
- final private SnapshotRegistry snapshotRegistry = new
SnapshotRegistry(logContext);
+ private final MockTime time = new MockTime();
+ private final MockCoordinatorTimer<Void, Record> timer = new
MockCoordinatorTimer<>(time);
+ private final LogContext logContext = new LogContext();
+ private final SnapshotRegistry snapshotRegistry = new
SnapshotRegistry(logContext);
+ private GroupMetadataManager groupMetadataManager = null;
private MetadataImage metadataImage = null;
- private int offsetMetadataMaxSize = 4096;
+ private GroupCoordinatorConfig config = null;
Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
- this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+ config =
GroupCoordinatorConfigTest.createGroupCoordinatorConfig(offsetMetadataMaxSize,
60000L, 24 * 60 * 1000);
+ return this;
+ }
+
+ Builder withOffsetsRetentionMs(long offsetsRetentionMs) {
+ config =
GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L,
offsetsRetentionMs);
+ return this;
+ }
+
+ Builder withGroupMetadataManager(GroupMetadataManager
groupMetadataManager) {
+ this.groupMetadataManager = groupMetadataManager;
return this;
}
OffsetMetadataManagerTestContext build() {
if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+ if (config == null) {
+ config =
GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, 24 * 60 *
1000);
+ }
- GroupMetadataManager groupMetadataManager = new
GroupMetadataManager.Builder()
- .withTime(time)
- .withTimer(timer)
- .withSnapshotRegistry(snapshotRegistry)
- .withLogContext(logContext)
- .withMetadataImage(metadataImage)
- .withConsumerGroupAssignors(Collections.singletonList(new
RangeAssignor()))
- .build();
+ if (groupMetadataManager == null) {
+ groupMetadataManager = new GroupMetadataManager.Builder()
+ .withTime(time)
+ .withTimer(timer)
+ .withSnapshotRegistry(snapshotRegistry)
+ .withLogContext(logContext)
+ .withMetadataImage(metadataImage)
+
.withConsumerGroupAssignors(Collections.singletonList(new RangeAssignor()))
+ .build();
+ }
OffsetMetadataManager offsetMetadataManager = new
OffsetMetadataManager.Builder()
.withTime(time)
@@ -110,7 +128,7 @@ public class OffsetMetadataManagerTest {
.withSnapshotRegistry(snapshotRegistry)
.withMetadataImage(metadataImage)
.withGroupMetadataManager(groupMetadataManager)
- .withOffsetMetadataMaxSize(offsetMetadataMaxSize)
+ .withGroupCoordinatorConfig(config)
.build();
return new OffsetMetadataManagerTestContext(
@@ -203,6 +221,15 @@ public class OffsetMetadataManagerTest {
return numDeletedOffsets;
}
+ public boolean cleanupExpiredOffsets(String groupId, List<Record>
records) {
+ List<Record> addedRecords = new ArrayList<>();
+ boolean isOffsetsEmptyForGroup =
offsetMetadataManager.cleanupExpiredOffsets(groupId, addedRecords);
+ addedRecords.forEach(this::replay);
+
+ records.addAll(addedRecords);
+ return isOffsetsEmptyForGroup;
+ }
+
public List<OffsetFetchResponseData.OffsetFetchResponseTopics>
fetchOffsets(
String groupId,
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
@@ -282,6 +309,18 @@ public class OffsetMetadataManagerTest {
int partition,
long offset,
int leaderEpoch
+ ) {
+ commitOffset(groupId, topic, partition, offset, leaderEpoch,
time.milliseconds());
+
+ }
+
+ public void commitOffset(
+ String groupId,
+ String topic,
+ int partition,
+ long offset,
+ int leaderEpoch,
+ long commitTimestamp
) {
replay(RecordHelpers.newOffsetCommitRecord(
groupId,
@@ -291,7 +330,7 @@ public class OffsetMetadataManagerTest {
offset,
OptionalInt.of(leaderEpoch),
"metadata",
- time.milliseconds(),
+ commitTimestamp,
OptionalLong.empty()
),
MetadataVersion.latest()
@@ -1763,6 +1802,108 @@ public class OffsetMetadataManagerTest {
assertEquals(3, numDeleteOffsets);
}
+ @Test
+ public void testCleanupExpiredOffsetsGroupHasNoOffsets() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder()
+ .build();
+
+ List<Record> records = new ArrayList<>();
+ assertTrue(context.cleanupExpiredOffsets("unknown-group-id", records));
+ assertEquals(Collections.emptyList(), records);
+ }
+
+ @Test
+ public void testCleanupExpiredOffsetsGroupDoesNotExist() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder()
+ .withGroupMetadataManager(groupMetadataManager)
+ .build();
+
+
when(groupMetadataManager.group("unknown-group-id")).thenThrow(GroupIdNotFoundException.class);
+ context.commitOffset("unknown-group-id", "topic", 0, 100L, 0);
+ assertThrows(GroupIdNotFoundException.class, () ->
context.cleanupExpiredOffsets("unknown-group-id", new ArrayList<>()));
+ }
+
+ @Test
+ public void testCleanupExpiredOffsetsEmptyOffsetExpirationCondition() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ Group group = mock(Group.class);
+
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder()
+ .withGroupMetadataManager(groupMetadataManager)
+ .build();
+
+ context.commitOffset("group-id", "topic", 0, 100L, 0);
+
+ when(groupMetadataManager.group("group-id")).thenReturn(group);
+ when(group.offsetExpirationCondition()).thenReturn(Optional.empty());
+
+ List<Record> records = new ArrayList<>();
+ assertFalse(context.cleanupExpiredOffsets("group-id", records));
+ assertEquals(Collections.emptyList(), records);
+ }
+
+ @Test
+ public void testCleanupExpiredOffsets() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ Group group = mock(Group.class);
+
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder()
+ .withGroupMetadataManager(groupMetadataManager)
+ .withOffsetsRetentionMs(1000)
+ .build();
+
+ long commitTimestamp = context.time.milliseconds();
+
+ context.commitOffset("group-id", "firstTopic", 0, 100L, 0,
commitTimestamp);
+ context.commitOffset("group-id", "secondTopic", 0, 100L, 0,
commitTimestamp);
+ context.commitOffset("group-id", "secondTopic", 1, 100L, 0,
commitTimestamp + 500);
+
+ context.time.sleep(1000);
+
+ // firstTopic-0: group is still subscribed to firstTopic. Do not
expire.
+ // secondTopic-0: should expire as offset retention has passed.
+ // secondTopic-1: has not passed offset retention. Do not expire.
+ List<Record> expectedRecords = Collections.singletonList(
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id",
"secondTopic", 0)
+ );
+
+ when(groupMetadataManager.group("group-id")).thenReturn(group);
+ when(group.offsetExpirationCondition()).thenReturn(Optional.of(
+ new OffsetExpirationConditionImpl(offsetAndMetadata ->
offsetAndMetadata.commitTimestampMs)));
+ when(group.isSubscribedToTopic("firstTopic")).thenReturn(true);
+ when(group.isSubscribedToTopic("secondTopic")).thenReturn(false);
+
+ List<Record> records = new ArrayList<>();
+ assertFalse(context.cleanupExpiredOffsets("group-id", records));
+ assertEquals(expectedRecords, records);
+
+ // Expire secondTopic-1.
+ context.time.sleep(500);
+ expectedRecords = Collections.singletonList(
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id",
"secondTopic", 1)
+ );
+
+ records = new ArrayList<>();
+ assertFalse(context.cleanupExpiredOffsets("group-id", records));
+ assertEquals(expectedRecords, records);
+
+ // Add 2 more commits, then expire all.
+ when(group.isSubscribedToTopic("firstTopic")).thenReturn(false);
+ context.commitOffset("group-id", "firstTopic", 1, 100L, 0,
commitTimestamp + 500);
+ context.commitOffset("group-id", "secondTopic", 0, 101L, 0,
commitTimestamp + 500);
+
+ expectedRecords = Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id",
"firstTopic", 0),
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id",
"firstTopic", 1),
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id",
"secondTopic", 0)
+ );
+
+ records = new ArrayList<>();
+ assertTrue(context.cleanupExpiredOffsets("group-id", records));
+ assertEquals(expectedRecords, records);
+ }
+
static private OffsetFetchResponseData.OffsetFetchResponsePartitions
mkOffsetPartitionResponse(
int partition,
long offset,
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index 210aa8eb901..df643a5d2f0 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -23,6 +23,9 @@ import
org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.coordinator.group.GroupMetadataManagerTest;
+import org.apache.kafka.coordinator.group.OffsetAndMetadata;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
@@ -30,6 +33,8 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -703,4 +708,66 @@ public class ConsumerGroupTest {
assertEquals(ConsumerGroup.ConsumerGroupState.STABLE,
consumerGroup.state());
assertThrows(GroupNotEmptyException.class,
consumerGroup::validateDeleteGroup);
}
+
+ /**
+  * Verifies the offset expiration condition of a newly created consumer group: a condition
+  * is present, its base timestamp equals the offset's commit timestamp, and an offset whose
+  * commit timestamp lies exactly one retention period before the current timestamp is expired.
+  */
+ @Test
+ public void testOffsetExpirationCondition() {
+     final long retentionMs = 10000L;
+     final long committedAtMs = 20000L;
+     final long nowMs = 30000L;
+
+     ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new LogContext()), "group-id");
+     OffsetAndMetadata committedOffset = new OffsetAndMetadata(
+         15000L,
+         OptionalInt.empty(),
+         "",
+         committedAtMs,
+         OptionalLong.empty()
+     );
+
+     Optional<OffsetExpirationCondition> maybeCondition = group.offsetExpirationCondition();
+     assertTrue(maybeCondition.isPresent());
+
+     // Cast to the concrete implementation so the base timestamp function can be inspected.
+     OffsetExpirationConditionImpl expiration = (OffsetExpirationConditionImpl) maybeCondition.get();
+     assertEquals(committedAtMs, expiration.baseTimestamp().apply(committedOffset));
+     assertTrue(expiration.isOffsetExpired(committedOffset, nowMs, retentionMs));
+ }
+
+ /**
+  * Verifies that isSubscribedToTopic() follows group membership: with one member subscribed
+  * to "foo" and another to "bar" both topics are reported as subscribed, and each topic stops
+  * being reported once the member subscribed to it is removed.
+  */
+ @Test
+ public void testIsSubscribedToTopic() {
+     Uuid fooId = Uuid.randomUuid();
+     Uuid barId = Uuid.randomUuid();
+
+     // Metadata image with "foo" (1 partition) and "bar" (2 partitions).
+     MetadataImage metadataImage = new GroupMetadataManagerTest.MetadataImageBuilder()
+         .addTopic(fooId, "foo", 1)
+         .addTopic(barId, "bar", 2)
+         .addRacks()
+         .build();
+
+     ConsumerGroup group = createConsumerGroup("group-foo");
+
+     // member1 subscribes to "foo" only, member2 to "bar" only.
+     group.updateMember(
+         new ConsumerGroupMember.Builder("member1")
+             .setSubscribedTopicNames(Collections.singletonList("foo"))
+             .build()
+     );
+     group.updateMember(
+         new ConsumerGroupMember.Builder("member2")
+             .setSubscribedTopicNames(Collections.singletonList("bar"))
+             .build()
+     );
+
+     // The computed subscription metadata must cover both subscribed topics.
+     assertEquals(
+         mkMap(
+             mkEntry("foo", new TopicMetadata(fooId, "foo", 1, mkMapOfPartitionRacks(1))),
+             mkEntry("bar", new TopicMetadata(barId, "bar", 2, mkMapOfPartitionRacks(2)))
+         ),
+         group.computeSubscriptionMetadata(null, null, metadataImage.topics(), metadataImage.cluster())
+     );
+
+     assertTrue(group.isSubscribedToTopic("foo"));
+     assertTrue(group.isSubscribedToTopic("bar"));
+
+     // Dropping the only subscriber of a topic ends the group's subscription to it.
+     group.removeMember("member1");
+     assertFalse(group.isSubscribedToTopic("foo"));
+
+     group.removeMember("member2");
+     assertFalse(group.isSubscribedToTopic("bar"));
+ }
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
index 05afdd26edf..156fdf1b507 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
@@ -31,13 +31,19 @@ import
org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.OffsetAndMetadata;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -1085,6 +1091,151 @@ public class GenericGroupTest {
assertThrows(GroupIdNotFoundException.class,
group::validateDeleteGroup);
}
+ /**
+  * Drives a single generic group through five consecutive scenarios and checks what
+  * offsetExpirationCondition() returns in each one:
+  * no protocol type, non-consumer protocol in the Empty state, non-consumer protocol in a
+  * non-Empty state, consumer protocol in the Stable state, and consumer protocol in a
+  * non-Stable state. The scenarios share state, so their order matters.
+  */
+ @Test
+ public void testOffsetExpirationCondition() {
+     final long retentionMs = 10000L;
+     final long committedAtMs = 20000L;
+     final long nowMs = 30000L;
+     OffsetAndMetadata committedOffset = new OffsetAndMetadata(
+         15000L,
+         OptionalInt.empty(),
+         "",
+         committedAtMs,
+         OptionalLong.empty()
+     );
+     MockTime mockTime = new MockTime();
+     // Captured before the group is created; scenario 2 expects this as the base timestamp.
+     long stateTimestampMs = mockTime.milliseconds();
+     GenericGroup group = new GenericGroup(new LogContext(), "groupId", EMPTY, mockTime);
+
+     // Scenario 1: no protocol type (simple consumer). The base timestamp is the commit timestamp.
+     Optional<OffsetExpirationCondition> simpleConsumerResult = group.offsetExpirationCondition();
+     assertTrue(simpleConsumerResult.isPresent());
+
+     OffsetExpirationConditionImpl simpleConsumerCondition =
+         (OffsetExpirationConditionImpl) simpleConsumerResult.get();
+     assertEquals(committedAtMs, simpleConsumerCondition.baseTimestamp().apply(committedOffset));
+     assertTrue(simpleConsumerCondition.isOffsetExpired(committedOffset, nowMs, retentionMs));
+
+     // Scenario 2: non-consumer protocol type while Empty. The base timestamp is the
+     // current state timestamp.
+     byte[] topicSubscription = ConsumerProtocol.serializeSubscription(
+         new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic"))).array();
+     JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection();
+     protocols.add(new JoinGroupRequestProtocol()
+         .setName("range")
+         .setMetadata(topicSubscription));
+
+     GenericGroupMember nonConsumerMember = new GenericGroupMember(
+         "memberWithNonConsumerProtocol",
+         Optional.empty(),
+         clientId,
+         clientHost,
+         rebalanceTimeoutMs,
+         sessionTimeoutMs,
+         "My Protocol",
+         protocols
+     );
+
+     group.add(nonConsumerMember);
+     assertEquals("My Protocol", group.protocolType().get());
+
+     Optional<OffsetExpirationCondition> emptyStateResult = group.offsetExpirationCondition();
+     assertTrue(emptyStateResult.isPresent());
+
+     OffsetExpirationConditionImpl emptyStateCondition =
+         (OffsetExpirationConditionImpl) emptyStateResult.get();
+     assertEquals(stateTimestampMs, emptyStateCondition.baseTimestamp().apply(committedOffset));
+     assertTrue(emptyStateCondition.isOffsetExpired(
+         committedOffset,
+         stateTimestampMs + retentionMs,
+         retentionMs
+     ));
+
+     // Scenario 3: non-consumer protocol type outside the Empty state. No condition is returned.
+     group.transitionTo(PREPARING_REBALANCE);
+     assertFalse(group.offsetExpirationCondition().isPresent());
+
+     // Scenario 4: consumer protocol type with a subscribed topic while Stable. The base
+     // timestamp is the commit timestamp.
+     group.remove("memberWithNonConsumerProtocol");
+     GenericGroupMember consumerMember = new GenericGroupMember(
+         "memberWithConsumerProtocol",
+         Optional.empty(),
+         clientId,
+         clientHost,
+         rebalanceTimeoutMs,
+         sessionTimeoutMs,
+         "consumer",
+         protocols
+     );
+     group.add(consumerMember);
+     group.initNextGeneration();
+     group.transitionTo(STABLE);
+     assertTrue(group.subscribedTopics().get().contains("topic"));
+
+     Optional<OffsetExpirationCondition> stableResult = group.offsetExpirationCondition();
+     assertTrue(stableResult.isPresent());
+
+     OffsetExpirationConditionImpl stableCondition =
+         (OffsetExpirationConditionImpl) stableResult.get();
+     assertEquals(committedAtMs, stableCondition.baseTimestamp().apply(committedOffset));
+     assertTrue(stableCondition.isOffsetExpired(committedOffset, nowMs, retentionMs));
+
+     // Scenario 5: consumer protocol type with a subscribed topic outside the Stable state.
+     // No condition is returned.
+     group.transitionTo(PREPARING_REBALANCE);
+     assertFalse(group.offsetExpirationCondition().isPresent());
+ }
+
+ @Test
+ public void testIsSubscribedToTopic() {
+ GenericGroup group = new GenericGroup(new LogContext(), "groupId",
EMPTY, Time.SYSTEM);
+
+ // 1. group has no protocol type => not subscribed
+ assertFalse(group.isSubscribedToTopic("topic"));
+
+ // 2. group does not use consumer group protocol type => not subscribed
+ JoinGroupRequestProtocolCollection protocols = new
JoinGroupRequestProtocolCollection();
+ protocols.add(new JoinGroupRequestProtocol()
+ .setName("range")
+ .setMetadata(ConsumerProtocol.serializeSubscription(
+ new
ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic"))).array()));
+
+ GenericGroupMember memberWithNonConsumerProtocol = new
GenericGroupMember(
+ "memberWithNonConsumerProtocol",
+ Optional.empty(),
+ clientId,
+ clientHost,
+ rebalanceTimeoutMs,
+ sessionTimeoutMs,
+ "My Protocol",
+ protocols
+ );
+
+ group.add(memberWithNonConsumerProtocol);
+ group.transitionTo(PREPARING_REBALANCE);
+ group.initNextGeneration();
+ assertTrue(group.isInState(COMPLETING_REBALANCE));
+ assertEquals(Optional.empty(), group.computeSubscribedTopics());
+ assertFalse(group.isSubscribedToTopic("topic"));
+
+ // 3. group uses consumer group protocol type but empty members => not
subscribed
+ group.remove("memberWithNonConsumerProtocol");
+ GenericGroupMember memberWithConsumerProtocol = new GenericGroupMember(
+ "memberWithConsumerProtocol",
+ Optional.empty(),
+ clientId,
+ clientHost,
+ rebalanceTimeoutMs,
+ sessionTimeoutMs,
+ "consumer",
+ protocols
+ );
+
+ group.add(memberWithConsumerProtocol);
+ group.remove("memberWithConsumerProtocol");
+ group.transitionTo(PREPARING_REBALANCE);
+ group.initNextGeneration();
+ assertTrue(group.isInState(EMPTY));
+ assertEquals(Optional.of(Collections.emptySet()),
group.computeSubscribedTopics());
+ assertTrue(group.usesConsumerGroupProtocol());
+ assertFalse(group.isSubscribedToTopic("topic"));
+
+ // 4. group uses consumer group protocol type with member subscription
=> subscribed
+ group.add(memberWithConsumerProtocol);
+ group.transitionTo(PREPARING_REBALANCE);
+ group.initNextGeneration();
+ assertTrue(group.isInState(COMPLETING_REBALANCE));
+ assertEquals(Optional.of(Collections.singleton("topic")),
group.computeSubscribedTopics());
+ assertTrue(group.usesConsumerGroupProtocol());
+ assertTrue(group.isSubscribedToTopic("topic"));
+ }
+
private void assertState(GenericGroup group, GenericGroupState
targetState) {
Set<GenericGroupState> otherStates = new HashSet<>();
otherStates.add(STABLE);