This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.17 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit e7122c3d6e2cb7a9befc6fd72c3ea68afbeb0cf3 Author: fengyubiao <[email protected]> AuthorDate: Fri Sep 13 09:01:55 2024 +0800 [fix] Fix data lost after when writing ledger and deleting legder execute concurrency (#4462) ### Motivation | step | `BK client 1` | `BK client 2` | | --- | --- | --- | | 1 | create ledger `1` | | 2 | | open ledger `1` | | 3 | | delete ledger `1` | | 4 | write data to ledger `1` | At step `4`, the write should fail, but it succeeds. This leads users to assume the data has been written, even though it cannot be read. You can reproduce the issue with `testWriteAfterDeleted`. There is a scenario that can cause Pulsar to lose messages - `broker-2` created a ledger - `broker-2`'s ZK session expires, which causes the topic it owned to be assigned to other brokers - `broker-0` owned the topic again - it will delete the last empty ledger - consumers connected to `broker-0` - producers connected to `broker-2` - send messages to the topic - on `broker-2`, the ledger cannot be closed because the ledger metadata has been deleted ### Changes Once the ledger is fenced, it cannot be written to anymore. 
(cherry picked from commit 47ef48e074df42705bc0bf14bac5b3cc05c5f0c3) --- .../org/apache/bookkeeper/bookie/BookieImpl.java | 14 ++--- .../apache/bookkeeper/bookie/HandleFactory.java | 2 +- .../bookkeeper/bookie/HandleFactoryImpl.java | 31 ++++++++++- .../bookkeeper/bookie/BookieJournalTest.java | 4 +- .../org/apache/bookkeeper/client/TestFencing.java | 65 ++++++++++++++++++++++ 5 files changed, 105 insertions(+), 11 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 8ad6401f88..296a866092 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -543,7 +543,7 @@ public class BookieImpl implements Bookie { masterKeyCache.put(ledgerId, masterKey); // Force to re-insert the master key in ledger storage - handles.getHandle(ledgerId, masterKey); + handles.getHandle(ledgerId, masterKey, true); } else { throw new IOException("Invalid journal. Contains journalKey " + " but layout version (" + journalVersion @@ -555,7 +555,7 @@ public class BookieImpl implements Bookie { if (key == null) { key = ledgerStorage.readMasterKey(ledgerId); } - LedgerDescriptor handle = handles.getHandle(ledgerId, key); + LedgerDescriptor handle = handles.getHandle(ledgerId, key, true); handle.setFenced(); } else { throw new IOException("Invalid journal. Contains fenceKey " @@ -573,7 +573,7 @@ public class BookieImpl implements Bookie { if (key == null) { key = ledgerStorage.readMasterKey(ledgerId); } - LedgerDescriptor handle = handles.getHandle(ledgerId, key); + LedgerDescriptor handle = handles.getHandle(ledgerId, key, true); handle.setExplicitLac(explicitLacBuf); } else { throw new IOException("Invalid journal. 
Contains explicitLAC " + " but layout version (" @@ -596,7 +596,7 @@ public class BookieImpl implements Bookie { if (key == null) { key = ledgerStorage.readMasterKey(ledgerId); } - LedgerDescriptor handle = handles.getHandle(ledgerId, key); + LedgerDescriptor handle = handles.getHandle(ledgerId, key, true); recBuff.rewind(); handle.addEntry(Unpooled.wrappedBuffer(recBuff)); @@ -933,7 +933,7 @@ public class BookieImpl implements Bookie { throws IOException, BookieException { final long ledgerId = entry.getLong(entry.readerIndex()); - return handles.getHandle(ledgerId, masterKey); + return handles.getHandle(ledgerId, masterKey, false); } private Journal getJournal(long ledgerId) { @@ -1042,7 +1042,7 @@ public class BookieImpl implements Bookie { ByteBuf explicitLACEntry = null; try { long ledgerId = entry.getLong(entry.readerIndex()); - LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey); + LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey, false); synchronized (handle) { entry.markReaderIndex(); handle.setExplicitLac(entry); @@ -1131,7 +1131,7 @@ public class BookieImpl implements Bookie { */ public CompletableFuture<Boolean> fenceLedger(long ledgerId, byte[] masterKey) throws IOException, BookieException { - LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey); + LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey, false); return handle.fenceAndLogInJournal(getJournal(ledgerId)); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactory.java index 22500b74cb..c81294d4db 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactory.java @@ -24,7 +24,7 @@ package org.apache.bookkeeper.bookie; import java.io.IOException; interface HandleFactory { - LedgerDescriptor getHandle(long ledgerId, byte[] 
masterKey) + LedgerDescriptor getHandle(long ledgerId, byte[] masterKey, boolean journalReplay) throws IOException, BookieException; LedgerDescriptor getReadOnlyHandle(long ledgerId) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java index 3f643019c9..b331f12506 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java @@ -21,7 +21,10 @@ package org.apache.bookkeeper.bookie; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import java.io.IOException; +import java.time.Duration; import org.apache.bookkeeper.bookie.LedgerStorage.LedgerDeletionListener; import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap; @@ -29,6 +32,14 @@ class HandleFactoryImpl implements HandleFactory, LedgerDeletionListener { private final ConcurrentLongHashMap<LedgerDescriptor> ledgers; private final ConcurrentLongHashMap<LedgerDescriptor> readOnlyLedgers; + /** + * Once the ledger was marked "fenced" before, the ledger was accessed by multi clients. One client is calling + * "delete" now, and other clients may call "write" continuously later. We mark these ledgers can not be written + * anymore. And maintains the state for 7 days is safety. 
+ */ + private final Cache<Long, Boolean> recentlyFencedAndDeletedLedgers = CacheBuilder.newBuilder() + .expireAfterAccess(Duration.ofDays(7)).build(); + final LedgerStorage ledgerStorage; HandleFactoryImpl(LedgerStorage ledgerStorage) { @@ -40,10 +51,14 @@ class HandleFactoryImpl implements HandleFactory, LedgerDeletionListener { } @Override - public LedgerDescriptor getHandle(final long ledgerId, final byte[] masterKey) throws IOException, BookieException { + public LedgerDescriptor getHandle(final long ledgerId, final byte[] masterKey, boolean journalReplay) + throws IOException, BookieException { LedgerDescriptor handle = ledgers.get(ledgerId); if (handle == null) { + if (!journalReplay && recentlyFencedAndDeletedLedgers.getIfPresent(ledgerId) != null) { + throw BookieException.create(BookieException.Code.LedgerFencedException); + } handle = LedgerDescriptor.create(masterKey, ledgerId, ledgerStorage); ledgers.putIfAbsent(ledgerId, handle); } @@ -64,8 +79,22 @@ class HandleFactoryImpl implements HandleFactory, LedgerDeletionListener { return handle; } + private void markIfConflictWritingOccurs(long ledgerId) { + LedgerDescriptor ledgerDescriptor = ledgers.get(ledgerId); + try { + if (ledgerDescriptor != null && ledgerDescriptor.isFenced()) { + recentlyFencedAndDeletedLedgers.put(ledgerId, true); + } + } catch (IOException | BookieException ex) { + // The ledger is in limbo state. + recentlyFencedAndDeletedLedgers.put(ledgerId, true); + } + } + @Override public void ledgerDeleted(long ledgerId) { + markIfConflictWritingOccurs(ledgerId); + // Do delete. 
ledgers.remove(ledgerId); readOnlyLedgers.remove(ledgerId); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java index 21608a19b2..29632dfb17 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java @@ -452,7 +452,7 @@ public class BookieJournalTest { } catch (Bookie.NoEntryException e) { // correct behaviour } - assertTrue(b.handles.getHandle(1, "testPasswd".getBytes()).isFenced()); + assertTrue(b.handles.getHandle(1, "testPasswd".getBytes(), false).isFenced()); b.shutdown(); } @@ -484,7 +484,7 @@ public class BookieJournalTest { } catch (Bookie.NoEntryException e) { // correct behavior } - assertTrue(b.handles.getHandle(1, "testV5Journal".getBytes()).isFenced()); + assertTrue(b.handles.getHandle(1, "testV5Journal".getBytes(), false).isFenced()); b.shutdown(); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java index 34751525aa..77382b4ebd 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java @@ -26,10 +26,18 @@ import static org.junit.Assert.fail; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.InterleavedLedgerStorage; +import org.apache.bookkeeper.bookie.LedgerStorage; +import org.apache.bookkeeper.bookie.SortedLedgerStorage; +import org.apache.bookkeeper.bookie.storage.ldb.SingleDirectoryDbLedgerStorage; import org.apache.bookkeeper.client.BookKeeper.DigestType; import 
org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.apache.bookkeeper.test.TestStatsProvider; +import org.awaitility.reflect.WhiteboxImpl; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +46,7 @@ import org.slf4j.LoggerFactory; * This unit test tests ledger fencing. * */ +@Slf4j public class TestFencing extends BookKeeperClusterTestCase { private static final Logger LOG = LoggerFactory.getLogger(TestFencing.class); @@ -77,6 +86,7 @@ public class TestFencing extends BookKeeperClusterTestCase { fail("Should have thrown an exception when trying to write"); } catch (BKException.BKLedgerFencedException e) { // correct behaviour + log.info("expected a fenced error", e); } /* @@ -87,6 +97,61 @@ public class TestFencing extends BookKeeperClusterTestCase { readlh.getLastAddConfirmed() == writelh.getLastAddConfirmed()); } + @Test + public void testWriteAfterDeleted() throws Exception { + LedgerHandle writeLedger; + writeLedger = bkc.createLedger(digestType, "password".getBytes()); + + String tmp = "BookKeeper is cool!"; + for (int i = 0; i < 10; i++) { + long entryId = writeLedger.addEntry(tmp.getBytes()); + LOG.info("entryId: {}", entryId); + } + + // Fence and delete. + BookKeeperTestClient bkc2 = new BookKeeperTestClient(baseClientConf, new TestStatsProvider()); + LedgerHandle readLedger = bkc2.openLedger(writeLedger.getId(), digestType, "password".getBytes()); + bkc2.deleteLedger(readLedger.ledgerId); + + // Waiting for GC. 
+ for (ServerTester server : servers) { + triggerGC(server.getServer().getBookie()); + } + + try { + long entryId = writeLedger.addEntry(tmp.getBytes()); + LOG.info("Not expected: entryId: {}", entryId); + LOG.error("Should have thrown an exception"); + fail("Should have thrown an exception when trying to write"); + } catch (BKException.BKLedgerFencedException e) { + log.info("expected a fenced error", e); + // correct behaviour + } + + /* + * Check it has been recovered properly. + */ + assertTrue("Has not recovered correctly: " + readLedger.getLastAddConfirmed() + + " original " + writeLedger.getLastAddConfirmed(), + readLedger.getLastAddConfirmed() == writeLedger.getLastAddConfirmed()); + + // cleanup. + bkc2.close(); + } + + private void triggerGC(Bookie bookie) { + LedgerStorage ledgerStorage = bookie.getLedgerStorage(); + if (ledgerStorage instanceof InterleavedLedgerStorage + || ledgerStorage instanceof SingleDirectoryDbLedgerStorage) { + Runnable gcThread = WhiteboxImpl.getInternalState(ledgerStorage, "gcThread"); + gcThread.run(); + } else if (ledgerStorage instanceof SortedLedgerStorage) { + Object actLedgerStorage = WhiteboxImpl.getInternalState(ledgerStorage, "interleavedLedgerStorage"); + Runnable gcThread = WhiteboxImpl.getInternalState(actLedgerStorage, "gcThread"); + gcThread.run(); + } + } + private static int threadCount = 0; class LedgerOpenThread extends Thread {
