This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1bf70399999 KAFKA-15098 Allow authorizers to be configured in ZK migration (#13895)
1bf70399999 is described below
commit 1bf703999999123c4ac7901ee45e5523be2236f8
Author: David Arthur <[email protected]>
AuthorDate: Thu Jun 22 09:34:49 2023 -0400
KAFKA-15098 Allow authorizers to be configured in ZK migration (#13895)
Reviewers: Ron Dagostino <[email protected]>
---
core/src/main/scala/kafka/server/KafkaConfig.scala | 2 -
.../kafka/zk/ZkMigrationIntegrationTest.scala | 54 ++++++++++++++++++++++
.../scala/unit/kafka/server/KafkaConfigTest.scala | 7 +--
.../metadata/migration/KRaftMigrationDriver.java | 4 +-
.../metadata/migration/MigrationDriverState.java | 2 +-
5 files changed, 59 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ebc398822da..81b02600395 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -2305,8 +2305,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
     } else {
       // ZK-based
       if (migrationEnabled) {
-        require(!originals.containsKey(KafkaConfig.AuthorizerClassNameProp),
-          s"ZooKeeper migration does not yet support authorizers. Remove ${KafkaConfig.AuthorizerClassNameProp} before performing a migration.")
         validateNonEmptyQuorumVotersForMigration()
         require(controllerListenerNames.nonEmpty,
           s"${KafkaConfig.ControllerListenerNamesProp} must not be empty when running in ZooKeeper migration mode: ${controllerListenerNames.asJava}")
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 02bc1b18726..fc5da56721f 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -121,6 +121,60 @@ class ZkMigrationIntegrationTest {
}
}
+  @ClusterTest(
+    brokers = 3, clusterType = Type.ZK, autoStart = AutoStart.YES,
+    metadataVersion = MetadataVersion.IBP_3_4_IV0,
+    serverProperties = Array(
+      new ClusterConfigProperty(key = "authorizer.class.name", value = "kafka.security.authorizer.AclAuthorizer"),
+      new ClusterConfigProperty(key = "super.users", value = "User:ANONYMOUS"),
+      new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
+      new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+      new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+      new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+    )
+  )
+  def testStartZkBrokerWithAuthorizer(zkCluster: ClusterInstance): Unit = {
+    // Bootstrap the ZK cluster ID into KRaft
+    val clusterId = zkCluster.clusterId()
+    val kraftCluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
+        setClusterId(Uuid.fromString(clusterId)).
+        setNumBrokerNodes(0).
+        setNumControllerNodes(1).build())
+      .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
+      .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .build()
+    try {
+      kraftCluster.format()
+      kraftCluster.startup()
+      val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
+
+      // Enable migration configs and restart brokers
+      log.info("Restart brokers in migration mode")
+      val clientProps = kraftCluster.controllerClientProperties()
+      val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
+      zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
+      zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
+      zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+      zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      zkCluster.rollingBrokerRestart() // This would throw if authorizers weren't allowed
+      zkCluster.waitForReadyBrokers()
+      readyFuture.get(30, TimeUnit.SECONDS)
+
+      val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
+      TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), "Timed out waiting for KRaft controller to take over")
+
+      def inDualWrite(): Boolean = {
+        val migrationState = kraftCluster.controllers().get(3000).migrationSupport.get.migrationDriver.migrationState().get(10, TimeUnit.SECONDS)
+        migrationState.allowDualWrite()
+      }
+      TestUtils.waitUntilTrue(() => inDualWrite(), "Timed out waiting for dual-write mode")
+    } finally {
+      shutdownInSequence(zkCluster, kraftCluster)
+    }
+  }
+
/**
   * Test ZkMigrationClient against a real ZooKeeper-backed Kafka cluster. This test creates a ZK cluster
   * and modifies data using AdminClient. The ZkMigrationClient is then used to read the metadata from ZK
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 40d1e99f1b8..1b969a49359 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1714,12 +1714,9 @@ class KafkaConfigTest {
     // All needed configs are now set
     KafkaConfig.fromProps(props)

-    // Don't allow migration startup with an authorizer set
+    // Check that we allow authorizer to be set
     props.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getCanonicalName)
-    assertEquals(
-      "requirement failed: ZooKeeper migration does not yet support authorizers. Remove authorizer.class.name before performing a migration.",
-      assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage)
-    props.remove(KafkaConfig.AuthorizerClassNameProp)
+    KafkaConfig.fromProps(props)

     // Don't allow migration startup with an older IBP
     props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, MetadataVersion.IBP_3_3_IV0.version())
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 0b8aa0341f9..de691fa36f9 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -145,7 +145,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
     }

     // Visible for testing
-    CompletableFuture<MigrationDriverState> migrationState() {
+    public CompletableFuture<MigrationDriverState> migrationState() {
         CompletableFuture<MigrationDriverState> stateFuture = new CompletableFuture<>();
         eventQueue.append(() -> stateFuture.complete(migrationState));
         return stateFuture;
@@ -328,7 +328,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {

     /**
      * Construct and enqueue a {@link MetadataChangeEvent} with a given completion handler. In production use cases,
-     * this handler is a no-op. This method exists so we can add additional logic in our unit tests to wait for the
+     * this handler is a no-op. This method exists so that we can add additional logic in our unit tests to wait for the
      * enqueued event to finish executing.
      */
     void enqueueMetadataChangeEvent(
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java
index 2f354855640..07d318d1209 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java
@@ -55,7 +55,7 @@ public enum MigrationDriverState {
         this.allowDualWrite = allowDualWrite;
     }

-    boolean allowDualWrite() {
+    public boolean allowDualWrite() {
         return allowDualWrite;
     }
 }
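
For reference, the new testStartZkBrokerWithAuthorizer test amounts to running a ZK-mode broker with both an authorizer and the migration configs enabled. A minimal sketch of those broker settings, assembled as Scala properties (not part of the commit; the quorum voters value is a placeholder, which the test instead reads from the KRaft controller quorum):

    // Sketch only: mirrors the server properties the integration test sets
    // before restarting the ZK brokers in migration mode.
    import java.util.Properties

    val props = new Properties()
    props.put("authorizer.class.name", "kafka.security.authorizer.AclAuthorizer") // no longer rejected during migration
    props.put("zookeeper.metadata.migration.enable", "true")
    props.put("controller.quorum.voters", "3000@localhost:9093") // placeholder; taken from the KRaft quorum in the test
    props.put("controller.listener.names", "CONTROLLER")
    props.put("listener.security.protocol.map", "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")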