This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 31547145481 MINOR: Appending value to LIST config should not generate
empty string with … (#12503)
31547145481 is described below
commit 3154714548174fd2b037de293c143f3c3db9f7db
Author: Chia-Ping Tsai <[email protected]>
AuthorDate: Tue Aug 16 02:05:28 2022 +0800
MINOR: Appending value to LIST config should not generate empty string with
… (#12503)
Reviewers: dengziming <[email protected]>, Luke Chen
<[email protected]>
---
.../scala/kafka/server/ConfigAdminManager.scala | 5 ++--
.../kafka/api/PlaintextAdminIntegrationTest.scala | 32 ++++++++++++++++++++++
2 files changed, 35 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
index cc7a98179dd..c6ea4e13953 100644
--- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
@@ -497,8 +497,9 @@ object ConfigAdminManager {
throw new InvalidConfigurationException(s"Config value append is
not allowed for config key: ${alterConfigOp.configEntry.name}")
val oldValueList =
Option(configProps.getProperty(alterConfigOp.configEntry.name))
.orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue,
ConfigDef.Type.LIST)))
- .getOrElse("")
- .split(",").toList
+ .filter(s => s.nonEmpty)
+ .map(_.split(",").toList)
+ .getOrElse(List.empty)
val appendingValueList =
alterConfigOp.configEntry.value.split(",").toList.filter(value =>
!oldValueList.contains(value))
val newValueList = oldValueList ::: appendingValueList
configProps.setProperty(alterConfigOp.configEntry.name,
newValueList.mkString(","))
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 203c04a68a7..7121f98bb9c 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2480,6 +2480,38 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAppendConfigToEmptyDefaultValue(ignored: String): Unit = {
+ testAppendConfig(new Properties(), "0:0", "0:0")
+ }
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAppendConfigToExistentValue(ignored: String): Unit = {
+ val props = new Properties();
+ props.setProperty(LogConfig.LeaderReplicationThrottledReplicasProp, "1:1")
+ testAppendConfig(props, "0:0", "1:1,0:0")
+ }
+
+ private def testAppendConfig(props: Properties, append: String, expected:
String): Unit = {
+ client = Admin.create(createConfig)
+ createTopic(topic, topicConfig = props)
+ val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
+ val topicAlterConfigs = Seq(
+ new AlterConfigOp(new
ConfigEntry(LogConfig.LeaderReplicationThrottledReplicasProp, append),
AlterConfigOp.OpType.APPEND),
+ ).asJavaCollection
+
+ val alterResult = client.incrementalAlterConfigs(Map(
+ topicResource -> topicAlterConfigs
+ ).asJava)
+ alterResult.all().get()
+
+ ensureConsistentKRaftMetadata()
+ val config =
client.describeConfigs(List(topicResource).asJava).all().get().get(topicResource).get(LogConfig.LeaderReplicationThrottledReplicasProp)
+ assertEquals(expected, config.value())
+ }
+
/**
* Test that createTopics returns the dynamic configurations of the topics
that were created.
*