Repository: kafka Updated Branches: refs/heads/0.11.0 85d2ce23c -> 08c80c6d4
KAFKA-5266; Follow-up improvements for consumer offset reset tool (KIP-122) Implement improvements defined here: https://issues.apache.org/jira/browse/KAFKA-5266 Author: Jorge Quilcate Otoya <[email protected]> Reviewers: Jason Gustafson <[email protected]> Closes #3102 from jeqo/feature/KAFKA-5266 (cherry picked from commit ef9551297c815a0ac3065a65a0831863090714f0) Signed-off-by: Jason Gustafson <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/08c80c6d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/08c80c6d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/08c80c6d Branch: refs/heads/0.11.0 Commit: 08c80c6d4859a25be3c72f1dbefb4bf2734fcd17 Parents: 85d2ce2 Author: Jorge Quilcate Otoya <[email protected]> Authored: Wed May 31 00:42:43 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Wed May 31 00:52:47 2017 -0700 ---------------------------------------------------------------------- .../kafka/admin/ConsumerGroupCommand.scala | 88 ++++++++++++-------- .../admin/ResetConsumerGroupOffsetTest.scala | 33 ++++++-- 2 files changed, 80 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/08c80c6d/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index fb589a2..2f26f57 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -106,8 +106,7 @@ object ConsumerGroupCommand extends Logging { } else if (opts.options.has(opts.resetOffsetsOpt)) { val offsetsToReset = consumerGroupService.resetOffsets() - val export = opts.options.has(opts.exportOpt) - if (export) { + if (opts.options.has(opts.exportOpt)) { val exported = consumerGroupService.exportOffsetsToReset(offsetsToReset) println(exported) } else @@ -523,7 +522,7 @@ object ConsumerGroupCommand extends Logging { val groupId = opts.options.valueOf(opts.groupOpt) val consumerGroupSummary = adminClient.describeConsumerGroup(groupId, opts.options.valueOf(opts.timeoutMsOpt)) consumerGroupSummary.state match { - case "Empty" => + case "Empty" | "Dead" => val partitionsToReset = getPartitionsToReset(groupId) val preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset) val execute = opts.options.has(opts.executeOpt) @@ -536,7 +535,7 @@ object ConsumerGroupCommand extends Logging { } } - private def parseTopicPartitionsToReset(topicArgs: Seq[String]): Iterable[TopicPartition] = topicArgs.flatMap { + private def parseTopicPartitionsToReset(topicArgs: Seq[String]): Seq[TopicPartition] = topicArgs.flatMap { case topicArg if topicArg.contains(":") => val topicAndPartitions = topicArg.split(":") val topic = topicAndPartitions(0) @@ -545,14 +544,18 @@ object ConsumerGroupCommand extends Logging { .map(partitionInfo => new TopicPartition(topic, partitionInfo.partition)) } - private def getPartitionsToReset(groupId: String): Iterable[TopicPartition] = { + private def getPartitionsToReset(groupId: String): Seq[TopicPartition] = { if (opts.options.has(opts.allTopicsOpt)) { - adminClient.listGroupOffsets(groupId).keys + val allTopicPartitions = adminClient.listGroupOffsets(groupId).keys.toSeq + allTopicPartitions } else if (opts.options.has(opts.topicOpt)) { val topics = opts.options.valuesOf(opts.topicOpt).asScala parseTopicPartitionsToReset(topics) } else { - CommandLineUtils.printUsageAndDie(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.") + if (opts.options.has(opts.resetFromFileOpt)) + Nil + else + CommandLineUtils.printUsageAndDie(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.") } } @@ -570,20 +573,22 @@ object ConsumerGroupCommand extends Logging { if (opts.options.has(opts.resetToOffsetOpt)) { val offset = opts.options.valueOf(opts.resetToOffsetOpt) partitionsToReset.map { - topicPartition => (topicPartition, new OffsetAndMetadata(offset)) + topicPartition => + val newOffset: Long = checkOffsetRange(topicPartition, offset) + (topicPartition, new OffsetAndMetadata(newOffset)) }.toMap } else if (opts.options.has(opts.resetToEarliestOpt)) { partitionsToReset.map { topicPartition => getLogStartOffset(topicPartition) match { case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset)) - case _ => null + case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting starting offset of topic partition: $topicPartition") } }.toMap } else if (opts.options.has(opts.resetToLatestOpt)) { partitionsToReset.map { topicPartition => getLogEndOffset(topicPartition) match { case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset)) - case _ => null + case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of topic partition: $topicPartition") } }.toMap } else if (opts.options.has(opts.resetShiftByOpt)) { @@ -592,21 +597,8 @@ object ConsumerGroupCommand extends Logging { val shiftBy = opts.options.valueOf(opts.resetShiftByOpt) val currentOffset = currentCommittedOffsets.getOrElse(topicPartition, throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition since there is no current committed offset")) - val shiftedOffset = currentOffset + shiftBy - val newOffset = getLogEndOffset(topicPartition) match { - case LogOffsetResult.LogOffset(endOffset) if shiftedOffset > endOffset => - warn(s"New offset ($shiftedOffset) is higher than latest offset. Value will be set to $endOffset") - endOffset - - case _ => getLogStartOffset(topicPartition) match { - case LogOffsetResult.LogOffset(startOffset) if shiftedOffset < startOffset => - warn(s"New offset ($shiftedOffset) is lower than earliest offset. Value will be set to $startOffset") - startOffset - - case _ => shiftedOffset - } - } + val newOffset: Long = checkOffsetRange(topicPartition, shiftedOffset) (topicPartition, new OffsetAndMetadata(newOffset)) }.toMap } else if (opts.options.has(opts.resetToDatetimeOpt)) { @@ -615,7 +607,7 @@ object ConsumerGroupCommand extends Logging { val logTimestampOffset = getLogTimestampOffset(topicPartition, timestamp) logTimestampOffset match { case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset)) - case _ => null + case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting offset by timestamp of topic partition: $topicPartition") } }.toMap } else if (opts.options.has(opts.resetByDurationOpt)) { @@ -628,25 +620,47 @@ object ConsumerGroupCommand extends Logging { val logTimestampOffset = getLogTimestampOffset(topicPartition, timestamp) logTimestampOffset match { case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset)) - case _ => null + case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting offset by timestamp of topic partition: $topicPartition") } }.toMap } else if (opts.options.has(opts.resetFromFileOpt)) { val resetPlanPath = opts.options.valueOf(opts.resetFromFileOpt) val resetPlanCsv = Utils.readFileAsString(resetPlanPath) val resetPlan = parseResetPlan(resetPlanCsv) - partitionsToReset.map { topicPartition => - if (resetPlan.keySet.contains(topicPartition)) - (topicPartition, resetPlan(topicPartition)) - else null + resetPlan.keySet.map { topicPartition => + val newOffset: Long = checkOffsetRange(topicPartition, resetPlan(topicPartition).offset()) + (topicPartition, new OffsetAndMetadata(newOffset)) }.toMap - } else { + } else if (opts.options.has(opts.resetToCurrentOpt)) { val currentCommittedOffsets = adminClient.listGroupOffsets(groupId) partitionsToReset.map { topicPartition => currentCommittedOffsets.get(topicPartition).map { offset => (topicPartition, new OffsetAndMetadata(offset)) - }.orNull + }.getOrElse( + getLogEndOffset(topicPartition) match { + case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset)) + case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of topic partition: $topicPartition") + } + ) }.toMap + } else { + CommandLineUtils.printUsageAndDie(opts.parser, "Option '%s' requires one of the following scenarios: %s".format(opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts) ) + } + } + + private def checkOffsetRange(topicPartition: TopicPartition, offset: Long) = { + getLogEndOffset(topicPartition) match { + case LogOffsetResult.LogOffset(endOffset) if offset > endOffset => + warn(s"New offset ($offset) is higher than latest offset. Value will be set to $endOffset") + endOffset + + case _ => getLogStartOffset(topicPartition) match { + case LogOffsetResult.LogOffset(startOffset) if offset < startOffset => + warn(s"New offset ($offset) is lower than earliest offset. Value will be set to $startOffset") + startOffset + + case _ => offset + } } } @@ -701,8 +715,8 @@ object ConsumerGroupCommand extends Logging { val CommandConfigDoc = "Property file containing configs to be passed to Admin Client and Consumer." val ResetOffsetsDoc = "Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive" + nl + "Has 3 execution options: (default) to plan which offsets to reset, --execute to execute the reset-offsets process, and --export to export the results to a CSV format." + nl + - "Has the following scenarios to choose: --to-datetime, --by-period, --to-earliest, --to-latest, --shift-by, --from-file. And by default it resets to current offset." + nl + - "To define the scope use: --all-topics or --topic" + "Has the following scenarios to choose: --to-datetime, --by-period, --to-earliest, --to-latest, --shift-by, --from-file, --to-current. One scenario must be choose" + nl + + "To define the scope use: --all-topics or --topic. . One scope must be choose, unless you use '--from-file' scenario" val ExecuteDoc = "Execute operation. Supported operations: reset-offsets." val ExportDoc = "Export operation execution to a CSV file. Supported operations: reset-offsets." val ResetToOffsetDoc = "Reset offsets to a specific offset." @@ -711,6 +725,7 @@ object ConsumerGroupCommand extends Logging { val ResetByDurationDoc = "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'" val ResetToEarliestDoc = "Reset offsets to earliest offset." val ResetToLatestDoc = "Reset offsets to latest offset." + val ResetToCurrentDoc = "Reset offsets to current offset." val ResetShiftByDoc = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative" val parser = new OptionParser(false) @@ -765,6 +780,7 @@ object ConsumerGroupCommand extends Logging { .ofType(classOf[String]) val resetToEarliestOpt = parser.accepts("to-earliest", ResetToEarliestDoc) val resetToLatestOpt = parser.accepts("to-latest", ResetToLatestDoc) + val resetToCurrentOpt = parser.accepts("to-current", ResetToCurrentDoc) val resetShiftByOpt = parser.accepts("shift-by", ResetShiftByDoc) .withRequiredArg() .describedAs("number-of-offsets") @@ -776,7 +792,7 @@ object ConsumerGroupCommand extends Logging { val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt, resetOffsetsOpt) val allResetOffsetScenarioOpts: Set[OptionSpec[_]] = Set(resetToOffsetOpt, resetShiftByOpt, - resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetFromFileOpt) + resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt) def checkArgs() { // check required args @@ -808,9 +824,11 @@ object ConsumerGroupCommand extends Logging { CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, allResetOffsetScenarioOpts - resetByDurationOpt) CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt, allResetOffsetScenarioOpts - resetToEarliestOpt) CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, allResetOffsetScenarioOpts - resetToLatestOpt) + CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt, allResetOffsetScenarioOpts - resetToCurrentOpt) CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt, allResetOffsetScenarioOpts - resetShiftByOpt) CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt, allResetOffsetScenarioOpts - resetFromFileOpt) + // check invalid args CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt - resetOffsetsOpt) CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt - resetOffsetsOpt) http://git-wip-us.apache.org/repos/asf/kafka/blob/08c80c6d/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala index 67d03e9..22958a9 100644 --- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala @@ -75,7 +75,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { def testResetOffsetsNotExistingGroup() { createConsumerGroupExecutor(brokerList, 1, group, topic1) - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "missing.group", "--all-topics") + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "missing.group", "--all-topics", "--to-current") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) @@ -87,15 +87,33 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { } @Test + def testResetOffsetsNewConsumerExistingTopic(): Unit = { + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "new.group", "--topic", topic1, "--to-offset", "50", "--execute") + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupCommand = new KafkaConsumerGroupService(opts) + + AdminUtils.createTopic(zkUtils, topic1, 1, 1) + + TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000) + + TestUtils.waitUntilTrue(() => { + val assignmentsToReset = consumerGroupCommand.resetOffsets() + assignmentsToReset.exists({ assignment => assignment._2.offset() == 50 }) + }, "Expected the consumer group to reset to offset 1 (specific offset).") + + printConsumerGroup("new.group") + AdminUtils.deleteTopic(zkUtils, topic1) + consumerGroupCommand.close() + } + + @Test def testResetOffsetsToLocalDateTime() { AdminUtils.createTopic(zkUtils, topic1, 1, 1) val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS") - val checkpoint = new Date() val calendar = Calendar.getInstance() calendar.add(Calendar.DATE, -1) - TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000) val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics") @@ -259,7 +277,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsToCurrentOffset() { - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--execute") + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-current", "--execute") val opts = new ConsumerGroupCommandOptions(cgcArgs) val consumerGroupCommand = createConsumerGroupService(opts) @@ -269,7 +287,6 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000) - TestUtils.waitUntilTrue(() => { val assignmentsToReset = consumerGroupCommand.resetOffsets() assignmentsToReset.exists({ assignment => assignment._2.offset() == 100 }) @@ -283,7 +300,6 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { TestUtils.produceMessages(servers, topic, totalMessages, acks = 1, 100 * 1000) val executor = createConsumerGroupExecutor(brokerList, numConsumers, group, topic) - TestUtils.waitUntilTrue(() => { val (_, assignmentsOption) = consumerGroupCommand.describeGroup() assignmentsOption match { @@ -538,6 +554,11 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { ConsumerGroupCommand.main(cgcArgs) } + private def printConsumerGroup(group: String) { + val cgcArgs = Array("--bootstrap-server", brokerList, "--group", group, "--describe") + ConsumerGroupCommand.main(cgcArgs) + } + private def createConsumerGroupExecutor(brokerList: String, numConsumers: Int, groupId: String, topic: String): ConsumerGroupExecutor = { val executor = new ConsumerGroupExecutor(brokerList, numConsumers, groupId, topic) executors += executor
