This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ea2423e MINOR: Update log statements in
alterBrokerConfigs/alterTopicConfigs methods
ea2423e is described below
commit ea2423e0e33c862fe93fbe42903e9d4478807353
Author: Manikumar Reddy <[email protected]>
AuthorDate: Wed Jan 6 19:59:10 2021 +0530
MINOR: Update log statements in alterBrokerConfigs/alterTopicConfigs methods
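The current log statements (see the example below) are not useful because they
print the KafkaConfig object reference instead of the config values. This PR
logs readable configs, with sensitive values masked, during
alterBrokerConfigs/alterTopicConfigs method calls.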
`[Admin Manager on Broker 1]: Updating topic test with new configuration
kafka.server.KafkaConfig@c9ba35e3`
Author: Manikumar Reddy <[email protected]>
Reviewers: Rajini Sivaram <[email protected]>, Chia-Ping Tsai
<[email protected]>
Closes #9824 from omkreddy/admin-logs
---
.../main/scala/kafka/network/RequestChannel.scala | 16 ++--------------
.../src/main/scala/kafka/server/AdminManager.scala | 22 +++++++++++++++++++---
core/src/main/scala/kafka/server/KafkaConfig.scala | 14 ++++++++++++--
3 files changed, 33 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala
b/core/src/main/scala/kafka/network/RequestChannel.scala
index 0c03d70..70ade3e 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -24,12 +24,10 @@ import java.util.concurrent._
import com.fasterxml.jackson.databind.JsonNode
import com.typesafe.scalalogging.Logger
import com.yammer.metrics.core.Meter
-import kafka.log.LogConfig
import kafka.metrics.KafkaMetricsGroup
import kafka.server.KafkaConfig
import kafka.utils.{Logging, NotNothing, Pool}
import kafka.utils.Implicits._
-import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
@@ -164,21 +162,11 @@ object RequestChannel extends Logging {
def loggableRequest: AbstractRequest = {
- def loggableValue(resourceType: ConfigResource.Type, name: String,
value: String): String = {
- val maybeSensitive = resourceType match {
- case ConfigResource.Type.BROKER =>
KafkaConfig.maybeSensitive(KafkaConfig.configType(name))
- case ConfigResource.Type.TOPIC =>
KafkaConfig.maybeSensitive(LogConfig.configType(name))
- case ConfigResource.Type.BROKER_LOGGER => false
- case _ => true
- }
- if (maybeSensitive) Password.HIDDEN else value
- }
-
bodyAndSize.request match {
case alterConfigs: AlterConfigsRequest =>
val loggableConfigs = alterConfigs.configs().asScala.map { case
(resource, config) =>
val loggableEntries = new
AlterConfigsRequest.Config(config.entries.asScala.map { entry =>
- new AlterConfigsRequest.ConfigEntry(entry.name,
loggableValue(resource.`type`, entry.name, entry.value))
+ new AlterConfigsRequest.ConfigEntry(entry.name,
KafkaConfig.loggableValue(resource.`type`, entry.name, entry.value))
}.asJavaCollection)
(resource, loggableEntries)
}.asJava
@@ -193,7 +181,7 @@ object RequestChannel extends Logging {
resource.configs.forEach { config =>
newResource.configs.add(new AlterableConfig()
.setName(config.name)
-
.setValue(loggableValue(ConfigResource.Type.forId(resource.resourceType),
config.name, config.value))
+
.setValue(KafkaConfig.loggableValue(ConfigResource.Type.forId(resource.resourceType),
config.name, config.value))
.setConfigOperation(config.configOperation))
}
resources.add(newResource)
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala
b/core/src/main/scala/kafka/server/AdminManager.scala
index f1bd1e2..b22ccd7 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -506,7 +506,7 @@ class AdminManager(val config: KafkaConfig,
adminZkClient.validateTopicConfig(topic, configProps)
validateConfigPolicy(resource, configEntriesMap)
if (!validateOnly) {
- info(s"Updating topic $topic with new configuration $config")
+ info(s"Updating topic $topic with new configuration :
${toLoggableProps(resource, configProps).mkString(",")}")
adminZkClient.changeTopicConfig(topic, configProps)
}
@@ -522,6 +522,12 @@ class AdminManager(val config: KafkaConfig,
if (!validateOnly) {
if (perBrokerConfig)
this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps)
+
+ if (perBrokerConfig)
+ info(s"Updating broker ${brokerId.get} with new configuration :
${toLoggableProps(resource, configProps).mkString(",")}")
+ else
+ info(s"Updating brokers with new configuration :
${toLoggableProps(resource, configProps).mkString(",")}")
+
adminZkClient.changeBrokerConfig(brokerId,
this.config.dynamicConfig.toPersistentProps(configProps,
perBrokerConfig))
}
@@ -529,13 +535,23 @@ class AdminManager(val config: KafkaConfig,
resource -> ApiError.NONE
}
+ private def toLoggableProps(resource: ConfigResource, configProps:
Properties): Map[String, String] = {
+ configProps.asScala.map {
+ case (key, value) => (key, KafkaConfig.loggableValue(resource.`type`,
key, value))
+ }
+ }
+
private def alterLogLevelConfigs(alterConfigOps: Seq[AlterConfigOp]): Unit =
{
alterConfigOps.foreach { alterConfigOp =>
val loggerName = alterConfigOp.configEntry().name()
val logLevel = alterConfigOp.configEntry().value()
alterConfigOp.opType() match {
- case OpType.SET => Log4jController.logLevel(loggerName, logLevel)
- case OpType.DELETE => Log4jController.unsetLogLevel(loggerName)
+ case OpType.SET =>
+ info(s"Updating the log level of $loggerName to $logLevel")
+ Log4jController.logLevel(loggerName, logLevel)
+ case OpType.DELETE =>
+ info(s"Unset the log level of $loggerName")
+ Log4jController.unsetLogLevel(loggerName)
case _ => throw new IllegalArgumentException(
s"Log level cannot be changed for OpType: ${alterConfigOp.opType()}")
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 93b8405..895612e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -24,17 +24,17 @@ import kafka.api.{ApiVersion, ApiVersionValidator,
KAFKA_0_10_0_IV1, KAFKA_2_1_I
import kafka.cluster.EndPoint
import kafka.coordinator.group.OffsetConfig
import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
+import kafka.log.LogConfig
import kafka.message.{BrokerCompressionCodec, CompressionCodec,
ZStdCompressionCodec}
import kafka.security.authorizer.AuthorizerUtils
import kafka.utils.CoreUtils
import kafka.utils.Implicits._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.Reconfigurable
-import org.apache.kafka.common.config.SecurityConfig
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef,
ConfigException, ConfigResource, SaslConfigs, SecurityConfig, SslClientAuth,
SslConfigs, TopicConfig}
import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList}
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.types.Password
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef,
ConfigException, SaslConfigs, SslClientAuth, SslConfigs, TopicConfig}
import org.apache.kafka.common.metrics.Sensor
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.record.{LegacyRecord, Records, TimestampType}
@@ -1311,6 +1311,16 @@ object KafkaConfig {
// If we can't determine the config entry type, treat it as a sensitive
config to be safe
configType.isEmpty || configType.contains(ConfigDef.Type.PASSWORD)
}
+
+ def loggableValue(resourceType: ConfigResource.Type, name: String, value:
String): String = {
+ val maybeSensitive = resourceType match {
+ case ConfigResource.Type.BROKER =>
KafkaConfig.maybeSensitive(KafkaConfig.configType(name))
+ case ConfigResource.Type.TOPIC =>
KafkaConfig.maybeSensitive(LogConfig.configType(name))
+ case ConfigResource.Type.BROKER_LOGGER => false
+ case _ => true
+ }
+ if (maybeSensitive) Password.HIDDEN else value
+ }
}
class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean,
dynamicConfigOverride: Option[DynamicBrokerConfig])