eolivelli commented on a change in pull request #2090: Update and flush lastLogMark when replaying journal
URL: https://github.com/apache/bookkeeper/pull/2090#discussion_r284095750
 
 

 ##########
 File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
 ##########
 @@ -883,12 +883,51 @@ public void process(int journalVersion, long offset, ByteBuffer recBuff) throws
         };
 
         for (Journal journal : journals) {
-            journal.replay(scanner);
+            replay(journal, scanner);
         }
         long elapsedTs = System.currentTimeMillis() - startTs;
         LOG.info("Finished replaying journal in {} ms.", elapsedTs);
     }
 
+    /**
+     * Replay journal files and updates journal's in-memory lastLogMark object.
+     *
+     * @param journal Journal object corresponding to a journalDir
+     * @param scanner Scanner to process replayed entries.
+     * @throws IOException
+     */
+    private void replay(Journal journal, JournalScanner scanner) throws IOException {
+        final LogMark markedLog = journal.getLastLogMark().getCurMark();
+        List<Long> logs = Journal.listJournalIds(journal.getJournalDirectory(), journalId -> {
+            if (journalId < markedLog.getLogFileId()) {
+                return false;
+            }
+            return true;
+        });
+        // last log mark may be missed due to no sync up before
+        // validate filtered log ids only when we have markedLogId
+        if (markedLog.getLogFileId() > 0) {
+            if (logs.size() == 0 || logs.get(0) != markedLog.getLogFileId()) {
+                throw new IOException("Recovery log " + markedLog.getLogFileId() + " is missing");
+            }
+        }
+
+        // TODO: When reading in the journal logs that need to be synced, we
+        // should use BufferedChannels instead to minimize the amount of
+        // system calls done.
+        for (Long id : logs) {
+            long logPosition = 0L;
+            if (id == markedLog.getLogFileId()) {
+                logPosition = markedLog.getLogFileOffset();
+            }
+            LOG.info("Replaying journal {} from position {}", id, logPosition);
+            journal.scanJournal(id, logPosition, scanner);
+            // Update LastLogMark to Long.MAX_VALUE position after replaying journal
+            // After LedgerStorage flush, SyncThread should persist this to disk
 
 Review comment:
   What about creating a method *moveLastMarkToEof()* in Journal?
   
   This way we will notice this new case more easily in the future, and the behaviour of the Journal will be better encapsulated (I don't like the 'x.get.modifyYourInternals' pattern).
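
A minimal sketch of the encapsulation idea, for illustration only. `SketchLogMark` and `SketchJournal` are simplified stand-ins, not the real BookKeeper classes, and the method name `moveLastMarkToEof` (after `lastLogMark`) and its signature are assumptions. The only behaviour taken from the diff is that the mark moves to the `Long.MAX_VALUE` position of the journal that was just replayed.

```java
// Simplified stand-in for a log mark: a (journal file id, offset) pair.
// Hypothetical class, not the real BookKeeper LogMark.
class SketchLogMark {
    private long logFileId;
    private long logFileOffset;

    synchronized void set(long logFileId, long logFileOffset) {
        this.logFileId = logFileId;
        this.logFileOffset = logFileOffset;
    }

    synchronized long getLogFileId() {
        return logFileId;
    }

    synchronized long getLogFileOffset() {
        return logFileOffset;
    }
}

// Simplified stand-in for the journal. The mark is private, so callers
// cannot reach in and modify it through a chain of getters.
class SketchJournal {
    private final SketchLogMark lastLogMark = new SketchLogMark();

    // Hypothetical method: records that the given journal file has been
    // replayed to its end by moving the in-memory mark to Long.MAX_VALUE.
    void moveLastMarkToEof(long journalId) {
        lastLogMark.set(journalId, Long.MAX_VALUE);
    }

    // Read-only accessors for callers that only need to inspect the mark.
    long getLastMarkLogFileId() {
        return lastLogMark.getLogFileId();
    }

    long getLastMarkLogFileOffset() {
        return lastLogMark.getLogFileOffset();
    }
}
```

With something like this, the replay loop would call `journal.moveLastMarkToEof(id)` after scanning each journal instead of fetching the mark object and mutating it from `Bookie`.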
