This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.17 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit b6d8b0f809f4b6f7f716f122aba47d78215bd4c5 Author: fengyubiao <[email protected]> AuthorDate: Wed Oct 29 18:26:59 2025 +0800 [fix] Failed read entries after multiple decommissioning (#4613) * - * checkstyle * let all ledger handle enable watcher * let all ledger handle enable watcher * fix tests * fix tests * fix tests * add test logs for debug * add test logs for debug * add test logs for debug * - * add a new param keepUpdateMetadata when open a read-only ledger handle * address comments * address comment * address comment * test CI * test CI * test CI * test CI * test CI * test CI * test CI * remove logs for CI * test CI * remove logs for CI * address comment * fix test (cherry picked from commit bae9e496cce274398774a5cb52357ab34b07928b) --- .../org/apache/bookkeeper/client/BookKeeper.java | 67 ++++++- .../org/apache/bookkeeper/client/LedgerOpenOp.java | 42 +++- .../bookkeeper/client/ReadOnlyLedgerHandle.java | 10 +- .../replication/BookieAutoRecoveryTest.java | 28 ++- .../FullEnsembleDecommissionedTest.java | 218 +++++++++++++++++++++ 5 files changed, 352 insertions(+), 13 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index 63a2aaee90..d3bc0f8f75 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -1212,14 +1212,52 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper { */ public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd, final OpenCallback cb, final Object ctx) { + asyncOpenLedger(lId, digestType, passwd, cb, ctx, false); + } + + /** + * Open existing ledger asynchronously for reading. + * + * <p>Opening a ledger with this method invokes fencing and recovery on the ledger + * if the ledger has not been closed. 
Fencing will block all other clients from + * writing to the ledger. Recovery will make sure that the ledger is closed + * before reading from it. + * + * <p>Recovery also makes sure that any entries which reached one bookie, but not a + * quorum, will be replicated to a quorum of bookies. This occurs in cases where + * the writer of a ledger crashes after sending a write request to one bookie but + * before being able to send it to the rest of the bookies in the quorum. + * + * <p>If the ledger is already closed, neither fencing nor recovery will be applied. + * + * @see LedgerHandle#asyncClose + * + * @param lId + * ledger identifier + * @param digestType + * digest type, either MAC or CRC32 + * @param passwd + * password + * @param ctx + * optional control object + * @param keepUpdateMetadata + * Whether to keep updating the ledger metadata when the auto-recovery component modifies the ledger's ensemble. + */ + public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd, + final OpenCallback cb, final Object ctx, boolean keepUpdateMetadata) { closeLock.readLock().lock(); try { if (closed) { cb.openComplete(BKException.Code.ClientClosedException, null, ctx); return; } - new LedgerOpenOp(BookKeeper.this, clientStats, - lId, digestType, passwd, cb, ctx).initiate(); + LedgerOpenOp ledgerOpenOp = new LedgerOpenOp(BookKeeper.this, clientStats, + lId, digestType, passwd, cb, ctx); + if (keepUpdateMetadata) { + ledgerOpenOp.initiateWithKeepUpdateMetadata(); + } else { + ledgerOpenOp.initiate(); + } } finally { closeLock.readLock().unlock(); } @@ -1287,13 +1325,36 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper { */ public LedgerHandle openLedger(long lId, DigestType digestType, byte[] passwd) throws BKException, InterruptedException { + return openLedger(lId, digestType, passwd, false); + } + + + /** + * Synchronous open ledger call.
+ * + * @see #asyncOpenLedger + * @param lId + * ledger identifier + * @param digestType + * digest type, either MAC or CRC32 + * @param passwd + * password + * + * @param keepUpdateMetadata + * Whether to keep updating the ledger metadata when the auto-recovery component modifies the ledger's ensemble. + * @return a handle to the open ledger + * @throws InterruptedException + * @throws BKException + */ + public LedgerHandle openLedger(long lId, DigestType digestType, byte[] passwd, boolean keepUpdateMetadata) + throws BKException, InterruptedException { CompletableFuture<LedgerHandle> future = new CompletableFuture<>(); SyncOpenCallback result = new SyncOpenCallback(future); /* * Calls async open ledger */ - asyncOpenLedger(lId, digestType, passwd, result, null); + asyncOpenLedger(lId, digestType, passwd, result, null, keepUpdateMetadata); return SyncCallbackUtils.waitForResult(future); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java index 7278af7d42..ac16266859 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java @@ -57,6 +57,18 @@ class LedgerOpenOp { ReadOnlyLedgerHandle lh; final byte[] passwd; boolean doRecovery = true; + // The ledger metadata may be modified even if it has been closed, because the auto-recovery component may rewrite + // the ledger's metadata. Keep receiving notifications from ZK to avoid the following issue: an opened ledger + // handle in memory still accesses a BK instance that has been decommissioned. The issue being solved happens as + // follows: + // 1. A client service opens a read-only ledger handle for a ledger that has already been closed. + // 2. All BKs that relate to the ledger have been decommissioned. + // 3. The auto-recovery component moves the data to other BK instances that are alive. + // 4.
The ledger handle in the client memory keeps connecting to the BKs in the original ensemble set, and the + // connection will always fail. + // To keep the change minimal, a new option named "keepUpdateMetadata" is added; users can use the + // new API to create a read-only ledger handle that automatically updates its metadata. + boolean keepUpdateMetadata = false; boolean administrativeOpen = false; long startTime; final OpStatsLogger openOpLogger; @@ -126,6 +138,15 @@ class LedgerOpenOp { initiate(); } + /** + * Unlike {@link #initiate()}, this method keeps updating the metadata whenever the auto-recovery component modifies + * the ensemble. + */ + public void initiateWithKeepUpdateMetadata() { + this.keepUpdateMetadata = true; + initiate(); + } + private CompletableFuture<Void> closeLedgerHandleAsync() { if (lh != null) { return lh.closeAsync(); @@ -174,9 +195,25 @@ class LedgerOpenOp { } // get the ledger metadata back + // The cases that need to register the listener immediately are: + // 1. The ledger is not being opened with recovery, which is the original case. + // 2. The ledger is closed and its metadata needs to be kept up to date. In other cases the listener is not + // registered immediately, e.g. when the ledger is opened by the Auto-Recovery component. + final boolean watchImmediately = !doRecovery || (keepUpdateMetadata && metadata.isClosed()); try { + // The ledger metadata may be modified even if it has been closed, because the auto-recovery component may + // rewrite the ledger's metadata. Keep receiving notifications from ZK to avoid the following issue: an + // opened ledger handle in memory still accesses a BK instance that has been decommissioned. The issue + // being solved happens as follows: + // 1. A client service opens a read-only ledger handle for a ledger that has already been closed. + // 2. All BKs that relate to the ledger have been decommissioned. + // 3. The auto-recovery component moves the data to other BK instances that are alive. + // 4.
The ledger handle in the client memory keeps connecting to the BKs in the original ensemble set, + // and the connection will always fail. + // Therefore, users who need the metadata to be updated automatically should set + // "keepUpdateMetadata" to "true". lh = new ReadOnlyLedgerHandle(bk.getClientCtx(), ledgerId, versionedMetadata, digestType, - passwd, !doRecovery); + passwd, watchImmediately); } catch (GeneralSecurityException e) { LOG.error("Security exception while opening ledger: " + ledgerId, e); openComplete(BKException.Code.DigestNotInitializedException, null); @@ -199,6 +236,9 @@ class LedgerOpenOp { public void safeOperationComplete(int rc, Void result) { if (rc == BKException.Code.OK) { openComplete(BKException.Code.OK, lh); + if (!watchImmediately && keepUpdateMetadata) { + lh.registerLedgerMetadataListener(); + } } else { closeLedgerHandleAsync().whenComplete((ignore, ex) -> { if (ex != null) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java index 9e883a8246..6d50cee40c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java @@ -95,14 +95,18 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListene ReadOnlyLedgerHandle(ClientContext clientCtx, long ledgerId, Versioned<LedgerMetadata> metadata, BookKeeper.DigestType digestType, byte[] password, - boolean watch) + boolean watchImmediately) throws GeneralSecurityException, NumberFormatException { super(clientCtx, ledgerId, metadata, digestType, password, WriteFlag.NONE); - if (watch) { - clientCtx.getLedgerManager().registerLedgerMetadataListener(ledgerId, this); + if (watchImmediately) { + registerLedgerMetadataListener(); } } + void registerLedgerMetadataListener() { +
clientCtx.getLedgerManager().registerLedgerMetadataListener(ledgerId, this); + } + @Override public void close() throws InterruptedException, BKException { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java index ccb262ed26..95e029271a 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java @@ -96,6 +96,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase { @Override public void setUp() throws Exception { + LOG.info("Start setUp"); super.setUp(); baseConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); baseClientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); @@ -117,10 +118,12 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase { mFactory = metadataClientDriver.getLedgerManagerFactory(); underReplicationManager = mFactory.newLedgerUnderreplicationManager(); ledgerManager = mFactory.newLedgerManager(); + LOG.info("Finished setUp"); } @Override public void tearDown() throws Exception { + LOG.info("Start tearDown"); super.tearDown(); if (null != underReplicationManager) { @@ -138,6 +141,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase { if (null != scheduler) { scheduler.shutdown(); } + LOG.info("Finished tearDown"); } /** @@ -146,6 +150,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase { */ @Test public void testOpenLedgers() throws Exception { + LOG.info("Start testOpenLedgers"); List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(1, 5); LedgerHandle lh = listOfLedgerHandle.get(0); int ledgerReplicaIndex = 0; @@ -186,6 +191,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase { 
verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, listOfLedgerHandle.get(0), ledgerReplicaIndex); + LOG.info("Finished testOpenLedgers"); } /** @@ -194,6 +200,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase { */ @Test public void testClosedLedgers() throws Exception { + LOG.info("Start testClosedLedgers"); List<Integer> listOfReplicaIndex = new ArrayList<Integer>(); List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(1, 5); closeLedgers(listOfLedgerHandle); @@ -247,6 +254,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase { listOfLedgerHandle.get(index), listOfReplicaIndex.get(index)); } + LOG.info("Finished testClosedLedgers"); } /** @@ -256,6 +264,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase { */ @Test public void testStopWhileReplicationInProgress() throws Exception { + LOG.info("Start testStopWhileReplicationInProgress"); int numberOfLedgers = 2; List<Integer> listOfReplicaIndex = new ArrayList<Integer>(); List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries( @@ -327,6 +336,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase { listOfLedgerHandle.get(index), listOfReplicaIndex.get(index)); } + LOG.info("Finished testStopWhileReplicationInProgress"); } /** @@ -336,6 +346,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase { */ @Test public void testNoSuchLedgerExists() throws Exception { + LOG.info("Start testNoSuchLedgerExists"); List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(2, 5); CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size()); for (LedgerHandle lh : listOfLedgerHandle) { @@ -372,6 +383,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase { assertNull("UrLedger still exists after rereplication", watchUrLedgerNode(getUrLedgerZNode(lh), latch)); } + LOG.info("Finished testNoSuchLedgerExists"); } /** @@ -380,6 +392,7 @@ public class 
BookieAutoRecoveryTest extends BookKeeperClusterTestCase { */ @Test public void testEmptyLedgerLosesQuorumEventually() throws Exception { + LOG.info("Start testEmptyLedgerLosesQuorumEventually"); LedgerHandle lh = bkc.createLedger(3, 2, 2, DigestType.CRC32, PASSWD); CountDownLatch latch = new CountDownLatch(1); String urZNode = getUrLedgerZNode(lh); @@ -420,6 +433,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase { // should be able to open ledger without issue bkc.openLedger(lh.getId(), DigestType.CRC32, PASSWD); + LOG.info("Finished testEmptyLedgerLosesQuorumEventually"); } /** @@ -429,6 +443,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase { @Test public void testLedgerMetadataContainsIpAddressAsBookieID() throws Exception { + LOG.info("Start testLedgerMetadataContainsIpAddressAsBookieID"); stopBKCluster(); bkc = new BookKeeperTestClient(baseClientConf); // start bookie with useHostNameAsBookieID=false, as old bookie @@ -494,7 +509,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase { verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, listOfLedgerHandle.get(0), ledgerReplicaIndex); - + LOG.info("Finished testLedgerMetadataContainsIpAddressAsBookieID"); } /** @@ -504,6 +519,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase { @Test public void testLedgerMetadataContainsHostNameAsBookieID() throws Exception { + LOG.info("Start testLedgerMetadataContainsHostNameAsBookieID"); stopBKCluster(); bkc = new BookKeeperTestClient(baseClientConf); @@ -572,7 +588,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase { verifyLedgerEnsembleMetadataAfterReplication(newBookieServer, listOfLedgerHandle.get(0), ledgerReplicaIndex); - + LOG.info("Finished testLedgerMetadataContainsHostNameAsBookieID"); } private int getReplicaIndexInLedger(LedgerHandle lh, BookieId replicaToKill) { @@ -634,13 +650,13 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase { @Override public void process(WatchedEvent event) { if (event.getType() == EventType.NodeDeleted) { - LOG.info("Received Ledger rereplication completion event :" - + event.getType()); + LOG.info("Received Ledger replication completion. event : {}, path: {}, latchCount: {}", + event.getType(), event.getPath(), latch.getCount()); latch.countDown(); } if (event.getType() == EventType.NodeCreated) { - LOG.info("Received urLedger publishing event :" - + event.getType()); + LOG.info("Received urLedger publishing event: {}, path: {}, latchCount: {}", + event.getType(), event.getPath(), latch.getCount()); latch.countDown(); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java new file mode 100644 index 0000000000..0b5c5a8e1c --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.bookkeeper.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.meta.LedgerManagerFactory; +import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; +import org.apache.bookkeeper.meta.MetadataClientDriver; +import org.apache.bookkeeper.meta.MetadataDrivers; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.BookieServer; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.awaitility.Awaitility; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration tests that verify the complete decommissioning workflow.
+ */ +public class FullEnsembleDecommissionedTest extends BookKeeperClusterTestCase { + private static final Logger LOG = LoggerFactory + .getLogger(FullEnsembleDecommissionedTest.class); + private static final byte[] PASSWD = "admin".getBytes(); + private static final byte[] data = "TESTDATA".getBytes(); + private static final String openLedgerRereplicationGracePeriod = "3000"; // milliseconds + + private DigestType digestType; + private MetadataClientDriver metadataClientDriver; + private LedgerManagerFactory mFactory; + private LedgerUnderreplicationManager underReplicationManager; + private LedgerManager ledgerManager; + private OrderedScheduler scheduler; + + public FullEnsembleDecommissionedTest() throws Exception{ + super(2); + + baseConf.setLedgerManagerFactoryClassName( + "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory"); + baseConf.setOpenLedgerRereplicationGracePeriod(openLedgerRereplicationGracePeriod); + baseConf.setRwRereplicateBackoffMs(500); + baseClientConf.setLedgerManagerFactoryClassName( + "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory"); + this.digestType = DigestType.MAC; + setAutoRecoveryEnabled(true); + } + + @Override + public void setUp() throws Exception { + super.setUp(); + baseConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + baseClientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + + scheduler = OrderedScheduler.newSchedulerBuilder() + .name("test-scheduler") + .numThreads(1) + .build(); + + metadataClientDriver = MetadataDrivers.getClientDriver( + URI.create(baseClientConf.getMetadataServiceUri())); + metadataClientDriver.initialize( + baseClientConf, + scheduler, + NullStatsLogger.INSTANCE, + Optional.empty()); + + // initialize urReplicationManager + mFactory = metadataClientDriver.getLedgerManagerFactory(); + underReplicationManager = mFactory.newLedgerUnderreplicationManager(); + ledgerManager = mFactory.newLedgerManager(); + } + + @Override + public void tearDown() throws 
Exception { + super.tearDown(); + + if (null != underReplicationManager) { + underReplicationManager.close(); + underReplicationManager = null; + } + if (null != ledgerManager) { + ledgerManager.close(); + ledgerManager = null; + } + if (null != metadataClientDriver) { + metadataClientDriver.close(); + metadataClientDriver = null; + } + if (null != scheduler) { + scheduler.shutdown(); + } + } + + /** + * The purpose of this test: + * 1. A client service opens a read-only ledger handle for a ledger that has already been closed. + * 2. All BKs that relate to the ledger have been decommissioned. + * 3. The auto-recovery component moves the data to other BK instances that are alive. + * 4. Verify: the ledger handle in the client memory keeps updating the ledger ensemble set, and new read + * requests work. + */ + @Test + public void testOpenedLedgerHandleStillWorkAfterDecommissioning() throws Exception { + LedgerHandle lh = bkc.createLedger(2, 2, digestType, PASSWD); + assertTrue(lh.getLedgerMetadata().getAllEnsembles().get(0L).size() == 2); + lh.addEntry(data); + lh.close(); + List<BookieId> originalEnsemble = lh.getLedgerMetadata().getAllEnsembles().get(0L); + LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, PASSWD, true); + assertTrue(originalEnsemble.size() == 2); + + startNewBookie(); + BookieServer newBookieServer3 = serverByIndex(lastBookieIndex()); + killBookie(originalEnsemble.get(0)); + waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(0), newBookieServer3.getBookieId()); + + startNewBookie(); + int newBookieIndex4 = lastBookieIndex(); + BookieServer newBookieServer4 = serverByIndex(newBookieIndex4); + killBookie(originalEnsemble.get(1)); + waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(1), newBookieServer4.getBookieId()); + + Awaitility.await().untilAsserted(() -> { + LedgerEntries ledgerEntries = readonlyLh.read(0, 0); + assertNotNull(ledgerEntries); + byte[] entryBytes = ledgerEntries.getEntry(0L).getEntryBytes(); + assertEquals(new
String(data), new String(entryBytes)); + ledgerEntries.close(); + }); + readonlyLh.close(); + } + + /** + * The purpose of this test: + * 1. A client service opens a read-only ledger handle with recovery for a ledger that has not been closed yet. + * 2. All BKs that relate to the ledger have been decommissioned. + * 3. The auto-recovery component moves the data to other BK instances that are alive. + * 4. Verify: the ledger handle in the client memory keeps updating the ledger ensemble set, and new read + * requests work. + */ + @Test + public void testRecoverOpenLedgerHandleStillWorkAfterDecommissioning() throws Exception { + LedgerHandle lh = bkc.createLedger(2, 2, digestType, PASSWD); + assertTrue(lh.getLedgerMetadata().getAllEnsembles().get(0L).size() == 2); + lh.addEntry(data); + List<BookieId> originalEnsemble = lh.getLedgerMetadata().getAllEnsembles().get(0L); + LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, PASSWD, true); + assertTrue(originalEnsemble.size() == 2); + + startNewBookie(); + BookieServer newBookieServer3 = serverByIndex(lastBookieIndex()); + killBookie(originalEnsemble.get(0)); + waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(0), newBookieServer3.getBookieId()); + + startNewBookie(); + int newBookieIndex4 = lastBookieIndex(); + BookieServer newBookieServer4 = serverByIndex(newBookieIndex4); + killBookie(originalEnsemble.get(1)); + waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(1), newBookieServer4.getBookieId()); + + Awaitility.await().untilAsserted(() -> { + LedgerEntries ledgerEntries = readonlyLh.read(0, 0); + assertNotNull(ledgerEntries); + byte[] entryBytes = ledgerEntries.getEntry(0L).getEntryBytes(); + assertEquals(new String(data), new String(entryBytes)); + ledgerEntries.close(); + }); + readonlyLh.close(); + } + + private void waitAutoRecoveryFinished(long lId, BookieId originalBookie, + BookieId newBookie) throws Exception { + Awaitility.await().untilAsserted(() -> { + LedgerHandle openLedger =
bkc.openLedger(lId, digestType, PASSWD); + NavigableMap<Long, ? extends List<BookieId>> map = openLedger.getLedgerMetadata().getAllEnsembles(); + try { + for (Map.Entry<Long, ? extends List<BookieId>> entry : map.entrySet()) { + assertFalse(entry.getValue().contains(originalBookie)); + assertTrue(entry.getValue().contains(newBookie)); + } + } finally { + openLedger.close(); + } + }); + } +}
