This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6d36db1c78f KAFKA-14765 and KAFKA-14776: Support for SCRAM at
bootstrap with integration tests (#13374)
6d36db1c78f is described below
commit 6d36db1c78ff28ed3e6134e271e72a5c2ff1c276
Author: Proven Provenzano <[email protected]>
AuthorDate: Tue Apr 4 11:34:09 2023 -0400
KAFKA-14765 and KAFKA-14776: Support for SCRAM at bootstrap with
integration tests (#13374)
Implement KIP-900
Update kafka-storage to be able to add SCRAM records to the bootstrap
metadata file at format time so that SCRAM is enabled at initial start
(bootstrap) of KRaft cluster. Includes unit tests.
Update
./core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
to use bootstrap and
enable the test to run with both ZK and KRaft quorums.
Moved the one test from ScramServerStartupTest.scala into
SaslScramSslEndToEndAuthorizationTest.scala. This test is really small, so
there was no point in recreating all the bootstrap startup just for a 5-line
test when it could easily be run elsewhere.
Reviewers: Colin P. McCabe <[email protected]>, Manikumar Reddy
<[email protected]>
---
.../apache/kafka/clients/admin/ScramMechanism.java | 5 +
.../security/scram/internals/ScramMechanism.java | 34 +++-
core/src/main/scala/kafka/tools/StorageTool.scala | 186 +++++++++++++++++++--
.../kafka/api/CustomQuotaCallbackTest.scala | 4 +-
.../DelegationTokenEndToEndAuthorizationTest.scala | 4 +-
.../api/DescribeAuthorizedOperationsTest.scala | 2 +-
.../SaslClientsWithInvalidCredentialsTest.scala | 4 +-
.../SaslScramSslEndToEndAuthorizationTest.scala | 46 ++++-
.../scala/integration/kafka/api/SaslSetup.scala | 16 +-
.../kafka/api/SaslSslAdminIntegrationTest.scala | 2 +-
.../kafka/server/QuorumTestHarness.scala | 23 ++-
.../kafka/server/ScramServerStartupTest.scala | 65 -------
.../admin/UserScramCredentialsCommandTest.scala | 44 ++---
.../kafka/integration/KafkaServerTestHarness.scala | 4 +-
.../scala/unit/kafka/tools/StorageToolTest.scala | 148 +++++++++++++++-
15 files changed, 456 insertions(+), 131 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java
index 95ad18c04c9..5c5e371529e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java
@@ -23,6 +23,11 @@ import java.util.Arrays;
* Representation of a SASL/SCRAM Mechanism.
*
* @see <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API">KIP-554:
Add Broker-side SCRAM Config API</a>
+ *
+ * This code is duplicated in
org.apache.kafka.common.security.scram.internals.ScramMechanism.
+ * The type field in both files must match and must not change. The type field
+ * is used both for passing ScramCredentialUpsertion and for the internal
+ * UserScramCredentialRecord. Do not change the type field.
*/
public enum ScramMechanism {
UNKNOWN((byte) 0),
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramMechanism.java
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramMechanism.java
index 9f6e69d9c5e..6ba78b4a4f4 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramMechanism.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramMechanism.java
@@ -21,15 +21,23 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+/*
+ * This code is duplicated in org.apache.kafka.clients.admin.ScramMechanism.
+ * The type field in both files must match and must not change. The type field
+ * is used both for passing ScramCredentialUpsertion and for the internal
+ * UserScramCredentialRecord. Do not change the type field.
+ */
public enum ScramMechanism {
- SCRAM_SHA_256("SHA-256", "HmacSHA256", 4096),
- SCRAM_SHA_512("SHA-512", "HmacSHA512", 4096);
+ SCRAM_SHA_256((byte) 1, "SHA-256", "HmacSHA256", 4096, 16384),
+ SCRAM_SHA_512((byte) 2, "SHA-512", "HmacSHA512", 4096, 16384);
+ private final byte type;
private final String mechanismName;
private final String hashAlgorithm;
private final String macAlgorithm;
private final int minIterations;
+ private final int maxIterations;
private static final Map<String, ScramMechanism> MECHANISMS_MAP;
@@ -40,11 +48,19 @@ public enum ScramMechanism {
MECHANISMS_MAP = Collections.unmodifiableMap(map);
}
- ScramMechanism(String hashAlgorithm, String macAlgorithm, int
minIterations) {
+ ScramMechanism(
+ byte type,
+ String hashAlgorithm,
+ String macAlgorithm,
+ int minIterations,
+ int maxIterations
+ ) {
+ this.type = type;
this.mechanismName = "SCRAM-" + hashAlgorithm;
this.hashAlgorithm = hashAlgorithm;
this.macAlgorithm = macAlgorithm;
this.minIterations = minIterations;
+ this.maxIterations = maxIterations;
}
public final String mechanismName() {
@@ -63,6 +79,10 @@ public enum ScramMechanism {
return minIterations;
}
+ public int maxIterations() {
+ return maxIterations;
+ }
+
public static ScramMechanism forMechanismName(String mechanismName) {
return MECHANISMS_MAP.get(mechanismName);
}
@@ -74,4 +94,12 @@ public enum ScramMechanism {
public static boolean isScram(String mechanismName) {
return MECHANISMS_MAP.containsKey(mechanismName);
}
+
+ /**
+ *
+ * @return the type indicator for this SASL SCRAM mechanism
+ */
+ public byte type() {
+ return this.type;
+ }
}
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala
b/core/src/main/scala/kafka/tools/StorageTool.scala
index 0f798e24fc2..04c87b883ca 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -22,15 +22,24 @@ import java.nio.file.{Files, Paths}
import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, MetaProperties,
RawMetaProperties}
import kafka.utils.{Exit, Logging}
import net.sourceforge.argparse4j.ArgumentParsers
-import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue}
+import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue, append}
import net.sourceforge.argparse4j.inf.Namespace
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory,
BootstrapMetadata}
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.common.metadata.FeatureLevelRecord
+import org.apache.kafka.common.metadata.UserScramCredentialRecord
+import org.apache.kafka.common.security.scram.internals.ScramMechanism
+import org.apache.kafka.common.security.scram.internals.ScramFormatter
+
+import java.util
+import java.util.Base64
import java.util.Optional
import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.ArrayBuffer
object StorageTool extends Logging {
def main(args: Array[String]): Unit = {
@@ -53,12 +62,23 @@ object StorageTool extends Logging {
throw new TerseFailure(s"Must specify a valid KRaft metadata
version of at least 3.0.")
}
val metaProperties = buildMetadataProperties(clusterId, config.get)
+ val metadataRecords : ArrayBuffer[ApiMessageAndVersion] =
ArrayBuffer()
+
getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
+ if (!metadataVersion.isScramSupported()) {
+ throw new TerseFailure(s"SCRAM is only supported in
metadataVersion IBP_3_5_IV0 or later.");
+ }
+ for (record <- userScramCredentialRecords) {
+ metadataRecords.append(new ApiMessageAndVersion(record,
0.toShort))
+ }
+ })
+ val bootstrapMetadata = buildBootstrapMetadata(metadataVersion,
Some(metadataRecords), "format command")
val ignoreFormatted = namespace.getBoolean("ignore_formatted")
if (!configToSelfManagedMode(config.get)) {
throw new TerseFailure("The kafka configuration file appears to be
for " +
"a legacy cluster. Formatting is only supported for clusters in
KRaft mode.")
}
- Exit.exit(formatCommand(System.out, directories, metaProperties,
metadataVersion, ignoreFormatted))
+ Exit.exit(formatCommand(System.out, directories, metaProperties,
bootstrapMetadata,
+ metadataVersion,ignoreFormatted))
case "random-uuid" =>
System.out.println(Uuid.randomUuid)
@@ -70,15 +90,15 @@ object StorageTool extends Logging {
} catch {
case e: TerseFailure =>
System.err.println(e.getMessage)
- System.exit(1)
+ Exit.exit(1, Some(e.getMessage))
}
}
def parseArguments(args: Array[String]): Namespace = {
val parser = ArgumentParsers.
- newArgumentParser("kafka-storage").
- defaultHelp(true).
+ newArgumentParser("kafka-storage", /* defaultHelp */ true, /*
prefixChars */ "-", /* fromFilePrefix */ "@").
description("The Kafka storage tool.")
+
val subparsers = parser.addSubparsers().dest("command")
val infoParser = subparsers.addParser("info").
@@ -96,6 +116,11 @@ object StorageTool extends Logging {
action(store()).
required(true).
help("The cluster ID to use.")
+ formatParser.addArgument("--add-scram", "-S").
+ action(append()).
+ help("""A SCRAM_CREDENTIAL to add to the __cluster_metadata log e.g.
+ |'SCRAM-SHA-256=[user=alice,password=alice-secret]'
+
|'SCRAM-SHA-512=[user=alice,iterations=8192,salt="N3E=",saltedpassword="YCE="]'""".stripMargin)
formatParser.addArgument("--ignore-formatted", "-g").
action(storeTrue())
formatParser.addArgument("--release-version", "-r").
@@ -128,6 +153,112 @@ object StorageTool extends Logging {
.getOrElse(defaultValue)
}
+ def getUserScramCredentialRecord(
+ mechanism: String,
+ config: String
+ ) : UserScramCredentialRecord = {
+ /*
+ * Remove '[' and ']'
+ * Split K->V pairs on ',' and no K or V should contain ','
+ * Split K and V on '=' but V could contain '=' if inside ""
+ * Create Map of K to V and replace all " in V
+ */
+ val argMap = config.substring(1, config.length - 1)
+ .split(",")
+ .map(_.split("=(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"))
+ .map(args => args(0) -> args(1).replaceAll("\"",
"")).toMap
+
+ val scramMechanism = ScramMechanism.forMechanismName(mechanism)
+
+ def getName(argMap: Map[String,String]) : String = {
+ if (!argMap.contains("name")) {
+ throw new TerseFailure(s"You must supply 'name' to add-scram")
+ }
+ argMap("name")
+ }
+
+ def getSalt(argMap: Map[String,String], scramMechanism : ScramMechanism) :
Array[Byte] = {
+ if (argMap.contains("salt")) {
+ Base64.getDecoder.decode(argMap("salt"))
+ } else {
+ new ScramFormatter(scramMechanism).secureRandomBytes()
+ }
+ }
+
+ def getIterations(argMap: Map[String,String], scramMechanism :
ScramMechanism) : Int = {
+ if (argMap.contains("salt")) {
+ val iterations = argMap("iterations").toInt
+ if (iterations < scramMechanism.minIterations()) {
+ throw new TerseFailure(s"The 'iterations' value must be >=
${scramMechanism.minIterations()} for add-scram")
+ }
+ if (iterations > scramMechanism.maxIterations()) {
+ throw new TerseFailure(s"The 'iterations' value must be <=
${scramMechanism.maxIterations()} for add-scram")
+ }
+ iterations
+ } else {
+ 4096
+ }
+ }
+
+ def getSaltedPassword(
+ argMap: Map[String,String],
+ scramMechanism : ScramMechanism,
+ salt : Array[Byte],
+ iterations: Int
+ ) : Array[Byte] = {
+ if (argMap.contains("password")) {
+ if (argMap.contains("saltedpassword")) {
+ throw new TerseFailure(s"You must only supply one of 'password' or
'saltedpassword' to add-scram")
+ }
+ new ScramFormatter(scramMechanism).saltedPassword(argMap("password"),
salt, iterations)
+ } else {
+ if (!argMap.contains("saltedpassword")) {
+ throw new TerseFailure(s"You must supply one of 'password' or
'saltedpassword' to add-scram")
+ }
+ if (!argMap.contains("salt")) {
+ throw new TerseFailure(s"You must supply 'salt' with
'saltedpassword' to add-scram")
+ }
+ Base64.getDecoder.decode(argMap("saltedpassword"))
+ }
+ }
+
+ val name = getName(argMap)
+ val salt = getSalt(argMap, scramMechanism)
+ val iterations = getIterations(argMap, scramMechanism)
+ val saltedPassword = getSaltedPassword(argMap, scramMechanism, salt,
iterations)
+
+ val myrecord = new UserScramCredentialRecord()
+ .setName(name)
+ .setMechanism(scramMechanism.`type`)
+ .setSalt(salt)
+ .setIterations(iterations)
+ .setSaltedPassword(saltedPassword)
+ myrecord
+ }
+
+ def getUserScramCredentialRecords(namespace: Namespace):
Option[ArrayBuffer[UserScramCredentialRecord]] = {
+ if (namespace.getList("add_scram") != null) {
+ val listofAddConfig : List[String] =
namespace.getList("add_scram").asScala.toList
+ val userScramCredentialRecords : ArrayBuffer[UserScramCredentialRecord]
= ArrayBuffer()
+ for (singleAddConfig <- listofAddConfig) {
+ val singleAddConfigList = singleAddConfig.split("\\s+")
+
+ // The first subarg must be of the form key=value
+ val nameValueRecord = singleAddConfigList(0).split("=", 2)
+ nameValueRecord(0) match {
+ case "SCRAM-SHA-256" =>
+
userScramCredentialRecords.append(getUserScramCredentialRecord(nameValueRecord(0),
nameValueRecord(1)))
+ case "SCRAM-SHA-512" =>
+
userScramCredentialRecords.append(getUserScramCredentialRecord(nameValueRecord(0),
nameValueRecord(1)))
+ case _ => throw new TerseFailure(s"The add-scram mechanism
${nameValueRecord(0)} is not supported.")
+ }
+ }
+ Some(userScramCredentialRecords)
+ } else {
+ None
+ }
+ }
+
def infoCommand(stream: PrintStream, selfManagedMode: Boolean, directories:
Seq[String]): Int = {
val problems = new mutable.ArrayBuffer[String]
val foundDirectories = new mutable.ArrayBuffer[String]
@@ -214,6 +345,23 @@ object StorageTool extends Logging {
}
}
+ def buildBootstrapMetadata(metadataVersion: MetadataVersion,
+ metadataOptionalArguments:
Option[ArrayBuffer[ApiMessageAndVersion]],
+ source: String): BootstrapMetadata = {
+
+ val metadataRecords = new util.ArrayList[ApiMessageAndVersion]
+ metadataRecords.add(new ApiMessageAndVersion(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(metadataVersion.featureLevel()),
0.toShort));
+
+ metadataOptionalArguments.foreach { metadataArguments =>
+ for (record <- metadataArguments) metadataRecords.add(record)
+ }
+
+ BootstrapMetadata.fromRecords(metadataRecords, source)
+ }
+
+
def buildMetadataProperties(
clusterIdStr: String,
config: KafkaConfig
@@ -230,14 +378,29 @@ object StorageTool extends Logging {
new MetaProperties(effectiveClusterId.toString, config.nodeId)
}
- def formatCommand(stream: PrintStream,
- directories: Seq[String],
- metaProperties: MetaProperties,
- metadataVersion: MetadataVersion,
- ignoreFormatted: Boolean): Int = {
+ def formatCommand(
+ stream: PrintStream,
+ directories: Seq[String],
+ metaProperties: MetaProperties,
+ metadataVersion: MetadataVersion,
+ ignoreFormatted: Boolean
+ ): Int = {
+ val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, None,
"format command")
+ formatCommand(stream, directories, metaProperties, bootstrapMetadata,
metadataVersion, ignoreFormatted)
+ }
+
+ def formatCommand(
+ stream: PrintStream,
+ directories: Seq[String],
+ metaProperties: MetaProperties,
+ bootstrapMetadata: BootstrapMetadata,
+ metadataVersion: MetadataVersion,
+ ignoreFormatted: Boolean
+ ): Int = {
if (directories.isEmpty) {
throw new TerseFailure("No log directories found in the configuration.")
}
+
val unformattedDirectories = directories.filter(directory => {
if (!Files.isDirectory(Paths.get(directory)) ||
!Files.exists(Paths.get(directory, "meta.properties"))) {
true
@@ -262,7 +425,6 @@ object StorageTool extends Logging {
val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile)
checkpoint.write(metaProperties.toProperties)
- val bootstrapMetadata = BootstrapMetadata.fromVersion(metadataVersion,
"format command")
val bootstrapDirectory = new BootstrapDirectory(directory,
Optional.empty())
bootstrapDirectory.writeBinaryFile(bootstrapMetadata)
diff --git
a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 63beb509442..9a4c5d32b1d 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -80,8 +80,8 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness
with SaslSetup {
super.tearDown()
}
- override def configureSecurityBeforeServersStart(): Unit = {
- super.configureSecurityBeforeServersStart()
+ override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit =
{
+ super.configureSecurityBeforeServersStart(testInfo)
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin,
JaasTestUtils.KafkaScramAdminPassword)
}
diff --git
a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
index 920d0ccb254..1ae958dbb38 100644
---
a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
@@ -56,8 +56,8 @@ class DelegationTokenEndToEndAuthorizationTest extends
EndToEndAuthorizationTest
def configureTokenAclsBeforeServersStart(): Unit = { }
- override def configureSecurityBeforeServersStart(): Unit = {
- super.configureSecurityBeforeServersStart()
+ override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit =
{
+ super.configureSecurityBeforeServersStart(testInfo)
configureTokenAclsBeforeServersStart()
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
// Create broker admin credentials before starting brokers
diff --git
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index e6f4a6039be..6ea0c2eaec0 100644
---
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -84,7 +84,7 @@ class DescribeAuthorizedOperationsTest extends
IntegrationTestHarness with SaslS
override protected lazy val trustStoreFile =
Some(TestUtils.tempFile("truststore", ".jks"))
- override def configureSecurityBeforeServersStart(): Unit = {
+ override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit =
{
val authorizer =
CoreUtils.createObject[Authorizer](classOf[AclAuthorizer].getName)
val clusterResource = new ResourcePattern(ResourceType.CLUSTER,
Resource.CLUSTER_NAME, PatternType.LITERAL)
val topicResource = new ResourcePattern(ResourceType.TOPIC,
AclEntry.WildcardResource, PatternType.LITERAL)
diff --git
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index dbf1a6ce9d1..e36f437f1f1 100644
---
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -51,8 +51,8 @@ class SaslClientsWithInvalidCredentialsTest extends
IntegrationTestHarness with
val numPartitions = 1
val tp = new TopicPartition(topic, 0)
- override def configureSecurityBeforeServersStart(): Unit = {
- super.configureSecurityBeforeServersStart()
+ override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit =
{
+ super.configureSecurityBeforeServersStart(testInfo)
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
// Create broker credentials before starting brokers
createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin,
JaasTestUtils.KafkaScramAdminPassword)
diff --git
a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
index b44704350bf..2d1aaf8cc77 100644
---
a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -19,13 +19,20 @@ package kafka.api
import java.util.Properties
import kafka.utils._
+import kafka.tools.StorageTool
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.test.TestSslUtils
import scala.jdk.CollectionConverters._
+import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import scala.collection.mutable.ArrayBuffer
+import org.apache.kafka.server.common.ApiMessageAndVersion
class SaslScramSslEndToEndAuthorizationTest extends
SaslEndToEndAuthorizationTest {
override protected def kafkaClientSaslMechanism = "SCRAM-SHA-256"
@@ -33,18 +40,34 @@ class SaslScramSslEndToEndAuthorizationTest extends
SaslEndToEndAuthorizationTes
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
JaasTestUtils.KafkaScramUser)
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
JaasTestUtils.KafkaScramAdmin)
private val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword
- override val unimplementedquorum = "kraft"
- override def configureSecurityBeforeServersStart(): Unit = {
- super.configureSecurityBeforeServersStart()
-
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
- // Create broker credentials before starting brokers
- createScramCredentials(zkConnect, kafkaPrincipal.getName, kafkaPassword)
+ override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit =
{
+ super.configureSecurityBeforeServersStart(testInfo)
+
+ if (!TestInfoUtils.isKRaft(testInfo)) {
+
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
+ // Create broker credentials before starting brokers
+ createScramCredentials(zkConnect, kafkaPrincipal.getName, kafkaPassword)
+ }
TestSslUtils.convertToPemWithoutFiles(producerConfig)
TestSslUtils.convertToPemWithoutFiles(consumerConfig)
TestSslUtils.convertToPemWithoutFiles(adminClientConfig)
}
+ // Create the admin credentials for KRaft as part of controller
initialization
+ override def optionalMetadataRecords:
Option[ArrayBuffer[ApiMessageAndVersion]] = {
+ val args = Seq("format", "-c", "config.props", "-t",
"XcZZOzUqS4yHOjhMQB6JLQ", "-S",
+
s"SCRAM-SHA-256=[name=${JaasTestUtils.KafkaScramAdmin},password=${JaasTestUtils.KafkaScramAdminPassword}]")
+ val namespace = StorageTool.parseArguments(args.toArray)
+ val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
+ StorageTool.getUserScramCredentialRecords(namespace).foreach {
+ userScramCredentialRecords => for (record <- userScramCredentialRecords)
{
+ metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort))
+ }
+ }
+ Some(metadataRecords)
+ }
+
override def configureListeners(props: collection.Seq[Properties]): Unit = {
props.foreach(TestSslUtils.convertToPemWithoutFiles)
super.configureListeners(props)
@@ -54,11 +77,18 @@ class SaslScramSslEndToEndAuthorizationTest extends
SaslEndToEndAuthorizationTes
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
- if (!TestInfoUtils.isKRaft(testInfo)) {
super.setUp(testInfo)
// Create client credentials after starting brokers so that dynamic
credential creation is also tested
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser,
JaasTestUtils.KafkaScramPassword)
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KafkaScramUser2,
JaasTestUtils.KafkaScramPassword2)
- }
+ }
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testAuthentications(quorum: String): Unit = {
+ val successfulAuths = TestUtils.totalMetricValue(brokers.head,
"successful-authentication-total")
+ assertTrue(successfulAuths > 0, "No successful authentications")
+ val failedAuths = TestUtils.totalMetricValue(brokers.head,
"failed-authentication-total")
+ assertEquals(0, failedAuths)
}
}
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 3648e596b0b..fe0d347c096 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -29,7 +29,7 @@ import kafka.server.{ConfigType, KafkaConfig}
import kafka.utils.JaasTestUtils.{JaasSection, Krb5LoginModule, ZkDigestModule}
import kafka.utils.{JaasTestUtils, TestUtils}
import kafka.zk.{AdminZkClient, KafkaZkClient}
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
ScramCredentialInfo, UserScramCredentialAlteration,
UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
ScramCredentialInfo, UserScramCredentialUpsertion, ScramMechanism =>
PublicScramMechanism}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.security.JaasUtils
@@ -39,8 +39,6 @@ import
org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S
import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.client.ZKClientConfig
-import scala.jdk.CollectionConverters._
-
/*
* Implements an enumeration for the modes enabled here:
* zk only, kafka only, both, custom KafkaServer.
@@ -191,11 +189,13 @@ trait SaslSetup {
}
}
- def createScramCredentials(adminClient: Admin, userName: String, password:
String): Unit = {
- val results =
adminClient.alterUserScramCredentials(PublicScramMechanism.values().filter(_ !=
PublicScramMechanism.UNKNOWN).map(mechanism =>
- new UserScramCredentialUpsertion(userName, new
ScramCredentialInfo(mechanism, 4096), password)
- .asInstanceOf[UserScramCredentialAlteration]).toList.asJava)
- results.all.get
+ def createScramCredentials(adminClient: Admin, userName: String, password:
String): Unit = {
+ PublicScramMechanism.values().filter(_ !=
PublicScramMechanism.UNKNOWN).map(mechanism => {
+
+ val results = adminClient.alterUserScramCredentials(util.Arrays.asList(
+ new UserScramCredentialUpsertion(userName, new
ScramCredentialInfo(mechanism, 4096), password)))
+ results.all.get
+ })
}
def createScramCredentials(zkConnect: String, userName: String, password:
String): Unit = {
diff --git
a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index 80a35849d78..29b51713257 100644
---
a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -56,7 +56,7 @@ class SaslSslAdminIntegrationTest extends
BaseAdminIntegrationTest with SaslSetu
super.generateConfigs
}
- override def configureSecurityBeforeServersStart(): Unit = {
+ override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit =
{
authorizationAdmin.initializeAcls()
}
diff --git
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 113df27147b..0a2cf270bf9 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{Exit, Time}
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
+import org.apache.kafka.common.metadata.FeatureLevelRecord
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
@@ -40,6 +41,7 @@ import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag,
TestInfo}
+import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ListBuffer
import scala.collection.{Seq, immutable}
import scala.compat.java8.OptionConverters._
@@ -272,13 +274,18 @@ abstract class QuorumTestHarness extends Logging {
CoreUtils.swallow(kRaftQuorumImplementation.controllerServer.shutdown(),
kRaftQuorumImplementation.log)
}
+ def optionalMetadataRecords: Option[ArrayBuffer[ApiMessageAndVersion]] = None
+
private def formatDirectories(directories: immutable.Seq[String],
metaProperties: MetaProperties): Unit = {
val stream = new ByteArrayOutputStream()
var out: PrintStream = null
try {
out = new PrintStream(stream)
- if (StorageTool.formatCommand(out, directories, metaProperties,
metadataVersion, ignoreFormatted = false) != 0) {
+ val bootstrapMetadata =
StorageTool.buildBootstrapMetadata(metadataVersion,
+
optionalMetadataRecords, "format command")
+ if (StorageTool.formatCommand(out, directories, metaProperties,
bootstrapMetadata, metadataVersion,
+ ignoreFormatted = false) != 0) {
throw new RuntimeException(stream.toString())
}
debug(s"Formatted storage directory(ies) ${directories}")
@@ -303,6 +310,18 @@ abstract class QuorumTestHarness extends Logging {
val metadataDir = TestUtils.tempDir()
val metaProperties = new MetaProperties(Uuid.randomUuid().toString, nodeId)
formatDirectories(immutable.Seq(metadataDir.getAbsolutePath),
metaProperties)
+
+ val metadataRecords = new util.ArrayList[ApiMessageAndVersion]
+ metadataRecords.add(new ApiMessageAndVersion(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(metadataVersion.featureLevel()),
0.toShort));
+
+ optionalMetadataRecords.foreach { metadataArguments =>
+ for (record <- metadataArguments) metadataRecords.add(record)
+ }
+
+ val bootstrapMetadata = BootstrapMetadata.fromRecords(metadataRecords,
"test harness")
+
props.setProperty(KafkaConfig.MetadataLogDirProp,
metadataDir.getAbsolutePath)
val proto = controllerListenerSecurityProtocol.toString
props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp,
s"CONTROLLER:${proto}")
@@ -322,7 +341,7 @@ abstract class QuorumTestHarness extends Logging {
controllerServer = new ControllerServer(
sharedServer,
KafkaRaftServer.configSchema,
- BootstrapMetadata.fromVersion(metadataVersion, "test harness")
+ bootstrapMetadata
)
controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e)
=> {
if (e != null) {
diff --git
a/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
deleted file mode 100644
index 9190647ad2f..00000000000
--- a/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.util.Collections
-
-import kafka.api.{IntegrationTestHarness, KafkaSasl, SaslSetup}
-import kafka.utils._
-import kafka.zk.ConfigEntityChangeNotificationZNode
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
-import scala.jdk.CollectionConverters._
-
-/**
- * Tests that there are no failed authentications during broker startup. This
is to verify
- * that SCRAM credentials are loaded by brokers before client connections can
be made.
- * For simplicity of testing, this test verifies authentications of controller
connections.
- */
-class ScramServerStartupTest extends IntegrationTestHarness with SaslSetup {
-
- override val brokerCount = 1
-
- private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
- private val kafkaServerSaslMechanisms =
Collections.singletonList("SCRAM-SHA-256").asScala
-
- override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
-
- override protected val serverSaslProperties =
Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms,
kafkaClientSaslMechanism))
- override protected val clientSaslProperties =
Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
-
- override def configureSecurityBeforeServersStart(): Unit = {
- super.configureSecurityBeforeServersStart()
-
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
- // Create credentials before starting brokers
- createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin,
JaasTestUtils.KafkaScramAdminPassword)
-
- startSasl(jaasSections(kafkaServerSaslMechanisms,
Option(kafkaClientSaslMechanism), KafkaSasl))
- }
-
- @Test
- def testAuthentications(): Unit = {
- val successfulAuths = TestUtils.totalMetricValue(servers.head,
"successful-authentication-total")
- assertTrue(successfulAuths > 0, "No successful authentications")
- val failedAuths = TestUtils.totalMetricValue(servers.head,
"failed-authentication-total")
- assertEquals(0, failedAuths)
- }
-}
diff --git
a/core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala
b/core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala
index 1735e30d435..9e3caa85571 100644
--- a/core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala
@@ -21,8 +21,11 @@ import java.nio.charset.StandardCharsets
import kafka.server.BaseRequestTest
import kafka.utils.Exit
+import kafka.utils.TestUtils
+import kafka.utils.TestInfoUtils
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
class UserScramCredentialsCommandTest extends BaseRequestTest {
override def brokerCount = 1
@@ -57,22 +60,23 @@ class UserScramCredentialsCommandTest extends
BaseRequestTest {
}
}
- @Test
- def testUserScramCredentialsRequests(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testUserScramCredentialsRequests(quorum: String): Unit = {
val user1 = "user1"
// create and describe a credential
var result = runConfigCommandViaBroker(Array("--user", user1, "--alter",
"--add-config", "SCRAM-SHA-256=[iterations=4096,password=foo-secret]"))
val alterConfigsUser1Out = s"Completed updating config for user $user1.\n"
assertEquals(alterConfigsUser1Out, result.stdout)
- result = runConfigCommandViaBroker(Array("--user", user1, "--describe"))
val scramCredentialConfigsUser1Out = s"SCRAM credential configs for
user-principal '$user1' are SCRAM-SHA-256=iterations=4096\n"
- assertEquals(scramCredentialConfigsUser1Out, result.stdout)
+ TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user",
user1, "--describe")).stdout ==
+ scramCredentialConfigsUser1Out, s"Failed to describe SCRAM credential
change '$user1'")
// create a user quota and describe the user again
result = runConfigCommandViaBroker(Array("--user", user1, "--alter",
"--add-config", "consumer_byte_rate=20000"))
assertEquals(alterConfigsUser1Out, result.stdout)
- result = runConfigCommandViaBroker(Array("--user", user1, "--describe"))
val quotaConfigsUser1Out = s"Quota configs for user-principal '$user1' are
consumer_byte_rate=20000.0\n"
- assertEquals(s"$quotaConfigsUser1Out$scramCredentialConfigsUser1Out",
result.stdout)
+ TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user",
user1, "--describe")).stdout ==
+ s"$quotaConfigsUser1Out$scramCredentialConfigsUser1Out", s"Failed to
describe Quota change for '$user1'")
// now do the same thing for user2
val user2 = "user2"
@@ -80,15 +84,15 @@ class UserScramCredentialsCommandTest extends
BaseRequestTest {
result = runConfigCommandViaBroker(Array("--user", user2, "--alter",
"--add-config", "SCRAM-SHA-256=[iterations=4096,password=foo-secret]"))
val alterConfigsUser2Out = s"Completed updating config for user $user2.\n"
assertEquals(alterConfigsUser2Out, result.stdout)
- result = runConfigCommandViaBroker(Array("--user", user2, "--describe"))
val scramCredentialConfigsUser2Out = s"SCRAM credential configs for
user-principal '$user2' are SCRAM-SHA-256=iterations=4096\n"
- assertEquals(scramCredentialConfigsUser2Out, result.stdout)
+ TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user",
user2, "--describe")).stdout ==
+ scramCredentialConfigsUser2Out, s"Failed to describe SCRAM credential
change '$user2'")
// create a user quota and describe the user again
result = runConfigCommandViaBroker(Array("--user", user2, "--alter",
"--add-config", "consumer_byte_rate=20000"))
assertEquals(alterConfigsUser2Out, result.stdout)
- result = runConfigCommandViaBroker(Array("--user", user2, "--describe"))
val quotaConfigsUser2Out = s"Quota configs for user-principal '$user2' are
consumer_byte_rate=20000.0\n"
- assertEquals(s"$quotaConfigsUser2Out$scramCredentialConfigsUser2Out",
result.stdout)
+ TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user",
user2, "--describe")).stdout ==
+ s"$quotaConfigsUser2Out$scramCredentialConfigsUser2Out", s"Failed to
describe Quota change for '$user2'")
// describe both
result = runConfigCommandViaBroker(Array("--entity-type", "users",
"--describe"))
@@ -107,28 +111,30 @@ class UserScramCredentialsCommandTest extends
BaseRequestTest {
assertEquals(alterConfigsUser1Out, result.stdout)
result = runConfigCommandViaBroker(Array("--user", user2, "--alter",
"--delete-config", "SCRAM-SHA-256"))
assertEquals(alterConfigsUser2Out, result.stdout)
- result = runConfigCommandViaBroker(Array("--entity-type", "users",
"--describe"))
- assertEquals(s"$quotaConfigsUser2Out$scramCredentialConfigsUser1Out",
result.stdout)
+ TestUtils.waitUntilTrue(() =>
runConfigCommandViaBroker(Array("--entity-type", "users", "--describe")).stdout
==
+ s"$quotaConfigsUser2Out$scramCredentialConfigsUser1Out", s"Failed to
describe Quota change for '$user2'")
// now delete the rest of the configs, for user1 and user2, and describe
result = runConfigCommandViaBroker(Array("--user", user1, "--alter",
"--delete-config", "SCRAM-SHA-256"))
assertEquals(alterConfigsUser1Out, result.stdout)
result = runConfigCommandViaBroker(Array("--user", user2, "--alter",
"--delete-config", "consumer_byte_rate"))
assertEquals(alterConfigsUser2Out, result.stdout)
- result = runConfigCommandViaBroker(Array("--entity-type", "users",
"--describe"))
- assertEquals("", result.stdout)
+ TestUtils.waitUntilTrue(() =>
runConfigCommandViaBroker(Array("--entity-type", "users", "--describe")).stdout
== "",
+ s"Failed to describe All users deleted")
}
- @Test
- def testAlterWithEmptyPassword(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testAlterWithEmptyPassword(quorum: String): Unit = {
val user1 = "user1"
val result = runConfigCommandViaBroker(Array("--user", user1, "--alter",
"--add-config", "SCRAM-SHA-256=[iterations=4096,password=]"))
assertTrue(result.exitStatus.isDefined, "Expected System.exit() to be
called with an empty password")
assertEquals(1, result.exitStatus.get, "Expected empty password to cause
failure with exit status=1")
}
- @Test
- def testDescribeUnknownUser(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testDescribeUnknownUser(quorum: String): Unit = {
val unknownUser = "unknownUser"
val result = runConfigCommandViaBroker(Array("--user", unknownUser,
"--describe"))
assertTrue(result.exitStatus.isEmpty, "Expected System.exit() to not be
called with an unknown user")
diff --git
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index cf52b5ce9b3..005e17aa98d 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -79,7 +79,7 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
*
* The default implementation of this method is a no-op.
*/
- def configureSecurityBeforeServersStart(): Unit = {}
+ def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = {}
/**
* Override this in case Tokens or security credentials needs to be created
after `servers` are started.
@@ -116,7 +116,7 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
throw new KafkaException("Must supply at least one server config.")
// default implementation is a no-op, it is overridden by subclasses if
required
- configureSecurityBeforeServersStart()
+ configureSecurityBeforeServersStart(testInfo)
createBrokers(startup = true)
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index cbffbf0292e..539dcd390c8 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -22,18 +22,22 @@ import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util
import java.util.Properties
+import org.apache.kafka.common.KafkaException
import kafka.server.{KafkaConfig, MetaProperties}
+import kafka.utils.Exit
import kafka.utils.TestUtils
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.common.metadata.UserScramCredentialRecord
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
import org.junit.jupiter.api.{Test, Timeout}
import scala.collection.mutable
-
+import scala.collection.mutable.ArrayBuffer
@Timeout(value = 40)
class StorageToolTest {
+
private def newSelfManagedProperties() = {
val properties = new Properties()
properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo,/tmp/bar")
@@ -161,12 +165,13 @@ Found problem:
val metaProperties = MetaProperties(
clusterId = "XcZZOzUqS4yHOjhMQB6JLQ", nodeId = 2)
val stream = new ByteArrayOutputStream()
+ val bootstrapMetadata =
StorageTool.buildBootstrapMetadata(MetadataVersion.latest(), None, "test format
command")
assertEquals(0, StorageTool.
- formatCommand(new PrintStream(stream), Seq(tempDir.toString),
metaProperties, MetadataVersion.latest(), ignoreFormatted = false))
+ formatCommand(new PrintStream(stream), Seq(tempDir.toString),
metaProperties, bootstrapMetadata, MetadataVersion.latest(), ignoreFormatted =
false))
assertTrue(stream.toString().startsWith("Formatting %s".format(tempDir)))
try assertEquals(1, StorageTool.
- formatCommand(new PrintStream(new ByteArrayOutputStream()),
Seq(tempDir.toString), metaProperties, MetadataVersion.latest(),
ignoreFormatted = false)) catch {
+ formatCommand(new PrintStream(new ByteArrayOutputStream()),
Seq(tempDir.toString), metaProperties, bootstrapMetadata,
MetadataVersion.latest(), ignoreFormatted = false)) catch {
case e: TerseFailure => assertEquals(s"Log directory ${tempDir} is
already " +
"formatted. Use --ignore-formatted to ignore this directory and
format the " +
"others.", e.getMessage)
@@ -174,7 +179,7 @@ Found problem:
val stream2 = new ByteArrayOutputStream()
assertEquals(0, StorageTool.
- formatCommand(new PrintStream(stream2), Seq(tempDir.toString),
metaProperties, MetadataVersion.latest(), ignoreFormatted = true))
+ formatCommand(new PrintStream(stream2), Seq(tempDir.toString),
metaProperties, bootstrapMetadata, MetadataVersion.latest(), ignoreFormatted =
true))
assertEquals("All of the log directories are already
formatted.%n".format(), stream2.toString())
} finally Utils.delete(tempDir)
}
@@ -221,4 +226,139 @@ Found problem:
assertThrows(classOf[IllegalArgumentException], () =>
parseMetadataVersion("--release-version", "0.0"))
}
+
+ @Test
+ def testAddScram():Unit = {
+ def parseAddScram(strings: String*):
Option[ArrayBuffer[UserScramCredentialRecord]] = {
+ var args = mutable.Seq("format", "-c", "config.props", "-t",
"XcZZOzUqS4yHOjhMQB6JLQ")
+ args ++= strings
+ val namespace = StorageTool.parseArguments(args.toArray)
+ StorageTool.getUserScramCredentialRecords(namespace)
+ }
+
+ var scramRecords = parseAddScram()
+ assertEquals(None, scramRecords)
+
+ // Validate we can add multiple SCRAM creds.
+ scramRecords = parseAddScram("-S",
+
"SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]",
+ "-S",
+
"SCRAM-SHA-256=[name=george,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]")
+
+ assertEquals(2, scramRecords.get.size)
+
+ // Require name subfield.
+ try assertEquals(1, parseAddScram("-S",
+
"SCRAM-SHA-256=[salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]"))
catch {
+ case e: TerseFailure => assertEquals(s"You must supply 'name' to
add-scram", e.getMessage)
+ }
+
+ // Require password xor saltedpassword
+ try assertEquals(1, parseAddScram("-S",
+
"SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]"))
+ catch {
+ case e: TerseFailure => assertEquals(s"You must only supply one of
'password' or 'saltedpassword' to add-scram", e.getMessage)
+ }
+
+ try assertEquals(1, parseAddScram("-S",
+
"SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",iterations=8192]"))
+ catch {
+ case e: TerseFailure => assertEquals(s"You must supply one of 'password'
or 'saltedpassword' to add-scram", e.getMessage)
+ }
+
+ // Validate salt is required with saltedpassword
+ try assertEquals(1, parseAddScram("-S",
+
"SCRAM-SHA-256=[name=alice,saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]"))
+ catch {
+ case e: TerseFailure => assertEquals(s"You must supply 'salt' with
'saltedpassword' to add-scram", e.getMessage)
+ }
+
+ // Validate salt is optional with password
+ assertEquals(1, parseAddScram("-S",
"SCRAM-SHA-256=[name=alice,password=alice,iterations=4096]").get.size)
+
+ // Require 4096 <= iterations <= 16384
+ try assertEquals(1, parseAddScram("-S",
+
"SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=16385]"))
+ catch {
+ case e: TerseFailure => assertEquals(s"The 'iterations' value must be <=
16384 for add-scram", e.getMessage)
+ }
+
+ assertEquals(1, parseAddScram("-S",
+
"SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=16384]")
+ .get.size)
+
+ try assertEquals(1, parseAddScram("-S",
+
"SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=4095]"))
+ catch {
+ case e: TerseFailure => assertEquals(s"The 'iterations' value must be >=
4096 for add-scram", e.getMessage)
+ }
+
+ assertEquals(1, parseAddScram("-S",
+
"SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=4096]")
+ .get.size)
+
+ // Validate iterations is optional
+ assertEquals(1, parseAddScram("-S",
"SCRAM-SHA-256=[name=alice,password=alice]") .get.size)
+ }
+
+ class StorageToolTestException(message: String) extends
KafkaException(message) {
+ }
+
+ @Test
+ def testScramWithBadMetadataVersion(): Unit = {
+ var exitString: String = ""
+ def exitProcedure(exitStatus: Int, message: Option[String]) : Nothing = {
+ exitString = message.getOrElse("")
+ throw new StorageToolTestException(exitString)
+ }
+ Exit.setExitProcedure(exitProcedure)
+
+ val properties = newSelfManagedProperties()
+ val propsFile = TestUtils.tempFile()
+ val propsStream = Files.newOutputStream(propsFile.toPath)
+ properties.store(propsStream, "config.props")
+ propsStream.close()
+
+ val args = Array("format", "-c", s"${propsFile.toPath}", "-t",
"XcZZOzUqS4yHOjhMQB6JLQ", "--release-version", "3.4", "-S",
+
"SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=8192]")
+
+ try {
+ assertEquals(1, StorageTool.main(args))
+ } catch {
+ case e: StorageToolTestException => assertEquals(s"SCRAM is only
supported in metadataVersion IBP_3_5_IV0 or later.", exitString)
+ } finally {
+ Exit.resetExitProcedure()
+ }
+ }
+
+ @Test
+ def testNoScramWithMetadataVersion(): Unit = {
+ var exitString: String = ""
+ var exitStatus: Int = 1
+ def exitProcedure(status: Int, message: Option[String]) : Nothing = {
+ exitStatus = status
+ exitString = message.getOrElse("")
+ throw new StorageToolTestException(exitString)
+ }
+ Exit.setExitProcedure(exitProcedure)
+
+ val properties = newSelfManagedProperties()
+ val propsFile = TestUtils.tempFile()
+ val propsStream = Files.newOutputStream(propsFile.toPath)
+ // This test does format the directory specified so use a tempdir
+ properties.setProperty(KafkaConfig.LogDirsProp,
TestUtils.tempDir().toString)
+ properties.store(propsStream, "config.props")
+ propsStream.close()
+
+ val args = Array("format", "-c", s"${propsFile.toPath}", "-t",
"XcZZOzUqS4yHOjhMQB6JLQ", "--release-version", "3.4")
+
+ try {
+ StorageTool.main(args)
+ } catch {
+ case e: StorageToolTestException => assertEquals("", exitString)
+ assertEquals(0, exitStatus)
+ } finally {
+ Exit.resetExitProcedure()
+ }
+ }
}