This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c66c412862f MINOR: fix authorizer reconfiguration in KRaft mode (#13360)
c66c412862f is described below

commit c66c412862f9c83d2210a0c07258415de808566b
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Wed Mar 8 14:14:52 2023 -0800

    MINOR: fix authorizer reconfiguration in KRaft mode (#13360)
    
    Fix a bug with authorizer reconfiguration in KRaft mode. The bug
    happened because we were invoking DynamicBrokerConfig.addReconfigurables
    before initializing BrokerServer.authorizer. Add a test of reconfiguring
    the Authorizer. Also, in testReconfigureControllerClientQuotas, test
    both combined and isolated mode.
    
    Reviewers: Ron Dagostino <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |   6 +-
 .../kafka/server/KRaftClusterTest.scala            | 115 ++++++++++++++++++++-
 2 files changed, 115 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 232393c4d37..ae7767c9d06 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -312,9 +312,6 @@ class BrokerServer(
         config, Some(clientToControllerChannelManager), None, None,
         groupCoordinator, transactionCoordinator)
 
-      /* Add all reconfigurables for config change notification before 
starting the metadata listener */
-      config.dynamicConfig.addReconfigurables(this)
-
       dynamicConfigHandlers = Map[String, ConfigHandler](
         ConfigType.Topic -> new TopicConfigHandler(logManager, config, 
quotaManagers, None),
         ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
@@ -461,6 +458,9 @@ class BrokerServer(
         sharedServer.initialBrokerMetadataLoadFaultHandler,
         sharedServer.metadataPublishingFaultHandler)
 
+      // Add all reconfigurables for config change notification before 
installing the metadata publisher.
+      config.dynamicConfig.addReconfigurables(this)
+
       // Tell the metadata listener to start publishing its output, and wait 
for the first
       // publish operation to complete. This first operation will initialize 
logManager,
       // replicaManager, groupCoordinator, and txnCoordinator. The log manager 
may perform
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index aee0c182ec2..e5783374024 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -24,7 +24,7 @@ import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
-import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.{ConfigException, ConfigResource}
 import org.apache.kafka.common.config.ConfigResource.Type
 import org.apache.kafka.common.message.DescribeClusterRequestData
 import org.apache.kafka.common.network.ListenerName
@@ -49,7 +49,8 @@ import org.slf4j.LoggerFactory
 import java.io.File
 import java.nio.file.{FileSystems, Path}
 import java.{lang, util}
-import java.util.concurrent.CompletionStage
+import java.util.concurrent.{CompletableFuture, CompletionStage}
+import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Arrays, Collections, Optional, OptionalLong, Properties}
 import scala.annotation.nowarn
 import scala.collection.mutable
@@ -974,7 +975,7 @@ class KRaftClusterTest {
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = Array(true))
+  @ValueSource(booleans = Array(false, true))
   def testReconfigureControllerClientQuotas(combinedController: Boolean): Unit 
= {
     val cluster = new KafkaClusterTestKit.Builder(
       new TestKitNodes.Builder().
@@ -1014,6 +1015,47 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(false, true))
+  def testReconfigureControllerAuthorizer(combinedMode: Boolean): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(1).
+        setCoResident(combinedMode).
+        setNumControllerNodes(1).build()).
+      setConfigProp("authorizer.class.name", 
classOf[FakeConfigurableAuthorizer].getName).
+      build()
+
+    def assertFoobarValue(expected: Int): Unit = {
+      TestUtils.retry(60000) {
+        assertEquals(expected, 
cluster.controllers().values().iterator().next().
+          authorizer.get.asInstanceOf[FakeConfigurableAuthorizer].foobar.get())
+        assertEquals(expected, cluster.brokers().values().iterator().next().
+          authorizer.get.asInstanceOf[FakeConfigurableAuthorizer].foobar.get())
+      }
+    }
+
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      assertFoobarValue(0)
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        admin.incrementalAlterConfigs(
+          Collections.singletonMap(new ConfigResource(Type.BROKER, ""),
+            Collections.singletonList(new AlterConfigOp(
+              new ConfigEntry(FakeConfigurableAuthorizer.foobarConfigKey, 
"123"), OpType.SET)))).
+          all().get()
+      } finally {
+        admin.close()
+      }
+      assertFoobarValue(123)
+    } finally {
+      cluster.close()
+    }
+  }
 }
 
 class BadAuthorizer() extends Authorizer {
@@ -1070,3 +1112,70 @@ class DummyClientQuotaCallback() extends ClientQuotaCallback with Reconfigurable
   override def reconfigure(configs: util.Map[String, _]): Unit = 
configure(configs)
 }
 
+object FakeConfigurableAuthorizer {
+  val foobarConfigKey = "fake.configurable.authorizer.foobar.config"
+
+  def fakeConfigurableAuthorizerConfigToInt(configs: util.Map[String, _]): Int 
= {
+    val result = configs.get(foobarConfigKey)
+    if (result == null) {
+      0
+    } else {
+      val resultString = result.toString().trim()
+      try {
+        Integer.valueOf(resultString)
+      } catch {
+        case e: NumberFormatException => throw new ConfigException(s"Bad value 
of ${foobarConfigKey}: ${resultString}")
+      }
+    }
+  }
+}
+
+class FakeConfigurableAuthorizer() extends Authorizer with Reconfigurable {
+  import FakeConfigurableAuthorizer._
+
+  val foobar = new AtomicInteger(0)
+
+  override def start(serverInfo: AuthorizerServerInfo): 
java.util.Map[Endpoint, _ <: CompletionStage[Void]] = {
+    serverInfo.endpoints().asScala.map(e => e -> {
+      val future = new CompletableFuture[Void]
+      future.complete(null)
+      future
+    }).toMap.asJava
+  }
+
+  override def reconfigurableConfigs(): java.util.Set[String] = 
Set(foobarConfigKey).asJava
+
+  override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
+    fakeConfigurableAuthorizerConfigToInt(configs)
+  }
+
+  override def reconfigure(configs: util.Map[String, _]): Unit = {
+    foobar.set(fakeConfigurableAuthorizerConfigToInt(configs))
+  }
+
+  override def authorize(requestContext: AuthorizableRequestContext, actions: 
util.List[Action]): util.List[AuthorizationResult] = {
+    actions.asScala.map(_ => AuthorizationResult.ALLOWED).toList.asJava
+  }
+
+  override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = 
List[AclBinding]().asJava
+
+  override def close(): Unit = {}
+
+  override def configure(configs: util.Map[String, _]): Unit = {
+    foobar.set(fakeConfigurableAuthorizerConfigToInt(configs))
+  }
+
+  override def createAcls(
+    requestContext: AuthorizableRequestContext,
+    aclBindings: util.List[AclBinding]
+  ): util.List[_ <: CompletionStage[AclCreateResult]] = {
+    Collections.emptyList()
+  }
+
+  override def deleteAcls(
+    requestContext: AuthorizableRequestContext,
+    aclBindingFilters: util.List[AclBindingFilter]
+  ): util.List[_ <: CompletionStage[AclDeleteResult]] = {
+    Collections.emptyList()
+  }
+}

Reply via email to