athanatos closed pull request #1773: "ISSUE #1757: prevent race between flush and delete from recreating index". URL: https://github.com/apache/bookkeeper/pull/1773
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java index a5ddacf0ff..656674f5e0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java @@ -101,6 +101,8 @@ // this FileInfo Header Version int headerVersion; + private boolean deleted; + public FileInfo(File lf, byte[] masterKey, int fileInfoVersionToWrite) throws IOException { super(WATCHER_RECYCLER); @@ -108,6 +110,7 @@ public FileInfo(File lf, byte[] masterKey, int fileInfoVersionToWrite) throws IO this.masterKey = masterKey; mode = "rw"; this.headerVersion = fileInfoVersionToWrite; + this.deleted = false; } synchronized Long getLastAddConfirmed() { @@ -257,6 +260,16 @@ public synchronized void readHeader() throws IOException { } } + public synchronized boolean isDeleted() { + return deleted; + } + + public static class FileInfoDeletedException extends IOException { + FileInfoDeletedException() { + super("FileInfo already deleted"); + } + } + @VisibleForTesting void checkOpen(boolean create) throws IOException { checkOpen(create, false); @@ -264,6 +277,9 @@ void checkOpen(boolean create) throws IOException { private synchronized void checkOpen(boolean create, boolean openBeforeClose) throws IOException { + if (deleted) { + throw new FileInfoDeletedException(); + } if (fc != null) { return; } @@ -540,6 +556,7 @@ public synchronized void moveToNewLocation(File newFile, long size) throws IOExc } public synchronized boolean delete() { + deleted = true; return lf.delete(); } diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java index 6beba6a744..078292fb81 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java @@ -49,6 +49,8 @@ private static CachedFileInfo tryRetainFileInfo(CachedFileInfo fi) throws IOExce boolean retained = fi.tryRetain(); if (!retained) { throw new IOException("FileInfo " + fi + " is already marked dead"); + } else if (fi.isDeleted()) { + throw new Bookie.NoLedgerException(fi.ledgerId); } return fi; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java index 66e97f7942..0cf5cc93f8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java @@ -40,7 +40,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.conf.ServerConfiguration; @@ -345,12 +344,6 @@ public void onSetDirty(LedgerEntryPage lep) { // flush and read pages private final IndexPersistenceMgr indexPersistenceManager; - /** - * the list of potentially dirty ledgers. 
- */ - private final ConcurrentLinkedQueue<Long> ledgersToFlush = new ConcurrentLinkedQueue<Long>(); - private final ConcurrentSkipListSet<Long> ledgersFlushing = new ConcurrentSkipListSet<Long>(); - // Stats private final Counter ledgerCacheHitCounter; private final Counter ledgerCacheMissCounter; @@ -504,7 +497,6 @@ private LedgerEntryPage grabLedgerEntryPage(long ledger, long pageEntry) throws void removePagesForLedger(long ledgerId) { pageMapAndList.removeEntriesForALedger(ledgerId); - ledgersToFlush.remove(ledgerId); } long getLastEntryInMem(long ledgerId) { @@ -542,18 +534,12 @@ private LedgerEntryPage grabCleanPage(long ledger, long entry) throws IOExceptio } void flushOneOrMoreLedgers(boolean doAll) throws IOException { - if (ledgersToFlush.isEmpty()) { - ledgersToFlush.addAll(pageMapAndList.getActiveLedgers()); - } - Long potentiallyDirtyLedger; - while (null != (potentiallyDirtyLedger = ledgersToFlush.poll())) { - if (!ledgersFlushing.add(potentiallyDirtyLedger)) { - continue; - } + List<Long> ledgersToFlush = new ArrayList<>(pageMapAndList.getActiveLedgers()); + for (Long potentiallyDirtyLedger : ledgersToFlush) { try { flushSpecificLedger(potentiallyDirtyLedger); - } finally { - ledgersFlushing.remove(potentiallyDirtyLedger); + } catch (Bookie.NoLedgerException e) { + continue; } if (!doAll) { break; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java index 83cb88fcc3..11292d1688 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java @@ -525,7 +525,12 @@ private File getLedgerDirForLedger(FileInfo fi) { private void moveLedgerIndexFile(Long l, FileInfo fi) throws NoWritableLedgerDirException, IOException { File newLedgerIndexFile = getNewLedgerIndexFile(l, getLedgerDirForLedger(fi)); - 
fi.moveToNewLocation(newLedgerIndexFile, fi.getSizeSinceLastwrite()); + try { + fi.moveToNewLocation(newLedgerIndexFile, fi.getSizeSinceLastwrite()); + } catch (FileInfo.FileInfoDeletedException fileInfoDeleted) { + // File concurrently deleted + throw new Bookie.NoLedgerException(l); + } } void flushLedgerHeader(long ledger) throws IOException { @@ -599,7 +604,7 @@ public int compare(LedgerEntryPage o1, LedgerEntryPage o2) { private void writeBuffers(Long ledger, List<LedgerEntryPage> entries, FileInfo fi, - int start, int count) throws IOException { + int start, int count) throws IOException, Bookie.NoLedgerException { if (LOG.isTraceEnabled()) { LOG.trace("Writing {} buffers of {}", count, Long.toHexString(ledger)); } @@ -616,7 +621,12 @@ private void writeBuffers(Long ledger, } long totalWritten = 0; while (buffs[buffs.length - 1].remaining() > 0) { - long rc = fi.write(buffs, entries.get(start + 0).getFirstEntryPosition()); + long rc = 0; + try { + rc = fi.write(buffs, entries.get(start + 0).getFirstEntryPosition()); + } catch (FileInfo.FileInfoDeletedException e) { + throw new Bookie.NoLedgerException(ledger); + } if (rc <= 0) { throw new IOException("Short write to ledger " + ledger + " rc = " + rc); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java index a9cef72306..be8755959c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java @@ -207,7 +207,8 @@ public void readPage(FileInfo fi) throws IOException { public ByteBuffer getPageToWrite() { checkPage(); page.clear(); - return page; + // Different callers to this method should be able to reasonably expect independent read pointers + return page.duplicate(); } long getLedger() { diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java index 1a01299a6c..232173507b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java @@ -379,24 +379,33 @@ public void testSyncThreadNPE() throws IOException { } /** - * Race where a flush would fail because a garbage collection occurred at - * the wrong time. + * Test for race between delete and flush. * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-604} + * {@link https://github.com/apache/bookkeeper/issues/1757} */ @Test public void testFlushDeleteRace() throws Exception { newLedgerCache(); final AtomicInteger rc = new AtomicInteger(0); - final LinkedBlockingQueue<Long> ledgerQ = new LinkedBlockingQueue<Long>(1); + final LinkedBlockingQueue<Long> ledgerQ = new LinkedBlockingQueue<>(100); final byte[] masterKey = "masterKey".getBytes(); + final long numLedgers = 1000; + final int numFlushers = 10; + final int numDeleters = 10; + final AtomicBoolean running = new AtomicBoolean(true); Thread newLedgerThread = new Thread() { public void run() { try { - for (int i = 0; i < 1000 && rc.get() == 0; i++) { + for (long i = 0; i < numLedgers && rc.get() == 0; i++) { ledgerCache.setMasterKey(i, masterKey); - ledgerQ.put((long) i); + + ledgerCache.putEntryOffset(i, 1, 0); + ledgerQ.put(i); + } + for (int i = 0; i < numDeleters; ++i) { + ledgerQ.put(-1L); } - } catch (Exception e) { + } catch (Throwable e) { rc.set(-1); LOG.error("Exception in new ledger thread", e); } @@ -404,51 +413,73 @@ public void run() { }; newLedgerThread.start(); - Thread flushThread = new Thread() { + Thread[] flushThreads = new Thread[numFlushers]; + for (int i = 0; i < numFlushers; ++i) { + Thread flushThread = new Thread() { public void run() { try { - while (true) { - Long id = ledgerQ.peek(); - 
if (id == null) { - continue; - } - LOG.info("Put entry for {}", id); - try { - ledgerCache.putEntryOffset((long) id, 1, 0); - } catch (Bookie.NoLedgerException nle) { - //ignore - } + while (running.get()) { ledgerCache.flushLedger(true); } - } catch (Exception e) { + } catch (Throwable e) { rc.set(-1); LOG.error("Exception in flush thread", e); } + LOG.error("Shutting down flush thread"); } }; - flushThread.start(); + flushThread.start(); + flushThreads[i] = flushThread; + } - Thread deleteThread = new Thread() { + Thread[] deleteThreads = new Thread[numDeleters]; + for (int i = 0; i < numDeleters; ++i) { + Thread deleteThread = new Thread() { public void run() { try { while (true) { long id = ledgerQ.take(); + if (id == -1L) { + break; + } LOG.info("Deleting {}", id); ledgerCache.deleteLedger(id); } - } catch (Exception e) { + } catch (Throwable e) { rc.set(-1); LOG.error("Exception in delete thread", e); } } }; - deleteThread.start(); + deleteThread.start(); + deleteThreads[i] = deleteThread; + } newLedgerThread.join(); - assertEquals("Should have been no errors", rc.get(), 0); - deleteThread.interrupt(); - flushThread.interrupt(); + for (Thread deleteThread : deleteThreads) { + deleteThread.join(); + } + + running.set(false); + for (Thread flushThread : flushThreads) { + flushThread.join(); + } + + assertEquals("Should have been no errors", rc.get(), 0); + for (long i = 0L; i < numLedgers; ++i) { + boolean gotError = false; + try { + LOG.error("Checking {}", i); + ledgerCache.getEntryOffset(i, 0); + } catch (NoLedgerException e) { + gotError = true; + } + if (!gotError) { + LOG.error("Ledger {} is still around", i); + fail("Found ledger " + i + ", which should have been removed"); + } + } } // Mock SortedLedgerStorage to simulate flush failure (Dependency Fault Injection) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org. With regards, Apache Git Services