This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit ae6b454498180854f84497343a8785af5eb9b2a3
Author: Hang Chen <[email protected]>
AuthorDate: Sun Jun 25 23:58:57 2023 +0800

    Fix data lost when configured multiple ledger directories (#3329)
    
    (cherry picked from commit 8a76703ee44b1f5af9eaedd68a53368dbf5855f0)
---
 .../org/apache/bookkeeper/bookie/BookieImpl.java   |   5 +
 .../java/org/apache/bookkeeper/bookie/Journal.java |   2 +-
 .../ldb/SingleDirectoryDbLedgerStorage.java        |   6 +-
 .../bookie/storage/ldb/DbLedgerStorageTest.java    | 183 +++++++++++++++++++++
 4 files changed, 194 insertions(+), 2 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index 5ae5b9d481..eda91147f1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -1246,4 +1246,9 @@ public class BookieImpl extends BookieCriticalThread 
implements Bookie {
             }
         }
     }
+
+    @VisibleForTesting
+    public List<Journal> getJournals() {
+        return this.journals;
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 6105d00012..33e99d3ee5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -229,7 +229,7 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
          * The last mark should first be max journal log id,
          * and then max log position in max journal log.
          */
-        void readLog() {
+        public void readLog() {
             byte[] buff = new byte[16];
             ByteBuffer bb = ByteBuffer.wrap(buff);
             LogMark mark = new LogMark();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index d8646c1e43..455f19fbe9 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -143,6 +143,7 @@ public class SingleDirectoryDbLedgerStorage implements 
CompactableLedgerStorage
     private final long maxReadAheadBytesSize;
 
     private final Counter flushExecutorTime;
+    private final boolean singleLedgerDirs;
 
     public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, 
LedgerManager ledgerManager,
             LedgerDirsManager ledgerDirsManager, LedgerDirsManager 
indexDirsManager, StatsLogger statsLogger,
@@ -160,6 +161,7 @@ public class SingleDirectoryDbLedgerStorage implements 
CompactableLedgerStorage
         this.writeCacheMaxSize = writeCacheSize;
         this.writeCache = new WriteCache(allocator, writeCacheMaxSize / 2);
         this.writeCacheBeingFlushed = new WriteCache(allocator, 
writeCacheMaxSize / 2);
+        this.singleLedgerDirs = conf.getLedgerDirs().length == 1;
 
         readCacheMaxSize = readCacheSize;
         this.readAheadCacheBatchSize = readAheadCacheBatchSize;
@@ -821,7 +823,9 @@ public class SingleDirectoryDbLedgerStorage implements 
CompactableLedgerStorage
     public void flush() throws IOException {
         Checkpoint cp = checkpointSource.newCheckpoint();
         checkpoint(cp);
-        checkpointSource.checkpointComplete(cp, true);
+        if (singleLedgerDirs) {
+            checkpointSource.checkpointComplete(cp, true);
+        }
     }
 
     @Override
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
index de62611daa..e3fabd55b4 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -29,16 +29,21 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.BookieImpl;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSourceList;
 import org.apache.bookkeeper.bookie.EntryLocation;
 import org.apache.bookkeeper.bookie.EntryLogger;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.LogMark;
 import org.apache.bookkeeper.bookie.TestBookieImpl;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -630,4 +635,182 @@ public class DbLedgerStorageTest {
 
         storage = (DbLedgerStorage) new 
TestBookieImpl(conf).getLedgerStorage();
     }
+
+    @Test
+    public void testMultiLedgerDirectoryCheckpoint() throws Exception {
+        int gcWaitTime = 1000;
+        File firstDir = new File(tmpDir, "dir1");
+        File secondDir = new File(tmpDir, "dir2");
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
+        conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        conf.setLedgerDirNames(new String[] { firstDir.getCanonicalPath(), 
secondDir.getCanonicalPath() });
+
+        BookieImpl bookie = new TestBookieImpl(conf);
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(2); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        bookie.getLedgerStorage().addEntry(entry1);
+        // write one entry to first ledger directory and flush with logMark(1, 
2),
+        // only the first ledger directory should have lastMark
+        
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2);
+        ((DbLedgerStorage) 
bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush();
+
+        File firstDirMark = new File(firstDir + "/current", "lastMark");
+        File secondDirMark = new File(secondDir + "/current", "lastMark");
+
+        // LedgerStorage flush won't trigger lastMark update due to two ledger 
directories configured
+        try {
+            readLogMark(firstDirMark);
+            readLogMark(secondDirMark);
+            fail();
+        } catch (Exception e) {
+            //
+        }
+
+        // write the second entry to second leger directory and flush with 
log(4, 5),
+        // the fist ledger directory's lastMark is (1, 2) and the second 
ledger directory's lastMark is (4, 5);
+        ByteBuf entry2 = Unpooled.buffer(1024);
+        entry2.writeLong(2); // ledger id
+        entry2.writeLong(1); // entry id
+        entry2.writeBytes("entry-2".getBytes());
+
+        bookie.getLedgerStorage().addEntry(entry2);
+        // write one entry to first ledger directory and flush with logMark(1, 
2),
+        // only the first ledger directory should have lastMark
+        
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5);
+        ((DbLedgerStorage) 
bookie.getLedgerStorage()).getLedgerStorageList().get(1).flush();
+
+        // LedgerStorage flush won't trigger lastMark update due to two ledger 
directories configured
+        try {
+            readLogMark(firstDirMark);
+            readLogMark(secondDirMark);
+            fail();
+        } catch (Exception e) {
+            //
+        }
+
+        // The dbLedgerStorage flush also won't trigger lastMark update due to 
two ledger directories configured.
+        bookie.getLedgerStorage().flush();
+        try {
+            readLogMark(firstDirMark);
+            readLogMark(secondDirMark);
+            fail();
+        } catch (Exception e) {
+            //
+        }
+
+        // trigger checkpoint simulate SyncThread do checkpoint.
+        CheckpointSource checkpointSource = new 
CheckpointSourceList(bookie.getJournals());
+        
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8);
+        CheckpointSource.Checkpoint checkpoint = 
checkpointSource.newCheckpoint();
+        checkpointSource.checkpointComplete(checkpoint, false);
+
+        try {
+            LogMark firstLogMark = readLogMark(firstDirMark);
+            LogMark secondLogMark = readLogMark(secondDirMark);
+            assertEquals(7, firstLogMark.getLogFileId());
+            assertEquals(8, firstLogMark.getLogFileOffset());
+            assertEquals(7, secondLogMark.getLogFileId());
+            assertEquals(8, secondLogMark.getLogFileOffset());
+        } catch (Exception e) {
+            fail();
+        }
+
+        // test replay journal lastMark, to make sure we get the right 
LastMark position
+        bookie.getJournals().get(0).getLastLogMark().readLog();
+        LogMark logMark = 
bookie.getJournals().get(0).getLastLogMark().getCurMark();
+        assertEquals(7, logMark.getLogFileId());
+        assertEquals(8, logMark.getLogFileOffset());
+    }
+
+    private LogMark readLogMark(File file) throws IOException {
+        byte[] buff = new byte[16];
+        ByteBuffer bb = ByteBuffer.wrap(buff);
+        LogMark mark = new LogMark();
+        try (FileInputStream fis = new FileInputStream(file)) {
+            int bytesRead = fis.read(buff);
+            if (bytesRead != 16) {
+                throw new IOException("Couldn't read enough bytes from 
lastMark."
+                    + " Wanted " + 16 + ", got " + bytesRead);
+            }
+        }
+        bb.clear();
+        mark.readLogMark(bb);
+
+        return mark;
+    }
+
+    @Test
+    public void testSingleLedgerDirectoryCheckpoint() throws Exception {
+        int gcWaitTime = 1000;
+        File ledgerDir = new File(tmpDir, "dir");
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
+        conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() });
+
+        BookieImpl bookie = new TestBookieImpl(conf);
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(2); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+        bookie.getLedgerStorage().addEntry(entry1);
+
+        
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2);
+        ((DbLedgerStorage) 
bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush();
+
+        File ledgerDirMark = new File(ledgerDir + "/current", "lastMark");
+        try {
+            LogMark logMark = readLogMark(ledgerDirMark);
+            assertEquals(1, logMark.getLogFileId());
+            assertEquals(2, logMark.getLogFileOffset());
+        } catch (Exception e) {
+            fail();
+        }
+
+        ByteBuf entry2 = Unpooled.buffer(1024);
+        entry2.writeLong(2); // ledger id
+        entry2.writeLong(1); // entry id
+        entry2.writeBytes("entry-2".getBytes());
+
+        bookie.getLedgerStorage().addEntry(entry2);
+        // write one entry to first ledger directory and flush with logMark(1, 
2),
+        // only the first ledger directory should have lastMark
+        
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5);
+
+        bookie.getLedgerStorage().flush();
+        try {
+            LogMark logMark = readLogMark(ledgerDirMark);
+            assertEquals(4, logMark.getLogFileId());
+            assertEquals(5, logMark.getLogFileOffset());
+        } catch (Exception e) {
+            fail();
+        }
+
+        CheckpointSource checkpointSource = new 
CheckpointSourceList(bookie.getJournals());
+        
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8);
+        CheckpointSource.Checkpoint checkpoint = 
checkpointSource.newCheckpoint();
+        checkpointSource.checkpointComplete(checkpoint, false);
+
+        try {
+            LogMark firstLogMark = readLogMark(ledgerDirMark);
+            assertEquals(7, firstLogMark.getLogFileId());
+            assertEquals(8, firstLogMark.getLogFileOffset());
+        } catch (Exception e) {
+            fail();
+        }
+
+        // test replay journal lastMark, to make sure we get the right 
LastMark position
+        bookie.getJournals().get(0).getLastLogMark().readLog();
+        LogMark logMark = 
bookie.getJournals().get(0).getLastLogMark().getCurMark();
+        assertEquals(7, logMark.getLogFileId());
+        assertEquals(8, logMark.getLogFileOffset());
+    }
 }

Reply via email to