This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 71f1cb7 MINOR: optimize performAssignment to skip unnecessary check
(#11218)
71f1cb7 is described below
commit 71f1cb7bfb2dc801c9ea47e0794d915e7f279635
Author: Luke Chen <[email protected]>
AuthorDate: Wed Sep 29 08:22:05 2021 +0800
MINOR: optimize performAssignment to skip unnecessary check (#11218)
Found this while reading the code. We performed a somewhat heavy check each
time after performing an assignment: comparing the "assigned topics" set with
the "subscribed topics" set to see whether either set contains topics that do
not exist in the other. In addition, the "assigned topics" set is built by
traversing all the assigned partitions, which is expensive when the number of
partitions is large.
However, as the code comments describe, this check is a safeguard for
user-customized assignors, which might produce assignments that we don't
expect. In most cases, users will just use the in-product assignors, which
guarantee that we only assign topics from the subscribed topics. Therefore,
this check is not needed for in-product assignors.
In this PR, I added an "in-product assignor names" list, and
ConsumerCoordinator now checks whether the selected assignor is one of the
in-product assignors to decide whether the additional check is needed. A test
for this behavior is also added.
Reviewers: Anna Sophie Blee-Goldman <[email protected]>, Guozhang
Wang <[email protected]>
---
.../kafka/clients/consumer/ConsumerConfig.java | 15 ++++
.../consumer/CooperativeStickyAssignor.java | 3 +-
.../kafka/clients/consumer/RangeAssignor.java | 3 +-
.../kafka/clients/consumer/RoundRobinAssignor.java | 3 +-
.../kafka/clients/consumer/StickyAssignor.java | 3 +-
.../consumer/internals/ConsumerCoordinator.java | 78 ++++++++++++-------
.../internals/ConsumerCoordinatorTest.java | 90 ++++++++++++++++++----
7 files changed, 149 insertions(+), 46 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 783c97e..ca24c28 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
@@ -39,6 +40,10 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import static
org.apache.kafka.clients.consumer.CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME;
+import static
org.apache.kafka.clients.consumer.RangeAssignor.RANGE_ASSIGNOR_NAME;
+import static
org.apache.kafka.clients.consumer.RoundRobinAssignor.ROUNDROBIN_ASSIGNOR_NAME;
+import static
org.apache.kafka.clients.consumer.StickyAssignor.STICKY_ASSIGNOR_NAME;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
@@ -48,6 +53,16 @@ import static
org.apache.kafka.common.config.ConfigDef.ValidString.in;
public class ConsumerConfig extends AbstractConfig {
private static final ConfigDef CONFIG;
+ // a list contains all the assignor names that only assign subscribed
topics to consumer. Should be updated when new assignor added.
+ // This is to help optimize ConsumerCoordinator#performAssignment method
+ public static final List<String> ASSIGN_FROM_SUBSCRIBED_ASSIGNORS =
+ Collections.unmodifiableList(Arrays.asList(
+ RANGE_ASSIGNOR_NAME,
+ ROUNDROBIN_ASSIGNOR_NAME,
+ STICKY_ASSIGNOR_NAME,
+ COOPERATIVE_STICKY_ASSIGNOR_NAME
+ ));
+
/*
* NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES
AS
* THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
index 5f0bb0c..b2cca87 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.protocol.types.Type;
* cooperative rebalancing. See the <a
href="https://kafka.apache.org/documentation/#upgrade_240_notable">upgrade
guide</a> for details.
*/
public class CooperativeStickyAssignor extends AbstractStickyAssignor {
+ public static final String COOPERATIVE_STICKY_ASSIGNOR_NAME =
"cooperative-sticky";
// these schemas are used for preserving useful metadata for the
assignment, such as the last stable generation
private static final String GENERATION_KEY_NAME = "generation";
@@ -57,7 +58,7 @@ public class CooperativeStickyAssignor extends
AbstractStickyAssignor {
@Override
public String name() {
- return "cooperative-sticky";
+ return COOPERATIVE_STICKY_ASSIGNOR_NAME;
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
index 21eb47a..aec0d39 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
@@ -65,10 +65,11 @@ import java.util.Map;
* </ul>
*/
public class RangeAssignor extends AbstractPartitionAssignor {
+ public static final String RANGE_ASSIGNOR_NAME = "range";
@Override
public String name() {
- return "range";
+ return RANGE_ASSIGNOR_NAME;
}
private Map<String, List<MemberInfo>> consumersPerTopic(Map<String,
Subscription> consumerMetadata) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
index edecce8..2d6edea 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
@@ -98,6 +98,7 @@ import java.util.TreeSet;
* </ul>
*/
public class RoundRobinAssignor extends AbstractPartitionAssignor {
+ public static final String ROUNDROBIN_ASSIGNOR_NAME = "roundrobin";
@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer>
partitionsPerTopic,
@@ -138,7 +139,7 @@ public class RoundRobinAssignor extends
AbstractPartitionAssignor {
@Override
public String name() {
- return "roundrobin";
+ return ROUNDROBIN_ASSIGNOR_NAME;
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
index 77a61df..787c432 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
@@ -174,6 +174,7 @@ import org.apache.kafka.common.utils.CollectionUtils;
* {@link ConsumerPartitionAssignor.RebalanceProtocol} for a detailed
explanation of cooperative rebalancing.
*/
public class StickyAssignor extends AbstractStickyAssignor {
+ public static final String STICKY_ASSIGNOR_NAME = "sticky";
// these schemas are used for preserving consumer's previously assigned
partitions
// list and sending it as user data to the leader during a rebalance
@@ -196,7 +197,7 @@ public class StickyAssignor extends AbstractStickyAssignor {
@Override
public String name() {
- return "sticky";
+ return STICKY_ASSIGNOR_NAME;
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index ed0b793..bbdc434 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -81,6 +81,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.ASSIGN_FROM_SUBSCRIBED_ASSIGNORS;
+
/**
* This class manages the coordination process with the consumer coordinator.
*/
@@ -556,6 +558,53 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
maybeUpdateSubscriptionMetadata();
}
+ private boolean isAssignFromSubscribedTopicsAssignor(String name) {
+ return ASSIGN_FROM_SUBSCRIBED_ASSIGNORS.contains(name);
+ }
+
+ /**
+ * user-customized assignor may have created some topics that are not in
the subscription list
+ * and assign their partitions to the members; in this case we would like
to update the leader's
+ * own metadata with the newly added topics so that it will not trigger a
subsequent rebalance
+ * when these topics gets updated from metadata refresh.
+ *
+ * We skip the check for in-product assignors since this will not happen
in in-product assignors.
+ *
+ * TODO: this is a hack and not something we want to support long-term
unless we push regex into the protocol
+ * we may need to modify the ConsumerPartitionAssignor API to better
support this case.
+ *
+ * @param assignorName the selected assignor name
+ * @param assignments the assignments after assignor assigned
+ * @param allSubscribedTopics all consumers' subscribed topics
+ */
+ private void maybeUpdateGroupSubscription(String assignorName,
+ Map<String, Assignment>
assignments,
+ Set<String> allSubscribedTopics)
{
+ if (!isAssignFromSubscribedTopicsAssignor(assignorName)) {
+ Set<String> assignedTopics = new HashSet<>();
+ for (Assignment assigned : assignments.values()) {
+ for (TopicPartition tp : assigned.partitions())
+ assignedTopics.add(tp.topic());
+ }
+
+ if (!assignedTopics.containsAll(allSubscribedTopics)) {
+ Set<String> notAssignedTopics = new
HashSet<>(allSubscribedTopics);
+ notAssignedTopics.removeAll(assignedTopics);
+ log.warn("The following subscribed topics are not assigned to
any members: {} ", notAssignedTopics);
+ }
+
+ if (!allSubscribedTopics.containsAll(assignedTopics)) {
+ Set<String> newlyAddedTopics = new HashSet<>(assignedTopics);
+ newlyAddedTopics.removeAll(allSubscribedTopics);
+ log.info("The following not-subscribed topics are assigned,
and their metadata will be " +
+ "fetched from the brokers: {}", newlyAddedTopics);
+
+ allSubscribedTopics.addAll(newlyAddedTopics);
+ updateGroupSubscription(allSubscribedTopics);
+ }
+ }
+ }
+
@Override
protected Map<String, ByteBuffer> performAssignment(String leaderId,
String
assignmentStrategy,
@@ -592,34 +641,7 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
validateCooperativeAssignment(ownedPartitions, assignments);
}
- // user-customized assignor may have created some topics that are not
in the subscription list
- // and assign their partitions to the members; in this case we would
like to update the leader's
- // own metadata with the newly added topics so that it will not
trigger a subsequent rebalance
- // when these topics gets updated from metadata refresh.
- //
- // TODO: this is a hack and not something we want to support long-term
unless we push regex into the protocol
- // we may need to modify the ConsumerPartitionAssignor API to
better support this case.
- Set<String> assignedTopics = new HashSet<>();
- for (Assignment assigned : assignments.values()) {
- for (TopicPartition tp : assigned.partitions())
- assignedTopics.add(tp.topic());
- }
-
- if (!assignedTopics.containsAll(allSubscribedTopics)) {
- Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics);
- notAssignedTopics.removeAll(assignedTopics);
- log.warn("The following subscribed topics are not assigned to any
members: {} ", notAssignedTopics);
- }
-
- if (!allSubscribedTopics.containsAll(assignedTopics)) {
- Set<String> newlyAddedTopics = new HashSet<>(assignedTopics);
- newlyAddedTopics.removeAll(allSubscribedTopics);
- log.info("The following not-subscribed topics are assigned, and
their metadata will be " +
- "fetched from the brokers: {}", newlyAddedTopics);
-
- allSubscribedTopics.addAll(assignedTopics);
- updateGroupSubscription(allSubscribedTopics);
- }
+ maybeUpdateGroupSubscription(assignor.name(), assignments,
allSubscribedTopics);
assignmentSnapshot = metadataSnapshot;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 902ea99..e5aa804 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -77,6 +77,8 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -191,7 +193,8 @@ public abstract class ConsumerCoordinatorTest {
this.coordinator = buildCoordinator(rebalanceConfig,
metrics,
assignors,
- false);
+ false,
+ subscriptions);
}
private GroupRebalanceConfig buildRebalanceConfig(Optional<String>
groupInstanceId) {
@@ -258,6 +261,63 @@ public abstract class ConsumerCoordinatorTest {
return metrics.metrics().get(metrics.metricName(name, consumerId +
groupId + "-coordinator-metrics"));
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void
testPerformAssignmentShouldUpdateGroupSubscriptionAfterAssignmentIfNeeded() {
+ SubscriptionState mockSubscriptionState =
Mockito.mock(SubscriptionState.class);
+
+ // the consumer only subscribed to "topic1"
+ Map<String, List<String>> memberSubscriptions =
singletonMap(consumerId, singletonList(topic1));
+
+ List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new
ArrayList<>();
+ for (Map.Entry<String, List<String>> subscriptionEntry :
memberSubscriptions.entrySet()) {
+ ConsumerPartitionAssignor.Subscription subscription = new
ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue());
+ ByteBuffer buf =
ConsumerProtocol.serializeSubscription(subscription);
+ metadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
+ .setMemberId(subscriptionEntry.getKey())
+ .setMetadata(buf.array()));
+ }
+
+ // normal case: the assignment result will have partitions for only
the subscribed topic: "topic1"
+ partitionAssignor.prepare(Collections.singletonMap(consumerId,
singletonList(t1p)));
+
+ try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false,
mockSubscriptionState)) {
+ coordinator.performAssignment("1", partitionAssignor.name(),
metadata);
+
+ ArgumentCaptor<Collection<String>> topicsCaptor =
ArgumentCaptor.forClass(Collection.class);
+ // groupSubscribe should be only called 1 time, which is before
assignment,
+ // because the assigned topics are the same as the subscribed
topics
+ Mockito.verify(mockSubscriptionState,
Mockito.times(1)).groupSubscribe(topicsCaptor.capture());
+
+ List<Collection<String>> capturedTopics =
topicsCaptor.getAllValues();
+
+ // expected the final group subscribed topics to be updated to
"topic1"
+ Set<String> expectedTopicsGotCalled = new
HashSet<>(Arrays.asList(topic1));
+ assertEquals(expectedTopicsGotCalled, capturedTopics.get(0));
+ }
+
+ Mockito.clearInvocations(mockSubscriptionState);
+
+ // unsubscribed topic partition assigned case: the assignment result
will have partitions for (1) subscribed topic: "topic1"
+ // and (2) the additional unsubscribed topic: "topic2". We should add
"topic2" into group subscription list
+ partitionAssignor.prepare(Collections.singletonMap(consumerId,
Arrays.asList(t1p, t2p)));
+
+ try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false,
mockSubscriptionState)) {
+ coordinator.performAssignment("1", partitionAssignor.name(),
metadata);
+
+ ArgumentCaptor<Collection<String>> topicsCaptor =
ArgumentCaptor.forClass(Collection.class);
+ // groupSubscribe should be called 2 times, once before
assignment, once after assignment
+ // (because the assigned topics are not the same as the subscribed
topics)
+ Mockito.verify(mockSubscriptionState,
Mockito.times(2)).groupSubscribe(topicsCaptor.capture());
+
+ List<Collection<String>> capturedTopics =
topicsCaptor.getAllValues();
+
+ // expected the final group subscribed topics to be updated to
"topic1" and "topic2"
+ Set<String> expectedTopicsGotCalled = new
HashSet<>(Arrays.asList(topic1, topic2));
+ assertEquals(expectedTopicsGotCalled, capturedTopics.get(1));
+ }
+ }
+
@Test
public void testSelectRebalanceProtcol() {
List<ConsumerPartitionAssignor> assignors = new ArrayList<>();
@@ -265,14 +325,14 @@ public abstract class ConsumerCoordinatorTest {
assignors.add(new
MockPartitionAssignor(Collections.singletonList(COOPERATIVE)));
// no commonly supported protocols
- assertThrows(IllegalArgumentException.class, () ->
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false));
+ assertThrows(IllegalArgumentException.class, () ->
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false,
subscriptions));
assignors.clear();
assignors.add(new
MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER,
COOPERATIVE)));
assignors.add(new
MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER,
COOPERATIVE)));
// select higher indexed (more advanced) protocols
- try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false)) {
+ try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false,
subscriptions)) {
assertEquals(COOPERATIVE, coordinator.getProtocol());
}
}
@@ -1586,7 +1646,7 @@ public abstract class ConsumerCoordinatorTest {
metadata = new ConsumerMetadata(0, Long.MAX_VALUE,
includeInternalTopics,
false, subscriptions, new LogContext(), new
ClusterResourceListeners());
client = new MockClient(time, metadata);
- try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false)) {
+ try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false,
subscriptions)) {
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
Node node = new Node(0, "localhost", 9999);
MetadataResponse.PartitionMetadata partitionMetadata =
@@ -1722,7 +1782,7 @@ public abstract class ConsumerCoordinatorTest {
@Test
public void testAutoCommitDynamicAssignment() {
- try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)
+ try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)
) {
subscriptions.subscribe(singleton(topic1), rebalanceListener);
joinAsFollowerAndReceiveAssignment(coordinator,
singletonList(t1p));
@@ -1736,7 +1796,7 @@ public abstract class ConsumerCoordinatorTest {
@Test
public void testAutoCommitRetryBackoff() {
- try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+ try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true,
subscriptions)) {
subscriptions.subscribe(singleton(topic1), rebalanceListener);
joinAsFollowerAndReceiveAssignment(coordinator,
singletonList(t1p));
@@ -1769,7 +1829,7 @@ public abstract class ConsumerCoordinatorTest {
@Test
public void testAutoCommitAwaitsInterval() {
- try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+ try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true,
subscriptions)) {
subscriptions.subscribe(singleton(topic1), rebalanceListener);
joinAsFollowerAndReceiveAssignment(coordinator,
singletonList(t1p));
@@ -1806,7 +1866,7 @@ public abstract class ConsumerCoordinatorTest {
@Test
public void testAutoCommitDynamicAssignmentRebalance() {
- try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+ try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true,
subscriptions)) {
subscriptions.subscribe(singleton(topic1), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -1830,7 +1890,7 @@ public abstract class ConsumerCoordinatorTest {
@Test
public void testAutoCommitManualAssignment() {
- try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+ try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true,
subscriptions)) {
subscriptions.assignFromUser(singleton(t1p));
subscriptions.seek(t1p, 100);
client.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
@@ -1845,7 +1905,7 @@ public abstract class ConsumerCoordinatorTest {
@Test
public void testAutoCommitManualAssignmentCoordinatorUnknown() {
- try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+ try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true,
subscriptions)) {
subscriptions.assignFromUser(singleton(t1p));
subscriptions.seek(t1p, 100);
@@ -2694,7 +2754,7 @@ public abstract class ConsumerCoordinatorTest {
@Test
public void testAutoCommitAfterCoordinatorBackToService() {
- try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+ try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true,
subscriptions)) {
subscriptions.assignFromUser(Collections.singleton(t1p));
subscriptions.seek(t1p, 100L);
@@ -2907,7 +2967,8 @@ public abstract class ConsumerCoordinatorTest {
ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
new Metrics(),
assignors,
- autoCommit);
+ autoCommit,
+ subscriptions);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
if (useGroupManagement) {
@@ -2996,14 +3057,15 @@ public abstract class ConsumerCoordinatorTest {
private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig
rebalanceConfig,
final Metrics metrics,
final
List<ConsumerPartitionAssignor> assignors,
- final boolean
autoCommitEnabled) {
+ final boolean
autoCommitEnabled,
+ final SubscriptionState
subscriptionState) {
return new ConsumerCoordinator(
rebalanceConfig,
new LogContext(),
consumerClient,
assignors,
metadata,
- subscriptions,
+ subscriptionState,
metrics,
consumerId + groupId,
time,