This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-4.15 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 25463d8e4f62049cd44eaae1a8ba99e2961ce3d8 Author: Hang Chen <[email protected]> AuthorDate: Mon Jun 19 19:40:50 2023 +0800 Support skip invalid journal record in replying journal stage (#3956) Co-authored-by: zhiyuanlei <[email protected]> (cherry picked from commit 5e9fdc2f81a7add645bba43f90ef630dffc1993f) --- .../org/apache/bookkeeper/bookie/BookieImpl.java | 2 +- .../java/org/apache/bookkeeper/bookie/Journal.java | 16 ++- .../bookie/storage/ldb/LedgersIndexRebuildOp.java | 2 +- .../bookkeeper/conf/ServerConfiguration.java | 21 ++++ .../cli/commands/bookie/ReadJournalCommand.java | 2 +- .../bookkeeper/bookie/BookieJournalTest.java | 117 ++++++++++++++++++++- 6 files changed, 151 insertions(+), 9 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index eda91147f1..c5c4185cbf 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -623,7 +623,7 @@ public class BookieImpl extends BookieCriticalThread implements Bookie { logPosition = markedLog.getLogFileOffset(); } LOG.info("Replaying journal {} from position {}", id, logPosition); - long scanOffset = journal.scanJournal(id, logPosition, scanner); + long scanOffset = journal.scanJournal(id, logPosition, scanner, conf.isSkipReplayJournalInvalidRecord()); // Update LastLogMark after completely replaying journal // scanOffset will point to EOF position // After LedgerStorage flush, SyncThread should persist this to disk diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 33e99d3ee5..a5aa12b70c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -869,13 +869,14 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { /** * Scan the journal. * - * @param journalId Journal Log Id - * @param journalPos Offset to start scanning - * @param scanner Scanner to handle entries + * @param journalId Journal Log Id + * @param journalPos Offset to start scanning + * @param scanner Scanner to handle entries + * @param skipInvalidRecord when invalid record,should we skip it or not * @return scanOffset - represents the byte till which journal was read * @throws IOException */ - public long scanJournal(long journalId, long journalPos, JournalScanner scanner) + public long scanJournal(long journalId, long journalPos, JournalScanner scanner, boolean skipInvalidRecord) throws IOException { JournalChannel recLog; if (journalPos <= 0) { @@ -939,6 +940,13 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { } } return recLog.fc.position(); + } catch (IOException e) { + if (skipInvalidRecord) { + LOG.warn("Failed to parse journal file, and skipInvalidRecord is true, skip this journal file reply"); + } else { + throw e; + } + return recLog.fc.position(); } finally { recLog.close(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java index 6e481ca569..746592df54 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java @@ -207,7 +207,7 @@ public class LedgersIndexRebuildOp { LOG.info("Found ledger {} in journal", ledgerId); } } - }); + }, false); } private void delete(Path path) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index c0e33d563b..606423b49b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -336,6 +336,8 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati // Used for location index, lots of writes and much bigger dataset protected static final String LEDGER_METADATA_ROCKSDB_CONF = "ledgerMetadataRocksdbConf"; + protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD = "skipReplayJournalInvalidRecord"; + /** * Construct a default configuration object. */ @@ -3913,6 +3915,25 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati return this.getBoolean(DATA_INTEGRITY_COOKIE_STAMPING_ENABLED, false); } + + /** + * When this config is set to true,if we replay journal failed, we will skip. + * @param skipReplayJournalInvalidRecord + * @return + */ + public ServerConfiguration setSkipReplayJournalInvalidRecord(boolean skipReplayJournalInvalidRecord) { + this.setProperty(SKIP_REPLAY_JOURNAL_INVALID_RECORD, + Boolean.toString(skipReplayJournalInvalidRecord)); + return this; + } + + /** + * @see #isSkipReplayJournalInvalidRecord . + */ + public boolean isSkipReplayJournalInvalidRecord() { + return this.getBoolean(SKIP_REPLAY_JOURNAL_INVALID_RECORD, false); + } + /** * Get default rocksdb conf. * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java index 66d31068b7..ab985070ff 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java @@ -211,6 +211,6 @@ public class ReadJournalCommand extends BookieCommand<ReadJournalCommand.ReadJou } private void scanJournal(Journal journal, long journalId, Journal.JournalScanner scanner) throws IOException { - journal.scanJournal(journalId, 0L, scanner); + journal.scanJournal(journalId, 0L, scanner, false); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java index e5a1d78d8c..1036d279ca 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java @@ -299,6 +299,52 @@ public class BookieJournalTest { return jc; } + private JournalChannel writeV4JournalWithInvalidRecord(File journalDir, + int numEntries, byte[] masterKey) throws Exception { + long logId = System.currentTimeMillis(); + JournalChannel jc = new JournalChannel(journalDir, logId); + + moveToPosition(jc, JournalChannel.VERSION_HEADER_SIZE); + + BufferedChannel bc = jc.getBufferedChannel(); + + byte[] data = new byte[1024]; + Arrays.fill(data, (byte) 'X'); + long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID; + for (int i = 0; i <= numEntries; i++) { + ByteBuf packet; + if (i == 0) { + packet = generateMetaEntry(1, masterKey); + } else { + packet = ClientUtil.generatePacket(1, i, lastConfirmed, i * data.length, data); + } + lastConfirmed = i; + ByteBuffer lenBuff = ByteBuffer.allocate(4); + if (i == numEntries - 1) { + //mock when flush data to file ,it writes an invalid entry to journal + lenBuff.putInt(-1); + } else { + lenBuff.putInt(packet.readableBytes()); + } + lenBuff.flip(); + bc.write(Unpooled.wrappedBuffer(lenBuff)); + bc.write(packet); + packet.release(); + } + + // write fence key + ByteBuf packet = generateFenceEntry(1); + ByteBuf lenBuf = Unpooled.buffer(); + lenBuf.writeInt(packet.readableBytes()); + //mock + bc.write(lenBuf); + bc.write(packet); + bc.flushAndForceWrite(false); + updateJournalVersion(jc, JournalChannel.V4); + + return jc; + } + static JournalChannel writeV5Journal(File journalDir, int numEntries, byte[] masterKey) throws Exception { return writeV5Journal(journalDir, numEntries, masterKey, false); @@ -844,7 +890,7 @@ public class BookieJournalTest { assertEquals(journalIds.size(), 1); try { - journal.scanJournal(journalIds.get(0), Long.MAX_VALUE, journalScanner); + journal.scanJournal(journalIds.get(0), Long.MAX_VALUE, journalScanner, false); fail("Should not have been able to scan the journal"); } catch (Exception e) { // Expected @@ -854,7 +900,74 @@ public class BookieJournalTest { b.shutdown(); } - private class DummyJournalScan implements Journal.JournalScanner { + /** + * Test for invalid record data during read of Journal. + */ + @Test + public void testJournalScanInvalidRecordWithSkipFlag() throws Exception { + File journalDir = createTempDir("bookie", "journal"); + BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(journalDir)); + + File ledgerDir = createTempDir("bookie", "ledger"); + BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(ledgerDir)); + + try { + writeV4JournalWithInvalidRecord(BookieImpl.getCurrentDirectory(journalDir), + 100, "testPasswd".getBytes()); + } catch (Exception e) { + fail(); + } + + + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + // Disabled skip broken journal files by default + conf.setJournalDirName(journalDir.getPath()) + .setLedgerDirNames(new String[] { ledgerDir.getPath() }) + .setMetadataServiceUri(null) + .setSkipReplayJournalInvalidRecord(true); + + Journal.JournalScanner journalScanner = new DummyJournalScan(); + + BookieImpl b = new TestBookieImpl(conf); + + for (Journal journal : b.journals) { + List<Long> journalIds = Journal.listJournalIds(journal.getJournalDirectory(), null); + assertEquals(journalIds.size(), 1); + try { + journal.scanJournal(journalIds.get(0), 0, journalScanner, conf.isSkipReplayJournalInvalidRecord()); + } catch (Exception e) { + fail("Should pass the journal scanning because we enabled skip flag by default."); + } + } + + b.shutdown(); + + // Disabled skip broken journal files by default + conf = TestBKConfiguration.newServerConfiguration(); + conf.setJournalDirName(journalDir.getPath()) + .setLedgerDirNames(new String[] { ledgerDir.getPath() }) + .setMetadataServiceUri(null); + + journalScanner = new DummyJournalScan(); + + b = new TestBookieImpl(conf); + + for (Journal journal : b.journals) { + List<Long> journalIds = Journal.listJournalIds(journal.getJournalDirectory(), null); + assertEquals(journalIds.size(), 1); + try { + journal.scanJournal(journalIds.get(0), 0, journalScanner, conf.isSkipReplayJournalInvalidRecord()); + fail("Should fail the journal scanning because of disabled skip flag"); + } catch (Exception e) { + // expected. + } + } + + b.shutdown(); + } + + + static class DummyJournalScan implements Journal.JournalScanner { @Override public void process(int journalVersion, long offset, ByteBuffer entry) throws IOException {
