symious commented on a change in pull request #2918:
URL: https://github.com/apache/hadoop/pull/2918#discussion_r761133799



##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java
##########
@@ -651,14 +683,123 @@ private void output(Configuration conf, FileSummary 
summary,
         is = FSImageUtil.wrapInputStreamForCompression(conf,
             summary.getCodec(), new BufferedInputStream(new LimitInputStream(
                 fin, section.getLength())));
-        outputINodes(is);
+        INodeSection s = INodeSection.parseDelimitedFrom(is);
+        LOG.info("Found {} INodes in the INode section", s.getNumInodes());
+        int count = outputINodes(is, out);
+        LOG.info("Outputted {} INodes.", count);
       }
     }
     afterOutput();
     long timeTaken = Time.monotonicNow() - startTime;
     LOG.debug("Time to output inodes: {}ms", timeTaken);
   }
 
+  /**
+   * STEP1: Multi-threaded process sub-sections.
+   * Given n (n>1) threads to process k (k>=n) sections,
+   * E.g. 10 sections and 4 threads, grouped as follows:
+   * |---------------------------------------------------------------|
+   * | (0    1    2)    (3    4    5)    (6    7)     (8    9)       |
+   * | thread[0]        thread[1]        thread[2]    thread[3]      |
+   * |---------------------------------------------------------------|
+   *
+   * STEP2: Merge files.
+   */
+  private void outputInParallel(Configuration conf, FileSummary summary,
+      ArrayList<FileSummary.Section> subSections)
+      throws IOException {
+    int nThreads = Integer.min(numThreads, subSections.size());
+    LOG.info("Outputting in parallel with {} sub-sections" +
+        " using {} threads", subSections.size(), nThreads);
+    final CopyOnWriteArrayList<IOException> exceptions =
+        new CopyOnWriteArrayList<>();
+    Thread[] threads = new Thread[nThreads];
+    String[] paths = new String[nThreads];
+    for (int i = 0; i < paths.length; i++) {
+      paths[i] = parallelOut + ".tmp." + i;
+    }
+    AtomicLong expectedINodes = new AtomicLong(0);
+    AtomicLong totalParsed = new AtomicLong(0);
+    String codec = summary.getCodec();
+
+    int mark = 0;
+    for (int i = 0; i < nThreads; i++) {
+      // Each thread processes different ordered sub-sections
+      // and outputs to different paths
+      int step = subSections.size() / nThreads +
+          (i < subSections.size() % nThreads ? 1 : 0);
+      int start = mark;
+      int end = start + step;
+      ArrayList<FileSummary.Section> subList = new ArrayList<>(
+          subSections.subList(start, end));
+      mark = end;
+      String path = paths[i];
+
+      threads[i] = new Thread(() -> {
+        LOG.info("output iNodes of sub-sections: [{},{})", start, end);
+        InputStream is = null;
+        try (PrintStream theOut = new PrintStream(path, "UTF-8")) {
+          long startTime = Time.monotonicNow();
+          for (int j = 0; j < subList.size(); j++) {
+            is = getInputStreamForSection(subList.get(j), codec, conf);
+            if (start == 0 && j == 0) {
+              // The first iNode section has a header which must be
+              // processed first
+              INodeSection s = INodeSection.parseDelimitedFrom(is);
+              expectedINodes.set(s.getNumInodes());
+            }
+            totalParsed.addAndGet(outputINodes(is, theOut));
+          }
+          long timeTaken = Time.monotonicNow() - startTime;
+          LOG.info("Time to output iNodes of sub-sections: [{},{}) {} ms",
+              start, end, timeTaken);
+        } catch (Exception e) {
+          exceptions.add(new IOException(e));
+        } finally {
+          try {
+            is.close();
+          } catch (IOException ioe) {
+            LOG.warn("Failed to close the input stream, ignoring", ioe);
+          }
+        }
+      });
+    }
+    for (Thread t : threads) {
+      t.start();
+    }
+    for (Thread t : threads) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted when thread join.", e);
+        throw new IOException(e);
+      }
+    }
+
+    if (exceptions.size() != 0) {

Review comment:
       Does this check only happen after all threads have finished? Can we 
report the exception promptly instead?

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java
##########
@@ -651,14 +683,123 @@ private void output(Configuration conf, FileSummary 
summary,
         is = FSImageUtil.wrapInputStreamForCompression(conf,
             summary.getCodec(), new BufferedInputStream(new LimitInputStream(
                 fin, section.getLength())));
-        outputINodes(is);
+        INodeSection s = INodeSection.parseDelimitedFrom(is);
+        LOG.info("Found {} INodes in the INode section", s.getNumInodes());
+        int count = outputINodes(is, out);
+        LOG.info("Outputted {} INodes.", count);
       }
     }
     afterOutput();
     long timeTaken = Time.monotonicNow() - startTime;
     LOG.debug("Time to output inodes: {}ms", timeTaken);
   }
 
+  /**
+   * STEP1: Multi-threaded process sub-sections.
+   * Given n (n>1) threads to process k (k>=n) sections,
+   * E.g. 10 sections and 4 threads, grouped as follows:
+   * |---------------------------------------------------------------|
+   * | (0    1    2)    (3    4    5)    (6    7)     (8    9)       |
+   * | thread[0]        thread[1]        thread[2]    thread[3]      |
+   * |---------------------------------------------------------------|
+   *
+   * STEP2: Merge files.
+   */
+  private void outputInParallel(Configuration conf, FileSummary summary,
+      ArrayList<FileSummary.Section> subSections)
+      throws IOException {
+    int nThreads = Integer.min(numThreads, subSections.size());
+    LOG.info("Outputting in parallel with {} sub-sections" +
+        " using {} threads", subSections.size(), nThreads);
+    final CopyOnWriteArrayList<IOException> exceptions =
+        new CopyOnWriteArrayList<>();
+    Thread[] threads = new Thread[nThreads];
+    String[] paths = new String[nThreads];
+    for (int i = 0; i < paths.length; i++) {
+      paths[i] = parallelOut + ".tmp." + i;
+    }
+    AtomicLong expectedINodes = new AtomicLong(0);
+    AtomicLong totalParsed = new AtomicLong(0);
+    String codec = summary.getCodec();
+
+    int mark = 0;
+    for (int i = 0; i < nThreads; i++) {
+      // Each thread processes different ordered sub-sections
+      // and outputs to different paths
+      int step = subSections.size() / nThreads +
+          (i < subSections.size() % nThreads ? 1 : 0);
+      int start = mark;
+      int end = start + step;
+      ArrayList<FileSummary.Section> subList = new ArrayList<>(
+          subSections.subList(start, end));
+      mark = end;
+      String path = paths[i];
+
+      threads[i] = new Thread(() -> {

Review comment:
       Maybe a thread pool would be better here?

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java
##########
@@ -642,6 +661,19 @@ long getParentId(long id) throws IOException {
   private void output(Configuration conf, FileSummary summary,
       FileInputStream fin, ArrayList<FileSummary.Section> sections)
       throws IOException {
+    ArrayList<FileSummary.Section> allINodeSubSections =
+        getINodeSubSections(sections);
+    if (numThreads > 1 && !parallelOut.equals("-") &&
+        allINodeSubSections.size() > 1) {

Review comment:
       It would be good to add a log message showing whether serial or parallel 
output is being used. Also, if the user runs the tool with the "threads" option 
but the fsimage doesn't support the feature, it would be better to log that the 
tool is falling back to serial output.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to