This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 138f57f KAFKA-6321; Consolidate calls to KafkaConsumer's
`beginningOffsets()` and `endOffsets()` in ConsumerGroupCommand
138f57f is described below
commit 138f57f16068b49a08a2f58155b71e72ff7ed273
Author: Vahid Hashemian <[email protected]>
AuthorDate: Thu Jan 25 20:40:11 2018 -0800
KAFKA-6321; Consolidate calls to KafkaConsumer's `beginningOffsets()` and
`endOffsets()` in ConsumerGroupCommand
Author: Vahid Hashemian <[email protected]>
Reviewers: Ted Yu <[email protected]>, Jiangjie (Becket) Qin
<[email protected]>
Closes #4344 from vahidhashemian/KAFKA-6321
---
.../scala/kafka/admin/ConsumerGroupCommand.scala | 237 ++++++++++++---------
1 file changed, 132 insertions(+), 105 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 4905a94..3aa821c 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -287,7 +287,10 @@ object ConsumerGroupCommand extends Logging {
protected def opts: ConsumerGroupCommandOptions
- protected def getLogEndOffset(topicPartition: TopicPartition):
LogOffsetResult
+ protected def getLogEndOffset(topicPartition: TopicPartition):
LogOffsetResult =
+
getLogEndOffsets(Seq(topicPartition)).get(topicPartition).getOrElse(LogOffsetResult.Ignore)
+
+ protected def getLogEndOffsets(topicPartitions: Seq[TopicPartition]):
Map[TopicPartition, LogOffsetResult]
def collectGroupOffsets(): (Option[String],
Option[Seq[PartitionAssignmentState]])
@@ -302,43 +305,40 @@ object ConsumerGroupCommand extends Logging {
consumerIdOpt: Option[String],
hostOpt: Option[String],
clientIdOpt: Option[String]):
Array[PartitionAssignmentState] = {
- if (topicPartitions.isEmpty)
+ if (topicPartitions.isEmpty) {
Array[PartitionAssignmentState](
PartitionAssignmentState(group, coordinator, None, None, None,
getLag(None, None), consumerIdOpt, hostOpt, clientIdOpt, None)
)
- else {
- var assignmentRows: Array[PartitionAssignmentState] = Array()
- topicPartitions
- .sortBy(_.partition)
- .foreach { topicPartition =>
- assignmentRows = assignmentRows :+ describePartition(group,
coordinator, topicPartition.topic, topicPartition.partition,
getPartitionOffset(topicPartition),
- consumerIdOpt, hostOpt, clientIdOpt)
- }
- assignmentRows
}
+ else
+ describePartitions(group, coordinator,
topicPartitions.sortBy(_.partition), getPartitionOffset, consumerIdOpt,
hostOpt, clientIdOpt)
}
private def getLag(offset: Option[Long], logEndOffset: Option[Long]):
Option[Long] =
offset.filter(_ != -1).flatMap(offset => logEndOffset.map(_ - offset))
- private def describePartition(group: String,
- coordinator: Option[Node],
- topic: String,
- partition: Int,
- offsetOpt: Option[Long],
- consumerIdOpt: Option[String],
- hostOpt: Option[String],
- clientIdOpt: Option[String]):
PartitionAssignmentState = {
- def getDescribePartitionResult(logEndOffsetOpt: Option[Long]):
PartitionAssignmentState =
- PartitionAssignmentState(group, coordinator, Option(topic),
Option(partition), offsetOpt,
- getLag(offsetOpt, logEndOffsetOpt),
consumerIdOpt, hostOpt,
- clientIdOpt, logEndOffsetOpt)
-
- getLogEndOffset(new TopicPartition(topic, partition)) match {
- case LogOffsetResult.LogOffset(logEndOffset) =>
getDescribePartitionResult(Some(logEndOffset))
- case LogOffsetResult.Unknown => getDescribePartitionResult(None)
- case LogOffsetResult.Ignore => null
+ private def describePartitions(group: String,
+ coordinator: Option[Node],
+ topicPartitions: Seq[TopicPartition],
+ getPartitionOffset: TopicPartition =>
Option[Long],
+ consumerIdOpt: Option[String],
+ hostOpt: Option[String],
+ clientIdOpt: Option[String]):
Array[PartitionAssignmentState] = {
+
+ def getDescribePartitionResult(topicPartition: TopicPartition,
logEndOffsetOpt: Option[Long]): PartitionAssignmentState = {
+ val offset = getPartitionOffset(topicPartition)
+ PartitionAssignmentState(group, coordinator,
Option(topicPartition.topic), Option(topicPartition.partition), offset,
+ getLag(offset, logEndOffsetOpt), consumerIdOpt, hostOpt,
clientIdOpt, logEndOffsetOpt)
}
+
+ getLogEndOffsets(topicPartitions).map {
+ logEndOffsetResult =>
+ logEndOffsetResult._2 match {
+ case LogOffsetResult.LogOffset(logEndOffset) =>
getDescribePartitionResult(logEndOffsetResult._1, Some(logEndOffset))
+ case LogOffsetResult.Unknown =>
getDescribePartitionResult(logEndOffsetResult._1, None)
+ case LogOffsetResult.Ignore => null
+ }
+ }.toArray
}
def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] = throw new
UnsupportedOperationException
@@ -423,21 +423,23 @@ object ConsumerGroupCommand extends Logging {
}
}
- protected def getLogEndOffset(topicPartition: TopicPartition):
LogOffsetResult = {
- zkUtils.getLeaderForPartition(topicPartition.topic,
topicPartition.partition) match {
- case Some(-1) => LogOffsetResult.Unknown
- case Some(brokerId) =>
- getZkConsumer(brokerId).map { consumer =>
- val topicAndPartition = new TopicAndPartition(topicPartition)
- val request = OffsetRequest(Map(topicAndPartition ->
PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
- val logEndOffset =
consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
- consumer.close()
- LogOffsetResult.LogOffset(logEndOffset)
- }.getOrElse(LogOffsetResult.Ignore)
- case None =>
- printError(s"No broker for partition '$topicPartition'")
- LogOffsetResult.Ignore
- }
+ protected def getLogEndOffsets(topicPartitions: Seq[TopicPartition]):
Map[TopicPartition, LogOffsetResult] = {
+ topicPartitions.map { topicPartition => (topicPartition,
+ zkUtils.getLeaderForPartition(topicPartition.topic,
topicPartition.partition) match {
+ case Some(-1) => LogOffsetResult.Unknown
+ case Some(brokerId) =>
+ getZkConsumer(brokerId).map { consumer =>
+ val topicAndPartition = new TopicAndPartition(topicPartition)
+ val request = OffsetRequest(Map(topicAndPartition ->
PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+ val logEndOffset =
consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+ consumer.close()
+ LogOffsetResult.LogOffset(logEndOffset)
+ }.getOrElse(LogOffsetResult.Ignore)
+ case None =>
+ printError(s"No broker for partition '$topicPartition'")
+ LogOffsetResult.Ignore
+ }
+ )}.toMap
}
private def getPartitionOffsets(group: String,
@@ -596,27 +598,34 @@ object ConsumerGroupCommand extends Logging {
consumerGroupSummary.state, consumerGroupSummary.consumers.get.size)
}
- protected def getLogEndOffset(topicPartition: TopicPartition):
LogOffsetResult = {
- val offsets = getConsumer.endOffsets(List(topicPartition).asJava)
- val logStartOffset = offsets.get(topicPartition)
- LogOffsetResult.LogOffset(logStartOffset)
+ protected def getLogEndOffsets(topicPartitions: Seq[TopicPartition]):
Map[TopicPartition, LogOffsetResult] = {
+ val offsets = getConsumer.endOffsets(topicPartitions.asJava)
+ topicPartitions.map { topicPartition =>
+ val logEndOffset = offsets.get(topicPartition)
+ topicPartition -> LogOffsetResult.LogOffset(logEndOffset)
+ }.toMap
}
- protected def getLogStartOffset(topicPartition: TopicPartition):
LogOffsetResult = {
- val offsets = getConsumer.beginningOffsets(List(topicPartition).asJava)
- val logStartOffset = offsets.get(topicPartition)
- LogOffsetResult.LogOffset(logStartOffset)
+ protected def getLogStartOffsets(topicPartitions: Seq[TopicPartition]):
Map[TopicPartition, LogOffsetResult] = {
+ val offsets = getConsumer.beginningOffsets(topicPartitions.asJava)
+ topicPartitions.map { topicPartition =>
+ val logStartOffset = offsets.get(topicPartition)
+ topicPartition -> LogOffsetResult.LogOffset(logStartOffset)
+ }.toMap
}
- protected def getLogTimestampOffset(topicPartition: TopicPartition,
timestamp: java.lang.Long): LogOffsetResult = {
+ protected def getLogTimestampOffsets(topicPartitions: Seq[TopicPartition],
timestamp: java.lang.Long): Map[TopicPartition, LogOffsetResult] = {
val consumer = getConsumer
- consumer.assign(List(topicPartition).asJava)
- val offsetsForTimes = consumer.offsetsForTimes(Map(topicPartition ->
timestamp).asJava)
- if (offsetsForTimes != null && !offsetsForTimes.isEmpty &&
offsetsForTimes.get(topicPartition) != null)
- LogOffsetResult.LogOffset(offsetsForTimes.get(topicPartition).offset)
- else {
- getLogEndOffset(topicPartition)
- }
+ consumer.assign(topicPartitions.asJava)
+
+ val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) =
+ consumer.offsetsForTimes(topicPartitions.map(_ ->
timestamp).toMap.asJava).asScala.partition(_._2 != null)
+
+ val successfulLogTimestampOffsets = successfulOffsetsForTimes.map {
+ case (topicPartition, offsetAndTimestamp) => topicPartition ->
LogOffsetResult.LogOffset(offsetAndTimestamp.offset)
+ }.toMap
+
+ successfulLogTimestampOffsets ++
getLogEndOffsets(unsuccessfulOffsetsForTimes.keySet.toSeq)
}
def close() {
@@ -703,57 +712,60 @@ object ConsumerGroupCommand extends Logging {
}.toMap
}
- private def prepareOffsetsToReset(groupId: String, partitionsToReset:
Iterable[TopicPartition]): Map[TopicPartition, OffsetAndMetadata] = {
+ private def prepareOffsetsToReset(groupId: String, partitionsToReset:
Seq[TopicPartition]): Map[TopicPartition, OffsetAndMetadata] = {
if (opts.options.has(opts.resetToOffsetOpt)) {
val offset = opts.options.valueOf(opts.resetToOffsetOpt)
- partitionsToReset.map {
- topicPartition =>
- val newOffset: Long = checkOffsetRange(topicPartition, offset)
- (topicPartition, new OffsetAndMetadata(newOffset))
- }.toMap
+ checkOffsetsRange(partitionsToReset.map((_, offset)).toMap).map {
+ case (topicPartition, newOffset) => (topicPartition, new
OffsetAndMetadata(newOffset))
+ }
} else if (opts.options.has(opts.resetToEarliestOpt)) {
+ val logStartOffsets = getLogStartOffsets(partitionsToReset)
partitionsToReset.map { topicPartition =>
- getLogStartOffset(topicPartition) match {
- case LogOffsetResult.LogOffset(offset) => (topicPartition, new
OffsetAndMetadata(offset))
+ logStartOffsets.get(topicPartition) match {
+ case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition,
new OffsetAndMetadata(offset))
case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error
getting starting offset of topic partition: $topicPartition")
}
}.toMap
} else if (opts.options.has(opts.resetToLatestOpt)) {
+ val logEndOffsets = getLogEndOffsets(partitionsToReset)
partitionsToReset.map { topicPartition =>
- getLogEndOffset(topicPartition) match {
- case LogOffsetResult.LogOffset(offset) => (topicPartition, new
OffsetAndMetadata(offset))
+ logEndOffsets.get(topicPartition) match {
+ case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition,
new OffsetAndMetadata(offset))
case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error
getting ending offset of topic partition: $topicPartition")
}
}.toMap
} else if (opts.options.has(opts.resetShiftByOpt)) {
val currentCommittedOffsets = adminClient.listGroupOffsets(groupId)
- partitionsToReset.map { topicPartition =>
+ val requestedOffsets = partitionsToReset.map { topicPartition =>
val shiftBy = opts.options.valueOf(opts.resetShiftByOpt)
val currentOffset = currentCommittedOffsets.getOrElse(topicPartition,
throw new IllegalArgumentException(s"Cannot shift offset for
partition $topicPartition since there is no current committed offset"))
- val shiftedOffset = currentOffset + shiftBy
- val newOffset: Long = checkOffsetRange(topicPartition, shiftedOffset)
- (topicPartition, new OffsetAndMetadata(newOffset))
+ (topicPartition, currentOffset + shiftBy)
}.toMap
+ checkOffsetsRange(requestedOffsets).map {
+ case (topicPartition, newOffset) => (topicPartition, new
OffsetAndMetadata(newOffset))
+ }
} else if (opts.options.has(opts.resetToDatetimeOpt)) {
val timestamp =
convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
+ val logTimestampOffsets = getLogTimestampOffsets(partitionsToReset,
timestamp)
partitionsToReset.map { topicPartition =>
- val logTimestampOffset = getLogTimestampOffset(topicPartition,
timestamp)
+ val logTimestampOffset = logTimestampOffsets.get(topicPartition)
logTimestampOffset match {
- case LogOffsetResult.LogOffset(offset) => (topicPartition, new
OffsetAndMetadata(offset))
+ case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition,
new OffsetAndMetadata(offset))
case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error
getting offset by timestamp of topic partition: $topicPartition")
}
}.toMap
} else if (opts.options.has(opts.resetByDurationOpt)) {
+ val duration = opts.options.valueOf(opts.resetByDurationOpt)
+ val durationParsed =
DatatypeFactory.newInstance().newDuration(duration)
+ val now = new Date()
+ durationParsed.negate().addTo(now)
+ val timestamp = now.getTime
+ val logTimestampOffsets = getLogTimestampOffsets(partitionsToReset,
timestamp)
partitionsToReset.map { topicPartition =>
- val duration = opts.options.valueOf(opts.resetByDurationOpt)
- val now = new Date()
- val durationParsed =
DatatypeFactory.newInstance().newDuration(duration)
- durationParsed.negate().addTo(now)
- val timestamp = now.getTime
- val logTimestampOffset = getLogTimestampOffset(topicPartition,
timestamp)
+ val logTimestampOffset = logTimestampOffsets.get(topicPartition)
logTimestampOffset match {
- case LogOffsetResult.LogOffset(offset) => (topicPartition, new
OffsetAndMetadata(offset))
+ case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition,
new OffsetAndMetadata(offset))
case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error
getting offset by timestamp of topic partition: $topicPartition")
}
}.toMap
@@ -761,40 +773,55 @@ object ConsumerGroupCommand extends Logging {
val resetPlanPath = opts.options.valueOf(opts.resetFromFileOpt)
val resetPlanCsv = Utils.readFileAsString(resetPlanPath)
val resetPlan = parseResetPlan(resetPlanCsv)
- resetPlan.keySet.map { topicPartition =>
- val newOffset: Long = checkOffsetRange(topicPartition,
resetPlan(topicPartition).offset())
- (topicPartition, new OffsetAndMetadata(newOffset))
+ val requestedOffsets = resetPlan.keySet.map { topicPartition =>
+ (topicPartition, resetPlan(topicPartition).offset())
}.toMap
+ checkOffsetsRange(requestedOffsets).map {
+ case (topicPartition, newOffset) => (topicPartition, new
OffsetAndMetadata(newOffset))
+ }
} else if (opts.options.has(opts.resetToCurrentOpt)) {
val currentCommittedOffsets = adminClient.listGroupOffsets(groupId)
- partitionsToReset.map { topicPartition =>
- currentCommittedOffsets.get(topicPartition).map { offset =>
- (topicPartition, new OffsetAndMetadata(offset))
- }.getOrElse(
- getLogEndOffset(topicPartition) match {
- case LogOffsetResult.LogOffset(offset) => (topicPartition, new
OffsetAndMetadata(offset))
- case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error
getting ending offset of topic partition: $topicPartition")
- }
- )
+ val (partitionsToResetWithCommittedOffset,
partitionsToResetWithoutCommittedOffset) =
+
partitionsToReset.partition(currentCommittedOffsets.keySet.contains(_))
+
+ val preparedOffsetsForParititionsWithCommittedOffset =
partitionsToResetWithCommittedOffset.map { topicPartition =>
+ (topicPartition, new
OffsetAndMetadata(currentCommittedOffsets.get(topicPartition) match {
+ case Some(offset) => offset
+ case _ => throw new IllegalStateException(s"Expected a valid
current offset for topic partition: $topicPartition")
+ }))
}.toMap
+
+ val preparedOffsetsForPartitionsWithoutCommittedOffset =
getLogEndOffsets(partitionsToResetWithoutCommittedOffset).map {
+ case (topicPartition, LogOffsetResult.LogOffset(offset)) =>
(topicPartition, new OffsetAndMetadata(offset))
+ case (topicPartition, _) =>
CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of
topic partition: $topicPartition")
+ }
+
+ preparedOffsetsForParititionsWithCommittedOffset ++
preparedOffsetsForPartitionsWithoutCommittedOffset
} else {
CommandLineUtils.printUsageAndDie(opts.parser, "Option '%s' requires
one of the following scenarios: %s".format(opts.resetOffsetsOpt,
opts.allResetOffsetScenarioOpts) )
}
}
- private def checkOffsetRange(topicPartition: TopicPartition, offset: Long)
= {
- getLogEndOffset(topicPartition) match {
- case LogOffsetResult.LogOffset(endOffset) if offset > endOffset =>
- warn(s"New offset ($offset) is higher than latest offset. Value will
be set to $endOffset")
- endOffset
+ private def checkOffsetsRange(requestedOffsets: Map[TopicPartition, Long])
= {
+ val logStartOffsets = getLogStartOffsets(requestedOffsets.keySet.toSeq)
+ val logEndOffsets = getLogEndOffsets(requestedOffsets.keySet.toSeq)
+ requestedOffsets.map { case (topicPartition, offset) => (topicPartition,
+ logEndOffsets.get(topicPartition) match {
+ case Some(LogOffsetResult.LogOffset(endOffset)) if offset >
endOffset =>
+ warn(s"New offset ($offset) is higher than latest offset for topic
partition $topicPartition. Value will be set to $endOffset")
+ endOffset
- case _ => getLogStartOffset(topicPartition) match {
- case LogOffsetResult.LogOffset(startOffset) if offset < startOffset
=>
- warn(s"New offset ($offset) is lower than earliest offset. Value
will be set to $startOffset")
- startOffset
+ case Some(_) => logStartOffsets.get(topicPartition) match {
+ case Some(LogOffsetResult.LogOffset(startOffset)) if offset <
startOffset =>
+ warn(s"New offset ($offset) is lower than earliest offset for
topic partition $topicPartition. Value will be set to $startOffset")
+ startOffset
- case _ => offset
- }
+ case _ => offset
+ }
+
+ case None => // the control should not reach here
+ throw new IllegalStateException(s"Unexpected non-existing offset
value for topic partition $topicPartition")
+ })
}
}
--
To stop receiving notification emails like this one, please contact
[email protected].