This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 5e9fdc2f81 Support skip invalid journal record in replying journal
stage (#3956)
5e9fdc2f81 is described below
commit 5e9fdc2f81a7add645bba43f90ef630dffc1993f
Author: Hang Chen <[email protected]>
AuthorDate: Mon Jun 19 19:40:50 2023 +0800
Support skip invalid journal record in replying journal stage (#3956)
Co-authored-by: zhiyuanlei <[email protected]>
---
.../org/apache/bookkeeper/bookie/BookieImpl.java | 2 +-
.../java/org/apache/bookkeeper/bookie/Journal.java | 16 ++-
.../bookie/storage/ldb/LedgersIndexRebuildOp.java | 2 +-
.../bookkeeper/conf/ServerConfiguration.java | 21 ++++
.../cli/commands/bookie/ReadJournalCommand.java | 2 +-
.../bookkeeper/bookie/BookieJournalTest.java | 117 ++++++++++++++++++++-
6 files changed, 151 insertions(+), 9 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index 0628ec28af..d0c8c567fd 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -631,7 +631,7 @@ public class BookieImpl extends BookieCriticalThread
implements Bookie {
logPosition = markedLog.getLogFileOffset();
}
LOG.info("Replaying journal {} from position {}", id, logPosition);
- long scanOffset = journal.scanJournal(id, logPosition, scanner);
+ long scanOffset = journal.scanJournal(id, logPosition, scanner,
conf.isSkipReplayJournalInvalidRecord());
// Update LastLogMark after completely replaying journal
// scanOffset will point to EOF position
// After LedgerStorage flush, SyncThread should persist this to
disk
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index d8df972878..c362e2d626 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -792,13 +792,14 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
/**
* Scan the journal.
*
- * @param journalId Journal Log Id
- * @param journalPos Offset to start scanning
- * @param scanner Scanner to handle entries
+ * @param journalId Journal Log Id
+ * @param journalPos Offset to start scanning
+ * @param scanner Scanner to handle entries
+ * @param skipInvalidRecord if true, log an IOException hit during the scan and return the current position instead of rethrowing it
* @return scanOffset - represents the byte till which journal was read
* @throws IOException
*/
- public long scanJournal(long journalId, long journalPos, JournalScanner
scanner)
+ public long scanJournal(long journalId, long journalPos, JournalScanner
scanner, boolean skipInvalidRecord)
throws IOException {
JournalChannel recLog;
if (journalPos <= 0) {
@@ -862,6 +863,13 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
}
}
return recLog.fc.position();
+ } catch (IOException e) {
+ if (skipInvalidRecord) {
+ LOG.warn("Failed to parse journal file, and skipInvalidRecord
is true, skip this journal file reply");
+ } else {
+ throw e;
+ }
+ return recLog.fc.position();
} finally {
recLog.close();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
index f6969237b8..2725897e80 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
@@ -222,7 +222,7 @@ public class LedgersIndexRebuildOp {
LOG.info("Found ledger {} in journal", ledgerId);
}
}
- });
+ }, false);
}
private void delete(Path path) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 3b58610848..14d98ce43c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -339,6 +339,8 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
// Used for location index, lots of writes and much bigger dataset
protected static final String LEDGER_METADATA_ROCKSDB_CONF =
"ledgerMetadataRocksdbConf";
+ protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD =
"skipReplayJournalInvalidRecord";
+
/**
* Construct a default configuration object.
*/
@@ -4010,6 +4012,25 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
return this.getBoolean(DATA_INTEGRITY_COOKIE_STAMPING_ENABLED, false);
}
+
+ /**
+ * When set to true, an I/O failure while replaying a journal is logged and the rest of that journal is skipped instead of failing the replay.
+ * @param skipReplayJournalInvalidRecord
+ * @return
+ */
+ public ServerConfiguration setSkipReplayJournalInvalidRecord(boolean
skipReplayJournalInvalidRecord) {
+ this.setProperty(SKIP_REPLAY_JOURNAL_INVALID_RECORD,
+ Boolean.toString(skipReplayJournalInvalidRecord));
+ return this;
+ }
+
+ /**
+ * @see #setSkipReplayJournalInvalidRecord(boolean)
+ */
+ public boolean isSkipReplayJournalInvalidRecord() {
+ return this.getBoolean(SKIP_REPLAY_JOURNAL_INVALID_RECORD, false);
+ }
+
/**
* Get default rocksdb conf.
*
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java
index 66d31068b7..ab985070ff 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java
@@ -211,6 +211,6 @@ public class ReadJournalCommand extends
BookieCommand<ReadJournalCommand.ReadJou
}
private void scanJournal(Journal journal, long journalId,
Journal.JournalScanner scanner) throws IOException {
- journal.scanJournal(journalId, 0L, scanner);
+ journal.scanJournal(journalId, 0L, scanner, false);
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index eeee8ca050..a048bffc8a 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -299,6 +299,52 @@ public class BookieJournalTest {
return jc;
}
+ private JournalChannel writeV4JournalWithInvalidRecord(File journalDir,
+ int numEntries,
byte[] masterKey) throws Exception {
+ long logId = System.currentTimeMillis();
+ JournalChannel jc = new JournalChannel(journalDir, logId);
+
+ moveToPosition(jc, JournalChannel.VERSION_HEADER_SIZE);
+
+ BufferedChannel bc = jc.getBufferedChannel();
+
+ byte[] data = new byte[1024];
+ Arrays.fill(data, (byte) 'X');
+ long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
+ for (int i = 0; i <= numEntries; i++) {
+ ByteBuf packet;
+ if (i == 0) {
+ packet = generateMetaEntry(1, masterKey);
+ } else {
+ packet = ClientUtil.generatePacket(1, i, lastConfirmed, i *
data.length, data);
+ }
+ lastConfirmed = i;
+ ByteBuffer lenBuff = ByteBuffer.allocate(4);
+ if (i == numEntries - 1) {
+ // simulate a flush that wrote an invalid (negative) entry length to the journal
+ lenBuff.putInt(-1);
+ } else {
+ lenBuff.putInt(packet.readableBytes());
+ }
+ lenBuff.flip();
+ bc.write(Unpooled.wrappedBuffer(lenBuff));
+ bc.write(packet);
+ packet.release();
+ }
+
+ // write fence key
+ ByteBuf packet = generateFenceEntry(1);
+ ByteBuf lenBuf = Unpooled.buffer();
+ lenBuf.writeInt(packet.readableBytes());
+ //mock
+ bc.write(lenBuf);
+ bc.write(packet);
+ bc.flushAndForceWrite(false);
+ updateJournalVersion(jc, JournalChannel.V4);
+
+ return jc;
+ }
+
static JournalChannel writeV5Journal(File journalDir, int numEntries,
byte[] masterKey) throws Exception {
return writeV5Journal(journalDir, numEntries, masterKey, false);
@@ -844,7 +890,7 @@ public class BookieJournalTest {
assertEquals(journalIds.size(), 1);
try {
- journal.scanJournal(journalIds.get(0), Long.MAX_VALUE,
journalScanner);
+ journal.scanJournal(journalIds.get(0), Long.MAX_VALUE,
journalScanner, false);
fail("Should not have been able to scan the journal");
} catch (Exception e) {
// Expected
@@ -854,7 +900,74 @@ public class BookieJournalTest {
b.shutdown();
}
- private class DummyJournalScan implements Journal.JournalScanner {
+ /**
+ * Test for invalid record data during read of Journal.
+ */
+ @Test
+ public void testJournalScanInvalidRecordWithSkipFlag() throws Exception {
+ File journalDir = createTempDir("bookie", "journal");
+
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(journalDir));
+
+ File ledgerDir = createTempDir("bookie", "ledger");
+
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(ledgerDir));
+
+ try {
+
writeV4JournalWithInvalidRecord(BookieImpl.getCurrentDirectory(journalDir),
+ 100, "testPasswd".getBytes());
+ } catch (Exception e) {
+ fail();
+ }
+
+
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ // Enable skipping of invalid journal records for this first scan
+ conf.setJournalDirName(journalDir.getPath())
+ .setLedgerDirNames(new String[] { ledgerDir.getPath() })
+ .setMetadataServiceUri(null)
+ .setSkipReplayJournalInvalidRecord(true);
+
+ Journal.JournalScanner journalScanner = new DummyJournalScan();
+
+ BookieImpl b = new TestBookieImpl(conf);
+
+ for (Journal journal : b.journals) {
+ List<Long> journalIds =
Journal.listJournalIds(journal.getJournalDirectory(), null);
+ assertEquals(journalIds.size(), 1);
+ try {
+ journal.scanJournal(journalIds.get(0), 0, journalScanner,
conf.isSkipReplayJournalInvalidRecord());
+ } catch (Exception e) {
+ fail("Should pass the journal scanning because we enabled skip
flag by default.");
+ }
+ }
+
+ b.shutdown();
+
+ // Disabled skip broken journal files by default
+ conf = TestBKConfiguration.newServerConfiguration();
+ conf.setJournalDirName(journalDir.getPath())
+ .setLedgerDirNames(new String[] { ledgerDir.getPath() })
+ .setMetadataServiceUri(null);
+
+ journalScanner = new DummyJournalScan();
+
+ b = new TestBookieImpl(conf);
+
+ for (Journal journal : b.journals) {
+ List<Long> journalIds =
Journal.listJournalIds(journal.getJournalDirectory(), null);
+ assertEquals(journalIds.size(), 1);
+ try {
+ journal.scanJournal(journalIds.get(0), 0, journalScanner,
conf.isSkipReplayJournalInvalidRecord());
+ fail("Should fail the journal scanning because of disabled
skip flag");
+ } catch (Exception e) {
+ // expected.
+ }
+ }
+
+ b.shutdown();
+ }
+
+
+ static class DummyJournalScan implements Journal.JournalScanner {
@Override
public void process(int journalVersion, long offset, ByteBuffer entry)
throws IOException {