This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 2b2661de68ad0d9b02fd964c1520def07d044911
Author: fengyubiao <[email protected]>
AuthorDate: Tue Sep 2 12:26:50 2025 +0800

    [fix]Wrong error code(-107) of opening a deleted ledger (#4657)
    
    (cherry picked from commit 5870922754b9585f7d3592dcd0714a4da65d9d80)
---
 .../apache/bookkeeper/bookie/BookieException.java  |  12 +++
 .../bookkeeper/bookie/HandleFactoryImpl.java       |   2 +-
 .../bookkeeper/proto/PerChannelBookieClient.java   |   5 +
 .../bookkeeper/proto/ReadEntryProcessor.java       |   2 +-
 .../bookkeeper/proto/ReadEntryProcessorV3.java     |   5 +-
 .../bookkeeper/proto/WriteEntryProcessor.java      |   4 +-
 .../bookkeeper/proto/WriteEntryProcessorV3.java    |   4 +-
 .../bookkeeper/proto/WriteLacProcessorV3.java      |   4 +
 .../org/apache/bookkeeper/client/TestFencing.java  | 113 +++++++++++++++++++++
 9 files changed, 143 insertions(+), 8 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
index 2b85961cf4..d88673d007 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
@@ -69,6 +69,8 @@ public abstract class BookieException extends Exception {
             return new MetadataStoreException();
         case Code.UnknownBookieIdException:
             return new UnknownBookieIdException();
+        case Code.LedgerFencedAndDeletedException:
+            return new LedgerFencedAndDeletedException();
         case Code.DataUnknownException:
             return new DataUnknownException();
         default:
@@ -95,6 +97,7 @@ public abstract class BookieException extends Exception {
         int CookieExistsException = -109;
         int EntryLogMetadataMapException = -110;
         int DataUnknownException = -111;
+        int LedgerFencedAndDeletedException = -112;
     }
 
     public int getCode() {
@@ -199,6 +202,15 @@ public abstract class BookieException extends Exception {
         }
     }
 
+    /**
+     * Signals that a ledger has been fenced and then deleted in a bookie. No more entries can 
be appended to that ledger.
+     */
+    public static class LedgerFencedAndDeletedException extends 
BookieException {
+        public LedgerFencedAndDeletedException() {
+            super(Code.LedgerFencedException);
+        }
+    }
+
     /**
      * Signals that a ledger's operation has been rejected by an internal 
component because of the resource saturation.
      */
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
index b331f12506..ac87c3aed4 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
@@ -57,7 +57,7 @@ class HandleFactoryImpl implements HandleFactory, 
LedgerDeletionListener {
 
         if (handle == null) {
             if (!journalReplay && 
recentlyFencedAndDeletedLedgers.getIfPresent(ledgerId) != null) {
-                throw 
BookieException.create(BookieException.Code.LedgerFencedException);
+                throw 
BookieException.create(BookieException.Code.LedgerFencedAndDeletedException);
             }
             handle = LedgerDescriptor.create(masterKey, ledgerId, 
ledgerStorage);
             ledgers.putIfAbsent(ledgerId, handle);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 80c3a1d9de..7bce38aae2 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -1165,6 +1165,11 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                             completion.setOutstanding();
                         }
                     } else {
+                        try {
+                            future.get();
+                        } catch (Exception ex) {
+                            LOG.warn("Failed to request to the bookie: {}", 
bookieId, ex);
+                        }
                         
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), 
TimeUnit.NANOSECONDS);
                         errorOut(key);
                     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index 04efd9634b..9c6d672f48 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -93,7 +93,7 @@ class ReadEntryProcessor extends 
PacketProcessorBase<ReadRequest> {
                 handleReadResultForFenceRead(fenceResult, data, 
startTimeNanos);
                 return;
             }
-        } catch (Bookie.NoLedgerException e) {
+        } catch (Bookie.NoLedgerException | 
BookieException.LedgerFencedAndDeletedException e) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Error reading {}", request, e);
             }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index 999b8095db..05f9de0ea3 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -215,9 +215,10 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
                 }
             }
             return readEntry(readResponse, entryId, startTimeSw);
