This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b7294d92e11 KAFKA-17593; [11/11] Update subscription type (#18020)
b7294d92e11 is described below

commit b7294d92e11b2cc7b4766663b46aa0f2a718bee9
Author: David Jacot <[email protected]>
AuthorDate: Fri Dec 6 15:57:27 2024 +0100

    KAFKA-17593; [11/11] Update subscription type (#18020)
    
    This is the last patch in the series which introduces regular expressions
    in the new consumer group protocol. The patch ensures that the subscription
    type of the group takes into account the regular expressions. Please refer
    to the code to see how they are included.
    
    Reviewers: Sean Quah <[email protected]>, Jeff Kim <[email protected]>
---
 .../group/api/assignor/SubscriptionType.java       |   4 +-
 .../coordinator/group/GroupMetadataManager.java    | 180 ++++++++++------
 .../coordinator/group/modern/ModernGroup.java      |  38 ++--
 .../group/modern/SubscriptionCount.java            | 101 +++++++++
 .../group/modern/consumer/ConsumerGroup.java       | 121 +++++++++--
 .../group/modern/consumer/ConsumerGroupMember.java |   8 +
 .../coordinator/group/modern/share/ShareGroup.java |   6 +-
 .../group/modern/consumer/ConsumerGroupTest.java   | 240 ++++++++++++++++-----
 8 files changed, 538 insertions(+), 160 deletions(-)

diff --git 
a/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/SubscriptionType.java
 
b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/SubscriptionType.java
index cab35bbf3db..eb80e3eacbb 100644
--- 
a/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/SubscriptionType.java
+++ 
b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/SubscriptionType.java
@@ -25,12 +25,12 @@ import 
org.apache.kafka.common.annotation.InterfaceStability;
 public enum SubscriptionType {
     /**
      * A homogeneous subscription type means that all the members
-     * of the group are subscribed to the same set of topics.
+     * of the group use the same subscription.
      */
     HOMOGENEOUS("Homogeneous"),
     /**
      * A heterogeneous subscription type means that not all the members
-     * of the group are subscribed to the same set of topics.
+     * of the group use the same subscription.
      */
     HETEROGENEOUS("Heterogeneous");
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index dda3a41d0bf..90d8db4d320 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -107,6 +107,7 @@ import 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.modern.Assignment;
 import org.apache.kafka.coordinator.group.modern.MemberState;
 import org.apache.kafka.coordinator.group.modern.ModernGroup;
+import org.apache.kafka.coordinator.group.modern.SubscriptionCount;
 import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder;
 import org.apache.kafka.coordinator.group.modern.TopicMetadata;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
@@ -199,6 +200,22 @@ import static 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMe
 public class GroupMetadataManager {
     private static final int METADATA_REFRESH_INTERVAL_MS = Integer.MAX_VALUE;
 
+    private static class UpdateSubscriptionMetadataResult {
+        private final int groupEpoch;
+        private final Map<String, TopicMetadata> subscriptionMetadata;
+        private final SubscriptionType subscriptionType;
+
+        UpdateSubscriptionMetadataResult(
+            int groupEpoch,
+            Map<String, TopicMetadata> subscriptionMetadata,
+            SubscriptionType subscriptionType
+        ) {
+            this.groupEpoch = groupEpoch;
+            this.subscriptionMetadata = 
Objects.requireNonNull(subscriptionMetadata);
+            this.subscriptionType = Objects.requireNonNull(subscriptionType);
+        }
+    }
+
     public static class Builder {
         private LogContext logContext = null;
         private SnapshotRegistry snapshotRegistry = null;
@@ -1698,43 +1715,17 @@ public class GroupMetadataManager {
             // The subscription metadata is updated in two cases:
             // 1) The member has updated its subscriptions;
             // 2) The refresh deadline has been reached.
-            Map<String, Integer> subscribedTopicNamesMap = 
group.computeSubscribedTopicNames(
+            UpdateSubscriptionMetadataResult result = 
updateSubscriptionMetadata(
+                group,
+                bumpGroupEpoch,
                 member,
-                updatedMember
-            );
-            subscriptionMetadata = group.computeSubscriptionMetadata(
-                subscribedTopicNamesMap,
-                metadataImage.topics(),
-                metadataImage.cluster()
-            );
-
-            int numMembers = group.numMembers();
-            if (!group.hasMember(updatedMember.memberId()) && 
!group.hasStaticMember(updatedMember.instanceId())) {
-                numMembers++;
-            }
-
-            subscriptionType = ModernGroup.subscriptionType(
-                subscribedTopicNamesMap,
-                numMembers
+                updatedMember,
+                records
             );
 
-            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[GroupId {}] Computed new subscription 
metadata: {}.",
-                        groupId, subscriptionMetadata);
-                }
-                bumpGroupEpoch = true;
-                
records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
-            }
-
-            if (bumpGroupEpoch) {
-                groupEpoch += 1;
-                records.add(newConsumerGroupEpochRecord(groupId, groupEpoch));
-                log.info("[GroupId {}] Bumped group epoch to {}.", groupId, 
groupEpoch);
-                metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
-            }
-
-            group.setMetadataRefreshDeadline(currentTimeMs + 
METADATA_REFRESH_INTERVAL_MS, groupEpoch);
+            groupEpoch = result.groupEpoch;
+            subscriptionMetadata = result.subscriptionMetadata;
+            subscriptionType = result.subscriptionType;
         }
 
         // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch. The delta between
@@ -1860,7 +1851,6 @@ public class GroupMetadataManager {
 
         int groupEpoch = group.groupEpoch();
         Map<String, TopicMetadata> subscriptionMetadata = 
group.subscriptionMetadata();
-        Map<String, Integer> subscribedTopicNamesMap = 
group.subscribedTopicNames();
         SubscriptionType subscriptionType = group.subscriptionType();
         final ConsumerProtocolSubscription subscription = 
deserializeSubscription(protocols);
 
@@ -1894,40 +1884,17 @@ public class GroupMetadataManager {
             // The subscription metadata is updated in two cases:
             // 1) The member has updated its subscriptions;
             // 2) The refresh deadline has been reached.
-            subscribedTopicNamesMap = 
group.computeSubscribedTopicNames(member, updatedMember);
-            subscriptionMetadata = group.computeSubscriptionMetadata(
-                subscribedTopicNamesMap,
-                metadataImage.topics(),
-                metadataImage.cluster()
-            );
-
-            int numMembers = group.numMembers();
-            if (!group.hasMember(updatedMember.memberId()) && 
!group.hasStaticMember(updatedMember.instanceId())) {
-                numMembers++;
-            }
-
-            subscriptionType = ConsumerGroup.subscriptionType(
-                subscribedTopicNamesMap,
-                numMembers
+            UpdateSubscriptionMetadataResult result = 
updateSubscriptionMetadata(
+                group,
+                bumpGroupEpoch,
+                member,
+                updatedMember,
+                records
             );
 
-            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[GroupId {}] Computed new subscription 
metadata: {}.",
-                        groupId, subscriptionMetadata);
-                }
-                bumpGroupEpoch = true;
-                
records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
-            }
-
-            if (bumpGroupEpoch) {
-                groupEpoch += 1;
-                records.add(newConsumerGroupEpochRecord(groupId, groupEpoch));
-                log.info("[GroupId {}] Bumped group epoch to {}.", groupId, 
groupEpoch);
-                metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
-            }
-
-            group.setMetadataRefreshDeadline(currentTimeMs + 
METADATA_REFRESH_INTERVAL_MS, groupEpoch);
+            groupEpoch = result.groupEpoch;
+            subscriptionMetadata = result.subscriptionMetadata;
+            subscriptionType = result.subscriptionType;
         }
 
         // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch. The delta between
@@ -2073,7 +2040,7 @@ public class GroupMetadataManager {
             // The subscription metadata is updated in two cases:
             // 1) The member has updated its subscriptions;
             // 2) The refresh deadline has been reached.
-            Map<String, Integer> subscribedTopicNamesMap = 
group.computeSubscribedTopicNames(member, updatedMember);
+            Map<String, SubscriptionCount> subscribedTopicNamesMap = 
group.computeSubscribedTopicNames(member, updatedMember);
             subscriptionMetadata = group.computeSubscriptionMetadata(
                 subscribedTopicNamesMap,
                 metadataImage.topics(),
@@ -2508,7 +2475,7 @@ public class GroupMetadataManager {
         List<CoordinatorRecord> records = new ArrayList<>();
         try {
             ConsumerGroup group = consumerGroup(groupId);
-            Map<String, Integer> subscribedTopicNames = new 
HashMap<>(group.subscribedTopicNames());
+            Map<String, SubscriptionCount> subscribedTopicNames = new 
HashMap<>(group.subscribedTopicNames());
 
             boolean bumpGroupEpoch = false;
             for (Map.Entry<String, ResolvedRegularExpression> entry : 
resolvedRegularExpressions.entrySet()) {
@@ -2527,11 +2494,11 @@ public class GroupMetadataManager {
                     bumpGroupEpoch = true;
 
                     oldResolvedRegularExpression.topics.forEach(topicName ->
-                        subscribedTopicNames.compute(topicName, 
Utils::decValue)
+                        subscribedTopicNames.compute(topicName, 
SubscriptionCount::decRegexCount)
                     );
 
                     newResolvedRegularExpression.topics.forEach(topicName ->
-                        subscribedTopicNames.compute(topicName, 
Utils::incValue)
+                        subscribedTopicNames.compute(topicName, 
SubscriptionCount::incRegexCount)
                     );
                 }
 
@@ -2709,6 +2676,77 @@ public class GroupMetadataManager {
         return updatedMember;
     }
 
+    /**
+     * Updates the subscription metadata and bumps the group epoch if needed.
+     *
+     * @param group             The consumer group.
+     * @param bumpGroupEpoch    Whether the group epoch must be bumped.
+     * @param member            The old member.
+     * @param updatedMember     The new member.
+     * @param records           The record accumulator.
+     * @return The result of the update.
+     */
+    private UpdateSubscriptionMetadataResult updateSubscriptionMetadata(
+        ConsumerGroup group,
+        boolean bumpGroupEpoch,
+        ConsumerGroupMember member,
+        ConsumerGroupMember updatedMember,
+        List<CoordinatorRecord> records
+    ) {
+        final long currentTimeMs = time.milliseconds();
+        final String groupId = group.groupId();
+        int groupEpoch = group.groupEpoch();
+
+        Map<String, Integer> subscribedRegularExpressions = 
group.computeSubscribedRegularExpressions(
+            member,
+            updatedMember
+        );
+        Map<String, SubscriptionCount> subscribedTopicNamesMap = 
group.computeSubscribedTopicNames(
+            member,
+            updatedMember
+        );
+        Map<String, TopicMetadata> subscriptionMetadata = 
group.computeSubscriptionMetadata(
+            subscribedTopicNamesMap,
+            metadataImage.topics(),
+            metadataImage.cluster()
+        );
+
+        int numMembers = group.numMembers();
+        if (!group.hasMember(updatedMember.memberId()) && 
!group.hasStaticMember(updatedMember.instanceId())) {
+            numMembers++;
+        }
+
+        SubscriptionType subscriptionType = ConsumerGroup.subscriptionType(
+            subscribedRegularExpressions,
+            subscribedTopicNamesMap,
+            numMembers
+        );
+
+        if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+            if (log.isDebugEnabled()) {
+                log.debug("[GroupId {}] Computed new subscription metadata: 
{}.",
+                    groupId, subscriptionMetadata);
+            }
+            bumpGroupEpoch = true;
+            records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+        }
+
+        if (bumpGroupEpoch) {
+            groupEpoch += 1;
+            records.add(newConsumerGroupEpochRecord(groupId, groupEpoch));
+            log.info("[GroupId {}] Bumped group epoch to {}.", groupId, 
groupEpoch);
+            metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
+        }
+
+        group.setMetadataRefreshDeadline(currentTimeMs + 
METADATA_REFRESH_INTERVAL_MS, groupEpoch);
+
+        return new UpdateSubscriptionMetadataResult(
+            groupEpoch,
+            subscriptionMetadata,
+            subscriptionType
+        );
+    }
+
     /**
      * Updates the target assignment according to the updated member and 
subscription metadata.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
index 8b920a7e051..cf291e0306b 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.coordinator.group.Group;
-import org.apache.kafka.coordinator.group.Utils;
 import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
 import org.apache.kafka.image.ClusterImage;
 import org.apache.kafka.image.TopicImage;
@@ -82,7 +81,7 @@ public abstract class ModernGroup<T extends 
ModernGroupMember> implements Group
     /**
      * The number of subscribers or regular expressions per topic.
      */
-    protected final TimelineHashMap<String, Integer> subscribedTopicNames;
+    protected final TimelineHashMap<String, SubscriptionCount> 
subscribedTopicNames;
 
     /**
      * The metadata associated with each subscribed topic name.
@@ -221,7 +220,7 @@ public abstract class ModernGroup<T extends 
ModernGroupMember> implements Group
      * @return An immutable map containing all the subscribed topic names
      *         with the subscribers counts per topic.
      */
-    public Map<String, Integer> subscribedTopicNames() {
+    public Map<String, SubscriptionCount> subscribedTopicNames() {
         return Collections.unmodifiableMap(subscribedTopicNames);
     }
 
@@ -378,7 +377,7 @@ public abstract class ModernGroup<T extends 
ModernGroupMember> implements Group
      * @return An immutable map of subscription metadata for each topic that 
the consumer group is subscribed to.
      */
     public Map<String, TopicMetadata> computeSubscriptionMetadata(
-        Map<String, Integer> subscribedTopicNames,
+        Map<String, SubscriptionCount> subscribedTopicNames,
         TopicsImage topicsImage,
         ClusterImage clusterImage
     ) {
@@ -440,19 +439,24 @@ public abstract class ModernGroup<T extends 
ModernGroupMember> implements Group
         return metadataRefreshDeadline;
     }
 
+    /**
+     * Updates the subscription type.
+     */
+    protected void maybeUpdateGroupSubscriptionType() {
+        subscriptionType.set(subscriptionType(subscribedTopicNames, 
members.size()));
+    }
+
     /**
      * Updates the subscribed topic names count.
-     * The subscription type is updated as a consequence.
      *
      * @param oldMember The old member.
      * @param newMember The new member.
      */
-    protected void maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(
+    protected void maybeUpdateSubscribedTopicNames(
         ModernGroupMember oldMember,
         ModernGroupMember newMember
     ) {
         maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, 
newMember);
-        subscriptionType.set(subscriptionType(subscribedTopicNames, 
members.size()));
     }
 
     /**
@@ -463,19 +467,19 @@ public abstract class ModernGroup<T extends 
ModernGroupMember> implements Group
      * @param newMember             The new member.
      */
     private static void maybeUpdateSubscribedTopicNames(
-        Map<String, Integer> subscribedTopicCount,
+        Map<String, SubscriptionCount> subscribedTopicCount,
         ModernGroupMember oldMember,
         ModernGroupMember newMember
     ) {
         if (oldMember != null) {
             oldMember.subscribedTopicNames().forEach(topicName ->
-                subscribedTopicCount.compute(topicName, Utils::decValue)
+                subscribedTopicCount.compute(topicName, 
SubscriptionCount::decNameCount)
             );
         }
 
         if (newMember != null) {
             newMember.subscribedTopicNames().forEach(topicName ->
-                subscribedTopicCount.compute(topicName, Utils::incValue)
+                subscribedTopicCount.compute(topicName, 
SubscriptionCount::incNameCount)
             );
         }
     }
@@ -488,11 +492,11 @@ public abstract class ModernGroup<T extends 
ModernGroupMember> implements Group
      *
      * @return Copy of the map of topics to the count of number of subscribers.
      */
-    public Map<String, Integer> computeSubscribedTopicNames(
+    public Map<String, SubscriptionCount> computeSubscribedTopicNames(
         ModernGroupMember oldMember,
         ModernGroupMember newMember
     ) {
-        Map<String, Integer> subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
+        Map<String, SubscriptionCount> subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
         maybeUpdateSubscribedTopicNames(
             subscribedTopicNames,
             oldMember,
@@ -508,10 +512,10 @@ public abstract class ModernGroup<T extends 
ModernGroupMember> implements Group
      *
      * @return Copy of the map of topics to the count of number of subscribers.
      */
-    public Map<String, Integer> computeSubscribedTopicNames(
+    public Map<String, SubscriptionCount> computeSubscribedTopicNames(
         Set<? extends ModernGroupMember> removedMembers
     ) {
-        Map<String, Integer> subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
+        Map<String, SubscriptionCount> subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
         if (removedMembers != null) {
             removedMembers.forEach(removedMember ->
                 maybeUpdateSubscribedTopicNames(
@@ -533,15 +537,15 @@ public abstract class ModernGroup<T extends 
ModernGroupMember> implements Group
      *         otherwise, {@link SubscriptionType#HETEROGENEOUS}.
      */
     public static SubscriptionType subscriptionType(
-        Map<String, Integer> subscribedTopicNames,
+        Map<String, SubscriptionCount> subscribedTopicNames,
         int numberOfMembers
     ) {
         if (subscribedTopicNames.isEmpty()) {
             return HOMOGENEOUS;
         }
 
-        for (int subscriberCount : subscribedTopicNames.values()) {
-            if (subscriberCount != numberOfMembers) {
+        for (SubscriptionCount subscriberCount : 
subscribedTopicNames.values()) {
+            if (subscriberCount.byNameCount != numberOfMembers) {
                 return HETEROGENEOUS;
             }
         }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscriptionCount.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscriptionCount.java
new file mode 100644
index 00000000000..ef75a3c3eca
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscriptionCount.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern;
+
+/**
+ * A class which holds two counters. One to count subscription by name and
+ * another one to count subscription by regex.
+ */
+public class SubscriptionCount {
+    public final int byNameCount;
+    public final int byRegexCount;
+
+    public SubscriptionCount(int byNameCount, int byRegexCount) {
+        this.byNameCount = byNameCount;
+        this.byRegexCount = byRegexCount;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        SubscriptionCount that = (SubscriptionCount) o;
+
+        if (byNameCount != that.byNameCount) return false;
+        return byRegexCount == that.byRegexCount;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = byNameCount;
+        result = 31 * result + byRegexCount;
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "SubscriptionCount(" +
+            "byNameCount=" + byNameCount +
+            ", byRegexCount=" + byRegexCount +
+            ')';
+    }
+
+    /**
+     * Increments the name count by 1; This helper is meant to be used with 
Map#compute.
+     */
+    public static SubscriptionCount incNameCount(String key, SubscriptionCount 
count) {
+        if (count == null) {
+            return new SubscriptionCount(1, 0);
+        } else {
+            return new SubscriptionCount(count.byNameCount + 1, 
count.byRegexCount);
+        }
+    }
+
+    /**
+     * Decrements the name count by 1; This helper is meant to be used with 
Map#compute.
+     */
+    public static SubscriptionCount decNameCount(String key, SubscriptionCount 
count) {
+        if (count == null || (count.byNameCount == 1 && count.byRegexCount == 
0)) {
+            return null;
+        } else {
+            return new SubscriptionCount(count.byNameCount - 1, 
count.byRegexCount);
+        }
+    }
+
+    /**
+     * Increments the regex count by 1; This helper is meant to be used with 
Map#compute.
+     */
+    public static SubscriptionCount incRegexCount(String key, 
SubscriptionCount count) {
+        if (count == null) {
+            return new SubscriptionCount(0, 1);
+        } else {
+            return new SubscriptionCount(count.byNameCount, count.byRegexCount 
+ 1);
+        }
+    }
+
+    /**
+     * Decrements the regex count by 1; This helper is meant to be used with 
Map#compute.
+     */
+    public static SubscriptionCount decRegexCount(String key, 
SubscriptionCount count) {
+        if (count == null || (count.byRegexCount == 1 && count.byNameCount == 
0)) {
+            return null;
+        } else {
+            return new SubscriptionCount(count.byNameCount, count.byRegexCount 
- 1);
+        }
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index 1fdfe5ae87e..fbd259dc332 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -32,6 +32,7 @@ import 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
 import org.apache.kafka.coordinator.group.Utils;
+import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
 import org.apache.kafka.coordinator.group.classic.ClassicGroup;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
@@ -39,6 +40,7 @@ import org.apache.kafka.coordinator.group.modern.Assignment;
 import org.apache.kafka.coordinator.group.modern.MemberState;
 import org.apache.kafka.coordinator.group.modern.ModernGroup;
 import org.apache.kafka.coordinator.group.modern.ModernGroupMember;
+import org.apache.kafka.coordinator.group.modern.SubscriptionCount;
 import org.apache.kafka.image.TopicsImage;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
@@ -59,11 +61,14 @@ import java.util.Set;
 
 import static org.apache.kafka.coordinator.group.Utils.toOptional;
 import static org.apache.kafka.coordinator.group.Utils.toTopicPartitionMap;
+import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
+import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
 import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupMember.EMPTY_ASSIGNMENT;
 import static 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState.ASSIGNING;
 import static 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState.EMPTY;
 import static 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState.RECONCILING;
 import static 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState.STABLE;
+import static 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.subscribedTopicRegexOrNull;
 
 /**
  * A Consumer Group. All the metadata in this class are backed by
@@ -307,12 +312,13 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
             throw new IllegalArgumentException("newMember cannot be null.");
         }
         ConsumerGroupMember oldMember = members.put(newMember.memberId(), 
newMember);
-        maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, 
newMember);
+        maybeUpdateSubscribedTopicNames(oldMember, newMember);
         maybeUpdateServerAssignors(oldMember, newMember);
         maybeUpdatePartitionEpoch(oldMember, newMember);
         maybeUpdateSubscribedRegularExpression(oldMember, newMember);
         updateStaticMember(newMember);
         maybeUpdateGroupState();
+        maybeUpdateGroupSubscriptionType();
         maybeUpdateNumClassicProtocolMembers(oldMember, newMember);
         maybeUpdateClassicProtocolMembersSupportedProtocols(oldMember, 
newMember);
     }
@@ -331,12 +337,13 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
     @Override
     public void removeMember(String memberId) {
         ConsumerGroupMember oldMember = members.remove(memberId);
-        maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, 
null);
+        maybeUpdateSubscribedTopicNames(oldMember, null);
         maybeUpdateServerAssignors(oldMember, null);
         maybeRemovePartitionEpoch(oldMember);
         maybeUpdateSubscribedRegularExpression(oldMember, null);
         removeStaticMember(oldMember);
         maybeUpdateGroupState();
+        maybeUpdateGroupSubscriptionType();
         maybeUpdateNumClassicProtocolMembers(oldMember, null);
         maybeUpdateClassicProtocolMembersSupportedProtocols(oldMember, null);
     }
@@ -360,28 +367,21 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
      *
      * @return Copy of the map of topics to the count of number of subscribers.
      */
-    public Map<String, Integer> computeSubscribedTopicNames(
+    public Map<String, SubscriptionCount> computeSubscribedTopicNames(
         ConsumerGroupMember oldMember,
         ConsumerGroupMember newMember
     ) {
-        Map<String, Integer> subscribedTopicsNames = 
super.computeSubscribedTopicNames(oldMember, newMember);
-
-        String oldSubscribedTopicRegex = null;
-        if (oldMember != null && oldMember.subscribedTopicRegex() != null && 
!oldMember.subscribedTopicRegex().isEmpty()) {
-            oldSubscribedTopicRegex = oldMember.subscribedTopicRegex();
-        }
+        Map<String, SubscriptionCount> subscribedTopicsNames = 
super.computeSubscribedTopicNames(oldMember, newMember);
+        String oldSubscribedTopicRegex = subscribedTopicRegexOrNull(oldMember);
 
         if (oldSubscribedTopicRegex != null) {
-            String newSubscribedTopicRegex = null;
-            if (newMember != null && newMember.subscribedTopicRegex() != null 
&& !newMember.subscribedTopicRegex().isEmpty()) {
-                newSubscribedTopicRegex = newMember.subscribedTopicRegex();
-            }
+            String newSubscribedTopicRegex = 
subscribedTopicRegexOrNull(newMember);
 
             // If the old member was the last one subscribed to the regex and 
the new member
             // is not subscribed to it, we must remove it from the subscribed 
topic names.
             if (!oldSubscribedTopicRegex.equals(newSubscribedTopicRegex) && 
numSubscribedMembers(oldSubscribedTopicRegex) == 1) {
                 
resolvedRegularExpression(oldSubscribedTopicRegex).ifPresent(resolvedRegularExpression
 ->
-                    resolvedRegularExpression.topics.forEach(topic -> 
subscribedTopicsNames.compute(topic, Utils::decValue))
+                    resolvedRegularExpression.topics.forEach(topic -> 
subscribedTopicsNames.compute(topic, SubscriptionCount::decRegexCount))
                 );
             }
         }
@@ -389,6 +389,35 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         return subscribedTopicsNames;
     }
 
+    /**
+     * Computes an updated version of the subscribed regular expressions based 
on
+     * the new/old members.
+     *
+     * @param oldMember The old member.
+     * @param newMember The new member.
+     * @return An unmodifiable and updated copy of the map.
+     */
+    public Map<String, Integer> computeSubscribedRegularExpressions(
+        ConsumerGroupMember oldMember,
+        ConsumerGroupMember newMember
+    ) {
+        String oldRegex = subscribedTopicRegexOrNull(oldMember);
+        String newRegex = subscribedTopicRegexOrNull(newMember);
+
+        if (!Objects.equals(oldRegex, newRegex)) {
+            Map<String, Integer> newSubscribedRegularExpressions = new 
HashMap<>(subscribedRegularExpressions);
+            if (oldRegex != null) {
+                newSubscribedRegularExpressions.compute(oldRegex, 
Utils::decValue);
+            }
+            if (newRegex != null) {
+                newSubscribedRegularExpressions.compute(newRegex, 
Utils::incValue);
+            }
+            return 
Collections.unmodifiableMap(newSubscribedRegularExpressions);
+        } else {
+            return Collections.unmodifiableMap(subscribedRegularExpressions);
+        }
+    }
+
     /**
      * Computes an updated copy of the subscribed topic names without the 
provided
      * removed members and removed regular expressions.
@@ -398,16 +427,16 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
      *
      * @return Copy of the map of topics to the count of number of subscribers.
      */
-    public Map<String, Integer> 
computeSubscribedTopicNamesWithoutDeletedMembers(
+    public Map<String, SubscriptionCount> 
computeSubscribedTopicNamesWithoutDeletedMembers(
         Set<ConsumerGroupMember> removedMembers,
         Set<String> removedRegexes
     ) {
-        Map<String, Integer> subscribedTopicsNames = 
super.computeSubscribedTopicNames(removedMembers);
+        Map<String, SubscriptionCount> subscribedTopicsNames = 
super.computeSubscribedTopicNames(removedMembers);
 
         removedRegexes.forEach(regex ->
             
resolvedRegularExpression(regex).ifPresent(resolvedRegularExpression ->
                 resolvedRegularExpression.topics.forEach(topic ->
-                    subscribedTopicsNames.compute(topic, Utils::decValue)
+                    subscribedTopicsNames.compute(topic, 
SubscriptionCount::decRegexCount)
                 )
             )
         );
@@ -428,7 +457,7 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         removeResolvedRegularExpression(regex);
         if (newResolvedRegularExpression != null) {
             resolvedRegularExpressions.put(regex, 
newResolvedRegularExpression);
-            newResolvedRegularExpression.topics.forEach(topicName -> 
subscribedTopicNames.compute(topicName, Utils::incValue));
+            newResolvedRegularExpression.topics.forEach(topicName -> 
subscribedTopicNames.compute(topicName, SubscriptionCount::incRegexCount));
         }
     }
 
@@ -440,7 +469,7 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
     public void removeResolvedRegularExpression(String regex) {
         ResolvedRegularExpression oldResolvedRegularExpression = 
resolvedRegularExpressions.remove(regex);
         if (oldResolvedRegularExpression != null) {
-            oldResolvedRegularExpression.topics.forEach(topicName -> 
subscribedTopicNames.compute(topicName, Utils::decValue));
+            oldResolvedRegularExpression.topics.forEach(topicName -> 
subscribedTopicNames.compute(topicName, SubscriptionCount::decRegexCount));
         }
     }
 
@@ -798,6 +827,60 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         }
     }
 
+    /**
+     * Computes the subscription type based on the provided information.
+     *
+     * @param subscribedRegularExpressions  The subscribed regular expression 
count.
+     * @param subscribedTopicNames          The subscribed topic name count.
+     * @param numberOfMembers               The number of members in the group.
+     *
+     * @return The subscription type.
+     */
+    public static SubscriptionType subscriptionType(
+        Map<String, Integer> subscribedRegularExpressions,
+        Map<String, SubscriptionCount> subscribedTopicNames,
+        int numberOfMembers
+    ) {
+        if (subscribedRegularExpressions.isEmpty()) {
+            // If the members do not use regular expressions, the subscription 
is
+            // considered as homogeneous if all the members are subscribed to 
the
+            // same topics. Otherwise, it is considered as heterogeneous.
+            for (SubscriptionCount subscriberCount : 
subscribedTopicNames.values()) {
+                if (subscriberCount.byNameCount != numberOfMembers) {
+                    return HETEROGENEOUS;
+                }
+            }
+            return HOMOGENEOUS;
+        } else {
+            int count = 
subscribedRegularExpressions.values().iterator().next();
+            if (count == numberOfMembers) {
+                // If all the members are subscribed to a single regular 
expression
+                // and none of them are subscribed to topic names, the 
subscription
+                // is considered as homogeneous. If some members are 
subscribed to
+                // topic names too, the subscription is considered as 
heterogeneous.
+                for (SubscriptionCount subscriberCount : 
subscribedTopicNames.values()) {
+                    if (subscriberCount.byRegexCount != 1 || 
subscriberCount.byNameCount > 0) {
+                        return HETEROGENEOUS;
+                    }
+                }
+                return HOMOGENEOUS;
+            } else {
+                // The subscription is considered as heterogeneous because
+                // not all members subscribe to the same regular expression.
+                return SubscriptionType.HETEROGENEOUS;
+            }
+        }
+    }
+
+    @Override
+    protected void maybeUpdateGroupSubscriptionType() {
+        subscriptionType.set(subscriptionType(
+            subscribedRegularExpressions,
+            subscribedTopicNames,
+            members.size()
+        ));
+    }
+
     @Override
     protected void maybeUpdateGroupState() {
         ConsumerGroupState previousState = state.get();
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
index 69fa5cdd50c..c96dd277adb 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
@@ -518,4 +518,12 @@ public class ConsumerGroupMember extends ModernGroupMember 
{
             ", classicMemberMetadata='" + classicMemberMetadata + '\'' +
             ')';
     }
+
+    public static String subscribedTopicRegexOrNull(ConsumerGroupMember 
member) {
+        if (member != null && member.subscribedTopicRegex() != null && 
!member.subscribedTopicRegex().isEmpty()) {
+            return member.subscribedTopicRegex();
+        } else {
+            return null;
+        }
+    }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
index 1e641dfbe87..bbd073c8e76 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
@@ -163,8 +163,9 @@ public class ShareGroup extends 
ModernGroup<ShareGroupMember> {
         }
 
         ShareGroupMember oldMember = members.put(newMember.memberId(), 
newMember);
-        maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, 
newMember);
+        maybeUpdateSubscribedTopicNames(oldMember, newMember);
         maybeUpdateGroupState();
+        maybeUpdateGroupSubscriptionType();
     }
 
     /**
@@ -174,8 +175,9 @@ public class ShareGroup extends 
ModernGroup<ShareGroupMember> {
      */
     public void removeMember(String memberId) {
         ShareGroupMember oldMember = members.remove(memberId);
-        maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, 
null);
+        maybeUpdateSubscribedTopicNames(oldMember, null);
         maybeUpdateGroupState();
+        maybeUpdateGroupSubscriptionType();
     }
 
     @Override
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index eacb4fca4d5..3f3ee32e581 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -46,6 +46,7 @@ import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataV
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.modern.Assignment;
 import org.apache.kafka.coordinator.group.modern.MemberState;
+import org.apache.kafka.coordinator.group.modern.SubscriptionCount;
 import org.apache.kafka.coordinator.group.modern.TopicMetadata;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.timeline.SnapshotRegistry;
@@ -1660,9 +1661,9 @@ public class ConsumerGroupTest {
         // Verify initial state.
         assertEquals(
             Map.of(
-                "foo", 2,
-                "bar", 2,
-                "zar", 1
+                "foo", new SubscriptionCount(2, 0),
+                "bar", new SubscriptionCount(2, 0),
+                "zar", new SubscriptionCount(1, 0)
             ),
             consumerGroup.subscribedTopicNames()
         );
@@ -1679,9 +1680,9 @@ public class ConsumerGroupTest {
 
         assertEquals(
             Map.of(
-                "foo", 3,
-                "bar", 3,
-                "zar", 1
+                "foo", new SubscriptionCount(2, 1),
+                "bar", new SubscriptionCount(2, 1),
+                "zar", new SubscriptionCount(1, 0)
             ),
             consumerGroup.subscribedTopicNames()
         );
@@ -1698,10 +1699,10 @@ public class ConsumerGroupTest {
 
         assertEquals(
             Map.of(
-                "foo", 3,
-                "bar", 3,
-                "zar", 1,
-                "foobar", 1
+                "foo", new SubscriptionCount(2, 1),
+                "bar", new SubscriptionCount(2, 1),
+                "zar", new SubscriptionCount(1, 0),
+                "foobar", new SubscriptionCount(0, 1)
             ),
             consumerGroup.subscribedTopicNames()
         );
@@ -1718,10 +1719,10 @@ public class ConsumerGroupTest {
 
         assertEquals(
             Map.of(
-                "foo", 3,
-                "bar", 2,
-                "zar", 1,
-                "foobar", 1
+                "foo", new SubscriptionCount(2, 1),
+                "bar", new SubscriptionCount(2, 0),
+                "zar", new SubscriptionCount(1, 0),
+                "foobar", new SubscriptionCount(0, 1)
             ),
             consumerGroup.subscribedTopicNames()
         );
@@ -1731,10 +1732,10 @@ public class ConsumerGroupTest {
 
         assertEquals(
             Map.of(
-                "foo", 2,
-                "bar", 2,
-                "zar", 1,
-                "foobar", 1
+                "foo", new SubscriptionCount(2, 0),
+                "bar", new SubscriptionCount(2, 0),
+                "zar", new SubscriptionCount(1, 0),
+                "foobar", new SubscriptionCount(0, 1)
             ),
             consumerGroup.subscribedTopicNames()
         );
@@ -1744,9 +1745,9 @@ public class ConsumerGroupTest {
 
         assertEquals(
             Map.of(
-                "foo", 2,
-                "bar", 2,
-                "zar", 1
+                "foo", new SubscriptionCount(2, 0),
+                "bar", new SubscriptionCount(2, 0),
+                "zar", new SubscriptionCount(1, 0)
             ),
             consumerGroup.subscribedTopicNames()
         );
@@ -1807,11 +1808,11 @@ public class ConsumerGroupTest {
         // Verify initial state.
         assertEquals(
             Map.of(
-                "foo", 3,
-                "fooo", 1,
-                "bar", 3,
-                "barr", 1,
-                "zar", 1
+                "foo", new SubscriptionCount(2, 1),
+                "fooo", new SubscriptionCount(0, 1),
+                "bar", new SubscriptionCount(2, 1),
+                "barr", new SubscriptionCount(0, 1),
+                "zar", new SubscriptionCount(1, 0)
             ),
             consumerGroup.subscribedTopicNames()
         );
@@ -1819,10 +1820,10 @@ public class ConsumerGroupTest {
         // Compute with removed members and regexes.
         assertEquals(
             Map.of(
-                "foo", 1,
-                "bar", 2,
-                "barr", 1,
-                "zar", 1
+                "foo", new SubscriptionCount(1, 0),
+                "bar", new SubscriptionCount(1, 1),
+                "barr", new SubscriptionCount(0, 1),
+                "zar", new SubscriptionCount(1, 0)
             ),
             consumerGroup.computeSubscribedTopicNamesWithoutDeletedMembers(
                 Set.of(member2, member3, member4, member5),
@@ -1863,21 +1864,21 @@ public class ConsumerGroupTest {
         // Verify initial state.
         assertEquals(
             Map.of(
-                "foo", 4,
-                "fooo", 1,
-                "bar", 2,
-                "zar", 1
+                "foo", new SubscriptionCount(3, 1),
+                "fooo", new SubscriptionCount(0, 1),
+                "bar", new SubscriptionCount(2, 0),
+                "zar", new SubscriptionCount(1, 0)
             ),
             consumerGroup.subscribedTopicNames()
         );
 
-        // Compute subscribed topic names without changing the regex.
+        // Compute subscribed topic names without changing anything.
         assertEquals(
             Map.of(
-                "foo", 4,
-                "fooo", 1,
-                "bar", 2,
-                "zar", 1
+                "foo", new SubscriptionCount(3, 1),
+                "fooo", new SubscriptionCount(0, 1),
+                "bar", new SubscriptionCount(2, 0),
+                "zar", new SubscriptionCount(1, 0)
             ),
             consumerGroup.computeSubscribedTopicNames(member3, member3)
         );
@@ -1885,9 +1886,9 @@ public class ConsumerGroupTest {
         // Compute subscribed topic names with removing the regex.
         assertEquals(
             Map.of(
-                "foo", 3,
-                "bar", 2,
-                "zar", 1
+                "foo", new SubscriptionCount(3, 0),
+                "bar", new SubscriptionCount(2, 0),
+                "zar", new SubscriptionCount(1, 0)
             ),
             consumerGroup.computeSubscribedTopicNames(
                 member3,
@@ -1900,10 +1901,10 @@ public class ConsumerGroupTest {
         // Compute subscribed topic names with removing the names.
         assertEquals(
             Map.of(
-                "foo", 3,
-                "fooo", 1,
-                "bar", 2,
-                "zar", 1
+                "foo", new SubscriptionCount(2, 1),
+                "fooo", new SubscriptionCount(0, 1),
+                "bar", new SubscriptionCount(2, 0),
+                "zar", new SubscriptionCount(1, 0)
             ),
             consumerGroup.computeSubscribedTopicNames(
                 member3,
@@ -1916,9 +1917,9 @@ public class ConsumerGroupTest {
         // Compute subscribed topic names with removing both.
         assertEquals(
             Map.of(
-                "foo", 2,
-                "bar", 2,
-                "zar", 1
+                "foo", new SubscriptionCount(2, 0),
+                "bar", new SubscriptionCount(2, 0),
+                "zar", new SubscriptionCount(1, 0)
             ),
             consumerGroup.computeSubscribedTopicNames(
                 member3,
@@ -2097,4 +2098,145 @@ public class ConsumerGroupTest {
             records
         );
     }
+
+    @Test
+    public void testSubscriptionType() {
+        assertEquals(
+            HOMOGENEOUS,
+            ConsumerGroup.subscriptionType(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                0
+            )
+        );
+
+        assertEquals(
+            HOMOGENEOUS,
+            ConsumerGroup.subscriptionType(
+                Collections.emptyMap(),
+                Map.of("foo", new SubscriptionCount(5, 0)),
+                5
+            )
+        );
+
+        assertEquals(
+            HETEROGENEOUS,
+            ConsumerGroup.subscriptionType(
+                Collections.emptyMap(),
+                Map.of(
+                    "foo", new SubscriptionCount(4, 0),
+                    "bar", new SubscriptionCount(1, 0)
+                ),
+                5
+            )
+        );
+
+        assertEquals(
+            HOMOGENEOUS,
+            ConsumerGroup.subscriptionType(
+                Map.of("foo*", 5),
+                Map.of("foo", new SubscriptionCount(0, 1)),
+                5
+            )
+        );
+
+        assertEquals(
+            HOMOGENEOUS,
+            ConsumerGroup.subscriptionType(
+                Map.of("foo*", 5),
+                Map.of(
+                    "foo", new SubscriptionCount(0, 1),
+                    "food", new SubscriptionCount(0, 1)),
+                5
+            )
+        );
+
+        assertEquals(
+            HETEROGENEOUS,
+            ConsumerGroup.subscriptionType(
+                Map.of("foo*", 5),
+                Map.of("foo", new SubscriptionCount(1, 1)),
+                5
+            )
+        );
+
+        assertEquals(
+            HETEROGENEOUS,
+            ConsumerGroup.subscriptionType(
+                Map.of("foo*", 5),
+                Map.of(
+                    "foo", new SubscriptionCount(0, 1),
+                    "bar", new SubscriptionCount(1, 0)
+                ),
+                5
+            )
+        );
+
+        assertEquals(
+            HETEROGENEOUS,
+            ConsumerGroup.subscriptionType(
+                Map.of("foo*", 4, "bar*", 1),
+                Map.of(
+                    "foo", new SubscriptionCount(0, 1),
+                    "bar", new SubscriptionCount(0, 1)),
+                5
+            )
+        );
+    }
+
+    @Test
+    public void testComputeSubscribedRegularExpressions() {
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+        consumerGroup.setGroupEpoch(10);
+
+        consumerGroup.updateMember(new ConsumerGroupMember.Builder("m1")
+            .setSubscribedTopicRegex("foo*")
+            .build());
+
+        consumerGroup.updateMember(new ConsumerGroupMember.Builder("m2")
+            .setSubscribedTopicRegex("foo*")
+            .build());
+
+        assertEquals(
+            Map.of("foo*", 3),
+            consumerGroup.computeSubscribedRegularExpressions(
+                null,
+                new ConsumerGroupMember.Builder("m3")
+                    .setSubscribedTopicRegex("foo*")
+                    .build()
+            )
+        );
+
+        assertEquals(
+            Map.of("foo*", 1),
+            consumerGroup.computeSubscribedRegularExpressions(
+                new ConsumerGroupMember.Builder("m2")
+                    .setSubscribedTopicRegex("foo*")
+                    .build(),
+                null
+            )
+        );
+
+        assertEquals(
+            Map.of("foo*", 2, "bar*", 1),
+            consumerGroup.computeSubscribedRegularExpressions(
+                null,
+                new ConsumerGroupMember.Builder("m4")
+                    .setSubscribedTopicRegex("bar*")
+                    .build()
+            )
+        );
+
+        assertEquals(
+            Map.of("foo*", 1, "bar*", 1),
+            consumerGroup.computeSubscribedRegularExpressions(
+                new ConsumerGroupMember.Builder("m2")
+                    .setSubscribedTopicRegex("foo*")
+                    .build(),
+                new ConsumerGroupMember.Builder("m2")
+                    .setSubscribedTopicRegex("bar*")
+                    .build()
+            )
+        );
+    }
 }

Reply via email to