This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 84ff9f3  KAFKA-6519; Reduce log level for normal replica fetch errors 
(#4501)
84ff9f3 is described below

commit 84ff9f303539dc8770b3bcad975e8ed32fcc42df
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Feb 7 16:20:18 2018 -0800

    KAFKA-6519; Reduce log level for normal replica fetch errors (#4501)
    
    Out of range and not leader errors are common in replica fetchers and not 
necessarily an indication of a problem. This patch therefore reduces the log 
level for log messages corresponding to these errors from `ERROR` to `INFO`. 
Additionally, this patch removes some redundant information in the log message 
which is already present in the log context.
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>
---
 .../kafka/consumer/ConsumerFetcherManager.scala    |  4 +--
 .../kafka/consumer/ConsumerFetcherThread.scala     |  8 ++++--
 .../scala/kafka/server/AbstractFetcherThread.scala | 33 +++++++++++++---------
 3 files changed, 26 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala 
b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 23f5356..e84472f 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -114,9 +114,7 @@ class ConsumerFetcherManager(private val consumerIdString: 
String,
   }
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
-    new ConsumerFetcherThread(
-      "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, 
sourceBroker.id),
-      config, sourceBroker, partitionMap, this)
+    new ConsumerFetcherThread(consumerIdString, fetcherId, config, 
sourceBroker, partitionMap, this)
   }
 
   def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: 
Cluster) {
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala 
b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 705dc24..ac83fa1 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -34,12 +34,13 @@ import org.apache.kafka.common.requests.EpochEndOffset
 
 @deprecated("This class has been deprecated and will be removed in a future 
release. " +
             "Please use org.apache.kafka.clients.consumer.internals.Fetcher 
instead.", "0.11.0.0")
-class ConsumerFetcherThread(name: String,
+class ConsumerFetcherThread(consumerIdString: String,
+                            fetcherId: Int,
                             val config: ConsumerConfig,
                             sourceBroker: BrokerEndPoint,
                             partitionMap: Map[TopicPartition, 
PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
-        extends AbstractFetcherThread(name = name,
+        extends AbstractFetcherThread(name = 
s"ConsumerFetcherThread-$consumerIdString-$fetcherId-${sourceBroker.id}",
                                       clientId = config.clientId,
                                       sourceBroker = sourceBroker,
                                       fetchBackOffMs = 
config.refreshLeaderBackoffMs,
@@ -49,6 +50,9 @@ class ConsumerFetcherThread(name: String,
   type REQ = FetchRequest
   type PD = PartitionData
 
+  this.logIdent = s"[ConsumerFetcher consumerId=$consumerIdString, 
leaderId=${sourceBroker.id}, " +
+    s"fetcherId=$fetcherId] "
+
   private val clientId = config.clientId
   private val fetchSize = config.fetchMessageMaxBytes
 
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 39a7032..8d787c9 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -47,8 +47,7 @@ abstract class AbstractFetcherThread(name: String,
                                      val sourceBroker: BrokerEndPoint,
                                      fetchBackOffMs: Int = 0,
                                      isInterruptible: Boolean = true,
-                                     includeLogTruncation: Boolean
-                                    )
+                                     includeLogTruncation: Boolean)
   extends ShutdownableThread(name, isInterruptible) {
 
   type REQ <: FetchRequest
@@ -140,16 +139,15 @@ abstract class AbstractFetcherThread(name: String,
 
   private def processFetchRequest(fetchRequest: REQ) {
     val partitionsWithError = mutable.Set[TopicPartition]()
-
     var responseData: Seq[(TopicPartition, PD)] = Seq.empty
 
     try {
-      trace(s"Issuing fetch to broker ${sourceBroker.id}, request: 
$fetchRequest")
+      trace(s"Sending fetch request $fetchRequest")
       responseData = fetch(fetchRequest)
     } catch {
       case t: Throwable =>
         if (isRunning) {
-          warn(s"Error in fetch to broker ${sourceBroker.id}, request 
$fetchRequest", t)
+          warn(s"Error in response for fetch request $fetchRequest", t)
           inLock(partitionMapLock) {
             partitionsWithError ++= partitionStates.partitionSet.asScala
             // there is an error occurred while fetching partitions, sleep a 
while
@@ -210,27 +208,34 @@ abstract class AbstractFetcherThread(name: String,
                   try {
                     val newOffset = handleOffsetOutOfRange(topicPartition)
                     partitionStates.updateAndMoveToEnd(topicPartition, new 
PartitionFetchState(newOffset))
-                    error(s"Current offset 
${currentPartitionFetchState.fetchOffset} for partition $topicPartition out of 
range; reset offset to $newOffset")
+                    info(s"Current offset 
${currentPartitionFetchState.fetchOffset} for partition $topicPartition is " +
+                      s"out of range, which typically implies a leader change. 
Reset fetch offset to $newOffset")
                   } catch {
                     case e: FatalExitError => throw e
                     case e: Throwable =>
-                      error(s"Error getting offset for partition 
$topicPartition from broker ${sourceBroker.id}", e)
+                      error(s"Error getting offset for partition 
$topicPartition", e)
                       partitionsWithError += topicPartition
                   }
+
+                case Errors.NOT_LEADER_FOR_PARTITION =>
+                  info(s"Remote broker is not the leader for partition 
$topicPartition, which could indicate " +
+                    "that the partition is being moved")
+                  partitionsWithError += topicPartition
+
                 case _ =>
-                  if (isRunning) {
-                    error(s"Error for partition $topicPartition from broker 
${sourceBroker.id}", partitionData.exception.get)
-                    partitionsWithError += topicPartition
-                  }
+                  error(s"Error for partition $topicPartition at offset 
${currentPartitionFetchState.fetchOffset}",
+                    partitionData.exception.get)
+                  partitionsWithError += topicPartition
               }
             })
         }
       }
     }
 
-    if (partitionsWithError.nonEmpty)
-      debug(s"handling partitions with error for $partitionsWithError")
-    handlePartitionsWithErrors(partitionsWithError)
+    if (partitionsWithError.nonEmpty) {
+      debug(s"Handling errors for partitions $partitionsWithError")
+      handlePartitionsWithErrors(partitionsWithError)
+    }
   }
 
   def markPartitionsForTruncation(topicPartition: TopicPartition, 
truncationOffset: Long) {

-- 
To stop receiving notification emails like this one, please contact
j...@apache.org.

Reply via email to