This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new e87176f  MINOR: optimize performAssignment to skip unnecessary check 
(#11218)
e87176f is described below

commit e87176fe423324cfcdb457df3f0c4eff6ca8803e
Author: Luke Chen <[email protected]>
AuthorDate: Wed Sep 29 08:22:05 2021 +0800

    MINOR: optimize performAssignment to skip unnecessary check (#11218)
    
    Found this while reading the code. We perform a somewhat heavy check each 
time after performing an assignment: we compare the "assigned topics" set 
with the "subscribed topics" set to see whether either one contains topics that 
are missing from the other. Also, the "assigned topics" set is built by 
traversing all the assigned partitions, which is expensive when the number of 
partitions is large.
    
    However, as the comments describe, this check is a safeguard for 
user-customized assignors, which might produce assignments that we don't expect. 
In most cases, users will just use the in-product assignors, which guarantee 
that only topics from the subscribed topics are assigned. Therefore, this check 
is not needed for in-product assignors.
    
    In this PR, I added an "in-product assignor names" list. ConsumerCoordinator 
now checks whether the selected assignor is one of the in-product assignors to 
decide whether the additional check is needed. A test is also added for it.
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>, Guozhang 
Wang <[email protected]>
---
 .../kafka/clients/consumer/ConsumerConfig.java     | 15 ++++
 .../consumer/CooperativeStickyAssignor.java        |  3 +-
 .../kafka/clients/consumer/RangeAssignor.java      |  3 +-
 .../kafka/clients/consumer/RoundRobinAssignor.java |  3 +-
 .../kafka/clients/consumer/StickyAssignor.java     |  3 +-
 .../consumer/internals/ConsumerCoordinator.java    | 78 ++++++++++++-------
 .../internals/ConsumerCoordinatorTest.java         | 90 ++++++++++++++++++----
 7 files changed, 149 insertions(+), 46 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 783c97e..ca24c28 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
@@ -39,6 +40,10 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static 
org.apache.kafka.clients.consumer.CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME;
+import static 
org.apache.kafka.clients.consumer.RangeAssignor.RANGE_ASSIGNOR_NAME;
+import static 
org.apache.kafka.clients.consumer.RoundRobinAssignor.ROUNDROBIN_ASSIGNOR_NAME;
+import static 
org.apache.kafka.clients.consumer.StickyAssignor.STICKY_ASSIGNOR_NAME;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
@@ -48,6 +53,16 @@ import static 
org.apache.kafka.common.config.ConfigDef.ValidString.in;
 public class ConsumerConfig extends AbstractConfig {
     private static final ConfigDef CONFIG;
 
+    // a list contains all the assignor names that only assign subscribed 
topics to consumer. Should be updated when new assignor added.
+    // This is to help optimize ConsumerCoordinator#performAssignment method
+    public static final List<String> ASSIGN_FROM_SUBSCRIBED_ASSIGNORS =
+        Collections.unmodifiableList(Arrays.asList(
+            RANGE_ASSIGNOR_NAME,
+            ROUNDROBIN_ASSIGNOR_NAME,
+            STICKY_ASSIGNOR_NAME,
+            COOPERATIVE_STICKY_ASSIGNOR_NAME
+        ));
+
     /*
      * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES 
AS
      * THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
index 5f0bb0c..b2cca87 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.protocol.types.Type;
  * cooperative rebalancing. See the <a 
href="https://kafka.apache.org/documentation/#upgrade_240_notable";>upgrade 
guide</a> for details.
  */
 public class CooperativeStickyAssignor extends AbstractStickyAssignor {
+    public static final String COOPERATIVE_STICKY_ASSIGNOR_NAME = 
"cooperative-sticky";
 
     // these schemas are used for preserving useful metadata for the 
assignment, such as the last stable generation
     private static final String GENERATION_KEY_NAME = "generation";
@@ -57,7 +58,7 @@ public class CooperativeStickyAssignor extends 
AbstractStickyAssignor {
 
     @Override
     public String name() {
-        return "cooperative-sticky";
+        return COOPERATIVE_STICKY_ASSIGNOR_NAME;
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
index 21eb47a..aec0d39 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
@@ -65,10 +65,11 @@ import java.util.Map;
  * </ul>
  */
 public class RangeAssignor extends AbstractPartitionAssignor {
+    public static final String RANGE_ASSIGNOR_NAME = "range";
 
     @Override
     public String name() {
-        return "range";
+        return RANGE_ASSIGNOR_NAME;
     }
 
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, 
Subscription> consumerMetadata) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
index edecce8..2d6edea 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
@@ -98,6 +98,7 @@ import java.util.TreeSet;
  * </ul>
  */
 public class RoundRobinAssignor extends AbstractPartitionAssignor {
+    public static final String ROUNDROBIN_ASSIGNOR_NAME = "roundrobin";
 
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> 
partitionsPerTopic,
@@ -138,7 +139,7 @@ public class RoundRobinAssignor extends 
AbstractPartitionAssignor {
 
     @Override
     public String name() {
-        return "roundrobin";
+        return ROUNDROBIN_ASSIGNOR_NAME;
     }
 
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
index 77a61df..787c432 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
@@ -174,6 +174,7 @@ import org.apache.kafka.common.utils.CollectionUtils;
  * {@link ConsumerPartitionAssignor.RebalanceProtocol} for a detailed 
explanation of cooperative rebalancing.
  */
 public class StickyAssignor extends AbstractStickyAssignor {
+    public static final String STICKY_ASSIGNOR_NAME = "sticky";
 
     // these schemas are used for preserving consumer's previously assigned 
partitions
     // list and sending it as user data to the leader during a rebalance
@@ -196,7 +197,7 @@ public class StickyAssignor extends AbstractStickyAssignor {
 
     @Override
     public String name() {
-        return "sticky";
+        return STICKY_ASSIGNOR_NAME;
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 68cf8a9..67ad51e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -81,6 +81,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ASSIGN_FROM_SUBSCRIBED_ASSIGNORS;
+
 /**
  * This class manages the coordination process with the consumer coordinator.
  */
@@ -556,6 +558,53 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         maybeUpdateSubscriptionMetadata();
     }
 
+    private boolean isAssignFromSubscribedTopicsAssignor(String name) {
+        return ASSIGN_FROM_SUBSCRIBED_ASSIGNORS.contains(name);
+    }
+
+    /**
+     * user-customized assignor may have created some topics that are not in 
the subscription list
+     * and assign their partitions to the members; in this case we would like 
to update the leader's
+     * own metadata with the newly added topics so that it will not trigger a 
subsequent rebalance
+     * when these topics gets updated from metadata refresh.
+     *
+     * We skip the check for in-product assignors since this will not happen 
in in-product assignors.
+     *
+     * TODO: this is a hack and not something we want to support long-term 
unless we push regex into the protocol
+     *       we may need to modify the ConsumerPartitionAssignor API to better 
support this case.
+     *
+     * @param assignorName          the selected assignor name
+     * @param assignments           the assignments after assignor assigned
+     * @param allSubscribedTopics   all consumers' subscribed topics
+     */
+    private void maybeUpdateGroupSubscription(String assignorName,
+                                              Map<String, Assignment> 
assignments,
+                                              Set<String> allSubscribedTopics) 
{
+        if (!isAssignFromSubscribedTopicsAssignor(assignorName)) {
+            Set<String> assignedTopics = new HashSet<>();
+            for (Assignment assigned : assignments.values()) {
+                for (TopicPartition tp : assigned.partitions())
+                    assignedTopics.add(tp.topic());
+            }
+
+            if (!assignedTopics.containsAll(allSubscribedTopics)) {
+                Set<String> notAssignedTopics = new 
HashSet<>(allSubscribedTopics);
+                notAssignedTopics.removeAll(assignedTopics);
+                log.warn("The following subscribed topics are not assigned to 
any members: {} ", notAssignedTopics);
+            }
+
+            if (!allSubscribedTopics.containsAll(assignedTopics)) {
+                Set<String> newlyAddedTopics = new HashSet<>(assignedTopics);
+                newlyAddedTopics.removeAll(allSubscribedTopics);
+                log.info("The following not-subscribed topics are assigned, 
and their metadata will be " +
+                    "fetched from the brokers: {}", newlyAddedTopics);
+
+                allSubscribedTopics.addAll(newlyAddedTopics);
+                updateGroupSubscription(allSubscribedTopics);
+            }
+        }
+    }
+
     @Override
     protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                         String 
assignmentStrategy,
@@ -592,34 +641,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
             validateCooperativeAssignment(ownedPartitions, assignments);
         }
 
-        // user-customized assignor may have created some topics that are not 
in the subscription list
-        // and assign their partitions to the members; in this case we would 
like to update the leader's
-        // own metadata with the newly added topics so that it will not 
trigger a subsequent rebalance
-        // when these topics gets updated from metadata refresh.
-        //
-        // TODO: this is a hack and not something we want to support long-term 
unless we push regex into the protocol
-        //       we may need to modify the ConsumerPartitionAssignor API to 
better support this case.
-        Set<String> assignedTopics = new HashSet<>();
-        for (Assignment assigned : assignments.values()) {
-            for (TopicPartition tp : assigned.partitions())
-                assignedTopics.add(tp.topic());
-        }
-
-        if (!assignedTopics.containsAll(allSubscribedTopics)) {
-            Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics);
-            notAssignedTopics.removeAll(assignedTopics);
-            log.warn("The following subscribed topics are not assigned to any 
members: {} ", notAssignedTopics);
-        }
-
-        if (!allSubscribedTopics.containsAll(assignedTopics)) {
-            Set<String> newlyAddedTopics = new HashSet<>(assignedTopics);
-            newlyAddedTopics.removeAll(allSubscribedTopics);
-            log.info("The following not-subscribed topics are assigned, and 
their metadata will be " +
-                    "fetched from the brokers: {}", newlyAddedTopics);
-
-            allSubscribedTopics.addAll(assignedTopics);
-            updateGroupSubscription(allSubscribedTopics);
-        }
+        maybeUpdateGroupSubscription(assignor.name(), assignments, 
allSubscribedTopics);
 
         assignmentSnapshot = metadataSnapshot;
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 902ea99..e5aa804 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -77,6 +77,8 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -191,7 +193,8 @@ public abstract class ConsumerCoordinatorTest {
         this.coordinator = buildCoordinator(rebalanceConfig,
                                             metrics,
                                             assignors,
-                                            false);
+                                            false,
+                                            subscriptions);
     }
 
     private GroupRebalanceConfig buildRebalanceConfig(Optional<String> 
groupInstanceId) {
@@ -258,6 +261,63 @@ public abstract class ConsumerCoordinatorTest {
         return metrics.metrics().get(metrics.metricName(name, consumerId + 
groupId + "-coordinator-metrics"));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void 
testPerformAssignmentShouldUpdateGroupSubscriptionAfterAssignmentIfNeeded() {
+        SubscriptionState mockSubscriptionState = 
Mockito.mock(SubscriptionState.class);
+
+        // the consumer only subscribed to "topic1"
+        Map<String, List<String>> memberSubscriptions = 
singletonMap(consumerId, singletonList(topic1));
+
+        List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new 
ArrayList<>();
+        for (Map.Entry<String, List<String>> subscriptionEntry : 
memberSubscriptions.entrySet()) {
+            ConsumerPartitionAssignor.Subscription subscription = new 
ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue());
+            ByteBuffer buf = 
ConsumerProtocol.serializeSubscription(subscription);
+            metadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
+                .setMemberId(subscriptionEntry.getKey())
+                .setMetadata(buf.array()));
+        }
+
+        // normal case: the assignment result will have partitions for only 
the subscribed topic: "topic1"
+        partitionAssignor.prepare(Collections.singletonMap(consumerId, 
singletonList(t1p)));
+
+        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false, 
mockSubscriptionState)) {
+            coordinator.performAssignment("1", partitionAssignor.name(), 
metadata);
+
+            ArgumentCaptor<Collection<String>> topicsCaptor = 
ArgumentCaptor.forClass(Collection.class);
+            // groupSubscribe should be only called 1 time, which is before 
assignment,
+            // because the assigned topics are the same as the subscribed 
topics
+            Mockito.verify(mockSubscriptionState, 
Mockito.times(1)).groupSubscribe(topicsCaptor.capture());
+
+            List<Collection<String>> capturedTopics = 
topicsCaptor.getAllValues();
+
+            // expected the final group subscribed topics to be updated to 
"topic1"
+            Set<String> expectedTopicsGotCalled = new 
HashSet<>(Arrays.asList(topic1));
+            assertEquals(expectedTopicsGotCalled, capturedTopics.get(0));
+        }
+
+        Mockito.clearInvocations(mockSubscriptionState);
+
+        // unsubscribed topic partition assigned case: the assignment result 
will have partitions for (1) subscribed topic: "topic1"
+        // and (2) the additional unsubscribed topic: "topic2". We should add 
"topic2" into group subscription list
+        partitionAssignor.prepare(Collections.singletonMap(consumerId, 
Arrays.asList(t1p, t2p)));
+
+        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false, 
mockSubscriptionState)) {
+            coordinator.performAssignment("1", partitionAssignor.name(), 
metadata);
+
+            ArgumentCaptor<Collection<String>> topicsCaptor = 
ArgumentCaptor.forClass(Collection.class);
+            // groupSubscribe should be called 2 times, once before 
assignment, once after assignment
+            // (because the assigned topics are not the same as the subscribed 
topics)
+            Mockito.verify(mockSubscriptionState, 
Mockito.times(2)).groupSubscribe(topicsCaptor.capture());
+
+            List<Collection<String>> capturedTopics = 
topicsCaptor.getAllValues();
+
+            // expected the final group subscribed topics to be updated to 
"topic1" and "topic2"
+            Set<String> expectedTopicsGotCalled = new 
HashSet<>(Arrays.asList(topic1, topic2));
+            assertEquals(expectedTopicsGotCalled, capturedTopics.get(1));
+        }
+    }
+
     @Test
     public void testSelectRebalanceProtcol() {
         List<ConsumerPartitionAssignor> assignors = new ArrayList<>();
@@ -265,14 +325,14 @@ public abstract class ConsumerCoordinatorTest {
         assignors.add(new 
MockPartitionAssignor(Collections.singletonList(COOPERATIVE)));
 
         // no commonly supported protocols
-        assertThrows(IllegalArgumentException.class, () -> 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false));
+        assertThrows(IllegalArgumentException.class, () -> 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false, 
subscriptions));
 
         assignors.clear();
         assignors.add(new 
MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER,
 COOPERATIVE)));
         assignors.add(new 
MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER,
 COOPERATIVE)));
 
         // select higher indexed (more advanced) protocols
