pprovenzano commented on code in PR #13729:
URL: https://github.com/apache/kafka/pull/13729#discussion_r1204709476


##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -187,13 +188,74 @@ class ZkMigrationIntegrationTest {
     migrationState = 
migrationClient.releaseControllerLeadership(migrationState)
   }
 
+  // SCRAM and Quota are intermixed. Test SCRAM Only here
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_5_IV2, serverProperties = Array(
+    new ClusterConfigProperty(key = "inter.broker.listener.name", value = 
"EXTERNAL"),
+    new ClusterConfigProperty(key = "listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "advertised.listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "listener.security.protocol.map", value = 
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
+  ))
+  def testDualWriteScram(zkCluster: ClusterInstance): Unit = {
+    var admin = zkCluster.createAdminClient()
+    createUserScramCredentials(admin).all().get(60, TimeUnit.SECONDS)
+    admin.close()
+
+    val zkClient = 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
+
+    // Bootstrap the ZK cluster ID into KRaft
+    val clusterId = zkCluster.clusterId()
+    val kraftCluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_5_IV2).
+        setClusterId(Uuid.fromString(clusterId)).
+        setNumBrokerNodes(0).
+        setNumControllerNodes(1).build())
+      .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
+      .setConfigProp(KafkaConfig.ZkConnectProp, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .build()
+    try {
+      kraftCluster.format()
+      kraftCluster.startup()
+      val readyFuture = 
kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
+
+      // Enable migration configs and restart brokers
+      log.info("Restart brokers in migration mode")
+      val clientProps = kraftCluster.controllerClientProperties()
+      val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
+      
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
+      
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, 
voters)
+      
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
 "CONTROLLER")
+      
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp,
 "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      zkCluster.rollingBrokerRestart()
+      zkCluster.waitForReadyBrokers()
+      readyFuture.get(30, TimeUnit.SECONDS)
+
+      // Wait for migration to begin
+      log.info("Waiting for ZK migration to begin")
+      TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), 
"Timed out waiting for KRaft controller to take over")
+
+      // Alter the metadata
+      log.info("Updating metadata with AdminClient")
+      admin = zkCluster.createAdminClient()
+      alterUserScramCredentials(admin).all().get(60, TimeUnit.SECONDS)
+
+      // Verify the changes made to KRaft are seen in ZK
+      log.info("Verifying metadata changes with ZK")
+      verifyUserScramCredentials(zkClient)
+    } finally {
+      zkCluster.stop()
+      kraftCluster.close()
+    }
+  }
+
+  // SCRAM and Quota are intermixed. Test Quota Only here
   @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
     new ClusterConfigProperty(key = "inter.broker.listener.name", value = 
"EXTERNAL"),
     new ClusterConfigProperty(key = "listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
     new ClusterConfigProperty(key = "advertised.listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
     new ClusterConfigProperty(key = "listener.security.protocol.map", value = 
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
   ))
-  def testDualWrite(zkCluster: ClusterInstance): Unit = {
+  def testDualWriteQuota(zkCluster: ClusterInstance): Unit = {

Review Comment:
   Yes!
   Fixed!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to