This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 16cac978735 KAFKA-17636 Fix missing SCRAM bootstrap records (#17305)
16cac978735 is described below
commit 16cac978735eb2e73421247e74f7e09fec6d0410
Author: David Arthur <[email protected]>
AuthorDate: Sat Sep 28 09:57:25 2024 -0400
KAFKA-17636 Fix missing SCRAM bootstrap records (#17305)
Fixes a regression introduced by #16669 which inadvertently stopped
processing SCRAM arguments from kafka-storage.sh
Reviewers: Colin P. McCabe <[email protected]>, Federico Valeri
<[email protected]>
---
core/src/main/scala/kafka/tools/StorageTool.scala | 2 +
.../scala/unit/kafka/tools/StorageToolTest.scala | 50 +++++++++++++++++++++-
2 files changed, 51 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
index 13c7eaac17d..73ec9f4761b 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -139,6 +139,8 @@ object StorageTool extends Logging {
if (namespace.getBoolean("standalone")) {
formatter.setInitialVoters(createStandaloneDynamicVoters(config))
}
+ Option(namespace.getList("add_scram")).
+ foreach(scramArgs => formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]]))
configToLogDirectories(config).foreach(formatter.addDirectory(_))
formatter.run()
}
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index a4711a1969c..51aea8de83f 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -21,12 +21,14 @@ import java.io.{ByteArrayOutputStream, File, PrintStream}
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util
-import java.util.Properties
+import java.util.{Optional, Properties}
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import net.sourceforge.argparse4j.inf.ArgumentParserException
+import org.apache.kafka.common.metadata.UserScramCredentialRecord
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.{Features, MetadataVersion}
+import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
import org.apache.kafka.metadata.properties.{MetaPropertiesEnsemble, PropertiesUtils}
import org.apache.kafka.metadata.storage.FormatterException
import org.apache.kafka.raft.QuorumConfig
@@ -37,6 +39,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.collection.mutable.ListBuffer
+import scala.jdk.CollectionConverters.IterableHasAsScala
@Timeout(value = 40)
class StorageToolTest {
@@ -619,4 +622,49 @@ Found problem:
assertEquals("Invalid version format: invalid for feature metadata.version", exception.getMessage)
}
+
+ @Test
+ def testBootstrapScramRecords(): Unit = {
+ val availableDirs = Seq(TestUtils.tempDir())
+ val properties = new Properties()
+ properties.putAll(defaultDynamicQuorumProperties)
+ properties.setProperty("log.dirs", availableDirs.mkString(","))
+ val stream = new ByteArrayOutputStream()
+ val arguments = ListBuffer[String](
+ "--release-version", "3.9-IV0",
+ "--add-scram", "SCRAM-SHA-512=[name=alice,password=changeit]",
+ "--add-scram", "SCRAM-SHA-512=[name=bob,password=changeit]"
+ )
+
+ assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq))
+
+ // Not doing full SCRAM record validation since that's covered elsewhere.
+ // Just checking that we generate the correct number of records
+ val bootstrapMetadata = new BootstrapDirectory(availableDirs.head.toString, Optional.empty).read
+ val scramRecords = bootstrapMetadata.records().asScala
+ .filter(apiMessageAndVersion => apiMessageAndVersion.message().isInstanceOf[UserScramCredentialRecord])
+ .map(apiMessageAndVersion => apiMessageAndVersion.message().asInstanceOf[UserScramCredentialRecord])
+ .toList
+ assertEquals(2, scramRecords.size)
+ assertEquals("alice", scramRecords.head.name())
+ assertEquals("bob", scramRecords.last.name())
+ }
+
+ @Test
+ def testScramRecordsOldReleaseVersion(): Unit = {
+ val availableDirs = Seq(TestUtils.tempDir())
+ val properties = new Properties()
+ properties.putAll(defaultDynamicQuorumProperties)
+ properties.setProperty("log.dirs", availableDirs.mkString(","))
+ val stream = new ByteArrayOutputStream()
+ val arguments = ListBuffer[String](
+ "--release-version", "3.4",
+ "--add-scram", "SCRAM-SHA-512=[name=alice,password=changeit]",
+ "--add-scram", "SCRAM-SHA-512=[name=bob,password=changeit]"
+ )
+
+ assertEquals(
+ "SCRAM is only supported in metadata.version 3.5-IV2 or later.",
+ assertThrows(classOf[FormatterException], () => runFormatCommand(stream, properties, arguments.toSeq)).getMessage)
+ }
}