Repository: kafka Updated Branches: refs/heads/trunk a3c45b0c9 -> 573a6f398
KAFKA-2857; Retry querying the consumer group while initializing This applies to new-consumer based groups and would avoid scenarios in which a user issues a `--describe` query while the group is initializing. Example: The following could occur for a newly created group. ``` kafka@kafka:~/workspace/kafka$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers). Error: Executing consumer group command failed due to The group coordinator is not available. ``` With this PR the group is queried repeatedly at specific intervals within a preset (and configurable) timeout (the `--timeout` option) to circumvent unfortunate situations like the one above. Author: Vahid Hashemian <[email protected]> Reviewers: Jason Gustafson <[email protected]> Closes #2538 from vahidhashemian/KAFKA-2857 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/573a6f39 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/573a6f39 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/573a6f39 Branch: refs/heads/trunk Commit: 573a6f39863061a6f38a0aca35f11470c3e8538e Parents: a3c45b0 Author: Vahid Hashemian <[email protected]> Authored: Fri Mar 3 11:22:42 2017 -0800 Committer: Jason Gustafson <[email protected]> Committed: Fri Mar 3 12:01:38 2017 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/admin/AdminClient.scala | 48 +++++-- .../kafka/admin/ConsumerGroupCommand.scala | 17 ++- .../main/scala/kafka/tools/StreamsResetter.java | 2 +- .../kafka/admin/DescribeConsumerGroupTest.scala | 133 ++++++++----------- .../integration/ResetIntegrationTest.java | 2 +- 5 files changed, 110 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kafka/blob/573a6f39/core/src/main/scala/kafka/admin/AdminClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 7cfc91a..4b28460 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -16,18 +16,21 @@ import java.nio.ByteBuffer import java.util.{Collections, Properties} import java.util.concurrent.atomic.AtomicInteger -import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion import kafka.common.KafkaException import kafka.coordinator.GroupOverview import kafka.utils.Logging + import org.apache.kafka.clients._ import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture} import org.apache.kafka.common.config.ConfigDef.{Importance, Type} import org.apache.kafka.common.config.{AbstractConfig, ConfigDef} +import org.apache.kafka.common.errors.TimeoutException import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.Selector import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests._ +import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion +import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata import org.apache.kafka.common.requests.OffsetFetchResponse import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{Cluster, Node, TopicPartition} @@ -37,6 +40,7 @@ import scala.util.Try class AdminClient(val time: Time, val requestTimeoutMs: Int, + val retryBackoffMs: Int, val client: ConsumerNetworkClient, val bootstrapBrokers: List[Node]) extends Logging { @@ -66,9 +70,19 @@ class AdminClient(val time: Time, throw new RuntimeException(s"Request $api failed on brokers $bootstrapBrokers") } - def findCoordinator(groupId: String): Node = { 
+ def findCoordinator(groupId: String, timeoutMs: Long = 0): Node = { + val startTime = time.milliseconds val requestBuilder = new GroupCoordinatorRequest.Builder(groupId) - val response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse] + var response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse] + + while (response.error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE && time.milliseconds - startTime < timeoutMs) { + Thread.sleep(retryBackoffMs) + response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse] + } + + if (response.error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE) + throw new TimeoutException("The consumer group command timed out while waiting for group to initialize: ", response.error.exception) + response.error.maybeThrow() response.node } @@ -165,18 +179,34 @@ class AdminClient(val time: Time, consumers: Option[List[ConsumerSummary]], coordinator: Node) - def describeConsumerGroup(groupId: String): ConsumerGroupSummary = { - val coordinator = findCoordinator(groupId) + def describeConsumerGroupHandler(coordinator: Node, groupId: String): GroupMetadata = { val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest.Builder(Collections.singletonList(groupId))) val response = responseBody.asInstanceOf[DescribeGroupsResponse] val metadata = response.groups.get(groupId) if (metadata == null) throw new KafkaException(s"Response from broker contained no metadata for group $groupId") - if (metadata.state != "Dead" && metadata.state != "Empty" && metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE) - throw new IllegalArgumentException(s"Consumer Group $groupId with protocol type '${metadata.protocolType}' is not a valid consumer group") + metadata + } + + def describeConsumerGroup(groupId: String, timeoutMs: Long = 0): ConsumerGroupSummary = { + + def isValidConsumerGroupResponse(metadata: 
DescribeGroupsResponse.GroupMetadata): Boolean = + metadata.error == Errors.NONE && (metadata.state == "Dead" || metadata.state == "Empty" || metadata.protocolType == ConsumerProtocol.PROTOCOL_TYPE) + + val startTime = time.milliseconds + val coordinator = findCoordinator(groupId, timeoutMs) + var metadata = describeConsumerGroupHandler(coordinator, groupId) + + while (!isValidConsumerGroupResponse(metadata) && time.milliseconds - startTime < timeoutMs) { + debug(s"The consumer group response for group '$groupId' is invalid. Retrying the request as the group is initializing ...") + Thread.sleep(retryBackoffMs) + metadata = describeConsumerGroupHandler(coordinator, groupId) + } + + if (!isValidConsumerGroupResponse(metadata)) + throw new TimeoutException("The consumer group command timed out while waiting for group to initialize") - metadata.error.maybeThrow() val consumers = metadata.members.asScala.map { consumer => ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.state match { case "Stable" => @@ -204,6 +234,7 @@ object AdminClient { val DefaultSendBufferBytes = 128 * 1024 val DefaultReceiveBufferBytes = 32 * 1024 val DefaultRetryBackoffMs = 100 + val AdminClientIdSequence = new AtomicInteger(1) val AdminConfigDef = { val config = new ConfigDef() @@ -274,6 +305,7 @@ object AdminClient { new AdminClient( time, DefaultRequestTimeoutMs, + DefaultRetryBackoffMs, highLevelClient, bootstrapCluster.nodes.asScala.toList) } http://git-wip-us.apache.org/repos/asf/kafka/blob/573a6f39/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 11f4f89..caad62a 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -396,7 +396,7 @@ object 
ConsumerGroupCommand extends Logging { } protected def collectGroupAssignment(group: String): (Option[String], Option[Seq[PartitionAssignmentState]]) = { - val consumerGroupSummary = adminClient.describeConsumerGroup(group) + val consumerGroupSummary = adminClient.describeConsumerGroup(group, opts.options.valueOf(opts.timeoutMsOpt)) (Some(consumerGroupSummary.state), consumerGroupSummary.consumers match { case None => @@ -502,7 +502,11 @@ object ConsumerGroupCommand extends Logging { "for every consumer group. For instance --topic t1" + nl + "WARNING: Group deletion only works for old ZK-based consumer groups, and one has to use it carefully to only delete groups that are not active." val NewConsumerDoc = "Use new consumer. This is the default." + val TimeoutMsDoc = "The timeout that can be set for some use cases. For example, it can be used when describing the group " + + "to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, " + + "or is going through some changes)." val CommandConfigDoc = "Property file containing configs to be passed to Admin Client and Consumer." 
+ val parser = new OptionParser val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc) .withRequiredArg @@ -524,6 +528,11 @@ object ConsumerGroupCommand extends Logging { val describeOpt = parser.accepts("describe", DescribeDoc) val deleteOpt = parser.accepts("delete", DeleteDoc) val newConsumerOpt = parser.accepts("new-consumer", NewConsumerDoc) + val timeoutMsOpt = parser.accepts("timeout", TimeoutMsDoc) + .withRequiredArg + .describedAs("timeout (ms)") + .ofType(classOf[Long]) + .defaultsTo(5000) val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc) .withRequiredArg .describedAs("command config property file") @@ -531,11 +540,15 @@ object ConsumerGroupCommand extends Logging { val options = parser.parse(args : _*) val useOldConsumer = options.has(zkConnectOpt) + val describeOptPresent = options.has(describeOpt) val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt) def checkArgs() { // check required args + if (options.has(timeoutMsOpt) && (!describeOptPresent || useOldConsumer)) + debug(s"Option '$timeoutMsOpt' is applicable only when both '$bootstrapServerOpt' and '$describeOpt' are used.") + if (useOldConsumer) { if (options.has(bootstrapServerOpt)) CommandLineUtils.printUsageAndDie(parser, s"Option '$bootstrapServerOpt' is not valid with '$zkConnectOpt'.") @@ -550,7 +563,7 @@ object ConsumerGroupCommand extends Logging { "committed offset for that group expires.") } - if (options.has(describeOpt)) + if (describeOptPresent) CommandLineUtils.checkRequiredArgs(parser, options, groupOpt) if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt)) CommandLineUtils.printUsageAndDie(parser, "Option %s either takes %s, %s, or both".format(deleteOpt, groupOpt, topicOpt)) http://git-wip-us.apache.org/repos/asf/kafka/blob/573a6f39/core/src/main/scala/kafka/tools/StreamsResetter.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index a61c092..83166cd 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -91,7 +91,7 @@ public class StreamsResetter { adminClient = AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption)); final String groupId = options.valueOf(applicationIdOption); - if (!adminClient.describeConsumerGroup(groupId).consumers().get().isEmpty()) { + if (!adminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) { throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " + "Make sure to stop all running application instances before running the reset tool."); } http://git-wip-us.apache.org/repos/asf/kafka/blob/573a6f39/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala index 8e10a87..905d113 100644 --- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala @@ -23,6 +23,7 @@ import java.util.Collections import java.util.Properties import org.easymock.EasyMock +import org.junit.Assert._ import org.junit.Before import org.junit.Test @@ -35,10 +36,11 @@ import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig import kafka.utils.TestUtils +import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException +import org.apache.kafka.common.errors.TimeoutException import org.apache.kafka.common.errors.WakeupException import org.apache.kafka.common.serialization.StringDeserializer -import org.apache.kafka.clients.consumer.KafkaConsumer 
class DescribeConsumerGroupTest extends KafkaServerTestHarness { @@ -179,21 +181,8 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = new KafkaConsumerGroupService(opts) - TestUtils.waitUntilTrue(() => { - try { - val (state, assignments) = consumerGroupCommand.describeGroup() - println(state == Some("Dead") && assignments == Some(List())) - state == Some("Dead") && assignments == Some(List()) - } catch { - case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException => - // Do nothing while the group initializes - false - case e: Throwable => - e.printStackTrace() - throw e - } - }, "Expected the state to be 'Dead' with no members in the group.") - + val (state, assignments) = consumerGroupCommand.describeGroup() + assertTrue("Expected the state to be 'Dead' with no members in the group.", state == Some("Dead") && assignments == Some(List())) consumerGroupCommand.close() } @@ -207,21 +196,13 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { val consumerGroupCommand = new KafkaConsumerGroupService(opts) TestUtils.waitUntilTrue(() => { - try { - val (state, assignments) = consumerGroupCommand.describeGroup() - state == Some("Stable") && - assignments.isDefined && - assignments.get.count(_.group == group) == 1 && - assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) && - assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) && - assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) - } catch { - case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException => - // Do nothing while the group initializes - false - case e: Throwable => - throw e - } + val (state, assignments) = consumerGroupCommand.describeGroup() + state == Some("Stable") && + 
assignments.isDefined && + assignments.get.count(_.group == group) == 1 && + assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) && + assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) && + assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) }, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe group results.") consumerGroupCommand.close() @@ -237,40 +218,24 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { val consumerGroupCommand = new KafkaConsumerGroupService(opts) TestUtils.waitUntilTrue(() => { - try { - val (state, _) = consumerGroupCommand.describeGroup() - state == Some("Stable") - } catch { - case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException => - // Do nothing while the group initializes - false - case e: Throwable => - throw e - } + val (state, _) = consumerGroupCommand.describeGroup() + state == Some("Stable") }, "Expected the group to initially become stable.") // stop the consumer so the group has no active member anymore executor.shutdown() TestUtils.waitUntilTrue(() => { - try { - val (state, assignments) = consumerGroupCommand.describeGroup() - state == Some("Empty") && - assignments.isDefined && - assignments.get.count(_.group == group) == 1 && - assignments.get.filter(_.group == group).head.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone - assignments.get.filter(_.group == group).head.clientId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && - assignments.get.filter(_.group == group).head.host.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) - } catch { - case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException => - // Do nothing while the group initializes - 
false - case e: Throwable => - throw e - } finally { - consumerGroupCommand.close() - } + val (state, assignments) = consumerGroupCommand.describeGroup() + state == Some("Empty") && + assignments.isDefined && + assignments.get.count(_.group == group) == 1 && + assignments.get.filter(_.group == group).head.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone + assignments.get.filter(_.group == group).head.clientId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && + assignments.get.filter(_.group == group).head.host.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) }, "Expected no active member in describe group results.") + + consumerGroupCommand.close() } @Test @@ -283,20 +248,12 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { val consumerGroupCommand = new KafkaConsumerGroupService(opts) TestUtils.waitUntilTrue(() => { - try { - val (state, assignments) = consumerGroupCommand.describeGroup() - state == Some("Stable") && - assignments.isDefined && - assignments.get.count(_.group == group) == 2 && - assignments.get.count{ x => x.group == group && x.partition.isDefined} == 1 && - assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 1 - } catch { - case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException => - // Do nothing while the group initializes - false - case e: Throwable => - throw e - } + val (state, assignments) = consumerGroupCommand.describeGroup() + state == Some("Stable") && + assignments.isDefined && + assignments.get.count(_.group == group) == 2 && + assignments.get.count{ x => x.group == group && x.partition.isDefined} == 1 && + assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 1 }, "Expected rows for consumers with no assigned partitions in describe group results.") consumerGroupCommand.close() @@ -315,24 +272,40 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { val 
consumerGroupCommand = new KafkaConsumerGroupService(opts) TestUtils.waitUntilTrue(() => { - try { val (state, assignments) = consumerGroupCommand.describeGroup() state == Some("Stable") && assignments.isDefined && assignments.get.count(_.group == group) == 2 && assignments.get.count{ x => x.group == group && x.partition.isDefined} == 2 && assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 0 - } catch { - case _: GroupCoordinatorNotAvailableException | _: IllegalArgumentException => - // Do nothing while the group initializes - false - case e: Throwable => - throw e - } }, "Expected two rows (one row per consumer) in describe group results.") consumerGroupCommand.close() } + + @Test + def testDescribeGroupWithNewConsumerWithShortInitializationTimeout() { + // run one consumer in the group consuming from a single-partition topic + val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic) + + // set the group initialization timeout too low for the group to stabilize + val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "group", "--timeout", "10") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + try { + val (state, assignments) = consumerGroupCommand.describeGroup() + fail("The consumer group command should fail due to low initialization timeout") + } catch { + case e: TimeoutException => + // OK + case e: Throwable => + fail("An unexpected exception occurred: " + e.getMessage) + throw e + } finally { + consumerGroupCommand.close() + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/573a6f39/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java 
index 3248b2a..4804bfb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -421,7 +421,7 @@ public class ResetIntegrationTest { private class WaitUntilConsumerGroupGotClosed implements TestCondition { @Override public boolean conditionMet() { - return adminClient.describeConsumerGroup(APP_ID + testNo).consumers().get().isEmpty(); + return adminClient.describeConsumerGroup(APP_ID + testNo, 0).consumers().get().isEmpty(); } }
