This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ce5ce2d MINOR: A few logging improvements in the broker (#6773)
ce5ce2d is described below
commit ce5ce2d569dd9fead42974d81bc7adfc5e6c7a22
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue May 21 14:50:24 2019 -0700
MINOR: A few logging improvements in the broker (#6773)
Reviewers: Boyang Chen <[email protected]>, Rajini Sivaram
<[email protected]>
---
core/src/main/scala/kafka/log/Log.scala | 7 -------
core/src/main/scala/kafka/server/AbstractFetcherManager.scala | 3 ++-
core/src/main/scala/kafka/server/ReplicaManager.scala | 11 ++++++++---
.../main/scala/kafka/server/epoch/LeaderEpochFileCache.scala | 5 ++++-
4 files changed, 14 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala
b/core/src/main/scala/kafka/log/Log.scala
index ef786be..56b2969 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -230,13 +230,6 @@ class Log(@volatile var dir: File,
}
def updateConfig(updatedKeys: Set[String], newConfig: LogConfig): Unit = {
- if ((updatedKeys.contains(LogConfig.RetentionMsProp)
- || updatedKeys.contains(LogConfig.MessageTimestampDifferenceMaxMsProp))
- && topicPartition.partition == 0 // generate warnings only for one
partition of each topic
- && newConfig.retentionMs < newConfig.messageTimestampDifferenceMaxMs)
- warn(s"${LogConfig.RetentionMsProp} for topic ${topicPartition.topic} is
set to ${newConfig.retentionMs}. It is smaller than " +
- s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value
${newConfig.messageTimestampDifferenceMaxMs}. " +
- s"This may result in frequent log rolling.")
val oldConfig = this.config
this.config = newConfig
if (updatedKeys.contains(LogConfig.MessageFormatVersionProp)) {
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index a5faf0e..53152eb 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -174,7 +174,8 @@ abstract class AbstractFetcherManager[T <:
AbstractFetcherThread](val name: Stri
fetcher.removePartitions(partitions)
failedPartitions.removeAll(partitions)
}
- info(s"Removed fetcher for partitions $partitions")
+ if (partitions.nonEmpty)
+ info(s"Removed fetcher for partitions $partitions")
}
def shutdownIdleFetcherThreads() {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index cccccfe..2023a97 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1084,14 +1084,19 @@ class ReplicaManager(val config: KafkaConfig,
s"in assigned replica list
${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
responseMap.put(topicPartition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
- } else {
- // Otherwise record the error code in response
+ } else if (requestLeaderEpoch < currentLeaderEpoch) {
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
s"controller $controllerId with correlation id $correlationId " +
s"epoch $controllerEpoch for partition $topicPartition since its
associated " +
- s"leader epoch $requestLeaderEpoch is not higher than the
current " +
+ s"leader epoch $requestLeaderEpoch is smaller than the current "
+
s"leader epoch $currentLeaderEpoch")
responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
+ } else {
+ stateChangeLogger.debug(s"Ignoring LeaderAndIsr request from " +
+ s"controller $controllerId with correlation id $correlationId " +
+ s"epoch $controllerEpoch for partition $topicPartition since its
associated " +
+ s"leader epoch $requestLeaderEpoch matches the current leader
epoch")
+ responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
}
}
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index 0c885b7..7219ee1 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -78,7 +78,10 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
if (removedEpochs.isEmpty) {
debug(s"Appended new epoch entry $entryToAppend. Cache now contains
${epochs.size} entries.")
- } else {
+ } else if (removedEpochs.size > 1 || removedEpochs.head.startOffset !=
entryToAppend.startOffset) {
+ // Only log a warning if there were non-trivial removals. If the start
offset of the new entry
+ // matches the start offfset of the removed epoch, then no data has been
written and the truncation
+ // is expected.
warn(s"New epoch entry $entryToAppend caused truncation of conflicting
entries $removedEpochs. " +
s"Cache now contains ${epochs.size} entries.")
}