This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-4.14 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 60d56e6d06dc9ef5645b1b2b6a6a1aa5d1fb85e7 Author: Hang Chen <[email protected]> AuthorDate: Sun Jun 25 23:58:57 2023 +0800 Fix data lost when configured multiple ledger directories (#3329) (cherry picked from commit 8a76703ee44b1f5af9eaedd68a53368dbf5855f0) --- .../java/org/apache/bookkeeper/bookie/Bookie.java | 5 + .../java/org/apache/bookkeeper/bookie/Journal.java | 2 +- .../ldb/SingleDirectoryDbLedgerStorage.java | 7 +- .../bookie/storage/ldb/DbLedgerStorageTest.java | 183 +++++++++++++++++++++ .../TestCompatUpgradeWithHostnameBookieId.groovy | 2 +- 5 files changed, 196 insertions(+), 3 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index 0e7e62d04d..f691f320ee 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -1714,4 +1714,9 @@ public class Bookie extends BookieCriticalThread { } } } + + @VisibleForTesting + public List<Journal> getJournals() { + return this.journals; + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index ba09dc1b2e..340d67bd33 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -229,7 +229,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { * The last mark should first be max journal log id, * and then max log position in max journal log. 
*/ - void readLog() { + public void readLog() { byte[] buff = new byte[16]; ByteBuffer bb = ByteBuffer.wrap(buff); LogMark mark = new LogMark(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 23013e0139..db05913790 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -137,6 +137,8 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage private final long maxReadAheadBytesSize; + private final boolean singleLedgerDirs; + public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource, Checkpointer checkpointer, StatsLogger statsLogger, @@ -152,6 +154,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage this.writeCacheMaxSize = writeCacheSize; this.writeCache = new WriteCache(allocator, writeCacheMaxSize / 2); this.writeCacheBeingFlushed = new WriteCache(allocator, writeCacheMaxSize / 2); + this.singleLedgerDirs = conf.getLedgerDirs().length == 1; this.checkpointSource = checkpointSource; @@ -717,7 +720,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage public void flush() throws IOException { Checkpoint cp = checkpointSource.newCheckpoint(); checkpoint(cp); - checkpointSource.checkpointComplete(cp, true); + if (singleLedgerDirs) { + checkpointSource.checkpointComplete(cp, true); + } } @Override diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java index a39d380e6d..cb667c57df 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java @@ -31,15 +31,20 @@ import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.Bookie.NoEntryException; import org.apache.bookkeeper.bookie.BookieException; +import org.apache.bookkeeper.bookie.CheckpointSource; +import org.apache.bookkeeper.bookie.CheckpointSourceList; import org.apache.bookkeeper.bookie.EntryLocation; import org.apache.bookkeeper.bookie.EntryLogger; import org.apache.bookkeeper.bookie.LedgerDirsManager; +import org.apache.bookkeeper.bookie.LogMark; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.proto.BookieProtocol; @@ -445,4 +450,182 @@ public class DbLedgerStorageTest { // and another is EntryLogManagerForEntryLogPerLedger assertEquals(2, ledgerDirsManager.getListeners().size()); } + + @Test + public void testMultiLedgerDirectoryCheckpoint() throws Exception { + int gcWaitTime = 1000; + File firstDir = new File(tmpDir, "dir1"); + File secondDir = new File(tmpDir, "dir2"); + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + conf.setGcWaitTime(gcWaitTime); + conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4); + conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4); + conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); + conf.setLedgerDirNames(new String[] { firstDir.getCanonicalPath(), secondDir.getCanonicalPath() }); + + Bookie bookie = new 
Bookie(conf); + ByteBuf entry1 = Unpooled.buffer(1024); + entry1.writeLong(1); // ledger id + entry1.writeLong(2); // entry id + entry1.writeBytes("entry-1".getBytes()); + + bookie.getLedgerStorage().addEntry(entry1); + // write one entry to first ledger directory and flush with logMark(1, 2), + // only the first ledger directory should have lastMark + bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2); + ((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush(); + + File firstDirMark = new File(firstDir + "/current", "lastMark"); + File secondDirMark = new File(secondDir + "/current", "lastMark"); + + // LedgerStorage flush won't trigger lastMark update due to two ledger directories configured + try { + readLogMark(firstDirMark); + readLogMark(secondDirMark); + fail(); + } catch (Exception e) { + // + } + + // write the second entry to second ledger directory and flush with log(4, 5), + // the first ledger directory's lastMark is (1, 2) and the second ledger directory's lastMark is (4, 5); + ByteBuf entry2 = Unpooled.buffer(1024); + entry2.writeLong(2); // ledger id + entry2.writeLong(1); // entry id + entry2.writeBytes("entry-2".getBytes()); + + bookie.getLedgerStorage().addEntry(entry2); + // set the journal's current mark to logMark(4, 5) and flush the second per-directory storage, + // this flush alone should not write lastMark either + bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5); + ((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(1).flush(); + + // LedgerStorage flush won't trigger lastMark update due to two ledger directories configured + try { + readLogMark(firstDirMark); + readLogMark(secondDirMark); + fail(); + } catch (Exception e) { + // + } + + // The dbLedgerStorage flush also won't trigger lastMark update due to two ledger directories configured. 
+ bookie.getLedgerStorage().flush(); + try { + readLogMark(firstDirMark); + readLogMark(secondDirMark); + fail(); + } catch (Exception e) { + // + } + + // trigger checkpoint simulate SyncThread do checkpoint. + CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals()); + bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8); + CheckpointSource.Checkpoint checkpoint = checkpointSource.newCheckpoint(); + checkpointSource.checkpointComplete(checkpoint, false); + + try { + LogMark firstLogMark = readLogMark(firstDirMark); + LogMark secondLogMark = readLogMark(secondDirMark); + assertEquals(7, firstLogMark.getLogFileId()); + assertEquals(8, firstLogMark.getLogFileOffset()); + assertEquals(7, secondLogMark.getLogFileId()); + assertEquals(8, secondLogMark.getLogFileOffset()); + } catch (Exception e) { + fail(); + } + + // test replay journal lastMark, to make sure we get the right LastMark position + bookie.getJournals().get(0).getLastLogMark().readLog(); + LogMark logMark = bookie.getJournals().get(0).getLastLogMark().getCurMark(); + assertEquals(7, logMark.getLogFileId()); + assertEquals(8, logMark.getLogFileOffset()); + } + + private LogMark readLogMark(File file) throws IOException { + byte[] buff = new byte[16]; + ByteBuffer bb = ByteBuffer.wrap(buff); + LogMark mark = new LogMark(); + try (FileInputStream fis = new FileInputStream(file)) { + int bytesRead = fis.read(buff); + if (bytesRead != 16) { + throw new IOException("Couldn't read enough bytes from lastMark." 
+ + " Wanted " + 16 + ", got " + bytesRead); + } + } + bb.clear(); + mark.readLogMark(bb); + + return mark; + } + + @Test + public void testSingleLedgerDirectoryCheckpoint() throws Exception { + int gcWaitTime = 1000; + File ledgerDir = new File(tmpDir, "dir"); + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + conf.setGcWaitTime(gcWaitTime); + conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4); + conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4); + conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); + conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() }); + + Bookie bookie = new Bookie(conf); + ByteBuf entry1 = Unpooled.buffer(1024); + entry1.writeLong(1); // ledger id + entry1.writeLong(2); // entry id + entry1.writeBytes("entry-1".getBytes()); + bookie.getLedgerStorage().addEntry(entry1); + + bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2); + ((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush(); + + File ledgerDirMark = new File(ledgerDir + "/current", "lastMark"); + try { + LogMark logMark = readLogMark(ledgerDirMark); + assertEquals(1, logMark.getLogFileId()); + assertEquals(2, logMark.getLogFileOffset()); + } catch (Exception e) { + fail(); + } + + ByteBuf entry2 = Unpooled.buffer(1024); + entry2.writeLong(2); // ledger id + entry2.writeLong(1); // entry id + entry2.writeBytes("entry-2".getBytes()); + + bookie.getLedgerStorage().addEntry(entry2); + // write a second entry and flush the whole ledger storage with logMark(4, 5), + // the single ledger directory's lastMark should be updated to (4, 5) + bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5); + + bookie.getLedgerStorage().flush(); + try { + LogMark logMark = readLogMark(ledgerDirMark); + assertEquals(4, logMark.getLogFileId()); + assertEquals(5, logMark.getLogFileOffset()); + } catch (Exception e) { + fail(); + } + + CheckpointSource checkpointSource = new 
CheckpointSourceList(bookie.getJournals()); + bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8); + CheckpointSource.Checkpoint checkpoint = checkpointSource.newCheckpoint(); + checkpointSource.checkpointComplete(checkpoint, false); + + try { + LogMark firstLogMark = readLogMark(ledgerDirMark); + assertEquals(7, firstLogMark.getLogFileId()); + assertEquals(8, firstLogMark.getLogFileOffset()); + } catch (Exception e) { + fail(); + } + + // test replay journal lastMark, to make sure we get the right LastMark position + bookie.getJournals().get(0).getLastLogMark().readLog(); + LogMark logMark = bookie.getJournals().get(0).getLastLogMark().getCurMark(); + assertEquals(7, logMark.getLogFileId()); + assertEquals(8, logMark.getLogFileOffset()); + } } diff --git a/tests/backward-compat/hostname-bookieid/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeWithHostnameBookieId.groovy b/tests/backward-compat/hostname-bookieid/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeWithHostnameBookieId.groovy index eb6114da57..e4a6cfb614 100644 --- a/tests/backward-compat/hostname-bookieid/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeWithHostnameBookieId.groovy +++ b/tests/backward-compat/hostname-bookieid/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeWithHostnameBookieId.groovy @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information
