This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ea2423e  MINOR: Update log statements in 
alterBrokerConfigs/alterTopicConfigs methods
ea2423e is described below

commit ea2423e0e33c862fe93fbe42903e9d4478807353
Author: Manikumar Reddy <[email protected]>
AuthorDate: Wed Jan 6 19:59:10 2021 +0530

    MINOR: Update log statements in alterBrokerConfigs/alterTopicConfigs methods
    
    The current log statements shown below are not useful. This PR logs readable/masked 
configs during alterBrokerConfigs/alterTopicConfigs method calls.
    
    `[Admin Manager on Broker 1]: Updating topic test with new configuration 
kafka.server.KafkaConfigc9ba35e3`
    
    Author: Manikumar Reddy <[email protected]>
    
    Reviewers: Rajini Sivaram <[email protected]>, Chia-Ping Tsai 
<[email protected]>
    
    Closes #9824 from omkreddy/admin-logs
---
 .../main/scala/kafka/network/RequestChannel.scala  | 16 ++--------------
 .../src/main/scala/kafka/server/AdminManager.scala | 22 +++++++++++++++++++---
 core/src/main/scala/kafka/server/KafkaConfig.scala | 14 ++++++++++++--
 3 files changed, 33 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index 0c03d70..70ade3e 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -24,12 +24,10 @@ import java.util.concurrent._
 import com.fasterxml.jackson.databind.JsonNode
 import com.typesafe.scalalogging.Logger
 import com.yammer.metrics.core.Meter
-import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.KafkaConfig
 import kafka.utils.{Logging, NotNothing, Pool}
 import kafka.utils.Implicits._
