This is an automated email from the ASF dual-hosted git repository.
viktor pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new d13c1f652d0 KAFKA-19480: Recreate /migration when it has null value
(#20627)
d13c1f652d0 is described below
commit d13c1f652d0866a1149f93c7e62760e1acba8ef3
Author: Viktor Somogyi-Vass <[email protected]>
AuthorDate: Tue Oct 7 12:12:41 2025 +0200
KAFKA-19480: Recreate /migration when it has null value (#20627)
When using the zookeeper-security-migration
tool without the '--enable.path.check' option, the script not only
updates the ACLs for the existing znodes, but also creates any
non-existing ones (with the ACL options specified) using null values
based on the list defined in
`ZkData.SecureRootPaths`.
This is especially problematic for the /migration znode as the current
logic only checks for the existence of the znode and later the migration
process will hang when it tries to parse the null value over and over
again.
In summary, the migration cannot be completed if the
zookeeper-security-migration script was run previously, and the only
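workaround is to manually remove the /migration znode in such cases. This
change removes the manual step by recreating the /migration znode when it
contains a null value. In addition, the zookeeper-security-migration tool
no longer creates /migration if it does not exist (matching the existing
behavior for /controller).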
---------
Co-authored-by: Gergely Harmadas <[email protected]>
---
.../scala/kafka/admin/ZkSecurityMigrator.scala | 10 ++++--
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 20 ++++++++---
.../kafka/security/auth/ZkAuthorizationTest.scala | 42 +++++++++++++++++-----
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 15 ++++++++
docs/ops.html | 2 ++
5 files changed, 74 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index b5c54596b21..affac796610 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -21,7 +21,7 @@ import joptsimple.{OptionSet, OptionSpec, OptionSpecBuilder}
import kafka.server.KafkaConfig
import kafka.utils.{Exit, Logging, ToolsUtils}
import kafka.utils.Implicits._
-import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData,
ZkSecurityMigratorUtils}
+import kafka.zk.{ControllerZNode, KafkaZkClient, MigrationZNode, ZkData,
ZkSecurityMigratorUtils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.config.ZkConfigs
@@ -260,12 +260,18 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends
Logging {
}
private def run(enablePathCheck: Boolean): Unit = {
+ def skipSetAcl(path: String): Boolean = {
+ val isControllerPath = path == ControllerZNode.path
+ val isMigrationPath = path == MigrationZNode.path
+ (isControllerPath || isMigrationPath) && !zkClient.pathExists(path)
+ }
+
try {
setAclIndividually("/")
checkPathExistenceAndMaybeExit(enablePathCheck)
for (path <- ZkData.SecureRootPaths) {
debug("Going to set ACL for %s".format(path))
- if (path == ControllerZNode.path && !zkClient.pathExists(path)) {
+ if (skipSetAcl(path)) {
debug("Ignoring to set ACL for %s, because it doesn't
exist".format(path))
} else {
zkClient.makeSurePersistentPathExists(path)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index cb22d3caccf..c447f22a142 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -1734,20 +1734,32 @@ class KafkaZkClient private[zk] (
val getDataResponse = retryRequestUntilConnected(getDataRequest)
getDataResponse.resultCode match {
case Code.OK =>
- MigrationZNode.decode(getDataResponse.data,
getDataResponse.stat.getVersion, getDataResponse.stat.getMtime)
+ Option(getDataResponse.data) match {
+ case Some(data) =>
+ MigrationZNode.decode(data, getDataResponse.stat.getVersion,
getDataResponse.stat.getMtime)
+ case None =>
+ info("Migration znode exists with null data, recreating initial
migration state")
+ createInitialMigrationState(initialState, removeFirst = true)
+ }
case Code.NONODE =>
createInitialMigrationState(initialState)
case _ => throw getDataResponse.resultException.get
}
}
- private def createInitialMigrationState(initialState:
ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
- val createRequest = CreateRequest(
+ private def createInitialMigrationState(initialState:
ZkMigrationLeadershipState, removeFirst: Boolean = false):
ZkMigrationLeadershipState = {
+ val createOp = CreateOp(
MigrationZNode.path,
MigrationZNode.encode(initialState),
defaultAcls(MigrationZNode.path),
CreateMode.PERSISTENT)
- val response = retryRequestUntilConnected(createRequest)
+ val deleteOp = DeleteOp(MigrationZNode.path, ZkVersion.MatchAnyVersion)
+ val multi = if (removeFirst) {
+ MultiRequest(Seq(deleteOp, createOp))
+ } else {
+ MultiRequest(Seq(createOp))
+ }
+ val response = retryRequestUntilConnected(multi)
response.maybeThrow()
initialState.withMigrationZkVersion(0)
}
diff --git
a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 4f5bec4d527..0fcf5cc1a94 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -38,6 +38,8 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.common.MetadataVersion
import org.apache.zookeeper.client.ZKClientConfig
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
import scala.collection.Seq
@@ -147,11 +149,12 @@ class ZkAuthorizationTest extends QuorumTestHarness with
Logging {
* Tests the migration tool when making an unsecure
* cluster secure.
*/
- @Test
- def testZkMigration(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(false, true))
+ def testZkMigration(includeAllZnodes: Boolean): Unit = {
val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false)
try {
- testMigration(zkConnect, unsecureZkClient, zkClient)
+ testMigration(zkConnect, unsecureZkClient, zkClient, includeAllZnodes)
} finally {
unsecureZkClient.close()
}
@@ -161,11 +164,12 @@ class ZkAuthorizationTest extends QuorumTestHarness with
Logging {
* Tests the migration tool when making a secure
* cluster unsecure.
*/
- @Test
- def testZkAntiMigration(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(false, true))
+ def testZkAntiMigration(includeAllZnodes: Boolean): Unit = {
val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false)
try {
- testMigration(zkConnect, zkClient, unsecureZkClient)
+ testMigration(zkConnect, zkClient, unsecureZkClient, includeAllZnodes)
} finally {
unsecureZkClient.close()
}
@@ -218,9 +222,17 @@ class ZkAuthorizationTest extends QuorumTestHarness with
Logging {
* Exercises the migration tool. It is used in these test cases:
* testZkMigration, testZkAntiMigration, testChroot.
*/
- private def testMigration(zkUrl: String, firstZk: KafkaZkClient, secondZk:
KafkaZkClient): Unit = {
+ private def testMigration(
+ zkUrl: String,
+ firstZk: KafkaZkClient,
+ secondZk: KafkaZkClient,
+ includeAllZnodes: Boolean = true): Unit = {
info(s"zkConnect string: $zkUrl")
- for (path <- ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths) {
+ // Optionally do not create controller and migration znodes
+ val predicate: String => Boolean = if (includeAllZnodes) _ => true else
skipCreateZnodes
+ val paths = (ZkData.SecureRootPaths ++
ZkData.SensitiveRootPaths).filter(predicate)
+
+ for (path <- paths) {
info(s"Creating $path")
firstZk.makeSurePersistentPathExists(path)
// Create a child for each znode to exercise the recurrent
@@ -241,7 +253,7 @@ class ZkAuthorizationTest extends QuorumTestHarness with
Logging {
}
ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt",
s"--zookeeper.connect=$zkUrl"))
info("Done with migration")
- for (path <- ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths) {
+ for (path <- paths) {
val sensitive = ZkData.sensitivePath(path)
val listParent = secondZk.getAcl(path)
assertTrue(isAclCorrect(listParent, secondZk.secure, sensitive), path)
@@ -257,6 +269,18 @@ class ZkAuthorizationTest extends QuorumTestHarness with
Logging {
ZkData.sensitivePath(ExtendedAclZNode.path)), "/kafka-acl-extended")
assertTrue(isAclCorrect(firstZk.getAcl("/feature"), secondZk.secure,
ZkData.sensitivePath(FeatureZNode.path)), "ACL mismatch for /feature
path")
+
+ if (!includeAllZnodes) {
+ // Check controller and migration znodes should not be created
+ assertFalse(firstZk.pathExists(ControllerZNode.path))
+ assertFalse(firstZk.pathExists(MigrationZNode.path))
+ }
+ }
+
+ private def skipCreateZnodes(path: String): Boolean = {
+ val isNotControllerPath = path != ControllerZNode.path
+ val isNotMigrationPath = path != MigrationZNode.path
+ isNotControllerPath && isNotMigrationPath
}
/**
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index cef3dad7ac8..b3041dd0807 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -1442,6 +1442,21 @@ class KafkaZkClientTest extends QuorumTestHarness {
} finally System.clearProperty(ZKConfig.JUTE_MAXBUFFER)
}
+ @Test
+ def testMigrationZnodeWithNullValue(): Unit = {
+ val (controllerEpoch, stat) = zkClient.getControllerEpoch.get
+ var migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42,
Time.SYSTEM.milliseconds(), -1, controllerEpoch, stat.getVersion)
+ zkClient.retryRequestUntilConnected(CreateRequest(
+ MigrationZNode.path,
+ null,
+ zkClient.defaultAcls(MigrationZNode.path),
+ CreateMode.PERSISTENT))
+
+ migrationState = zkClient.getOrCreateMigrationState(migrationState)
+
+ assertEquals(0, migrationState.migrationZkVersion())
+ }
+
@Test
def testFailToUpdateMigrationZNode(): Unit = {
val (controllerEpoch, stat) = zkClient.getControllerEpoch.get
diff --git a/docs/ops.html b/docs/ops.html
index e8309187a56..f5f95ffaf44 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -4064,6 +4064,8 @@ inter.broker.listener.name=PLAINTEXT
<p>The new standalone controller in the example configuration above should
be formatted using the <code>kafka-storage format
--standalone</code>command.</p>
+ <p>Note: The migration can stall if the <a
href="#zk_authz_migration">ZooKeeper Security Migration Tool</a> was previously
executed (fixed in 3.9.2, see <a
href="https://issues.apache.org/jira/browse/KAFKA-19480">KAFKA-19480</a> for
more details). As a workaround, the malformed "/migration" node can be removed
from ZooKeeper by running <code>delete /migration</code> with the
<code>zookeeper-shell.sh</code> CLI tool.</p>
+
<p><em>Note: The KRaft cluster <code>node.id</code> values must be different
from any existing ZK broker <code>broker.id</code>.
In KRaft-mode, the brokers and controllers share the same Node ID
namespace.</em></p>