This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1bf70399999 KAFKA-15098 Allow authorizers to be configured in ZK migration (#13895)
1bf70399999 is described below

commit 1bf703999999123c4ac7901ee45e5523be2236f8
Author: David Arthur <[email protected]>
AuthorDate: Thu Jun 22 09:34:49 2023 -0400

    KAFKA-15098 Allow authorizers to be configured in ZK migration (#13895)
    
    Reviewers: Ron Dagostino <[email protected]>
---
 core/src/main/scala/kafka/server/KafkaConfig.scala |  2 -
 .../kafka/zk/ZkMigrationIntegrationTest.scala      | 54 ++++++++++++++++++++++
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  7 +--
 .../metadata/migration/KRaftMigrationDriver.java   |  4 +-
 .../metadata/migration/MigrationDriverState.java   |  2 +-
 5 files changed, 59 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ebc398822da..81b02600395 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -2305,8 +2305,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
     } else {
       // ZK-based
       if (migrationEnabled) {
-        require(!originals.containsKey(KafkaConfig.AuthorizerClassNameProp),
-          s"ZooKeeper migration does not yet support authorizers. Remove ${KafkaConfig.AuthorizerClassNameProp} before performing a migration.")
         validateNonEmptyQuorumVotersForMigration()
         require(controllerListenerNames.nonEmpty,
           s"${KafkaConfig.ControllerListenerNamesProp} must not be empty when running in ZooKeeper migration mode: ${controllerListenerNames.asJava}")
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 02bc1b18726..fc5da56721f 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -121,6 +121,60 @@ class ZkMigrationIntegrationTest {
     }
   }
 
+  @ClusterTest(
+    brokers = 3, clusterType = Type.ZK, autoStart = AutoStart.YES,
+    metadataVersion = MetadataVersion.IBP_3_4_IV0,
+    serverProperties = Array(
+      new ClusterConfigProperty(key = "authorizer.class.name", value = "kafka.security.authorizer.AclAuthorizer"),
+      new ClusterConfigProperty(key = "super.users", value = "User:ANONYMOUS"),
+      new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
+      new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+      new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+      new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+    )
+  )
+  def testStartZkBrokerWithAuthorizer(zkCluster: ClusterInstance): Unit = {
+    // Bootstrap the ZK cluster ID into KRaft
+    val clusterId = zkCluster.clusterId()
+    val kraftCluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
+        setClusterId(Uuid.fromString(clusterId)).
+        setNumBrokerNodes(0).
+        setNumControllerNodes(1).build())
+      .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
+      .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .build()
+    try {
+      kraftCluster.format()
+      kraftCluster.startup()
+      val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
+
+      // Enable migration configs and restart brokers
+      log.info("Restart brokers in migration mode")
+      val clientProps = kraftCluster.controllerClientProperties()
+      val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
+      zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
+      zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
+      zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+      zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      zkCluster.rollingBrokerRestart() // This would throw if authorizers weren't allowed
+      zkCluster.waitForReadyBrokers()
+      readyFuture.get(30, TimeUnit.SECONDS)
+
+      val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
+      TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), "Timed out waiting for KRaft controller to take over")
+
+      def inDualWrite(): Boolean = {
+        val migrationState = kraftCluster.controllers().get(3000).migrationSupport.get.migrationDriver.migrationState().get(10, TimeUnit.SECONDS)
+        migrationState.allowDualWrite()
+      }
+      TestUtils.waitUntilTrue(() => inDualWrite(), "Timed out waiting for dual-write mode")
+    } finally {
+      shutdownInSequence(zkCluster, kraftCluster)
+    }
+  }
+
   /**
    * Test ZkMigrationClient against a real ZooKeeper-backed Kafka cluster. This test creates a ZK cluster
    * and modifies data using AdminClient. The ZkMigrationClient is then used to read the metadata from ZK
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 40d1e99f1b8..1b969a49359 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1714,12 +1714,9 @@ class KafkaConfigTest {
     // All needed configs are now set
     KafkaConfig.fromProps(props)
 
-    // Don't allow migration startup with an authorizer set
+    // Check that we allow authorizer to be set
     props.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getCanonicalName)
-    assertEquals(
-      "requirement failed: ZooKeeper migration does not yet support authorizers. Remove authorizer.class.name before performing a migration.",
-      assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage)
-    props.remove(KafkaConfig.AuthorizerClassNameProp)
+    KafkaConfig.fromProps(props)
 
     // Don't allow migration startup with an older IBP
     props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, MetadataVersion.IBP_3_3_IV0.version())
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 0b8aa0341f9..de691fa36f9 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -145,7 +145,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
     }
 
     // Visible for testing
-    CompletableFuture<MigrationDriverState> migrationState() {
+    public CompletableFuture<MigrationDriverState> migrationState() {
         CompletableFuture<MigrationDriverState> stateFuture = new CompletableFuture<>();
         eventQueue.append(() -> stateFuture.complete(migrationState));
         return stateFuture;
@@ -328,7 +328,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
 
     /**
      * Construct and enqueue a {@link MetadataChangeEvent} with a given completion handler. In production use cases,
-     * this handler is a no-op. This method exists so we can add additional logic in our unit tests to wait for the
+     * this handler is a no-op. This method exists so that we can add additional logic in our unit tests to wait for the
      * enqueued event to finish executing.
      */
     void enqueueMetadataChangeEvent(
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java
index 2f354855640..07d318d1209 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java
@@ -55,7 +55,7 @@ public enum MigrationDriverState {
         this.allowDualWrite = allowDualWrite;
     }
 
-    boolean allowDualWrite() {
+    public boolean allowDualWrite() {
         return allowDualWrite;
     }
 }

Reply via email to