-import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
@@ -164,21 +162,11 @@ object RequestChannel extends Logging {
 
     def loggableRequest: AbstractRequest = {
 
-      def loggableValue(resourceType: ConfigResource.Type, name: String, 
value: String): String = {
-        val maybeSensitive = resourceType match {
-          case ConfigResource.Type.BROKER => 
KafkaConfig.maybeSensitive(KafkaConfig.configType(name))
-          case ConfigResource.Type.TOPIC => 
KafkaConfig.maybeSensitive(LogConfig.configType(name))
-          case ConfigResource.Type.BROKER_LOGGER => false
-          case _ => true
-        }
-        if (maybeSensitive) Password.HIDDEN else value
-      }
-
       bodyAndSize.request match {
         case alterConfigs: AlterConfigsRequest =>
           val loggableConfigs = alterConfigs.configs().asScala.map { case 
(resource, config) =>
             val loggableEntries = new 
AlterConfigsRequest.Config(config.entries.asScala.map { entry =>
-                new AlterConfigsRequest.ConfigEntry(entry.name, 
loggableValue(resource.`type`, entry.name, entry.value))
+                new AlterConfigsRequest.ConfigEntry(entry.name, 
KafkaConfig.loggableValue(resource.`type`, entry.name, entry.value))
             }.asJavaCollection)
             (resource, loggableEntries)
           }.asJava
@@ -193,7 +181,7 @@ object RequestChannel extends Logging {
             resource.configs.forEach { config =>
               newResource.configs.add(new AlterableConfig()
                 .setName(config.name)
-                
.setValue(loggableValue(ConfigResource.Type.forId(resource.resourceType), 
config.name, config.value))
+                
.setValue(KafkaConfig.loggableValue(ConfigResource.Type.forId(resource.resourceType),
 config.name, config.value))
                 .setConfigOperation(config.configOperation))
             }
             resources.add(newResource)
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala 
b/core/src/main/scala/kafka/server/AdminManager.scala
index f1bd1e2..b22ccd7 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -506,7 +506,7 @@ class AdminManager(val config: KafkaConfig,
     adminZkClient.validateTopicConfig(topic, configProps)
     validateConfigPolicy(resource, configEntriesMap)
     if (!validateOnly) {
-      info(s"Updating topic $topic with new configuration $config")
+      info(s"Updating topic $topic with new configuration : 
${toLoggableProps(resource, configProps).mkString(",")}")
       adminZkClient.changeTopicConfig(topic, configProps)
     }
 
@@ -522,6 +522,12 @@ class AdminManager(val config: KafkaConfig,
     if (!validateOnly) {
       if (perBrokerConfig)
         
this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps)
+
+      if (perBrokerConfig)
+        info(s"Updating broker ${brokerId.get} with new configuration : 
${toLoggableProps(resource, configProps).mkString(",")}")
+      else
+        info(s"Updating brokers with new configuration : 
${toLoggableProps(resource, configProps).mkString(",")}")
+
       adminZkClient.changeBrokerConfig(brokerId,
         this.config.dynamicConfig.toPersistentProps(configProps, 
perBrokerConfig))
     }
@@ -529,13 +535,23 @@ class AdminManager(val config: KafkaConfig,
     resource -> ApiError.NONE
   }
 
+  private def toLoggableProps(resource: ConfigResource, configProps: 
Properties): Map[String, String] = {
+    configProps.asScala.map {
+      case (key, value) => (key, KafkaConfig.loggableValue(resource.`type`, 
key, value))
+    }
+  }
+
   private def alterLogLevelConfigs(alterConfigOps: Seq[AlterConfigOp]): Unit = 
{
     alterConfigOps.foreach { alterConfigOp =>
       val loggerName = alterConfigOp.configEntry().name()
       val logLevel = alterConfigOp.configEntry().value()
       alterConfigOp.opType() match {
-        case OpType.SET => Log4jController.logLevel(loggerName, logLevel)
-        case OpType.DELETE => Log4jController.unsetLogLevel(loggerName)
+        case OpType.SET =>
+          info(s"Updating the log level of $loggerName to $logLevel")
+          Log4jController.logLevel(loggerName, logLevel)
+        case OpType.DELETE =>
+          info(s"Unset the log level of $loggerName")
+          Log4jController.unsetLogLevel(loggerName)
         case _ => throw new IllegalArgumentException(
           s"Log level cannot be changed for OpType: ${alterConfigOp.opType()}")
       }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 93b8405..895612e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -24,17 +24,17 @@ import kafka.api.{ApiVersion, ApiVersionValidator, 
KAFKA_0_10_0_IV1, KAFKA_2_1_I
 import kafka.cluster.EndPoint
 import kafka.coordinator.group.OffsetConfig
 import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
+import kafka.log.LogConfig
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, 
ZStdCompressionCodec}
 import kafka.security.authorizer.AuthorizerUtils
 import kafka.utils.CoreUtils
 import kafka.utils.Implicits._
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.Reconfigurable
-import org.apache.kafka.common.config.SecurityConfig
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, ConfigResource, SaslConfigs, SecurityConfig, SslClientAuth, 
SslConfigs, TopicConfig}
 import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList}
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.config.types.Password
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, SaslConfigs, SslClientAuth, SslConfigs, TopicConfig}
 import org.apache.kafka.common.metrics.Sensor
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.record.{LegacyRecord, Records, TimestampType}
@@ -1311,6 +1311,16 @@ object KafkaConfig {
     // If we can't determine the config entry type, treat it as a sensitive 
config to be safe
     configType.isEmpty || configType.contains(ConfigDef.Type.PASSWORD)
   }
+
+  def loggableValue(resourceType: ConfigResource.Type, name: String, value: 
String): String = {
+    val maybeSensitive = resourceType match {
+      case ConfigResource.Type.BROKER => 
KafkaConfig.maybeSensitive(KafkaConfig.configType(name))
+      case ConfigResource.Type.TOPIC => 
KafkaConfig.maybeSensitive(LogConfig.configType(name))
+      case ConfigResource.Type.BROKER_LOGGER => false
+      case _ => true
+    }
+    if (maybeSensitive) Password.HIDDEN else value
+  }
 }
 
 class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, 
dynamicConfigOverride: Option[DynamicBrokerConfig])

Reply via email to the mailing list.