[
https://issues.apache.org/jira/browse/HDFS-17475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838832#comment-17838832
]
ASF GitHub Bot commented on HDFS-17475:
---------------------------------------
ZanderXu commented on code in PR #6745:
URL: https://github.com/apache/hadoop/pull/6745#discussion_r1571614915
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java:
##########
@@ -641,6 +663,207 @@ private void closeBlockReaders() {
}
+ private class VerifyReadableCommand extends DebugCommand {
Review Comment:
Can you add some comments to describe this command?
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java:
##########
@@ -641,6 +663,207 @@ private void closeBlockReaders() {
}
+ private class VerifyReadableCommand extends DebugCommand {
+ private DistributedFileSystem dfs;
+ private boolean suppressed = false;
+
+ VerifyReadableCommand() {
+ super("verifyReadable",
+ "verifyReadable "
+ + "[-path <path> | -input <input>] "
+ + "[-output <output>] "
+ + "[-concurrency <concurrency>] "
+ + "[-suppressed]",
+ " Verify if one or multiple paths are fully readable and have no
missing blocks.");
+ }
+
+ @Override
+ int run(List<String> args) throws IOException {
+ if (args.isEmpty()) {
+ System.out.println(usageText);
+ System.out.println(helpText + System.lineSeparator());
+ return 1;
+ }
+ dfs = AdminHelper.getDFS(getConf());
+ String pathStr = StringUtils.popOptionWithArgument("-path", args);
+ String inputStr = StringUtils.popOptionWithArgument("-input", args);
+ String outputStr = StringUtils.popOptionWithArgument("-output", args);
+ String concurrencyStr =
StringUtils.popOptionWithArgument("-concurrency", args);
+ suppressed = StringUtils.popOption("-suppressed", args);
+ if (pathStr == null && inputStr == null) {
+ System.out.println("Either -path or -input must be present.");
+ System.out.println(usageText);
+ System.out.println(helpText + System.lineSeparator());
+ return 1;
+ }
+ try {
+ return handleArgs(pathStr, inputStr, outputStr, concurrencyStr);
+ } catch (Exception e) {
+ System.err.println(
+ "Got IOE: " + StringUtils.stringifyException(e) + " for command: "
+ StringUtils.join(
+ ",", args));
+ return 1;
+ }
+ }
+
+ private int handleArgs(String pathStr, String inputStr, String outputStr,
String concurrencyStr)
+ throws IOException, ExecutionException, InterruptedException {
+ BufferedWriter writer = null;
+ try {
+ if (outputStr != null) {
+ File output = new File(outputStr);
+ writer = new BufferedWriter(new
OutputStreamWriter(Files.newOutputStream(output.toPath()),
+ StandardCharsets.UTF_8));
+ }
+
+ // -path takes priority over -input
+ if (pathStr != null) {
+ int result = handlePath(new Path(pathStr));
Review Comment:
How about using the `handlePaths()` method here too?
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java:
##########
@@ -641,6 +663,207 @@ private void closeBlockReaders() {
}
+ private class VerifyReadableCommand extends DebugCommand {
+ private DistributedFileSystem dfs;
+ private boolean suppressed = false;
+
+ VerifyReadableCommand() {
+ super("verifyReadable",
+ "verifyReadable "
+ + "[-path <path> | -input <input>] "
+ + "[-output <output>] "
+ + "[-concurrency <concurrency>] "
+ + "[-suppressed]",
+ " Verify if one or multiple paths are fully readable and have no
missing blocks.");
+ }
+
+ @Override
+ int run(List<String> args) throws IOException {
+ if (args.isEmpty()) {
+ System.out.println(usageText);
+ System.out.println(helpText + System.lineSeparator());
+ return 1;
+ }
+ dfs = AdminHelper.getDFS(getConf());
+ String pathStr = StringUtils.popOptionWithArgument("-path", args);
+ String inputStr = StringUtils.popOptionWithArgument("-input", args);
+ String outputStr = StringUtils.popOptionWithArgument("-output", args);
+ String concurrencyStr =
StringUtils.popOptionWithArgument("-concurrency", args);
+ suppressed = StringUtils.popOption("-suppressed", args);
+ if (pathStr == null && inputStr == null) {
+ System.out.println("Either -path or -input must be present.");
+ System.out.println(usageText);
+ System.out.println(helpText + System.lineSeparator());
+ return 1;
+ }
+ try {
+ return handleArgs(pathStr, inputStr, outputStr, concurrencyStr);
+ } catch (Exception e) {
+ System.err.println(
+ "Got IOE: " + StringUtils.stringifyException(e) + " for command: "
+ StringUtils.join(
+ ",", args));
+ return 1;
+ }
+ }
+
+ private int handleArgs(String pathStr, String inputStr, String outputStr,
String concurrencyStr)
+ throws IOException, ExecutionException, InterruptedException {
+ BufferedWriter writer = null;
+ try {
+ if (outputStr != null) {
+ File output = new File(outputStr);
+ writer = new BufferedWriter(new
OutputStreamWriter(Files.newOutputStream(output.toPath()),
+ StandardCharsets.UTF_8));
+ }
+
+ // -path takes priority over -input
+ if (pathStr != null) {
+ int result = handlePath(new Path(pathStr));
+ writeToOutput(writer, pathStr, result);
+ return result;
+ }
+
+ // -input must be defined by this point
+ File input = new File(inputStr);
+ if (!input.exists()) {
+ return 1;
+ }
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(Files.newInputStream(input.toPath()),
StandardCharsets.UTF_8));
+ Set<Path> paths = new HashSet<>();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ paths.add(new Path(line.trim()));
+ }
+ reader.close();
+ int concurrency = concurrencyStr == null ? 1 :
Integer.parseInt(concurrencyStr);
+ return handlePaths(paths, writer, concurrency);
+ } finally {
+ if (writer != null) {
+ writer.flush();
+ writer.close();
+ }
+ }
+ }
+
+ private void writeToOutput(BufferedWriter writer, String path, int result)
throws IOException {
+ if (writer == null) {
+ return;
+ }
+ writer.write(path);
+ writer.write(" ");
+ writer.write(String.valueOf(result));
+ writer.write("\n");
+ writer.flush();
+ }
+
+ private int handlePaths(Set<Path> paths, BufferedWriter writer, int
concurrency)
+ throws ExecutionException, InterruptedException, IOException {
+ int total = paths.size();
+ long start = Time.monotonicNow();
+ ExecutorService threadPool = Executors.newFixedThreadPool(concurrency);
+ List<Callable<Pair<Path, Integer>>> tasks = new ArrayList<>();
+ for (Path path : paths) {
+ tasks.add(() -> Pair.of(path, handlePath(path)));
+ }
+ List<Future<Pair<Path, Integer>>> futures =
+ tasks.stream().map(threadPool::submit).collect(Collectors.toList());
+
+ boolean failed = false;
+ int done = 0;
+ for (Future<Pair<Path, Integer>> future : futures) {
+ done++;
+ if (done % 1000 == 0) {
+ long elapsed = Time.monotonicNow() - start;
+ double rate = (double) done / elapsed * 1000;
+ String msg = "Progress: %d/%d, elapsed: %d ms, rate: %5.2f
files/s%n";
+ System.out.printf(msg, done, total, elapsed, rate);
+ }
+ writeToOutput(writer, future.get().getLeft().toString(),
future.get().getRight());
+ failed |= future.get().getRight() != 0;
+ }
+ return failed ? 1 : 0;
+ }
+
+ private int handlePath(Path path) {
+
+ HdfsBlockLocation[] locs;
+ try {
+ locs = (HdfsBlockLocation[]) dfs.getFileBlockLocations(path, 0,
+ dfs.getFileStatus(path).getLen());
Review Comment:
Here you can simply use `Long.MAX_VALUE`.
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java:
##########
@@ -641,6 +663,207 @@ private void closeBlockReaders() {
}
+ private class VerifyReadableCommand extends DebugCommand {
+ private DistributedFileSystem dfs;
+ private boolean suppressed = false;
+
+ VerifyReadableCommand() {
+ super("verifyReadable",
+ "verifyReadable "
+ + "[-path <path> | -input <input>] "
+ + "[-output <output>] "
+ + "[-concurrency <concurrency>] "
+ + "[-suppressed]",
+ " Verify if one or multiple paths are fully readable and have no
missing blocks.");
+ }
+
+ @Override
+ int run(List<String> args) throws IOException {
+ if (args.isEmpty()) {
+ System.out.println(usageText);
+ System.out.println(helpText + System.lineSeparator());
+ return 1;
+ }
+ dfs = AdminHelper.getDFS(getConf());
+ String pathStr = StringUtils.popOptionWithArgument("-path", args);
+ String inputStr = StringUtils.popOptionWithArgument("-input", args);
+ String outputStr = StringUtils.popOptionWithArgument("-output", args);
+ String concurrencyStr =
StringUtils.popOptionWithArgument("-concurrency", args);
+ suppressed = StringUtils.popOption("-suppressed", args);
+ if (pathStr == null && inputStr == null) {
+ System.out.println("Either -path or -input must be present.");
+ System.out.println(usageText);
+ System.out.println(helpText + System.lineSeparator());
+ return 1;
+ }
+ try {
+ return handleArgs(pathStr, inputStr, outputStr, concurrencyStr);
+ } catch (Exception e) {
+ System.err.println(
+ "Got IOE: " + StringUtils.stringifyException(e) + " for command: "
+ StringUtils.join(
+ ",", args));
+ return 1;
+ }
+ }
+
+ private int handleArgs(String pathStr, String inputStr, String outputStr,
String concurrencyStr)
+ throws IOException, ExecutionException, InterruptedException {
+ BufferedWriter writer = null;
+ try {
+ if (outputStr != null) {
+ File output = new File(outputStr);
+ writer = new BufferedWriter(new
OutputStreamWriter(Files.newOutputStream(output.toPath()),
+ StandardCharsets.UTF_8));
+ }
+
+ // -path takes priority over -input
+ if (pathStr != null) {
+ int result = handlePath(new Path(pathStr));
+ writeToOutput(writer, pathStr, result);
+ return result;
+ }
+
+ // -input must be defined by this point
+ File input = new File(inputStr);
+ if (!input.exists()) {
+ return 1;
+ }
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(Files.newInputStream(input.toPath()),
StandardCharsets.UTF_8));
+ Set<Path> paths = new HashSet<>();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ paths.add(new Path(line.trim()));
+ }
+ reader.close();
+ int concurrency = concurrencyStr == null ? 1 :
Integer.parseInt(concurrencyStr);
+ return handlePaths(paths, writer, concurrency);
+ } finally {
+ if (writer != null) {
+ writer.flush();
+ writer.close();
+ }
+ }
+ }
+
+ private void writeToOutput(BufferedWriter writer, String path, int result)
throws IOException {
Review Comment:
How about writing more detailed data to this writer? For example:
1. the path and its result
2. the number of blocks that have been verified
3. a detailed block list and the verification result for each block
Alternatively, you could add a parameter to control the level of detail.
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java:
##########
@@ -641,6 +663,207 @@ private void closeBlockReaders() {
}
+ private class VerifyReadableCommand extends DebugCommand {
+ private DistributedFileSystem dfs;
+ private boolean suppressed = false;
+
+ VerifyReadableCommand() {
+ super("verifyReadable",
+ "verifyReadable "
+ + "[-path <path> | -input <input>] "
Review Comment:
Can you add some descriptions for these parameters?
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java:
##########
@@ -641,6 +663,207 @@ private void closeBlockReaders() {
}
+ private class VerifyReadableCommand extends DebugCommand {
+ private DistributedFileSystem dfs;
+ private boolean suppressed = false;
+
+ VerifyReadableCommand() {
+ super("verifyReadable",
+ "verifyReadable "
+ + "[-path <path> | -input <input>] "
+ + "[-output <output>] "
+ + "[-concurrency <concurrency>] "
+ + "[-suppressed]",
+ " Verify if one or multiple paths are fully readable and have no
missing blocks.");
+ }
+
+ @Override
+ int run(List<String> args) throws IOException {
+ if (args.isEmpty()) {
+ System.out.println(usageText);
+ System.out.println(helpText + System.lineSeparator());
+ return 1;
+ }
+ dfs = AdminHelper.getDFS(getConf());
+ String pathStr = StringUtils.popOptionWithArgument("-path", args);
+ String inputStr = StringUtils.popOptionWithArgument("-input", args);
+ String outputStr = StringUtils.popOptionWithArgument("-output", args);
+ String concurrencyStr =
StringUtils.popOptionWithArgument("-concurrency", args);
+ suppressed = StringUtils.popOption("-suppressed", args);
+ if (pathStr == null && inputStr == null) {
+ System.out.println("Either -path or -input must be present.");
+ System.out.println(usageText);
+ System.out.println(helpText + System.lineSeparator());
+ return 1;
+ }
+ try {
+ return handleArgs(pathStr, inputStr, outputStr, concurrencyStr);
+ } catch (Exception e) {
+ System.err.println(
+ "Got IOE: " + StringUtils.stringifyException(e) + " for command: "
+ StringUtils.join(
+ ",", args));
+ return 1;
+ }
+ }
+
+ private int handleArgs(String pathStr, String inputStr, String outputStr,
String concurrencyStr)
Review Comment:
Maybe you can restructure this method into the following steps to make it clearer:
1. prepare the paths that need to be verified
2. verify those paths
> Add a command to check if files are readable
> --------------------------------------------
>
> Key: HDFS-17475
> URL: https://issues.apache.org/jira/browse/HDFS-17475
> Project: Hadoop HDFS
> Issue Type: Improvement
> Components: hdfs
> Reporter: Felix N
> Assignee: Felix N
> Priority: Minor
> Labels: pull-request-available
>
> Sometimes a job can fail because of a single unreadable file down the line, due to
> missing replicas, dead DNs, or other reasons. This command should allow users
> to check whether files are readable by checking for metadata on DNs without
> executing the full read pipelines of the files.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]