This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-4.15 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit a8df522f0229c6826d23337e8f11ad22a169d35d Author: wenbingshen <[email protected]> AuthorDate: Wed Feb 15 15:15:11 2023 +0800 Not wrap IOException twice from checkpoint (#3683) ### Motivation IOException may be wrapped by RuntimeException, so don't wrap RuntimeException with IOException. (cherry picked from commit d6d9212a64a6c9dabd69a646082718929c43b4f9) --- .../ldb/SingleDirectoryDbLedgerStorage.java | 12 ++--------- .../bookkeeper/bookie/storage/ldb/WriteCache.java | 5 +++-- .../bookie/storage/ldb/WriteCacheTest.java | 24 +++++++++++++++++++--- 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index fbc57823ba..d8646c1e43 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -731,12 +731,8 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage Batch batch = entryLocationIndex.newBatch(); writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> { - try { - long location = entryLogger.addEntry(ledgerId, entry, true); - entryLocationIndex.addLocation(batch, ledgerId, entryId, location); - } catch (IOException e) { - throw new RuntimeException(e); - } + long location = entryLogger.addEntry(ledgerId, entry, true); + entryLocationIndex.addLocation(batch, ledgerId, entryId, location); }); long entryLoggerStart = MathUtils.nowInNano(); @@ -789,10 +785,6 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage recordFailedEvent(dbLedgerStorageStats.getFlushStats(), startTime); // Leave IOExecption as it is throw e; - } catch (RuntimeException e) { - 
recordFailedEvent(dbLedgerStorageStats.getFlushStats(), startTime); - // Wrap unchecked exceptions - throw new IOException(e); } finally { try { isFlushOngoing.set(false); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java index 659162e46d..b3f37671ce 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java @@ -27,6 +27,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import java.io.Closeable; +import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; @@ -57,7 +58,7 @@ public class WriteCache implements Closeable { * Consumer that is used to scan the entire write cache. */ public interface EntryConsumer { - void accept(long ledgerId, long entryId, ByteBuf entry); + void accept(long ledgerId, long entryId, ByteBuf entry) throws IOException; } private final ConcurrentLongLongPairHashMap index = ConcurrentLongLongPairHashMap.newBuilder() @@ -220,7 +221,7 @@ public class WriteCache implements Closeable { private static final ArrayGroupSort groupSorter = new ArrayGroupSort(2, 4); - public void forEach(EntryConsumer consumer) { + public void forEach(EntryConsumer consumer) throws IOException { sortedEntriesLock.lock(); try { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java index e92f0e755e..cd433e8fd2 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java @@ 
-30,7 +30,8 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; - +import io.netty.util.ReferenceCountUtil; +import java.io.IOException; import java.nio.charset.Charset; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; @@ -144,7 +145,7 @@ public class WriteCacheTest { } @Test - public void testEmptyCache() { + public void testEmptyCache() throws IOException { WriteCache cache = new WriteCache(allocator, 1024 * 1024, 16 * 1024); assertEquals(0, cache.count()); @@ -222,7 +223,7 @@ public class WriteCacheTest { } @Test - public void testLedgerDeletion() { + public void testLedgerDeletion() throws IOException { WriteCache cache = new WriteCache(allocator, 1024 * 1024, 16 * 1024); ByteBuf entry = Unpooled.buffer(1024); @@ -306,4 +307,21 @@ public class WriteCacheTest { } assertFalse(cache.hasEntry(ledgerId, 48)); } + + @Test(expected = IOException.class) + public void testForEachIOException() throws Exception { + try (WriteCache cache = new WriteCache(allocator, 1024 * 1024, 16 * 1024)) { + + for (int i = 0; i < 48; i++) { + boolean inserted = cache.put(1, i, Unpooled.wrappedBuffer(("test-" + i).getBytes())); + assertTrue(inserted); + } + + assertEquals(48, cache.count()); + + cache.forEach(((ledgerId, entryId, entry) -> { + throw new IOException("test throw IOException."); + })); + } + } }
