This is an automated email from the ASF dual-hosted git repository.

viktor pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new d13c1f652d0  KAFKA-19480: Recreate /migration when it has null value 
(#20627)
d13c1f652d0 is described below

commit d13c1f652d0866a1149f93c7e62760e1acba8ef3
Author: Viktor Somogyi-Vass <[email protected]>
AuthorDate: Tue Oct 7 12:12:41 2025 +0200

     KAFKA-19480: Recreate /migration when it has null value (#20627)
    
    When using the zookeeper-security-migration
    tool without the '--enable.path.check' option, the script not only
    updates the ACLs for the existing znodes, but also creates any
    non-existing ones (with the ACL options specified) using null values
    based on the list defined in
    `ZkData.SecureRootPaths`.
    This is especially problematic for the /migration znode: the current
    logic only checks for the existence of the znode, so the migration
    process later hangs as it tries to parse the null value over and over
    again.
    
    In summary, the migration cannot be completed if the
    zookeeper-security-migration script was run previously, and the only
    workaround is to manually remove the /migration znode in such cases. I
    propose a simple fix to circumvent the manual step by recreating the
    /migration znode if it contains a null value.
    
    ---------
    
    Co-authored-by: Gergely Harmadas <[email protected]>
---
 .../scala/kafka/admin/ZkSecurityMigrator.scala     | 10 ++++--
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   | 20 ++++++++---
 .../kafka/security/auth/ZkAuthorizationTest.scala  | 42 +++++++++++++++++-----
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 15 ++++++++
 docs/ops.html                                      |  2 ++
 5 files changed, 74 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala 
b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index b5c54596b21..affac796610 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -21,7 +21,7 @@ import joptsimple.{OptionSet, OptionSpec, OptionSpecBuilder}
 import kafka.server.KafkaConfig
 import kafka.utils.{Exit, Logging, ToolsUtils}
 import kafka.utils.Implicits._
-import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, 
ZkSecurityMigratorUtils}
+import kafka.zk.{ControllerZNode, KafkaZkClient, MigrationZNode, ZkData, 
ZkSecurityMigratorUtils}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.server.config.ZkConfigs
@@ -260,12 +260,18 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends 
Logging {
   }
 
   private def run(enablePathCheck: Boolean): Unit = {
+    def skipSetAcl(path: String): Boolean = {
+      val isControllerPath = path == ControllerZNode.path
+      val isMigrationPath = path == MigrationZNode.path
+      (isControllerPath || isMigrationPath) && !zkClient.pathExists(path)
+    }
+    
     try {
       setAclIndividually("/")
       checkPathExistenceAndMaybeExit(enablePathCheck)
       for (path <- ZkData.SecureRootPaths) {
         debug("Going to set ACL for %s".format(path))
-        if (path == ControllerZNode.path && !zkClient.pathExists(path)) {
+        if (skipSetAcl(path)) {
           debug("Ignoring to set ACL for %s, because it doesn't 
exist".format(path))
         } else {
           zkClient.makeSurePersistentPathExists(path)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala 
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index cb22d3caccf..c447f22a142 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -1734,20 +1734,32 @@ class KafkaZkClient private[zk] (
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
     getDataResponse.resultCode match {
       case Code.OK =>
-        MigrationZNode.decode(getDataResponse.data, 
getDataResponse.stat.getVersion, getDataResponse.stat.getMtime)
+        Option(getDataResponse.data) match {
+          case Some(data) =>
+            MigrationZNode.decode(data, getDataResponse.stat.getVersion, 
getDataResponse.stat.getMtime)
+          case None =>
+            info("Migration znode exists with null data, recreating initial 
migration state")           
+            createInitialMigrationState(initialState, removeFirst = true)
+        }
       case Code.NONODE =>
         createInitialMigrationState(initialState)
       case _ => throw getDataResponse.resultException.get
     }
   }
 
-  private def createInitialMigrationState(initialState: 
ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
-    val createRequest = CreateRequest(
+  private def createInitialMigrationState(initialState: 
ZkMigrationLeadershipState, removeFirst: Boolean = false): 
ZkMigrationLeadershipState = {
+    val createOp = CreateOp(
       MigrationZNode.path,
       MigrationZNode.encode(initialState),
       defaultAcls(MigrationZNode.path),
       CreateMode.PERSISTENT)
-    val response = retryRequestUntilConnected(createRequest)
+    val deleteOp = DeleteOp(MigrationZNode.path, ZkVersion.MatchAnyVersion)
+    val multi = if (removeFirst) {
+      MultiRequest(Seq(deleteOp, createOp))
+    } else {
+      MultiRequest(Seq(createOp))
+    }
+    val response = retryRequestUntilConnected(multi)
     response.maybeThrow()
     initialState.withMigrationZkVersion(0)
   }
diff --git 
a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala 
b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 4f5bec4d527..0fcf5cc1a94 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -38,6 +38,8 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.zookeeper.client.ZKClientConfig
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
@@ -147,11 +149,12 @@ class ZkAuthorizationTest extends QuorumTestHarness with 
Logging {
    * Tests the migration tool when making an unsecure
    * cluster secure.
    */
-  @Test
-  def testZkMigration(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(false, true))
+  def testZkMigration(includeAllZnodes: Boolean): Unit = {
     val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false)
     try {
-      testMigration(zkConnect, unsecureZkClient, zkClient)
+      testMigration(zkConnect, unsecureZkClient, zkClient, includeAllZnodes)
     } finally {
       unsecureZkClient.close()
     }
@@ -161,11 +164,12 @@ class ZkAuthorizationTest extends QuorumTestHarness with 
Logging {
    * Tests the migration tool when making a secure
    * cluster unsecure.
    */
-  @Test
-  def testZkAntiMigration(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(false, true))
+  def testZkAntiMigration(includeAllZnodes: Boolean): Unit = {
     val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false)
     try {
-      testMigration(zkConnect, zkClient, unsecureZkClient)
+      testMigration(zkConnect, zkClient, unsecureZkClient, includeAllZnodes)
     } finally {
       unsecureZkClient.close()
     }
@@ -218,9 +222,17 @@ class ZkAuthorizationTest extends QuorumTestHarness with 
Logging {
    * Exercises the migration tool. It is used in these test cases:
    * testZkMigration, testZkAntiMigration, testChroot.
    */
-  private def testMigration(zkUrl: String, firstZk: KafkaZkClient, secondZk: 
KafkaZkClient): Unit = {
+  private def testMigration(
+                             zkUrl: String,
+                             firstZk: KafkaZkClient,
+                             secondZk: KafkaZkClient,
+                             includeAllZnodes: Boolean = true): Unit = {
     info(s"zkConnect string: $zkUrl")
-    for (path <- ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths) {
+    // Optionally do not create controller and migration znodes
+    val predicate: String => Boolean = if (includeAllZnodes) _ => true else 
skipCreateZnodes
+    val paths = (ZkData.SecureRootPaths ++ 
ZkData.SensitiveRootPaths).filter(predicate)
+    
+    for (path <- paths) {
       info(s"Creating $path")
       firstZk.makeSurePersistentPathExists(path)
       // Create a child for each znode to exercise the recurrent
@@ -241,7 +253,7 @@ class ZkAuthorizationTest extends QuorumTestHarness with 
Logging {
       }
     ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", 
s"--zookeeper.connect=$zkUrl"))
     info("Done with migration")
-    for (path <- ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths) {
+    for (path <- paths) {
       val sensitive = ZkData.sensitivePath(path)
       val listParent = secondZk.getAcl(path)
       assertTrue(isAclCorrect(listParent, secondZk.secure, sensitive), path)
@@ -257,6 +269,18 @@ class ZkAuthorizationTest extends QuorumTestHarness with 
Logging {
       ZkData.sensitivePath(ExtendedAclZNode.path)), "/kafka-acl-extended")
     assertTrue(isAclCorrect(firstZk.getAcl("/feature"), secondZk.secure,
       ZkData.sensitivePath(FeatureZNode.path)), "ACL mismatch for /feature 
path")
+
+    if (!includeAllZnodes) {
+      // Check controller and migration znodes should not be created
+      assertFalse(firstZk.pathExists(ControllerZNode.path))
+      assertFalse(firstZk.pathExists(MigrationZNode.path))
+    }
+  }
+
+  private def skipCreateZnodes(path: String): Boolean = {
+    val isNotControllerPath = path != ControllerZNode.path
+    val isNotMigrationPath = path != MigrationZNode.path
+    isNotControllerPath && isNotMigrationPath
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala 
b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index cef3dad7ac8..b3041dd0807 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -1442,6 +1442,21 @@ class KafkaZkClientTest extends QuorumTestHarness {
     } finally System.clearProperty(ZKConfig.JUTE_MAXBUFFER)
   }
 
+  @Test
+  def testMigrationZnodeWithNullValue(): Unit = {
+    val (controllerEpoch, stat) = zkClient.getControllerEpoch.get
+    var migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42, 
Time.SYSTEM.milliseconds(), -1, controllerEpoch, stat.getVersion)
+    zkClient.retryRequestUntilConnected(CreateRequest(
+      MigrationZNode.path,
+      null,
+      zkClient.defaultAcls(MigrationZNode.path),
+      CreateMode.PERSISTENT))
+    
+    migrationState = zkClient.getOrCreateMigrationState(migrationState)
+
+    assertEquals(0, migrationState.migrationZkVersion())
+  }
+
   @Test
   def testFailToUpdateMigrationZNode(): Unit = {
     val (controllerEpoch, stat) = zkClient.getControllerEpoch.get
diff --git a/docs/ops.html b/docs/ops.html
index e8309187a56..f5f95ffaf44 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -4064,6 +4064,8 @@ inter.broker.listener.name=PLAINTEXT
 
   <p>The new standalone controller in the example configuration above should 
be formatted using the <code>kafka-storage format 
--standalone</code>command.</p>
 
+  <p>Note: The migration can stall if the <a 
href="#zk_authz_migration">ZooKeeper Security Migration Tool</a> was previously 
executed (fixed from 3.9.2, see <a 
href="https://issues.apache.org/jira/browse/KAFKA-19480">KAFKA-19480</a> for 
more details). As a workaround, the malformed "/migration" node can be removed 
from ZooKeeper by running <code>delete /migration</code> with the 
<code>zookeeper-shell.sh</code> CLI tool.</p>
+
   <p><em>Note: The KRaft cluster <code>node.id</code> values must be different 
from any existing ZK broker <code>broker.id</code>.
   In KRaft-mode, the brokers and controllers share the same Node ID 
namespace.</em></p>
 

Reply via email to