whbing commented on a change in pull request #2918:
URL: https://github.com/apache/hadoop/pull/2918#discussion_r761727777



##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java
##########
@@ -651,14 +683,123 @@ private void output(Configuration conf, FileSummary 
summary,
         is = FSImageUtil.wrapInputStreamForCompression(conf,
             summary.getCodec(), new BufferedInputStream(new LimitInputStream(
                 fin, section.getLength())));
-        outputINodes(is);
+        INodeSection s = INodeSection.parseDelimitedFrom(is);
+        LOG.info("Found {} INodes in the INode section", s.getNumInodes());
+        int count = outputINodes(is, out);
+        LOG.info("Outputted {} INodes.", count);
       }
     }
     afterOutput();
     long timeTaken = Time.monotonicNow() - startTime;
     LOG.debug("Time to output inodes: {}ms", timeTaken);
   }
 
+  /**
+   * STEP1: Process sub-sections with multiple threads.
+   * Given n (n>1) threads and k (k>=n) sub-sections, each thread handles
+   * a contiguous group. E.g. 10 sub-sections and 4 threads are grouped as:
+   * |---------------------------------------------------------------|
+   * | (0    1    2)    (3    4    5)    (6    7)     (8    9)       |
+   * | thread[0]        thread[1]        thread[2]    thread[3]      |
+   * |---------------------------------------------------------------|
+   *
+   * STEP2: Merge files.
+   */
+  private void outputInParallel(Configuration conf, FileSummary summary,
+      ArrayList<FileSummary.Section> subSections)
+      throws IOException {
+    int nThreads = Integer.min(numThreads, subSections.size());
+    LOG.info("Outputting in parallel with {} sub-sections" +
+        " using {} threads", subSections.size(), nThreads);
+    final CopyOnWriteArrayList<IOException> exceptions =
+        new CopyOnWriteArrayList<>();
+    Thread[] threads = new Thread[nThreads];
+    String[] paths = new String[nThreads];
+    for (int i = 0; i < paths.length; i++) {
+      paths[i] = parallelOut + ".tmp." + i;
+    }
+    AtomicLong expectedINodes = new AtomicLong(0);
+    AtomicLong totalParsed = new AtomicLong(0);
+    String codec = summary.getCodec();
+
+    int mark = 0;
+    for (int i = 0; i < nThreads; i++) {
+      // Each thread processes different ordered sub-sections
+      // and outputs to different paths
+      int step = subSections.size() / nThreads +
+          (i < subSections.size() % nThreads ? 1 : 0);
+      int start = mark;
+      int end = start + step;
+      ArrayList<FileSummary.Section> subList = new ArrayList<>(
+          subSections.subList(start, end));
+      mark = end;
+      String path = paths[i];
+
+      threads[i] = new Thread(() -> {

Review comment:
       > Maybe a thread pool is better here?
   
   @symious  Thanks! I will try this suggestion in the next commit.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to