athanatos commented on a change in pull request #1769: ISSUE-1757: prevent race
between flush and delete from recreating index
URL: https://github.com/apache/bookkeeper/pull/1769#discussion_r229373296
##########
File path:
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
##########
@@ -379,76 +379,110 @@ public void testSyncThreadNPE() throws IOException {
}
/**
- * Race where a flush would fail because a garbage collection occurred at
- * the wrong time.
+ * Test for race between delete and flush.
* {@link https://issues.apache.org/jira/browse/BOOKKEEPER-604}
+ * {@link https://github.com/apache/bookkeeper/issues/1757}
*/
@Test
public void testFlushDeleteRace() throws Exception {
newLedgerCache();
final AtomicInteger rc = new AtomicInteger(0);
- final LinkedBlockingQueue<Long> ledgerQ = new
LinkedBlockingQueue<Long>(1);
+ final LinkedBlockingQueue<Long> ledgerQ = new
LinkedBlockingQueue<>(100);
final byte[] masterKey = "masterKey".getBytes();
+ final long numLedgers = 1000;
+ final int numFlushers = 10;
+ final int numDeleters = 10;
+ final AtomicBoolean running = new AtomicBoolean(true);
Thread newLedgerThread = new Thread() {
public void run() {
try {
- for (int i = 0; i < 1000 && rc.get() == 0; i++) {
+ for (long i = 0; i < numLedgers && rc.get() == 0; i++)
{
ledgerCache.setMasterKey(i, masterKey);
- ledgerQ.put((long) i);
+
+ ledgerCache.putEntryOffset(i, 1, 0);
+ //ledgerCache.putEntryOffset(i, 1024, 0);
+ //ledgerCache.putEntryOffset(i, 4096, 0);
+ //ledgerCache.putEntryOffset(i, 8192, 0);
+ ledgerQ.put(i);
+ }
+ for (int i = 0; i < numDeleters; ++i) {
+ ledgerQ.put(-1L);
}
- } catch (Exception e) {
+ } catch (Throwable e) {
rc.set(-1);
LOG.error("Exception in new ledger thread", e);
}
}
};
newLedgerThread.start();
- Thread flushThread = new Thread() {
+ Thread[] flushThreads = new Thread[numFlushers];
+ for (int i = 0; i < numFlushers; ++i) {
+ Thread flushThread = new Thread() {
public void run() {
try {
- while (true) {
- Long id = ledgerQ.peek();
- if (id == null) {
- continue;
- }
- LOG.info("Put entry for {}", id);
- try {
- ledgerCache.putEntryOffset((long) id, 1, 0);
- } catch (Bookie.NoLedgerException nle) {
- //ignore
- }
+ while (running.get()) {
ledgerCache.flushLedger(true);
}
- } catch (Exception e) {
+ } catch (Throwable e) {
rc.set(-1);
LOG.error("Exception in flush thread", e);
}
+ LOG.error("Shutting down flush thread");
}
};
- flushThread.start();
+ flushThread.start();
+ flushThreads[i] = flushThread;
+ }
- Thread deleteThread = new Thread() {
+ Thread[] deleteThreads = new Thread[numDeleters];
+ for (int i = 0; i < numDeleters; ++i) {
+ Thread deleteThread = new Thread() {
public void run() {
try {
while (true) {
long id = ledgerQ.take();
+ if (id == -1L) {
+ break;
+ }
LOG.info("Deleting {}", id);
ledgerCache.deleteLedger(id);
}
- } catch (Exception e) {
+ } catch (Throwable e) {
rc.set(-1);
LOG.error("Exception in delete thread", e);
}
}
};
- deleteThread.start();
+ deleteThread.start();
+ deleteThreads[i] = deleteThread;
+ }
newLedgerThread.join();
- assertEquals("Should have been no errors", rc.get(), 0);
- deleteThread.interrupt();
- flushThread.interrupt();
+ for (Thread deleteThread : deleteThreads) {
+ deleteThread.join();
+ }
+
+ running.set(false);
+ for (Thread flushThread : flushThreads) {
+ flushThread.join();
+ }
+
+ assertEquals("Should have been no errors", rc.get(), 0);
+ for (long i = 0L; i < numLedgers; ++i) {
+ boolean gotError = false;
+ try {
+ LOG.error("Checking {}", i);
+ ledgerCache.getEntryOffset(i, 0);
+ } catch (NoLedgerException e) {
+ gotError = true;
+ }
+ if (!gotError) {
+ LOG.error("Ledger {} is still around", i);
+ assertTrue(gotError);
Review comment:
Ok
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services