This is an automated email from the ASF dual-hosted git repository.

sjust pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 41e4bcc  ISSUE #1757: prevent race between flush and delete from recreating index
41e4bcc is described below

commit 41e4bccb9694e1f373e919f6891b8e88b2232c5e
Author: Samuel Just <sj...@salesforce.com>
AuthorDate: Tue Oct 30 11:26:50 2018 -0700

    ISSUE #1757: prevent race between flush and delete from recreating index
    
    IndexPersistenceManager.flushLedgerHandle can race with delete by
    obtaining a FileInfo just prior to the delete and then proceeding to
    rewrite the file info, resurrecting it.  FileInfo provides a convenient
    point of synchronization for avoiding this race.
    FileInfo.moveLedgerIndexFile, FileInfo.flushHeader, and FileInfo.delete()
    are already synchronized, so this patch simply adds a deleted flag to the
    FileInfo object to indicate that the FileInfo has become invalid.
    checkOpen is called in every method and will now throw
    FileInfoDeletedException if delete has been called.
    IndexPersistenceManager can catch it and deal with it appropriately
    during flush (which generally means moving on to the next ledger).
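
    A minimal sketch of the synchronization pattern (hypothetical class,
    simplified from the FileInfo change below):

        import java.io.IOException;

        class DeletableResource {
            private boolean deleted = false;

            synchronized void delete() {
                deleted = true;
                // ... remove the backing file while still holding the lock ...
            }

            synchronized void write() throws IOException {
                checkOpen();
                // Safe to write: delete() cannot interleave with this method,
                // and a completed delete() is observed by checkOpen() above.
            }

            private void checkOpen() throws IOException {
                // Only called from synchronized methods.
                if (deleted) {
                    throw new IOException("already deleted");
                }
            }
        }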
    
    This patch also eliminates ledgersToFlush and ledgersFlushing.  Their
    purpose appears to be to let delete prevent the flushing of a ledger
    that has been selected for flushing but not yet flushed, avoiding the
    above race.  That is not sufficient, however, because IndexInMemPageMgr
    calls IndexPersistenceManager.flushLedgerHeader, which can obtain a
    FileInfo for the ledger prior to the deletion and then call
    relocateIndexFileAndFlushHeader afterwards.
    
    Also, if the purpose was to prevent concurrent calls into
    flushSpecificLedger on the same ledger, it doesn't accomplish that
    either, as the following sequence shows:
    
    t0: thread 0 calls flushOneOrMoreLedgers
    t1: thread 0 places ledger 10 into ledgersFlushing and completes flushSpecificLedger
    t2: thread 2 performs a write to ledger 10
    t3: thread 1 calls flushOneOrMoreLedgers, skipping ledger 10
    t4: thread 0 releases ledger 10 from ledgersFlushing
    t5: thread 1 completes flushOneOrMoreLedgers
    
    Although thread 1 begins to flush after the write to ledger 10, it won't
    capture the write, rendering the flush incorrect.  I don't think it's
    actually worth avoiding overlapping flushes here, because both FileInfo
    and LedgerEntryPage track dirty state.  As such, it seems simpler to
    just get rid of both structures.
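
    With those structures gone, the flush loop simply iterates over a
    snapshot of the active ledgers and skips any ledger that is deleted
    concurrently (a sketch paraphrasing the IndexInMemPageMgr hunk below):

        void flushOneOrMoreLedgers(boolean doAll) throws IOException {
            // Snapshot the active ledgers; deletions after this point are
            // detected via Bookie.NoLedgerException from the flush itself.
            List<Long> ledgersToFlush = new ArrayList<>(pageMapAndList.getActiveLedgers());
            for (Long potentiallyDirtyLedger : ledgersToFlush) {
                try {
                    flushSpecificLedger(potentiallyDirtyLedger);
                } catch (Bookie.NoLedgerException e) {
                    continue; // ledger deleted concurrently; move on
                }
                if (!doAll) {
                    break;
                }
            }
        }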
    
    This patch also adds a more aggressive version of testFlushDeleteRace to
    test the new behavior.  Testing with multiple flushers turned up a bug
    in LedgerEntryPage.getPageToWrite, which didn't return a buffer with
    independent read pointers, so this patch addresses that as well.
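
    For reference, java.nio.ByteBuffer.duplicate() shares the underlying
    content but gives the returned buffer its own position, limit, and mark,
    which is what getPageToWrite now relies on.  A small standalone
    illustration (class and variable names are made up):

        import java.nio.ByteBuffer;

        public class DuplicateDemo {
            public static void main(String[] args) {
                ByteBuffer page = ByteBuffer.allocate(8);
                ByteBuffer a = page.duplicate();
                ByteBuffer b = page.duplicate();
                a.getInt();                        // advances only a's read position
                System.out.println(a.position());  // 4
                System.out.println(b.position());  // 0 -- unaffected by the read on a
            }
        }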
    
    (bug W-5549455)
    (rev cguttapalem)
    Signed-off-by: Samuel Just <sjust@salesforce.com>
    (cherry picked from commit 7b5ac3d5e76ac4df618764cafe80aef2994703ec)
    
    
    Author:
    
    Reviewers: Enrico Olivelli <eolive...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1769 from athanatos/forupstream/wip-1757, closes #1757
---
 .../org/apache/bookkeeper/bookie/FileInfo.java     | 17 +++++
 .../bookkeeper/bookie/FileInfoBackingCache.java    |  2 +
 .../bookkeeper/bookie/IndexInMemPageMgr.java       | 22 ++----
 .../bookkeeper/bookie/IndexPersistenceMgr.java     | 16 ++++-
 .../apache/bookkeeper/bookie/LedgerEntryPage.java  |  3 +-
 .../apache/bookkeeper/bookie/LedgerCacheTest.java  | 83 +++++++++++++++-------
 6 files changed, 95 insertions(+), 48 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index a5ddacf..656674f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -101,6 +101,8 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
     // this FileInfo Header Version
     int headerVersion;
 
+    private boolean deleted;
+
     public FileInfo(File lf, byte[] masterKey, int fileInfoVersionToWrite) throws IOException {
         super(WATCHER_RECYCLER);
 
@@ -108,6 +110,7 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
         this.masterKey = masterKey;
         mode = "rw";
         this.headerVersion = fileInfoVersionToWrite;
+        this.deleted = false;
     }
 
     synchronized Long getLastAddConfirmed() {
@@ -257,6 +260,16 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
         }
     }
 
+    public synchronized boolean isDeleted() {
+        return deleted;
+    }
+
+    public static class FileInfoDeletedException extends IOException {
+        FileInfoDeletedException() {
+            super("FileInfo already deleted");
+        }
+    }
+
     @VisibleForTesting
     void checkOpen(boolean create) throws IOException {
         checkOpen(create, false);
@@ -264,6 +277,9 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
 
     private synchronized void checkOpen(boolean create, boolean openBeforeClose)
             throws IOException {
+        if (deleted) {
+            throw new FileInfoDeletedException();
+        }
         if (fc != null) {
             return;
         }
@@ -540,6 +556,7 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
     }
 
     public synchronized boolean delete() {
+        deleted = true;
         return lf.delete();
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
index 6beba6a..078292f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
@@ -49,6 +49,8 @@ class FileInfoBackingCache {
         boolean retained = fi.tryRetain();
         if (!retained) {
             throw new IOException("FileInfo " + fi + " is already marked dead");
+        } else if (fi.isDeleted()) {
+            throw new Bookie.NoLedgerException(fi.ledgerId);
         }
         return fi;
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
index 66e97f7..0cf5cc9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
@@ -40,7 +40,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -345,12 +344,6 @@ class IndexInMemPageMgr {
     // flush and read pages
     private final IndexPersistenceMgr indexPersistenceManager;
 
-    /**
-     * the list of potentially dirty ledgers.
-     */
-    private final ConcurrentLinkedQueue<Long> ledgersToFlush = new ConcurrentLinkedQueue<Long>();
-    private final ConcurrentSkipListSet<Long> ledgersFlushing = new ConcurrentSkipListSet<Long>();
-
     // Stats
     private final Counter ledgerCacheHitCounter;
     private final Counter ledgerCacheMissCounter;
@@ -504,7 +497,6 @@ class IndexInMemPageMgr {
 
     void removePagesForLedger(long ledgerId) {
         pageMapAndList.removeEntriesForALedger(ledgerId);
-        ledgersToFlush.remove(ledgerId);
     }
 
     long getLastEntryInMem(long ledgerId) {
@@ -542,18 +534,12 @@ class IndexInMemPageMgr {
     }
 
     void flushOneOrMoreLedgers(boolean doAll) throws IOException {
-        if (ledgersToFlush.isEmpty()) {
-            ledgersToFlush.addAll(pageMapAndList.getActiveLedgers());
-        }
-        Long potentiallyDirtyLedger;
-        while (null != (potentiallyDirtyLedger = ledgersToFlush.poll())) {
-            if (!ledgersFlushing.add(potentiallyDirtyLedger)) {
-                continue;
-            }
+        List<Long> ledgersToFlush = new ArrayList<>(pageMapAndList.getActiveLedgers());
+        for (Long potentiallyDirtyLedger : ledgersToFlush) {
             try {
                 flushSpecificLedger(potentiallyDirtyLedger);
-            } finally {
-                ledgersFlushing.remove(potentiallyDirtyLedger);
+            } catch (Bookie.NoLedgerException e) {
+                continue;
             }
             if (!doAll) {
                 break;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 83cb88f..11292d1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -525,7 +525,12 @@ public class IndexPersistenceMgr {
 
     private void moveLedgerIndexFile(Long l, FileInfo fi) throws NoWritableLedgerDirException, IOException {
         File newLedgerIndexFile = getNewLedgerIndexFile(l, getLedgerDirForLedger(fi));
-        fi.moveToNewLocation(newLedgerIndexFile, fi.getSizeSinceLastwrite());
+        try {
+            fi.moveToNewLocation(newLedgerIndexFile, fi.getSizeSinceLastwrite());
+        } catch (FileInfo.FileInfoDeletedException fileInfoDeleted) {
+            // File concurrently deleted
+            throw new Bookie.NoLedgerException(l);
+        }
     }
 
     void flushLedgerHeader(long ledger) throws IOException {
@@ -599,7 +604,7 @@ public class IndexPersistenceMgr {
 
     private void writeBuffers(Long ledger,
                               List<LedgerEntryPage> entries, FileInfo fi,
-                              int start, int count) throws IOException {
+                              int start, int count) throws IOException, Bookie.NoLedgerException {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Writing {} buffers of {}", count, Long.toHexString(ledger));
         }
@@ -616,7 +621,12 @@ public class IndexPersistenceMgr {
         }
         long totalWritten = 0;
         while (buffs[buffs.length - 1].remaining() > 0) {
-            long rc = fi.write(buffs, entries.get(start + 0).getFirstEntryPosition());
+            long rc = 0;
+            try {
+                rc = fi.write(buffs, entries.get(start + 0).getFirstEntryPosition());
+            } catch (FileInfo.FileInfoDeletedException e) {
+                throw new Bookie.NoLedgerException(ledger);
+            }
             if (rc <= 0) {
                throw new IOException("Short write to ledger " + ledger + " rc = " + rc);
             }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
index a9cef72..be87559 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
@@ -207,7 +207,8 @@ public class LedgerEntryPage {
     public ByteBuffer getPageToWrite() {
         checkPage();
         page.clear();
-        return page;
+        // Different callers to this method should be able to reasonably expect independent read pointers
+        return page.duplicate();
     }
 
     long getLedger() {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 1a01299..2321735 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -379,24 +379,33 @@ public class LedgerCacheTest {
     }
 
     /**
-     * Race where a flush would fail because a garbage collection occurred at
-     * the wrong time.
+     * Test for race between delete and flush.
      * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-604}
+     * {@link https://github.com/apache/bookkeeper/issues/1757}
      */
     @Test
     public void testFlushDeleteRace() throws Exception {
         newLedgerCache();
         final AtomicInteger rc = new AtomicInteger(0);
-        final LinkedBlockingQueue<Long> ledgerQ = new LinkedBlockingQueue<Long>(1);
+        final LinkedBlockingQueue<Long> ledgerQ = new LinkedBlockingQueue<>(100);
         final byte[] masterKey = "masterKey".getBytes();
+        final long numLedgers = 1000;
+        final int numFlushers = 10;
+        final int numDeleters = 10;
+        final AtomicBoolean running = new AtomicBoolean(true);
         Thread newLedgerThread = new Thread() {
                 public void run() {
                     try {
-                        for (int i = 0; i < 1000 && rc.get() == 0; i++) {
+                        for (long i = 0; i < numLedgers && rc.get() == 0; i++) {
                             ledgerCache.setMasterKey(i, masterKey);
-                            ledgerQ.put((long) i);
+
+                            ledgerCache.putEntryOffset(i, 1, 0);
+                            ledgerQ.put(i);
+                        }
+                        for (int i = 0; i < numDeleters; ++i) {
+                            ledgerQ.put(-1L);
                         }
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         rc.set(-1);
                         LOG.error("Exception in new ledger thread", e);
                     }
@@ -404,51 +413,73 @@ public class LedgerCacheTest {
             };
         newLedgerThread.start();
 
-        Thread flushThread = new Thread() {
+        Thread[] flushThreads = new Thread[numFlushers];
+        for (int i = 0; i < numFlushers; ++i) {
+            Thread flushThread = new Thread() {
                 public void run() {
                     try {
-                        while (true) {
-                            Long id = ledgerQ.peek();
-                            if (id == null) {
-                                continue;
-                            }
-                            LOG.info("Put entry for {}", id);
-                            try {
-                                ledgerCache.putEntryOffset((long) id, 1, 0);
-                            } catch (Bookie.NoLedgerException nle) {
-                                //ignore
-                            }
+                        while (running.get()) {
                             ledgerCache.flushLedger(true);
                         }
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         rc.set(-1);
                         LOG.error("Exception in flush thread", e);
                     }
+                    LOG.error("Shutting down flush thread");
                 }
             };
-        flushThread.start();
+            flushThread.start();
+            flushThreads[i] = flushThread;
+        }
 
-        Thread deleteThread = new Thread() {
+        Thread[] deleteThreads = new Thread[numDeleters];
+        for (int i = 0; i < numDeleters; ++i) {
+            Thread deleteThread = new Thread() {
                 public void run() {
                     try {
                         while (true) {
                             long id = ledgerQ.take();
+                            if (id == -1L) {
+                                break;
+                            }
                             LOG.info("Deleting {}", id);
                             ledgerCache.deleteLedger(id);
                         }
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         rc.set(-1);
                         LOG.error("Exception in delete thread", e);
                     }
                 }
             };
-        deleteThread.start();
+            deleteThread.start();
+            deleteThreads[i] = deleteThread;
+        }
 
         newLedgerThread.join();
-        assertEquals("Should have been no errors", rc.get(), 0);
 
-        deleteThread.interrupt();
-        flushThread.interrupt();
+        for (Thread deleteThread : deleteThreads) {
+            deleteThread.join();
+        }
+
+        running.set(false);
+        for (Thread flushThread : flushThreads) {
+            flushThread.join();
+        }
+
+        assertEquals("Should have been no errors", rc.get(), 0);
+        for (long i = 0L; i < numLedgers; ++i) {
+            boolean gotError = false;
+            try {
+                LOG.error("Checking {}", i);
+                ledgerCache.getEntryOffset(i, 0);
+            } catch (NoLedgerException e) {
+                gotError = true;
+            }
+            if (!gotError) {
+                LOG.error("Ledger {} is still around", i);
+                fail("Found ledger " + i + ", which should have been removed");
+            }
+        }
     }
 
     // Mock SortedLedgerStorage to simulate flush failure (Dependency Fault Injection)
