This is an automated email from the ASF dual-hosted git repository. sjust pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new 41e4bcc ISSUE #1757: prevent race between flush and delete from recreating index 41e4bcc is described below commit 41e4bccb9694e1f373e919f6891b8e88b2232c5e Author: Samuel Just <sj...@salesforce.com> AuthorDate: Tue Oct 30 11:26:50 2018 -0700 ISSUE #1757: prevent race between flush and delete from recreating index IndexPersistenceManager.flushLedgerHandle can race with delete by obtaining a FileInfo just prior to delete and then proceeding to rewrite the file info, resurrecting it. FileInfo provides a convenient point of synchronization for avoiding this race. FileInfo.moveLedgerIndexFile, FileInfo.flushHeader, and FileInfo.delete() are synchronized already, so this patch simply adds a deleted flag to the FileInfo object to indicate that the FileInfo has become invalid. checkOpen is called in every method and will now throw FileInfoDeleted if delete has been called. IndexPersistenceManager can catch it and deal with it appropriately in flush (which generally means moving onto the next ledger). This patch also eliminates ledgersToFlush and ledgersFlushing. Their purpose appears to be to allow delete to avoid flushing a ledger which has been selected for flushing but not flushed yet, avoiding the above race. It's not sufficient, however, because IndexInMemPageMgr calls IndexPersistenceManager.flushLedgerHeader, which can obtain a FileInfo for the ledger prior to the deletion and then call relocateIndexFileAndFlushHeader afterwards. 
Also, if the purpose was to avoid concurrent calls into flushSpecificLedger on the same ledger, it's wrong because of the following sequence: t0: thread 0 calls flushOneOrMoreLedgers t1: thread 0 places ledger 10 into ledgersFlushing and completes flushSpecificLedger t2: thread 2 performs a write to ledger 10 t3: thread 1 calls flushOneOrMoreLedgers skipping ledger 10 t4: thread 0 releases ledger 10 from ledgersFlushing t5: thread 1 completes flushOneOrMoreLedgers Although thread 1 begins to flush after the write to ledger 10, it won't capture the write, rendering the flush incorrect. I don't think it's actually worth avoiding overlapping flushes here because both FileInfo and LedgerEntryPage track dirty state. As such, it seems simpler to just get rid of them. This patch also adds a more aggressive version of testFlushDeleteRace to test the new behavior. Testing with multiple flushers turned up a bug with LedgerEntryPage.getPageToWrite where it didn't return a buffer with independent read pointers, so this patch addresses that as well. 
(bug W-5549455) (rev cguttapalem) Signed-off-by: Samuel Just <sjustsalesforce.com> (cherry picked from commit 7b5ac3d5e76ac4df618764cafe80aef2994703ec) Author: Reviewers: Enrico Olivelli <eolive...@gmail.com>, Sijie Guo <si...@apache.org> This closes #1769 from athanatos/forupstream/wip-1757, closes #1757 --- .../org/apache/bookkeeper/bookie/FileInfo.java | 17 +++++ .../bookkeeper/bookie/FileInfoBackingCache.java | 2 + .../bookkeeper/bookie/IndexInMemPageMgr.java | 22 ++---- .../bookkeeper/bookie/IndexPersistenceMgr.java | 16 ++++- .../apache/bookkeeper/bookie/LedgerEntryPage.java | 3 +- .../apache/bookkeeper/bookie/LedgerCacheTest.java | 83 +++++++++++++++------- 6 files changed, 95 insertions(+), 48 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java index a5ddacf..656674f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java @@ -101,6 +101,8 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> { // this FileInfo Header Version int headerVersion; + private boolean deleted; + public FileInfo(File lf, byte[] masterKey, int fileInfoVersionToWrite) throws IOException { super(WATCHER_RECYCLER); @@ -108,6 +110,7 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> { this.masterKey = masterKey; mode = "rw"; this.headerVersion = fileInfoVersionToWrite; + this.deleted = false; } synchronized Long getLastAddConfirmed() { @@ -257,6 +260,16 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> { } } + public synchronized boolean isDeleted() { + return deleted; + } + + public static class FileInfoDeletedException extends IOException { + FileInfoDeletedException() { + super("FileInfo already deleted"); + } + } + @VisibleForTesting void checkOpen(boolean create) throws IOException { 
checkOpen(create, false); @@ -264,6 +277,9 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> { private synchronized void checkOpen(boolean create, boolean openBeforeClose) throws IOException { + if (deleted) { + throw new FileInfoDeletedException(); + } if (fc != null) { return; } @@ -540,6 +556,7 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> { } public synchronized boolean delete() { + deleted = true; return lf.delete(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java index 6beba6a..078292f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java @@ -49,6 +49,8 @@ class FileInfoBackingCache { boolean retained = fi.tryRetain(); if (!retained) { throw new IOException("FileInfo " + fi + " is already marked dead"); + } else if (fi.isDeleted()) { + throw new Bookie.NoLedgerException(fi.ledgerId); } return fi; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java index 66e97f7..0cf5cc9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java @@ -40,7 +40,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.conf.ServerConfiguration; @@ -345,12 +344,6 @@ class IndexInMemPageMgr { // flush and read pages private final 
IndexPersistenceMgr indexPersistenceManager; - /** - * the list of potentially dirty ledgers. - */ - private final ConcurrentLinkedQueue<Long> ledgersToFlush = new ConcurrentLinkedQueue<Long>(); - private final ConcurrentSkipListSet<Long> ledgersFlushing = new ConcurrentSkipListSet<Long>(); - // Stats private final Counter ledgerCacheHitCounter; private final Counter ledgerCacheMissCounter; @@ -504,7 +497,6 @@ class IndexInMemPageMgr { void removePagesForLedger(long ledgerId) { pageMapAndList.removeEntriesForALedger(ledgerId); - ledgersToFlush.remove(ledgerId); } long getLastEntryInMem(long ledgerId) { @@ -542,18 +534,12 @@ class IndexInMemPageMgr { } void flushOneOrMoreLedgers(boolean doAll) throws IOException { - if (ledgersToFlush.isEmpty()) { - ledgersToFlush.addAll(pageMapAndList.getActiveLedgers()); - } - Long potentiallyDirtyLedger; - while (null != (potentiallyDirtyLedger = ledgersToFlush.poll())) { - if (!ledgersFlushing.add(potentiallyDirtyLedger)) { - continue; - } + List<Long> ledgersToFlush = new ArrayList<>(pageMapAndList.getActiveLedgers()); + for (Long potentiallyDirtyLedger : ledgersToFlush) { try { flushSpecificLedger(potentiallyDirtyLedger); - } finally { - ledgersFlushing.remove(potentiallyDirtyLedger); + } catch (Bookie.NoLedgerException e) { + continue; } if (!doAll) { break; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java index 83cb88f..11292d1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java @@ -525,7 +525,12 @@ public class IndexPersistenceMgr { private void moveLedgerIndexFile(Long l, FileInfo fi) throws NoWritableLedgerDirException, IOException { File newLedgerIndexFile = getNewLedgerIndexFile(l, getLedgerDirForLedger(fi)); - 
fi.moveToNewLocation(newLedgerIndexFile, fi.getSizeSinceLastwrite()); + try { + fi.moveToNewLocation(newLedgerIndexFile, fi.getSizeSinceLastwrite()); + } catch (FileInfo.FileInfoDeletedException fileInfoDeleted) { + // File concurrently deleted + throw new Bookie.NoLedgerException(l); + } } void flushLedgerHeader(long ledger) throws IOException { @@ -599,7 +604,7 @@ public class IndexPersistenceMgr { private void writeBuffers(Long ledger, List<LedgerEntryPage> entries, FileInfo fi, - int start, int count) throws IOException { + int start, int count) throws IOException, Bookie.NoLedgerException { if (LOG.isTraceEnabled()) { LOG.trace("Writing {} buffers of {}", count, Long.toHexString(ledger)); } @@ -616,7 +621,12 @@ public class IndexPersistenceMgr { } long totalWritten = 0; while (buffs[buffs.length - 1].remaining() > 0) { - long rc = fi.write(buffs, entries.get(start + 0).getFirstEntryPosition()); + long rc = 0; + try { + rc = fi.write(buffs, entries.get(start + 0).getFirstEntryPosition()); + } catch (FileInfo.FileInfoDeletedException e) { + throw new Bookie.NoLedgerException(ledger); + } if (rc <= 0) { throw new IOException("Short write to ledger " + ledger + " rc = " + rc); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java index a9cef72..be87559 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java @@ -207,7 +207,8 @@ public class LedgerEntryPage { public ByteBuffer getPageToWrite() { checkPage(); page.clear(); - return page; + // Different callers to this method should be able to reasonably expect independent read pointers + return page.duplicate(); } long getLedger() { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java index 1a01299..2321735 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java @@ -379,24 +379,33 @@ public class LedgerCacheTest { } /** - * Race where a flush would fail because a garbage collection occurred at - * the wrong time. + * Test for race between delete and flush. * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-604} + * {@link https://github.com/apache/bookkeeper/issues/1757} */ @Test public void testFlushDeleteRace() throws Exception { newLedgerCache(); final AtomicInteger rc = new AtomicInteger(0); - final LinkedBlockingQueue<Long> ledgerQ = new LinkedBlockingQueue<Long>(1); + final LinkedBlockingQueue<Long> ledgerQ = new LinkedBlockingQueue<>(100); final byte[] masterKey = "masterKey".getBytes(); + final long numLedgers = 1000; + final int numFlushers = 10; + final int numDeleters = 10; + final AtomicBoolean running = new AtomicBoolean(true); Thread newLedgerThread = new Thread() { public void run() { try { - for (int i = 0; i < 1000 && rc.get() == 0; i++) { + for (long i = 0; i < numLedgers && rc.get() == 0; i++) { ledgerCache.setMasterKey(i, masterKey); - ledgerQ.put((long) i); + + ledgerCache.putEntryOffset(i, 1, 0); + ledgerQ.put(i); + } + for (int i = 0; i < numDeleters; ++i) { + ledgerQ.put(-1L); } - } catch (Exception e) { + } catch (Throwable e) { rc.set(-1); LOG.error("Exception in new ledger thread", e); } @@ -404,51 +413,73 @@ public class LedgerCacheTest { }; newLedgerThread.start(); - Thread flushThread = new Thread() { + Thread[] flushThreads = new Thread[numFlushers]; + for (int i = 0; i < numFlushers; ++i) { + Thread flushThread = new Thread() { public void run() { try { - while (true) { - Long id = ledgerQ.peek(); - if (id == null) { - continue; - } - LOG.info("Put entry for {}", id); - try { - 
ledgerCache.putEntryOffset((long) id, 1, 0); - } catch (Bookie.NoLedgerException nle) { - //ignore - } + while (running.get()) { ledgerCache.flushLedger(true); } - } catch (Exception e) { + } catch (Throwable e) { rc.set(-1); LOG.error("Exception in flush thread", e); } + LOG.error("Shutting down flush thread"); } }; - flushThread.start(); + flushThread.start(); + flushThreads[i] = flushThread; + } - Thread deleteThread = new Thread() { + Thread[] deleteThreads = new Thread[numDeleters]; + for (int i = 0; i < numDeleters; ++i) { + Thread deleteThread = new Thread() { public void run() { try { while (true) { long id = ledgerQ.take(); + if (id == -1L) { + break; + } LOG.info("Deleting {}", id); ledgerCache.deleteLedger(id); } - } catch (Exception e) { + } catch (Throwable e) { rc.set(-1); LOG.error("Exception in delete thread", e); } } }; - deleteThread.start(); + deleteThread.start(); + deleteThreads[i] = deleteThread; + } newLedgerThread.join(); - assertEquals("Should have been no errors", rc.get(), 0); - deleteThread.interrupt(); - flushThread.interrupt(); + for (Thread deleteThread : deleteThreads) { + deleteThread.join(); + } + + running.set(false); + for (Thread flushThread : flushThreads) { + flushThread.join(); + } + + assertEquals("Should have been no errors", rc.get(), 0); + for (long i = 0L; i < numLedgers; ++i) { + boolean gotError = false; + try { + LOG.error("Checking {}", i); + ledgerCache.getEntryOffset(i, 0); + } catch (NoLedgerException e) { + gotError = true; + } + if (!gotError) { + LOG.error("Ledger {} is still around", i); + fail("Found ledger " + i + ", which should have been removed"); + } + } } // Mock SortedLedgerStorage to simulate flush failure (Dependency Fault Injection)