Repository: kafka Updated Branches: refs/heads/trunk 6910baf54 -> 2181ae768
KAFKA-4743; [KIP-122] Add Reset Consumer Group Offsets tooling Author: Jorge Quilcate <[email protected]> Reviewers: Ismael Juma <[email protected]>, Jason Gustafson <[email protected]> Closes #2624 from jeqo/feature/rewind-consumer-group-offset Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2181ae76 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2181ae76 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2181ae76 Branch: refs/heads/trunk Commit: 2181ae768719a9ae3a929ba875faa89c67edf643 Parents: 6910baf Author: Jorge Quilcate <[email protected]> Authored: Wed May 17 14:24:27 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Wed May 17 14:24:40 2017 -0700 ---------------------------------------------------------------------- .../kafka/admin/ConsumerGroupCommand.scala | 302 +++++++++- .../admin/ResetConsumerGroupOffsetTest.scala | 601 +++++++++++++++++++ 2 files changed, 874 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2181ae76/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index dd7a477..69f0d8a 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -17,7 +17,9 @@ package kafka.admin -import java.util.Properties +import java.text.SimpleDateFormat +import java.util.{Date, Properties} +import javax.xml.datatype.DatatypeFactory import joptsimple.{OptionParser, OptionSpec} import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo} @@ -27,7 +29,7 @@ import kafka.consumer.SimpleConsumer import kafka.utils._ import org.I0Itec.zkclient.exception.ZkNoNodeException import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} import org.apache.kafka.common.errors.BrokerNotAvailableException import org.apache.kafka.common.{KafkaException, Node, TopicPartition} import org.apache.kafka.common.internals.Topic @@ -38,7 +40,7 @@ import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.utils.Utils import scala.collection.JavaConverters._ -import scala.collection.{Set, mutable} +import scala.collection.{Seq, Set, mutable} object ConsumerGroupCommand extends Logging { @@ -46,12 +48,12 @@ object ConsumerGroupCommand extends Logging { val opts = new ConsumerGroupCommandOptions(args) if (args.length == 0) - CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.") + CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.") // should have exactly one action - val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _) + val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt).count(opts.options.has _) if (actions != 1) - CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete") + CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offset") opts.checkArgs() @@ -102,6 +104,15 @@ object ConsumerGroupCommand extends Logging { case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService.") } } + else if (opts.options.has(opts.resetOffsetsOpt)) { + val offsetsToReset = consumerGroupService.resetOffsets() + val export = opts.options.has(opts.exportOpt) + if (export) { + val exported = consumerGroupService.exportOffsetsToReset(offsetsToReset) + println(exported) + } else + printOffsetsToReset(offsetsToReset) + } } catch { case e: Throwable => printError(s"Executing consumer group command failed due to ${e.getMessage}", Some(e)) @@ -134,6 +145,20 @@ object ConsumerGroupCommand extends Logging { } } + def printOffsetsToReset(groupAssignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): Unit = { + print("\n%-30s %-10s %-15s".format("TOPIC", "PARTITION", "NEW-OFFSET")) + println() + + groupAssignmentsToReset.foreach { + case (consumerAssignment, offsetAndMetadata) => + print("%-30s %-10s %-15s".format( + consumerAssignment.topic(), + consumerAssignment.partition(), + offsetAndMetadata.offset())) + println() + } + } + protected case class PartitionAssignmentState(group: String, coordinator: Option[Node], topic: Option[String], partition: Option[Int], offset: Option[Long], lag: Option[Long], consumerId: Option[String], host: Option[String], @@ -151,7 +176,7 @@ object ConsumerGroupCommand extends Logging { protected def opts: ConsumerGroupCommandOptions - protected def getLogEndOffset(topicPartition: TopicPartition): LogEndOffsetResult + protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult protected def collectGroupAssignment(group: String): (Option[String], Option[Seq[PartitionAssignmentState]]) @@ -195,12 +220,16 @@ object ConsumerGroupCommand extends Logging { clientIdOpt, logEndOffsetOpt) getLogEndOffset(new TopicPartition(topic, partition)) match { - case LogEndOffsetResult.LogEndOffset(logEndOffset) => getDescribePartitionResult(Some(logEndOffset)) - case LogEndOffsetResult.Unknown => getDescribePartitionResult(None) - case LogEndOffsetResult.Ignore => null + case LogOffsetResult.LogOffset(logEndOffset) => getDescribePartitionResult(Some(logEndOffset)) + case LogOffsetResult.Unknown => getDescribePartitionResult(None) + case LogOffsetResult.Ignore => null } } + + def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] = throw new UnsupportedOperationException + + def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String = throw new UnsupportedOperationException } class ZkConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService { @@ -278,20 +307,20 @@ object ConsumerGroupCommand extends Logging { } } - protected def getLogEndOffset(topicPartition: TopicPartition): LogEndOffsetResult = { + protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult = { zkUtils.getLeaderForPartition(topicPartition.topic, topicPartition.partition) match { - case Some(-1) => LogEndOffsetResult.Unknown + case Some(-1) => LogOffsetResult.Unknown case Some(brokerId) => getZkConsumer(brokerId).map { consumer => val topicAndPartition = TopicAndPartition(topicPartition.topic, topicPartition.partition) val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head consumer.close() - LogEndOffsetResult.LogEndOffset(logEndOffset) - }.getOrElse(LogEndOffsetResult.Ignore) + LogOffsetResult.LogOffset(logEndOffset) + }.getOrElse(LogOffsetResult.Ignore) case None => printError(s"No broker for partition '$topicPartition'") - LogEndOffsetResult.Ignore + LogOffsetResult.Ignore } } @@ -380,7 +409,6 @@ object ConsumerGroupCommand extends Logging { None } } - } class KafkaConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService { @@ -434,12 +462,29 @@ object ConsumerGroupCommand extends Logging { ) } - protected def getLogEndOffset(topicPartition: TopicPartition): LogEndOffsetResult = { + protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult = { + val consumer = getConsumer() + val offsets = consumer.endOffsets(List(topicPartition).asJava) + val logStartOffset = offsets.get(topicPartition) + LogOffsetResult.LogOffset(logStartOffset) + } + + protected def getLogStartOffset(topicPartition: TopicPartition): LogOffsetResult = { + val consumer = getConsumer() + val offsets = consumer.beginningOffsets(List(topicPartition).asJava) + val logStartOffset = offsets.get(topicPartition) + LogOffsetResult.LogOffset(logStartOffset) + } + + protected def getLogTimestampOffset(topicPartition: TopicPartition, timestamp: java.lang.Long): LogOffsetResult = { val consumer = getConsumer() consumer.assign(List(topicPartition).asJava) - consumer.seekToEnd(List(topicPartition).asJava) - val logEndOffset = consumer.position(topicPartition) - LogEndOffsetResult.LogEndOffset(logEndOffset) + val offsetsForTimes = consumer.offsetsForTimes(Map(topicPartition -> timestamp).asJava) + if (offsetsForTimes != null && !offsetsForTimes.isEmpty) + LogOffsetResult.LogOffset(offsetsForTimes.get(topicPartition).offset) + else { + getLogEndOffset(topicPartition) + } } def close() { @@ -474,14 +519,160 @@ object ConsumerGroupCommand extends Logging { new KafkaConsumer(properties) } + override def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] = { + val groupId = opts.options.valueOf(opts.groupOpt) + val consumerGroupSummary = adminClient.describeConsumerGroup(groupId, opts.options.valueOf(opts.timeoutMsOpt)) + consumerGroupSummary.state match { + case "Empty" => + val partitionsToReset = getPartitionsToReset(groupId) + val preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset) + val execute = opts.options.has(opts.executeOpt) + if (execute) + getConsumer().commitSync(preparedOffsets.asJava) + preparedOffsets + case currentState => + printError(s"Assignments can only be reset if the group '$groupId' is inactive, but the current state is $currentState.") + Map.empty + } + } + + private def parseTopicPartitionsToReset(topicArgs: Seq[String]): Iterable[TopicPartition] = topicArgs.flatMap { + case topicArg if topicArg.contains(":") => + val topicAndPartitions = topicArg.split(":") + val topic = topicAndPartitions(0) + topicAndPartitions(1).split(",").map(partition => new TopicPartition(topic, partition.toInt)) + case topic => getConsumer().partitionsFor(topic).asScala + .map(partitionInfo => new TopicPartition(topic, partitionInfo.partition)) + } + + private def getPartitionsToReset(groupId: String): Iterable[TopicPartition] = { + if (opts.options.has(opts.allTopicsOpt)) { + adminClient.listGroupOffsets(groupId).keys + } else if (opts.options.has(opts.topicOpt)) { + val topics = opts.options.valuesOf(opts.topicOpt).asScala + parseTopicPartitionsToReset(topics) + } else { + CommandLineUtils.printUsageAndDie(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.") + } + } + + private def parseResetPlan(resetPlanCsv: String): Map[TopicPartition, OffsetAndMetadata] = { + resetPlanCsv.split("\n") + .map { line => + val Array(topic, partition, offset) = line.split(",").map(_.trim) + val topicPartition = new TopicPartition(topic, partition.asInstanceOf[Int]) + val offsetAndMetadata = new OffsetAndMetadata(offset.asInstanceOf[Long]) + (topicPartition, offsetAndMetadata) + }.toMap + } + + private def prepareOffsetsToReset(groupId: String, partitionsToReset: Iterable[TopicPartition]): Map[TopicPartition, OffsetAndMetadata] = { + if (opts.options.has(opts.resetToOffsetOpt)) { + val offset = opts.options.valueOf(opts.resetToOffsetOpt) + partitionsToReset.map { + topicPartition => (topicPartition, new OffsetAndMetadata(offset)) + }.toMap + } else if (opts.options.has(opts.resetToEarliestOpt)) { + partitionsToReset.map { topicPartition => + getLogStartOffset(topicPartition) match { + case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset)) + case _ => null + } + }.toMap + } else if (opts.options.has(opts.resetToLatestOpt)) { + partitionsToReset.map { topicPartition => + getLogEndOffset(topicPartition) match { + case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset)) + case _ => null + } + }.toMap + } else if (opts.options.has(opts.resetShiftByOpt)) { + val currentCommittedOffsets = adminClient.listGroupOffsets(groupId) + partitionsToReset.map { topicPartition => + val shiftBy = opts.options.valueOf(opts.resetShiftByOpt) + val currentOffset = currentCommittedOffsets.getOrElse(topicPartition, + throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition since there is no current committed offset")) + + val shiftedOffset = currentOffset + shiftBy + val newOffset = getLogEndOffset(topicPartition) match { + case LogOffsetResult.LogOffset(endOffset) if shiftedOffset > endOffset => + warn(s"New offset ($shiftedOffset) is higher than latest offset. Value will be set to $endOffset") + endOffset + + case _ => getLogStartOffset(topicPartition) match { + case LogOffsetResult.LogOffset(startOffset) if shiftedOffset < startOffset => + warn(s"New offset ($shiftedOffset) is lower than earliest offset. Value will be set to $startOffset") + startOffset + + case _ => shiftedOffset + } + } + (topicPartition, new OffsetAndMetadata(newOffset)) + }.toMap + } else if (opts.options.has(opts.resetToDatetimeOpt)) { + partitionsToReset.map { topicPartition => + val timestamp = getDateTime + val logTimestampOffset = getLogTimestampOffset(topicPartition, timestamp) + logTimestampOffset match { + case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset)) + case _ => null + } + }.toMap + } else if (opts.options.has(opts.resetByDurationOpt)) { + partitionsToReset.map { topicPartition => + val duration = opts.options.valueOf(opts.resetByDurationOpt) + val now = new Date() + val durationParsed = DatatypeFactory.newInstance().newDuration(duration) + durationParsed.negate().addTo(now) + val timestamp = now.getTime + val logTimestampOffset = getLogTimestampOffset(topicPartition, timestamp) + logTimestampOffset match { + case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset)) + case _ => null + } + }.toMap + } else if (opts.options.has(opts.resetFromFileOpt)) { + val resetPlanPath = opts.options.valueOf(opts.resetFromFileOpt) + val resetPlanCsv = Utils.readFileAsString(resetPlanPath) + val resetPlan = parseResetPlan(resetPlanCsv) + partitionsToReset.map { topicPartition => + if (resetPlan.keySet.contains(topicPartition)) + (topicPartition, resetPlan(topicPartition)) + else null + }.toMap + } else { + val currentCommittedOffsets = adminClient.listGroupOffsets(groupId) + partitionsToReset.map { topicPartition => + currentCommittedOffsets.get(topicPartition).map { offset => + (topicPartition, new OffsetAndMetadata(offset)) + }.orNull + }.toMap + } + } + + private def getDateTime: java.lang.Long = { + val datetime: String = opts.options.valueOf(opts.resetToDatetimeOpt) match { + case ts if ts.split("T")(1).contains("+") || ts.split("T")(1).contains("-") || ts.split("T")(1).contains("Z") => ts.toString + case ts => s"${ts}Z" + } + val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX") + val date = format.parse(datetime) + date.getTime + } + + override def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String = { + val rows = assignmentsToReset.map { case (k,v) => s"${k.topic()},${k.partition()},${v.offset()}" }(collection.breakOut): List[String] + rows.foldRight("")(_ + "\n" + _) + } + } - sealed trait LogEndOffsetResult + sealed trait LogOffsetResult - object LogEndOffsetResult { - case class LogEndOffset(value: Long) extends LogEndOffsetResult - case object Unknown extends LogEndOffsetResult - case object Ignore extends LogEndOffsetResult + object LogOffsetResult { + case class LogOffset(value: Long) extends LogOffsetResult + case object Unknown extends LogOffsetResult + case object Ignore extends LogOffsetResult } class ConsumerGroupCommandOptions(args: Array[String]) { @@ -489,7 +680,10 @@ object ConsumerGroupCommand extends Logging { "Multiple URLS can be given to allow fail-over." val BootstrapServerDoc = "REQUIRED (unless old consumer is used): The server to connect to." val GroupDoc = "The consumer group we wish to act on." - val TopicDoc = "The topic whose consumer group information should be deleted." + val TopicDoc = "The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. " + + "In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " + + "Reset-offsets also supports multiple topic inputs." + val AllTopicsDoc = "Consider all topics assigned to a group in the `reset-offsets` process." val ListDoc = "List all consumer groups." val DescribeDoc = "Describe consumer group and list offset lag (number of messages not yet processed) related to given group." val nl = System.getProperty("line.separator") @@ -505,6 +699,19 @@ object ConsumerGroupCommand extends Logging { "to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, " + "or is going through some changes)." val CommandConfigDoc = "Property file containing configs to be passed to Admin Client and Consumer." + val ResetOffsetsDoc = "Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive" + nl + + "Has 3 execution options: (default) to plan which offsets to reset, --execute to execute the reset-offsets process, and --export to export the results to a CSV format." + nl + + "Has the following scenarios to choose: --to-datetime, --by-period, --to-earliest, --to-latest, --shift-by, --from-file. And by default it resets to current offset." + nl + + "To define the scope use: --all-topics or --topic" + val ExecuteDoc = "Execute operation. Supported operations: reset-offsets." + val ExportDoc = "Export operation execution to a CSV file. Supported operations: reset-offsets." + val ResetToOffsetDoc = "Reset offsets to a specific offset." + val ResetFromFileDoc = "Reset offsets to values defined in CSV file." + val ResetToDatetimeDoc = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'" + val ResetByDurationDoc = "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'" + val ResetToEarliestDoc = "Reset offsets to earliest offset." + val ResetToLatestDoc = "Reset offsets to latest offset." + val ResetShiftByDoc = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative" val parser = new OptionParser val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc) @@ -523,6 +730,7 @@ object ConsumerGroupCommand extends Logging { .withRequiredArg .describedAs("topic") .ofType(classOf[String]) + val allTopicsOpt = parser.accepts("all-topics", AllTopicsDoc) val listOpt = parser.accepts("list", ListDoc) val describeOpt = parser.accepts("describe", DescribeDoc) val deleteOpt = parser.accepts("delete", DeleteDoc) @@ -536,12 +744,39 @@ object ConsumerGroupCommand extends Logging { .withRequiredArg .describedAs("command config property file") .ofType(classOf[String]) + val resetOffsetsOpt = parser.accepts("reset-offsets", ResetOffsetsDoc) + val executeOpt = parser.accepts("execute", ExecuteDoc) + val exportOpt = parser.accepts("export", ExportDoc) + val resetToOffsetOpt = parser.accepts("to-offset", ResetToOffsetDoc) + .withRequiredArg() + .describedAs("offset") + .ofType(classOf[Long]) + val resetFromFileOpt = parser.accepts("from-file", ResetFromFileDoc) + .withRequiredArg() + .describedAs("path to CSV file") + .ofType(classOf[String]) + val resetToDatetimeOpt = parser.accepts("to-datetime", ResetToDatetimeDoc) + .withRequiredArg() + .describedAs("datetime") + .ofType(classOf[String]) + val resetByDurationOpt = parser.accepts("by-duration", ResetByDurationDoc) + .withRequiredArg() + .describedAs("duration") + .ofType(classOf[String]) + val resetToEarliestOpt = parser.accepts("to-earliest", ResetToEarliestDoc) + val resetToLatestOpt = parser.accepts("to-latest", ResetToLatestDoc) + val resetShiftByOpt = parser.accepts("shift-by", ResetShiftByDoc) + .withRequiredArg() + .describedAs("number-of-offsets") + .ofType(classOf[Long]) val options = parser.parse(args : _*) val useOldConsumer = options.has(zkConnectOpt) val describeOptPresent = options.has(describeOpt) - val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt) + val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt, resetOffsetsOpt) + val allResetOffsetScenarioOpts: Set[OptionSpec[_]] = Set(resetToOffsetOpt, resetShiftByOpt, + resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetFromFileOpt) def checkArgs() { // check required args @@ -566,10 +801,19 @@ object ConsumerGroupCommand extends Logging { CommandLineUtils.checkRequiredArgs(parser, options, groupOpt) if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt)) CommandLineUtils.printUsageAndDie(parser, "Option %s either takes %s, %s, or both".format(deleteOpt, groupOpt, topicOpt)) + if (options.has(resetOffsetsOpt)) + CommandLineUtils.checkRequiredArgs(parser, options, groupOpt) + CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt, allResetOffsetScenarioOpts - resetToOffsetOpt) + CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, allResetOffsetScenarioOpts - resetToDatetimeOpt) + CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, allResetOffsetScenarioOpts - resetByDurationOpt) + CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt, allResetOffsetScenarioOpts - resetToEarliestOpt) + CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, allResetOffsetScenarioOpts - resetToLatestOpt) + CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt, allResetOffsetScenarioOpts - resetShiftByOpt) + CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt, allResetOffsetScenarioOpts - resetFromFileOpt) // check invalid args - CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt) - CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt) + CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt - resetOffsetsOpt) + CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt - resetOffsetsOpt) } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/2181ae76/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala new file mode 100644 index 0000000..d58231e --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala @@ -0,0 +1,601 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package unit.kafka.admin + +import java.io.{BufferedWriter, File, FileWriter} +import java.text.SimpleDateFormat +import java.util.concurrent.{ExecutorService, Executors, TimeUnit} +import java.util.{Calendar, Collections, Date, Properties} + +import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService} +import kafka.admin.{AdminUtils, ConsumerGroupCommand} +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.errors.WakeupException +import org.apache.kafka.common.serialization.StringDeserializer +import org.junit.{Before, Test} + +/** + * Test cases by: + * - Non-existing consumer group + * - One for each scenario, with scope=all-topics + * - scope=one topic, scenario=to-earliest + * - scope=one topic+partitions, scenario=to-earliest + * - scope=topics, scenario=to-earliest + * - scope=topics+partitions, scenario=to-earliest + * - export/import + */ +class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { + + val overridingProps = new Properties() + val topic1 = "foo1" + val topic2 = "foo2" + val group = "test.group" + val props = new Properties + + /** + * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every + * test and should not reuse previous configurations unless they select their ports randomly when servers are started. + */ + override def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps)) + + @Before + override def setUp() { + super.setUp() + + props.setProperty("group.id", group) + } + + @Test + def testResetOffsetsNotExistingGroup() { + new ConsumerGroupExecutor(brokerList, 1, group, topic1) + + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "missing.group", "--all-topics") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand.resetOffsets() + assignmentsToReset == Map.empty + }, "Expected to have an empty assignations map.") + + consumerGroupCommand.close() + } + + @Test + def testResetOffsetsToLocalDateTime() { + AdminUtils.createTopic(zkUtils, topic1, 1, 1) + + val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS") + val checkpoint = new Date() + val calendar = Calendar.getInstance() + calendar.add(Calendar.DATE, -1) + + + TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000) + + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic1) + + TestUtils.waitUntilTrue(() => { + val (_, assignmentsOption) = consumerGroupCommand.describeGroup() + assignmentsOption match { + case Some(assignments) => + val sumOffset = assignments.filter(_.topic.exists(_ == topic1)) + .filter(_.offset.isDefined) + .map(assignment => assignment.offset.get) + .foldLeft(0.toLong)(_ + _) + sumOffset == 100 + case _ => false + } + }, "Expected that consumer group has consumed all messages from topic/partition.") + + executor.shutdown() + + val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(calendar.getTime), "--execute") + val opts1 = new ConsumerGroupCommandOptions(cgcArgs1) + val consumerGroupCommand1 = new KafkaConsumerGroupService(opts1) + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand1.resetOffsets() + assignmentsToReset.exists { assignment => assignment._2.offset() == 0 } + }, "Expected the consumer group to reset to when offset was 50.") + + printConsumerGroup() + + AdminUtils.deleteTopic(zkUtils, topic1) + consumerGroupCommand.close() + } + + @Test + def testResetOffsetsToZonedDateTime() { + AdminUtils.createTopic(zkUtils, topic1, 1, 1) + TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000) + + val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX") + val checkpoint = new Date() + + TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000) + + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic1) + + TestUtils.waitUntilTrue(() => { + val (_, assignmentsOption) = consumerGroupCommand.describeGroup() + assignmentsOption match { + case Some(assignments) => + val sumOffset = (assignments.filter(_.topic.exists(_ == topic1)) + .filter(_.offset.isDefined) + .map(assignment => assignment.offset.get) foldLeft 0.toLong)(_ + _) + sumOffset == 100 + case _ => false + } + }, "Expected that consumer group has consumed all messages from topic/partition.") + + executor.shutdown() + + val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute") + val opts1 = new ConsumerGroupCommandOptions(cgcArgs1) + val consumerGroupCommand1 = new KafkaConsumerGroupService(opts1) + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand1.resetOffsets() + assignmentsToReset.exists { assignment => assignment._2.offset() == 50 } + }, "Expected the consumer group to reset to when offset was 50.") + + printConsumerGroup() + + AdminUtils.deleteTopic(zkUtils, topic1) + consumerGroupCommand.close() + } + + @Test + def testResetOffsetsByDuration() { + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT1M", "--execute") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + AdminUtils.createTopic(zkUtils, topic1, 1, 1) + + produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100) + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand.resetOffsets() + assignmentsToReset.exists { assignment => assignment._2.offset() == 0 } + }, "Expected the consumer group to reset to offset 0 (earliest by duration).") + + printConsumerGroup() + + AdminUtils.deleteTopic(zkUtils, topic1) + consumerGroupCommand.close() + } + + @Test + def testResetOffsetsByDurationToEarliest() { + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT0.1S", "--execute") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + AdminUtils.createTopic(zkUtils, topic1, 1, 1) + + produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100) + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand.resetOffsets() + assignmentsToReset.exists { assignment => assignment._2.offset() == 100 } + }, "Expected the consumer group to reset to offset 100 (latest by duration).") + + printConsumerGroup() + AdminUtils.deleteTopic(zkUtils, topic1) + consumerGroupCommand.close() + } + + @Test + def testResetOffsetsToEarliest() { + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest", "--execute") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + AdminUtils.createTopic(zkUtils, topic1, 1, 1) + + produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100) + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand.resetOffsets() + assignmentsToReset.exists { assignment => assignment._2.offset() == 0 } + }, "Expected the consumer group to reset to offset 0 (earliest).") + + printConsumerGroup() + AdminUtils.deleteTopic(zkUtils, topic1) + consumerGroupCommand.close() + } + + @Test + def testResetOffsetsToLatest() { + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-latest", "--execute") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + AdminUtils.createTopic(zkUtils, topic1, 1, 1) + + produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100) + + TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000) + + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand.resetOffsets() + assignmentsToReset.exists({ assignment => assignment._2.offset() == 200 }) + }, "Expected the consumer group to reset to offset 200 (latest).") + + printConsumerGroup() + AdminUtils.deleteTopic(zkUtils, topic1) + consumerGroupCommand.close() + } + + @Test + def testResetOffsetsToCurrentOffset() { + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--execute") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + AdminUtils.createTopic(zkUtils, topic1, 1, 1) + + produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100) + + TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000) + + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand.resetOffsets() + assignmentsToReset.exists({ assignment => assignment._2.offset() == 100 }) + }, "Expected the consumer group to reset to offset 100 (current).") + + printConsumerGroup() + AdminUtils.deleteTopic(zkUtils, topic1) + consumerGroupCommand.close() + } + + private def produceConsumeAndShutdown(consumerGroupCommand: KafkaConsumerGroupService, numConsumers: Int = 1, topic: String, totalMessages: Int) { + TestUtils.produceMessages(servers, topic, totalMessages, acks = 1, 100 * 1000) + val executor = new ConsumerGroupExecutor(brokerList, numConsumers, group, topic) + + + TestUtils.waitUntilTrue(() => { + val (_, assignmentsOption) = consumerGroupCommand.describeGroup() + assignmentsOption match { + case Some(assignments) => + val sumOffset = assignments.filter(_.topic.exists(_ == topic)) + .filter(_.offset.isDefined) + .map(assignment => assignment.offset.get) + .foldLeft(0.toLong)(_ + _) + sumOffset == totalMessages + case _ => false + } + }, "Expected the consumer group to consume all messages from topic.") + + executor.shutdown() + } + + @Test + def testResetOffsetsToSpecificOffset() { + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-offset", "1", "--execute") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + AdminUtils.createTopic(zkUtils, topic1, 1, 1) + + produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100) + + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand.resetOffsets() + assignmentsToReset.exists({ assignment => assignment._2.offset() == 1 }) + }, "Expected the consumer group to reset to offset 1 (specific offset).") + + printConsumerGroup() + AdminUtils.deleteTopic(zkUtils, topic1) + consumerGroupCommand.close() + } + + @Test + def testResetOffsetsShiftPlus() { + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "50", "--execute") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + AdminUtils.createTopic(zkUtils, topic1, 1, 1) + + produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100) + + TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000) + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand.resetOffsets() + assignmentsToReset.exists({ assignment => assignment._2.offset() == 150 }) + }, "Expected the consumer group to reset to offset 150 (current + 50).") + + printConsumerGroup() + AdminUtils.deleteTopic(zkUtils, topic1) + consumerGroupCommand.close() + } + + @Test + def testResetOffsetsShiftMinus() { + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-50", "--execute") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + AdminUtils.createTopic(zkUtils, topic1, 1, 1) + + produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100) + + TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000) + + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand.resetOffsets() + assignmentsToReset.exists({ assignment => assignment._2.offset() == 50 }) + }, "Expected the consumer group to reset to offset 50 (current - 50).") + + printConsumerGroup() + AdminUtils.deleteTopic(zkUtils, topic1) + consumerGroupCommand.close() + } + + @Test + def testResetOffsetsShiftByLowerThanEarliest() { + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-150", "--execute") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + AdminUtils.createTopic(zkUtils, topic1, 1, 1) + + produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100) + + TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000) + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand.resetOffsets() + assignmentsToReset.exists({ assignment => assignment._2.offset() == 0 }) + }, "Expected the consumer group to reset to offset 0 (earliest by shift).") + + printConsumerGroup() + AdminUtils.deleteTopic(zkUtils, topic1) + consumerGroupCommand.close() + } + + @Test + def testResetOffsetsShiftByHigherThanLatest() { + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "150", "--execute") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + AdminUtils.createTopic(zkUtils, topic1, 1, 1) + + produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100) + + TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000) + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand.resetOffsets() + assignmentsToReset.exists({ assignment => assignment._2.offset() == 200 }) + }, "Expected the consumer group to reset to offset 200 (latest by shift).") + + printConsumerGroup() + AdminUtils.deleteTopic(zkUtils, topic1) + consumerGroupCommand.close() + } + + @Test + def testResetOffsetsToEarliestOnOneTopic() { + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic1, "--to-earliest", "--execute") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + AdminUtils.createTopic(zkUtils, topic1, 1, 1) + + produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100) + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand.resetOffsets() + assignmentsToReset.exists { assignment => assignment._2.offset() == 0 } + }, "Expected the consumer group to reset to offset 0 (earliest).") + + printConsumerGroup() + AdminUtils.deleteTopic(zkUtils, topic1) + consumerGroupCommand.close() + } + + @Test + def testResetOffsetsToEarliestOnOneTopicAndPartition() { + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", String.format("%s:1", topic1), "--to-earliest", "--execute") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + AdminUtils.createTopic(zkUtils, topic1, 2, 1) + + produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100) + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand.resetOffsets() + assignmentsToReset.exists { assignment => assignment._2.offset() == 0 && assignment._1.partition() == 1 } + }, "Expected the consumer group to reset to offset 0 (earliest) in partition 1.") + + printConsumerGroup() + AdminUtils.deleteTopic(zkUtils, topic1) + consumerGroupCommand.close() + } + + @Test + def testResetOffsetsToEarliestOnTopics() { + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", + "--group", group, + "--topic", topic1, + "--topic", topic2, + "--to-earliest", "--execute") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + AdminUtils.createTopic(zkUtils, topic1, 1, 1) + AdminUtils.createTopic(zkUtils, topic2, 1, 1) + + produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100) + produceConsumeAndShutdown(consumerGroupCommand, 1, topic2, 100) + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand.resetOffsets() + assignmentsToReset.exists { assignment => assignment._2.offset() == 0 && assignment._1.topic() == topic1 } && + assignmentsToReset.exists { assignment => assignment._2.offset() == 0 && assignment._1.topic() == topic2 } + }, "Expected the consumer group to reset to offset 0 (earliest).") + + printConsumerGroup() + AdminUtils.deleteTopic(zkUtils, topic1) + AdminUtils.deleteTopic(zkUtils, topic2) + consumerGroupCommand.close() + } + + @Test + def testResetOffsetsToEarliestOnTopicsAndPartitions() { + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", + "--group", group, + "--topic", String.format("%s:1", topic1), + "--topic", String.format("%s:1", topic2), + "--to-earliest", "--execute") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + AdminUtils.createTopic(zkUtils, topic1, 2, 1) + AdminUtils.createTopic(zkUtils, topic2, 2, 1) + + produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100) + produceConsumeAndShutdown(consumerGroupCommand, 2, topic2, 100) + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand.resetOffsets() + assignmentsToReset.exists { assignment => assignment._2.offset() == 0 && assignment._1.partition() == 1 && assignment._1.topic() == topic1 } + assignmentsToReset.exists { assignment => assignment._2.offset() == 0 && assignment._1.partition() == 1 && assignment._1.topic() == topic2 } + }, "Expected the consumer group to reset to offset 0 (earliest) in partition 1.") + + printConsumerGroup() + AdminUtils.deleteTopic(zkUtils, topic1) + AdminUtils.deleteTopic(zkUtils, topic2) + consumerGroupCommand.close() + } + + @Test + def testResetOffsetsExportImportPlan() { + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest", "--export") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + AdminUtils.createTopic(zkUtils, topic1, 2, 1) + + produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100) + + val file = File.createTempFile("reset", ".csv") + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand.resetOffsets() + val bw = new BufferedWriter(new FileWriter(file)) + bw.write(consumerGroupCommand.exportOffsetsToReset(assignmentsToReset)) + bw.close() + assignmentsToReset.exists { assignment => assignment._2.offset() == 0 } && file.exists() + }, "Expected the consume all messages and save reset offsets plan to file") + + + val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest", "--from-file", file.getCanonicalPath) + val optsExec = new ConsumerGroupCommandOptions(cgcArgsExec) + val consumerGroupCommandExec = new KafkaConsumerGroupService(optsExec) + + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommandExec.resetOffsets() + assignmentsToReset.exists { assignment => assignment._2.offset() == 0 } + }, "Expected the consumer group to reset to offset 0 (earliest) by file.") + + file.deleteOnExit() + + printConsumerGroup() + AdminUtils.deleteTopic(zkUtils, topic1) + consumerGroupCommand.close() + } + + private def printConsumerGroup() { + val cgcArgs = Array("--bootstrap-server", brokerList, "--group", group, "--describe") + ConsumerGroupCommand.main(cgcArgs) + } + +} + + +class ConsumerThread(broker: String, id: Int, groupId: String, topic: String) extends Runnable { + val props = new Properties + props.put("bootstrap.servers", broker) + props.put("group.id", groupId) + props.put("key.deserializer", classOf[StringDeserializer].getName) + props.put("value.deserializer", classOf[StringDeserializer].getName) + val consumer = new KafkaConsumer(props) + + def run() { + try { + consumer.subscribe(Collections.singleton(topic)) + while (true) + consumer.poll(Long.MaxValue) + } catch { + case _: WakeupException => // OK + } finally { + consumer.close() + } + } + + def shutdown() { + consumer.wakeup() + } +} + +class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String) { + val executor: ExecutorService = Executors.newFixedThreadPool(numConsumers) + var consumers: List[ConsumerThread] = List[ConsumerThread]() + + for (i <- 1 to numConsumers) { + val consumer = new ConsumerThread(broker, i, groupId, topic) + consumers ++= List(consumer) + executor.submit(consumer) + } + + Runtime.getRuntime.addShutdownHook(new Thread() { + override def run() { + shutdown() + } + }) + + def shutdown() { + consumers.foreach(_.shutdown()) + executor.shutdown() + try { + executor.awaitTermination(5000, TimeUnit.MILLISECONDS) + } catch { + case e: InterruptedException => + e.printStackTrace() + } + } +}