-        } catch (Bookie.NoLedgerException e) {
+        } catch (Bookie.NoLedgerException | 
BookieException.LedgerFencedAndDeletedException e) {
             if (RequestUtils.isFenceRequest(readRequest)) {
-                LOG.info("No ledger found reading entry {} when fencing ledger 
{}", entryId, ledgerId);
+                LOG.info("No ledger found(or it has been deleted) reading 
entry {} when fencing ledger {}",
+                    entryId, ledgerId);
             } else if (entryId != BookieProtocol.LAST_ADD_CONFIRMED) {
                 LOG.info("No ledger found while reading entry: {} from ledger: 
{}", entryId, ledgerId);
             } else if (LOG.isDebugEnabled()) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index d611ab963a..665e2d3a1c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -90,8 +90,8 @@ class WriteEntryProcessor extends 
PacketProcessorBase<ParsedAddRequest> implemen
         } catch (IOException e) {
             LOG.error("Error writing {}", request, e);
             rc = BookieProtocol.EIO;
-        } catch (BookieException.LedgerFencedException lfe) {
-            LOG.warn("Write attempt on fenced ledger {} by client {}", 
request.getLedgerId(),
+        } catch (BookieException.LedgerFencedException | 
BookieException.LedgerFencedAndDeletedException lfe) {
+            LOG.warn("Write attempt on fenced/deleted ledger {} by client {}", 
request.getLedgerId(),
                     requestHandler.ctx().channel().remoteAddress());
             rc = BookieProtocol.EFENCED;
         } catch (BookieException e) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index 36aff7ad92..79d9f41d5c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -136,8 +136,8 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
             logger.error("Error writing entry:{} to ledger:{}",
                     entryId, ledgerId, e);
             status = StatusCode.EIO;
-        } catch (BookieException.LedgerFencedException e) {
-            logger.error("Ledger fenced while writing entry:{} to ledger:{}",
+        } catch (BookieException.LedgerFencedException | 
BookieException.LedgerFencedAndDeletedException e) {
+            logger.error("Ledger fenced/deleted while writing entry:{} to 
ledger:{}",
                     entryId, ledgerId, e);
             status = StatusCode.EFENCED;
         } catch (BookieException e) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
index 293cea3bb0..81d1bc6605 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
@@ -105,6 +105,10 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 
implements Runnable {
             
requestProcessor.bookie.setExplicitLac(Unpooled.wrappedBuffer(lacToAdd),
                     writeCallback, requestHandler, masterKey);
             status = StatusCode.EOK;
+        } catch (BookieException.LedgerFencedAndDeletedException e) {
+            logger.error("Error saving lac {} for ledger:{}, which has been 
deleted",
+                    lac, ledgerId, e);
+            status = StatusCode.ENOLEDGER;
         } catch (IOException e) {
             logger.error("Error saving lac {} for ledger:{}",
                     lac, ledgerId, e);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
index 77382b4ebd..9f7a81450b 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
@@ -24,6 +24,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import lombok.extern.slf4j.Slf4j;
@@ -33,10 +36,16 @@ import org.apache.bookkeeper.bookie.LedgerStorage;
 import org.apache.bookkeeper.bookie.SortedLedgerStorage;
 import org.apache.bookkeeper.bookie.storage.ldb.SingleDirectoryDbLedgerStorage;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookieClientImpl;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.proto.PerChannelBookieClient;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.awaitility.reflect.WhiteboxImpl;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -152,6 +161,110 @@ public class TestFencing extends 
BookKeeperClusterTestCase {
         }
     }
 
+    @Test(timeout = 3000 * 1000)
+    public void testConcurrentFenceAndDeleteLedger() throws Exception {
+        LedgerHandle writeLedger;
+        writeLedger = bkc.createLedger(digestType, "password".getBytes());
+
+        String tmp = "BookKeeper is cool!";
+        long lac = 0;
+        for (int i = 0; i < 10; i++) {
+            long entryId = writeLedger.addEntry(tmp.getBytes());
+            LOG.info("entryId: {}", entryId);
+            lac = entryId;
+        }
+
+        // Fence and delete.
+        final BookieId bookieId = 
writeLedger.getLedgerMetadata().getEnsembleAt(0).get(0);
+        ClientConfiguration clientConfiguration2 = newClientConfiguration();
+        clientConfiguration2.setUseV2WireProtocol(true);
+        ClientConfiguration clientConfiguration3 = newClientConfiguration();
+        BookKeeperTestClient bkcV2 = new 
BookKeeperTestClient(clientConfiguration2, new TestStatsProvider());
+        LedgerHandle  writeLedgerV2 = bkcV2.createLedger(digestType, 
"password".getBytes());
+        BookKeeperTestClient bkcV3 = new 
BookKeeperTestClient(clientConfiguration3, new TestStatsProvider());
+        LedgerHandle  writeLedgerV3 = bkcV3.createLedger(digestType, 
"password".getBytes());
+        ReadOnlyLedgerHandle readLedgerV2 =
+                (ReadOnlyLedgerHandle) bkcV2.openLedger(writeLedger.getId(), 
digestType, "password".getBytes());
+        ReadOnlyLedgerHandle readLedgerV3 =
+                (ReadOnlyLedgerHandle) bkcV3.openLedger(writeLedger.getId(), 
digestType, "password".getBytes());
+        BookieClientImpl bookieClientV2 = (BookieClientImpl) 
readLedgerV2.clientCtx.getBookieClient();
+        BookieClientImpl bookieClientV3 = (BookieClientImpl) 
readLedgerV3.clientCtx.getBookieClient();
+        // Trigger opening connection.
+        CompletableFuture<Integer> obtainV2 = new CompletableFuture<>();
+        bookieClientV2.lookupClient(bookieId).obtain(
+            new 
BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>() {
+                @Override
+                public void operationComplete(int rc, PerChannelBookieClient 
result) {
+                    obtainV2.complete(rc);
+                }
+            }, writeLedger.getId());
+        assertEquals(obtainV2.get().intValue(), BKException.Code.OK);
+        CompletableFuture<Integer> obtainV3 = new CompletableFuture<>();
+        bookieClientV3.lookupClient(bookieId).obtain(
+            new 
BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>() {
+                @Override
+                public void operationComplete(int rc, PerChannelBookieClient 
result) {
+                    obtainV3.complete(rc);
+                }
+            }, writeLedger.getId());
+        assertEquals(obtainV3.get().intValue(), BKException.Code.OK);
+        bkcV3.deleteLedger(readLedgerV3.ledgerId);
+
+        // Waiting for GC.
+        for (ServerTester server : servers) {
+            triggerGC(server.getServer().getBookie());
+        }
+
+        // Verify: read requests with V2 protocol will receive a 
NoSuchLedgerException.
+        final byte readEntryFlagFencing = 1;
+        CompletableFuture<Integer> readResV2 = new CompletableFuture<>();
+        bookieClientV2.readEntry(bookieId,
+                writeLedger.getId(), 0, (rc, ledgerId, entryId1, buffer, ctx) 
-> {
+                    readResV2.complete(rc);
+                }, null, readEntryFlagFencing, readLedgerV2.ledgerKey);
+        assertEquals(BKException.Code.NoSuchLedgerExistsException, 
readResV2.get().intValue());
+        // Verify: read requests with V3 protocol will receive a 
NoSuchLedgerException.
+        CompletableFuture<Integer> readResV3 = new CompletableFuture<>();
+        bookieClientV3.readEntry(bookieId,
+                writeLedger.getId(), 0, (rc, ledgerId, entryId1, buffer, ctx) 
-> {
+                    readResV3.complete(rc);
+                }, null, readEntryFlagFencing, readLedgerV3.ledgerKey);
+        assertEquals(BKException.Code.NoSuchLedgerExistsException, 
readResV3.get().intValue());
+        // Verify: add requests with V2 protocol will receive a 
LedgerFencedException.
+        log.info("Try to add the next entry: {}:{}", writeLedger.getId(), lac 
+ 1);
+        final ByteBuf dataV2 = UnpooledByteBufAllocator.DEFAULT.heapBuffer();
+        // Combine add request, and rewrite ledgerId of the request.
+        dataV2.writeByte(1);
+        final ByteBuf toSendV2 = (ByteBuf) 
writeLedgerV2.macManager.computeDigestAndPackageForSending(
+                lac + 1, lac, 1, dataV2, writeLedger.ledgerKey, 
BookieProtocol.FLAG_NONE);
+        toSendV2.setLong(28, writeLedger.getId());
+        CompletableFuture<Integer> addResV2 = new CompletableFuture<>();
+        bookieClientV2.addEntry(bookieId, writeLedger.getId(), 
writeLedger.ledgerKey, lac + 1, toSendV2,
+                (rc, ledgerId, entryId1, addr, ctx) -> {
+                    addResV2.complete(rc);
+                }, null, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
+        assertEquals(BKException.Code.LedgerFencedException, 
addResV2.get().intValue());
+        // Verify: add requests with V3 protocol will receive a 
LedgerFencedException.
+        final ByteBuf dataV3 = UnpooledByteBufAllocator.DEFAULT.heapBuffer();
+        dataV3.writeByte(1);
+        // Combine add request, and rewrite ledgerId of the request.
+        final ByteBufList toSendV3 = (ByteBufList) 
writeLedgerV3.macManager.computeDigestAndPackageForSending(
+                lac + 1, lac, 1, dataV3, writeLedger.ledgerKey, 
BookieProtocol.FLAG_NONE);
+        toSendV3.getBuffer(0).setLong(0, writeLedger.getId());
+        CompletableFuture<Integer> addResV3 = new CompletableFuture<>();
+        bookieClientV3.addEntry(bookieId, writeLedger.getId(), 
writeLedger.ledgerKey, lac + 1, toSendV3,
+                (rc, ledgerId, entryId1, addr, ctx) -> {
+                    addResV3.complete(rc);
+                }, null, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
+        assertEquals(BKException.Code.LedgerFencedException, 
addResV3.get().intValue());
+
+        // cleanup.
+        writeLedgerV2.close();
+        writeLedgerV3.close();
+        bkcV2.close();
+        bkcV3.close();
+    }
+
     private static int threadCount = 0;
 
     class LedgerOpenThread extends Thread {

Reply via email to