This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new f89e3fb  Update lastLogMark to EOF when replaying journal
f89e3fb is described below

commit f89e3fbb751fbef7012e695ed5873eade13ba4f5
Author: karanmehta93 <[email protected]>
AuthorDate: Fri May 31 09:49:13 2019 -0500

    Update lastLogMark to EOF when replaying journal
    
    Descriptions of the changes in this PR:
    
    ### Motivation
    
    The [commit](https://github.com/apache/bookkeeper/commit/36be8362399341022c8de64f9319270726df2cb3) caused the integration test `test101_RegenerateIndex` to fail with the following exception:
    ```
    java.io.IOException: Invalid argument
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.FileDispatcherImpl.read(FileDispatcherImpl.java:46)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:197)
        at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:159)
        at org.apache.bookkeeper.bookie.JournalChannel.read(JournalChannel.java:257)
        at org.apache.bookkeeper.bookie.Journal.fullRead(Journal.java:1171)
        at org.apache.bookkeeper.bookie.Journal.scanJournal(Journal.java:792)
        at org.apache.bookkeeper.bookie.Bookie.replay(Bookie.java:924)
        at org.apache.bookkeeper.bookie.Bookie.readJournal(Bookie.java:886)
        at org.apache.bookkeeper.bookie.Bookie.start(Bookie.java:943)
        at org.apache.bookkeeper.proto.BookieServer.start(BookieServer.java:141)
        at org.apache.bookkeeper.server.service.BookieService.doStart(BookieService.java:58)
        at org.apache.bookkeeper.common.component.AbstractLifecycleComponent.start(AbstractLifecycleComponent.java:78)
        at org.apache.bookkeeper.common.component.LifecycleComponentStack.lambda$start$2(LifecycleComponentStack.java:113)
        at com.google.common.collect.ImmutableList.forEach(ImmutableList.java:408)
        at org.apache.bookkeeper.common.component.LifecycleComponentStack.start(LifecycleComponentStack.java:113)
        at org.apache.bookkeeper.common.component.ComponentStarter.startComponent(ComponentStarter.java:80)
        at org.apache.bookkeeper.server.Main.doMain(Main.java:229)
        at org.apache.bookkeeper.server.Main.main(Main.java:203)
    ```
    
    As discussed on Slack, it is hard to pin down exactly why the native JNI read call fails with `Invalid argument`. This PR therefore proposes setting `lastLogMark` to the journal's EOF offset instead of the arbitrary `Long.MAX_VALUE`. The `FileChannel` contract says that a read at any long offset beyond the end of the file should return EOF immediately, but that does not seem to work as expected here.
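
The documented positional-read behaviour mentioned above can be checked with a minimal sketch. The class and method names (`EofReadSketch`, `readAt`) are hypothetical and not part of BookKeeper. The sketch only probes offsets at and slightly past the end of the file; it deliberately avoids `Long.MAX_VALUE`, since this commit reports that such a read can fail with `Invalid argument` on some platforms.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class EofReadSketch {
    /**
     * Performs one positional read at the given offset and returns its result:
     * the number of bytes read, or -1 when the offset is at or past EOF.
     */
    static int readAt(Path file, long offset) throws IOException {
        try (FileChannel fc = FileChannel.open(file, StandardOpenOption.READ)) {
            return fc.read(ByteBuffer.allocate(16), offset);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("eof-sketch", ".bin");
        try {
            Files.write(file, new byte[10]);
            long size = Files.size(file);
            System.out.println("read at 0:          " + readAt(file, 0));
            System.out.println("read at size:       " + readAt(file, size));
            System.out.println("read at size + 100: " + readAt(file, size + 100));
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

Recording a real in-file offset, as this PR does, keeps later reads within the range that this contract is known to handle.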
    
    ### Changes
    
    Updated the `Journal#setLastLogMark()` method to accept a `scanOffset` instead of the constant `Long.MAX_VALUE`; `Journal#scanJournal()` now returns this offset.
    
    ivankelly eolivelli
    
    Reviewers: Ivan Kelly <[email protected]>, Enrico Olivelli <[email protected]>
    
    This closes #2105 from karanmehta93/master
---
 .../src/main/java/org/apache/bookkeeper/bookie/Bookie.java    |  7 ++++---
 .../src/main/java/org/apache/bookkeeper/bookie/Journal.java   | 11 +++++++----
 2 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 467cb03..9bb9e5d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -921,10 +921,11 @@ public class Bookie extends BookieCriticalThread {
                 logPosition = markedLog.getLogFileOffset();
             }
             LOG.info("Replaying journal {} from position {}", id, logPosition);
-            journal.scanJournal(id, logPosition, scanner);
-            // Update LastLogMark to Long.MAX_VALUE position after replaying journal
+            long scanOffset = journal.scanJournal(id, logPosition, scanner);
+            // Update LastLogMark after completely replaying journal
+            // scanOffset will point to EOF position
             // After LedgerStorage flush, SyncThread should persist this to disk
-            journal.setLastLogMarkToEof(id);
+            journal.setLastLogMark(id, scanOffset);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 8941b68..06fc5e2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -710,12 +710,13 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
     }
 
     /**
-     * Update lastLogMark of the journal,
+     * Update lastLogMark of the journal
      * Indicates that the file has been processed.
      * @param id
+     * @param scanOffset
      */
-    void setLastLogMarkToEof(Long id) {
-        lastLogMark.getCurMark().setLogMark(id, Long.MAX_VALUE);
+    void setLastLogMark(Long id, long scanOffset) {
+        lastLogMark.setCurLogMark(id, scanOffset);
     }
 
     /**
@@ -769,9 +770,10 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
      * @param journalId Journal Log Id
      * @param journalPos Offset to start scanning
      * @param scanner Scanner to handle entries
+     * @return scanOffset - represents the byte till which journal was read
      * @throws IOException
      */
-    public void scanJournal(long journalId, long journalPos, JournalScanner scanner)
+    public long scanJournal(long journalId, long journalPos, JournalScanner scanner)
         throws IOException {
         JournalChannel recLog;
         if (journalPos <= 0) {
@@ -832,6 +834,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                     scanner.process(journalVersion, offset, recBuff);
                 }
             }
+            return recLog.fc.position();
         } finally {
             recLog.close();
         }
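
The pattern in the diff above, where the scan returns the channel position it stopped at and that position becomes the new mark, can be illustrated with a simplified, standalone sketch. This is not BookKeeper's journal format or API: the record layout (a 4-byte length followed by the payload) and the names `ScanOffsetSketch`, `scan`, and `fullRead` are assumptions made only for this example.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

class ScanOffsetSketch {
    /** Reads until dst is full or EOF is hit; returns the number of bytes read. */
    static int fullRead(FileChannel fc, ByteBuffer dst) throws IOException {
        int total = 0;
        while (dst.hasRemaining()) {
            int n = fc.read(dst);
            if (n < 0) {
                break; // EOF
            }
            total += n;
        }
        return total;
    }

    /**
     * Scans length-prefixed records starting at startPos, collects their
     * payloads into out, and returns the channel position where scanning stopped.
     */
    static long scan(Path file, long startPos, List<byte[]> out) throws IOException {
        try (FileChannel fc = FileChannel.open(file, StandardOpenOption.READ)) {
            fc.position(startPos);
            ByteBuffer lenBuf = ByteBuffer.allocate(4);
            while (true) {
                lenBuf.clear();
                if (fullRead(fc, lenBuf) < 4) {
                    break; // no complete length header left
                }
                lenBuf.flip();
                int len = lenBuf.getInt();
                if (len < 0) {
                    break; // corrupt header in this toy format
                }
                ByteBuffer record = ByteBuffer.allocate(len);
                if (fullRead(fc, record) < len) {
                    break; // truncated record at the end of the file
                }
                out.add(record.array());
            }
            // The returned offset always lies within the file, unlike Long.MAX_VALUE.
            return fc.position();
        }
    }
}
```

A caller would store the returned offset as its mark; in this sketch, scanning again from that offset yields no records and returns the same offset.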
