This is an automated email from the ASF dual-hosted git repository. davidarthur pushed a commit to branch 3.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit a7369bd52ff1e91b3a56d5622ce49c7c515cb81e Author: David Arthur <[email protected]> AuthorDate: Thu Aug 4 15:09:08 2022 -0400 KAFKA-14136 Generate ConfigRecord for brokers even if the value is unchanged (#12483) --- .../server/DynamicBrokerReconfigurationTest.scala | 54 ++++++++++++++-------- .../controller/ConfigurationControlManager.java | 13 ++++-- 2 files changed, 44 insertions(+), 23 deletions(-) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index c3d1c68c71..a76fa91381 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -67,6 +67,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource +import java.util.concurrent.atomic.AtomicInteger import scala.annotation.nowarn import scala.collection._ import scala.collection.mutable.ArrayBuffer @@ -352,8 +353,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup } } - @Test // TODO KAFKA-14126 add KRaft support - def testKeyStoreAlter(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testKeyStoreAlter(quorum: String): Unit = { val topic2 = "testtopic2" TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions, replicationFactor = numServers) @@ -419,8 +421,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup stopAndVerifyProduceConsume(producerThread, consumerThread) } - @Test // TODO KAFKA-14126 add KRaft support - def testTrustStoreAlter(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testTrustStoreAlter(quorum: String): Unit = { val producerBuilder = ProducerBuilder().listenerName(SecureInternal).securityProtocol(SecurityProtocol.SSL) // Producer with new keystore should fail to connect before truststore update @@ -467,9 +470,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup assertFalse(response.wasDisconnected(), "Request failed because broker is not available") } + val group_id = new AtomicInteger(1) + def next_group_name(): String = s"alter-truststore-${group_id.getAndIncrement()}" + // Produce/consume should work with old as well as new client keystore - verifySslProduceConsume(sslProperties1, "alter-truststore-1") - verifySslProduceConsume(sslProperties2, "alter-truststore-2") + verifySslProduceConsume(sslProperties1, next_group_name()) + verifySslProduceConsume(sslProperties2, next_group_name()) // Revert to old truststore with only one certificate and update. Clients should connect only with old keystore. val oldTruststoreProps = new Properties @@ -478,7 +484,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup reconfigureServers(oldTruststoreProps, perBrokerConfig = true, (s"$prefix$SSL_TRUSTSTORE_LOCATION_CONFIG", sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG))) verifyAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties2).build()) - verifySslProduceConsume(sslProperties1, "alter-truststore-3") + verifySslProduceConsume(sslProperties1, next_group_name()) // Update same truststore file to contain both certificates without changing any configs. // Clients should connect successfully with either keystore after admin client AlterConfigsRequest completes. @@ -486,8 +492,14 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup Paths.get(sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)), StandardCopyOption.REPLACE_EXISTING) TestUtils.incrementalAlterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get() - verifySslProduceConsume(sslProperties1, "alter-truststore-4") - verifySslProduceConsume(sslProperties2, "alter-truststore-5") + TestUtils.retry(30000) { + try { + verifySslProduceConsume(sslProperties1, next_group_name()) + verifySslProduceConsume(sslProperties2, next_group_name()) + } catch { + case t: Throwable => throw new AssertionError(t) + } + } // Update internal keystore/truststore and validate new client connections from broker (e.g. controller). // Alter internal keystore from `sslProperties1` to `sslProperties2`, force disconnect of a controller connection @@ -495,21 +507,23 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup val props2 = securityProps(sslProperties2, KEYSTORE_PROPS, prefix) props2 ++= securityProps(combinedStoreProps, TRUSTSTORE_PROPS, prefix) TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS) - verifySslProduceConsume(sslProperties2, "alter-truststore-6") + verifySslProduceConsume(sslProperties2, next_group_name()) props2 ++= securityProps(sslProperties2, TRUSTSTORE_PROPS, prefix) TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS) - verifySslProduceConsume(sslProperties2, "alter-truststore-7") + verifySslProduceConsume(sslProperties2, next_group_name()) waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1)) - val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer] - val controllerChannelManager = controller.kafkaController.controllerChannelManager - val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] = - JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo") - brokerStateInfo(0).networkClient.disconnect("0") - TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers) - - // validate that the brokerToController request works fine - verifyBrokerToControllerCall(controller) + if (!isKRaftTest()) { + val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer] + val controllerChannelManager = controller.kafkaController.controllerChannelManager + val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] = + JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo") + brokerStateInfo(0).networkClient.disconnect("0") + TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers) + + // validate that the brokerToController request works fine + verifyBrokerToControllerCall(controller) + } } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index a16361343b..746fdf1ffe 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigResource.Type; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.utils.LogContext; @@ -146,7 +147,8 @@ public class ConfigurationControlManager { } break; } - if (!Objects.equals(currentValue, newValue)) { + if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) { + // KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers newRecords.add(new ApiMessageAndVersion(new ConfigRecord(). setResourceType(configResource.type().id()). setResourceName(configResource.name()). @@ -233,7 +235,8 @@ public class ConfigurationControlManager { String key = entry.getKey(); String newValue = entry.getValue(); String currentValue = currentConfigs.get(key); - if (!Objects.equals(newValue, currentValue)) { + if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) { + // KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers newRecords.add(new ApiMessageAndVersion(new ConfigRecord(). setResourceType(configResource.type().id()). setResourceName(configResource.name()). @@ -297,7 +300,11 @@ public class ConfigurationControlManager { if (configs.isEmpty()) { configData.remove(configResource); } - log.info("{}: set configuration {} to {}", configResource, record.name(), record.value()); + if (configSchema.isSensitive(record)) { + log.info("{}: set configuration {} to {}", configResource, record.name(), Password.HIDDEN); + } else { + log.info("{}: set configuration {} to {}", configResource, record.name(), record.value()); + } } // VisibleForTesting
