This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c66c412862f MINOR: fix authorizer reconfiguration in KRaft mode
(#13360)
c66c412862f is described below
commit c66c412862f9c83d2210a0c07258415de808566b
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Wed Mar 8 14:14:52 2023 -0800
MINOR: fix authorizer reconfiguration in KRaft mode (#13360)
Fix a bug with authorizer reconfiguration in KRaft mode. The bug
happened because we were invoking DynamicBrokerConfig.addReconfigurables
before initializing BrokerServer.authorizer. Add a test of reconfiguring
the Authorizer. Also, in testReconfigureControllerClientQuotas, test
both combined and isolated mode.
Reviewers: Ron Dagostino <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 6 +-
.../kafka/server/KRaftClusterTest.scala | 115 ++++++++++++++++++++-
2 files changed, 115 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 232393c4d37..ae7767c9d06 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -312,9 +312,6 @@ class BrokerServer(
config, Some(clientToControllerChannelManager), None, None,
groupCoordinator, transactionCoordinator)
- /* Add all reconfigurables for config change notification before
starting the metadata listener */
- config.dynamicConfig.addReconfigurables(this)
-
dynamicConfigHandlers = Map[String, ConfigHandler](
ConfigType.Topic -> new TopicConfigHandler(logManager, config,
quotaManagers, None),
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
@@ -461,6 +458,9 @@ class BrokerServer(
sharedServer.initialBrokerMetadataLoadFaultHandler,
sharedServer.metadataPublishingFaultHandler)
+ // Add all reconfigurables for config change notification before
installing the metadata publisher.
+ config.dynamicConfig.addReconfigurables(this)
+
// Tell the metadata listener to start publishing its output, and wait
for the first
// publish operation to complete. This first operation will initialize
logManager,
// replicaManager, groupCoordinator, and txnCoordinator. The log manager
may perform
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index aee0c182ec2..e5783374024 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -24,7 +24,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
-import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.{ConfigException, ConfigResource}
import org.apache.kafka.common.config.ConfigResource.Type
import org.apache.kafka.common.message.DescribeClusterRequestData
import org.apache.kafka.common.network.ListenerName
@@ -49,7 +49,8 @@ import org.slf4j.LoggerFactory
import java.io.File
import java.nio.file.{FileSystems, Path}
import java.{lang, util}
-import java.util.concurrent.CompletionStage
+import java.util.concurrent.{CompletableFuture, CompletionStage}
+import java.util.concurrent.atomic.AtomicInteger
import java.util.{Arrays, Collections, Optional, OptionalLong, Properties}
import scala.annotation.nowarn
import scala.collection.mutable
@@ -974,7 +975,7 @@ class KRaftClusterTest {
}
@ParameterizedTest
- @ValueSource(booleans = Array(true))
+ @ValueSource(booleans = Array(false, true))
def testReconfigureControllerClientQuotas(combinedController: Boolean): Unit
= {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
@@ -1014,6 +1015,47 @@ class KRaftClusterTest {
cluster.close()
}
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(false, true))
+ def testReconfigureControllerAuthorizer(combinedMode: Boolean): Unit = {
+ val cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setCoResident(combinedMode).
+ setNumControllerNodes(1).build()).
+ setConfigProp("authorizer.class.name",
classOf[FakeConfigurableAuthorizer].getName).
+ build()
+
+ def assertFoobarValue(expected: Int): Unit = {
+ TestUtils.retry(60000) {
+ assertEquals(expected,
cluster.controllers().values().iterator().next().
+ authorizer.get.asInstanceOf[FakeConfigurableAuthorizer].foobar.get())
+ assertEquals(expected, cluster.brokers().values().iterator().next().
+ authorizer.get.asInstanceOf[FakeConfigurableAuthorizer].foobar.get())
+ }
+ }
+
+ try {
+ cluster.format()
+ cluster.startup()
+ cluster.waitForReadyBrokers()
+ assertFoobarValue(0)
+ val admin = Admin.create(cluster.clientProperties())
+ try {
+ admin.incrementalAlterConfigs(
+ Collections.singletonMap(new ConfigResource(Type.BROKER, ""),
+ Collections.singletonList(new AlterConfigOp(
+ new ConfigEntry(FakeConfigurableAuthorizer.foobarConfigKey,
"123"), OpType.SET)))).
+ all().get()
+ } finally {
+ admin.close()
+ }
+ assertFoobarValue(123)
+ } finally {
+ cluster.close()
+ }
+ }
}
class BadAuthorizer() extends Authorizer {
@@ -1070,3 +1112,70 @@ class DummyClientQuotaCallback() extends
ClientQuotaCallback with Reconfigurable
override def reconfigure(configs: util.Map[String, _]): Unit =
configure(configs)
}
+object FakeConfigurableAuthorizer {
+ val foobarConfigKey = "fake.configurable.authorizer.foobar.config"
+
+ def fakeConfigurableAuthorizerConfigToInt(configs: util.Map[String, _]): Int
= {
+ val result = configs.get(foobarConfigKey)
+ if (result == null) {
+ 0
+ } else {
+ val resultString = result.toString().trim()
+ try {
+ Integer.valueOf(resultString)
+ } catch {
+ case e: NumberFormatException => throw new ConfigException(s"Bad value
of ${foobarConfigKey}: ${resultString}")
+ }
+ }
+ }
+}
+
+class FakeConfigurableAuthorizer() extends Authorizer with Reconfigurable {
+ import FakeConfigurableAuthorizer._
+
+ val foobar = new AtomicInteger(0)
+
+ override def start(serverInfo: AuthorizerServerInfo):
java.util.Map[Endpoint, _ <: CompletionStage[Void]] = {
+ serverInfo.endpoints().asScala.map(e => e -> {
+ val future = new CompletableFuture[Void]
+ future.complete(null)
+ future
+ }).toMap.asJava
+ }
+
+ override def reconfigurableConfigs(): java.util.Set[String] =
Set(foobarConfigKey).asJava
+
+ override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
+ fakeConfigurableAuthorizerConfigToInt(configs)
+ }
+
+ override def reconfigure(configs: util.Map[String, _]): Unit = {
+ foobar.set(fakeConfigurableAuthorizerConfigToInt(configs))
+ }
+
+ override def authorize(requestContext: AuthorizableRequestContext, actions:
util.List[Action]): util.List[AuthorizationResult] = {
+ actions.asScala.map(_ => AuthorizationResult.ALLOWED).toList.asJava
+ }
+
+ override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] =
List[AclBinding]().asJava
+
+ override def close(): Unit = {}
+
+ override def configure(configs: util.Map[String, _]): Unit = {
+ foobar.set(fakeConfigurableAuthorizerConfigToInt(configs))
+ }
+
+ override def createAcls(
+ requestContext: AuthorizableRequestContext,
+ aclBindings: util.List[AclBinding]
+ ): util.List[_ <: CompletionStage[AclCreateResult]] = {
+ Collections.emptyList()
+ }
+
+ override def deleteAcls(
+ requestContext: AuthorizableRequestContext,
+ aclBindingFilters: util.List[AclBindingFilter]
+ ): util.List[_ <: CompletionStage[AclDeleteResult]] = {
+ Collections.emptyList()
+ }
+}