This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.16 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 2b2661de68ad0d9b02fd964c1520def07d044911 Author: fengyubiao <[email protected]> AuthorDate: Tue Sep 2 12:26:50 2025 +0800 [fix]Wrong error code(-107) of opening a deleted ledger (#4657) (cherry picked from commit 5870922754b9585f7d3592dcd0714a4da65d9d80) --- .../apache/bookkeeper/bookie/BookieException.java | 12 +++ .../bookkeeper/bookie/HandleFactoryImpl.java | 2 +- .../bookkeeper/proto/PerChannelBookieClient.java | 5 + .../bookkeeper/proto/ReadEntryProcessor.java | 2 +- .../bookkeeper/proto/ReadEntryProcessorV3.java | 5 +- .../bookkeeper/proto/WriteEntryProcessor.java | 4 +- .../bookkeeper/proto/WriteEntryProcessorV3.java | 4 +- .../bookkeeper/proto/WriteLacProcessorV3.java | 4 + .../org/apache/bookkeeper/client/TestFencing.java | 113 +++++++++++++++++++++ 9 files changed, 143 insertions(+), 8 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java index 2b85961cf4..d88673d007 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java @@ -69,6 +69,8 @@ public abstract class BookieException extends Exception { return new MetadataStoreException(); case Code.UnknownBookieIdException: return new UnknownBookieIdException(); + case Code.LedgerFencedAndDeletedException: + return new LedgerFencedAndDeletedException(); case Code.DataUnknownException: return new DataUnknownException(); default: @@ -95,6 +97,7 @@ public abstract class BookieException extends Exception { int CookieExistsException = -109; int EntryLogMetadataMapException = -110; int DataUnknownException = -111; + int LedgerFencedAndDeletedException = -112; } public int getCode() { @@ -199,6 +202,15 @@ public abstract class BookieException extends Exception { } } + /** + * Signals that a ledger has been fenced in a bookie. 
The ledger has also been deleted, so no more entries can be appended to it. + */ + public static class LedgerFencedAndDeletedException extends BookieException { + public LedgerFencedAndDeletedException() { + super(Code.LedgerFencedException); + } + } + /** * Signals that a ledger's operation has been rejected by an internal component because of the resource saturation. */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java index b331f12506..ac87c3aed4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java @@ -57,7 +57,7 @@ class HandleFactoryImpl implements HandleFactory, LedgerDeletionListener { if (handle == null) { if (!journalReplay && recentlyFencedAndDeletedLedgers.getIfPresent(ledgerId) != null) { - throw BookieException.create(BookieException.Code.LedgerFencedException); + throw BookieException.create(BookieException.Code.LedgerFencedAndDeletedException); } handle = LedgerDescriptor.create(masterKey, ledgerId, ledgerStorage); ledgers.putIfAbsent(ledgerId, handle); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 80c3a1d9de..7bce38aae2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -1165,6 +1165,11 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { completion.setOutstanding(); } } else { + try { + future.get(); + } catch (Exception ex) { + LOG.warn("Failed to request to the bookie: {}", bookieId, ex); + } nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); 
errorOut(key); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java index 04efd9634b..9c6d672f48 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java @@ -93,7 +93,7 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> { handleReadResultForFenceRead(fenceResult, data, startTimeNanos); return; } - } catch (Bookie.NoLedgerException e) { + } catch (Bookie.NoLedgerException | BookieException.LedgerFencedAndDeletedException e) { if (LOG.isDebugEnabled()) { LOG.debug("Error reading {}", request, e); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java index 999b8095db..05f9de0ea3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java @@ -215,9 +215,10 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 { } } return readEntry(readResponse, entryId, startTimeSw); - } catch (Bookie.NoLedgerException e) { + } catch (Bookie.NoLedgerException | BookieException.LedgerFencedAndDeletedException e) { if (RequestUtils.isFenceRequest(readRequest)) { - LOG.info("No ledger found reading entry {} when fencing ledger {}", entryId, ledgerId); + LOG.info("No ledger found(or it has been deleted) reading entry {} when fencing ledger {}", + entryId, ledgerId); } else if (entryId != BookieProtocol.LAST_ADD_CONFIRMED) { LOG.info("No ledger found while reading entry: {} from ledger: {}", entryId, ledgerId); } else if (LOG.isDebugEnabled()) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java index d611ab963a..665e2d3a1c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java @@ -90,8 +90,8 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen } catch (IOException e) { LOG.error("Error writing {}", request, e); rc = BookieProtocol.EIO; - } catch (BookieException.LedgerFencedException lfe) { - LOG.warn("Write attempt on fenced ledger {} by client {}", request.getLedgerId(), + } catch (BookieException.LedgerFencedException | BookieException.LedgerFencedAndDeletedException lfe) { + LOG.warn("Write attempt on fenced/deleted ledger {} by client {}", request.getLedgerId(), requestHandler.ctx().channel().remoteAddress()); rc = BookieProtocol.EFENCED; } catch (BookieException e) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java index 36aff7ad92..79d9f41d5c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java @@ -136,8 +136,8 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 { logger.error("Error writing entry:{} to ledger:{}", entryId, ledgerId, e); status = StatusCode.EIO; - } catch (BookieException.LedgerFencedException e) { - logger.error("Ledger fenced while writing entry:{} to ledger:{}", + } catch (BookieException.LedgerFencedException | BookieException.LedgerFencedAndDeletedException e) { + logger.error("Ledger fenced/deleted while writing entry:{} to ledger:{}", entryId, ledgerId, e); status = StatusCode.EFENCED; } catch (BookieException e) { diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java index 293cea3bb0..81d1bc6605 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java @@ -105,6 +105,10 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable { requestProcessor.bookie.setExplicitLac(Unpooled.wrappedBuffer(lacToAdd), writeCallback, requestHandler, masterKey); status = StatusCode.EOK; + } catch (BookieException.LedgerFencedAndDeletedException e) { + logger.error("Error saving lac {} for ledger:{}, which has been deleted", + lac, ledgerId, e); + status = StatusCode.ENOLEDGER; } catch (IOException e) { logger.error("Error saving lac {} for ledger:{}", lac, ledgerId, e); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java index 77382b4ebd..9f7a81450b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java @@ -24,6 +24,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import lombok.extern.slf4j.Slf4j; @@ -33,10 +36,16 @@ import org.apache.bookkeeper.bookie.LedgerStorage; import org.apache.bookkeeper.bookie.SortedLedgerStorage; import org.apache.bookkeeper.bookie.storage.ldb.SingleDirectoryDbLedgerStorage; import org.apache.bookkeeper.client.BookKeeper.DigestType; +import 
org.apache.bookkeeper.client.api.WriteFlag; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.BookieClientImpl; +import org.apache.bookkeeper.proto.BookieProtocol; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; +import org.apache.bookkeeper.proto.PerChannelBookieClient; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.bookkeeper.test.TestStatsProvider; +import org.apache.bookkeeper.util.ByteBufList; import org.awaitility.reflect.WhiteboxImpl; import org.junit.Test; import org.slf4j.Logger; @@ -152,6 +161,110 @@ public class TestFencing extends BookKeeperClusterTestCase { } } + @Test(timeout = 3000 * 1000) + public void testConcurrentFenceAndDeleteLedger() throws Exception { + LedgerHandle writeLedger; + writeLedger = bkc.createLedger(digestType, "password".getBytes()); + + String tmp = "BookKeeper is cool!"; + long lac = 0; + for (int i = 0; i < 10; i++) { + long entryId = writeLedger.addEntry(tmp.getBytes()); + LOG.info("entryId: {}", entryId); + lac = entryId; + } + + // Fence and delete. 
+ final BookieId bookieId = writeLedger.getLedgerMetadata().getEnsembleAt(0).get(0); + ClientConfiguration clientConfiguration2 = newClientConfiguration(); + clientConfiguration2.setUseV2WireProtocol(true); + ClientConfiguration clientConfiguration3 = newClientConfiguration(); + BookKeeperTestClient bkcV2 = new BookKeeperTestClient(clientConfiguration2, new TestStatsProvider()); + LedgerHandle writeLedgerV2 = bkcV2.createLedger(digestType, "password".getBytes()); + BookKeeperTestClient bkcV3 = new BookKeeperTestClient(clientConfiguration3, new TestStatsProvider()); + LedgerHandle writeLedgerV3 = bkcV3.createLedger(digestType, "password".getBytes()); + ReadOnlyLedgerHandle readLedgerV2 = + (ReadOnlyLedgerHandle) bkcV2.openLedger(writeLedger.getId(), digestType, "password".getBytes()); + ReadOnlyLedgerHandle readLedgerV3 = + (ReadOnlyLedgerHandle) bkcV3.openLedger(writeLedger.getId(), digestType, "password".getBytes()); + BookieClientImpl bookieClientV2 = (BookieClientImpl) readLedgerV2.clientCtx.getBookieClient(); + BookieClientImpl bookieClientV3 = (BookieClientImpl) readLedgerV3.clientCtx.getBookieClient(); + // Trigger opening connection. 
+ CompletableFuture<Integer> obtainV2 = new CompletableFuture<>(); + bookieClientV2.lookupClient(bookieId).obtain( + new BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>() { + @Override + public void operationComplete(int rc, PerChannelBookieClient result) { + obtainV2.complete(rc); + } + }, writeLedger.getId()); + assertEquals(obtainV2.get().intValue(), BKException.Code.OK); + CompletableFuture<Integer> obtainV3 = new CompletableFuture<>(); + bookieClientV3.lookupClient(bookieId).obtain( + new BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>() { + @Override + public void operationComplete(int rc, PerChannelBookieClient result) { + obtainV3.complete(rc); + } + }, writeLedger.getId()); + assertEquals(obtainV3.get().intValue(), BKException.Code.OK); + bkcV3.deleteLedger(readLedgerV3.ledgerId); + + // Waiting for GC. + for (ServerTester server : servers) { + triggerGC(server.getServer().getBookie()); + } + + // Verify: read requests with V2 protocol will receive a NoSuchLedgerException. + final byte readEntryFlagFencing = 1; + CompletableFuture<Integer> readResV2 = new CompletableFuture<>(); + bookieClientV2.readEntry(bookieId, + writeLedger.getId(), 0, (rc, ledgerId, entryId1, buffer, ctx) -> { + readResV2.complete(rc); + }, null, readEntryFlagFencing, readLedgerV2.ledgerKey); + assertEquals(BKException.Code.NoSuchLedgerExistsException, readResV2.get().intValue()); + // Verify: read requests with V3 protocol will receive a NoSuchLedgerException. + CompletableFuture<Integer> readResV3 = new CompletableFuture<>(); + bookieClientV3.readEntry(bookieId, + writeLedger.getId(), 0, (rc, ledgerId, entryId1, buffer, ctx) -> { + readResV3.complete(rc); + }, null, readEntryFlagFencing, readLedgerV3.ledgerKey); + assertEquals(BKException.Code.NoSuchLedgerExistsException, readResV3.get().intValue()); + // Verify: add requests with V2 protocol will receive a LedgerFencedException. 
+ log.info("Try to add the next entry: {}:{}", writeLedger.getId(), lac + 1); + final ByteBuf dataV2 = UnpooledByteBufAllocator.DEFAULT.heapBuffer(); + // Combine add request, and rewrite ledgerId of the request. + dataV2.writeByte(1); + final ByteBuf toSendV2 = (ByteBuf) writeLedgerV2.macManager.computeDigestAndPackageForSending( + lac + 1, lac, 1, dataV2, writeLedger.ledgerKey, BookieProtocol.FLAG_NONE); + toSendV2.setLong(28, writeLedger.getId()); + CompletableFuture<Integer> addResV2 = new CompletableFuture<>(); + bookieClientV2.addEntry(bookieId, writeLedger.getId(), writeLedger.ledgerKey, lac + 1, toSendV2, + (rc, ledgerId, entryId1, addr, ctx) -> { + addResV2.complete(rc); + }, null, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE); + assertEquals(BKException.Code.LedgerFencedException, addResV2.get().intValue()); + // Verify: add requests with V3 protocol will receive a LedgerFencedException. + final ByteBuf dataV3 = UnpooledByteBufAllocator.DEFAULT.heapBuffer(); + dataV3.writeByte(1); + // Combine add request, and rewrite ledgerId of the request. + final ByteBufList toSendV3 = (ByteBufList) writeLedgerV3.macManager.computeDigestAndPackageForSending( + lac + 1, lac, 1, dataV3, writeLedger.ledgerKey, BookieProtocol.FLAG_NONE); + toSendV3.getBuffer(0).setLong(0, writeLedger.getId()); + CompletableFuture<Integer> addResV3 = new CompletableFuture<>(); + bookieClientV3.addEntry(bookieId, writeLedger.getId(), writeLedger.ledgerKey, lac + 1, toSendV3, + (rc, ledgerId, entryId1, addr, ctx) -> { + addResV3.complete(rc); + }, null, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE); + assertEquals(BKException.Code.LedgerFencedException, addResV3.get().intValue()); + + // cleanup. + writeLedgerV2.close(); + writeLedgerV3.close(); + bkcV2.close(); + bkcV3.close(); + } + private static int threadCount = 0; class LedgerOpenThread extends Thread {
