This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 277a9743c6689463f9a7d2cfc94ec558526758c2
Author: Hang Chen <[email protected]>
AuthorDate: Mon Jun 19 15:07:05 2023 +0800

    Fix ledger replicated failed blocks bookie decommission process (#3917)
    
    ### Motivation
    When I decommissioned one bookie (bk3), replication of one ledger failed and blocked the decommission process.
    
    This is the auto-recovery log:
    ```
    2023-03-29T06:29:22,642+0000 [ReplicationWorker] ERROR org.apache.bookkeeper.client.LedgerHandle - ReadEntries exception on ledgerId:904368 firstEntry:14 lastEntry:14 lastAddConfirmed:13
    2023-03-29T06:29:22,642+0000 [ReplicationWorker] ERROR org.apache.bookkeeper.replication.ReplicationWorker - Received error: -1 while trying to read entry: 14 of ledger: 904368 in ReplicationWorker
    2023-03-29T06:29:22,642+0000 [ReplicationWorker] ERROR org.apache.bookkeeper.replication.ReplicationWorker - Failed to read faulty entries, so giving up replicating ledgerFragment Fragment(LedgerID: 904368, FirstEntryID: 0[0], LastKnownEntryID: 14[14], Host: [betausc1-bk-10.betausc1-bk-headless.o-vaxkx.svc.cluster.local:3181], Closed: true)
    2023-03-29T06:29:22,644+0000 [ReplicationWorker] ERROR org.apache.bookkeeper.replication.ReplicationWorker - ReplicationWorker failed to replicate Ledger : 904368 for 6 number of times, so deferring the ledger lock release by 300000 msecs
    ```
    The ledger's metadata:
    ```
    ledgerID: 904368
    2023-03-29T06:47:56,511+0000 [main] INFO  org.apache.bookkeeper.tools.cli.commands.client.LedgerMetaDataCommand - LedgerMetadata{formatVersion=3, ensembleSize=3, writeQuorumSize=3, ackQuorumSize=2, state=OPEN, digestType=CRC32C, password=base64:, ensembles={0=[bk1:3181, bk2:3181, bk3:3181], 15=[bk1:3181, bk2:3181, bk4:3181]},...}
    ```
    
    The ledger (904368) has two ensembles, `ensembles={0=[bk1:3181, bk2:3181, bk3:3181], 15=[bk1:3181, bk2:3181, bk4:3181]}`. However, the replication worker read the ledger's LAC as 13, while the fragment it was replicating covers the entry range [0, 14], so reading entry 14 failed.
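    
    To make the mismatch concrete, the sketch below models the ensemble map as a `TreeMap` keyed by each segment's first entry ID. `FragmentRange` and `lastEntryOf` are hypothetical names, and the rule that a non-last fragment ends one entry before the next ensemble key is an assumption chosen to match the `[0, 14]` range in the log above.
    
    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.SortedMap;
    import java.util.TreeMap;

    // Hypothetical model, not BookKeeper code: ensembles maps the first entry ID
    // of each segment to that segment's bookies.
    class FragmentRange {
        // Last entry covered by the segment starting at firstEntryId: one before
        // the next segment's key, or the LAC if this is the last segment.
        static long lastEntryOf(SortedMap<Long, List<String>> ensembles, long firstEntryId, long lac) {
            SortedMap<Long, List<String>> later = ensembles.tailMap(firstEntryId + 1);
            return later.isEmpty() ? lac : later.firstKey() - 1;
        }

        public static void main(String[] args) {
            SortedMap<Long, List<String>> ensembles = new TreeMap<>();
            ensembles.put(0L, Arrays.asList("bk1:3181", "bk2:3181", "bk3:3181"));
            ensembles.put(15L, Arrays.asList("bk1:3181", "bk2:3181", "bk4:3181"));
            long lac = 13L;
            long last = lastEntryOf(ensembles, 0L, lac);
            // Prints "fragment [0, 14], LAC 13": entry 14 lies beyond the LAC.
            System.out.println("fragment [0, " + last + "], LAC " + lac);
        }
    }
    ```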
    
    ### One question
    **Why does the ledger have a new ensemble starting at entryId 15 while its lastAddConfirm is 13?**
    
    The answer has two parts: how the new ensemble was created, and how the lastAddConfirm was computed.
    
    #### 1. How the new ensemble was created
    The ensemble change is controlled on the bookie client side.
    
    When an entry is ready to be sent to the bookie servers, the bookie client checks whether an ensemble change is needed.
    https://github.com/apache/bookkeeper/blob/912896deb2e748389e15e74c37539b2ff36302c7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java#L254
    
    In the case above, one bookie (bk3) was lost while entry 15 was being written. This triggered an ensemble change that produced the new ensemble 15=[bk1:3181, bk2:3181, bk4:3181]. However, the write of entry 15 then failed, for example because it timed out or the bookie server rejected it.
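    
    The ensemble change can be modeled roughly as follows. This is a simplified, hypothetical sketch, not the `PendingAddOp` code: it assumes the new ensemble is keyed by the first entry that has not been confirmed yet (15 here) and that only the failed bookie is swapped out.
    
    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.SortedMap;
    import java.util.TreeMap;

    // Hypothetical model of a client-side ensemble change (not the real code).
    class EnsembleChange {
        // Copies the current (last) ensemble, replaces the failed bookie and
        // registers the copy under firstUnconfirmedEntryId.
        static void replaceBookie(SortedMap<Long, List<String>> ensembles, long firstUnconfirmedEntryId,
                                  String failed, String replacement) {
            List<String> next = new ArrayList<>(ensembles.get(ensembles.lastKey()));
            int index = next.indexOf(failed);
            if (index < 0) {
                throw new IllegalArgumentException(failed + " is not in the current ensemble");
            }
            next.set(index, replacement);
            ensembles.put(firstUnconfirmedEntryId, next);
        }

        public static void main(String[] args) {
            SortedMap<Long, List<String>> ensembles = new TreeMap<>();
            ensembles.put(0L, Arrays.asList("bk1:3181", "bk2:3181", "bk3:3181"));
            // Entries 0..14 are confirmed; bk3 is lost while entry 15 is written.
            replaceBookie(ensembles, 15L, "bk3:3181", "bk4:3181");
            // Key 15 is now in the metadata; if the write of entry 15 fails,
            // the last ensemble is left without any entries.
            System.out.println(ensembles);
        }
    }
    ```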
    
    Up to this point, entry 14 had been written successfully.
    
    #### 2. How the lastAddConfirm was generated
    Because the ledger is in the `OPEN` state, the ledger handle sends a readLAC request to the bookies of the last ensemble to get the ledger's lastAddConfirm.
    
    In the case above, the readLAC request is sent to bk1, bk2, and bk4.
    
    With the `V2` protocol (Pulsar uses the V2 protocol to interact with the BookKeeper cluster), the bookie client puts the current lastAddConfirm entry ID into the metadata of the next entry it writes.
    https://github.com/apache/bookkeeper/blob/df4492012cc03682534cbc8dd68dd81163b0c947/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java#L134
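    
    As a rough illustration of this piggybacking, the sketch below packs and reads back a header carrying the lastAddConfirm. The layout (ledger ID, entry ID, lastAddConfirm and length as four longs, with digest and payload omitted) and the `EntryHeader` name are assumptions for illustration; the linked `DigestManager` code is the authoritative format.
    
    ```java
    import java.nio.ByteBuffer;

    // Hypothetical header layout for illustration: four longs (ledgerId, entryId,
    // lastAddConfirmed, length); the digest and payload that follow are omitted.
    class EntryHeader {
        static final int HEADER_SIZE = 4 * Long.BYTES;

        static ByteBuffer pack(long ledgerId, long entryId, long lastAddConfirmed, long length) {
            ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE);
            buf.putLong(ledgerId).putLong(entryId).putLong(lastAddConfirmed).putLong(length);
            buf.flip();
            return buf;
        }

        // Reads the piggybacked lastAddConfirmed (third long) without moving the position.
        static long lastAddConfirmedOf(ByteBuffer header) {
            return header.getLong(2 * Long.BYTES);
        }

        public static void main(String[] args) {
            // In this example entry 14 is written after entry 13 is confirmed, so it
            // carries 13. The length field is irrelevant here, so 0 is used.
            ByteBuffer entry14 = pack(904368L, 14L, 13L, 0L);
            System.out.println(lastAddConfirmedOf(entry14)); // 13
        }
    }
    ```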
    
    When an `OPEN` ledger is opened for reading over the `V2` protocol, the client sends a readLastAddConfirm request to the bookie servers, and each bookie server returns the last entry it holds for this ledger.
    https://github.com/apache/bookkeeper/blob/df4492012cc03682534cbc8dd68dd81163b0c947/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java#L108
    
    The bookie client then parses the returned entry and takes the lastAddConfirm from the entry's metadata. Because an entry only records the previous entry ID as its lastAddConfirm, the lastAddConfirm that the LedgerHandle obtains is the penultimate entry ID of the ledger.
    
    In the case above, bk1 and bk2 both hold entry 14 as their highest entry and bk4 returns NoSuchEntryException, so the LedgerHandle gets a lastAddConfirm of `14 - 1 = 13`, not 14.
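    
    The off-by-one can be reproduced with a small model of the readLAC aggregation. This is a hypothetical sketch rather than the `ReadLastConfirmedOp` code: each response is the lastAddConfirm carried by the last entry a bookie returns (empty for NoSuchEntryException), and it assumes the client keeps the highest value it receives.
    
    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.OptionalLong;

    // Hypothetical model of readLAC aggregation, not the ReadLastConfirmedOp code.
    class ReadLacModel {
        static final long INVALID_ENTRY_ID = -1L;

        // Each response is the LAC embedded in the last entry a bookie returned,
        // or empty if the bookie had no entry (NoSuchEntryException).
        static long aggregate(List<OptionalLong> responses) {
            long lac = INVALID_ENTRY_ID;
            for (OptionalLong response : responses) {
                if (response.isPresent()) {
                    lac = Math.max(lac, response.getAsLong());
                }
            }
            return lac;
        }

        public static void main(String[] args) {
            // bk1 and bk2 return entry 14, which carries LAC 13; bk4 has no entries.
            List<OptionalLong> responses = Arrays.asList(
                OptionalLong.of(13L), OptionalLong.of(13L), OptionalLong.empty());
            System.out.println(aggregate(responses)); // 13, although entry 14 exists
        }
    }
    ```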
    
    When the replicator tries to recover the first ensemble 0=[bk1:3181, bk2:3181, bk3:3181] with the entry range [0, 14], reading entry 14 throws a ReadEntryException because the lastAddConfirm is 13.
    https://github.com/apache/bookkeeper/blob/df4492012cc03682534cbc8dd68dd81163b0c947/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java#L685-L690
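    
    The failing check reduces to a bound on the requested entry range. `ReadGuard` below is a hypothetical, simplified stand-in for the linked `LedgerHandle` lines, which fail the read through a callback instead of returning a boolean.
    
    ```java
    // Hypothetical stand-in for the range check in LedgerHandle's read path.
    class ReadGuard {
        // A read of [firstEntry, lastEntry] is only allowed up to the known LAC.
        static boolean canRead(long firstEntry, long lastEntry, long lastAddConfirmed) {
            return firstEntry >= 0 && firstEntry <= lastEntry && lastEntry <= lastAddConfirmed;
        }

        public static void main(String[] args) {
            long lac = 13L;
            for (long entry = 12L; entry <= 14L; entry++) {
                String result = canRead(entry, entry, lac) ? "readable" : "rejected (beyond LAC)";
                System.out.println("entry " + entry + ": " + result);
            }
        }
    }
    ```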
    
    ### Solution
    When all of the following hold:
    - The ledger is `OPEN`
    - The ledger has multiple ensembles
    - The ledger's last ensemble doesn't have any entries, which means `lastAddConfirm < last ensemble key - 1`
    
    we should treat the penultimate segment/ensemble of the ledger as `OPEN` instead of closed.
    https://github.com/apache/bookkeeper/blob/df4492012cc03682534cbc8dd68dd81163b0c947/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java#L56-L57
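    
    The closed-fragment rule before and after this change can be compared in isolation. The following is a standalone, hypothetical model of the `LedgerFragment` check: bookies are plain strings, and the `ledgerClosed` and `lac` parameters stand in for `LedgerMetadata.isClosed()` and `LedgerHandle.getLastAddConfirmed()`.
    
    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.SortedMap;
    import java.util.TreeMap;

    // Hypothetical model of the LedgerFragment "closed" check (not the real class).
    class FragmentClosedCheck {
        // Old rule: any fragment that is not the last one counts as closed.
        static boolean closedBefore(SortedMap<Long, List<String>> ensembles, List<String> ensemble,
                                    boolean ledgerClosed) {
            return ledgerClosed || !ensemble.equals(ensembles.get(ensembles.lastKey()));
        }

        // New rule: a non-last fragment is closed only if the LAC reached the last ensemble's key - 1.
        static boolean closedAfter(SortedMap<Long, List<String>> ensembles, List<String> ensemble,
                                   boolean ledgerClosed, long lac) {
            return ledgerClosed
                || (!ensemble.equals(ensembles.get(ensembles.lastKey()))
                    && lac >= ensembles.lastKey() - 1);
        }

        public static void main(String[] args) {
            SortedMap<Long, List<String>> ensembles = new TreeMap<>();
            List<String> first = Arrays.asList("bk1:3181", "bk2:3181", "bk3:3181");
            ensembles.put(0L, first);
            ensembles.put(15L, Arrays.asList("bk1:3181", "bk2:3181", "bk4:3181"));
            System.out.println(closedBefore(ensembles, first, false));     // true
            System.out.println(closedAfter(ensembles, first, false, 13L)); // false
        }
    }
    ```
    
    With lastAddConfirm 13 and a last ensemble key of 15, the first fragment is no longer treated as closed, so the ledger is handled as open instead of the replicator trying to read entry 14 as part of a closed fragment.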
    
    Once the segment/ensemble is treated as `OPEN`, the replicator first closes the ledger and then replicates it.
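    
    The bookies that the replication worker checks for an open ledger can be sketched the same way. This standalone model follows the idea of `isLastSegmentOpenAndMissingBookies` in the `ReplicationWorker` change but is not its code; it uses `headMap` rather than `floorEntry` to find the penultimate ensemble, and all names are hypothetical.
    
    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;
    import java.util.SortedMap;
    import java.util.TreeMap;

    // Hypothetical model of the missing-bookie check for an open ledger.
    class OpenSegmentCheck {
        // Bookies of the last ensemble, plus those of the penultimate ensemble
        // when the last ensemble is empty (lac < lastKey - 1).
        static List<String> bookiesToCheck(SortedMap<Long, List<String>> ensembles, long lac) {
            List<String> result = new ArrayList<>(ensembles.get(ensembles.lastKey()));
            if (ensembles.size() > 1 && lac < ensembles.lastKey() - 1) {
                // headMap(lastKey) holds every earlier ensemble; its last entry is the penultimate one.
                SortedMap<Long, List<String>> earlier = ensembles.headMap(ensembles.lastKey());
                result.addAll(earlier.get(earlier.lastKey()));
            }
            return result;
        }

        static boolean hasMissingBookies(SortedMap<Long, List<String>> ensembles, long lac,
                                         Collection<String> available) {
            for (String bookie : bookiesToCheck(ensembles, lac)) {
                if (!available.contains(bookie)) {
                    return true;
                }
            }
            return false;
        }

        public static void main(String[] args) {
            SortedMap<Long, List<String>> ensembles = new TreeMap<>();
            ensembles.put(0L, Arrays.asList("bk1:3181", "bk2:3181", "bk3:3181"));
            ensembles.put(15L, Arrays.asList("bk1:3181", "bk2:3181", "bk4:3181"));
            // bk3 is being decommissioned, so it is no longer available.
            List<String> available = Arrays.asList("bk1:3181", "bk2:3181", "bk4:3181");
            System.out.println(hasMissingBookies(ensembles, 13L, available)); // true
            System.out.println(hasMissingBookies(ensembles, 14L, available)); // false
        }
    }
    ```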
    
    (cherry picked from commit eff38e45e317b90eb1cd456bd5d5629dedc1fd5f)
---
 .../apache/bookkeeper/client/LedgerFragment.java   |  7 +-
 .../bookkeeper/replication/ReplicationWorker.java  | 14 ++++
 .../bookkeeper/client/BookieWriteLedgerTest.java   | 75 ++++++++++++++++++++++
 .../replication/ReplicationTestUtil.java           |  2 +-
 .../replication/TestReplicationWorker.java         | 28 ++++++++
 5 files changed, 124 insertions(+), 2 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
index 1fb1e50cb0..c4d6dbfd6e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
@@ -52,8 +52,13 @@ public class LedgerFragment {
         this.schedule = lh.getDistributionSchedule();
         SortedMap<Long, ? extends List<BookieId>> ensembles = lh
                 .getLedgerMetadata().getAllEnsembles();
+        // The ledger fragment is considered closed if either condition holds:
+        // 1. The ledger is closed.
+        // 2. This fragment is not the last fragment and the ledger's lastAddConfirm >= ensembles.lastKey() - 1.
+        //    Condition 2 is false when the ledger's last ensemble is empty.
         this.isLedgerClosed = lh.getLedgerMetadata().isClosed()
-                || !ensemble.equals(ensembles.get(ensembles.lastKey()));
+                || (!ensemble.equals(ensembles.get(ensembles.lastKey()))
+            && lh.getLastAddConfirmed() >= ensembles.lastKey() - 1);
     }
 
     LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index c4a0e430d6..315d9b8aff 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -33,6 +33,7 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -42,6 +43,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -475,6 +477,14 @@ public class ReplicationWorker implements Runnable {
      *
      * <p>Missing bookies in closed ledgers are fine, as we know the last confirmed add, so
      * we can tell which entries are supposed to exist and rereplicate them if necessary.
+     *
+     * <p>Another corner case is that there are multiple ensembles in the ledger and the last
+     * segment/ensemble is open, but nothing has been written to some quorums in the ensemble.
+     * For the v2 protocol, this ledger's lastAddConfirm entry is the last segment/ensemble's `key - 2`,
+     * not `key - 1`; for the explanation, please refer to: https://github.com/apache/bookkeeper/pull/3917.
+     * If we treat the penultimate segment/ensemble as closed, we cannot replicate
+     * the last entry in the segment. So in this case, we should also check if the penultimate
+     * segment/ensemble has missing bookies.
      */
     private boolean isLastSegmentOpenAndMissingBookies(LedgerHandle lh) throws BKException {
         LedgerMetadata md = admin.getLedgerMetadata(lh);
@@ -484,6 +494,10 @@ public class ReplicationWorker implements Runnable {
 
         SortedMap<Long, ? extends List<BookieId>> ensembles = admin.getLedgerMetadata(lh).getAllEnsembles();
         List<BookieId> finalEnsemble = ensembles.get(ensembles.lastKey());
+        if (ensembles.size() > 1 && lh.getLastAddConfirmed() < ensembles.lastKey() - 1) {
+            finalEnsemble = new ArrayList<>(finalEnsemble);
+            finalEnsemble.addAll((new TreeMap<>(ensembles)).floorEntry(ensembles.lastKey() - 1).getValue());
+        }
         Collection<BookieId> available = admin.getAvailableBookies();
         for (BookieId b : finalEnsemble) {
             if (!available.contains(b)) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index ce2d6f9b64..8dbe789ab3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -26,6 +26,7 @@ import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE;
 import static org.apache.bookkeeper.client.BookKeeperClientStats.READ_OP_DM;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import com.google.common.collect.Lists;
@@ -35,9 +36,11 @@ import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import java.io.IOException;
+import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
@@ -58,12 +61,22 @@ import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.api.WriteAdvHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.replication.ReplicationTestUtil;
+import org.apache.bookkeeper.replication.ReplicationWorker;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.commons.lang3.tuple.Pair;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -1425,6 +1438,68 @@ public class BookieWriteLedgerTest extends
         bkc.deleteLedger(lh.ledgerId);
     }
 
+    @Test
+    public void testReadLacNotSameWithMetadataLedgerReplication() throws Exception {
+        lh = bkc.createLedger(3, 3, 2, digestType, ledgerPassword);
+        for (int i = 0; i < 10; ++i) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+            lh.addEntry(entry.array());
+        }
+
+        List<BookieId> ensemble = lh.getLedgerMetadata().getAllEnsembles().entrySet().iterator().next().getValue();
+        assertEquals(1, lh.getLedgerMetadata().getAllEnsembles().size());
+        killBookie(ensemble.get(1));
+
+        try {
+            lh.ensembleChangeLoop(ensemble, Collections.singletonMap(1, ensemble.get(1)));
+        } catch (Exception e) {
+            fail();
+        }
+
+        LedgerHandle lh1 = bkc.openLedgerNoRecovery(lh.ledgerId, digestType, ledgerPassword);
+        assertEquals(2, lh1.getLedgerMetadata().getAllEnsembles().size());
+        List<BookieId> firstEnsemble = lh1.getLedgerMetadata().getAllEnsembles().firstEntry().getValue();
+
+        long entryId = lh1.getLedgerMetadata().getAllEnsembles().lastEntry().getKey() - 1;
+        try {
+            lh1.readAsync(entryId, entryId).get();
+            fail();
+        } catch (Exception e) {
+            LOG.info("Failed to read entry: {} ", entryId, e);
+        }
+
+        MetadataBookieDriver driver = MetadataDrivers.getBookieDriver(
+            URI.create(baseConf.getMetadataServiceUri()));
+        driver.initialize(
+            baseConf,
+            NullStatsLogger.INSTANCE);
+        // initialize underReplicationManager
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+        LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
+        baseConf.setOpenLedgerRereplicationGracePeriod(String.valueOf(30));
+
+
+        ReplicationWorker replicationWorker = new ReplicationWorker(baseConf);
+        replicationWorker.start();
+        String basePath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseClientConf) + '/'
+            + BookKeeperConstants.UNDER_REPLICATION_NODE
+            + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
+
+        try {
+            underReplicationManager.markLedgerUnderreplicated(lh1.getId(), ensemble.get(1).toString());
+
+            Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() ->
+                assertFalse(ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh1.getId(), basePath))
+            );
+
+            assertNotEquals(firstEnsemble, lh1.getLedgerMetadata().getAllEnsembles().firstEntry().getValue());
+        } finally {
+            replicationWorker.shutdown();
+        }
+    }
+
     @Test
     public void testLedgerMetadataTest() throws Exception {
         baseClientConf.setLedgerMetadataFormatVersion(LedgerMetadataSerDe.METADATA_FORMAT_VERSION_2);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/ReplicationTestUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/ReplicationTestUtil.java
index ac05c8481e..6f2971ac8d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/ReplicationTestUtil.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/ReplicationTestUtil.java
@@ -32,7 +32,7 @@ public class ReplicationTestUtil {
     /**
      * Checks whether ledger is in under-replication.
      */
-    static boolean isLedgerInUnderReplication(ZooKeeper zkc, long id,
+    public static boolean isLedgerInUnderReplication(ZooKeeper zkc, long id,
             String basePath) throws KeeperException, InterruptedException {
         List<String> children;
         try {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index a69b9cbd49..5c87ab5ccd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -75,6 +75,7 @@ import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
+import org.awaitility.Awaitility;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1129,4 +1130,31 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
             bkWithMockZK.close();
         }
     }
+
+    @Test
+    public void testReplicateEmptyOpenStateLedger() throws Exception {
+        LedgerHandle lh = bkc.createLedger(3, 3, 2, BookKeeper.DigestType.CRC32, TESTPASSWD);
+        assertFalse(lh.getLedgerMetadata().isClosed());
+
+        List<BookieId> firstEnsemble = lh.getLedgerMetadata().getAllEnsembles().firstEntry().getValue();
+        List<BookieId> ensemble = lh.getLedgerMetadata().getAllEnsembles().entrySet().iterator().next().getValue();
+        killBookie(ensemble.get(1));
+
+        startNewBookie();
+        baseConf.setOpenLedgerRereplicationGracePeriod(String.valueOf(30));
+        ReplicationWorker replicationWorker = new ReplicationWorker(baseConf);
+        replicationWorker.start();
+
+        try {
+            underReplicationManager.markLedgerUnderreplicated(lh.getId(), ensemble.get(1).toString());
+            Awaitility.waitAtMost(60, TimeUnit.SECONDS).untilAsserted(() ->
+                assertFalse(ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath))
+            );
+
+            LedgerHandle lh1 = bkc.openLedgerNoRecovery(lh.getId(), BookKeeper.DigestType.CRC32, TESTPASSWD);
+            assertTrue(lh1.getLedgerMetadata().isClosed());
+        } finally {
+            replicationWorker.shutdown();
+        }
+    }
 }
