This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 7240bde1877e9680b4ed26e94e7372baeecbd87e
Author: Hang Chen <[email protected]>
AuthorDate: Mon Jun 19 19:40:50 2023 +0800

    Support skip invalid journal record in replaying journal stage (#3956)
    
    Co-authored-by: zhiyuanlei <[email protected]>
    (cherry picked from commit 5e9fdc2f81a7add645bba43f90ef630dffc1993f)
---
 .../org/apache/bookkeeper/bookie/BookieImpl.java   |   2 +-
 .../java/org/apache/bookkeeper/bookie/Journal.java |  16 ++-
 .../bookie/storage/ldb/LedgersIndexRebuildOp.java  |   2 +-
 .../bookkeeper/conf/ServerConfiguration.java       |  21 ++++
 .../cli/commands/bookie/ReadJournalCommand.java    |   2 +-
 .../bookkeeper/bookie/BookieJournalTest.java       | 117 ++++++++++++++++++++-
 6 files changed, 151 insertions(+), 9 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index e5520185f3..6aac4ecf68 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -631,7 +631,7 @@ public class BookieImpl extends BookieCriticalThread 
implements Bookie {
                 logPosition = markedLog.getLogFileOffset();
             }
             LOG.info("Replaying journal {} from position {}", id, logPosition);
-            long scanOffset = journal.scanJournal(id, logPosition, scanner);
+            long scanOffset = journal.scanJournal(id, logPosition, scanner, 
conf.isSkipReplayJournalInvalidRecord());
             // Update LastLogMark after completely replaying journal
             // scanOffset will point to EOF position
             // After LedgerStorage flush, SyncThread should persist this to 
disk
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 9a056ca67b..9735e13717 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -792,13 +792,14 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
     /**
      * Scan the journal.
      *
-     * @param journalId Journal Log Id
-     * @param journalPos Offset to start scanning
-     * @param scanner Scanner to handle entries
+     * @param journalId         Journal Log Id
+     * @param journalPos        Offset to start scanning
+     * @param scanner           Scanner to handle entries
+     * @param skipInvalidRecord if true, log and stop scanning on an invalid record instead of throwing
      * @return scanOffset - represents the byte till which journal was read
      * @throws IOException
      */
-    public long scanJournal(long journalId, long journalPos, JournalScanner 
scanner)
+    public long scanJournal(long journalId, long journalPos, JournalScanner 
scanner, boolean skipInvalidRecord)
         throws IOException {
         JournalChannel recLog;
         if (journalPos <= 0) {
@@ -862,6 +863,13 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                 }
             }
             return recLog.fc.position();
+        } catch (IOException e) {
+            if (skipInvalidRecord) {
+                LOG.warn("Failed to parse journal file, and skipInvalidRecord 
is true, skip this journal file reply");
+            } else {
+                throw e;
+            }
+            return recLog.fc.position();
         } finally {
             recLog.close();
         }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
index f6969237b8..2725897e80 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
@@ -222,7 +222,7 @@ public class LedgersIndexRebuildOp {
                     LOG.info("Found ledger {} in journal", ledgerId);
                 }
             }
-        });
+        }, false);
     }
 
     private void delete(Path path) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index cf74f6af01..2849d3fd66 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -338,6 +338,8 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
     // Used for location index, lots of writes and much bigger dataset
     protected static final String LEDGER_METADATA_ROCKSDB_CONF = 
"ledgerMetadataRocksdbConf";
 
+    protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD = 
"skipReplayJournalInvalidRecord";
+
     /**
      * Construct a default configuration object.
      */
@@ -3987,6 +3989,25 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
         return this.getBoolean(DATA_INTEGRITY_COOKIE_STAMPING_ENABLED, false);
     }
 
