This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-4.14 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 51adea54526e870b3967c8e49d78aa294b280eed Author: Hang Chen <[email protected]> AuthorDate: Mon Jun 19 19:40:50 2023 +0800 Support skip invalid journal record in replying journal stage (#3956) Co-authored-by: zhiyuanlei <[email protected]> (cherry picked from commit 5e9fdc2f81a7add645bba43f90ef630dffc1993f) --- .../java/org/apache/bookkeeper/bookie/Bookie.java | 2 +- .../java/org/apache/bookkeeper/bookie/Journal.java | 16 ++- .../bookkeeper/conf/ServerConfiguration.java | 21 +++- .../cli/commands/bookie/ReadJournalCommand.java | 2 +- .../bookkeeper/bookie/BookieJournalTest.java | 117 ++++++++++++++++++++- 5 files changed, 149 insertions(+), 9 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index 1a29ff1811..0e7e62d04d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -994,7 +994,7 @@ public class Bookie extends BookieCriticalThread { logPosition = markedLog.getLogFileOffset(); } LOG.info("Replaying journal {} from position {}", id, logPosition); - long scanOffset = journal.scanJournal(id, logPosition, scanner); + long scanOffset = journal.scanJournal(id, logPosition, scanner, conf.isSkipReplayJournalInvalidRecord()); // Update LastLogMark after completely replaying journal // scanOffset will point to EOF position // After LedgerStorage flush, SyncThread should persist this to disk diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index b01cc3115e..ba09dc1b2e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -777,13 +777,14 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { /** * Scan the journal. * - * @param journalId Journal Log Id - * @param journalPos Offset to start scanning - * @param scanner Scanner to handle entries + * @param journalId Journal Log Id + * @param journalPos Offset to start scanning + * @param scanner Scanner to handle entries + * @param skipInvalidRecord when invalid record,should we skip it or not * @return scanOffset - represents the byte till which journal was read * @throws IOException */ - public long scanJournal(long journalId, long journalPos, JournalScanner scanner) + public long scanJournal(long journalId, long journalPos, JournalScanner scanner, boolean skipInvalidRecord) throws IOException { JournalChannel recLog; if (journalPos <= 0) { @@ -846,6 +847,13 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { } } return recLog.fc.position(); + } catch (IOException e) { + if (skipInvalidRecord) { + LOG.warn("Failed to parse journal file, and skipInvalidRecord is true, skip this journal file reply"); + } else { + throw e; + } + return recLog.fc.position(); } finally { recLog.close(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index 367132bc47..18c7e32112 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -309,6 +309,8 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati protected static final String AUTHORIZED_ROLES = "authorizedRoles"; protected static final String ROCKSDB_DELETE_ENTRIES_BATCH_SIZE = "rocksDBDeleteEntriesBatchSize"; + protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD = "skipReplayJournalInvalidRecord"; + /** * Construct a default configuration object. */ @@ -3617,7 +3619,24 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati } /** - * Get entry log location index delete entries batch size from RocksDB. + * When this config is set to true,if we replay journal failed, we will skip. + * @param skipReplayJournalInvalidRecord + * @return + */ + public ServerConfiguration setSkipReplayJournalInvalidRecord(boolean skipReplayJournalInvalidRecord) { + this.setProperty(SKIP_REPLAY_JOURNAL_INVALID_RECORD, + Boolean.toString(skipReplayJournalInvalidRecord)); + return this; + } + + /** + * @see #isSkipReplayJournalInvalidRecord . + */ + public boolean isSkipReplayJournalInvalidRecord() { + return this.getBoolean(SKIP_REPLAY_JOURNAL_INVALID_RECORD, false); + } + + /** * * @return Int rocksDB delete entries batch size configured in Service configuration. */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java index b2c63f4991..d38f236c55 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java @@ -211,6 +211,6 @@ public class ReadJournalCommand extends BookieCommand<ReadJournalCommand.ReadJou } private void scanJournal(Journal journal, long journalId, Journal.JournalScanner scanner) throws IOException { - journal.scanJournal(journalId, 0L, scanner); + journal.scanJournal(journalId, 0L, scanner, false); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java index 04b2138031..4043cb5b62 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java @@ -297,6 +297,52 @@ public class BookieJournalTest { return jc; } + private JournalChannel writeV4JournalWithInvalidRecord(File journalDir, + int numEntries, byte[] masterKey) throws Exception { + long logId = System.currentTimeMillis(); + JournalChannel jc = new JournalChannel(journalDir, logId); + + moveToPosition(jc, JournalChannel.VERSION_HEADER_SIZE); + + BufferedChannel bc = jc.getBufferedChannel(); + + byte[] data = new byte[1024]; + Arrays.fill(data, (byte) 'X'); + long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID; + for (int i = 0; i <= numEntries; i++) { + ByteBuf packet; + if (i == 0) { + packet = generateMetaEntry(1, masterKey); + } else { + packet = ClientUtil.generatePacket(1, i, lastConfirmed, i * data.length, data); + } + lastConfirmed = i; + ByteBuffer lenBuff = ByteBuffer.allocate(4); + if (i == numEntries - 1) { + //mock when flush data to file ,it writes an invalid entry to journal + lenBuff.putInt(-1); + } else { + lenBuff.putInt(packet.readableBytes()); + } + lenBuff.flip(); + bc.write(Unpooled.wrappedBuffer(lenBuff)); + bc.write(packet); + packet.release(); + } + + // write fence key + ByteBuf packet = generateFenceEntry(1); + ByteBuf lenBuf = Unpooled.buffer(); + lenBuf.writeInt(packet.readableBytes()); + //mock + bc.write(lenBuf); + bc.write(packet); + bc.flushAndForceWrite(false); + updateJournalVersion(jc, JournalChannel.V4); + + return jc; + } + static JournalChannel writeV5Journal(File journalDir, int numEntries, byte[] masterKey) throws Exception { return writeV5Journal(journalDir, numEntries, masterKey, false); @@ -838,7 +884,7 @@ public class BookieJournalTest { assertEquals(journalIds.size(), 1); try { - journal.scanJournal(journalIds.get(0), Long.MAX_VALUE, journalScanner); + journal.scanJournal(journalIds.get(0), Long.MAX_VALUE, journalScanner, false); fail("Should not have been able to scan the journal"); } catch (Exception e) { // Expected @@ -848,7 +894,74 @@ public class BookieJournalTest { b.shutdown(); } - private class DummyJournalScan implements Journal.JournalScanner { + /** + * Test for invalid record data during read of Journal. + */ + @Test + public void testJournalScanInvalidRecordWithSkipFlag() throws Exception { + File journalDir = createTempDir("bookie", "journal"); + Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir)); + + File ledgerDir = createTempDir("bookie", "ledger"); + Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir)); + + try { + writeV4JournalWithInvalidRecord(Bookie.getCurrentDirectory(journalDir), + 100, "testPasswd".getBytes()); + } catch (Exception e) { + fail(); + } + + + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + // Disabled skip broken journal files by default + conf.setJournalDirName(journalDir.getPath()) + .setLedgerDirNames(new String[] { ledgerDir.getPath() }) + .setMetadataServiceUri(null) + .setSkipReplayJournalInvalidRecord(true); + + Journal.JournalScanner journalScanner = new DummyJournalScan(); + + Bookie b = new Bookie(conf); + + for (Journal journal : b.journals) { + List<Long> journalIds = Journal.listJournalIds(journal.getJournalDirectory(), null); + assertEquals(journalIds.size(), 1); + try { + journal.scanJournal(journalIds.get(0), 0, journalScanner, conf.isSkipReplayJournalInvalidRecord()); + } catch (Exception e) { + fail("Should pass the journal scanning because we enabled skip flag by default."); + } + } + + b.shutdown(); + + // Disabled skip broken journal files by default + conf = TestBKConfiguration.newServerConfiguration(); + conf.setJournalDirName(journalDir.getPath()) + .setLedgerDirNames(new String[] { ledgerDir.getPath() }) + .setMetadataServiceUri(null); + + journalScanner = new DummyJournalScan(); + + b = new Bookie(conf); + + for (Journal journal : b.journals) { + List<Long> journalIds = Journal.listJournalIds(journal.getJournalDirectory(), null); + assertEquals(journalIds.size(), 1); + try { + journal.scanJournal(journalIds.get(0), 0, journalScanner, conf.isSkipReplayJournalInvalidRecord()); + fail("Should fail the journal scanning because of disabled skip flag"); + } catch (Exception e) { + // expected. + } + } + + b.shutdown(); + } + + + static class DummyJournalScan implements Journal.JournalScanner { @Override public void process(int journalVersion, long offset, ByteBuffer entry) throws IOException {
