This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 2cbc5bd3ca2 KAFKA-17636 Fix missing SCRAM bootstrap records (#17305)
2cbc5bd3ca2 is described below

commit 2cbc5bd3ca22185d9cd357c9db23a2cfb43a0fff
Author: David Arthur <[email protected]>
AuthorDate: Sat Sep 28 09:57:25 2024 -0400

    KAFKA-17636 Fix missing SCRAM bootstrap records (#17305)
    
    Fixes a regression introduced by #16669 which inadvertently stopped 
processing SCRAM arguments from kafka-storage.sh
    
    Reviewers: Colin P. McCabe <[email protected]>, Federico Valeri 
<[email protected]>
---
 core/src/main/scala/kafka/tools/StorageTool.scala  |  2 +
 .../scala/unit/kafka/tools/StorageToolTest.scala   | 51 +++++++++++++++++++++-
 2 files changed, 51 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala 
b/core/src/main/scala/kafka/tools/StorageTool.scala
index 43eb6579765..6dc46539614 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -130,6 +130,8 @@ object StorageTool extends Logging {
     if (namespace.getBoolean("standalone")) {
       formatter.setInitialVoters(createStandaloneDynamicVoters(config))
     }
+    Option(namespace.getList("add_scram")).
+      foreach(scramArgs => 
formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]]))
     configToLogDirectories(config).foreach(formatter.addDirectory(_))
     formatter.run()
   }
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala 
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 3a8f064b1df..83b21b43fba 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -21,12 +21,14 @@ import java.io.{ByteArrayOutputStream, File, PrintStream}
 import java.nio.charset.StandardCharsets
 import java.nio.file.Files
 import java.util
-import java.util.Properties
+import java.util.{Optional, Properties}
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import net.sourceforge.argparse4j.inf.ArgumentParserException
+import org.apache.kafka.common.metadata.UserScramCredentialRecord
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.common.Features
+import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
 import org.apache.kafka.metadata.properties.{MetaPropertiesEnsemble, 
PropertiesUtils}
 import org.apache.kafka.metadata.storage.FormatterException
 import org.apache.kafka.raft.QuorumConfig
@@ -37,6 +39,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
 import scala.collection.mutable.ListBuffer
+import scala.jdk.CollectionConverters.IterableHasAsScala
 
 @Timeout(value = 40)
 class StorageToolTest {
@@ -433,5 +436,49 @@ Found problem:
       contains("Formatting dynamic metadata voter directory 
%s".format(availableDirs.head)),
       "Failed to find content in output: " + stream.toString())
   }
-}
 
+  @Test
+  def testBootstrapScramRecords(): Unit = {
+    val availableDirs = Seq(TestUtils.tempDir())
+    val properties = new Properties()
+    properties.putAll(defaultDynamicQuorumProperties)
+    properties.setProperty("log.dirs", availableDirs.mkString(","))
+    val stream = new ByteArrayOutputStream()
+    val arguments = ListBuffer[String](
+      "--release-version", "3.9-IV0",
+      "--add-scram", "SCRAM-SHA-512=[name=alice,password=changeit]",
+      "--add-scram", "SCRAM-SHA-512=[name=bob,password=changeit]"
+    )
+
+    assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq))
+
+    // Not doing full SCRAM record validation since that's covered elsewhere.
+    // Just checking that we generate the correct number of records
+    val bootstrapMetadata = new 
BootstrapDirectory(availableDirs.head.toString, Optional.empty).read
+    val scramRecords = bootstrapMetadata.records().asScala
+      .filter(apiMessageAndVersion => 
apiMessageAndVersion.message().isInstanceOf[UserScramCredentialRecord])
+      .map(apiMessageAndVersion => 
apiMessageAndVersion.message().asInstanceOf[UserScramCredentialRecord])
+      .toList
+    assertEquals(2, scramRecords.size)
+    assertEquals("alice", scramRecords.head.name())
+    assertEquals("bob", scramRecords.last.name())
+  }
+
+  @Test
+  def testScramRecordsOldReleaseVersion(): Unit = {
+    val availableDirs = Seq(TestUtils.tempDir())
+    val properties = new Properties()
+    properties.putAll(defaultDynamicQuorumProperties)
+    properties.setProperty("log.dirs", availableDirs.mkString(","))
+    val stream = new ByteArrayOutputStream()
+    val arguments = ListBuffer[String](
+      "--release-version", "3.4",
+      "--add-scram", "SCRAM-SHA-512=[name=alice,password=changeit]",
+      "--add-scram", "SCRAM-SHA-512=[name=bob,password=changeit]"
+    )
+
+    assertEquals(
+      "SCRAM is only supported in metadata.version 3.5-IV2 or later.",
+      assertThrows(classOf[FormatterException], () => runFormatCommand(stream, 
properties, arguments.toSeq)).getMessage)
+  }
+}

Reply via email to