fvaleri commented on code in PR #13417:
URL: https://github.com/apache/kafka/pull/13417#discussion_r1271824235
##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,479 @@
+            executeCommand(namespace, command, config);
+        } catch (Exception exception) {

Review Comment:
   Given we use TerseException, I think it's better to have the following error handling:
   ```java
   } catch (TerseException e) {
       System.err.println(e.getMessage());
       Exit.exit(1);
   } catch (Throwable e) {
       System.err.println(e.getMessage());
       System.err.println(Utils.stackTrace(e));
       Exit.exit(1);
   }
   ```

##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,479 @@
+    private static void executeCommand(Namespace namespace, String command, Optional<LogConfig> config) throws Exception {
+        final String info = "info";
+        final String format = "format";
+        if ((command.equals(info) || command.equals(format)) && !config.isPresent()) {

Review Comment:
   This check is useless, because argparse4j validates all required args at parse time, which happens before this point. It also means we don't need the two variables above.
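   For reference, argparse4j's fail-fast behavior is easy to see in isolation (a minimal standalone sketch, not part of the PR):
   ```java
   import net.sourceforge.argparse4j.ArgumentParsers;
   import net.sourceforge.argparse4j.inf.ArgumentParser;

   public class RequiredArgDemo {
       public static void main(String[] args) {
           ArgumentParser parser = ArgumentParsers.newArgumentParser("demo");
           // Declared required, just like --config in parseArguments()
           parser.addArgument("--config", "-c").required(true).help("The configuration file to use.");
           // With no args, parseArgsOrFail prints a usage error and exits immediately,
           // so any later "config is missing" check is unreachable
           parser.parseArgsOrFail(new String[]{});
       }
   }
   ```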
##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,479 @@
+    static int infoCommand(PrintStream stream, boolean selfManagedMode, List<String> directories) throws IOException {
+        List<String> problems = new ArrayList<>();
+        List<String> foundDirectories = new ArrayList<>();
+        Optional<RawMetaProperties> prevMetadata = Optional.empty();
+        for (String directory : directories) {

Review Comment:
   Thanks for removing the double sorting contained in `directories.sorted.foreach`.
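   For context, the ordering is already guaranteed upstream: `configToLogDirectories` collects the dirs into a `TreeSet`, so they come back sorted and a second sort would be redundant. A minimal illustration:
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.TreeSet;

   public class SortedDirsDemo {
       public static void main(String[] args) {
           // TreeSet keeps elements in natural order, mirroring configToLogDirectories
           TreeSet<String> dirs = new TreeSet<>(Arrays.asList("/data/c", "/data/a", "/data/b"));
           System.out.println(new ArrayList<>(dirs)); // prints [/data/a, /data/b, /data/c]
       }
   }
   ```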
##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,479 @@
+    static boolean configToSelfManagedMode(LogConfig logConfig) {
+        return !logConfig.parseProcessRoles().isEmpty();
+    }
+
+

Review Comment:
   Nit: extra empty line.

##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -489,6 +575,29 @@ public static void validateValues(Map<?, ?> props) {
+    public static Map<?, ?> populateAndValidateProps(Map<?, ?> props) {

Review Comment:
   I think we can find a better name or merge with validateOtherConfig. Wdyt?
##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,479 @@
+        switch (command) {
+            case info: {
+                List<String> directories = configToLogDirectories(config.get());
+                boolean selfManagedMode = configToSelfManagedMode(config.get());
+                Exit.exit(infoCommand(System.out, selfManagedMode, directories));
+                break;
+            }
+            case format: {

Review Comment:
   This code looks good, but I was wondering whether we could improve it by moving all the checks and object creation into the command methods. Something like:
   ```java
   switch (command) {
       case info: {
           Exit.exit(infoCommand(System.out, config));
           break;
       }
       case format: {
           Exit.exit(formatCommand(System.out, namespace, config));
           break;
       }
       ...
   ```
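   A rough sketch of what the info path could then look like (the signature and wiring are illustrative, not the final API):
   ```java
   // Illustrative only: the command method unwraps the config and derives
   // the values it needs, instead of executeCommand doing it up front.
   static int infoCommand(PrintStream stream, Optional<LogConfig> config) throws IOException {
       LogConfig logConfig = config.get(); // argparse4j guarantees --config was given
       List<String> directories = configToLogDirectories(logConfig);
       boolean selfManagedMode = configToSelfManagedMode(logConfig);
       // ... existing directory scanning and validation logic unchanged ...
       return 0;
   }
   ```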
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -389,10 +462,23 @@ public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
+    public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs, boolean populateSynonymsAndValidateProps) {

Review Comment:
   populateSynonymsAndValidateProps is never used.
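   If the flag is meant to stay, it presumably needs to be wired into the props handling before delegation — a hypothetical sketch, assuming the populateAndValidateProps helper added elsewhere in this PR:
   ```java
   public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs, boolean populateSynonymsAndValidateProps) {
       // Hypothetical wiring: only run the extra populate/validate pass when requested
       this(populateSynonymsAndValidateProps ? populateAndValidateProps(props) : props, overriddenConfigs);
   }
   ```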
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -230,6 +235,55 @@ public Optional<String> serverConfigName(String configName) {
+    private static final String ZOOKEEPER_CONNECT_PROP = "zookeeper.connect";
+    private static final String ZK_CONNECT_DOC = "Specifies the ZooKeeper connection string in the form <code>hostname:port</code> where host and port are the " +

Review Comment:
   I don't think this information should be in LogConfig.
##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,479 @@
+        if (prevMetadata.isPresent()) {
+            if (selfManagedMode) {
+                if (prevMetadata.get().getVersion() == 0) {
+                    problems.add("The kafka configuration file appears to be for a cluster in KRaft mode, but " + "the directories are formatted for legacy mode.");
+                }

Review Comment:
   Nit: remove string concatenation or add a carriage return.
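   i.e. either fold it into a single literal, e.g.:
   ```java
   // Single literal, no pointless concatenation
   problems.add("The kafka configuration file appears to be for a cluster in KRaft mode, but the directories are formatted for legacy mode.");
   ```
   or break the line at the `+` so the concatenation actually buys readability.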
##########
tools/src/main/java/org/apache/kafka/tools/StorageTool.java:
##########
@@ -0,0 +1,479 @@
+            } else if (prevMetadata.get().getVersion() == 1) {
+                problems.add("The kafka configuration file appears to be for a legacy cluster, but " + "the directories are formatted for a cluster in KRaft mode.");
+            }
+        }

Review Comment:
   Nit: remove string concatenation or add a carriage return.
+ */ +package org.apache.kafka.tools; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import net.sourceforge.argparse4j.inf.Subparsers; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.common.metadata.UserScramCredentialRecord; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.security.scram.internals.ScramFormatter; +import org.apache.kafka.common.security.scram.internals.ScramMechanism; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.bootstrap.BootstrapDirectory; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.metadata.broker.BrokerMetadataCheckpoint; +import org.apache.kafka.metadata.broker.MetaProperties; +import org.apache.kafka.metadata.broker.RawMetaProperties; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.storage.internals.log.LogConfig; + +import java.io.IOException; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import static net.sourceforge.argparse4j.impl.Arguments.append; +import static net.sourceforge.argparse4j.impl.Arguments.store; +import static net.sourceforge.argparse4j.impl.Arguments.storeTrue; + +public class StorageTool { + public static void main(String... 
args) { + try { + Namespace namespace = parseArguments(args); + String command = namespace.getString("command"); + Optional<LogConfig> config = Optional.ofNullable(namespace.getString("config")).map(p -> { + try { + return new LogConfig(Utils.loadProps(p), Collections.emptySet(), true); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + executeCommand(namespace, command, config); + } catch (Exception exception) { + System.err.println(exception.getMessage()); + Exit.exit(1, exception.getMessage()); + } + } + + private static void executeCommand(Namespace namespace, String command, Optional<LogConfig> config) throws Exception { + final String info = "info"; + final String format = "format"; + if ((command.equals(info) || command.equals(format)) && !config.isPresent()) { + return; // Do nothing if config is not present + } + + switch (command) { + case info: { + List<String> directories = configToLogDirectories(config.get()); + boolean selfManagedMode = configToSelfManagedMode(config.get()); + Exit.exit(infoCommand(System.out, selfManagedMode, directories)); + break; + } + case format: { + List<String> directories = configToLogDirectories(config.get()); + String clusterId = namespace.getString("cluster_id"); + MetadataVersion metadataVersion = getMetadataVersion(namespace, Optional.of(config.get().getInterBrokerProtocolVersionString())); + if (!metadataVersion.isKRaftSupported()) { + throw new TerseException("Must specify a valid KRaft metadata version of at least 3.0."); + } + MetaProperties metaProperties = buildMetadataProperties(clusterId, config.get()); + + List<ApiMessageAndVersion> metadataRecords = new ArrayList<>(); + Optional<List<UserScramCredentialRecord>> scramRecordsOptional = getUserScramCredentialRecords(namespace); + if (scramRecordsOptional.isPresent()) { + if (!metadataVersion.isScramSupported()) { + throw new TerseException("SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later."); + } + for (ApiMessage record : scramRecordsOptional.get()) { + metadataRecords.add(new ApiMessageAndVersion(record, (short) 0)); + } + } + + BootstrapMetadata bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Optional.of(metadataRecords), "format command"); + boolean ignoreFormatted = namespace.getBoolean("ignore_formatted"); + if (!configToSelfManagedMode(config.get())) { + throw new TerseException("The kafka configuration file appears to be for " + + "a legacy cluster. 
Formatting is only supported for clusters in KRaft mode."); + } + Exit.exit(formatCommand(System.out, directories, metaProperties, bootstrapMetadata, + metadataVersion, ignoreFormatted)); + break; + } + case "random-uuid": { + System.out.println(Uuid.randomUuid()); + Exit.exit(0); + break; + } + default: + throw new RuntimeException("Unknown command " + command); + } + } + + static int infoCommand(PrintStream stream, boolean selfManagedMode, List<String> directories) throws IOException { + List<String> problems = new ArrayList<>(); + List<String> foundDirectories = new ArrayList<>(); + Optional<RawMetaProperties> prevMetadata = Optional.empty(); + for (String directory : directories) { + Path directoryPath = Paths.get(directory); + if (!Files.isDirectory(directoryPath)) { + if (!Files.exists(directoryPath)) { + problems.add(directoryPath + " does not exist"); + } else { + problems.add(directoryPath + " is not a directory"); + } + } else { + foundDirectories.add(directoryPath.toString()); + Path metaPath = directoryPath.resolve("meta.properties"); + if (!Files.exists(metaPath)) { + problems.add(directoryPath + " is not formatted."); + } else { + Properties properties = Utils.loadProps(metaPath.toString()); + RawMetaProperties rawMetaProperties = new RawMetaProperties(properties); + Optional<RawMetaProperties> curMetadata; + + switch (rawMetaProperties.getVersion()) { + case 0: + case 1: + curMetadata = Optional.of(rawMetaProperties); + break; + default: + problems.add("Unsupported version for " + metaPath + ": " + rawMetaProperties.getVersion()); + curMetadata = Optional.empty(); + break; + } + + if (!prevMetadata.isPresent()) { + prevMetadata = curMetadata; + } else { + if (curMetadata.isPresent() && !prevMetadata.get().equals(curMetadata.get())) { + problems.add(String.format("Metadata for %s was %s, but other directories featured %s", metaPath, curMetadata.get(), prevMetadata.get())); + } + } + } + } + } + + if (prevMetadata.isPresent()) { + if (selfManagedMode) { + if (prevMetadata.get().getVersion() == 0) { + problems.add("The kafka configuration file appears to be for a cluster in KRaft mode, but " + "the directories are formatted for legacy mode."); + } + } else if (prevMetadata.get().getVersion() == 1) { + problems.add("The kafka configuration file appears to be for a legacy cluster, but " + "the directories are formatted for a cluster in KRaft mode."); + } + } + + return validateDirectories(stream, directories, problems, foundDirectories, prevMetadata); + } + + private static int validateDirectories(PrintStream stream, List<String> directories, List<String> problems, List<String> foundDirectories, Optional<RawMetaProperties> prevMetadata) { + if (directories.isEmpty()) { + stream.println("No directories specified."); + return 0; + } else { + if (!foundDirectories.isEmpty()) { + if (foundDirectories.size() == 1) { + stream.println("Found log directory:"); + } else { + stream.println("Found log directories:"); + } + foundDirectories.forEach(d -> stream.println(" " + d)); + stream.println(""); + } + + if (prevMetadata.isPresent()) { + RawMetaProperties prev = prevMetadata.get(); + stream.println("Found metadata: " + prev); + stream.println(""); + } + + if (!problems.isEmpty()) { + if (problems.size() == 1) { + stream.println("Found problem:"); + } else { + stream.println("Found problems:"); + } + problems.forEach(d -> stream.println(" " + d)); + stream.println(""); + return 1; + } else { + return 0; + } + } + } + + + public static Namespace parseArguments(String... 
args) { + ArgumentParser parser = ArgumentParsers.newArgumentParser("kafka-storage").defaultHelp(true).description("The Kafka storage tool."); + Subparsers subparsers = parser.addSubparsers().dest("command"); + Subparser infoParser = subparsers.addParser("info").help("Get information about the Kafka log directories on this node."); + Subparser formatParser = subparsers.addParser("format").help("Format the Kafka log directories on this node."); + subparsers.addParser("random-uuid").help("Print a random UUID."); + + for (Subparser subpparser : Arrays.asList(infoParser, formatParser)) { + subpparser.addArgument("--config", "-c").action(store()).required(true).help("The Kafka configuration file to use."); + } + + formatParser.addArgument("--cluster-id", "-t").action(store()).required(true).help("The cluster ID to use."); + formatParser.addArgument("--add-scram", "-S").action(append()).help("A SCRAM_CREDENTIAL to add to the __cluster_metadata log e.g.\n" + + "'SCRAM-SHA-256=[name=alice,password=alice-secret]'\n" + + "'SCRAM-SHA-512=[name=alice,iterations=8192,salt=\"N3E=\",saltedpassword=\"YCE=\"]'"); + formatParser.addArgument("--ignore-formatted", "-g").action(storeTrue()); + formatParser.addArgument("--release-version", "-r").action(store()).help(String.format("A KRaft release version to use for the initial metadata version. The minimum is 3.0, the default is %s", MetadataVersion.latest().version())); + + return parser.parseArgsOrFail(args); + } + + static List<String> configToLogDirectories(LogConfig logConfig) { + List<String> logDirs = logConfig.getLogDirs(); + SortedSet<String> directories = new TreeSet<>(logDirs); + String metadataLogDir = logConfig.getMetadataLogDir(); + if (metadataLogDir != null) { + directories.add(metadataLogDir); + } + return new ArrayList<>(directories); + } + + static boolean configToSelfManagedMode(LogConfig logConfig) { + return !logConfig.parseProcessRoles().isEmpty(); + } + + + static MetadataVersion getMetadataVersion(Namespace namespace, Optional<String> defaultVersionString) { + MetadataVersion defaultValue; + if (defaultVersionString != null && defaultVersionString.isPresent()) { + defaultValue = MetadataVersion.fromVersionString(defaultVersionString.get()); + } else { + defaultValue = MetadataVersion.latest(); + } + String releaseVersion = namespace.getString("release_version"); + if (releaseVersion != null) { + return MetadataVersion.fromVersionString(releaseVersion); + } else { + return defaultValue; + } + } + + static MetaProperties buildMetadataProperties(String clusterIdStr, LogConfig config) throws TerseException { + Uuid effectiveClusterId; + try { + effectiveClusterId = Uuid.fromString(clusterIdStr); + } catch (Throwable e) { + throw new TerseException("Cluster ID string " + clusterIdStr + " does not appear to be a valid UUID: " + e.getMessage()); + } + + if (config.getNodeId() < 0) { + throw new TerseException("The node.id must be set to a non-negative integer. 
We saw " + config.getNodeId()); + } + + return new MetaProperties(effectiveClusterId.toString(), config.getNodeId()); + } + + static int formatCommand(PrintStream stream, List<String> directories, MetaProperties metaProperties, BootstrapMetadata bootstrapMetadata, MetadataVersion metadataVersion, boolean ignoreFormatted) throws TerseException { + if (directories.isEmpty()) { + throw new TerseException("No log directories found in the configuration."); + } + List<String> unformattedDirectories = directories.stream().filter(directory -> { + if (!Files.isDirectory(Paths.get(directory)) || !Files.exists(Paths.get(directory, "meta.properties"))) { + return true; + } else if (!ignoreFormatted) { + try { + throw new TerseException("Log directory " + directory + " is already formatted. " + + "Use --ignore-formatted to ignore this directory and format the others."); + } catch (TerseException e) { + throw new RuntimeException(e.getMessage()); + } + } else { + return false; + } + }).collect(Collectors.toList()); + + if (unformattedDirectories.isEmpty()) { + stream.println("All of the log directories are already formatted."); + } + unformattedDirectories.forEach(directory -> { + try { + Files.createDirectories(Paths.get(directory)); + } catch (Exception e) { + try { + throw new TerseException("Unable to create storage directory " + directory + ": " + e.getMessage()); + } catch (TerseException ex) { + throw new RuntimeException(ex); + } + } + + Path metaPropertiesPath = Paths.get(directory, "meta.properties"); + BrokerMetadataCheckpoint checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile()); + try { + checkpoint.write(metaProperties.toProperties()); + } catch (IOException e) { + throw new RuntimeException(e); + } + + BootstrapDirectory bootstrapDirectory = new BootstrapDirectory(directory, Optional.empty()); + try { + bootstrapDirectory.writeBinaryFile(bootstrapMetadata); + } catch (Exception e) { + throw new RuntimeException(e); + } + + stream.println("Formatting " + directory + " with metadata.version " + metadataVersion + "."); + }); + return 0; + } + + public static BootstrapMetadata buildBootstrapMetadata(MetadataVersion metadataVersion, + Optional<List<ApiMessageAndVersion>> metadataOptionalArguments, + String source) { + List<ApiMessageAndVersion> metadataRecords = new ArrayList<>(); + metadataRecords.add(new ApiMessageAndVersion(new FeatureLevelRecord() + .setName(MetadataVersion.FEATURE_NAME) + .setFeatureLevel(metadataVersion.featureLevel()), (short) 0)); + + metadataOptionalArguments.ifPresent(metadataArguments -> metadataArguments.forEach(metadataRecords::add)); Review Comment: Use can use addAll here. ########## tools/src/main/java/org/apache/kafka/tools/StorageTool.java: ########## @@ -0,0 +1,479 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
########## tools/src/main/java/org/apache/kafka/tools/StorageTool.java: ########## @@ -0,0 +1,479 @@ ... + if (!problems.isEmpty()) { + if (problems.size() == 1) { + stream.println("Found problem:"); + } else { + stream.println("Found problems:"); + } + problems.forEach(d -> stream.println(" " + d)); + stream.println(""); + return 1; + } else { + return 0; + } + } + } + + Review Comment: Nit: extra empty line.