This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 2594686cd60f5eb1900df89922f3844fda8c3c0b Author: Ismael Juma <[email protected]> AuthorDate: Tue Jun 8 06:39:31 2021 -0700 MINOR: Only log overridden topic configs during topic creation (#10828) It's quite verbose to include all configs for every partition loaded/created. Also make sure to redact sensitive and unknown config values. Unit test included. Reviewers: David Jacot <[email protected]>, Kowshik Prakasam <[email protected]>, Luke Chen <[email protected]> --- core/src/main/scala/kafka/log/LogConfig.scala | 19 ++++++++++++++----- core/src/main/scala/kafka/log/LogManager.scala | 2 +- .../test/scala/unit/kafka/log/LogConfigTest.scala | 22 ++++++++++++++++++++++ gradle/spotbugs-exclude.xml | 7 +++++++ 4 files changed, 44 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index c2ab1d8..60e813d 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -17,20 +17,21 @@ package kafka.log -import java.util.{Collections, Locale, Properties} - -import scala.jdk.CollectionConverters._ import kafka.api.{ApiVersion, ApiVersionValidator} +import kafka.log.LogConfig.configDef import kafka.message.BrokerCompressionCodec import kafka.server.{KafkaConfig, ThrottledReplicaListValidator} import kafka.utils.Implicits._ import org.apache.kafka.common.errors.InvalidConfigurationException import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, TopicConfig} +import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList, Validator} import org.apache.kafka.common.record.{LegacyRecord, TimestampType} -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{ConfigUtils, Utils} import scala.collection.{Map, mutable} -import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList, Validator} +import scala.jdk.CollectionConverters._ + +import java.util.{Collections, Locale, Properties} object Defaults { val SegmentSize = kafka.server.Defaults.LogSegmentBytes @@ -108,6 +109,14 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String] if (compact && maxCompactionLagMs > 0) math.min(maxCompactionLagMs, segmentMs) else segmentMs } + + def overriddenConfigsAsLoggableString: String = { + val overriddenTopicProps = props.asScala.collect { + case (k: String, v) if overriddenConfigs.contains(k) => (k, v.asInstanceOf[AnyRef]) + } + ConfigUtils.configMapToRedactedString(overriddenTopicProps.asJava, configDef) + } + } object LogConfig { diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 1ca4d7e..38266ed 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -834,7 +834,7 @@ class LogManager(logDirs: Seq[File], else currentLogs.put(topicPartition, log) - info(s"Created log for partition $topicPartition in $logDir with properties " + s"{${config.originals.asScala.mkString(", ")}}.") + info(s"Created log for partition $topicPartition in $logDir with properties ${config.overriddenConfigsAsLoggableString}") // Remove the preferred log dir since it has already been satisfied preferredLogDirs.remove(topicPartition) diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 069547d..19c0b93 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -162,6 +162,28 @@ class LogConfigTest { assertNull(nullServerDefault) } + @Test + def testOverriddenConfigsAsLoggableString(): Unit = { + val kafkaProps = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") + kafkaProps.put("unknown.broker.password.config", "aaaaa") + kafkaProps.put(KafkaConfig.SslKeyPasswordProp, "somekeypassword") + kafkaProps.put(KafkaConfig.LogRetentionBytesProp, "50") + val kafkaConfig = KafkaConfig.fromProps(kafkaProps) + val topicOverrides = new Properties + // Only set as a topic config + topicOverrides.setProperty(LogConfig.MinInSyncReplicasProp, "2") + // Overrides value from broker config + topicOverrides.setProperty(LogConfig.RetentionBytesProp, "100") + // Unknown topic config, but known broker config + topicOverrides.setProperty(KafkaConfig.SslTruststorePasswordProp, "sometrustpasswrd") + // Unknown config + topicOverrides.setProperty("unknown.topic.password.config", "bbbb") + // We don't currently have any sensitive topic configs, if we add them, we should set one here + val logConfig = LogConfig.fromProps(LogConfig.extractLogConfigMap(kafkaConfig), topicOverrides) + assertEquals("{min.insync.replicas=2, retention.bytes=100, ssl.truststore.password=(redacted), unknown.topic.password.config=(redacted)}", + logConfig.overriddenConfigsAsLoggableString) + } + private def isValid(configValue: String): Boolean = { try { ThrottledReplicaListValidator.ensureValidString("", configValue) diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index ab60dfd..3b7b5d3 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -159,6 +159,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read </Match> <Match> + <!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 --> + <Source name="LogConfig.scala"/> + <Package name="kafka.log"/> + <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/> + </Match> + + <Match> <!-- offsets is a lazy val and it confuses spotBugs with its locking scheme --> <Class name="kafka.server.checkpoints.LazyOffsetCheckpointMap"/> <Bug pattern="IS2_INCONSISTENT_SYNC"/>
