This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push: new d83825b KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode (#11631) d83825b is described below commit d83825bfc99503ca946256d2097a4175b4518bbb Author: Luke Chen <show...@gmail.com> AuthorDate: Mon Feb 7 07:07:59 2022 +0800 KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode (#11631) After KAFKA-10793, we clear the findCoordinatorFuture in 2 places: 1. the heartbeat thread 2. AbstractCoordinator#ensureCoordinatorReady But in non-consumer-group mode with a group id provided (for offset commits, so a ConsumerCoordinator is created), there is no (1) heartbeat thread, and (2) AbstractCoordinator#ensureCoordinatorReady is only called the first time the consumer wants to fetch a committed offset position. That is, after the 2nd lookupCoordinator call, we have no chance to clear the findCoordinatorFuture, which causes the offset commit to never succeed. To avoid the race condition mentioned in KAFKA-10793, it is not safe to clear the findCoordinatorFuture in the future listener. So we can fix this issue by calling AbstractCoordinator#ensureCoordinatorReady whenever the coordinator is unknown in the non-consumer-group case, under each ConsumerCoordinator#poll. 
Reviewers: Guozhang Wang <wangg...@gmail.com> --- .../consumer/internals/ConsumerCoordinator.java | 18 +- .../kafka/clients/consumer/KafkaConsumerTest.java | 185 ++++++--------------- .../internals/ConsumerCoordinatorTest.java | 15 ++ .../kafka/api/AuthorizerIntegrationTest.scala | 3 +- 4 files changed, 74 insertions(+), 147 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index fad7f92..a7194a0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -455,6 +455,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } } + private boolean coordinatorUnknownAndUnready(Timer timer) { + return coordinatorUnknown() && !ensureCoordinatorReady(timer); + } + /** * Poll for coordinator events. This ensures that the coordinator is known and that the consumer * has joined the group (if it is using group management). This also handles periodic offset commits @@ -480,7 +484,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // Always update the heartbeat last poll time so that the heartbeat thread does not leave the // group proactively due to application inactivity even if (say) the coordinator cannot be found. pollHeartbeat(timer.currentTimeMs()); - if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) { + if (coordinatorUnknownAndUnready(timer)) { return false; } @@ -517,15 +521,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } } } else { - // For manually assigned partitions, if there are no ready nodes, await metadata. + // For manually assigned partitions, if coordinator is unknown, make sure we lookup one and await metadata. 
// If connections to all nodes fail, wakeups triggered while attempting to send fetch // requests result in polls returning immediately, causing a tight loop of polls. Without // the wakeup, poll() with no channels would block for the timeout, delaying re-connection. - // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop. - // When group management is used, metadata wait is already performed for this scenario as - // coordinator is unknown, hence this check is not required. - if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) { - client.awaitMetadataUpdate(timer); + // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop. + if (coordinatorUnknownAndUnready(timer)) { + return false; } } @@ -1021,7 +1023,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { return true; do { - if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) { + if (coordinatorUnknownAndUnready(timer)) { return false; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 9b79473..f8b86b8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -192,6 +192,9 @@ public class KafkaConsumerTest { private final String partitionLost = "Hit partition lost "; private final Collection<TopicPartition> singleTopicPartition = Collections.singleton(new TopicPartition(topic, 0)); + private final Time time = new MockTime(); + private final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + private final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); @Test public void testMetricsReporterAutoGeneratedClientId() { @@ -266,12 
+269,9 @@ public class KafkaConsumerTest { } private KafkaConsumer<String, String> setUpConsumerWithRecordsToPoll(TopicPartition tp, int recordCount, Deserializer<String> deserializer) { - Time time = new MockTime(); Cluster cluster = TestUtils.singletonCluster(tp.topic(), 1); Node node = cluster.nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); @@ -538,16 +538,12 @@ public class KafkaConsumerTest { @Test public void verifyHeartbeatSent() throws Exception { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -573,14 +569,11 @@ public class KafkaConsumerTest { @Test public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer<String, String> consumer = newConsumer(time, 
client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -607,16 +600,12 @@ public class KafkaConsumerTest { @Test public void verifyPollTimesOutDuringMetadataUpdate() { - final Time time = new MockTime(); - final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); final ConsumerMetadata metadata = createMetadata(subscription); final MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); // Since we would enable the heartbeat thread after received join-response which could @@ -635,16 +624,12 @@ public class KafkaConsumerTest { @SuppressWarnings("deprecation") @Test public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() { - final Time time = new MockTime(); - final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); final ConsumerMetadata metadata = createMetadata(subscription); final MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); prepareRebalance(client, node, assignor, singletonList(tp0), null); @@ -660,15 +645,12 @@ public class KafkaConsumerTest { @Test public void 
verifyNoCoordinatorLookupForManualAssignmentWithSeek() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); + KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, null, groupInstanceId, false); consumer.assign(singleton(tp0)); consumer.seekToBeginning(singleton(tp0)); @@ -684,21 +666,57 @@ public class KafkaConsumerTest { } @Test + public void verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit() { + ConsumerMetadata metadata = createMetadata(subscription); + MockClient client = new MockClient(time, metadata); + + initMetadata(client, Collections.singletonMap(topic, 1)); + Node node = metadata.fetch().nodes().get(0); + + // create a consumer with groupID with manual assignment + KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); + consumer.assign(singleton(tp0)); + + // 1st coordinator error should cause coordinator unknown + client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE, groupId, node), node); + consumer.poll(Duration.ofMillis(0)); + + // 2nd coordinator error should find the correct coordinator and clear the findCoordinatorFuture + client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node); + + client.prepareResponse(offsetResponse(Collections.singletonMap(tp0, 50L), Errors.NONE)); + client.prepareResponse(fetchResponse(tp0, 50L, 5)); + + ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(0)); + assertEquals(5, records.count()); + assertEquals(55L, consumer.position(tp0)); + + // after coordinator found, consumer should be able to commit the offset successfully + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp0, Errors.NONE))); + consumer.commitSync(Collections.singletonMap(tp0, new OffsetAndMetadata(55L))); + + // verify the offset is committed + client.prepareResponse(offsetResponse(Collections.singletonMap(tp0, 55L), Errors.NONE)); + assertEquals(55, consumer.committed(Collections.singleton(tp0), Duration.ZERO).get(tp0).offset()); + consumer.close(Duration.ofMillis(0)); + } + + @Test public void testFetchProgressWithMissingPartitionPosition() { // Verifies that we can make progress on one partition while we are awaiting // a reset on another partition. - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 2)); + Node node = metadata.fetch().nodes().get(0); KafkaConsumer<String, String> consumer = newConsumerNoAutoCommit(time, client, subscription, metadata); consumer.assign(Arrays.asList(tp0, tp1)); consumer.seekToEnd(singleton(tp0)); consumer.seekToBeginning(singleton(tp1)); + client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node); client.prepareResponse(body -> { ListOffsetsRequest request = (ListOffsetsRequest) body; List<ListOffsetsPartition> partitions = request.topics().stream().flatMap(t -> { @@ -742,7 +760,6 @@ public class KafkaConsumerTest { @Test public void testMissingOffsetNoResetPolicy() { - Time time = new MockTime(); SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); ConsumerMetadata metadata = createMetadata(subscription); MockClient client 
= new MockClient(time, metadata); @@ -750,8 +767,6 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, groupInstanceId, false); consumer.assign(singletonList(tp0)); @@ -766,7 +781,6 @@ public class KafkaConsumerTest { @Test public void testResetToCommittedOffset() { - Time time = new MockTime(); SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -774,8 +788,6 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, groupInstanceId, false); consumer.assign(singletonList(tp0)); @@ -791,7 +803,6 @@ public class KafkaConsumerTest { @Test public void testResetUsingAutoResetPolicy() { - Time time = new MockTime(); SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -799,8 +810,6 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, groupInstanceId, false); consumer.assign(singletonList(tp0)); @@ -818,15 +827,12 @@ public class KafkaConsumerTest { @Test public void 
testOffsetIsValidAfterSeek() { - Time time = new MockTime(); SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, Optional.empty(), false); consumer.assign(singletonList(tp0)); @@ -840,16 +846,12 @@ public class KafkaConsumerTest { long offset1 = 10000; long offset2 = 20000; - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 2)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.assign(singletonList(tp0)); @@ -894,8 +896,6 @@ public class KafkaConsumerTest { private KafkaConsumer<String, String> setupThrowableConsumer() { long offset1 = 10000; - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -904,8 +904,6 @@ public class KafkaConsumerTest { Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer<String, String> consumer = newConsumer( time, client, subscription, metadata, assignor, true, groupId, groupInstanceId, true); consumer.assign(singletonList(tp0)); @@ -922,16 
+920,12 @@ public class KafkaConsumerTest { public void testNoCommittedOffsets() { long offset1 = 10000; - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 2)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.assign(Arrays.asList(tp0, tp1)); @@ -951,16 +945,12 @@ public class KafkaConsumerTest { @Test public void testAutoCommitSentBeforePositionUpdate() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null); @@ -988,8 +978,6 @@ public class KafkaConsumerTest { @Test public void testRegexSubscription() { String unmatchedTopic = "unmatched"; - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1000,8 +988,6 @@ public class KafkaConsumerTest { initMetadata(client, partitionCounts); Node node = 
metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null); @@ -1018,13 +1004,9 @@ public class KafkaConsumerTest { @Test public void testChangingRegexSubscription() { - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - String otherTopic = "other"; TopicPartition otherTopicPartition = new TopicPartition(otherTopic, 0); - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1057,16 +1039,12 @@ public class KafkaConsumerTest { @Test public void testWakeupWithFetchDataAvailable() throws Exception { - final Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); prepareRebalance(client, node, assignor, singletonList(tp0), null); @@ -1098,16 +1076,12 @@ public class KafkaConsumerTest { @Test public void testPollThrowsInterruptExceptionIfInterrupted() { - final Time time = new MockTime(); - final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); final ConsumerMetadata metadata = createMetadata(subscription); final 
MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); prepareRebalance(client, node, assignor, singletonList(tp0), null); @@ -1128,16 +1102,12 @@ public class KafkaConsumerTest { @Test public void fetchResponseWithUnexpectedPartitionIsIgnored() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RangeAssignor(); - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singletonList(topic), getConsumerRebalanceListener(consumer)); @@ -1164,8 +1134,6 @@ public class KafkaConsumerTest { */ @Test public void testSubscriptionChangesWithAutoCommitEnabled() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1279,8 +1247,6 @@ public class KafkaConsumerTest { */ @Test public void testSubscriptionChangesWithAutoCommitDisabled() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1336,8 
+1302,6 @@ public class KafkaConsumerTest { @Test public void testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1361,8 +1325,6 @@ public class KafkaConsumerTest { @Test public void testUnsubscribeShouldTriggerPartitionsLostWithNoGeneration() throws Exception { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1398,8 +1360,6 @@ public class KafkaConsumerTest { @Test public void testManualAssignmentChangeWithAutoCommitEnabled() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1455,8 +1415,6 @@ public class KafkaConsumerTest { @Test public void testManualAssignmentChangeWithAutoCommitDisabled() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1512,8 +1470,6 @@ public class KafkaConsumerTest { @Test public void testOffsetOfPausedPartitions() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1707,16 +1663,12 @@ public class KafkaConsumerTest { @Test public void testShouldAttemptToRejoinGroupAfterSyncGroupFailed() 
throws Exception { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node); @@ -1779,16 +1731,12 @@ public class KafkaConsumerTest { List<? extends AbstractResponse> responses, long waitMs, boolean interrupt) throws Exception { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, Optional.empty()); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null); @@ -1865,8 +1813,6 @@ public class KafkaConsumerTest { @Test public void testPartitionsForNonExistingTopic() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1879,8 +1825,6 @@ public class 
KafkaConsumerTest { Collections.emptyList()); client.prepareResponse(updateResponse); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); assertEquals(Collections.emptyList(), consumer.partitionsFor("non-exist-topic")); } @@ -1954,7 +1898,6 @@ public class KafkaConsumerTest { MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 2)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.assign(singletonList(tp0)); @@ -1999,7 +1942,6 @@ public class KafkaConsumerTest { MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 2)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.assign(singletonList(tp0)); @@ -2022,16 +1964,12 @@ public class KafkaConsumerTest { @Test public void testRebalanceException() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getExceptionConsumerRebalanceListener()); @@ -2069,7 +2007,6 @@ public class 
KafkaConsumerTest { @Test public void testReturnRecordsDuringRebalance() throws InterruptedException { Time time = new MockTime(1L); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor(); @@ -2194,16 +2131,12 @@ public class KafkaConsumerTest { @Test public void testGetGroupMetadata() { - final Time time = new MockTime(); - final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); final ConsumerMetadata metadata = createMetadata(subscription); final MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); final Node node = metadata.fetch().nodes().get(0); - final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); final ConsumerGroupMetadata groupMetadataOnStart = consumer.groupMetadata(); @@ -2228,8 +2161,6 @@ public class KafkaConsumerTest { @Test public void testInvalidGroupMetadata() throws InterruptedException { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); @@ -2257,13 +2188,10 @@ public class KafkaConsumerTest { @Test public void testCurrentLag() { - final Time time = new MockTime(); - final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); final ConsumerMetadata metadata = createMetadata(subscription); final MockClient client = new MockClient(time, metadata); initMetadata(client, 
singletonMap(topic, 1)); - final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); @@ -2315,13 +2243,10 @@ public class KafkaConsumerTest { @Test public void testListOffsetShouldUpateSubscriptions() { - final Time time = new MockTime(); - final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); final ConsumerMetadata metadata = createMetadata(subscription); final MockClient client = new MockClient(time, metadata); initMetadata(client, singletonMap(topic, 1)); - final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); @@ -2343,7 +2268,6 @@ public class KafkaConsumerTest { } private KafkaConsumer<String, String> consumerWithPendingAuthenticationError(final Time time) { - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -2641,14 +2565,16 @@ public class KafkaConsumerTest { ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time, retryBackoffMs, requestTimeoutMs, heartbeatIntervalMs); - GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs, + ConsumerCoordinator consumerCoordinator = null; + if (groupId != null) { + GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs, rebalanceTimeoutMs, heartbeatIntervalMs, groupId, groupInstanceId, retryBackoffMs, true); - ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(rebalanceConfig, + consumerCoordinator = new ConsumerCoordinator(rebalanceConfig, loggerFactory, consumerClient, assignors, @@ -2661,6 +2587,7 @@ 
public class KafkaConsumerTest { autoCommitIntervalMs, interceptors, throwOnStableOffsetNotSupported); + } Fetcher<String, String> fetcher = new Fetcher<>( loggerFactory, consumerClient, @@ -2723,16 +2650,12 @@ public class KafkaConsumerTest { @Test public void testSubscriptionOnInvalidTopic() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Cluster cluster = metadata.fetch(); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - String invalidTopicName = "topic abc"; // Invalid topic name due to space List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>(); @@ -2752,14 +2675,10 @@ public class KafkaConsumerTest { @Test public void testPollTimeMetrics() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singletonList(topic)); // MetricName objects to check @@ -2801,14 +2720,10 @@ public class KafkaConsumerTest { @Test public void testPollIdleRatio() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - 
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); // MetricName object to check Metrics metrics = consumer.metrics; @@ -2851,8 +2766,6 @@ public class KafkaConsumerTest { @Test public void testClosingConsumerUnregistersConsumerMetrics() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); @@ -2879,10 +2792,8 @@ public class KafkaConsumerTest { @Test public void testEnforceRebalanceTriggersRebalanceOnNextPoll() { Time time = new MockTime(1L); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); MockRebalanceListener countingRebalanceListener = new MockRebalanceListener(); initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1))); @@ -2976,8 +2887,6 @@ public class KafkaConsumerTest { } private KafkaConsumer<String, String> consumerForCheckingTimeoutException() { - final Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 96aaf8b..b073995 100644 --- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -478,6 +478,21 @@ public abstract class ConsumerCoordinatorTest { } @Test + public void testCoordinatorNotAvailableWithUserAssignedType() { + subscriptions.assignFromUser(Collections.singleton(t1p)); + // should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error + client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE)); + // set timeout to 0 because we don't want to retry after the error + coordinator.poll(time.timer(0)); + assertTrue(coordinator.coordinatorUnknown()); + + // should find an available node in next find coordinator request + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.poll(time.timer(Long.MAX_VALUE)); + assertFalse(coordinator.coordinatorUnknown()); + } + + @Test public void testCoordinatorNotAvailable() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 0b215ff..c7e8cb9 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -1038,7 +1038,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) // in this case, we do an explicit seek, so there should be no need to query the coordinator at all - val consumer = createConsumer() + // remove the group.id config to avoid coordinator created + val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) 
consumer.assign(List(tp).asJava) consumer.seekToBeginning(List(tp).asJava) consumeRecords(consumer)