ZanderXu commented on code in PR #6745:
URL: https://github.com/apache/hadoop/pull/6745#discussion_r1571614915


##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java:
##########
@@ -641,6 +663,207 @@ private void closeBlockReaders() {
 
   }
 
+  private class VerifyReadableCommand extends DebugCommand {

Review Comment:
   Could you add some comments describing this command?
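   For example, something along these lines (the wording is only a suggestion, mirroring the existing help text):
   
   ```java
   /**
    * Verifies that one or more paths are fully readable and have no missing
    * blocks. Paths can be given directly with -path or in bulk via an -input
    * file, and per-path results can optionally be written to an -output file.
    */
   private class VerifyReadableCommand extends DebugCommand {
   ```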



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java:
##########
@@ -641,6 +663,207 @@ private void closeBlockReaders() {
 
   }
 
+  private class VerifyReadableCommand extends DebugCommand {
+    private DistributedFileSystem dfs;
+    private boolean suppressed = false;
+
+    VerifyReadableCommand() {
+      super("verifyReadable",
+          "verifyReadable "
+              + "[-path <path> | -input <input>] "
+              + "[-output <output>] "
+              + "[-concurrency <concurrency>] "
+              + "[-suppressed]",
+          "  Verify if one or multiple paths are fully readable and have no 
missing blocks.");
+    }
+
+    @Override
+    int run(List<String> args) throws IOException {
+      if (args.isEmpty()) {
+        System.out.println(usageText);
+        System.out.println(helpText + System.lineSeparator());
+        return 1;
+      }
+      dfs = AdminHelper.getDFS(getConf());
+      String pathStr = StringUtils.popOptionWithArgument("-path", args);
+      String inputStr = StringUtils.popOptionWithArgument("-input", args);
+      String outputStr = StringUtils.popOptionWithArgument("-output", args);
+      String concurrencyStr = StringUtils.popOptionWithArgument("-concurrency", args);
+      suppressed = StringUtils.popOption("-suppressed", args);
+      if (pathStr == null && inputStr == null) {
+        System.out.println("Either -path or -input must be present.");
+        System.out.println(usageText);
+        System.out.println(helpText + System.lineSeparator());
+        return 1;
+      }
+      try {
+        return handleArgs(pathStr, inputStr, outputStr, concurrencyStr);
+      } catch (Exception e) {
+        System.err.println(
+            "Got IOE: " + StringUtils.stringifyException(e) + " for command: " 
+ StringUtils.join(
+                ",", args));
+        return 1;
+      }
+    }
+
+    private int handleArgs(String pathStr, String inputStr, String outputStr, String concurrencyStr)
+        throws IOException, ExecutionException, InterruptedException {
+      BufferedWriter writer = null;
+      try {
+        if (outputStr != null) {
+          File output = new File(outputStr);
+          writer = new BufferedWriter(new OutputStreamWriter(Files.newOutputStream(output.toPath()),
+              StandardCharsets.UTF_8));
+        }
+
+        // -path takes priority over -input
+        if (pathStr != null) {
+          int result = handlePath(new Path(pathStr));

Review Comment:
   How about using the `handlePaths()` method here too?
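   A rough sketch of what that could look like (just one way to do it; `Collections.singleton` would need the corresponding import):
   
   ```java
   // Route the single -path case through handlePaths() as well, so both modes
   // share the same result handling and output writing.
   if (pathStr != null) {
     return handlePaths(Collections.singleton(new Path(pathStr)), writer, 1);
   }
   ```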



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java:
##########
@@ -641,6 +663,207 @@ private void closeBlockReaders() {
 
   }
 
+  private class VerifyReadableCommand extends DebugCommand {
+    private DistributedFileSystem dfs;
+    private boolean suppressed = false;
+
+    VerifyReadableCommand() {
+      super("verifyReadable",
+          "verifyReadable "
+              + "[-path <path> | -input <input>] "
+              + "[-output <output>] "
+              + "[-concurrency <concurrency>] "
+              + "[-suppressed]",
+          "  Verify if one or multiple paths are fully readable and have no 
missing blocks.");
+    }
+
+    @Override
+    int run(List<String> args) throws IOException {
+      if (args.isEmpty()) {
+        System.out.println(usageText);
+        System.out.println(helpText + System.lineSeparator());
+        return 1;
+      }
+      dfs = AdminHelper.getDFS(getConf());
+      String pathStr = StringUtils.popOptionWithArgument("-path", args);
+      String inputStr = StringUtils.popOptionWithArgument("-input", args);
+      String outputStr = StringUtils.popOptionWithArgument("-output", args);
+      String concurrencyStr = StringUtils.popOptionWithArgument("-concurrency", args);
+      suppressed = StringUtils.popOption("-suppressed", args);
+      if (pathStr == null && inputStr == null) {
+        System.out.println("Either -path or -input must be present.");
+        System.out.println(usageText);
+        System.out.println(helpText + System.lineSeparator());
+        return 1;
+      }
+      try {
+        return handleArgs(pathStr, inputStr, outputStr, concurrencyStr);
+      } catch (Exception e) {
+        System.err.println(
+            "Got IOE: " + StringUtils.stringifyException(e) + " for command: " 
+ StringUtils.join(
+                ",", args));
+        return 1;
+      }
+    }
+
+    private int handleArgs(String pathStr, String inputStr, String outputStr, String concurrencyStr)
+        throws IOException, ExecutionException, InterruptedException {
+      BufferedWriter writer = null;
+      try {
+        if (outputStr != null) {
+          File output = new File(outputStr);
+          writer = new BufferedWriter(new OutputStreamWriter(Files.newOutputStream(output.toPath()),
+              StandardCharsets.UTF_8));
+        }
+
+        // -path takes priority over -input
+        if (pathStr != null) {
+          int result = handlePath(new Path(pathStr));
+          writeToOutput(writer, pathStr, result);
+          return result;
+        }
+
+        // -input must be defined by this point
+        File input = new File(inputStr);
+        if (!input.exists()) {
+          return 1;
+        }
+        BufferedReader reader = new BufferedReader(
+            new InputStreamReader(Files.newInputStream(input.toPath()), StandardCharsets.UTF_8));
+        Set<Path> paths = new HashSet<>();
+        String line;
+        while ((line = reader.readLine()) != null) {
+          paths.add(new Path(line.trim()));
+        }
+        reader.close();
+        int concurrency = concurrencyStr == null ? 1 : Integer.parseInt(concurrencyStr);
+        return handlePaths(paths, writer, concurrency);
+      } finally {
+        if (writer != null) {
+          writer.flush();
+          writer.close();
+        }
+      }
+    }
+
+    private void writeToOutput(BufferedWriter writer, String path, int result) throws IOException {
+      if (writer == null) {
+        return;
+      }
+      writer.write(path);
+      writer.write(" ");
+      writer.write(String.valueOf(result));
+      writer.write("\n");
+      writer.flush();
+    }
+
+    private int handlePaths(Set<Path> paths, BufferedWriter writer, int concurrency)
+        throws ExecutionException, InterruptedException, IOException {
+      int total = paths.size();
+      long start = Time.monotonicNow();
+      ExecutorService threadPool = Executors.newFixedThreadPool(concurrency);
+      List<Callable<Pair<Path, Integer>>> tasks = new ArrayList<>();
+      for (Path path : paths) {
+        tasks.add(() -> Pair.of(path, handlePath(path)));
+      }
+      List<Future<Pair<Path, Integer>>> futures =
+          tasks.stream().map(threadPool::submit).collect(Collectors.toList());
+
+      boolean failed = false;
+      int done = 0;
+      for (Future<Pair<Path, Integer>> future : futures) {
+        done++;
+        if (done % 1000 == 0) {
+          long elapsed = Time.monotonicNow() - start;
+          double rate = (double) done / elapsed * 1000;
+          String msg = "Progress: %d/%d, elapsed: %d ms, rate: %5.2f 
files/s%n";
+          System.out.printf(msg, done, total, elapsed, rate);
+        }
+        writeToOutput(writer, future.get().getLeft().toString(), future.get().getRight());
+        failed |= future.get().getRight() != 0;
+      }
+      return failed ? 1 : 0;
+    }
+
+    private int handlePath(Path path) {
+
+      HdfsBlockLocation[] locs;
+      try {
+        locs = (HdfsBlockLocation[]) dfs.getFileBlockLocations(path, 0,
+            dfs.getFileStatus(path).getLen());

Review Comment:
   You can simply use `Long.MAX_VALUE` here.
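   Something along these lines, assuming the intent is just to cover the whole file without the extra `getFileStatus()` call:
   
   ```java
   // Long.MAX_VALUE covers the entire file, so the separate getFileStatus() RPC
   // for the length is not needed.
   locs = (HdfsBlockLocation[]) dfs.getFileBlockLocations(path, 0, Long.MAX_VALUE);
   ```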



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java:
##########
@@ -641,6 +663,207 @@ private void closeBlockReaders() {
 
   }
 
+  private class VerifyReadableCommand extends DebugCommand {
+    private DistributedFileSystem dfs;
+    private boolean suppressed = false;
+
+    VerifyReadableCommand() {
+      super("verifyReadable",
+          "verifyReadable "
+              + "[-path <path> | -input <input>] "
+              + "[-output <output>] "
+              + "[-concurrency <concurrency>] "
+              + "[-suppressed]",
+          "  Verify if one or multiple paths are fully readable and have no 
missing blocks.");
+    }
+
+    @Override
+    int run(List<String> args) throws IOException {
+      if (args.isEmpty()) {
+        System.out.println(usageText);
+        System.out.println(helpText + System.lineSeparator());
+        return 1;
+      }
+      dfs = AdminHelper.getDFS(getConf());
+      String pathStr = StringUtils.popOptionWithArgument("-path", args);
+      String inputStr = StringUtils.popOptionWithArgument("-input", args);
+      String outputStr = StringUtils.popOptionWithArgument("-output", args);
+      String concurrencyStr = StringUtils.popOptionWithArgument("-concurrency", args);
+      suppressed = StringUtils.popOption("-suppressed", args);
+      if (pathStr == null && inputStr == null) {
+        System.out.println("Either -path or -input must be present.");
+        System.out.println(usageText);
+        System.out.println(helpText + System.lineSeparator());
+        return 1;
+      }
+      try {
+        return handleArgs(pathStr, inputStr, outputStr, concurrencyStr);
+      } catch (Exception e) {
+        System.err.println(
+            "Got IOE: " + StringUtils.stringifyException(e) + " for command: " 
+ StringUtils.join(
+                ",", args));
+        return 1;
+      }
+    }
+
+    private int handleArgs(String pathStr, String inputStr, String outputStr, String concurrencyStr)
+        throws IOException, ExecutionException, InterruptedException {
+      BufferedWriter writer = null;
+      try {
+        if (outputStr != null) {
+          File output = new File(outputStr);
+          writer = new BufferedWriter(new OutputStreamWriter(Files.newOutputStream(output.toPath()),
+              StandardCharsets.UTF_8));
+        }
+
+        // -path takes priority over -input
+        if (pathStr != null) {
+          int result = handlePath(new Path(pathStr));
+          writeToOutput(writer, pathStr, result);
+          return result;
+        }
+
+        // -input must be defined by this point
+        File input = new File(inputStr);
+        if (!input.exists()) {
+          return 1;
+        }
+        BufferedReader reader = new BufferedReader(
+            new InputStreamReader(Files.newInputStream(input.toPath()), StandardCharsets.UTF_8));
+        Set<Path> paths = new HashSet<>();
+        String line;
+        while ((line = reader.readLine()) != null) {
+          paths.add(new Path(line.trim()));
+        }
+        reader.close();
+        int concurrency = concurrencyStr == null ? 1 : Integer.parseInt(concurrencyStr);
+        return handlePaths(paths, writer, concurrency);
+      } finally {
+        if (writer != null) {
+          writer.flush();
+          writer.close();
+        }
+      }
+    }
+
+    private void writeToOutput(BufferedWriter writer, String path, int result) throws IOException {

Review Comment:
   How about writing more detailed data to this writer? For example:
   
   1. the path and its result
   2. the number of blocks that were verified
   3. the detailed block list and the verification result for each block
   
   Or you could add a parameter to control this; see the sketch below.
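   A sketch of what a more detailed writer could look like (the `blockResults` list, the `verbose` flag and the output format are only illustrative):
   
   ```java
   // Hypothetical verbose variant: also reports how many blocks were verified
   // and, when requested, the per-block result codes.
   private void writeToOutput(BufferedWriter writer, String path, int result,
       List<Pair<ExtendedBlock, Integer>> blockResults, boolean verbose) throws IOException {
     if (writer == null) {
       return;
     }
     writer.write(path + " " + result + " blocksVerified=" + blockResults.size());
     if (verbose) {
       for (Pair<ExtendedBlock, Integer> blockResult : blockResults) {
         writer.write(" " + blockResult.getLeft().getBlockName() + "=" + blockResult.getRight());
       }
     }
     writer.write("\n");
     writer.flush();
   }
   ```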



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java:
##########
@@ -641,6 +663,207 @@ private void closeBlockReaders() {
 
   }
 
+  private class VerifyReadableCommand extends DebugCommand {
+    private DistributedFileSystem dfs;
+    private boolean suppressed = false;
+
+    VerifyReadableCommand() {
+      super("verifyReadable",
+          "verifyReadable "
+              + "[-path <path> | -input <input>] "

Review Comment:
   Could you add some descriptions for these parameters?
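   For example (the descriptions below are only a suggestion; in particular the `-suppressed` text should be adjusted to whatever the flag actually controls):
   
   ```java
   // Expanded helpText argument for the super(...) call above.
   "  Verify if one or multiple paths are fully readable and have no missing blocks.\n"
       + "  -path: a single path to verify.\n"
       + "  -input: a local file with one path per line; ignored when -path is given.\n"
       + "  -output: a local file to write per-path results to.\n"
       + "  -concurrency: number of verification threads used with -input (default 1).\n"
       + "  -suppressed: reduce the amount of per-path/per-block output.");
   ```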



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java:
##########
@@ -641,6 +663,207 @@ private void closeBlockReaders() {
 
   }
 
+  private class VerifyReadableCommand extends DebugCommand {
+    private DistributedFileSystem dfs;
+    private boolean suppressed = false;
+
+    VerifyReadableCommand() {
+      super("verifyReadable",
+          "verifyReadable "
+              + "[-path <path> | -input <input>] "
+              + "[-output <output>] "
+              + "[-concurrency <concurrency>] "
+              + "[-suppressed]",
+          "  Verify if one or multiple paths are fully readable and have no 
missing blocks.");
+    }
+
+    @Override
+    int run(List<String> args) throws IOException {
+      if (args.isEmpty()) {
+        System.out.println(usageText);
+        System.out.println(helpText + System.lineSeparator());
+        return 1;
+      }
+      dfs = AdminHelper.getDFS(getConf());
+      String pathStr = StringUtils.popOptionWithArgument("-path", args);
+      String inputStr = StringUtils.popOptionWithArgument("-input", args);
+      String outputStr = StringUtils.popOptionWithArgument("-output", args);
+      String concurrencyStr = StringUtils.popOptionWithArgument("-concurrency", args);
+      suppressed = StringUtils.popOption("-suppressed", args);
+      if (pathStr == null && inputStr == null) {
+        System.out.println("Either -path or -input must be present.");
+        System.out.println(usageText);
+        System.out.println(helpText + System.lineSeparator());
+        return 1;
+      }
+      try {
+        return handleArgs(pathStr, inputStr, outputStr, concurrencyStr);
+      } catch (Exception e) {
+        System.err.println(
+            "Got IOE: " + StringUtils.stringifyException(e) + " for command: " 
+ StringUtils.join(
+                ",", args));
+        return 1;
+      }
+    }
+
+    private int handleArgs(String pathStr, String inputStr, String outputStr, String concurrencyStr)

Review Comment:
   Maybe you can break this method into the following steps to make it clearer (see the sketch after this list):
   
   1. prepare the paths that need to be verified
   2. verify those paths
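   A rough sketch of that shape (`preparePaths()` and `openOutputWriter()` are hypothetical helpers that would absorb the existing -path/-input and -output handling):
   
   ```java
   private int handleArgs(String pathStr, String inputStr, String outputStr, String concurrencyStr)
       throws IOException, ExecutionException, InterruptedException {
     // 1. Prepare the paths that need to be verified (-path wins over -input).
     Set<Path> paths = preparePaths(pathStr, inputStr);
     if (paths.isEmpty()) {
       return 1;
     }
     int concurrency = concurrencyStr == null ? 1 : Integer.parseInt(concurrencyStr);
     // 2. Verify those paths, writing per-path results to -output when requested.
     try (BufferedWriter writer = openOutputWriter(outputStr)) {
       return handlePaths(paths, writer, concurrency);
     }
   }
   ```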



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

