This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a802865aadd KAFKA-17593; [5/N] Include resolved regular expressions 
into target assignment computation (#17750)
a802865aadd is described below

commit a802865aaddf1035661be2d9682e0e244b3d9442
Author: David Jacot <[email protected]>
AuthorDate: Wed Nov 13 15:59:52 2024 +0100

    KAFKA-17593; [5/N] Include resolved regular expressions into target 
assignment computation (#17750)
    
    This patch does a few things:
    * Refactors the `TargetAssignmentBuilder` to use inheritance to 
differentiate Consumer and Share groups.
    * Introduces `UnionSet` to lazily aggregate the subscriptions for a given 
member.
    * Wires the resolved regular expressions in the `GroupMetadataManager`. At 
the moment, they are only used when the target assignment is computed.
    
    Reviewers: Sean Quah <[email protected]>, Jeff Kim 
<[email protected]>, Lianet Magrans <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |  11 +-
 .../group/modern/TargetAssignmentBuilder.java      | 253 +++++++++++++++------
 .../kafka/coordinator/group/modern/UnionSet.java   | 219 ++++++++++++++++++
 .../group/modern/consumer/ConsumerGroup.java       |   7 +
 .../group/GroupMetadataManagerTest.java            |  78 +++++++
 .../group/modern/TargetAssignmentBuilderTest.java  | 204 ++++++++++++-----
 .../coordinator/group/modern/UnionSetTest.java     | 144 ++++++++++++
 .../modern/consumer/ConsumerGroupBuilder.java      |  14 ++
 .../assignor/TargetAssignmentBuilderBenchmark.java |   4 +-
 9 files changed, 803 insertions(+), 131 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index f2169740e3b..0d4ef16a45c 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2606,8 +2606,8 @@ public class GroupMetadataManager {
             updatedMember
         ).orElse(defaultConsumerGroupAssignor.name());
         try {
-            TargetAssignmentBuilder<ConsumerGroupMember> 
assignmentResultBuilder =
-                new 
TargetAssignmentBuilder<ConsumerGroupMember>(group.groupId(), groupEpoch, 
consumerGroupAssignors.get(preferredServerAssignor))
+            TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder 
assignmentResultBuilder =
+                new 
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(group.groupId(), 
groupEpoch, consumerGroupAssignors.get(preferredServerAssignor))
                     .withMembers(group.members())
                     .withStaticMembers(group.staticMembers())
                     .withSubscriptionMetadata(subscriptionMetadata)
@@ -2615,6 +2615,7 @@ public class GroupMetadataManager {
                     .withTargetAssignment(group.targetAssignment())
                     
.withInvertedTargetAssignment(group.invertedTargetAssignment())
                     .withTopicsImage(metadataImage.topics())
+                    
.withResolvedRegularExpressions(group.resolvedRegularExpressions())
                     .addOrUpdateMember(updatedMember.memberId(), 
updatedMember);
 
             // If the instance id was associated to a different member, it 
means that the
@@ -2673,16 +2674,14 @@ public class GroupMetadataManager {
         List<CoordinatorRecord> records
     ) {
         try {
-            TargetAssignmentBuilder<ShareGroupMember> assignmentResultBuilder =
-                new TargetAssignmentBuilder<ShareGroupMember>(group.groupId(), 
groupEpoch, shareGroupAssignor)
+            TargetAssignmentBuilder.ShareTargetAssignmentBuilder 
assignmentResultBuilder =
+                new 
TargetAssignmentBuilder.ShareTargetAssignmentBuilder(group.groupId(), 
groupEpoch, shareGroupAssignor)
                     .withMembers(group.members())
                     .withSubscriptionMetadata(subscriptionMetadata)
                     .withSubscriptionType(subscriptionType)
                     .withTargetAssignment(group.targetAssignment())
                     
.withInvertedTargetAssignment(group.invertedTargetAssignment())
                     .withTopicsImage(metadataImage.topics())
-                    
.withTargetAssignmentRecordBuilder(GroupCoordinatorRecordHelpers::newShareGroupTargetAssignmentRecord)
-                    
.withTargetAssignmentEpochRecordBuilder(GroupCoordinatorRecordHelpers::newShareGroupTargetAssignmentEpochRecord)
                     .addOrUpdateMember(updatedMember.memberId(), 
updatedMember);
 
             long startTimeMs = time.milliseconds();
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
index ba08a236ba6..63bf81f1e08 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
@@ -24,6 +24,9 @@ import 
org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
 import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
 import 
org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
 import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
+import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
+import 
org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
+import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
 import org.apache.kafka.image.TopicsImage;
 
 import java.util.ArrayList;
@@ -47,7 +50,7 @@ import java.util.Set;
  * is deleted as part of the member deletion process. In other words, this 
class
  * does not yield a tombstone for removed members.
  */
-public class TargetAssignmentBuilder<T extends ModernGroupMember> {
+public abstract class TargetAssignmentBuilder<T extends ModernGroupMember, U 
extends TargetAssignmentBuilder<T, U>> {
 
     /**
      * The assignment result returned by {{@link 
TargetAssignmentBuilder#build()}}.
@@ -89,6 +92,144 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
         }
     }
 
+    public static class ConsumerTargetAssignmentBuilder extends 
TargetAssignmentBuilder<ConsumerGroupMember, ConsumerTargetAssignmentBuilder> {
+
+        /**
+         * The resolved regular expressions.
+         */
+        private Map<String, ResolvedRegularExpression> 
resolvedRegularExpressions = Collections.emptyMap();
+
+        public ConsumerTargetAssignmentBuilder(
+            String groupId,
+            int groupEpoch,
+            PartitionAssignor assignor
+        ) {
+            super(groupId, groupEpoch, assignor);
+        }
+
+        /**
+         * Adds all the existing resolved regular expressions.
+         *
+         * @param resolvedRegularExpressions The resolved regular expressions.
+         * @return This object.
+         */
+        public ConsumerTargetAssignmentBuilder withResolvedRegularExpressions(
+            Map<String, ResolvedRegularExpression> resolvedRegularExpressions
+        ) {
+            this.resolvedRegularExpressions = resolvedRegularExpressions;
+            return self();
+        }
+
+        @Override
+        protected ConsumerTargetAssignmentBuilder self() {
+            return this;
+        }
+
+        @Override
+        protected CoordinatorRecord newTargetAssignmentRecord(
+            String groupId,
+            String memberId,
+            Map<Uuid, Set<Integer>> partitions
+        ) {
+            return 
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(
+                groupId,
+                memberId,
+                partitions
+            );
+        }
+
+        @Override
+        protected CoordinatorRecord newTargetAssignmentEpochRecord(String 
groupId, int assignmentEpoch) {
+            return 
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(
+                groupId,
+                assignmentEpoch
+            );
+        }
+
+        @Override
+        protected MemberSubscriptionAndAssignmentImpl 
newMemberSubscriptionAndAssignment(
+            ConsumerGroupMember member,
+            Assignment memberAssignment,
+            TopicIds.TopicResolver topicResolver
+        ) {
+            Set<String> subscriptions = member.subscribedTopicNames();
+
+            // Check whether the member is also subscribed to a regular 
expression. If it is,
+            // create the union of the two subscriptions.
+            String subscribedTopicRegex = member.subscribedTopicRegex();
+            if (subscribedTopicRegex != null && 
!subscribedTopicRegex.isEmpty()) {
+                ResolvedRegularExpression resolvedRegularExpression = 
resolvedRegularExpressions.get(subscribedTopicRegex);
+                if (resolvedRegularExpression != null) {
+                    if (subscriptions.isEmpty()) {
+                        subscriptions = resolvedRegularExpression.topics;
+                    } else if (!resolvedRegularExpression.topics.isEmpty()) {
+                        // We only use a UnionSet when the member uses both 
types of subscriptions. The
+                        // protocol allows it. However, the Apache Kafka 
Consumer does not support it.
+                        // Other clients such as librdkafka may support it.
+                        subscriptions = new UnionSet<>(subscriptions, 
resolvedRegularExpression.topics);
+                    }
+                }
+            }
+
+            return new MemberSubscriptionAndAssignmentImpl(
+                Optional.ofNullable(member.rackId()),
+                Optional.ofNullable(member.instanceId()),
+                new TopicIds(subscriptions, topicResolver),
+                memberAssignment
+            );
+        }
+    }
+
+    public static class ShareTargetAssignmentBuilder extends 
TargetAssignmentBuilder<ShareGroupMember, ShareTargetAssignmentBuilder> {
+        public ShareTargetAssignmentBuilder(
+            String groupId,
+            int groupEpoch,
+            PartitionAssignor assignor
+        ) {
+            super(groupId, groupEpoch, assignor);
+        }
+
+        @Override
+        protected ShareTargetAssignmentBuilder self() {
+            return this;
+        }
+
+        @Override
+        protected CoordinatorRecord newTargetAssignmentRecord(
+            String groupId,
+            String memberId,
+            Map<Uuid, Set<Integer>> partitions
+        ) {
+            return 
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentRecord(
+                groupId,
+                memberId,
+                partitions
+            );
+        }
+
+        @Override
+        protected CoordinatorRecord newTargetAssignmentEpochRecord(String 
groupId, int assignmentEpoch) {
+            return 
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(
+                groupId,
+                assignmentEpoch
+            );
+        }
+
+        @Override
+        protected MemberSubscriptionAndAssignmentImpl 
newMemberSubscriptionAndAssignment(
+            ShareGroupMember member,
+            Assignment memberAssignment,
+            TopicIds.TopicResolver topicResolver
+        ) {
+            return new MemberSubscriptionAndAssignmentImpl(
+                Optional.ofNullable(member.rackId()),
+                Optional.ofNullable(member.instanceId()),
+                new TopicIds(member.subscribedTopicNames(), topicResolver),
+                memberAssignment
+            );
+        }
+    }
+
     /**
      * The group id.
      */
@@ -146,27 +287,6 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
      */
     private Map<String, String> staticMembers = new HashMap<>();
 
-    public interface TargetAssignmentRecordBuilder {
-        CoordinatorRecord build(
-            final String groupId,
-            final String memberId,
-            final Map<Uuid, Set<Integer>> partitions
-       );
-    }
-
-    public interface TargetAssignmentEpochRecordBuilder {
-        CoordinatorRecord build(
-            final String groupId,
-            final int assignmentEpoch
-        );
-    }
-
-    private TargetAssignmentRecordBuilder targetAssignmentRecordBuilder =
-        GroupCoordinatorRecordHelpers::newConsumerGroupTargetAssignmentRecord;
-
-    private TargetAssignmentEpochRecordBuilder 
targetAssignmentEpochRecordBuilder =
-        
GroupCoordinatorRecordHelpers::newConsumerGroupTargetAssignmentEpochRecord;
-
     /**
      * Constructs the object.
      *
@@ -190,11 +310,11 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
      * @param members   The existing members in the consumer group.
      * @return This object.
      */
-    public TargetAssignmentBuilder<T> withMembers(
+    public U withMembers(
         Map<String, T> members
     ) {
         this.members = members;
-        return this;
+        return self();
     }
 
     /**
@@ -203,11 +323,11 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
      * @param staticMembers   The existing static members in the consumer 
group.
      * @return This object.
      */
-    public TargetAssignmentBuilder<T> withStaticMembers(
+    public U withStaticMembers(
         Map<String, String> staticMembers
     ) {
         this.staticMembers = staticMembers;
-        return this;
+        return self();
     }
 
     /**
@@ -216,11 +336,11 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
      * @param subscriptionMetadata  The subscription metadata.
      * @return This object.
      */
-    public TargetAssignmentBuilder<T> withSubscriptionMetadata(
+    public U withSubscriptionMetadata(
         Map<String, TopicMetadata> subscriptionMetadata
     ) {
         this.subscriptionMetadata = subscriptionMetadata;
-        return this;
+        return self();
     }
 
     /**
@@ -229,11 +349,11 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
      * @param subscriptionType  Subscription type of the group.
      * @return This object.
      */
-    public TargetAssignmentBuilder<T> withSubscriptionType(
+    public U withSubscriptionType(
         SubscriptionType subscriptionType
     ) {
         this.subscriptionType = subscriptionType;
-        return this;
+        return self();
     }
 
     /**
@@ -242,11 +362,11 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
      * @param targetAssignment   The existing target assignment.
      * @return This object.
      */
-    public TargetAssignmentBuilder<T> withTargetAssignment(
+    public U withTargetAssignment(
         Map<String, Assignment> targetAssignment
     ) {
         this.targetAssignment = targetAssignment;
-        return this;
+        return self();
     }
 
     /**
@@ -255,11 +375,11 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
      * @param invertedTargetAssignment   The reverse lookup map of the current 
target assignment.
      * @return This object.
      */
-    public TargetAssignmentBuilder<T> withInvertedTargetAssignment(
+    public U withInvertedTargetAssignment(
         Map<Uuid, Map<Integer, String>> invertedTargetAssignment
     ) {
         this.invertedTargetAssignment = invertedTargetAssignment;
-        return this;
+        return self();
     }
 
     /**
@@ -268,25 +388,11 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
      * @param topicsImage    The topics image.
      * @return This object.
      */
-    public TargetAssignmentBuilder<T> withTopicsImage(
+    public U withTopicsImage(
         TopicsImage topicsImage
     ) {
         this.topicsImage = topicsImage;
-        return this;
-    }
-
-    public TargetAssignmentBuilder<T> withTargetAssignmentRecordBuilder(
-        TargetAssignmentRecordBuilder targetAssignmentRecordBuilder
-    ) {
-        this.targetAssignmentRecordBuilder = targetAssignmentRecordBuilder;
-        return this;
-    }
-
-    public TargetAssignmentBuilder<T> withTargetAssignmentEpochRecordBuilder(
-        TargetAssignmentEpochRecordBuilder targetAssignmentEpochRecordBuilder
-    ) {
-        this.targetAssignmentEpochRecordBuilder = 
targetAssignmentEpochRecordBuilder;
-        return this;
+        return self();
     }
 
     /**
@@ -297,12 +403,12 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
      * @param member    The member to add or update.
      * @return This object.
      */
-    public TargetAssignmentBuilder<T> addOrUpdateMember(
+    public U addOrUpdateMember(
         String memberId,
         T member
     ) {
         this.updatedMembers.put(memberId, member);
-        return this;
+        return self();
     }
 
     /**
@@ -312,7 +418,7 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
      * @param memberId The member id.
      * @return This object.
      */
-    public TargetAssignmentBuilder<T> removeMember(
+    public U removeMember(
         String memberId
     ) {
         return addOrUpdateMember(memberId, null);
@@ -331,7 +437,7 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
 
         // Prepare the member spec for all members.
         members.forEach((memberId, member) ->
-            memberSpecs.put(memberId, createMemberSubscriptionAndAssignment(
+            memberSpecs.put(memberId, newMemberSubscriptionAndAssignment(
                 member,
                 targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
                 topicResolver
@@ -353,7 +459,7 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
                     }
                 }
 
-                memberSpecs.put(memberId, 
createMemberSubscriptionAndAssignment(
+                memberSpecs.put(memberId, newMemberSubscriptionAndAssignment(
                     updatedMemberOrNull,
                     assignment,
                     topicResolver
@@ -391,7 +497,7 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
             if (!newMemberAssignment.equals(oldMemberAssignment)) {
                 // If the member had no assignment or had a different 
assignment, we
                 // create a record for the new assignment.
-                records.add(targetAssignmentRecordBuilder.build(
+                records.add(newTargetAssignmentRecord(
                     groupId,
                     memberId,
                     newMemberAssignment.partitions()
@@ -400,11 +506,30 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
         }
 
         // Bump the target assignment epoch.
-        records.add(targetAssignmentEpochRecordBuilder.build(groupId, 
groupEpoch));
+        records.add(newTargetAssignmentEpochRecord(groupId, groupEpoch));
 
         return new TargetAssignmentResult(records, 
newGroupAssignment.members());
     }
 
+    protected abstract U self();
+
+    protected abstract CoordinatorRecord newTargetAssignmentRecord(
+        String groupId,
+        String memberId,
+        Map<Uuid, Set<Integer>> partitions
+    );
+
+    protected abstract CoordinatorRecord newTargetAssignmentEpochRecord(
+        String groupId,
+        int assignmentEpoch
+    );
+
+    protected abstract MemberSubscriptionAndAssignmentImpl 
newMemberSubscriptionAndAssignment(
+        T member,
+        Assignment memberAssignment,
+        TopicIds.TopicResolver topicResolver
+    );
+
     private Assignment newMemberAssignment(
         GroupAssignment newGroupAssignment,
         String memberId
@@ -416,18 +541,4 @@ public class TargetAssignmentBuilder<T extends 
ModernGroupMember> {
             return Assignment.EMPTY;
         }
     }
-
-    // private for testing
-    static <T extends ModernGroupMember> MemberSubscriptionAndAssignmentImpl 
createMemberSubscriptionAndAssignment(
-        T member,
-        Assignment memberAssignment,
-        TopicIds.TopicResolver topicResolver
-    ) {
-        return new MemberSubscriptionAndAssignmentImpl(
-            Optional.ofNullable(member.rackId()),
-            Optional.ofNullable(member.instanceId()),
-            new TopicIds(member.subscribedTopicNames(), topicResolver),
-            memberAssignment
-        );
-    }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/UnionSet.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/UnionSet.java
new file mode 100644
index 00000000000..185142a13df
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/UnionSet.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern;
+
+import java.lang.reflect.Array;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A set which presents the union of two underlying sets without
+ * materializing it. This class expects the underlying sets to
+ * be immutable.
+ *
+ * @param <T> The type of the elements in the set.
+ */
+public class UnionSet<T> implements Set<T> {
+    private final Set<T> largeSet;
+    private final Set<T> smallSet;
+    private int size = -1;
+
+    public UnionSet(Set<T> s1, Set<T> s2) {
+        Objects.requireNonNull(s1);
+        Objects.requireNonNull(s2);
+
+        if (s1.size() > s2.size()) {
+            largeSet = s1;
+            smallSet = s2;
+        } else {
+            largeSet = s2;
+            smallSet = s1;
+        }
+    }
+
+    @Override
+    public int size() {
+        if (size == -1) {
+            size = largeSet.size();
+            for (T item : smallSet) {
+                if (!largeSet.contains(item)) {
+                    size++;
+                }
+            }
+        }
+        return size;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return largeSet.isEmpty() && smallSet.isEmpty();
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        return largeSet.contains(o) || smallSet.contains(o);
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+        return new Iterator<T>() {
+            private final Iterator<T> largeSetIterator = largeSet.iterator();
+            private final Iterator<T> smallSetIterator = smallSet.iterator();
+            private T next = null;
+
+            @Override
+            public boolean hasNext() {
+                if (next != null) return true;
+                if (largeSetIterator.hasNext()) {
+                    next = largeSetIterator.next();
+                    return true;
+                }
+                while (smallSetIterator.hasNext()) {
+                    next = smallSetIterator.next();
+                    if (!largeSet.contains(next)) {
+                        return true;
+                    }
+                }
+                next = null;
+                return false;
+            }
+
+            @Override
+            public T next() {
+                if (!hasNext()) throw new NoSuchElementException();
+                T result = next;
+                next = null;
+                return result;
+            }
+        };
+    }
+
+    @Override
+    public Object[] toArray() {
+        Object[] array = new Object[size()];
+        int index = 0;
+        for (T item : largeSet) {
+            array[index] = item;
+            index++;
+        }
+        for (T item : smallSet) {
+            if (!largeSet.contains(item)) {
+                array[index] = item;
+                index++;
+            }
+        }
+        return array;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <U> U[] toArray(U[] array) {
+        int size = size();
+        if (array.length < size) {
+            // Create a new array of the same type with the correct size
+            array = (U[]) 
Array.newInstance(array.getClass().getComponentType(), size);
+        }
+        int index = 0;
+        for (T item : largeSet) {
+            array[index] = (U) item;
+            index++;
+        }
+        for (T item : smallSet) {
+            if (!largeSet.contains(item)) {
+                array[index] = (U) item;
+                index++;
+            }
+        }
+        if (array.length > size) {
+            array[size] = null;
+        }
+        return array;
+    }
+
+    @Override
+    public boolean add(T t) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        for (Object o : c) {
+            if (!contains(o)) return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends T> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void clear() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof Set)) return false;
+
+        Set<?> set = (Set<?>) o;
+        if (set.size() != size()) return false;
+        return containsAll(set);
+    }
+
+    @Override
+    public int hashCode() {
+        int h = 0;
+        for (T item : largeSet) {
+            h += item.hashCode();
+        }
+        for (T item : smallSet) {
+            if (!largeSet.contains(item)) {
+                h += item.hashCode();
+            }
+        }
+        return h;
+    }
+
+    @Override
+    public String toString() {
+        return "UnionSet(" +
+            "largeSet=" + largeSet +
+            ", smallSet=" + smallSet +
+            ')';
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index 3be2124c058..654c49e3958 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -416,6 +416,13 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         return Collections.unmodifiableMap(staticMembers);
     }
 
+    /**
+     * @return An immutable Map containing all the resolved regular 
expressions.
+     */
+    public Map<String, ResolvedRegularExpression> resolvedRegularExpressions() 
{
+        return Collections.unmodifiableMap(resolvedRegularExpressions);
+    }
+
     /**
      * Returns the current epoch of a partition or -1 if the partition
      * does not have one.
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 6ed55d450d4..b21ff1796fc 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -70,6 +70,7 @@ import 
org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer.ExpiredT
 import 
org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer.ScheduledTimeout;
 import 
org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
 import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
 import 
org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
 import org.apache.kafka.coordinator.group.classic.ClassicGroup;
 import org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
@@ -14979,6 +14980,83 @@ public class GroupMetadataManagerTest {
         );
     }
 
+    @Test
+    public void 
testConsumerGroupMemberPicksUpExistingResolvedRegularExpression() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        ConsumerGroupPartitionAssignor assignor = 
mock(ConsumerGroupPartitionAssignor.class);
+        when(assignor.name()).thenReturn("range");
+        when(assignor.assign(any(), any())).thenAnswer(answer -> {
+            GroupSpec spec = answer.getArgument(0);
+
+            List.of(memberId1, memberId2).forEach(memberId ->
+                assertEquals(
+                    Collections.singleton(fooTopicId),
+                    spec.memberSubscription(memberId).subscribedTopicIds(),
+                    String.format("Member %s has unexpected subscribed topic 
ids", memberId)
+                )
+            );
+
+            return new GroupAssignment(Map.of(
+                memberId1, new MemberAssignmentImpl(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0)
+                )),
+                memberId2, new MemberAssignmentImpl(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 1)
+                ))
+            ));
+        });
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroupAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 2)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setSubscribedTopicRegex("foo*")
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1)))
+                    .build())
+                .withResolvedRegularExpression("foo*", new 
ResolvedRegularExpression(
+                    Collections.singleton(fooTopicName),
+                    100L,
+                    12345L))
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(10000)
+                .setSubscribedTopicRegex("foo*")
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
+            result.response()
+        );
+    }
+
     private static void checkJoinGroupResponse(
         JoinGroupResponseData expectedResponse,
         JoinGroupResponseData actualResponse,
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
index 3f2aaa34f0d..03863ea9dca 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
@@ -21,10 +21,10 @@ import org.apache.kafka.coordinator.group.AssignmentTestUtil;
 import org.apache.kafka.coordinator.group.MetadataImageBuilder;
 import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
 import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
-import org.apache.kafka.coordinator.group.api.assignor.MemberSubscription;
 import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
 import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
+import 
org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
 import org.apache.kafka.image.TopicsImage;
 
 import org.junit.jupiter.api.Test;
@@ -43,7 +43,6 @@ import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssig
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord;
 import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
-import static 
org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder.createMemberSubscriptionAndAssignment;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -63,6 +62,7 @@ public class TargetAssignmentBuilderTest {
         private final Map<String, Assignment> targetAssignment = new 
HashMap<>();
         private final Map<String, MemberAssignment> memberAssignments = new 
HashMap<>();
         private final Map<String, String> staticMembers = new HashMap<>();
+        private final Map<String, ResolvedRegularExpression> 
resolvedRegularExpressions = new HashMap<>();
         private MetadataImageBuilder topicsImageBuilder = new 
MetadataImageBuilder();
 
         public TargetAssignmentBuilderTestContext(
@@ -78,17 +78,37 @@ public class TargetAssignmentBuilderTest {
             List<String> subscriptions,
             Map<Uuid, Set<Integer>> targetPartitions
         ) {
-            addGroupMember(memberId, null, subscriptions, targetPartitions);
+            addGroupMember(memberId, null, subscriptions, "", 
targetPartitions);
         }
 
-        private void addGroupMember(
+        public void addGroupMember(
+            String memberId,
+            List<String> subscriptions,
+            String subscribedRegex,
+            Map<Uuid, Set<Integer>> targetPartitions
+        ) {
+            addGroupMember(memberId, null, subscriptions, subscribedRegex, 
targetPartitions);
+        }
+
+        public void addGroupMember(
+            String memberId,
+            String instanceId,
+            List<String> subscriptions,
+            Map<Uuid, Set<Integer>> targetPartitions
+        ) {
+            addGroupMember(memberId, instanceId, subscriptions, "", 
targetPartitions);
+        }
+
+        public void addGroupMember(
             String memberId,
             String instanceId,
             List<String> subscriptions,
+            String subscribedRegex,
             Map<Uuid, Set<Integer>> targetPartitions
         ) {
             ConsumerGroupMember.Builder memberBuilder = new 
ConsumerGroupMember.Builder(memberId)
-                .setSubscribedTopicNames(subscriptions);
+                .setSubscribedTopicNames(subscriptions)
+                .setSubscribedTopicRegex(subscribedRegex);
 
             if (instanceId != null) {
                 memberBuilder.setInstanceId(instanceId);
@@ -158,6 +178,45 @@ public class TargetAssignmentBuilderTest {
             memberAssignments.put(memberId, new 
MemberAssignmentImpl(assignment));
         }
 
+        public void addResolvedRegularExpression(
+            String regex,
+            ResolvedRegularExpression resolvedRegularExpression
+        ) {
+            resolvedRegularExpressions.put(regex, resolvedRegularExpression);
+        }
+
+        private MemberSubscriptionAndAssignmentImpl 
newMemberSubscriptionAndAssignment(
+            ConsumerGroupMember member,
+            Assignment memberAssignment,
+            TopicIds.TopicResolver topicResolver
+        ) {
+            Set<String> subscriptions = member.subscribedTopicNames();
+
+            // Check whether the member is also subscribed to a regular expression. If it is,
+            // create the union of the two subscriptions.
+            String subscribedTopicRegex = member.subscribedTopicRegex();
+            if (subscribedTopicRegex != null && 
!subscribedTopicRegex.isEmpty()) {
+                ResolvedRegularExpression resolvedRegularExpression = 
resolvedRegularExpressions.get(subscribedTopicRegex);
+                if (resolvedRegularExpression != null) {
+                    if (subscriptions.isEmpty()) {
+                        subscriptions = resolvedRegularExpression.topics;
+                    } else if (!resolvedRegularExpression.topics.isEmpty()) {
+                        // We only use a UnionSet when the member uses both types of subscriptions. The
+                        // protocol allows it. However, the Apache Kafka Consumer does not support it.
+                        // Other clients such as librdkafka may support it.
+                        subscriptions = new UnionSet<>(subscriptions, 
resolvedRegularExpression.topics);
+                    }
+                }
+            }
+
+            return new MemberSubscriptionAndAssignmentImpl(
+                Optional.ofNullable(member.rackId()),
+                Optional.ofNullable(member.instanceId()),
+                new TopicIds(subscriptions, topicResolver),
+                memberAssignment
+            );
+        }
+
         public TargetAssignmentBuilder.TargetAssignmentResult build() {
             TopicsImage topicsImage = topicsImageBuilder.build().topics();
             TopicIds.TopicResolver topicResolver = new 
TopicIds.CachedTopicResolver(topicsImage);
@@ -166,7 +225,7 @@ public class TargetAssignmentBuilderTest {
 
             // All the existing members are prepared.
             members.forEach((memberId, member) ->
-                memberSubscriptions.put(memberId, 
createMemberSubscriptionAndAssignment(
+                memberSubscriptions.put(memberId, 
newMemberSubscriptionAndAssignment(
                     member,
                     targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
                     topicResolver
@@ -189,7 +248,7 @@ public class TargetAssignmentBuilderTest {
                         }
                     }
 
-                    memberSubscriptions.put(memberId, 
createMemberSubscriptionAndAssignment(
+                    memberSubscriptions.put(memberId, 
newMemberSubscriptionAndAssignment(
                         updatedMemberOrNull,
                         assignment,
                         topicResolver
@@ -223,15 +282,16 @@ public class TargetAssignmentBuilderTest {
                 .thenReturn(new GroupAssignment(memberAssignments));
 
             // Create and populate the assignment builder.
-            TargetAssignmentBuilder<ConsumerGroupMember> builder =
-                new TargetAssignmentBuilder<ConsumerGroupMember>(groupId, 
groupEpoch, assignor)
-                .withMembers(members)
-                .withStaticMembers(staticMembers)
-                .withSubscriptionMetadata(subscriptionMetadata)
-                .withSubscriptionType(subscriptionType)
-                .withTargetAssignment(targetAssignment)
-                .withInvertedTargetAssignment(invertedTargetAssignment)
-                .withTopicsImage(topicsImage);
+            TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder builder =
+                new 
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(groupId, groupEpoch, 
assignor)
+                    .withMembers(members)
+                    .withStaticMembers(staticMembers)
+                    .withSubscriptionMetadata(subscriptionMetadata)
+                    .withSubscriptionType(subscriptionType)
+                    .withTargetAssignment(targetAssignment)
+                    .withInvertedTargetAssignment(invertedTargetAssignment)
+                    .withTopicsImage(topicsImage)
+                    
.withResolvedRegularExpressions(resolvedRegularExpressions);
 
             // Add the updated members or delete the deleted members.
             updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
@@ -254,42 +314,6 @@ public class TargetAssignmentBuilderTest {
         }
     }
 
-    @Test
-    public void testCreateMemberSubscriptionSpecImpl() {
-        Uuid fooTopicId = Uuid.randomUuid();
-        Uuid barTopicId = Uuid.randomUuid();
-        TopicsImage topicsImage = new MetadataImageBuilder()
-            .addTopic(fooTopicId, "foo", 5)
-            .addTopic(barTopicId, "bar", 5)
-            .build()
-            .topics();
-        TopicIds.TopicResolver topicResolver = new 
TopicIds.DefaultTopicResolver(topicsImage);
-
-        ConsumerGroupMember member = new 
ConsumerGroupMember.Builder("member-id")
-            .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
-            .setRackId("rackId")
-            .setInstanceId("instanceId")
-            .build();
-
-        Assignment assignment = new Assignment(mkAssignment(
-            mkTopicAssignment(fooTopicId, 1, 2, 3),
-            mkTopicAssignment(barTopicId, 1, 2, 3)
-        ));
-
-        MemberSubscription subscriptionSpec = 
createMemberSubscriptionAndAssignment(
-            member,
-            assignment,
-            topicResolver
-        );
-
-        assertEquals(new MemberSubscriptionAndAssignmentImpl(
-            Optional.of("rackId"),
-            Optional.of("instanceId"),
-            new TopicIds(Set.of("bar", "foo", "zar"), topicsImage),
-            assignment
-        ), subscriptionSpec);
-    }
-
     @Test
     public void testEmpty() {
         TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
@@ -810,4 +834,80 @@ public class TargetAssignmentBuilderTest {
 
         assertEquals(expectedAssignment, result.targetAssignment());
     }
+
+    @Test
+    public void testRegularExpressions() {
+        TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
+            "my-group",
+            20
+        );
+
+        Uuid fooTopicId = context.addTopicMetadata("foo", 6);
+        Uuid barTopicId = context.addTopicMetadata("bar", 6);
+
+        context.addGroupMember("member-1", Arrays.asList("bar", "zar"), 
"foo*", mkAssignment());
+
+        context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), 
mkAssignment());
+
+        context.addGroupMember("member-3", Collections.emptyList(), "foo*", 
mkAssignment());
+
+        context.addResolvedRegularExpression("foo*", new 
ResolvedRegularExpression(
+            Collections.singleton("foo"),
+            10L,
+            12345L
+        ));
+
+        context.prepareMemberAssignment("member-1", mkAssignment(
+            mkTopicAssignment(fooTopicId, 1, 2),
+            mkTopicAssignment(barTopicId, 1, 2, 3)
+        ));
+
+        context.prepareMemberAssignment("member-2", mkAssignment(
+            mkTopicAssignment(fooTopicId, 3, 4),
+            mkTopicAssignment(barTopicId, 4, 5, 6)
+        ));
+
+        context.prepareMemberAssignment("member-3", mkAssignment(
+            mkTopicAssignment(fooTopicId, 5, 6)
+        ));
+
+        TargetAssignmentBuilder.TargetAssignmentResult result = 
context.build();
+
+        assertEquals(4, result.records().size());
+
+        assertUnorderedListEquals(Arrays.asList(
+            newConsumerGroupTargetAssignmentRecord("my-group", "member-1", 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 1, 2),
+                mkTopicAssignment(barTopicId, 1, 2, 3)
+            )),
+            newConsumerGroupTargetAssignmentRecord("my-group", "member-2", 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4),
+                mkTopicAssignment(barTopicId, 4, 5, 6)
+            )),
+            newConsumerGroupTargetAssignmentRecord("my-group", "member-3", 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 5, 6)
+            ))
+        ), result.records().subList(0, 3));
+
+        assertEquals(newConsumerGroupTargetAssignmentEpochRecord(
+            "my-group",
+            20
+        ), result.records().get(3));
+
+        Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
+        expectedAssignment.put("member-1", new 
MemberAssignmentImpl(mkAssignment(
+            mkTopicAssignment(fooTopicId, 1, 2),
+            mkTopicAssignment(barTopicId, 1, 2, 3)
+        )));
+        expectedAssignment.put("member-2", new 
MemberAssignmentImpl(mkAssignment(
+            mkTopicAssignment(fooTopicId, 3, 4),
+            mkTopicAssignment(barTopicId, 4, 5, 6)
+        )));
+
+        expectedAssignment.put("member-3", new 
MemberAssignmentImpl(mkAssignment(
+            mkTopicAssignment(fooTopicId, 5, 6)
+        )));
+
+        assertEquals(expectedAssignment, result.targetAssignment());
+    }
 }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/UnionSetTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/UnionSetTest.java
new file mode 100644
index 00000000000..23986231b65
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/UnionSetTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class UnionSetTest {
+    @Test
+    public void testSetsCannotBeNull() {
+        assertThrows(NullPointerException.class, () -> new 
UnionSet<String>(Collections.emptySet(), null));
+        assertThrows(NullPointerException.class, () -> new 
UnionSet<String>(null, Collections.emptySet()));
+    }
+
+    @Test
+    public void testUnion() {
+        UnionSet<Integer> union = new UnionSet<>(
+            Set.of(1, 2, 3),
+            Set.of(2, 3, 4, 5)
+        );
+
+        List<Integer> result = new ArrayList<>();
+        result.addAll(union);
+        result.sort(Integer::compareTo);
+
+        assertEquals(List.of(1, 2, 3, 4, 5), result);
+    }
+
+    @Test
+    public void testSize() {
+        UnionSet<Integer> union = new UnionSet<>(
+            Set.of(1, 2, 3),
+            Set.of(2, 3, 4, 5)
+        );
+
+        assertEquals(5, union.size());
+    }
+
+    @Test
+    public void testIsEmpty() {
+        UnionSet<Integer> union = new UnionSet<>(
+            Set.of(1, 2, 3),
+            Set.of(2, 3, 4, 5)
+        );
+
+        assertFalse(union.isEmpty());
+
+        union = new UnionSet<>(
+            Set.of(1, 2, 3),
+            Collections.emptySet()
+        );
+
+        assertFalse(union.isEmpty());
+
+        union = new UnionSet<>(
+            Collections.emptySet(),
+            Set.of(2, 3, 4, 5)
+        );
+
+        assertFalse(union.isEmpty());
+
+        union = new UnionSet<>(
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+        assertTrue(union.isEmpty());
+    }
+
+    @Test
+    public void testContains() {
+        UnionSet<Integer> union = new UnionSet<>(
+            Set.of(1, 2, 3),
+            Set.of(2, 3, 4, 5)
+        );
+
+        IntStream.range(1, 6).forEach(item -> 
assertTrue(union.contains(item)));
+
+        assertFalse(union.contains(0));
+        assertFalse(union.contains(6));
+    }
+
+    @Test
+    public void testToArray() {
+        UnionSet<Integer> union = new UnionSet<>(
+            Set.of(1, 2, 3),
+            Set.of(2, 3, 4, 5)
+        );
+
+        Object[] expected = {1, 2, 3, 4, 5};
+        Object[] actual = union.toArray();
+        Arrays.sort(actual);
+        assertArrayEquals(expected, actual);
+    }
+
+    @Test
+    public void testToArrayWithArrayParameter() {
+        UnionSet<Integer> union = new UnionSet<>(
+            Set.of(1, 2, 3),
+            Set.of(2, 3, 4, 5)
+        );
+
+        Integer[] input = new Integer[5];
+        Integer[] expected = {1, 2, 3, 4, 5};
+        union.toArray(input);
+        Arrays.sort(input);
+        assertArrayEquals(expected, input);
+    }
+
+    @Test
+    public void testEquals() {
+        UnionSet<Integer> union = new UnionSet<>(
+            Set.of(1, 2, 3),
+            Set.of(2, 3, 4, 5)
+        );
+
+        assertEquals(Set.of(1, 2, 3, 4, 5), union);
+    }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
index 4c044323d06..800073f42bc 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
@@ -37,6 +37,7 @@ public class ConsumerGroupBuilder {
     private final Map<String, ConsumerGroupMember> members = new HashMap<>();
     private final Map<String, Assignment> assignments = new HashMap<>();
     private Map<String, TopicMetadata> subscriptionMetadata;
+    private final Map<String, ResolvedRegularExpression> 
resolvedRegularExpressions = new HashMap<>();
 
     public ConsumerGroupBuilder(String groupId, int groupEpoch) {
         this.groupId = groupId;
@@ -49,6 +50,14 @@ public class ConsumerGroupBuilder {
         return this;
     }
 
+    public ConsumerGroupBuilder withResolvedRegularExpression(
+        String regex,
+        ResolvedRegularExpression resolvedRegularExpression
+    ) {
+        this.resolvedRegularExpressions.put(regex, resolvedRegularExpression);
+        return this;
+    }
+
     public ConsumerGroupBuilder withSubscriptionMetadata(Map<String, 
TopicMetadata> subscriptionMetadata) {
         this.subscriptionMetadata = subscriptionMetadata;
         return this;
@@ -72,6 +81,11 @@ public class ConsumerGroupBuilder {
             
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 member))
         );
 
+        // Add resolved regular expressions.
+        resolvedRegularExpressions.forEach((regex, resolvedRegularExpression) 
->
+            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(groupId,
 regex, resolvedRegularExpression))
+        );
+
         // Add subscription metadata.
         if (subscriptionMetadata == null) {
             subscriptionMetadata = new HashMap<>();
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
index 2a23d22b655..6fbb7908622 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
@@ -82,7 +82,7 @@ public class TargetAssignmentBuilderBenchmark {
 
     private PartitionAssignor partitionAssignor;
 
-    private TargetAssignmentBuilder<ConsumerGroupMember> 
targetAssignmentBuilder;
+    private TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder 
targetAssignmentBuilder;
 
     /** The number of homogeneous subgroups to create for the heterogeneous subscription case. */
     private static final int MAX_BUCKET_COUNT = 5;
@@ -116,7 +116,7 @@ public class TargetAssignmentBuilderBenchmark {
             .setSubscribedTopicNames(allTopicNames)
             .build();
 
-        targetAssignmentBuilder = new 
TargetAssignmentBuilder<ConsumerGroupMember>(GROUP_ID, GROUP_EPOCH, 
partitionAssignor)
+        targetAssignmentBuilder = new 
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(GROUP_ID, GROUP_EPOCH, 
partitionAssignor)
             .withMembers(members)
             .withSubscriptionMetadata(subscriptionMetadata)
             .withSubscriptionType(subscriptionType)

Reply via email to