KAFKA-763 follow up changes; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0a928353 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0a928353 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0a928353 Branch: refs/heads/trunk Commit: 0a9283530418781941b7c232a8b09e4ec071b0e9 Parents: c546286 Author: Swapnil Ghike <[email protected]> Authored: Wed Mar 13 13:11:47 2013 -0700 Committer: Neha Narkhede <[email protected]> Committed: Wed Mar 13 13:11:54 2013 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/consumer/SimpleConsumer.scala | 18 +++--------- .../scala/kafka/server/ReplicaFetcherThread.scala | 22 +++++++------- .../scala/kafka/tools/SimpleConsumerShell.scala | 5 +-- 3 files changed, 17 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/0a928353/core/src/main/scala/kafka/consumer/SimpleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index dedbb50..5a0784a 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -136,22 +136,12 @@ class SimpleConsumer(val host: String, * Get the earliest or latest offset of a given topic, partition. * @param topicAndPartition Topic and partition of which the offset is needed. * @param earliestOrLatest A value to indicate earliest or latest offset. - * @param consumerId Id of the consumer which could be a client or a follower broker. - * @param isFromOrdinaryConsumer Boolean to specify ordinary consumer or debugging consumer. + * @param consumerId Id of the consumer which could be a consumer client, SimpleConsumerShell or a follower broker. * @return Requested offset. */ - def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, - earliestOrLatest: Long, - consumerId: Int = Request.OrdinaryConsumerId, - isFromOrdinaryConsumer: Boolean = true): Long = { - val request = - if(isFromOrdinaryConsumer) - OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)), - replicaId = consumerId) - else - OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)), - replicaId = Request.DebuggingConsumerId) - + def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = { + val request = OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)), + replicaId = consumerId) val partitionErrorAndOffset = getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition) val offset = partitionErrorAndOffset.error match { case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head http://git-wip-us.apache.org/repos/asf/kafka/blob/0a928353/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index edd3164..d4f15c1 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -83,18 +83,18 @@ class ReplicaFetcherThread(name:String, val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId) if (leaderEndOffset < log.logEndOffset) { log.truncateTo(leaderEndOffset) - return leaderEndOffset + leaderEndOffset + } else { + /** + * The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's + * start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset). + * + * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching. + */ + val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId) + log.truncateAndStartWithNewOffset(leaderStartOffset) + leaderStartOffset } - - /** - * The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's - * start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset). - * - * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching. - */ - val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId) - log.truncateAndStartWithNewOffset(leaderStartOffset) - leaderStartOffset } // any logic for partitions whose leader has changed http://git-wip-us.apache.org/repos/asf/kafka/blob/0a928353/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index cdfb1b5..8f274df 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -165,9 +165,8 @@ object SimpleConsumerShell extends Logging { val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, ConsumerConfig.SocketTimeout, ConsumerConfig.SocketBufferSize, clientId) try { - startingOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition = TopicAndPartition(topic, partitionId), - earliestOrLatest = startingOffset, - isFromOrdinaryConsumer = false) + startingOffset = simpleConsumer.earliestOrLatestOffset(TopicAndPartition(topic, partitionId), startingOffset, + Request.DebuggingConsumerId) } catch { case t: Throwable => System.err.println("Error in getting earliest or latest offset due to: " + Utils.stackTrace(t))
