This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 277a9743c6689463f9a7d2cfc94ec558526758c2
Author: Hang Chen
AuthorDate: Mon Jun 19 15:07:05 2023 +0800

Fix ledger replicated failed blocks bookie decommission process (#3917)

### Motivation

When I decommissioned one bookie (bk3), replication of one ledger failed and blocked the decommission process. This is the auto-recovery log:

```
2023-03-29T06:29:22,642+0000 [ReplicationWorker] ERROR org.apache.bookkeeper.client.LedgerHandle - ReadEntries exception on ledgerId:904368 firstEntry:14 lastEntry:14 lastAddConfirmed:13
2023-03-29T06:29:22,642+0000 [ReplicationWorker] ERROR org.apache.bookkeeper.replication.ReplicationWorker - Received error: -1 while trying to read entry: 14 of ledger: 904368 in ReplicationWorker
2023-03-29T06:29:22,642+0000 [ReplicationWorker] ERROR org.apache.bookkeeper.replication.ReplicationWorker - Failed to read faulty entries, so giving up replicating ledgerFragment Fragment(LedgerID: 904368, FirstEntryID: 0[0], LastKnownEntryID: 14[14], Host: [betausc1-bk-10.betausc1-bk-headless.o-vaxkx.svc.cluster.local:3181], Closed: true)
2023-03-29T06:29:22,644+0000 [ReplicationWorker] ERROR org.apache.bookkeeper.replication.ReplicationWorker - ReplicationWorker failed to replicate Ledger : 904368 for 6 number of times, so deferring the ledger lock release by 300000 msecs
```

The ledger's metadata:

```
ledgerID: 904368
2023-03-29T06:47:56,511+0000 [main] INFO org.apache.bookkeeper.tools.cli.commands.client.LedgerMetaDataCommand - LedgerMetadata{formatVersion=3, ensembleSize=3, writeQuorumSize=3, ackQuorumSize=2, state=OPEN, digestType=CRC32C, password=base64:, ensembles={0=[bk1:3181, bk2:3181, bk3:3181], 15=[bk1:3181, bk2:3181, bk4:3181]},...}
```

The ledger (904368) has two ensembles, `ensembles={0=[bk1:3181, bk2:3181, bk3:3181], 15=[bk1:3181, bk2:3181, bk4:3181]}`. However, the replication worker obtained a LAC of 13 for the ledger, while the fragment it tried to replicate covered the entry range [0, 14]. Reading entry 14 therefore failed.
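To make the mismatch concrete, here is a minimal sketch that models the ledger metadata above as a `TreeMap` keyed by each ensemble's first entry id. It assumes, as a simplification, that a non-last fragment ends one entry before the next ensemble's key and that the last fragment ends at the LAC. The class and method names (`FragmentRangeSketch`, `fragmentLastEntry`, `readable`) are hypothetical and are not BookKeeper APIs.

```java
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical model of fragment ranges; not a BookKeeper class. */
public class FragmentRangeSketch {

    /**
     * Last entry id of the fragment that starts at firstEntryId.
     * A non-last fragment ends one entry before the next ensemble's key;
     * the last fragment is assumed to end at the LAC.
     */
    static long fragmentLastEntry(NavigableMap<Long, List<String>> ensembles,
                                  long firstEntryId, long lastAddConfirmed) {
        Long nextKey = ensembles.higherKey(firstEntryId);
        return nextKey != null ? nextKey - 1 : lastAddConfirmed;
    }

    /** A reader only serves entries up to the LAC it knows about. */
    static boolean readable(long entryId, long lastAddConfirmed) {
        return entryId <= lastAddConfirmed;
    }

    public static void main(String[] args) {
        // Ensembles taken from the metadata of ledger 904368
        NavigableMap<Long, List<String>> ensembles = new TreeMap<>();
        ensembles.put(0L, List.of("bk1:3181", "bk2:3181", "bk3:3181"));
        ensembles.put(15L, List.of("bk1:3181", "bk2:3181", "bk4:3181"));

        long lac = 13; // LAC seen by the replication worker
        long last = fragmentLastEntry(ensembles, 0L, lac);
        System.out.println("fragment [0, " + last + "], LAC " + lac); // fragment [0, 14], LAC 13
        System.out.println("entry " + last + " readable: " + readable(last, lac)); // entry 14 readable: false
    }
}
```

The first fragment's range is derived purely from the ensemble keys, so it reaches entry 14 regardless of what the LAC says.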
### One question

**Why was a new ensemble created starting at entryId = 15, while the ledger's lastAddConfirm is 13?**

This question has two parts: how the new ensemble was created, and how the lastAddConfirm was generated.

#### 1. How the new ensemble was created

The ensemble change is controlled on the bookie client side. When an entry is ready to be sent to the bookie servers, the bookie client checks whether it needs to perform an ensemble change.

https://github.com/apache/bookkeeper/blob/912896deb2e748389e15e74c37539b2ff36302c7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java#L254

In the case above, one bookie was lost when entry 15 was being written, which triggered the ensemble change and produced the new ensemble 15=[bk1:3181, bk2:3181, bk4:3181]. However, the write of entry 15 then failed, for example because of a timeout or because the bookie server rejected the write. At that point, entry 14 had been written successfully.

#### 2. How the lastAddConfirm was generated

Because the ledger is in the `OPEN` state, the ledger handle sends a readLAC request to the bookies of the last ensemble to get the ledger's lastAddConfirm. In the case above, the readLAC request is sent to bk1, bk2, and bk4.

For the `V2` protocol (Pulsar uses the V2 protocol to interact with the BookKeeper cluster), the bookie client puts the lastAddConfirm entryId into the next entry's metadata.

https://github.com/apache/bookkeeper/blob/df4492012cc03682534cbc8dd68dd81163b0c947/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java#L134

When we use the `V2` protocol to open an `OPEN` ledger for reading, the client sends a readLastAddConfirm request to the bookie server, and the bookie server fetches the last entry of this ledger and returns it to the client.
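The piggybacking and the readLAC round trip described here can be modeled in a few lines. This is a simplified sketch, not BookKeeper code: `StoredEntry`, `write` and `readLac` are hypothetical names, the writer is assumed to have had every earlier entry acknowledged (so entry N carries LAC N - 1), a bookie with no entries is modeled as an empty reply (standing in for NoSuchEntryException), and taking the maximum over replies is a simplification of the client's actual response handling.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical model of V2 readLAC; not BookKeeper classes. */
public class PiggybackedLacSketch {

    /** An entry as stored on a bookie: its id plus the LAC piggybacked in its metadata. */
    record StoredEntry(long entryId, long piggybackedLac) {}

    /** Writer side: assume all earlier entries were acked, so entry N carries LAC N - 1. */
    static StoredEntry write(long entryId) {
        return new StoredEntry(entryId, entryId - 1);
    }

    /**
     * Reader side: each reply is the last entry a bookie holds, or empty when the
     * bookie has nothing for this ledger. The client keeps the highest piggybacked LAC.
     */
    static long readLac(List<Optional<StoredEntry>> replies) {
        long lac = -1; // -1: nothing confirmed yet
        for (Optional<StoredEntry> reply : replies) {
            if (reply.isPresent()) {
                lac = Math.max(lac, reply.get().piggybackedLac());
            }
        }
        return lac;
    }

    public static void main(String[] args) {
        // bk1 and bk2 hold entry 14 as their last entry; bk4 holds nothing for this ledger
        List<Optional<StoredEntry>> replies = List.of(
                Optional.of(write(14)), Optional.of(write(14)), Optional.empty());
        System.out.println(readLac(replies)); // prints 13, although entry 14 exists on bk1 and bk2
    }
}
```

Because the LAC is read out of the last stored entry rather than derived from that entry's own id, the reader always lags the newest written entry by one.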
https://github.com/apache/bookkeeper/blob/df4492012cc03682534cbc8dd68dd81163b0c947/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java#L108

However, the bookie client parses the response entry and takes the lastAddConfirm from the entry's metadata. Since each entry only records the previous entryId as the lastAddConfirm, the lastAddConfirm the LedgerHandle obtains is the penultimate entryId of the ledger. In the case above, bk1 and bk2 both hold max entry 14 and bk4 returns NoSuchEntryException, so the LedgerHandle gets a lastAddConfirm of `14 - 1 = 13`, not 14.

When the replicator tries to recover the first ensemble 0=[bk1:3181, bk2:3181, bk3:3181] with entry range [0, 14], reading entry 14 throws a ReadEntryException because the lastAddConfirm is 13.

https://github.com/apache/bookkeeper/blob/df4492012cc03682534cbc8dd68dd81163b0c947/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java#L685-L690

### Solution

When all of the following hold:

- The ledger is `OPEN`
- The ledger has multiple ensembles
- The ledger's last ensemble doesn't have any entries, which means `lastAddConfirm < last ensemble key - 1`

we should treat the penultimate segment/ensemble of the ledger as `OPEN` instead of closed.

https://github.com/apache/bookkeeper/blob/df4492012cc03682534cbc8dd68dd81163b0c947/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java#L56-L57

Once the segment/ensemble is treated as `OPEN`, the replicator first closes the ledger and then replicates it.
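The three conditions above can be restated as a single predicate. This is a hypothetical sketch, not the `LedgerFragment` source: `FragmentClosedSketch` and `isFragmentClosed` are illustrative names, bookies are plain strings, and the sketch identifies the last fragment by its start key, whereas the patch compares the fragment's bookie list against the last ensemble with `ensemble.equals(...)`.

```java
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical restatement of the closed-fragment check; not the LedgerFragment source. */
public class FragmentClosedSketch {

    /**
     * A fragment is treated as closed if the ledger is closed, or if it is not the
     * last fragment AND the LAC has reached the entry just before the last ensemble.
     */
    static boolean isFragmentClosed(boolean ledgerClosed,
                                    NavigableMap<Long, List<String>> ensembles,
                                    long fragmentKey, long lastAddConfirmed) {
        boolean isLastFragment = fragmentKey == ensembles.lastKey();
        return ledgerClosed
                || (!isLastFragment && lastAddConfirmed >= ensembles.lastKey() - 1);
    }

    public static void main(String[] args) {
        NavigableMap<Long, List<String>> ensembles = new TreeMap<>();
        ensembles.put(0L, List.of("bk1", "bk2", "bk3"));
        ensembles.put(15L, List.of("bk1", "bk2", "bk4"));

        // LAC 13 < 15 - 1: the first fragment is now treated as OPEN
        System.out.println(isFragmentClosed(false, ensembles, 0L, 13)); // false
        // LAC 14 >= 15 - 1: the first fragment is closed, as before the fix
        System.out.println(isFragmentClosed(false, ensembles, 0L, 14)); // true
    }
}
```

Before the fix, the second operand was just `!isLastFragment`, so the first fragment would have been reported as closed even with a LAC of 13.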
(cherry picked from commit eff38e45e317b90eb1cd456bd5d5629dedc1fd5f)
---
 .../apache/bookkeeper/client/LedgerFragment.java  |  7 +-
 .../bookkeeper/replication/ReplicationWorker.java | 14 ++++
 .../bookkeeper/client/BookieWriteLedgerTest.java  | 75 ++++++++++++++++++++++
 .../replication/ReplicationTestUtil.java          |  2 +-
 .../replication/TestReplicationWorker.java        | 28 ++++++++
 5 files changed, 124 insertions(+), 2 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
index 1fb1e50cb0..c4d6dbfd6e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
@@ -52,8 +52,13 @@ public class LedgerFragment {
         this.schedule = lh.getDistributionSchedule();
         SortedMap<Long, ? extends List<BookieId>> ensembles = lh
                 .getLedgerMetadata().getAllEnsembles();
+        // Check if the ledger fragment is closed has two conditions
+        // 1. The ledger is closed
+        // 2. This fragment is not the last fragment and this ledger's lastAddConfirm >= ensembles.lastKey() - 1.
+        // This case happens when the ledger's last ensemble is empty
         this.isLedgerClosed = lh.getLedgerMetadata().isClosed()
-                || !ensemble.equals(ensembles.get(ensembles.lastKey()));
+                || (!ensemble.equals(ensembles.get(ensembles.lastKey()))
+                && lh.getLastAddConfirmed() >= ensembles.lastKey() - 1);
     }
 
     LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index c4a0e430d6..315d9b8aff 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -33,6 +33,7 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -42,6 +43,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -475,6 +477,14 @@ public class ReplicationWorker implements Runnable {
      *
      * <p>Missing bookies in closed ledgers are fine, as we know the last confirmed add, so
      * we can tell which entries are supposed to exist and rereplicate them if necessary.
+     *
+     * <p>Another corner case is that there are multiple ensembles in the ledger and the last
+     * segment/ensemble is open, but nothing has been written to some quorums in the ensemble.
+     * For the v2 protocol, this ledger's lastAddConfirm entry is the last segment/ensemble's `key - 2`,
+     * not `key - 1`; for the explanation, please refer to: https://github.com/apache/bookkeeper/pull/3917.
+     * If we treat the penultimate segment/ensemble as closed, we can't replicate
+     * the last entry in the segment. So in this case, we should also check if the penultimate
+     * segment/ensemble has missing bookies.
      */
     private boolean isLastSegmentOpenAndMissingBookies(LedgerHandle lh) throws BKException {
         LedgerMetadata md = admin.getLedgerMetadata(lh);
@@ -484,6 +494,10 @@ public class ReplicationWorker implements Runnable {
         SortedMap<Long, ? extends List<BookieId>> ensembles =
             admin.getLedgerMetadata(lh).getAllEnsembles();
         List<BookieId> finalEnsemble = ensembles.get(ensembles.lastKey());
+        if (ensembles.size() > 1 && lh.getLastAddConfirmed() < ensembles.lastKey() - 1) {
+            finalEnsemble = new ArrayList<>(finalEnsemble);
+            finalEnsemble.addAll((new TreeMap<>(ensembles)).floorEntry(ensembles.lastKey() - 1).getValue());
+        }
         Collection<BookieId> available = admin.getAvailableBookies();
         for (BookieId b : finalEnsemble) {
             if (!available.contains(b)) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index ce2d6f9b64..8dbe789ab3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -26,6 +26,7 @@ import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE;
 import static org.apache.bookkeeper.client.BookKeeperClientStats.READ_OP_DM;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import com.google.common.collect.Lists;
@@ -35,9 +36,11 @@ import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import java.io.IOException;
+import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
@@ -58,12 +61,22 @@ import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.api.WriteAdvHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.replication.ReplicationTestUtil;
+import org.apache.bookkeeper.replication.ReplicationWorker;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.commons.lang3.tuple.Pair;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -1425,6 +1438,68 @@ public class BookieWriteLedgerTest extends
         bkc.deleteLedger(lh.ledgerId);
     }
 
+    @Test
+    public void testReadLacNotSameWithMetadataLedgerReplication() throws Exception {
+        lh = bkc.createLedger(3, 3, 2, digestType, ledgerPassword);
+        for (int i = 0; i < 10; ++i) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+            lh.addEntry(entry.array());
+        }
+
+        List<BookieId> ensemble = lh.getLedgerMetadata().getAllEnsembles().entrySet().iterator().next().getValue();
+        assertEquals(1, lh.getLedgerMetadata().getAllEnsembles().size());
+        killBookie(ensemble.get(1));
+
+        try {
+            lh.ensembleChangeLoop(ensemble, Collections.singletonMap(1, ensemble.get(1)));
+        } catch (Exception e) {
+            fail();
+        }
+
+        LedgerHandle lh1 = bkc.openLedgerNoRecovery(lh.ledgerId, digestType, ledgerPassword);
+        assertEquals(2, lh1.getLedgerMetadata().getAllEnsembles().size());
+        List<BookieId> firstEnsemble = lh1.getLedgerMetadata().getAllEnsembles().firstEntry().getValue();
+
+        long entryId = lh1.getLedgerMetadata().getAllEnsembles().lastEntry().getKey() - 1;
+        try {
+            lh1.readAsync(entryId, entryId).get();
+            fail();
+        } catch (Exception e) {
+            LOG.info("Failed to read entry: {} ", entryId, e);
+        }
+
+        MetadataBookieDriver driver = MetadataDrivers.getBookieDriver(
+            URI.create(baseConf.getMetadataServiceUri()));
+        driver.initialize(
+            baseConf,
+            NullStatsLogger.INSTANCE);
+        // initialize urReplicationManager
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+        LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
+        baseConf.setOpenLedgerRereplicationGracePeriod(String.valueOf(30));
+
+
+        ReplicationWorker replicationWorker = new ReplicationWorker(baseConf);
+        replicationWorker.start();
+        String basePath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseClientConf) + '/'
+            + BookKeeperConstants.UNDER_REPLICATION_NODE
+            + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
+
+        try {
+            underReplicationManager.markLedgerUnderreplicated(lh1.getId(), ensemble.get(1).toString());
+
+            Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() ->
+                assertFalse(ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh1.getId(), basePath))
+            );
+
+            assertNotEquals(firstEnsemble, lh1.getLedgerMetadata().getAllEnsembles().firstEntry().getValue());
+        } finally {
+            replicationWorker.shutdown();
+        }
+    }
+
     @Test
     public void testLedgerMetadataTest() throws Exception {
         baseClientConf.setLedgerMetadataFormatVersion(LedgerMetadataSerDe.METADATA_FORMAT_VERSION_2);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/ReplicationTestUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/ReplicationTestUtil.java
index ac05c8481e..6f2971ac8d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/ReplicationTestUtil.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/ReplicationTestUtil.java
@@ -32,7 +32,7 @@ public class ReplicationTestUtil {
     /**
      * Checks whether ledger is in under-replication.
      */
-    static boolean isLedgerInUnderReplication(ZooKeeper zkc, long id,
+    public static boolean isLedgerInUnderReplication(ZooKeeper zkc, long id,
             String basePath) throws KeeperException, InterruptedException {
         List<String> children;
         try {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index a69b9cbd49..5c87ab5ccd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -75,6 +75,7 @@ import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
+import org.awaitility.Awaitility;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1129,4 +1130,31 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
             bkWithMockZK.close();
         }
     }
+
+    @Test
+    public void testReplicateEmptyOpenStateLedger() throws Exception {
+        LedgerHandle lh = bkc.createLedger(3, 3, 2, BookKeeper.DigestType.CRC32, TESTPASSWD);
+        assertFalse(lh.getLedgerMetadata().isClosed());
+
+        List<BookieId> firstEnsemble = lh.getLedgerMetadata().getAllEnsembles().firstEntry().getValue();
+        List<BookieId> ensemble = lh.getLedgerMetadata().getAllEnsembles().entrySet().iterator().next().getValue();
+        killBookie(ensemble.get(1));
+
+        startNewBookie();
+        baseConf.setOpenLedgerRereplicationGracePeriod(String.valueOf(30));
+        ReplicationWorker replicationWorker = new ReplicationWorker(baseConf);
+        replicationWorker.start();
+
+        try {
+            underReplicationManager.markLedgerUnderreplicated(lh.getId(), ensemble.get(1).toString());
+            Awaitility.waitAtMost(60, TimeUnit.SECONDS).untilAsserted(() ->
+                assertFalse(ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath))
+            );
+
+            LedgerHandle lh1 = bkc.openLedgerNoRecovery(lh.getId(), BookKeeper.DigestType.CRC32, TESTPASSWD);
+            assertTrue(lh1.getLedgerMetadata().isClosed());
+        } finally {
+            replicationWorker.shutdown();
+        }
+    }
 }
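The `ReplicationWorker` change can be modeled the same way. In this hypothetical sketch (the class and method names are illustrative, and bookies are plain strings), the list of bookies to check is widened to include the penultimate ensemble whenever the LAC indicates the last ensemble is empty, mirroring the `floorEntry(ensembles.lastKey() - 1)` lookup in the diff. It assumes, as the method's Javadoc suggests, that closed ledgers are skipped. Like the patch, it does not deduplicate bookies shared by both ensembles; it takes a `NavigableMap` directly, where the patch copies the `SortedMap` into a `TreeMap` to get `floorEntry`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical model of the widened bookie check; not the ReplicationWorker source. */
public class OpenSegmentCheckSketch {

    /** Bookies to check: the last ensemble, plus the penultimate one when the last ensemble looks empty. */
    static List<String> bookiesToCheck(NavigableMap<Long, List<String>> ensembles, long lastAddConfirmed) {
        List<String> bookies = new ArrayList<>(ensembles.lastEntry().getValue());
        if (ensembles.size() > 1 && lastAddConfirmed < ensembles.lastKey() - 1) {
            // floorEntry(lastKey - 1) is the ensemble just before the last one
            bookies.addAll(ensembles.floorEntry(ensembles.lastKey() - 1).getValue());
        }
        return bookies;
    }

    /** Missing bookies only matter for open ledgers. */
    static boolean isLastSegmentOpenAndMissingBookies(boolean ledgerClosed,
            NavigableMap<Long, List<String>> ensembles, long lastAddConfirmed,
            Collection<String> available) {
        if (ledgerClosed) {
            return false;
        }
        for (String bookie : bookiesToCheck(ensembles, lastAddConfirmed)) {
            if (!available.contains(bookie)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        NavigableMap<Long, List<String>> ensembles = new TreeMap<>();
        ensembles.put(0L, List.of("bk1", "bk2", "bk3"));
        ensembles.put(15L, List.of("bk1", "bk2", "bk4"));
        Set<String> available = Set.of("bk1", "bk2", "bk4"); // bk3 is being decommissioned

        System.out.println(bookiesToCheck(ensembles, 13)); // [bk1, bk2, bk4, bk1, bk2, bk3]
        System.out.println(isLastSegmentOpenAndMissingBookies(false, ensembles, 13, available)); // true
    }
}
```

With a LAC of 14 only the last ensemble would be checked, and since bk1, bk2 and bk4 are all available the check would return false, which is the pre-fix behavior for this ledger.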
