This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 2594686cd60f5eb1900df89922f3844fda8c3c0b
Author: Ismael Juma <[email protected]>
AuthorDate: Tue Jun 8 06:39:31 2021 -0700

    MINOR: Only log overridden topic configs during topic creation (#10828)
    
    It's quite verbose to include all configs for every partition loaded/created.
    Also make sure to redact sensitive and unknown config values.
    
    Unit test included.
    
    Reviewers: David Jacot <[email protected]>, Kowshik Prakasam <[email protected]>, Luke Chen <[email protected]>
---
 core/src/main/scala/kafka/log/LogConfig.scala      | 19 ++++++++++++++-----
 core/src/main/scala/kafka/log/LogManager.scala     |  2 +-
 .../test/scala/unit/kafka/log/LogConfigTest.scala  | 22 ++++++++++++++++++++++
 gradle/spotbugs-exclude.xml                        |  7 +++++++
 4 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogConfig.scala 
b/core/src/main/scala/kafka/log/LogConfig.scala
index c2ab1d8..60e813d 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -17,20 +17,21 @@
 
 package kafka.log
 
-import java.util.{Collections, Locale, Properties}
-
-import scala.jdk.CollectionConverters._
 import kafka.api.{ApiVersion, ApiVersionValidator}
+import kafka.log.LogConfig.configDef
 import kafka.message.BrokerCompressionCodec
 import kafka.server.{KafkaConfig, ThrottledReplicaListValidator}
 import kafka.utils.Implicits._
 import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, TopicConfig}
+import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList, 
Validator}
 import org.apache.kafka.common.record.{LegacyRecord, TimestampType}
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{ConfigUtils, Utils}
 
 import scala.collection.{Map, mutable}
-import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList, 
Validator}
+import scala.jdk.CollectionConverters._
+
+import java.util.{Collections, Locale, Properties}
 
 object Defaults {
   val SegmentSize = kafka.server.Defaults.LogSegmentBytes
@@ -108,6 +109,14 @@ case class LogConfig(props: java.util.Map[_, _], 
overriddenConfigs: Set[String]
     if (compact && maxCompactionLagMs > 0) math.min(maxCompactionLagMs, 
segmentMs)
     else segmentMs
   }
+
+  def overriddenConfigsAsLoggableString: String = {
+    val overriddenTopicProps = props.asScala.collect {
+      case (k: String, v) if overriddenConfigs.contains(k) => (k, 
v.asInstanceOf[AnyRef])
+    }
+    ConfigUtils.configMapToRedactedString(overriddenTopicProps.asJava, 
configDef)
+  }
+
 }
 
 object LogConfig {
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 1ca4d7e..38266ed 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -834,7 +834,7 @@ class LogManager(logDirs: Seq[File],
         else
           currentLogs.put(topicPartition, log)
 
-        info(s"Created log for partition $topicPartition in $logDir with 
properties " + s"{${config.originals.asScala.mkString(", ")}}.")
+        info(s"Created log for partition $topicPartition in $logDir with 
properties ${config.overriddenConfigsAsLoggableString}")
         // Remove the preferred log dir since it has already been satisfied
         preferredLogDirs.remove(topicPartition)
 
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala 
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 069547d..19c0b93 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -162,6 +162,28 @@ class LogConfigTest {
     assertNull(nullServerDefault)
   }
 
+  @Test
+  def testOverriddenConfigsAsLoggableString(): Unit = {
+    val kafkaProps = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
+    kafkaProps.put("unknown.broker.password.config", "aaaaa")
+    kafkaProps.put(KafkaConfig.SslKeyPasswordProp, "somekeypassword")
+    kafkaProps.put(KafkaConfig.LogRetentionBytesProp, "50")
+    val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
+    val topicOverrides = new Properties
+    // Only set as a topic config
+    topicOverrides.setProperty(LogConfig.MinInSyncReplicasProp, "2")
+    // Overrides value from broker config
+    topicOverrides.setProperty(LogConfig.RetentionBytesProp, "100")
+    // Unknown topic config, but known broker config
+    topicOverrides.setProperty(KafkaConfig.SslTruststorePasswordProp, 
"sometrustpasswrd")
+    // Unknown config
+    topicOverrides.setProperty("unknown.topic.password.config", "bbbb")
+    // We don't currently have any sensitive topic configs, if we add them, we should set one here
+    val logConfig = 
LogConfig.fromProps(LogConfig.extractLogConfigMap(kafkaConfig), topicOverrides)
+    assertEquals("{min.insync.replicas=2, retention.bytes=100, 
ssl.truststore.password=(redacted), unknown.topic.password.config=(redacted)}",
+      logConfig.overriddenConfigsAsLoggableString)
+  }
+
   private def isValid(configValue: String): Boolean = {
     try {
       ThrottledReplicaListValidator.ensureValidString("", configValue)
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index ab60dfd..3b7b5d3 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -159,6 +159,13 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
     </Match>
 
     <Match>
+        <!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 -->
+        <Source name="LogConfig.scala"/>
+        <Package name="kafka.log"/>
+        <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
+    </Match>
+
+    <Match>
         <!-- offsets is a lazy val and it confuses spotBugs with its locking scheme -->
         <Class name="kafka.server.checkpoints.LazyOffsetCheckpointMap"/>
         <Bug pattern="IS2_INCONSISTENT_SYNC"/>

Reply via email to