This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6d36db1c78f KAFKA-14765 and KAFKA-14776: Support for SCRAM at 
bootstrap with integration tests (#13374)
6d36db1c78f is described below

commit 6d36db1c78ff28ed3e6134e271e72a5c2ff1c276
Author: Proven Provenzano <[email protected]>
AuthorDate: Tue Apr 4 11:34:09 2023 -0400

    KAFKA-14765 and KAFKA-14776: Support for SCRAM at bootstrap with 
integration tests (#13374)
    
    Implement KIP-900
    
    Update kafka-storage to be able to add SCRAM records to the bootstrap 
metadata file at format time so that SCRAM is enabled at initial start 
(bootstrap) of KRaft cluster. Includes unit tests.
    
    Update 
./core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
 to use bootstrap and
    enable the test to run with both ZK and KRaft quorum.
    
    Moved the one test from ScramServerStartupTest.scala into 
SaslScramSslEndToEndAuthorizationTest.scala. This test is really small, so 
there was no point in recreating all the bootstrap startup just for a 5 line 
test when it could easily be run elsewhere.
    
    Reviewers: Colin P. McCabe <[email protected]>, Manikumar Reddy 
<[email protected]>
---
 .../apache/kafka/clients/admin/ScramMechanism.java |   5 +
 .../security/scram/internals/ScramMechanism.java   |  34 +++-
 core/src/main/scala/kafka/tools/StorageTool.scala  | 186 +++++++++++++++++++--
 .../kafka/api/CustomQuotaCallbackTest.scala        |   4 +-
 .../DelegationTokenEndToEndAuthorizationTest.scala |   4 +-
 .../api/DescribeAuthorizedOperationsTest.scala     |   2 +-
 .../SaslClientsWithInvalidCredentialsTest.scala    |   4 +-
 .../SaslScramSslEndToEndAuthorizationTest.scala    |  46 ++++-
 .../scala/integration/kafka/api/SaslSetup.scala    |  16 +-
 .../kafka/api/SaslSslAdminIntegrationTest.scala    |   2 +-
 .../kafka/server/QuorumTestHarness.scala           |  23 ++-
 .../kafka/server/ScramServerStartupTest.scala      |  65 -------
 .../admin/UserScramCredentialsCommandTest.scala    |  44 ++---
 .../kafka/integration/KafkaServerTestHarness.scala |   4 +-
 .../scala/unit/kafka/tools/StorageToolTest.scala   | 148 +++++++++++++++-
 15 files changed, 456 insertions(+), 131 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java
index 95ad18c04c9..5c5e371529e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java
@@ -23,6 +23,11 @@ import java.util.Arrays;
  * Representation of a SASL/SCRAM Mechanism.
  *
  * @see <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API";>KIP-554:
 Add Broker-side SCRAM Config API</a>
+ *
+ * This code is duplicated in 
org.apache.kafka.common.security.scram.internals.ScramMechanism.
+ * The type field in both files must match and must not change. The type field
+ * is used both for passing ScramCredentialUpsertion and for the internal
+ * UserScramCredentialRecord. Do not change the type field.
  */
 public enum ScramMechanism {
     UNKNOWN((byte) 0),
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramMechanism.java
 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramMechanism.java
index 9f6e69d9c5e..6ba78b4a4f4 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramMechanism.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramMechanism.java
@@ -21,15 +21,23 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+/*
+ * This code is duplicated in org.apache.kafka.clients.admin.ScramMechanism.
+ * The type field in both files must match and must not change. The type field
+ * is used both for passing ScramCredentialUpsertion and for the internal 
+ * UserScramCredentialRecord. Do not change the type field.
+ */
 public enum ScramMechanism {
 
-    SCRAM_SHA_256("SHA-256", "HmacSHA256", 4096),
-    SCRAM_SHA_512("SHA-512", "HmacSHA512", 4096);
+    SCRAM_SHA_256((byte) 1, "SHA-256", "HmacSHA256", 4096, 16384),
+    SCRAM_SHA_512((byte) 2, "SHA-512", "HmacSHA512", 4096, 16384);
 
+    private final byte type;
     private final String mechanismName;
     private final String hashAlgorithm;
     private final String macAlgorithm;
     private final int minIterations;
+    private final int maxIterations;
 
     private static final Map<String, ScramMechanism> MECHANISMS_MAP;
 
@@ -40,11 +48,19 @@ public enum ScramMechanism {
         MECHANISMS_MAP = Collections.unmodifiableMap(map);
     }
 
-    ScramMechanism(String hashAlgorithm, String macAlgorithm, int 
minIterations) {
+    /**
+     * Creates a SCRAM mechanism descriptor.
+     *
+     * @param type          byte identifier returned by {@link #type()}; per the comment on this
+     *                      enum it must match org.apache.kafka.clients.admin.ScramMechanism
+     * @param hashAlgorithm hash algorithm name; the mechanism name is "SCRAM-" + hashAlgorithm
+     * @param macAlgorithm  MAC algorithm name
+     * @param minIterations value returned by {@link #minIterations()}
+     * @param maxIterations value returned by {@link #maxIterations()}
+     */
+    ScramMechanism(
+        byte type,
+        String hashAlgorithm,
+        String macAlgorithm,
+        int minIterations,
+        int maxIterations
+    ) {
+        this.type = type;
         this.mechanismName = "SCRAM-" + hashAlgorithm;
         this.hashAlgorithm = hashAlgorithm;
         this.macAlgorithm = macAlgorithm;
         this.minIterations = minIterations;
+        this.maxIterations = maxIterations;
     }
 
     public final String mechanismName() {
@@ -63,6 +79,10 @@ public enum ScramMechanism {
         return minIterations;
     }
 
+    /**
+     * @return the maximum iteration count configured for this SCRAM mechanism
+     */
+    public int maxIterations() {
+        return this.maxIterations;
+    }
+
     public static ScramMechanism forMechanismName(String mechanismName) {
         return MECHANISMS_MAP.get(mechanismName);
     }
@@ -74,4 +94,12 @@ public enum ScramMechanism {
     public static boolean isScram(String mechanismName) {
         return MECHANISMS_MAP.containsKey(mechanismName);
     }
+
+    /**
+     * Returns the byte identifier assigned to this SASL SCRAM mechanism.
+     *
+     * @return the type indicator for this SASL SCRAM mechanism
+     */
+    public byte type() {
+        return type;
+    }
 }
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala 
b/core/src/main/scala/kafka/tools/StorageTool.scala
index 0f798e24fc2..04c87b883ca 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -22,15 +22,24 @@ import java.nio.file.{Files, Paths}
 import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, MetaProperties, 
RawMetaProperties}
 import kafka.utils.{Exit, Logging}
 import net.sourceforge.argparse4j.ArgumentParsers
-import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue}
+import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue, append}
 import net.sourceforge.argparse4j.inf.Namespace
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, 
BootstrapMetadata}
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.common.metadata.FeatureLevelRecord
+import org.apache.kafka.common.metadata.UserScramCredentialRecord
+import org.apache.kafka.common.security.scram.internals.ScramMechanism
+import org.apache.kafka.common.security.scram.internals.ScramFormatter
 
+
+import java.util
+import java.util.Base64
 import java.util.Optional
 import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.ArrayBuffer
 
 object StorageTool extends Logging {
   def main(args: Array[String]): Unit = {
@@ -53,12 +62,23 @@ object StorageTool extends Logging {
             throw new TerseFailure(s"Must specify a valid KRaft metadata 
version of at least 3.0.")
           }
           val metaProperties = buildMetadataProperties(clusterId, config.get)
+          val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = 
ArrayBuffer()
+          
getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
+            if (!metadataVersion.isScramSupported()) {
+              throw new TerseFailure(s"SCRAM is only supported in 
metadataVersion IBP_3_5_IV0 or later.");
+            }
+            for (record <- userScramCredentialRecords) {
+              metadataRecords.append(new ApiMessageAndVersion(record, 
0.toShort))
+            }
+          })
+          val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, 
Some(metadataRecords), "format command")
           val ignoreFormatted = namespace.getBoolean("ignore_formatted")
           if (!configToSelfManagedMode(config.get)) {
             throw new TerseFailure("The kafka configuration file appears to be 
for " +
               "a legacy cluster. Formatting is only supported for clusters in 
KRaft mode.")
           }
-          Exit.exit(formatCommand(System.out, directories, metaProperties, 
metadataVersion, ignoreFormatted))
+          Exit.exit(formatCommand(System.out, directories, metaProperties, 
bootstrapMetadata,
+                                  metadataVersion,ignoreFormatted))
 
         case "random-uuid" =>
           System.out.println(Uuid.randomUuid)
@@ -70,15 +90,15 @@ object StorageTool extends Logging {
     } catch {
       case e: TerseFailure =>
         System.err.println(e.getMessage)
-        System.exit(1)
+        Exit.exit(1, Some(e.getMessage))
     }
   }
 
   def parseArguments(args: Array[String]): Namespace = {
     val parser = ArgumentParsers.
-      newArgumentParser("kafka-storage").
-      defaultHelp(true).
+      newArgumentParser("kafka-storage", /* defaultHelp */ true, /* 
prefixChars */ "-", /* fromFilePrefix */ "@").
       description("The Kafka storage tool.")
+
     val subparsers = parser.addSubparsers().dest("command")
 
     val infoParser = subparsers.addParser("info").
@@ -96,6 +116,11 @@ object StorageTool extends Logging {
       action(store()).
       required(true).
       help("The cluster ID to use.")
+    // The examples use the 'name' key because that is the key getUserScramCredentialRecord
+    // requires; a 'user' key is rejected with "You must supply 'name' to add-scram".
+    formatParser.addArgument("--add-scram", "-S").
+      action(append()).
+      help("""A SCRAM_CREDENTIAL to add to the __cluster_metadata log e.g.
+              |'SCRAM-SHA-256=[name=alice,password=alice-secret]'
+              |'SCRAM-SHA-512=[name=alice,iterations=8192,salt="N3E=",saltedpassword="YCE="]'""".stripMargin)
     formatParser.addArgument("--ignore-formatted", "-g").
       action(storeTrue())
     formatParser.addArgument("--release-version", "-r").
@@ -128,6 +153,112 @@ object StorageTool extends Logging {
       .getOrElse(defaultValue)
   }
 
+  /**
+   * Builds the UserScramCredentialRecord described by a single --add-scram argument.
+   *
+   * @param mechanism the SCRAM mechanism name, resolved via ScramMechanism.forMechanismName
+   * @param config    the bracketed, comma-separated key=value list, e.g.
+   *                  [name=alice,password=alice-secret]. Recognized keys are name, password,
+   *                  salt, iterations and saltedpassword.
+   * @return a record carrying the name, mechanism type, salt, iterations and salted password
+   * @throws TerseFailure if the mechanism is unknown, the config is malformed, a required key
+   *                      is missing, or 'iterations' is not an integer within the mechanism's
+   *                      minimum and maximum
+   */
+  def getUserScramCredentialRecord(
+    mechanism: String,
+    config: String
+  ) : UserScramCredentialRecord = {
+    // forMechanismName is a plain map lookup and yields null for unknown names; report that
+    // tersely instead of failing later with a NullPointerException.
+    val scramMechanism = ScramMechanism.forMechanismName(mechanism)
+    if (scramMechanism == null) {
+      throw new TerseFailure(s"The add-scram mechanism $mechanism is not supported.")
+    }
+    // The first and last characters are stripped below, so at least two are needed.
+    if (config == null || config.length < 2) {
+      throw new TerseFailure(s"The add-scram configuration for $mechanism must be of the form [key=value,...]")
+    }
+
+    /*
+     * Remove  '[' and ']'
+     * Split K->V pairs on ',' and no K or V should contain ','
+     * Split K and V on '=' but V could contain '=' if inside ""
+     * Create Map of K to V and replace all " in V
+     */
+    val argMap = config.substring(1, config.length - 1)
+                       .split(",")
+                       .map { pair =>
+                         val keyValue = pair.split("=(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")
+                         // A key without a value used to surface as ArrayIndexOutOfBoundsException.
+                         if (keyValue.length < 2) {
+                           throw new TerseFailure(s"The add-scram entry '$pair' must be of the form key=value")
+                         }
+                         keyValue(0) -> keyValue(1).replaceAll("\"", "")
+                       }.toMap
+
+    def getName(argMap: Map[String,String]) : String = {
+      if (!argMap.contains("name")) {
+        throw new TerseFailure(s"You must supply 'name' to add-scram")
+      }
+      argMap("name")
+    }
+
+    // Uses the supplied Base64 salt, otherwise asks ScramFormatter for random bytes.
+    def getSalt(argMap: Map[String,String], scramMechanism : ScramMechanism) : Array[Byte] = {
+      if (argMap.contains("salt")) {
+        Base64.getDecoder.decode(argMap("salt"))
+      } else {
+        new ScramFormatter(scramMechanism).secureRandomBytes()
+      }
+    }
+
+    def getIterations(argMap: Map[String,String], scramMechanism : ScramMechanism) : Int = {
+      // Gate on 'iterations' itself. Gating on 'salt' made "salt without iterations" throw
+      // NoSuchElementException and silently ignored "iterations without salt".
+      if (argMap.contains("iterations")) {
+        val iterations = try {
+          argMap("iterations").toInt
+        } catch {
+          case _: NumberFormatException =>
+            throw new TerseFailure(s"The 'iterations' value must be an integer for add-scram")
+        }
+        if (iterations < scramMechanism.minIterations()) {
+            throw new TerseFailure(s"The 'iterations' value must be >= ${scramMechanism.minIterations()} for add-scram")
+        }
+        if (iterations > scramMechanism.maxIterations()) {
+            throw new TerseFailure(s"The 'iterations' value must be <= ${scramMechanism.maxIterations()} for add-scram")
+        }
+        iterations
+      } else {
+        // Default used when the caller does not specify an iteration count.
+        4096
+      }
+    }
+
+    // Either derives the salted password from 'password', or decodes a caller-supplied
+    // Base64 'saltedpassword' (which additionally requires 'salt').
+    def getSaltedPassword(
+      argMap: Map[String,String],
+      scramMechanism : ScramMechanism,
+      salt : Array[Byte],
+      iterations: Int
+    ) : Array[Byte] = {
+      if (argMap.contains("password")) {
+        if (argMap.contains("saltedpassword")) {
+            throw new TerseFailure(s"You must only supply one of 'password' or 'saltedpassword' to add-scram")
+        }
+        new ScramFormatter(scramMechanism).saltedPassword(argMap("password"), salt, iterations)
+      } else {
+        if (!argMap.contains("saltedpassword")) {
+            throw new TerseFailure(s"You must supply one of 'password' or 'saltedpassword' to add-scram")
+        }
+        if (!argMap.contains("salt")) {
+            throw new TerseFailure(s"You must supply 'salt' with 'saltedpassword' to add-scram")
+        }
+        Base64.getDecoder.decode(argMap("saltedpassword"))
+      }
+    }
+
+    val name = getName(argMap)
+    val salt = getSalt(argMap, scramMechanism)
+    val iterations = getIterations(argMap, scramMechanism)
+    val saltedPassword = getSaltedPassword(argMap, scramMechanism, salt, iterations)
+
+    new UserScramCredentialRecord()
+      .setName(name)
+      .setMechanism(scramMechanism.`type`)
+      .setSalt(salt)
+      .setIterations(iterations)
+      .setSaltedPassword(saltedPassword)
+  }
+
+  /**
+   * Converts every --add-scram argument in the parsed namespace into a UserScramCredentialRecord.
+   *
+   * @param namespace parsed command-line arguments; the "add_scram" list holds the raw
+   *                  MECHANISM=[key=value,...] strings
+   * @return None when the "add_scram" list is absent, otherwise the records in argument order
+   * @throws TerseFailure if an argument names an unsupported mechanism or lacks the
+   *                      =[key=value,...] part
+   */
+  def getUserScramCredentialRecords(namespace: Namespace): Option[ArrayBuffer[UserScramCredentialRecord]] = {
+    // The list is null when no --add-scram argument is present; Option(...) turns that into None.
+    Option(namespace.getList[String]("add_scram")).map { addScramArgs =>
+      val userScramCredentialRecords : ArrayBuffer[UserScramCredentialRecord] = ArrayBuffer()
+      for (singleAddConfig <- addScramArgs.asScala) {
+        // Only the first whitespace-delimited token is considered. A whitespace-only argument
+        // splits to an empty array, so fall back to "" rather than indexing element 0.
+        val firstToken = singleAddConfig.split("\\s+").headOption.getOrElse("")
+
+        // The first subarg must be of the form key=value
+        firstToken.split("=", 2) match {
+          case Array(mechanism @ ("SCRAM-SHA-256" | "SCRAM-SHA-512"), credentialConfig) =>
+            userScramCredentialRecords.append(getUserScramCredentialRecord(mechanism, credentialConfig))
+          // A supported mechanism with no '=' part used to fail with ArrayIndexOutOfBoundsException.
+          case Array("SCRAM-SHA-256" | "SCRAM-SHA-512") =>
+            throw new TerseFailure(s"The add-scram argument $firstToken must be of the form MECHANISM=[key=value,...]")
+          case nameValueRecord =>
+            throw new TerseFailure(s"The add-scram mechanism ${nameValueRecord(0)} is not supported.")
+        }
+      }
+      userScramCredentialRecords
+    }
+  }
+
   def infoCommand(stream: PrintStream, selfManagedMode: Boolean, directories: 
Seq[String]): Int = {
     val problems = new mutable.ArrayBuffer[String]
     val foundDirectories = new mutable.ArrayBuffer[String]
@@ -214,6 +345,23 @@ object StorageTool extends Logging {
     }
   }
 
+  /**
+   * Assembles the bootstrap metadata: a FeatureLevelRecord for the metadata version first,
+   * followed by any additional records supplied by the caller.
+   *
+   * @param metadataVersion           the version whose feature level goes into the first record
+   * @param metadataOptionalArguments extra records to append after the feature level record, if any
+   * @param source                    the source description passed to BootstrapMetadata.fromRecords
+   * @return the BootstrapMetadata built from the collected records
+   */
+  def buildBootstrapMetadata(metadataVersion: MetadataVersion,
+                             metadataOptionalArguments: Option[ArrayBuffer[ApiMessageAndVersion]],
+                             source: String): BootstrapMetadata = {
+    val featureLevelRecord = new FeatureLevelRecord()
+      .setName(MetadataVersion.FEATURE_NAME)
+      .setFeatureLevel(metadataVersion.featureLevel())
+
+    val bootstrapRecords = new util.ArrayList[ApiMessageAndVersion]
+    bootstrapRecords.add(new ApiMessageAndVersion(featureLevelRecord, 0.toShort))
+
+    // Append the optional records, when present, in their original order.
+    for {
+      extraRecords <- metadataOptionalArguments
+      extraRecord <- extraRecords
+    } bootstrapRecords.add(extraRecord)
+
+    BootstrapMetadata.fromRecords(bootstrapRecords, source)
+  }
+
+
   def buildMetadataProperties(
     clusterIdStr: String,
     config: KafkaConfig
@@ -230,14 +378,29 @@ object StorageTool extends Logging {
     new MetaProperties(effectiveClusterId.toString, config.nodeId)
   }
 
-  def formatCommand(stream: PrintStream,
-                    directories: Seq[String],
-                    metaProperties: MetaProperties,
-                    metadataVersion: MetadataVersion,
-                    ignoreFormatted: Boolean): Int = {
+  /**
+   * Formats the directories using bootstrap metadata that contains only the records
+   * buildBootstrapMetadata produces for the given metadata version (no optional records).
+   * Delegates to the overload that takes an explicit BootstrapMetadata.
+   */
+  def formatCommand(
+    stream: PrintStream,
+    directories: Seq[String],
+    metaProperties: MetaProperties,
+    metadataVersion: MetadataVersion,
+    ignoreFormatted: Boolean
+  ): Int = {
+    formatCommand(
+      stream,
+      directories,
+      metaProperties,
+      buildBootstrapMetadata(metadataVersion, None, "format command"),
+      metadataVersion,
+      ignoreFormatted)
+  }
+
+  def formatCommand(
+    stream: PrintStream,
+    directories: Seq[String],
+    metaProperties: MetaProperties,
+    bootstrapMetadata: BootstrapMetadata,
+    metadataVersion: MetadataVersion,
+    ignoreFormatted: Boolean
+  ): Int = {
     if (directories.isEmpty) {
       throw new TerseFailure("No log directories found in the configuration.")
     }
+
     val unformattedDirectories = directories.filter(directory => {
       if (!Files.isDirectory(Paths.get(directory)) || 
!Files.exists(Paths.get(directory, "meta.properties"))) {
           true
@@ -262,7 +425,6 @@ object StorageTool extends Logging {
       val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile)
       checkpoint.write(metaProperties.toProperties)
 
-      val bootstrapMetadata = BootstrapMetadata.fromVersion(metadataVersion, 
"format command")
       val bootstrapDirectory = new BootstrapDirectory(directory, 
Optional.empty())
       bootstrapDirectory.writeBinaryFile(bootstrapMetadata)
 
diff --git 
a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala 
b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 63beb509442..9a4c5d32b1d 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -80,8 +80,8 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness 
with SaslSetup {
     super.tearDown()
   }
 
-  override def configureSecurityBeforeServersStart(): Unit = {
-    super.configureSecurityBeforeServersStart()
+  override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = 
{
+    super.configureSecurityBeforeServersStart(testInfo)
     
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
     createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, 
JaasTestUtils.KafkaScramAdminPassword)
   }
diff --git 
a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
index 920d0ccb254..1ae958dbb38 100644
--- 
a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
@@ -56,8 +56,8 @@ class DelegationTokenEndToEndAuthorizationTest extends 
EndToEndAuthorizationTest
 
   def configureTokenAclsBeforeServersStart(): Unit = { }
 
-  override def configureSecurityBeforeServersStart(): Unit = {
-    super.configureSecurityBeforeServersStart()
+  override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = 
{
+    super.configureSecurityBeforeServersStart(testInfo)
     configureTokenAclsBeforeServersStart()
     
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
     // Create broker admin credentials before starting brokers
diff --git 
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
 
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index e6f4a6039be..6ea0c2eaec0 100644
--- 
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -84,7 +84,7 @@ class DescribeAuthorizedOperationsTest extends 
IntegrationTestHarness with SaslS
 
   override protected lazy val trustStoreFile = 
Some(TestUtils.tempFile("truststore", ".jks"))
 
-  override def configureSecurityBeforeServersStart(): Unit = {
+  override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = 
{
     val authorizer = 
CoreUtils.createObject[Authorizer](classOf[AclAuthorizer].getName)
     val clusterResource = new ResourcePattern(ResourceType.CLUSTER, 
Resource.CLUSTER_NAME, PatternType.LITERAL)
     val topicResource = new ResourcePattern(ResourceType.TOPIC, 
AclEntry.WildcardResource, PatternType.LITERAL)
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index dbf1a6ce9d1..e36f437f1f1 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -51,8 +51,8 @@ class SaslClientsWithInvalidCredentialsTest extends 
IntegrationTestHarness with
   val numPartitions = 1
   val tp = new TopicPartition(topic, 0)
 
-  override def configureSecurityBeforeServersStart(): Unit = {
-    super.configureSecurityBeforeServersStart()
+  override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = 
{
+    super.configureSecurityBeforeServersStart(testInfo)
     
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
     // Create broker credentials before starting brokers
     createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, 
JaasTestUtils.KafkaScramAdminPassword)
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
index b44704350bf..2d1aaf8cc77 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -19,13 +19,20 @@ package kafka.api
 import java.util.Properties
 
 import kafka.utils._
+import kafka.tools.StorageTool
 import kafka.zk.ConfigEntityChangeNotificationZNode
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.test.TestSslUtils
 
 import scala.jdk.CollectionConverters._
+import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import scala.collection.mutable.ArrayBuffer
+import org.apache.kafka.server.common.ApiMessageAndVersion
 
 class SaslScramSslEndToEndAuthorizationTest extends 
SaslEndToEndAuthorizationTest {
   override protected def kafkaClientSaslMechanism = "SCRAM-SHA-256"
@@ -33,18 +40,34 @@ class SaslScramSslEndToEndAuthorizationTest extends 
SaslEndToEndAuthorizationTes
   override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
JaasTestUtils.KafkaScramUser)
   override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
JaasTestUtils.KafkaScramAdmin)
   private val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword
-  override val unimplementedquorum = "kraft"
 
-  override def configureSecurityBeforeServersStart(): Unit = {
-    super.configureSecurityBeforeServersStart()
-    
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
-    // Create broker credentials before starting brokers
-    createScramCredentials(zkConnect, kafkaPrincipal.getName, kafkaPassword)
+  override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = 
{
+    super.configureSecurityBeforeServersStart(testInfo)
+
+    if (!TestInfoUtils.isKRaft(testInfo)) {
+      
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
+      // Create broker credentials before starting brokers
+      createScramCredentials(zkConnect, kafkaPrincipal.getName, kafkaPassword)
+    }
     TestSslUtils.convertToPemWithoutFiles(producerConfig)
     TestSslUtils.convertToPemWithoutFiles(consumerConfig)
     TestSslUtils.convertToPemWithoutFiles(adminClientConfig)
   }
 
+  // Supplies the admin SCRAM credentials as bootstrap records for KRaft. The records are
+  // produced by running the same --add-scram parsing that the storage tool uses.
+  override def optionalMetadataRecords: Option[ArrayBuffer[ApiMessageAndVersion]] = {
+    val scramArgument =
+      s"SCRAM-SHA-256=[name=${JaasTestUtils.KafkaScramAdmin},password=${JaasTestUtils.KafkaScramAdminPassword}]"
+    val formatArgs = Seq("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ", "-S", scramArgument)
+    val parsedArgs = StorageTool.parseArguments(formatArgs.toArray)
+
+    val bootstrapRecords = ArrayBuffer[ApiMessageAndVersion]()
+    for {
+      credentialRecords <- StorageTool.getUserScramCredentialRecords(parsedArgs)
+      credentialRecord <- credentialRecords
+    } bootstrapRecords.append(new ApiMessageAndVersion(credentialRecord, 0.toShort))
+
+    Some(bootstrapRecords)
+  }
+
   override def configureListeners(props: collection.Seq[Properties]): Unit = {
     props.foreach(TestSslUtils.convertToPemWithoutFiles)
     super.configureListeners(props)
@@ -54,11 +77,18 @@ class SaslScramSslEndToEndAuthorizationTest extends 
SaslEndToEndAuthorizationTes
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
-    if (!TestInfoUtils.isKRaft(testInfo)) {
       super.setUp(testInfo)
       // Create client credentials after starting brokers so that dynamic 
credential creation is also tested
       
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser, 
JaasTestUtils.KafkaScramPassword)
       
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser2, 
JaasTestUtils.KafkaScramPassword2)
-    }
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft", "zk"))
+  def testAuthentications(quorum: String): Unit = {
+    // The first broker must report at least one successful authentication...
+    val authSuccessTotal = TestUtils.totalMetricValue(brokers.head, "successful-authentication-total")
+    assertTrue(authSuccessTotal > 0, "No successful authentications")
+    // ...and no failed ones.
+    val authFailureTotal = TestUtils.totalMetricValue(brokers.head, "failed-authentication-total")
+    assertEquals(0, authFailureTotal)
   }
 }
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala 
b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 3648e596b0b..fe0d347c096 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -29,7 +29,7 @@ import kafka.server.{ConfigType, KafkaConfig}
 import kafka.utils.JaasTestUtils.{JaasSection, Krb5LoginModule, ZkDigestModule}
 import kafka.utils.{JaasTestUtils, TestUtils}
 import kafka.zk.{AdminZkClient, KafkaZkClient}
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
ScramCredentialInfo, UserScramCredentialAlteration, 
UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
ScramCredentialInfo, UserScramCredentialUpsertion, ScramMechanism => 
PublicScramMechanism}
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.security.JaasUtils
@@ -39,8 +39,6 @@ import 
org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.client.ZKClientConfig
 
-import scala.jdk.CollectionConverters._
-
 /*
  * Implements an enumeration for the modes enabled here:
  * zk only, kafka only, both, custom KafkaServer.
@@ -191,11 +189,13 @@ trait SaslSetup {
     }
   }
 
-    def createScramCredentials(adminClient: Admin, userName: String, password: 
String): Unit = {
-    val results = 
adminClient.alterUserScramCredentials(PublicScramMechanism.values().filter(_ != 
PublicScramMechanism.UNKNOWN).map(mechanism =>
-      new UserScramCredentialUpsertion(userName, new 
ScramCredentialInfo(mechanism, 4096), password)
-        .asInstanceOf[UserScramCredentialAlteration]).toList.asJava)
-    results.all.get
+  /**
+   * Creates SCRAM credentials for the user through the admin client, one request per
+   * mechanism (every PublicScramMechanism except UNKNOWN), waiting for each request to
+   * complete before issuing the next.
+   *
+   * @param adminClient the admin client used to alter the credentials
+   * @param userName    the user whose credentials are upserted
+   * @param password    the password passed to each UserScramCredentialUpsertion
+   */
+  def createScramCredentials(adminClient: Admin, userName: String, password: String): Unit = {
+    // foreach rather than map: the calls are made purely for their side effects, so there is
+    // no reason to build and discard an array of results.
+    PublicScramMechanism.values().filter(_ != PublicScramMechanism.UNKNOWN).foreach { mechanism =>
+      val results = adminClient.alterUserScramCredentials(util.Arrays.asList(
+        new UserScramCredentialUpsertion(userName, new ScramCredentialInfo(mechanism, 4096), password)))
+      results.all.get
+    }
+  }
 
   def createScramCredentials(zkConnect: String, userName: String, password: 
String): Unit = {
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index 80a35849d78..29b51713257 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -56,7 +56,7 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
     super.generateConfigs
   }
 
-  override def configureSecurityBeforeServersStart(): Unit = {
+  override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = 
{
     authorizationAdmin.initializeAcls()
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala 
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 113df27147b..0a2cf270bf9 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{Exit, Time}
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
+import org.apache.kafka.common.metadata.FeatureLevelRecord
 import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
 import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
@@ -40,6 +41,7 @@ import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, 
TestInfo}
 
+import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.ListBuffer
 import scala.collection.{Seq, immutable}
 import scala.compat.java8.OptionConverters._
@@ -272,13 +274,18 @@ abstract class QuorumTestHarness extends Logging {
     CoreUtils.swallow(kRaftQuorumImplementation.controllerServer.shutdown(), 
kRaftQuorumImplementation.log)
   }
 
+  /**
+   * Additional metadata records to include in the bootstrap metadata after the feature level
+   * record. Defaults to None; a test class can override this to supply records of its own.
+   */
+  def optionalMetadataRecords: Option[ArrayBuffer[ApiMessageAndVersion]] = None
+
   private def formatDirectories(directories: immutable.Seq[String],
                                 metaProperties: MetaProperties): Unit = {
     val stream = new ByteArrayOutputStream()
     var out: PrintStream = null
     try {
       out = new PrintStream(stream)
-      if (StorageTool.formatCommand(out, directories, metaProperties, 
metadataVersion, ignoreFormatted = false) != 0) {
+      val bootstrapMetadata = 
StorageTool.buildBootstrapMetadata(metadataVersion,
+                                                                 
optionalMetadataRecords, "format command")
+      if (StorageTool.formatCommand(out, directories, metaProperties, 
bootstrapMetadata, metadataVersion,
+                                    ignoreFormatted = false) != 0) {
         throw new RuntimeException(stream.toString())
       }
       debug(s"Formatted storage directory(ies) ${directories}")
@@ -303,6 +310,18 @@ abstract class QuorumTestHarness extends Logging {
     val metadataDir = TestUtils.tempDir()
     val metaProperties = new MetaProperties(Uuid.randomUuid().toString, nodeId)
     formatDirectories(immutable.Seq(metadataDir.getAbsolutePath), 
metaProperties)
+
+    val metadataRecords = new util.ArrayList[ApiMessageAndVersion]
+    metadataRecords.add(new ApiMessageAndVersion(new FeatureLevelRecord().
+                        setName(MetadataVersion.FEATURE_NAME).
+                        setFeatureLevel(metadataVersion.featureLevel()), 
0.toShort));
+
+    optionalMetadataRecords.foreach { metadataArguments =>
+      for (record <- metadataArguments) metadataRecords.add(record)
+    }
+
+    val bootstrapMetadata = BootstrapMetadata.fromRecords(metadataRecords, 
"test harness")
+
     props.setProperty(KafkaConfig.MetadataLogDirProp, 
metadataDir.getAbsolutePath)
     val proto = controllerListenerSecurityProtocol.toString
     props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, 
s"CONTROLLER:${proto}")
@@ -322,7 +341,7 @@ abstract class QuorumTestHarness extends Logging {
       controllerServer = new ControllerServer(
         sharedServer,
         KafkaRaftServer.configSchema,
-        BootstrapMetadata.fromVersion(metadataVersion, "test harness")
+        bootstrapMetadata
       )
       controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e) 
=> {
         if (e != null) {
diff --git 
a/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala 
b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
deleted file mode 100644
index 9190647ad2f..00000000000
--- a/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements.  See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership.  The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.server
-
-import java.util.Collections
-
-import kafka.api.{IntegrationTestHarness, KafkaSasl, SaslSetup}
-import kafka.utils._
-import kafka.zk.ConfigEntityChangeNotificationZNode
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
-import scala.jdk.CollectionConverters._
-
-/**
- * Tests that there are no failed authentications during broker startup. This 
is to verify
- * that SCRAM credentials are loaded by brokers before client connections can 
be made.
- * For simplicity of testing, this test verifies authentications of controller 
connections.
- */
-class ScramServerStartupTest extends IntegrationTestHarness with SaslSetup {
-
-  override val brokerCount = 1
-
-  private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
-  private val kafkaServerSaslMechanisms = 
Collections.singletonList("SCRAM-SHA-256").asScala
-
-  override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
-
-  override protected val serverSaslProperties = 
Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, 
kafkaClientSaslMechanism))
-  override protected val clientSaslProperties = 
Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
-
-  override def configureSecurityBeforeServersStart(): Unit = {
-    super.configureSecurityBeforeServersStart()
-    
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
-    // Create credentials before starting brokers
-    createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, 
JaasTestUtils.KafkaScramAdminPassword)
-
-    startSasl(jaasSections(kafkaServerSaslMechanisms, 
Option(kafkaClientSaslMechanism), KafkaSasl))
-  }
-
-  @Test
-  def testAuthentications(): Unit = {
-    val successfulAuths = TestUtils.totalMetricValue(servers.head, 
"successful-authentication-total")
-    assertTrue(successfulAuths > 0, "No successful authentications")
-    val failedAuths = TestUtils.totalMetricValue(servers.head, 
"failed-authentication-total")
-    assertEquals(0, failedAuths)
-  }
-}
diff --git 
a/core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala
index 1735e30d435..9e3caa85571 100644
--- a/core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala
@@ -21,8 +21,11 @@ import java.nio.charset.StandardCharsets
 
 import kafka.server.BaseRequestTest
 import kafka.utils.Exit
+import kafka.utils.TestUtils
+import kafka.utils.TestInfoUtils
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 class UserScramCredentialsCommandTest extends BaseRequestTest {
   override def brokerCount = 1
@@ -57,22 +60,23 @@ class UserScramCredentialsCommandTest extends 
BaseRequestTest {
     }
   }
 
-  @Test
-  def testUserScramCredentialsRequests(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft", "zk"))
+  def testUserScramCredentialsRequests(quorum: String): Unit = {
     val user1 = "user1"
     // create and describe a credential
     var result = runConfigCommandViaBroker(Array("--user", user1, "--alter", 
"--add-config", "SCRAM-SHA-256=[iterations=4096,password=foo-secret]"))
     val alterConfigsUser1Out = s"Completed updating config for user $user1.\n"
     assertEquals(alterConfigsUser1Out, result.stdout)
-    result = runConfigCommandViaBroker(Array("--user", user1, "--describe"))
     val scramCredentialConfigsUser1Out = s"SCRAM credential configs for 
user-principal '$user1' are SCRAM-SHA-256=iterations=4096\n"
-    assertEquals(scramCredentialConfigsUser1Out, result.stdout)
+    TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", 
user1, "--describe")).stdout ==
+      scramCredentialConfigsUser1Out, s"Failed to describe SCRAM credential 
change '$user1'")
     // create a user quota and describe the user again
     result = runConfigCommandViaBroker(Array("--user", user1, "--alter", 
"--add-config", "consumer_byte_rate=20000"))
     assertEquals(alterConfigsUser1Out, result.stdout)
-    result = runConfigCommandViaBroker(Array("--user", user1, "--describe"))
     val quotaConfigsUser1Out = s"Quota configs for user-principal '$user1' are 
consumer_byte_rate=20000.0\n"
-    assertEquals(s"$quotaConfigsUser1Out$scramCredentialConfigsUser1Out", 
result.stdout)
+    TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", 
user1, "--describe")).stdout ==
+      s"$quotaConfigsUser1Out$scramCredentialConfigsUser1Out", s"Failed to 
describe Quota change for '$user1'")
 
     // now do the same thing for user2
     val user2 = "user2"
@@ -80,15 +84,15 @@ class UserScramCredentialsCommandTest extends 
BaseRequestTest {
     result = runConfigCommandViaBroker(Array("--user", user2, "--alter", 
"--add-config", "SCRAM-SHA-256=[iterations=4096,password=foo-secret]"))
     val alterConfigsUser2Out = s"Completed updating config for user $user2.\n"
     assertEquals(alterConfigsUser2Out, result.stdout)
-    result = runConfigCommandViaBroker(Array("--user", user2, "--describe"))
     val scramCredentialConfigsUser2Out = s"SCRAM credential configs for 
user-principal '$user2' are SCRAM-SHA-256=iterations=4096\n"
-    assertEquals(scramCredentialConfigsUser2Out, result.stdout)
+    TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", 
user2, "--describe")).stdout ==
+      scramCredentialConfigsUser2Out, s"Failed to describe SCRAM credential 
change '$user2'")
     // create a user quota and describe the user again
     result = runConfigCommandViaBroker(Array("--user", user2, "--alter", 
"--add-config", "consumer_byte_rate=20000"))
     assertEquals(alterConfigsUser2Out, result.stdout)
-    result = runConfigCommandViaBroker(Array("--user", user2, "--describe"))
     val quotaConfigsUser2Out = s"Quota configs for user-principal '$user2' are 
consumer_byte_rate=20000.0\n"
-    assertEquals(s"$quotaConfigsUser2Out$scramCredentialConfigsUser2Out", 
result.stdout)
+    TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", 
user2, "--describe")).stdout ==
+      s"$quotaConfigsUser2Out$scramCredentialConfigsUser2Out", s"Failed to 
describe Quota change for '$user2'")
 
     // describe both
     result = runConfigCommandViaBroker(Array("--entity-type", "users", 
"--describe"))
@@ -107,28 +111,30 @@ class UserScramCredentialsCommandTest extends 
BaseRequestTest {
     assertEquals(alterConfigsUser1Out, result.stdout)
     result = runConfigCommandViaBroker(Array("--user", user2, "--alter", 
"--delete-config", "SCRAM-SHA-256"))
     assertEquals(alterConfigsUser2Out, result.stdout)
-    result = runConfigCommandViaBroker(Array("--entity-type", "users", 
"--describe"))
-    assertEquals(s"$quotaConfigsUser2Out$scramCredentialConfigsUser1Out", 
result.stdout)
+    TestUtils.waitUntilTrue(() => 
runConfigCommandViaBroker(Array("--entity-type", "users", "--describe")).stdout 
==
+      s"$quotaConfigsUser2Out$scramCredentialConfigsUser1Out", s"Failed to 
describe Quota change for '$user2'")
 
     // now delete the rest of the configs, for user1 and user2, and describe
     result = runConfigCommandViaBroker(Array("--user", user1, "--alter", 
"--delete-config", "SCRAM-SHA-256"))
     assertEquals(alterConfigsUser1Out, result.stdout)
     result = runConfigCommandViaBroker(Array("--user", user2, "--alter", 
"--delete-config", "consumer_byte_rate"))
     assertEquals(alterConfigsUser2Out, result.stdout)
-    result = runConfigCommandViaBroker(Array("--entity-type", "users", 
"--describe"))
-    assertEquals("", result.stdout)
+    TestUtils.waitUntilTrue(() => 
runConfigCommandViaBroker(Array("--entity-type", "users", "--describe")).stdout 
== "",
+      s"Failed to describe All users deleted")
   }
 
-  @Test
-  def testAlterWithEmptyPassword(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft", "zk"))
+  def testAlterWithEmptyPassword(quorum: String): Unit = {
     val user1 = "user1"
     val result = runConfigCommandViaBroker(Array("--user", user1, "--alter", 
"--add-config", "SCRAM-SHA-256=[iterations=4096,password=]"))
     assertTrue(result.exitStatus.isDefined, "Expected System.exit() to be 
called with an empty password")
     assertEquals(1, result.exitStatus.get, "Expected empty password to cause 
failure with exit status=1")
   }
 
-  @Test
-  def testDescribeUnknownUser(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft", "zk"))
+  def testDescribeUnknownUser(quorum: String): Unit = {
     val unknownUser = "unknownUser"
     val result = runConfigCommandViaBroker(Array("--user", unknownUser, 
"--describe"))
     assertTrue(result.exitStatus.isEmpty, "Expected System.exit() to not be 
called with an unknown user")
diff --git 
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index cf52b5ce9b3..005e17aa98d 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -79,7 +79,7 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
    *
    * The default implementation of this method is a no-op.
    */
-  def configureSecurityBeforeServersStart(): Unit = {}
+  def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = {}
 
   /**
    * Override this in case Tokens or security credentials needs to be created 
after `servers` are started.
@@ -116,7 +116,7 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
       throw new KafkaException("Must supply at least one server config.")
 
     // default implementation is a no-op, it is overridden by subclasses if 
required
-    configureSecurityBeforeServersStart()
+    configureSecurityBeforeServersStart(testInfo)
 
     createBrokers(startup = true)
 
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala 
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index cbffbf0292e..539dcd390c8 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -22,18 +22,22 @@ import java.nio.charset.StandardCharsets
 import java.nio.file.Files
 import java.util
 import java.util.Properties
+import org.apache.kafka.common.KafkaException
 import kafka.server.{KafkaConfig, MetaProperties}
+import kafka.utils.Exit
 import kafka.utils.TestUtils
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.common.metadata.UserScramCredentialRecord
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
 import org.junit.jupiter.api.{Test, Timeout}
 
 import scala.collection.mutable
-
+import scala.collection.mutable.ArrayBuffer
 
 @Timeout(value = 40)
 class StorageToolTest {
+
   private def newSelfManagedProperties() = {
     val properties = new Properties()
     properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo,/tmp/bar")
@@ -161,12 +165,13 @@ Found problem:
       val metaProperties = MetaProperties(
         clusterId = "XcZZOzUqS4yHOjhMQB6JLQ", nodeId = 2)
       val stream = new ByteArrayOutputStream()
+      val bootstrapMetadata = 
StorageTool.buildBootstrapMetadata(MetadataVersion.latest(), None, "test format 
command")
       assertEquals(0, StorageTool.
-        formatCommand(new PrintStream(stream), Seq(tempDir.toString), 
metaProperties, MetadataVersion.latest(), ignoreFormatted = false))
+        formatCommand(new PrintStream(stream), Seq(tempDir.toString), 
metaProperties, bootstrapMetadata, MetadataVersion.latest(), ignoreFormatted = 
false))
       assertTrue(stream.toString().startsWith("Formatting %s".format(tempDir)))
 
       try assertEquals(1, StorageTool.
-        formatCommand(new PrintStream(new ByteArrayOutputStream()), 
Seq(tempDir.toString), metaProperties, MetadataVersion.latest(), 
ignoreFormatted = false)) catch {
+        formatCommand(new PrintStream(new ByteArrayOutputStream()), 
Seq(tempDir.toString), metaProperties, bootstrapMetadata, 
MetadataVersion.latest(), ignoreFormatted = false)) catch {
         case e: TerseFailure => assertEquals(s"Log directory ${tempDir} is 
already " +
           "formatted. Use --ignore-formatted to ignore this directory and 
format the " +
           "others.", e.getMessage)
@@ -174,7 +179,7 @@ Found problem:
 
       val stream2 = new ByteArrayOutputStream()
       assertEquals(0, StorageTool.
-        formatCommand(new PrintStream(stream2), Seq(tempDir.toString), 
metaProperties, MetadataVersion.latest(), ignoreFormatted = true))
+        formatCommand(new PrintStream(stream2), Seq(tempDir.toString), 
metaProperties, bootstrapMetadata, MetadataVersion.latest(), ignoreFormatted = 
true))
       assertEquals("All of the log directories are already 
formatted.%n".format(), stream2.toString())
     } finally Utils.delete(tempDir)
   }
@@ -221,4 +226,139 @@ Found problem:
 
     assertThrows(classOf[IllegalArgumentException], () => 
parseMetadataVersion("--release-version", "0.0"))
   }
+
+  /**
+   * Verifies parsing and validation of the `-S` (add-scram) argument of the format
+   * command: multiple credentials may be supplied, `name` is mandatory, exactly one
+   * of `password` / `saltedpassword` must be given, `salt` is mandatory with
+   * `saltedpassword` but optional with `password`, and `iterations` is optional but
+   * must lie in [4096, 16384] when present.
+   */
+  @Test
+  def testAddScram(): Unit = {
+    // Parses a "format" command line extended with the given arguments and returns
+    // the SCRAM credential records extracted from the resulting namespace.
+    def parseAddScram(strings: String*): Option[ArrayBuffer[UserScramCredentialRecord]] = {
+      val args = Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ") ++ strings
+      val namespace = StorageTool.parseArguments(args)
+      StorageTool.getUserScramCredentialRecords(namespace)
+    }
+
+    // Asserts that the given add-scram value is rejected with a TerseFailure that
+    // carries exactly the expected message.
+    def assertAddScramFails(expectedMessage: String, scramArg: String): Unit = {
+      val failure = assertThrows(classOf[TerseFailure], () => parseAddScram("-S", scramArg))
+      assertEquals(expectedMessage, failure.getMessage)
+    }
+
+    // The quoted values live in plain (non-interpolated) literals so that the
+    // interpolated strings below contain no escape sequences.
+    val salt = "salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\""
+    val saltedPassword = "saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\""
+
+    // No -S argument means no SCRAM records at all.
+    assertEquals(None, parseAddScram())
+
+    // Validate we can add multiple SCRAM creds.
+    val scramRecords = parseAddScram(
+      "-S", s"SCRAM-SHA-256=[name=alice,$salt,$saltedPassword,iterations=8192]",
+      "-S", s"SCRAM-SHA-256=[name=george,$salt,$saltedPassword,iterations=8192]")
+    assertEquals(2, scramRecords.get.size)
+
+    // Require name subfield.
+    assertAddScramFails("You must supply 'name' to add-scram",
+      s"SCRAM-SHA-256=[$salt,$saltedPassword,iterations=8192]")
+
+    // Require password xor saltedpassword
+    assertAddScramFails("You must only supply one of 'password' or 'saltedpassword' to add-scram",
+      s"SCRAM-SHA-256=[name=alice,$salt,password=alice,$saltedPassword,iterations=8192]")
+    assertAddScramFails("You must supply one of 'password' or 'saltedpassword' to add-scram",
+      s"SCRAM-SHA-256=[name=alice,$salt,iterations=8192]")
+
+    // Validate salt is required with saltedpassword
+    assertAddScramFails("You must supply 'salt' with 'saltedpassword' to add-scram",
+      s"SCRAM-SHA-256=[name=alice,$saltedPassword,iterations=8192]")
+
+    // Validate salt is optional with password
+    assertEquals(1, parseAddScram("-S", "SCRAM-SHA-256=[name=alice,password=alice,iterations=4096]").get.size)
+
+    // Require 4096 <= iterations <= 16384
+    assertAddScramFails("The 'iterations' value must be <= 16384 for add-scram",
+      s"SCRAM-SHA-256=[name=alice,$salt,password=alice,iterations=16385]")
+    assertEquals(1, parseAddScram("-S",
+      s"SCRAM-SHA-256=[name=alice,$salt,password=alice,iterations=16384]").get.size)
+    assertAddScramFails("The 'iterations' value must be >= 4096 for add-scram",
+      s"SCRAM-SHA-256=[name=alice,$salt,password=alice,iterations=4095]")
+    assertEquals(1, parseAddScram("-S",
+      s"SCRAM-SHA-256=[name=alice,$salt,password=alice,iterations=4096]").get.size)
+
+    // Validate iterations is optional
+    assertEquals(1, parseAddScram("-S", "SCRAM-SHA-256=[name=alice,password=alice]").get.size)
+  }
+
+  // Thrown by the exit procedures installed in the tests below in place of a real
+  // process exit; it carries the message that was passed to the exit call.
+  class StorageToolTestException(message: String) extends KafkaException(message)
+
+  /**
+   * Formatting with SCRAM credentials and a release version that predates SCRAM
+   * support must terminate through the exit procedure with the expected message.
+   */
+  @Test
+  def testScramWithBadMetadataVersion(): Unit = {
+    var exitString: String = ""
+    // Stand-in for process exit: record the message and unwind via an exception.
+    def exitProcedure(exitStatus: Int, message: Option[String]): Nothing = {
+      exitString = message.getOrElse("")
+      throw new StorageToolTestException(exitString)
+    }
+    Exit.setExitProcedure(exitProcedure)
+
+    // Everything after installing the exit procedure runs inside try/finally so the
+    // procedure is always reset, even if writing the config file fails.
+    try {
+      val properties = newSelfManagedProperties()
+      val propsFile = TestUtils.tempFile()
+      val propsStream = Files.newOutputStream(propsFile.toPath)
+      try properties.store(propsStream, "config.props") finally propsStream.close()
+
+      val args = Array("format", "-c", s"${propsFile.toPath}", "-t", "XcZZOzUqS4yHOjhMQB6JLQ", "--release-version", "3.4", "-S",
+        "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=8192]")
+
+      // main must end in the exit procedure; returning normally is a test failure.
+      assertThrows(classOf[StorageToolTestException], () => StorageTool.main(args))
+      assertEquals("SCRAM is only supported in metadataVersion IBP_3_5_IV0 or later.", exitString)
+    } finally {
+      Exit.resetExitProcedure()
+    }
+  }
+
+  /**
+   * Formatting without SCRAM credentials must succeed for a release version that
+   * predates SCRAM support: the exit procedure is invoked with status 0 and no
+   * message.
+   */
+  @Test
+  def testNoScramWithMetadataVersion(): Unit = {
+    var exitString: String = ""
+    // Starts at a non-zero sentinel so a missing exit call cannot look like success.
+    var exitStatus: Int = 1
+    // Stand-in for process exit: record status and message, then unwind.
+    def exitProcedure(status: Int, message: Option[String]): Nothing = {
+      exitStatus = status
+      exitString = message.getOrElse("")
+      throw new StorageToolTestException(exitString)
+    }
+    Exit.setExitProcedure(exitProcedure)
+
+    // Everything after installing the exit procedure runs inside try/finally so the
+    // procedure is always reset, even if writing the config file fails.
+    try {
+      val properties = newSelfManagedProperties()
+      // This test does format the directory specified so use a tempdir
+      properties.setProperty(KafkaConfig.LogDirsProp, TestUtils.tempDir().toString)
+      val propsFile = TestUtils.tempFile()
+      val propsStream = Files.newOutputStream(propsFile.toPath)
+      try properties.store(propsStream, "config.props") finally propsStream.close()
+
+      val args = Array("format", "-c", s"${propsFile.toPath}", "-t", "XcZZOzUqS4yHOjhMQB6JLQ", "--release-version", "3.4")
+
+      // The assertions run unconditionally: previously they lived only in the catch
+      // clause, so the test passed vacuously if main never reached the exit call.
+      assertThrows(classOf[StorageToolTestException], () => StorageTool.main(args))
+      assertEquals("", exitString)
+      assertEquals(0, exitStatus)
+    } finally {
+      Exit.resetExitProcedure()
+    }
+  }
 }

Reply via email to