fvaleri commented on code in PR #13417:
URL: https://github.com/apache/kafka/pull/13417#discussion_r1271824235


##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.security.scram.internals.ScramFormatter;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.broker.BrokerMetadataCheckpoint;
+import org.apache.kafka.metadata.broker.MetaProperties;
+import org.apache.kafka.metadata.broker.RawMetaProperties;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.append;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+
+public class StorageTool {
+    public static void main(String... args) {
+        try {
+            Namespace namespace = parseArguments(args);
+            String command = namespace.getString("command");
+            Optional<LogConfig> config = Optional.ofNullable(namespace.getString("config")).map(p -> {
+                try {
+                    return new LogConfig(Utils.loadProps(p), Collections.emptySet(), true);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            executeCommand(namespace, command, config);
+        } catch (Exception exception) {

Review Comment:
   Given we use TerseException, I think it's better to have the following error handling.
   
   ```java
           } catch (TerseException e) {
               System.err.println(e.getMessage());
               Exit.exit(1);
           } catch (Throwable e) {
               System.err.println(e.getMessage());
               System.err.println(Utils.stackTrace(e));
               Exit.exit(1);
           }
   ```
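   
   (For context: `TerseException` is the tools module's way of signaling an expected, user-facing failure, so only its message is printed, while any other throwable also gets a stack trace.)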



##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.security.scram.internals.ScramFormatter;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.broker.BrokerMetadataCheckpoint;
+import org.apache.kafka.metadata.broker.MetaProperties;
+import org.apache.kafka.metadata.broker.RawMetaProperties;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.append;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+
+public class StorageTool {
+    public static void main(String... args) {
+        try {
+            Namespace namespace = parseArguments(args);
+            String command = namespace.getString("command");
+            Optional<LogConfig> config = Optional.ofNullable(namespace.getString("config")).map(p -> {
+                try {
+                    return new LogConfig(Utils.loadProps(p), Collections.emptySet(), true);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            executeCommand(namespace, command, config);
+        } catch (Exception exception) {
+            System.err.println(exception.getMessage());
+            Exit.exit(1, exception.getMessage());
+        }
+    }
+
+    private static void executeCommand(Namespace namespace, String command, Optional<LogConfig> config) throws Exception {
+        final String info = "info";
+        final String format = "format";
+        if ((command.equals(info) || command.equals(format)) && !config.isPresent()) {

Review Comment:
   This check is useless, because argparse4j validates all required args during parsing, which happens before this point. This also means that we don't need the two variables above.
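   
   As a quick standalone sketch of that fail-fast behavior (the `RequiredArgDemo` class and the empty argument list are illustrative, not part of the PR):
   
   ```java
   import net.sourceforge.argparse4j.ArgumentParsers;
   import net.sourceforge.argparse4j.inf.ArgumentParser;
   
   import static net.sourceforge.argparse4j.impl.Arguments.store;
   
   public class RequiredArgDemo {
       public static void main(String[] args) {
           ArgumentParser parser = ArgumentParsers.newArgumentParser("demo");
           parser.addArgument("--config", "-c").action(store()).required(true);
           // parseArgsOrFail prints a usage error and exits the JVM when --config
           // is missing, so code after parsing never sees an absent config.
           parser.parseArgsOrFail(new String[]{});
       }
   }
   ```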



##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.security.scram.internals.ScramFormatter;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.broker.BrokerMetadataCheckpoint;
+import org.apache.kafka.metadata.broker.MetaProperties;
+import org.apache.kafka.metadata.broker.RawMetaProperties;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.append;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+
+public class StorageTool {
+    public static void main(String... args) {
+        try {
+            Namespace namespace = parseArguments(args);
+            String command = namespace.getString("command");
+            Optional<LogConfig> config = Optional.ofNullable(namespace.getString("config")).map(p -> {
+                try {
+                    return new LogConfig(Utils.loadProps(p), Collections.emptySet(), true);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            executeCommand(namespace, command, config);
+        } catch (Exception exception) {
+            System.err.println(exception.getMessage());
+            Exit.exit(1, exception.getMessage());
+        }
+    }
+
+    private static void executeCommand(Namespace namespace, String command, Optional<LogConfig> config) throws Exception {
+        final String info = "info";
+        final String format = "format";
+        if ((command.equals(info) || command.equals(format)) && !config.isPresent()) {
+            return; // Do nothing if config is not present
+        }
+
+        switch (command) {
+            case info: {
+                List<String> directories = configToLogDirectories(config.get());
+                boolean selfManagedMode = configToSelfManagedMode(config.get());
+                Exit.exit(infoCommand(System.out, selfManagedMode, directories));
+                break;
+            }
+            case format: {
+                List<String> directories = configToLogDirectories(config.get());
+                String clusterId = namespace.getString("cluster_id");
+                MetadataVersion metadataVersion = getMetadataVersion(namespace, Optional.of(config.get().getInterBrokerProtocolVersionString()));
+                if (!metadataVersion.isKRaftSupported()) {
+                    throw new TerseException("Must specify a valid KRaft metadata version of at least 3.0.");
+                }
+                MetaProperties metaProperties = buildMetadataProperties(clusterId, config.get());
+
+                List<ApiMessageAndVersion> metadataRecords = new ArrayList<>();
+                Optional<List<UserScramCredentialRecord>> scramRecordsOptional = getUserScramCredentialRecords(namespace);
+                if (scramRecordsOptional.isPresent()) {
+                    if (!metadataVersion.isScramSupported()) {
+                        throw new TerseException("SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.");
+                    }
+                    for (ApiMessage record : scramRecordsOptional.get()) {
+                        metadataRecords.add(new ApiMessageAndVersion(record, (short) 0));
+                    }
+                }
+
+                BootstrapMetadata bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Optional.of(metadataRecords), "format command");
+                boolean ignoreFormatted = namespace.getBoolean("ignore_formatted");
+                if (!configToSelfManagedMode(config.get())) {
+                    throw new TerseException("The kafka configuration file appears to be for " +
+                        "a legacy cluster. Formatting is only supported for clusters in KRaft mode.");
+                }
+                Exit.exit(formatCommand(System.out, directories, metaProperties, bootstrapMetadata,
+                    metadataVersion, ignoreFormatted));
+                break;
+            }
+            case "random-uuid": {
+                System.out.println(Uuid.randomUuid());
+                Exit.exit(0);
+                break;
+            }
+            default:
+                throw new RuntimeException("Unknown command " + command);
+        }
+    }
+
+    static int infoCommand(PrintStream stream, boolean selfManagedMode, List<String> directories) throws IOException {
+        List<String> problems = new ArrayList<>();
+        List<String> foundDirectories = new ArrayList<>();
+        Optional<RawMetaProperties> prevMetadata = Optional.empty();
+        for (String directory : directories) {

Review Comment:
   Thanks for removing the double sorting contained in `directories.sorted.foreach`.



##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.security.scram.internals.ScramFormatter;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.broker.BrokerMetadataCheckpoint;
+import org.apache.kafka.metadata.broker.MetaProperties;
+import org.apache.kafka.metadata.broker.RawMetaProperties;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.append;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+
+public class StorageTool {
+    public static void main(String... args) {
+        try {
+            Namespace namespace = parseArguments(args);
+            String command = namespace.getString("command");
+            Optional<LogConfig> config = Optional.ofNullable(namespace.getString("config")).map(p -> {
+                try {
+                    return new LogConfig(Utils.loadProps(p), Collections.emptySet(), true);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            executeCommand(namespace, command, config);
+        } catch (Exception exception) {
+            System.err.println(exception.getMessage());
+            Exit.exit(1, exception.getMessage());
+        }
+    }
+
+    private static void executeCommand(Namespace namespace, String command, Optional<LogConfig> config) throws Exception {
+        final String info = "info";
+        final String format = "format";
+        if ((command.equals(info) || command.equals(format)) && !config.isPresent()) {
+            return; // Do nothing if config is not present
+        }
+
+        switch (command) {
+            case info: {
+                List<String> directories = configToLogDirectories(config.get());
+                boolean selfManagedMode = configToSelfManagedMode(config.get());
+                Exit.exit(infoCommand(System.out, selfManagedMode, directories));
+                break;
+            }
+            case format: {
+                List<String> directories = configToLogDirectories(config.get());
+                String clusterId = namespace.getString("cluster_id");
+                MetadataVersion metadataVersion = getMetadataVersion(namespace, Optional.of(config.get().getInterBrokerProtocolVersionString()));
+                if (!metadataVersion.isKRaftSupported()) {
+                    throw new TerseException("Must specify a valid KRaft metadata version of at least 3.0.");
+                }
+                MetaProperties metaProperties = buildMetadataProperties(clusterId, config.get());
+
+                List<ApiMessageAndVersion> metadataRecords = new ArrayList<>();
+                Optional<List<UserScramCredentialRecord>> scramRecordsOptional = getUserScramCredentialRecords(namespace);
+                if (scramRecordsOptional.isPresent()) {
+                    if (!metadataVersion.isScramSupported()) {
+                        throw new TerseException("SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.");
+                    }
+                    for (ApiMessage record : scramRecordsOptional.get()) {
+                        metadataRecords.add(new ApiMessageAndVersion(record, (short) 0));
+                    }
+                }
+
+                BootstrapMetadata bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Optional.of(metadataRecords), "format command");
+                boolean ignoreFormatted = namespace.getBoolean("ignore_formatted");
+                if (!configToSelfManagedMode(config.get())) {
+                    throw new TerseException("The kafka configuration file appears to be for " +
+                        "a legacy cluster. Formatting is only supported for clusters in KRaft mode.");
+                }
+                Exit.exit(formatCommand(System.out, directories, metaProperties, bootstrapMetadata,
+                    metadataVersion, ignoreFormatted));
+                break;
+            }
+            case "random-uuid": {
+                System.out.println(Uuid.randomUuid());
+                Exit.exit(0);
+                break;
+            }
+            default:
+                throw new RuntimeException("Unknown command " + command);
+        }
+    }
+
+    static int infoCommand(PrintStream stream, boolean selfManagedMode, List<String> directories) throws IOException {
+        List<String> problems = new ArrayList<>();
+        List<String> foundDirectories = new ArrayList<>();
+        Optional<RawMetaProperties> prevMetadata = Optional.empty();
+        for (String directory : directories) {
+            Path directoryPath = Paths.get(directory);
+            if (!Files.isDirectory(directoryPath)) {
+                if (!Files.exists(directoryPath)) {
+                    problems.add(directoryPath + " does not exist");
+                } else {
+                    problems.add(directoryPath + " is not a directory");
+                }
+            } else {
+                foundDirectories.add(directoryPath.toString());
+                Path metaPath = directoryPath.resolve("meta.properties");
+                if (!Files.exists(metaPath)) {
+                    problems.add(directoryPath + " is not formatted.");
+                } else {
+                    Properties properties = Utils.loadProps(metaPath.toString());
+                    RawMetaProperties rawMetaProperties = new RawMetaProperties(properties);
+                    Optional<RawMetaProperties> curMetadata;
+
+                    switch (rawMetaProperties.getVersion()) {
+                        case 0:
+                        case 1:
+                            curMetadata = Optional.of(rawMetaProperties);
+                            break;
+                        default:
+                            problems.add("Unsupported version for " + metaPath + ": " + rawMetaProperties.getVersion());
+                            curMetadata = Optional.empty();
+                            break;
+                    }
+
+                    if (!prevMetadata.isPresent()) {
+                        prevMetadata = curMetadata;
+                    } else {
+                        if (curMetadata.isPresent() && !prevMetadata.get().equals(curMetadata.get())) {
+                            problems.add(String.format("Metadata for %s was %s, but other directories featured %s", metaPath, curMetadata.get(), prevMetadata.get()));
+                        }
+                    }
+                }
+            }
+        }
+
+        if (prevMetadata.isPresent()) {
+            if (selfManagedMode) {
+                if (prevMetadata.get().getVersion() == 0) {
+                    problems.add("The kafka configuration file appears to be for a cluster in KRaft mode, but " + "the directories are formatted for legacy mode.");
+                }
+            } else if (prevMetadata.get().getVersion() == 1) {
+                problems.add("The kafka configuration file appears to be for a legacy cluster, but " + "the directories are formatted for a cluster in KRaft mode.");
+            }
+        }
+
+        return validateDirectories(stream, directories, problems, foundDirectories, prevMetadata);
+    }
+
+    private static int validateDirectories(PrintStream stream, List<String> directories, List<String> problems, List<String> foundDirectories, Optional<RawMetaProperties> prevMetadata) {
+        if (directories.isEmpty()) {
+            stream.println("No directories specified.");
+            return 0;
+        } else {
+            if (!foundDirectories.isEmpty()) {
+                if (foundDirectories.size() == 1) {
+                    stream.println("Found log directory:");
+                } else {
+                    stream.println("Found log directories:");
+                }
+                foundDirectories.forEach(d -> stream.println("  " + d));
+                stream.println("");
+            }
+
+            if (prevMetadata.isPresent()) {
+                RawMetaProperties prev = prevMetadata.get();
+                stream.println("Found metadata: " + prev);
+                stream.println("");
+            }
+
+            if (!problems.isEmpty()) {
+                if (problems.size() == 1) {
+                    stream.println("Found problem:");
+                } else {
+                    stream.println("Found problems:");
+                }
+                problems.forEach(d -> stream.println("  " + d));
+                stream.println("");
+                return 1;
+            } else {
+                return 0;
+            }
+        }
+    }
+
+
+    public static Namespace parseArguments(String... args) {
+        ArgumentParser parser = ArgumentParsers.newArgumentParser("kafka-storage").defaultHelp(true).description("The Kafka storage tool.");
+        Subparsers subparsers = parser.addSubparsers().dest("command");
+        Subparser infoParser = subparsers.addParser("info").help("Get information about the Kafka log directories on this node.");
+        Subparser formatParser = subparsers.addParser("format").help("Format the Kafka log directories on this node.");
+        subparsers.addParser("random-uuid").help("Print a random UUID.");
+
+        for (Subparser subpparser : Arrays.asList(infoParser, formatParser)) {
+            subpparser.addArgument("--config", "-c").action(store()).required(true).help("The Kafka configuration file to use.");
+        }
+
+        formatParser.addArgument("--cluster-id", "-t").action(store()).required(true).help("The cluster ID to use.");
+        formatParser.addArgument("--add-scram", "-S").action(append()).help("A SCRAM_CREDENTIAL to add to the __cluster_metadata log e.g.\n" +
+            "'SCRAM-SHA-256=[name=alice,password=alice-secret]'\n" +
+            "'SCRAM-SHA-512=[name=alice,iterations=8192,salt=\"N3E=\",saltedpassword=\"YCE=\"]'");
+        formatParser.addArgument("--ignore-formatted", "-g").action(storeTrue());
+        formatParser.addArgument("--release-version", "-r").action(store()).help(String.format("A KRaft release version to use for the initial metadata version. The minimum is 3.0, the default is %s", MetadataVersion.latest().version()));
+
+        return parser.parseArgsOrFail(args);
+    }
+
+    static List<String> configToLogDirectories(LogConfig logConfig) {
+        List<String> logDirs = logConfig.getLogDirs();
+        SortedSet<String> directories = new TreeSet<>(logDirs);
+        String metadataLogDir = logConfig.getMetadataLogDir();
+        if (metadataLogDir != null) {
+            directories.add(metadataLogDir);
+        }
+        return new ArrayList<>(directories);
+    }
+
+    static boolean configToSelfManagedMode(LogConfig logConfig) {
+        return !logConfig.parseProcessRoles().isEmpty();
+    }
+
+

Review Comment:
   Nit: extra empty line.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -489,6 +575,29 @@ public static void validateValues(Map<?, ?> props) {
         }
     }
 
+    public static Map<?, ?> populateAndValidateProps(Map<?, ?> props) {

Review Comment:
   I think we can find a better name, or merge it with `validateOtherConfig`. Wdyt?



##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.security.scram.internals.ScramFormatter;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.broker.BrokerMetadataCheckpoint;
+import org.apache.kafka.metadata.broker.MetaProperties;
+import org.apache.kafka.metadata.broker.RawMetaProperties;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.append;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+
+public class StorageTool {
+    public static void main(String... args) {
+        try {
+            Namespace namespace = parseArguments(args);
+            String command = namespace.getString("command");
+            Optional<LogConfig> config = 
Optional.ofNullable(namespace.getString("config")).map(p -> {
+                try {
+                    return new LogConfig(Utils.loadProps(p), 
Collections.emptySet(), true);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            executeCommand(namespace, command, config);
+        } catch (Exception exception) {
+            System.err.println(exception.getMessage());
+            Exit.exit(1, exception.getMessage());
+        }
+    }
+
+    private static void executeCommand(Namespace namespace, String command, 
Optional<LogConfig> config) throws Exception {
+        final String info = "info";
+        final String format = "format";
+        if ((command.equals(info) || command.equals(format)) && 
!config.isPresent()) {
+            return; // Do nothing if config is not present
+        }
+
+        switch (command) {
+            case info: {
+                List<String> directories = 
configToLogDirectories(config.get());
+                boolean selfManagedMode = 
configToSelfManagedMode(config.get());
+                Exit.exit(infoCommand(System.out, selfManagedMode, 
directories));
+                break;
+            }
+            case format: {

Review Comment:
   This code looks good, but I wonder whether we could improve it by moving all the checks and object creation into the command methods. Something like:
   
   ```java
           switch (command) {
               case info: {
                   Exit.exit(infoCommand(System.out, config));
                   break;
               }
               case format: {
                   Exit.exit(formatCommand(System.out, namespace, config));
                   break;
               }
   ...
   ```
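   
   That would keep `executeCommand` a thin dispatcher, with each command method owning its own validation and object creation.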



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -389,10 +462,23 @@ public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
         this.leaderReplicationThrottledReplicas = Collections.unmodifiableList(getList(LogConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
         this.followerReplicationThrottledReplicas = Collections.unmodifiableList(getList(LogConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
         this.messageDownConversionEnable = getBoolean(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG);
-
+        this.interBrokerProtocolVersion = getString(INTER_BROKER_PROTOCOL_VERSION_PROP);
+        this.brokerId = getInt(BROKER_ID_PROP);
+        this.maxReservedBrokerId = getInt(MAX_RESERVED_BROKER_ID_PROP);
+        this.nodeId = getInt(LogConfig.NODE_ID_PROP);
+        this.brokerIdGenerationEnable = getBoolean(BROKER_ID_GENERATION_ENABLE_PROP);
+        this.logDirs = getString(LOG_DIRS_PROP);
+        this.logDir = getString(LOG_DIR_PROP);
+        processRoles = parseProcessRoles();
+        this.zkConnect = getString(ZOOKEEPER_CONNECT_PROP);
         remoteLogConfig = new RemoteLogConfig(this, retentionMs, retentionSize);
     }
 
+    public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs, boolean populateSynonymsAndValidateProps) {

Review Comment:
   `populateSynonymsAndValidateProps` is never used.
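   
   If the flag was actually meant to gate the new validation, one hypothetical wiring (assuming the static `populateAndValidateProps` from the other hunk is the intended hook) could be:
   
   ```java
   public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs, boolean populateSynonymsAndValidateProps) {
       // Hypothetical: only run the extra population/validation when requested.
       this(populateSynonymsAndValidateProps ? populateAndValidateProps(props) : props, overriddenConfigs);
   }
   ```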



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -230,6 +235,55 @@ public Optional<String> serverConfigName(String configName) {
         FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG
     ));
 
+    private static final String DEFAULT_LOG_DIR = "/tmp/kafka-logs";
+    private static final String LOG_CONFIG_PREFIX = "log.";
+    private static final String LOG_DIR_PROP = LOG_CONFIG_PREFIX + "dir";
+    private static final String LOG_DIRS_PROP = LOG_CONFIG_PREFIX + "dirs";
+
+    private static final String NODE_ID_PROP = "node.id";
+    private static final String BROKER_ID_PROP = "broker.id";
+    private static final String MAX_RESERVED_BROKER_ID_PROP = "reserved.broker.max.id";
+
+    private static final int DEFAULT_EMPTY_NODE_ID = -1;
+
+    private static final String METADATA_LOG_DIR_PROP = "metadata.log.dir";
+
+    private static final String PROCESS_ROLES_PROP = "process.roles";
+
+    private static final String INTER_BROKER_PROTOCOL_VERSION_PROP = "inter.broker.protocol.version";
+
+    private static final String BROKER_ID_GENERATION_ENABLE_PROP = "broker.id.generation.enable";
+
+    private static final String ZOOKEEPER_CONNECT_PROP = "zookeeper.connect";
+
+    private static final String INTER_BROKER_PROTOCOL_VERSION_PROP_DEFAULT_VALUE = MetadataVersion.latest().version();
+    private static final String LOG_DIR_DOC = "The directory in which the log data is kept (supplemental for " + LOG_DIR_PROP + " property)";
+    private static final String METADATA_LOG_DIR_DOC = "This configuration determines where we put the metadata log for clusters in KRaft mode. " +
+        "If it is not set, the metadata log is placed in the first log directory from log.dirs.";
+    private static final String INTER_BROKER_PROTOCOL_VERSION_DOC = "Specify which version of the inter-broker protocol will be used.\n" +
+        " This is typically bumped after all brokers were upgraded to a new version.\n" +
+        " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1 Check MetadataVersion for the full list.";
+
+    private static final String LOG_DIRS_DOC = "A comma-separated list of the directories where the log data is stored. If not set, the value in " + LOG_DIRS_PROP + " is used.";
+
+    private static final String BROKER_ID_DOC = "The broker id for this server. If unset, a unique broker id will be generated." +
+        "To avoid conflicts between ZooKeeper generated broker id's and user configured broker id's, generated broker ids " +
+        "start from " + MAX_RESERVED_BROKER_ID_PROP + " + 1.";
+
+    private static final String NODE_ID_DOC = "The node ID associated with the roles this process is playing when `process.roles` is non-empty. " +
+        "This is required configuration when running in KRaft mode.";
+    private static final String PROCESS_ROLES_DOC = "The roles that this process plays: 'broker', 'controller', or 'broker,controller' if it is both. " +
+        "This configuration is only applicable for clusters in KRaft (Kafka Raft) mode (instead of ZooKeeper). Leave this config undefined or empty for ZooKeeper clusters.";
+
+    private static final String BROKER_ID_GENERATION_ENABLE_DOC = "Enable automatic broker id generation on the server. When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed.";
+
+    private static final String MAX_RESERVED_BROKER_ID_DOC = "Max number that can be used for a broker.id";
+
+    private static final String ZK_CONNECT_DOC = "Specifies the ZooKeeper connection string in the form <code>hostname:port</code> where host and port are the " +

Review Comment:
   I don't think this information should be in `LogConfig`.



##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.security.scram.internals.ScramFormatter;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.broker.BrokerMetadataCheckpoint;
+import org.apache.kafka.metadata.broker.MetaProperties;
+import org.apache.kafka.metadata.broker.RawMetaProperties;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.append;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+
+public class StorageTool {
+    public static void main(String... args) {
+        try {
+            Namespace namespace = parseArguments(args);
+            String command = namespace.getString("command");
+            Optional<LogConfig> config = Optional.ofNullable(namespace.getString("config")).map(p -> {
+                try {
+                    return new LogConfig(Utils.loadProps(p), Collections.emptySet(), true);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            executeCommand(namespace, command, config);
+        } catch (Exception exception) {
+            System.err.println(exception.getMessage());
+            Exit.exit(1, exception.getMessage());
+        }
+    }
+
+    private static void executeCommand(Namespace namespace, String command, Optional<LogConfig> config) throws Exception {
+        final String info = "info";
+        final String format = "format";
+        if ((command.equals(info) || command.equals(format)) && !config.isPresent()) {
+            return; // Do nothing if config is not present
+        }
+
+        switch (command) {
+            case info: {
+                List<String> directories = configToLogDirectories(config.get());
+                boolean selfManagedMode = configToSelfManagedMode(config.get());
+                Exit.exit(infoCommand(System.out, selfManagedMode, directories));
+                break;
+            }
+            case format: {
+                List<String> directories = configToLogDirectories(config.get());
+                String clusterId = namespace.getString("cluster_id");
+                MetadataVersion metadataVersion = getMetadataVersion(namespace, Optional.of(config.get().getInterBrokerProtocolVersionString()));
+                if (!metadataVersion.isKRaftSupported()) {
+                    throw new TerseException("Must specify a valid KRaft metadata version of at least 3.0.");
+                }
+                MetaProperties metaProperties = buildMetadataProperties(clusterId, config.get());
+
+                List<ApiMessageAndVersion> metadataRecords = new ArrayList<>();
+                Optional<List<UserScramCredentialRecord>> scramRecordsOptional = getUserScramCredentialRecords(namespace);
+                if (scramRecordsOptional.isPresent()) {
+                    if (!metadataVersion.isScramSupported()) {
+                        throw new TerseException("SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.");
+                    }
+                    for (ApiMessage record : scramRecordsOptional.get()) {
+                        metadataRecords.add(new ApiMessageAndVersion(record, (short) 0));
+                    }
+                }
+
+                BootstrapMetadata bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Optional.of(metadataRecords), "format command");
+                boolean ignoreFormatted = namespace.getBoolean("ignore_formatted");
+                if (!configToSelfManagedMode(config.get())) {
+                    throw new TerseException("The kafka configuration file appears to be for " +
+                        "a legacy cluster. Formatting is only supported for clusters in KRaft mode.");
+                }
+                Exit.exit(formatCommand(System.out, directories, metaProperties, bootstrapMetadata,
+                    metadataVersion, ignoreFormatted));
+                break;
+            }
+            case "random-uuid": {
+                System.out.println(Uuid.randomUuid());
+                Exit.exit(0);
+                break;
+            }
+            default:
+                throw new RuntimeException("Unknown command " + command);
+        }
+    }
+
+    static int infoCommand(PrintStream stream, boolean selfManagedMode, List<String> directories) throws IOException {
+        List<String> problems = new ArrayList<>();
+        List<String> foundDirectories = new ArrayList<>();
+        Optional<RawMetaProperties> prevMetadata = Optional.empty();
+        for (String directory : directories) {
+            Path directoryPath = Paths.get(directory);
+            if (!Files.isDirectory(directoryPath)) {
+                if (!Files.exists(directoryPath)) {
+                    problems.add(directoryPath + " does not exist");
+                } else {
+                    problems.add(directoryPath + " is not a directory");
+                }
+            } else {
+                foundDirectories.add(directoryPath.toString());
+                Path metaPath = directoryPath.resolve("meta.properties");
+                if (!Files.exists(metaPath)) {
+                    problems.add(directoryPath + " is not formatted.");
+                } else {
+                    Properties properties = Utils.loadProps(metaPath.toString());
+                    RawMetaProperties rawMetaProperties = new RawMetaProperties(properties);
+                    Optional<RawMetaProperties> curMetadata;
+
+                    switch (rawMetaProperties.getVersion()) {
+                        case 0:
+                        case 1:
+                            curMetadata = Optional.of(rawMetaProperties);
+                            break;
+                        default:
+                            problems.add("Unsupported version for " + metaPath + ": " + rawMetaProperties.getVersion());
+                            curMetadata = Optional.empty();
+                            break;
+                    }
+
+                    if (!prevMetadata.isPresent()) {
+                        prevMetadata = curMetadata;
+                    } else {
+                        if (curMetadata.isPresent() && !prevMetadata.get().equals(curMetadata.get())) {
+                            problems.add(String.format("Metadata for %s was %s, but other directories featured %s", metaPath, curMetadata.get(), prevMetadata.get()));
+                        }
+                    }
+                }
+            }
+        }
+
+        if (prevMetadata.isPresent()) {
+            if (selfManagedMode) {
+                if (prevMetadata.get().getVersion() == 0) {
+                    problems.add("The kafka configuration file appears to be for a cluster in KRaft mode, but " + "the directories are formatted for legacy mode.");

Review Comment:
   Nit: remove string concatenation or add a carriage return.
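   
   One possible shape of the fix, just as a sketch (either variant works):
   
   ```java
   // Option 1: a single literal.
   problems.add("The kafka configuration file appears to be for a cluster in KRaft mode, but the directories are formatted for legacy mode.");
   // Option 2: keep the wrap, but split at a word boundary.
   problems.add("The kafka configuration file appears to be for a cluster in KRaft mode, " +
       "but the directories are formatted for legacy mode.");
   ```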



##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.security.scram.internals.ScramFormatter;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.broker.BrokerMetadataCheckpoint;
+import org.apache.kafka.metadata.broker.MetaProperties;
+import org.apache.kafka.metadata.broker.RawMetaProperties;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.append;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+
+public class StorageTool {
+    public static void main(String... args) {
+        try {
+            Namespace namespace = parseArguments(args);
+            String command = namespace.getString("command");
+            Optional<LogConfig> config = Optional.ofNullable(namespace.getString("config")).map(p -> {
+                try {
+                    return new LogConfig(Utils.loadProps(p), Collections.emptySet(), true);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            executeCommand(namespace, command, config);
+        } catch (Exception exception) {
+            System.err.println(exception.getMessage());
+            Exit.exit(1, exception.getMessage());
+        }
+    }
+
+    private static void executeCommand(Namespace namespace, String command, Optional<LogConfig> config) throws Exception {
+        final String info = "info";
+        final String format = "format";
+        if ((command.equals(info) || command.equals(format)) && !config.isPresent()) {
+            return; // Do nothing if config is not present
+        }
+
+        switch (command) {
+            case info: {
+                List<String> directories = configToLogDirectories(config.get());
+                boolean selfManagedMode = configToSelfManagedMode(config.get());
+                Exit.exit(infoCommand(System.out, selfManagedMode, directories));
+                break;
+            }
+            case format: {
+                List<String> directories = configToLogDirectories(config.get());
+                String clusterId = namespace.getString("cluster_id");
+                MetadataVersion metadataVersion = getMetadataVersion(namespace, Optional.of(config.get().getInterBrokerProtocolVersionString()));
+                if (!metadataVersion.isKRaftSupported()) {
+                    throw new TerseException("Must specify a valid KRaft metadata version of at least 3.0.");
+                }
+                MetaProperties metaProperties = buildMetadataProperties(clusterId, config.get());
+
+                List<ApiMessageAndVersion> metadataRecords = new ArrayList<>();
+                Optional<List<UserScramCredentialRecord>> scramRecordsOptional = getUserScramCredentialRecords(namespace);
+                if (scramRecordsOptional.isPresent()) {
+                    if (!metadataVersion.isScramSupported()) {
+                        throw new TerseException("SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.");
+                    }
+                    for (ApiMessage record : scramRecordsOptional.get()) {
+                        metadataRecords.add(new ApiMessageAndVersion(record, (short) 0));
+                    }
+                }
+
+                BootstrapMetadata bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Optional.of(metadataRecords), "format command");
+                boolean ignoreFormatted = namespace.getBoolean("ignore_formatted");
+                if (!configToSelfManagedMode(config.get())) {
+                    throw new TerseException("The kafka configuration file appears to be for " +
+                        "a legacy cluster. Formatting is only supported for clusters in KRaft mode.");
+                }
+                Exit.exit(formatCommand(System.out, directories, metaProperties, bootstrapMetadata,
+                    metadataVersion, ignoreFormatted));
+                break;
+            }
+            case "random-uuid": {
+                System.out.println(Uuid.randomUuid());
+                Exit.exit(0);
+                break;
+            }
+            default:
+                throw new RuntimeException("Unknown command " + command);
+        }
+    }
+
+    static int infoCommand(PrintStream stream, boolean selfManagedMode, List<String> directories) throws IOException {
+        List<String> problems = new ArrayList<>();
+        List<String> foundDirectories = new ArrayList<>();
+        Optional<RawMetaProperties> prevMetadata = Optional.empty();
+        for (String directory : directories) {
+            Path directoryPath = Paths.get(directory);
+            if (!Files.isDirectory(directoryPath)) {
+                if (!Files.exists(directoryPath)) {
+                    problems.add(directoryPath + " does not exist");
+                } else {
+                    problems.add(directoryPath + " is not a directory");
+                }
+            } else {
+                foundDirectories.add(directoryPath.toString());
+                Path metaPath = directoryPath.resolve("meta.properties");
+                if (!Files.exists(metaPath)) {
+                    problems.add(directoryPath + " is not formatted.");
+                } else {
+                    Properties properties = Utils.loadProps(metaPath.toString());
+                    RawMetaProperties rawMetaProperties = new RawMetaProperties(properties);
+                    Optional<RawMetaProperties> curMetadata;
+
+                    switch (rawMetaProperties.getVersion()) {
+                        case 0:
+                        case 1:
+                            curMetadata = Optional.of(rawMetaProperties);
+                            break;
+                        default:
+                            problems.add("Unsupported version for " + metaPath + ": " + rawMetaProperties.getVersion());
+                            curMetadata = Optional.empty();
+                            break;
+                    }
+
+                    if (!prevMetadata.isPresent()) {
+                        prevMetadata = curMetadata;
+                    } else {
+                        if (curMetadata.isPresent() && 
!prevMetadata.get().equals(curMetadata.get())) {
+                            problems.add(String.format("Metadata for %s was 
%s, but other directories featured %s", metaPath, curMetadata.get(), 
prevMetadata.get()));
+                        }
+                    }
+                }
+            }
+        }
+
+        if (prevMetadata.isPresent()) {
+            if (selfManagedMode) {
+                if (prevMetadata.get().getVersion() == 0) {
+                    problems.add("The kafka configuration file appears to be 
for a cluster in KRaft mode, but " + "the directories are formatted for legacy 
mode.");
+                }
+            } else if (prevMetadata.get().getVersion() == 1) {
+                problems.add("The kafka configuration file appears to be for a 
legacy cluster, but " + "the directories are formatted for a cluster in KRaft 
mode.");

Review Comment:
   Nit: remove the string concatenation or add a newline.
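   
   For example, either of these would do (just a sketch against the first of the two messages):
   
   ```java
   // Option 1: collapse into a single literal, since the concatenation joins into one line anyway.
   problems.add("The kafka configuration file appears to be for a cluster in KRaft mode, but the directories are formatted for legacy mode.");
   
   // Option 2: keep the source-level wrap but make the break real with an explicit newline.
   problems.add("The kafka configuration file appears to be for a cluster in KRaft mode,\n" +
       "but the directories are formatted for legacy mode.");
   ```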



##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.security.scram.internals.ScramFormatter;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.broker.BrokerMetadataCheckpoint;
+import org.apache.kafka.metadata.broker.MetaProperties;
+import org.apache.kafka.metadata.broker.RawMetaProperties;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.append;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+
+public class StorageTool {
+    public static void main(String... args) {
+        try {
+            Namespace namespace = parseArguments(args);
+            String command = namespace.getString("command");
+            Optional<LogConfig> config = Optional.ofNullable(namespace.getString("config")).map(p -> {
+                try {
+                    return new LogConfig(Utils.loadProps(p), Collections.emptySet(), true);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            executeCommand(namespace, command, config);
+        } catch (Exception exception) {
+            System.err.println(exception.getMessage());
+            Exit.exit(1, exception.getMessage());
+        }
+    }
+
+    private static void executeCommand(Namespace namespace, String command, Optional<LogConfig> config) throws Exception {
+        final String info = "info";
+        final String format = "format";
+        if ((command.equals(info) || command.equals(format)) && !config.isPresent()) {
+            return; // Do nothing if config is not present
+        }
+
+        switch (command) {
+            case info: {
+                List<String> directories = configToLogDirectories(config.get());
+                boolean selfManagedMode = configToSelfManagedMode(config.get());
+                Exit.exit(infoCommand(System.out, selfManagedMode, directories));
+                break;
+            }
+            case format: {
+                List<String> directories = configToLogDirectories(config.get());
+                String clusterId = namespace.getString("cluster_id");
+                MetadataVersion metadataVersion = getMetadataVersion(namespace, Optional.of(config.get().getInterBrokerProtocolVersionString()));
+                if (!metadataVersion.isKRaftSupported()) {
+                    throw new TerseException("Must specify a valid KRaft metadata version of at least 3.0.");
+                }
+                MetaProperties metaProperties = buildMetadataProperties(clusterId, config.get());
+
+                List<ApiMessageAndVersion> metadataRecords = new ArrayList<>();
+                Optional<List<UserScramCredentialRecord>> scramRecordsOptional = getUserScramCredentialRecords(namespace);
+                if (scramRecordsOptional.isPresent()) {
+                    if (!metadataVersion.isScramSupported()) {
+                        throw new TerseException("SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.");
+                    }
+                    for (ApiMessage record : scramRecordsOptional.get()) {
+                        metadataRecords.add(new ApiMessageAndVersion(record, (short) 0));
+                    }
+                }
+
+                BootstrapMetadata bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Optional.of(metadataRecords), "format command");
+                boolean ignoreFormatted = namespace.getBoolean("ignore_formatted");
+                if (!configToSelfManagedMode(config.get())) {
+                    throw new TerseException("The kafka configuration file appears to be for " +
+                        "a legacy cluster. Formatting is only supported for clusters in KRaft mode.");
+                }
+                Exit.exit(formatCommand(System.out, directories, metaProperties, bootstrapMetadata,
+                    metadataVersion, ignoreFormatted));
+                break;
+            }
+            case "random-uuid": {
+                System.out.println(Uuid.randomUuid());
+                Exit.exit(0);
+                break;
+            }
+            default:
+                throw new RuntimeException("Unknown command " + command);
+        }
+    }
+
+    static int infoCommand(PrintStream stream, boolean selfManagedMode, List<String> directories) throws IOException {
+        List<String> problems = new ArrayList<>();
+        List<String> foundDirectories = new ArrayList<>();
+        Optional<RawMetaProperties> prevMetadata = Optional.empty();
+        for (String directory : directories) {
+            Path directoryPath = Paths.get(directory);
+            if (!Files.isDirectory(directoryPath)) {
+                if (!Files.exists(directoryPath)) {
+                    problems.add(directoryPath + " does not exist");
+                } else {
+                    problems.add(directoryPath + " is not a directory");
+                }
+            } else {
+                foundDirectories.add(directoryPath.toString());
+                Path metaPath = directoryPath.resolve("meta.properties");
+                if (!Files.exists(metaPath)) {
+                    problems.add(directoryPath + " is not formatted.");
+                } else {
+                    Properties properties = Utils.loadProps(metaPath.toString());
+                    RawMetaProperties rawMetaProperties = new RawMetaProperties(properties);
+                    Optional<RawMetaProperties> curMetadata;
+
+                    switch (rawMetaProperties.getVersion()) {
+                        case 0:
+                        case 1:
+                            curMetadata = Optional.of(rawMetaProperties);
+                            break;
+                        default:
+                            problems.add("Unsupported version for " + metaPath + ": " + rawMetaProperties.getVersion());
+                            curMetadata = Optional.empty();
+                            break;
+                    }
+
+                    if (!prevMetadata.isPresent()) {
+                        prevMetadata = curMetadata;
+                    } else {
+                        if (curMetadata.isPresent() && !prevMetadata.get().equals(curMetadata.get())) {
+                            problems.add(String.format("Metadata for %s was %s, but other directories featured %s", metaPath, curMetadata.get(), prevMetadata.get()));
+                        }
+                    }
+                }
+            }
+        }
+
+        if (prevMetadata.isPresent()) {
+            if (selfManagedMode) {
+                if (prevMetadata.get().getVersion() == 0) {
+                    problems.add("The kafka configuration file appears to be for a cluster in KRaft mode, but " + "the directories are formatted for legacy mode.");
+                }
+            } else if (prevMetadata.get().getVersion() == 1) {
+                problems.add("The kafka configuration file appears to be for a legacy cluster, but " + "the directories are formatted for a cluster in KRaft mode.");
+            }
+        }
+
+        return validateDirectories(stream, directories, problems, foundDirectories, prevMetadata);
+    }
+
+    private static int validateDirectories(PrintStream stream, List<String> directories, List<String> problems, List<String> foundDirectories, Optional<RawMetaProperties> prevMetadata) {
+        if (directories.isEmpty()) {
+            stream.println("No directories specified.");
+            return 0;
+        } else {
+            if (!foundDirectories.isEmpty()) {
+                if (foundDirectories.size() == 1) {
+                    stream.println("Found log directory:");
+                } else {
+                    stream.println("Found log directories:");
+                }
+                foundDirectories.forEach(d -> stream.println("  " + d));
+                stream.println("");
+            }
+
+            if (prevMetadata.isPresent()) {
+                RawMetaProperties prev = prevMetadata.get();
+                stream.println("Found metadata: " + prev);
+                stream.println("");
+            }
+
+            if (!problems.isEmpty()) {
+                if (problems.size() == 1) {
+                    stream.println("Found problem:");
+                } else {
+                    stream.println("Found problems:");
+                }
+                problems.forEach(d -> stream.println("  " + d));
+                stream.println("");
+                return 1;
+            } else {
+                return 0;
+            }
+        }
+    }
+
+
+    public static Namespace parseArguments(String... args) {
+        ArgumentParser parser = ArgumentParsers.newArgumentParser("kafka-storage").defaultHelp(true).description("The Kafka storage tool.");
+        Subparsers subparsers = parser.addSubparsers().dest("command");
+        Subparser infoParser = subparsers.addParser("info").help("Get information about the Kafka log directories on this node.");
+        Subparser formatParser = subparsers.addParser("format").help("Format the Kafka log directories on this node.");
+        subparsers.addParser("random-uuid").help("Print a random UUID.");
+
+        for (Subparser subpparser : Arrays.asList(infoParser, formatParser)) {
+            subpparser.addArgument("--config", "-c").action(store()).required(true).help("The Kafka configuration file to use.");
+        }
+
+        formatParser.addArgument("--cluster-id", "-t").action(store()).required(true).help("The cluster ID to use.");
+        formatParser.addArgument("--add-scram", "-S").action(append()).help("A SCRAM_CREDENTIAL to add to the __cluster_metadata log e.g.\n" +
+            "'SCRAM-SHA-256=[name=alice,password=alice-secret]'\n" +
+            "'SCRAM-SHA-512=[name=alice,iterations=8192,salt=\"N3E=\",saltedpassword=\"YCE=\"]'");
+        formatParser.addArgument("--ignore-formatted", "-g").action(storeTrue());
+        formatParser.addArgument("--release-version", "-r").action(store()).help(String.format("A KRaft release version to use for the initial metadata version. The minimum is 3.0, the default is %s", MetadataVersion.latest().version()));
+
+        return parser.parseArgsOrFail(args);
+    }
+
+    static List<String> configToLogDirectories(LogConfig logConfig) {
+        List<String> logDirs = logConfig.getLogDirs();
+        SortedSet<String> directories = new TreeSet<>(logDirs);
+        String metadataLogDir = logConfig.getMetadataLogDir();
+        if (metadataLogDir != null) {
+            directories.add(metadataLogDir);
+        }
+        return new ArrayList<>(directories);
+    }
+
+    static boolean configToSelfManagedMode(LogConfig logConfig) {
+        return !logConfig.parseProcessRoles().isEmpty();
+    }
+
+
+    static MetadataVersion getMetadataVersion(Namespace namespace, Optional<String> defaultVersionString) {
+        MetadataVersion defaultValue;
+        if (defaultVersionString != null && defaultVersionString.isPresent()) {
+            defaultValue = MetadataVersion.fromVersionString(defaultVersionString.get());
+        } else {
+            defaultValue = MetadataVersion.latest();
+        }
+        String releaseVersion = namespace.getString("release_version");
+        if (releaseVersion != null) {
+            return MetadataVersion.fromVersionString(releaseVersion);
+        } else {
+            return defaultValue;
+        }
+    }
+
+    static MetaProperties buildMetadataProperties(String clusterIdStr, LogConfig config) throws TerseException {
+        Uuid effectiveClusterId;
+        try {
+            effectiveClusterId = Uuid.fromString(clusterIdStr);
+        } catch (Throwable e) {
+            throw new TerseException("Cluster ID string " + clusterIdStr + " does not appear to be a valid UUID: " + e.getMessage());
+        }
+
+        if (config.getNodeId() < 0) {
+            throw new TerseException("The node.id must be set to a non-negative integer. We saw " + config.getNodeId());
+        }
+
+        return new MetaProperties(effectiveClusterId.toString(), config.getNodeId());
+    }
+
+    static int formatCommand(PrintStream stream, List<String> directories, MetaProperties metaProperties, BootstrapMetadata bootstrapMetadata, MetadataVersion metadataVersion, boolean ignoreFormatted) throws TerseException {
+        if (directories.isEmpty()) {
+            throw new TerseException("No log directories found in the configuration.");
+        }
+        List<String> unformattedDirectories = directories.stream().filter(directory -> {
+            if (!Files.isDirectory(Paths.get(directory)) || !Files.exists(Paths.get(directory, "meta.properties"))) {
+                return true;
+            } else if (!ignoreFormatted) {
+                try {
+                    throw new TerseException("Log directory " + directory + " is already formatted. " +
+                        "Use --ignore-formatted to ignore this directory and format the others.");
+                } catch (TerseException e) {
+                    throw new RuntimeException(e.getMessage());
+                }
+            } else {
+                return false;
+            }
+        }).collect(Collectors.toList());
+
+        if (unformattedDirectories.isEmpty()) {
+            stream.println("All of the log directories are already formatted.");
+        }
+        unformattedDirectories.forEach(directory -> {
+            try {
+                Files.createDirectories(Paths.get(directory));
+            } catch (Exception e) {
+                try {
+                    throw new TerseException("Unable to create storage directory " + directory + ": " + e.getMessage());
+                } catch (TerseException ex) {
+                    throw new RuntimeException(ex);
+                }
+            }
+
+            Path metaPropertiesPath = Paths.get(directory, "meta.properties");
+            BrokerMetadataCheckpoint checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile());
+            try {
+                checkpoint.write(metaProperties.toProperties());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+            BootstrapDirectory bootstrapDirectory = new BootstrapDirectory(directory, Optional.empty());
+            try {
+                bootstrapDirectory.writeBinaryFile(bootstrapMetadata);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+
+            stream.println("Formatting " + directory + " with metadata.version " + metadataVersion + ".");
+        });
+        return 0;
+    }
+
+    public static BootstrapMetadata buildBootstrapMetadata(MetadataVersion metadataVersion,
+                                                           Optional<List<ApiMessageAndVersion>> metadataOptionalArguments,
+                                                           String source) {
+        List<ApiMessageAndVersion> metadataRecords = new ArrayList<>();
+        metadataRecords.add(new ApiMessageAndVersion(new FeatureLevelRecord()
+            .setName(MetadataVersion.FEATURE_NAME)
+            .setFeatureLevel(metadataVersion.featureLevel()), (short) 0));
+
+        metadataOptionalArguments.ifPresent(metadataArguments -> metadataArguments.forEach(metadataRecords::add));

Review Comment:
   You can use `addAll` here.
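   
   For instance (sketch):
   
   ```java
   // addAll does the same bulk copy without the explicit forEach.
   metadataOptionalArguments.ifPresent(metadataRecords::addAll);
   ```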



##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.security.scram.internals.ScramFormatter;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.broker.BrokerMetadataCheckpoint;
+import org.apache.kafka.metadata.broker.MetaProperties;
+import org.apache.kafka.metadata.broker.RawMetaProperties;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.append;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+
+public class StorageTool {
+    public static void main(String... args) {
+        try {
+            Namespace namespace = parseArguments(args);
+            String command = namespace.getString("command");
+            Optional<LogConfig> config = Optional.ofNullable(namespace.getString("config")).map(p -> {
+                try {
+                    return new LogConfig(Utils.loadProps(p), Collections.emptySet(), true);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            executeCommand(namespace, command, config);
+        } catch (Exception exception) {
+            System.err.println(exception.getMessage());
+            Exit.exit(1, exception.getMessage());
+        }
+    }
+
+    private static void executeCommand(Namespace namespace, String command, Optional<LogConfig> config) throws Exception {
+        final String info = "info";
+        final String format = "format";
+        if ((command.equals(info) || command.equals(format)) && !config.isPresent()) {
+            return; // Do nothing if config is not present
+        }
+
+        switch (command) {
+            case info: {
+                List<String> directories = configToLogDirectories(config.get());
+                boolean selfManagedMode = configToSelfManagedMode(config.get());
+                Exit.exit(infoCommand(System.out, selfManagedMode, directories));
+                break;
+            }
+            case format: {
+                List<String> directories = configToLogDirectories(config.get());
+                String clusterId = namespace.getString("cluster_id");
+                MetadataVersion metadataVersion = getMetadataVersion(namespace, Optional.of(config.get().getInterBrokerProtocolVersionString()));
+                if (!metadataVersion.isKRaftSupported()) {
+                    throw new TerseException("Must specify a valid KRaft metadata version of at least 3.0.");
+                }
+                MetaProperties metaProperties = buildMetadataProperties(clusterId, config.get());
+
+                List<ApiMessageAndVersion> metadataRecords = new ArrayList<>();
+                Optional<List<UserScramCredentialRecord>> scramRecordsOptional = getUserScramCredentialRecords(namespace);
+                if (scramRecordsOptional.isPresent()) {
+                    if (!metadataVersion.isScramSupported()) {
+                        throw new TerseException("SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.");
+                    }
+                    for (ApiMessage record : scramRecordsOptional.get()) {
+                        metadataRecords.add(new ApiMessageAndVersion(record, (short) 0));
+                    }
+                }
+
+                BootstrapMetadata bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Optional.of(metadataRecords), "format command");
+                boolean ignoreFormatted = namespace.getBoolean("ignore_formatted");
+                if (!configToSelfManagedMode(config.get())) {
+                    throw new TerseException("The kafka configuration file appears to be for " +
+                        "a legacy cluster. Formatting is only supported for clusters in KRaft mode.");
+                }
+                Exit.exit(formatCommand(System.out, directories, metaProperties, bootstrapMetadata,
+                    metadataVersion, ignoreFormatted));
+                break;
+            }
+            case "random-uuid": {
+                System.out.println(Uuid.randomUuid());
+                Exit.exit(0);
+                break;
+            }
+            default:
+                throw new RuntimeException("Unknown command " + command);
+        }
+    }
+
+    static int infoCommand(PrintStream stream, boolean selfManagedMode, List<String> directories) throws IOException {
+        List<String> problems = new ArrayList<>();
+        List<String> foundDirectories = new ArrayList<>();
+        Optional<RawMetaProperties> prevMetadata = Optional.empty();
+        for (String directory : directories) {
+            Path directoryPath = Paths.get(directory);
+            if (!Files.isDirectory(directoryPath)) {
+                if (!Files.exists(directoryPath)) {
+                    problems.add(directoryPath + " does not exist");
+                } else {
+                    problems.add(directoryPath + " is not a directory");
+                }
+            } else {
+                foundDirectories.add(directoryPath.toString());
+                Path metaPath = directoryPath.resolve("meta.properties");
+                if (!Files.exists(metaPath)) {
+                    problems.add(directoryPath + " is not formatted.");
+                } else {
+                    Properties properties = Utils.loadProps(metaPath.toString());
+                    RawMetaProperties rawMetaProperties = new RawMetaProperties(properties);
+                    Optional<RawMetaProperties> curMetadata;
+
+                    switch (rawMetaProperties.getVersion()) {
+                        case 0:
+                        case 1:
+                            curMetadata = Optional.of(rawMetaProperties);
+                            break;
+                        default:
+                            problems.add("Unsupported version for " + metaPath + ": " + rawMetaProperties.getVersion());
+                            curMetadata = Optional.empty();
+                            break;
+                    }
+
+                    if (!prevMetadata.isPresent()) {
+                        prevMetadata = curMetadata;
+                    } else {
+                        if (curMetadata.isPresent() && !prevMetadata.get().equals(curMetadata.get())) {
+                            problems.add(String.format("Metadata for %s was %s, but other directories featured %s", metaPath, curMetadata.get(), prevMetadata.get()));
+                        }
+                    }
+                }
+            }
+        }
+
+        if (prevMetadata.isPresent()) {
+            if (selfManagedMode) {
+                if (prevMetadata.get().getVersion() == 0) {
+                    problems.add("The kafka configuration file appears to be for a cluster in KRaft mode, but " + "the directories are formatted for legacy mode.");
+                }
+            } else if (prevMetadata.get().getVersion() == 1) {
+                problems.add("The kafka configuration file appears to be for a legacy cluster, but " + "the directories are formatted for a cluster in KRaft mode.");
+            }
+        }
+
+        return validateDirectories(stream, directories, problems, foundDirectories, prevMetadata);
+    }
+
+    private static int validateDirectories(PrintStream stream, List<String> directories, List<String> problems, List<String> foundDirectories, Optional<RawMetaProperties> prevMetadata) {
+        if (directories.isEmpty()) {
+            stream.println("No directories specified.");
+            return 0;
+        } else {
+            if (!foundDirectories.isEmpty()) {
+                if (foundDirectories.size() == 1) {
+                    stream.println("Found log directory:");
+                } else {
+                    stream.println("Found log directories:");
+                }
+                foundDirectories.forEach(d -> stream.println("  " + d));
+                stream.println("");
+            }
+
+            if (prevMetadata.isPresent()) {
+                RawMetaProperties prev = prevMetadata.get();
+                stream.println("Found metadata: " + prev);
+                stream.println("");
+            }
+
+            if (!problems.isEmpty()) {
+                if (problems.size() == 1) {
+                    stream.println("Found problem:");
+                } else {
+                    stream.println("Found problems:");
+                }
+                problems.forEach(d -> stream.println("  " + d));
+                stream.println("");
+                return 1;
+            } else {
+                return 0;
+            }
+        }
+    }
+
+

Review Comment:
   Nit: extra empty line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
