This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 68ec1a3 KAFKA-6519; Reduce log level for normal replica fetch errors
(#4501)
68ec1a3 is described below
commit 68ec1a3cce22706faf762de7d2d97b490c47fd90
Author: Jason Gustafson <[email protected]>
AuthorDate: Wed Feb 7 16:20:18 2018 -0800
KAFKA-6519; Reduce log level for normal replica fetch errors (#4501)
Out-of-range and not-leader errors are common in replica fetchers and not
necessarily an indication of a problem. This patch therefore reduces the log
level for log messages corresponding to these errors from `ERROR` to `INFO`.
Additionally, this patch removes some redundant information from the log
messages that is already present in the log context.
Reviewers: Ismael Juma <[email protected]>
---
.../kafka/consumer/ConsumerFetcherManager.scala | 4 +--
.../kafka/consumer/ConsumerFetcherThread.scala | 8 ++++--
.../scala/kafka/server/AbstractFetcherThread.scala | 33 +++++++++++++---------
3 files changed, 26 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 23f5356..e84472f 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -114,9 +114,7 @@ class ConsumerFetcherManager(private val consumerIdString:
String,
}
override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): AbstractFetcherThread = {
- new ConsumerFetcherThread(
- "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId,
sourceBroker.id),
- config, sourceBroker, partitionMap, this)
+ new ConsumerFetcherThread(consumerIdString, fetcherId, config,
sourceBroker, partitionMap, this)
}
def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster:
Cluster) {
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 705dc24..ac83fa1 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -34,12 +34,13 @@ import org.apache.kafka.common.requests.EpochEndOffset
@deprecated("This class has been deprecated and will be removed in a future
release. " +
"Please use org.apache.kafka.clients.consumer.internals.Fetcher
instead.", "0.11.0.0")
-class ConsumerFetcherThread(name: String,
+class ConsumerFetcherThread(consumerIdString: String,
+ fetcherId: Int,
val config: ConsumerConfig,
sourceBroker: BrokerEndPoint,
partitionMap: Map[TopicPartition,
PartitionTopicInfo],
val consumerFetcherManager: ConsumerFetcherManager)
- extends AbstractFetcherThread(name = name,
+ extends AbstractFetcherThread(name =
s"ConsumerFetcherThread-$consumerIdString-$fetcherId-${sourceBroker.id}",
clientId = config.clientId,
sourceBroker = sourceBroker,
fetchBackOffMs =
config.refreshLeaderBackoffMs,
@@ -49,6 +50,9 @@ class ConsumerFetcherThread(name: String,
type REQ = FetchRequest
type PD = PartitionData
+ this.logIdent = s"[ConsumerFetcher consumerId=$consumerIdString,
leaderId=${sourceBroker.id}, " +
+ s"fetcherId=$fetcherId] "
+
private val clientId = config.clientId
private val fetchSize = config.fetchMessageMaxBytes
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 39a7032..8d787c9 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -47,8 +47,7 @@ abstract class AbstractFetcherThread(name: String,
val sourceBroker: BrokerEndPoint,
fetchBackOffMs: Int = 0,
isInterruptible: Boolean = true,
- includeLogTruncation: Boolean
- )
+ includeLogTruncation: Boolean)
extends ShutdownableThread(name, isInterruptible) {
type REQ <: FetchRequest
@@ -140,16 +139,15 @@ abstract class AbstractFetcherThread(name: String,
private def processFetchRequest(fetchRequest: REQ) {
val partitionsWithError = mutable.Set[TopicPartition]()
-
var responseData: Seq[(TopicPartition, PD)] = Seq.empty
try {
- trace(s"Issuing fetch to broker ${sourceBroker.id}, request:
$fetchRequest")
+ trace(s"Sending fetch request $fetchRequest")
responseData = fetch(fetchRequest)
} catch {
case t: Throwable =>
if (isRunning) {
- warn(s"Error in fetch to broker ${sourceBroker.id}, request
$fetchRequest", t)
+ warn(s"Error in response for fetch request $fetchRequest", t)
inLock(partitionMapLock) {
partitionsWithError ++= partitionStates.partitionSet.asScala
// there is an error occurred while fetching partitions, sleep a
while
@@ -210,27 +208,34 @@ abstract class AbstractFetcherThread(name: String,
try {
val newOffset = handleOffsetOutOfRange(topicPartition)
partitionStates.updateAndMoveToEnd(topicPartition, new
PartitionFetchState(newOffset))
- error(s"Current offset
${currentPartitionFetchState.fetchOffset} for partition $topicPartition out of
range; reset offset to $newOffset")
+ info(s"Current offset
${currentPartitionFetchState.fetchOffset} for partition $topicPartition is " +
+ s"out of range, which typically implies a leader change.
Reset fetch offset to $newOffset")
} catch {
case e: FatalExitError => throw e
case e: Throwable =>
- error(s"Error getting offset for partition
$topicPartition from broker ${sourceBroker.id}", e)
+ error(s"Error getting offset for partition
$topicPartition", e)
partitionsWithError += topicPartition
}
+
+ case Errors.NOT_LEADER_FOR_PARTITION =>
+ info(s"Remote broker is not the leader for partition
$topicPartition, which could indicate " +
+ "that the partition is being moved")
+ partitionsWithError += topicPartition
+
case _ =>
- if (isRunning) {
- error(s"Error for partition $topicPartition from broker
${sourceBroker.id}", partitionData.exception.get)
- partitionsWithError += topicPartition
- }
+ error(s"Error for partition $topicPartition at offset
${currentPartitionFetchState.fetchOffset}",
+ partitionData.exception.get)
+ partitionsWithError += topicPartition
}
})
}
}
}
- if (partitionsWithError.nonEmpty)
- debug(s"handling partitions with error for $partitionsWithError")
- handlePartitionsWithErrors(partitionsWithError)
+ if (partitionsWithError.nonEmpty) {
+ debug(s"Handling errors for partitions $partitionsWithError")
+ handlePartitionsWithErrors(partitionsWithError)
+ }
}
def markPartitionsForTruncation(topicPartition: TopicPartition,
truncationOffset: Long) {
--
To stop receiving notification emails like this one, please contact
[email protected].