+
+    /**
+     * When true, journal replay stops at an invalid record and skips the rest of that journal instead of failing.
+     * @param skipReplayJournalInvalidRecord
+     * @return
+     */
+    public ServerConfiguration setSkipReplayJournalInvalidRecord(boolean 
skipReplayJournalInvalidRecord) {
+        this.setProperty(SKIP_REPLAY_JOURNAL_INVALID_RECORD,
+                Boolean.toString(skipReplayJournalInvalidRecord));
+        return this;
+    }
+
+    /**
+     * @see #setSkipReplayJournalInvalidRecord .
+     */
+    public boolean isSkipReplayJournalInvalidRecord() {
+        return this.getBoolean(SKIP_REPLAY_JOURNAL_INVALID_RECORD, false);
+    }
+
     /**
      * Get default rocksdb conf.
      *
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java
index 66d31068b7..ab985070ff 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java
@@ -211,6 +211,6 @@ public class ReadJournalCommand extends 
BookieCommand<ReadJournalCommand.ReadJou
     }
 
     private void scanJournal(Journal journal, long journalId, 
Journal.JournalScanner scanner) throws IOException {
-        journal.scanJournal(journalId, 0L, scanner);
+        journal.scanJournal(journalId, 0L, scanner, false);
     }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index eeee8ca050..a048bffc8a 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -299,6 +299,52 @@ public class BookieJournalTest {
         return jc;
     }
 
+    private JournalChannel writeV4JournalWithInvalidRecord(File journalDir,
+                                                           int numEntries, 
byte[] masterKey) throws Exception {
+        long logId = System.currentTimeMillis();
+        JournalChannel jc = new JournalChannel(journalDir, logId);
+
+        moveToPosition(jc, JournalChannel.VERSION_HEADER_SIZE);
+
+        BufferedChannel bc = jc.getBufferedChannel();
+
+        byte[] data = new byte[1024];
+        Arrays.fill(data, (byte) 'X');
+        long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
+        for (int i = 0; i <= numEntries; i++) {
+            ByteBuf packet;
+            if (i == 0) {
+                packet = generateMetaEntry(1, masterKey);
+            } else {
+                packet = ClientUtil.generatePacket(1, i, lastConfirmed, i * 
data.length, data);
+            }
+            lastConfirmed = i;
+            ByteBuffer lenBuff = ByteBuffer.allocate(4);
+            if (i == numEntries - 1) {
+                // simulate a flush that wrote an invalid (negative-length) entry to the journal
+                lenBuff.putInt(-1);
+            } else {
+                lenBuff.putInt(packet.readableBytes());
+            }
+            lenBuff.flip();
+            bc.write(Unpooled.wrappedBuffer(lenBuff));
+            bc.write(packet);
+            packet.release();
+        }
+
+        // write fence key
+        ByteBuf packet = generateFenceEntry(1);
+        ByteBuf lenBuf = Unpooled.buffer();
+        lenBuf.writeInt(packet.readableBytes());
+        //mock
+        bc.write(lenBuf);
+        bc.write(packet);
+        bc.flushAndForceWrite(false);
+        updateJournalVersion(jc, JournalChannel.V4);
+
+        return jc;
+    }
+
     static JournalChannel writeV5Journal(File journalDir, int numEntries,
                                          byte[] masterKey) throws Exception {
         return writeV5Journal(journalDir, numEntries, masterKey, false);
@@ -844,7 +890,7 @@ public class BookieJournalTest {
             assertEquals(journalIds.size(), 1);
 
             try {
-                journal.scanJournal(journalIds.get(0), Long.MAX_VALUE, 
journalScanner);
+                journal.scanJournal(journalIds.get(0), Long.MAX_VALUE, 
journalScanner, false);
                 fail("Should not have been able to scan the journal");
             } catch (Exception e) {
                 // Expected
@@ -854,7 +900,74 @@ public class BookieJournalTest {
         b.shutdown();
     }
 
-    private class DummyJournalScan implements Journal.JournalScanner {
+    /**
+     * Test for invalid record data during read of Journal.
+     */
+    @Test
+    public void testJournalScanInvalidRecordWithSkipFlag() throws Exception {
+        File journalDir = createTempDir("bookie", "journal");
+        
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(journalDir));
+
+        File ledgerDir = createTempDir("bookie", "ledger");
+        
BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(ledgerDir));
+
+        try {
+            
writeV4JournalWithInvalidRecord(BookieImpl.getCurrentDirectory(journalDir),
+                100, "testPasswd".getBytes());
+        } catch (Exception e) {
+            fail();
+        }
+
+
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        // Explicitly enable skipping of invalid journal records (disabled by default)
+        conf.setJournalDirName(journalDir.getPath())
+                .setLedgerDirNames(new String[] { ledgerDir.getPath() })
+                .setMetadataServiceUri(null)
+                .setSkipReplayJournalInvalidRecord(true);
+
+        Journal.JournalScanner journalScanner = new DummyJournalScan();
+
+        BookieImpl b = new TestBookieImpl(conf);
+
+        for (Journal journal : b.journals) {
+            List<Long> journalIds = 
Journal.listJournalIds(journal.getJournalDirectory(), null);
+            assertEquals(journalIds.size(), 1);
+            try {
+                journal.scanJournal(journalIds.get(0), 0, journalScanner, 
conf.isSkipReplayJournalInvalidRecord());
+            } catch (Exception e) {
+                fail("Should pass the journal scanning because we enabled skip 
flag by default.");
+            }
+        }
+
+        b.shutdown();
+
+        // Disabled skip broken journal files by default
+        conf = TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+                .setLedgerDirNames(new String[] { ledgerDir.getPath() })
+                .setMetadataServiceUri(null);
+
+        journalScanner = new DummyJournalScan();
+
+        b = new TestBookieImpl(conf);
+
+        for (Journal journal : b.journals) {
+            List<Long> journalIds = 
Journal.listJournalIds(journal.getJournalDirectory(), null);
+            assertEquals(journalIds.size(), 1);
+            try {
+                journal.scanJournal(journalIds.get(0), 0, journalScanner, 
conf.isSkipReplayJournalInvalidRecord());
+                fail("Should fail the journal scanning because of disabled 
skip flag");
+            } catch (Exception e) {
+                // expected.
+            }
+        }
+
+        b.shutdown();
+    }
+
+
+    static class DummyJournalScan implements Journal.JournalScanner {
 
         @Override
         public void process(int journalVersion, long offset, ByteBuffer entry) 
throws IOException {

Reply via email to