This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push: new 84ff9f3 KAFKA-6519; Reduce log level for normal replica fetch errors (#4501) 84ff9f3 is described below commit 84ff9f303539dc8770b3bcad975e8ed32fcc42df Author: Jason Gustafson <ja...@confluent.io> AuthorDate: Wed Feb 7 16:20:18 2018 -0800 KAFKA-6519; Reduce log level for normal replica fetch errors (#4501) Out of range and not leader errors are common in replica fetchers and not necessarily an indication of a problem. This patch therefore reduces the log level for log messages corresponding to these errors from `ERROR` to `INFO`. Additionally, this patch removes some redundant information in the log message which is already present in the log context. Reviewers: Ismael Juma <ism...@juma.me.uk> --- .../kafka/consumer/ConsumerFetcherManager.scala | 4 +-- .../kafka/consumer/ConsumerFetcherThread.scala | 8 ++++-- .../scala/kafka/server/AbstractFetcherThread.scala | 33 +++++++++++++--------- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index 23f5356..e84472f 100755 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -114,9 +114,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, } override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = { - new ConsumerFetcherThread( - "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id), - config, sourceBroker, partitionMap, this) + new ConsumerFetcherThread(consumerIdString, fetcherId, config, sourceBroker, partitionMap, this) } def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) { diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index 705dc24..ac83fa1 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -34,12 +34,13 @@ import org.apache.kafka.common.requests.EpochEndOffset @deprecated("This class has been deprecated and will be removed in a future release. " + "Please use org.apache.kafka.clients.consumer.internals.Fetcher instead.", "0.11.0.0") -class ConsumerFetcherThread(name: String, +class ConsumerFetcherThread(consumerIdString: String, + fetcherId: Int, val config: ConsumerConfig, sourceBroker: BrokerEndPoint, partitionMap: Map[TopicPartition, PartitionTopicInfo], val consumerFetcherManager: ConsumerFetcherManager) - extends AbstractFetcherThread(name = name, + extends AbstractFetcherThread(name = s"ConsumerFetcherThread-$consumerIdString-$fetcherId-${sourceBroker.id}", clientId = config.clientId, sourceBroker = sourceBroker, fetchBackOffMs = config.refreshLeaderBackoffMs, @@ -49,6 +50,9 @@ class ConsumerFetcherThread(name: String, type REQ = FetchRequest type PD = PartitionData + this.logIdent = s"[ConsumerFetcher consumerId=$consumerIdString, leaderId=${sourceBroker.id}, " + + s"fetcherId=$fetcherId] " + private val clientId = config.clientId private val fetchSize = config.fetchMessageMaxBytes diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 39a7032..8d787c9 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -47,8 +47,7 @@ abstract class AbstractFetcherThread(name: String, val sourceBroker: BrokerEndPoint, fetchBackOffMs: Int = 0, isInterruptible: Boolean = true, - includeLogTruncation: Boolean - ) + includeLogTruncation: Boolean) extends ShutdownableThread(name, isInterruptible) { type REQ <: FetchRequest @@ -140,16 +139,15 @@ abstract class AbstractFetcherThread(name: String, private def processFetchRequest(fetchRequest: REQ) { val partitionsWithError = mutable.Set[TopicPartition]() - var responseData: Seq[(TopicPartition, PD)] = Seq.empty try { - trace(s"Issuing fetch to broker ${sourceBroker.id}, request: $fetchRequest") + trace(s"Sending fetch request $fetchRequest") responseData = fetch(fetchRequest) } catch { case t: Throwable => if (isRunning) { - warn(s"Error in fetch to broker ${sourceBroker.id}, request $fetchRequest", t) + warn(s"Error in response for fetch request $fetchRequest", t) inLock(partitionMapLock) { partitionsWithError ++= partitionStates.partitionSet.asScala // there is an error occurred while fetching partitions, sleep a while @@ -210,27 +208,34 @@ abstract class AbstractFetcherThread(name: String, try { val newOffset = handleOffsetOutOfRange(topicPartition) partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset)) - error(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition out of range; reset offset to $newOffset") + info(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition is " + + s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset") } catch { case e: FatalExitError => throw e case e: Throwable => - error(s"Error getting offset for partition $topicPartition from broker ${sourceBroker.id}", e) + error(s"Error getting offset for partition $topicPartition", e) partitionsWithError += topicPartition } + + case Errors.NOT_LEADER_FOR_PARTITION => + info(s"Remote broker is not the leader for partition $topicPartition, which could indicate " + + "that the partition is being moved") + partitionsWithError += topicPartition + case _ => - if (isRunning) { - error(s"Error for partition $topicPartition from broker ${sourceBroker.id}", partitionData.exception.get) - partitionsWithError += topicPartition - } + error(s"Error for partition $topicPartition at offset ${currentPartitionFetchState.fetchOffset}", + partitionData.exception.get) + partitionsWithError += topicPartition } }) } } } - if (partitionsWithError.nonEmpty) - debug(s"handling partitions with error for $partitionsWithError") - handlePartitionsWithErrors(partitionsWithError) + if (partitionsWithError.nonEmpty) { + debug(s"Handling errors for partitions $partitionsWithError") + handlePartitionsWithErrors(partitionsWithError) + } } def markPartitionsForTruncation(topicPartition: TopicPartition, truncationOffset: Long) { -- To stop receiving notification emails like this one, please contact j...@apache.org.