sodonnel commented on a change in pull request #1028: HDFS-14617 - Improve fsimage load time by writing sub-sections to the fsimage index
URL: https://github.com/apache/hadoop/pull/1028#discussion_r313941659
 
 

 ##########
 File path: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
 ##########
 @@ -217,33 +273,151 @@ void loadINodeDirectorySection(InputStream in) throws IOException {
         INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
         for (long id : e.getChildrenList()) {
           INode child = dir.getInode(id);
-          addToParent(p, child);
+          if (addToParent(p, child)) {
+            if (child.isFile()) {
+              inodeList.add(child);
+            }
+            if (inodeList.size() >= 1000) {
+              addToCacheAndBlockMap(inodeList);
+              inodeList.clear();
+            }
+          }
+
         }
+
         for (int refId : e.getRefChildrenList()) {
           INodeReference ref = refList.get(refId);
-          addToParent(p, ref);
+          if (addToParent(p, ref)) {
+            if (ref.isFile()) {
+              inodeList.add(ref);
+            }
+            if (inodeList.size() >= 1000) {
+              addToCacheAndBlockMap(inodeList);
+              inodeList.clear();
+            }
+          }
         }
       }
+      addToCacheAndBlockMap(inodeList);
+    }
+
+    private void addToCacheAndBlockMap(ArrayList<INode> inodeList) {
+      try {
+        cacheNameMapLock.lock();
+        for (INode i : inodeList) {
+          dir.cacheName(i);
+        }
+      } finally {
+        cacheNameMapLock.unlock();
+      }
+
+      try {
+        blockMapLock.lock();
+        for (INode i : inodeList) {
+          updateBlocksMap(i.asFile(), fsn.getBlockManager());
+        }
+      } finally {
+        blockMapLock.unlock();
+      }
     }
 
     void loadINodeSection(InputStream in, StartupProgress prog,
         Step currentStep) throws IOException {
-      INodeSection s = INodeSection.parseDelimitedFrom(in);
-      fsn.dir.resetLastInodeId(s.getLastInodeId());
-      long numInodes = s.getNumInodes();
-      LOG.info("Loading " + numInodes + " INodes.");
-      prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
+      loadINodeSectionHeader(in, prog, currentStep);
       Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep);
-      for (int i = 0; i < numInodes; ++i) {
+      int totalLoaded = loadINodesInSection(in, counter);
+      LOG.info("Successfully loaded {} inodes", totalLoaded);
+    }
+
+    private int loadINodesInSection(InputStream in, Counter counter)
+        throws IOException {
+      // As the input stream is a LimitInputStream, the reading will stop when
+      // EOF is encountered at the end of the stream.
+      int cntr = 0;
+      while (true) {
         INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
+        if (p == null) {
+          break;
+        }
         if (p.getId() == INodeId.ROOT_INODE_ID) {
-          loadRootINode(p);
+          synchronized(this) {
+            loadRootINode(p);
+          }
         } else {
           INode n = loadINode(p);
-          dir.addToInodeMap(n);
+          synchronized(this) {
+            dir.addToInodeMap(n);
+          }
+        }
+        cntr ++;
+        if (counter != null) {
+          counter.increment();
         }
-        counter.increment();
       }
+      return cntr;
+    }
+
+
+    private void loadINodeSectionHeader(InputStream in, StartupProgress prog,
+        Step currentStep) throws IOException {
+      INodeSection s = INodeSection.parseDelimitedFrom(in);
+      fsn.dir.resetLastInodeId(s.getLastInodeId());
+      long numInodes = s.getNumInodes();
+      LOG.info("Loading " + numInodes + " INodes.");
+      prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
+    }
+
+    void loadINodeSectionInParallel(ExecutorService service,
+        ArrayList<FileSummary.Section> sections,
+        String compressionCodec, StartupProgress prog,
+        Step currentStep) throws IOException {
+      LOG.info("Loading the INode section in parallel with {} sub-sections",
+          sections.size());
+      CountDownLatch latch = new CountDownLatch(sections.size());
+      AtomicInteger totalLoaded = new AtomicInteger(0);
+      final CopyOnWriteArrayList<IOException> exceptions =
+          new CopyOnWriteArrayList<>();
+
+      for (int i=0; i < sections.size(); i++) {
+        FileSummary.Section s = sections.get(i);
+        InputStream ins = parent.getInputStreamForSection(s, compressionCodec);
+        if (i == 0) {
+          // The first inode section has a header which must be processed first
+          loadINodeSectionHeader(ins, prog, currentStep);
+        }
+
+        service.submit(new Runnable() {
+           public void run() {
+            try {
+               totalLoaded.addAndGet(loadINodesInSection(ins, null));
+               prog.setCount(Phase.LOADING_FSIMAGE, currentStep,
+                   totalLoaded.get());
+            } catch (Exception e) {
+              LOG.error("An exception occurred loading INodes in parallel", e);
+              exceptions.add(new IOException(e));
+            } finally {
+              latch.countDown();
+              try {
+                ins.close();
+              } catch (IOException ioe) {
+                LOG.warn("Failed to close the input stream, ignoring", ioe);
+              }
+            }
+          }
+        });
+      }
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted waiting for countdown latch");
+      }
+      if (exceptions.size() != 0) {
+        LOG.error("{} exceptions occurred loading INodes", exceptions.size());
+        throw exceptions.get(0);
+      }
+      // TODO - should we fail if total_loaded != total_expected?
 
 Review comment:
   The latest version I pushed removes this TODO; the load now fails if the
number of inodes loaded does not equal the number expected.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to