-        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false)) {
+        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false, 
subscriptions)) {
             assertEquals(COOPERATIVE, coordinator.getProtocol());
         }
     }
@@ -1586,7 +1646,7 @@ public abstract class ConsumerCoordinatorTest {
         metadata = new ConsumerMetadata(0, Long.MAX_VALUE, 
includeInternalTopics,
                 false, subscriptions, new LogContext(), new 
ClusterResourceListeners());
         client = new MockClient(time, metadata);
-        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false)) {
+        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false, 
subscriptions)) {
             subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
             Node node = new Node(0, "localhost", 9999);
             MetadataResponse.PartitionMetadata partitionMetadata =
@@ -1722,7 +1782,7 @@ public abstract class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitDynamicAssignment() {
-        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)
+        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)
         ) {
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
             joinAsFollowerAndReceiveAssignment(coordinator, 
singletonList(t1p));
@@ -1736,7 +1796,7 @@ public abstract class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitRetryBackoff() {
-        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, 
subscriptions)) {
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
             joinAsFollowerAndReceiveAssignment(coordinator, 
singletonList(t1p));
 
@@ -1769,7 +1829,7 @@ public abstract class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitAwaitsInterval() {
-        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, 
subscriptions)) {
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
             joinAsFollowerAndReceiveAssignment(coordinator, 
singletonList(t1p));
 
@@ -1806,7 +1866,7 @@ public abstract class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitDynamicAssignmentRebalance() {
-        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, 
subscriptions)) {
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
             client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
             coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -1830,7 +1890,7 @@ public abstract class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitManualAssignment() {
-        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, 
subscriptions)) {
             subscriptions.assignFromUser(singleton(t1p));
             subscriptions.seek(t1p, 100);
             client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
@@ -1845,7 +1905,7 @@ public abstract class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitManualAssignmentCoordinatorUnknown() {
-        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, 
subscriptions)) {
             subscriptions.assignFromUser(singleton(t1p));
             subscriptions.seek(t1p, 100);
 
@@ -2694,7 +2754,7 @@ public abstract class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitAfterCoordinatorBackToService() {
-        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, 
subscriptions)) {
             subscriptions.assignFromUser(Collections.singleton(t1p));
             subscriptions.seek(t1p, 100L);
 
@@ -2907,7 +2967,8 @@ public abstract class ConsumerCoordinatorTest {
         ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
                                                            new Metrics(),
                                                            assignors,
-                                                           autoCommit);
+                                                           autoCommit,
+                                                           subscriptions);
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
         if (useGroupManagement) {
@@ -2996,14 +3057,15 @@ public abstract class ConsumerCoordinatorTest {
     private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig 
rebalanceConfig,
                                                  final Metrics metrics,
                                                  final 
List<ConsumerPartitionAssignor> assignors,
-                                                 final boolean 
autoCommitEnabled) {
+                                                 final boolean 
autoCommitEnabled,
+                                                 final SubscriptionState 
subscriptionState) {
         return new ConsumerCoordinator(
                 rebalanceConfig,
                 new LogContext(),
                 consumerClient,
                 assignors,
                 metadata,
-                subscriptions,
+                subscriptionState,
                 metrics,
                 consumerId + groupId,
                 time,

Reply via email to