This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new bae9e496cc [fix] Failed read entries after multiple decommissioning (#4613)
bae9e496cc is described below
commit bae9e496cce274398774a5cb52357ab34b07928b
Author: fengyubiao <[email protected]>
AuthorDate: Wed Oct 29 18:26:59 2025 +0800
[fix] Failed read entries after multiple decommissioning (#4613)
* -
* checkstyle
* let all ledger handle enable watcher
* let all ledger handle enable watcher
* fix tests
* fix tests
* fix tests
* add test logs for debug
* add test logs for debug
* add test logs for debug
* -
* add a new param keepUpdateMetadata when open a read-only ledger handle
* address comments
* address comment
* address comment
* test CI
* test CI
* test CI
* test CI
* test CI
* test CI
* test CI
* remove logs for CI
* test CI
* remove logs for CI
* address comment
* fix test
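
For illustration, a minimal usage sketch of the new flag from client code (a sketch only: the helper name readAfterDecommission, the "bkc" client instance, ledgerId and passwd are placeholders; the four-argument openLedger overload is the one added in BookKeeper.java below):

    // Assumed imports: org.apache.bookkeeper.client.BookKeeper, LedgerHandle,
    // BookKeeper.DigestType and org.apache.bookkeeper.client.api.LedgerEntries.
    static void readAfterDecommission(BookKeeper bkc, long ledgerId, byte[] passwd) throws Exception {
        // Open a closed ledger read-only; keepUpdateMetadata = true makes the handle
        // keep following metadata changes made by the auto-recovery component.
        LedgerHandle readOnlyLh = bkc.openLedger(ledgerId, DigestType.CRC32, passwd,
                true /* keepUpdateMetadata */);
        try {
            // Reads keep working even after the original ensemble has been fully
            // decommissioned, because the handle picks up the new ensemble.
            LedgerEntries entries = readOnlyLh.read(0, readOnlyLh.getLastAddConfirmed());
            entries.close();
        } finally {
            readOnlyLh.close();
        }
    }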
---
.../org/apache/bookkeeper/client/BookKeeper.java | 67 ++++++-
.../org/apache/bookkeeper/client/LedgerOpenOp.java | 42 +++-
.../bookkeeper/client/ReadOnlyLedgerHandle.java | 10 +-
.../replication/BookieAutoRecoveryTest.java | 28 ++-
.../FullEnsembleDecommissionedTest.java | 218 +++++++++++++++++++++
5 files changed, 352 insertions(+), 13 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index c78f8fbbca..0c7dbf7836 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -1218,14 +1218,52 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
*/
public void asyncOpenLedger(final long lId, final DigestType digestType,
final byte[] passwd,
final OpenCallback cb, final Object ctx) {
+ asyncOpenLedger(lId, digestType, passwd, cb, ctx, false);
+ }
+
+ /**
+ * Open existing ledger asynchronously for reading.
+ *
+ * <p>Opening a ledger with this method invokes fencing and recovery on
the ledger
+ * if the ledger has not been closed. Fencing will block all other clients
from
+ * writing to the ledger. Recovery will make sure that the ledger is closed
+ * before reading from it.
+ *
+ * <p>Recovery also makes sure that any entries which reached one bookie,
but not a
+ * quorum, will be replicated to a quorum of bookies. This occurs in cases where
+ * the writer of a ledger crashes after sending a write request to one
bookie but
+ * before being able to send it to the rest of the bookies in the quorum.
+ *
+ * <p>If the ledger is already closed, neither fencing nor recovery will
be applied.
+ *
+ * @see LedgerHandle#asyncClose
+ *
+ * @param lId
+ * ledger identifier
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ * @param ctx
+ * optional control object
+ * @param keepUpdateMetadata
+ * whether to keep updating the ledger metadata if the auto-recovery component modifies the ledger's ensemble.
+ */
+ public void asyncOpenLedger(final long lId, final DigestType digestType,
final byte[] passwd,
+ final OpenCallback cb, final Object ctx,
boolean keepUpdateMetadata) {
closeLock.readLock().lock();
try {
if (closed) {
cb.openComplete(BKException.Code.ClientClosedException, null,
ctx);
return;
}
- new LedgerOpenOp(BookKeeper.this, clientStats,
- lId, digestType, passwd, cb, ctx).initiate();
+ LedgerOpenOp ledgerOpenOp = new LedgerOpenOp(BookKeeper.this,
clientStats,
+ lId, digestType, passwd, cb, ctx);
+ if (keepUpdateMetadata) {
+ ledgerOpenOp.initiateWithKeepUpdateMetadata();
+ } else {
+ ledgerOpenOp.initiate();
+ }
} finally {
closeLock.readLock().unlock();
}
@@ -1293,13 +1331,36 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
*/
public LedgerHandle openLedger(long lId, DigestType digestType, byte[]
passwd)
throws BKException, InterruptedException {
+ return openLedger(lId, digestType, passwd, false);
+ }
+
+
+ /**
+ * Synchronous open ledger call.
+ *
+ * @see #asyncOpenLedger
+ * @param lId
+ * ledger identifier
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ *
+ * @param keepUpdateMetadata
+ * whether to keep updating the ledger metadata if the auto-recovery component modifies the ledger's ensemble.
+ * @return a handle to the open ledger
+ * @throws InterruptedException
+ * @throws BKException
+ */
+ public LedgerHandle openLedger(long lId, DigestType digestType, byte[]
passwd, boolean keepUpdateMetadata)
+ throws BKException, InterruptedException {
CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
SyncOpenCallback result = new SyncOpenCallback(future);
/*
* Calls async open ledger
*/
- asyncOpenLedger(lId, digestType, passwd, result, null);
+ asyncOpenLedger(lId, digestType, passwd, result, null,
keepUpdateMetadata);
return SyncCallbackUtils.waitForResult(future);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
index 943aa8cd2a..52c8190e1f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
@@ -57,6 +57,18 @@ class LedgerOpenOp {
ReadOnlyLedgerHandle lh;
final byte[] passwd;
boolean doRecovery = true;
+ // The ledger metadata may be modified even after the ledger has been closed, because the auto-recovery
+ // component may rewrite it. Keep receiving notifications from ZK to avoid the following issue: an open
+ // ledger handle in memory keeps accessing a bookie that has been decommissioned. The issue this solves
+ // happens as follows:
+ // 1. A client service opens a read-only handle on a ledger that has already been closed.
+ // 2. All bookies in the ledger's ensemble are decommissioned.
+ // 3. The auto-recovery component moves the data to other, live bookies.
+ // 4. The ledger handle in client memory keeps connecting to the bookies of the original ensemble, and
+ //    those connections always fail.
+ // To keep the change minimal, a new parameter named "keepUpdateMetadata" is added; users can use the
+ // new API to create a read-only ledger handle that automatically keeps its metadata up to date.
+ boolean keepUpdateMetadata = false;
boolean administrativeOpen = false;
long startTime;
final OpStatsLogger openOpLogger;
@@ -126,6 +138,15 @@ class LedgerOpenOp {
initiate();
}
+ /**
+ * Different from {@link #initiate()}, this method keeps the metadata updated once the auto-recovery
+ * component has modified the ensemble.
+ */
+ public void initiateWithKeepUpdateMetadata() {
+ this.keepUpdateMetadata = true;
+ initiate();
+ }
+
private CompletableFuture<Void> closeLedgerHandleAsync() {
if (lh != null) {
return lh.closeAsync();
@@ -174,9 +195,25 @@ class LedgerOpenOp {
}
// get the ledger metadata back
+ // The cases that need to register the listener immediately are:
+ // 1. The ledger is not being opened with recovery, which is the original behaviour.
+ // 2. The ledger is closed and its metadata should be kept up to date. Other cases do not need to
+ //    register the listener, e.g. the ledger is being opened by the auto-recovery component.
+ final boolean watchImmediately = !doRecovery || (keepUpdateMetadata &&
metadata.isClosed());
try {
+ // As noted on the keepUpdateMetadata field: the ledger metadata may be modified even after the ledger
+ // has been closed, because the auto-recovery component may rewrite it. Keep receiving ZK notifications
+ // so that an open ledger handle does not keep accessing bookies that have been decommissioned after
+ // the whole original ensemble was replaced. Therefore, a user who needs the metadata to be updated
+ // automatically sets "keepUpdateMetadata" to "true".
lh = new ReadOnlyLedgerHandle(bk.getClientCtx(), ledgerId,
versionedMetadata, digestType,
- passwd, !doRecovery);
+ passwd, watchImmediately);
} catch (GeneralSecurityException e) {
LOG.error("Security exception while opening ledger: " + ledgerId,
e);
openComplete(BKException.Code.DigestNotInitializedException, null);
@@ -199,6 +236,9 @@ class LedgerOpenOp {
public void safeOperationComplete(int rc, Void result) {
if (rc == BKException.Code.OK) {
openComplete(BKException.Code.OK, lh);
+ if (!watchImmediately && keepUpdateMetadata) {
+ lh.registerLedgerMetadataListener();
+ }
} else {
closeLedgerHandleAsync().whenComplete((ignore, ex) -> {
if (ex != null) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index 95e8666660..ffad02d8f7 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -95,14 +95,18 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements
LedgerMetadataListene
ReadOnlyLedgerHandle(ClientContext clientCtx,
long ledgerId, Versioned<LedgerMetadata> metadata,
BookKeeper.DigestType digestType, byte[] password,
- boolean watch)
+ boolean watchImmediately)
throws GeneralSecurityException, NumberFormatException {
super(clientCtx, ledgerId, metadata, digestType, password,
WriteFlag.NONE);
- if (watch) {
-
clientCtx.getLedgerManager().registerLedgerMetadataListener(ledgerId, this);
+ if (watchImmediately) {
+ registerLedgerMetadataListener();
}
}
+ void registerLedgerMetadataListener() {
+ clientCtx.getLedgerManager().registerLedgerMetadataListener(ledgerId,
this);
+ }
+
@Override
public void close()
throws InterruptedException, BKException {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
index 20b5e6a0c6..d0ea91261a 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
@@ -96,6 +96,7 @@ public class BookieAutoRecoveryTest extends
BookKeeperClusterTestCase {
@Override
public void setUp() throws Exception {
+ LOG.info("Start setUp");
super.setUp();
baseConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
baseClientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
@@ -117,10 +118,12 @@ public class BookieAutoRecoveryTest extends
BookKeeperClusterTestCase {
mFactory = metadataClientDriver.getLedgerManagerFactory();
underReplicationManager = mFactory.newLedgerUnderreplicationManager();
ledgerManager = mFactory.newLedgerManager();
+ LOG.info("Finished setUp");
}
@Override
public void tearDown() throws Exception {
+ LOG.info("Start tearDown");
super.tearDown();
if (null != underReplicationManager) {
@@ -138,6 +141,7 @@ public class BookieAutoRecoveryTest extends
BookKeeperClusterTestCase {
if (null != scheduler) {
scheduler.shutdown();
}
+ LOG.info("Finished tearDown");
}
/**
@@ -146,6 +150,7 @@ public class BookieAutoRecoveryTest extends
BookKeeperClusterTestCase {
*/
@Test
public void testOpenLedgers() throws Exception {
+ LOG.info("Start testOpenLedgers");
List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(1,
5);
LedgerHandle lh = listOfLedgerHandle.get(0);
int ledgerReplicaIndex = 0;
@@ -186,6 +191,7 @@ public class BookieAutoRecoveryTest extends
BookKeeperClusterTestCase {
verifyLedgerEnsembleMetadataAfterReplication(newBookieServer,
listOfLedgerHandle.get(0), ledgerReplicaIndex);
+ LOG.info("Finished testOpenLedgers");
}
/**
@@ -194,6 +200,7 @@ public class BookieAutoRecoveryTest extends
BookKeeperClusterTestCase {
*/
@Test
public void testClosedLedgers() throws Exception {
+ LOG.info("Start testClosedLedgers");
List<Integer> listOfReplicaIndex = new ArrayList<Integer>();
List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(1,
5);
closeLedgers(listOfLedgerHandle);
@@ -247,6 +254,7 @@ public class BookieAutoRecoveryTest extends
BookKeeperClusterTestCase {
listOfLedgerHandle.get(index),
listOfReplicaIndex.get(index));
}
+ LOG.info("Finished testClosedLedgers");
}
/**
@@ -256,6 +264,7 @@ public class BookieAutoRecoveryTest extends
BookKeeperClusterTestCase {
*/
@Test
public void testStopWhileReplicationInProgress() throws Exception {
+ LOG.info("Start testStopWhileReplicationInProgress");
int numberOfLedgers = 2;
List<Integer> listOfReplicaIndex = new ArrayList<Integer>();
List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(
@@ -327,6 +336,7 @@ public class BookieAutoRecoveryTest extends
BookKeeperClusterTestCase {
listOfLedgerHandle.get(index),
listOfReplicaIndex.get(index));
}
+ LOG.info("Finished testStopWhileReplicationInProgress");
}
/**
@@ -336,6 +346,7 @@ public class BookieAutoRecoveryTest extends
BookKeeperClusterTestCase {
*/
@Test
public void testNoSuchLedgerExists() throws Exception {
+ LOG.info("Start testNoSuchLedgerExists");
List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(2,
5);
CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size());
for (LedgerHandle lh : listOfLedgerHandle) {
@@ -372,6 +383,7 @@ public class BookieAutoRecoveryTest extends
BookKeeperClusterTestCase {
assertNull("UrLedger still exists after rereplication",
watchUrLedgerNode(getUrLedgerZNode(lh), latch));
}
+ LOG.info("Finished testNoSuchLedgerExists");
}
/**
@@ -380,6 +392,7 @@ public class BookieAutoRecoveryTest extends
BookKeeperClusterTestCase {
*/
@Test
public void testEmptyLedgerLosesQuorumEventually() throws Exception {
+ LOG.info("Start testEmptyLedgerLosesQuorumEventually");
LedgerHandle lh = bkc.createLedger(3, 2, 2, DigestType.CRC32, PASSWD);
CountDownLatch latch = new CountDownLatch(1);
String urZNode = getUrLedgerZNode(lh);
@@ -420,6 +433,7 @@ public class BookieAutoRecoveryTest extends
BookKeeperClusterTestCase {
// should be able to open ledger without issue
bkc.openLedger(lh.getId(), DigestType.CRC32, PASSWD);
+ LOG.info("Finished testEmptyLedgerLosesQuorumEventually");
}
/**
@@ -429,6 +443,7 @@ public class BookieAutoRecoveryTest extends
BookKeeperClusterTestCase {
@Test
public void testLedgerMetadataContainsIpAddressAsBookieID()
throws Exception {
+ LOG.info("Start testLedgerMetadataContainsIpAddressAsBookieID");
stopBKCluster();
bkc = new BookKeeperTestClient(baseClientConf);
// start bookie with useHostNameAsBookieID=false, as old bookie
@@ -494,7 +509,7 @@ public class BookieAutoRecoveryTest extends
BookKeeperClusterTestCase {
verifyLedgerEnsembleMetadataAfterReplication(newBookieServer,
listOfLedgerHandle.get(0), ledgerReplicaIndex);
-
+ LOG.info("Finished testLedgerMetadataContainsIpAddressAsBookieID");
}
/**
@@ -504,6 +519,7 @@ public class BookieAutoRecoveryTest extends
BookKeeperClusterTestCase {
@Test
public void testLedgerMetadataContainsHostNameAsBookieID()
throws Exception {
+ LOG.info("Start testLedgerMetadataContainsHostNameAsBookieID");
stopBKCluster();
bkc = new BookKeeperTestClient(baseClientConf);
@@ -572,7 +588,7 @@ public class BookieAutoRecoveryTest extends
BookKeeperClusterTestCase {
verifyLedgerEnsembleMetadataAfterReplication(newBookieServer,
listOfLedgerHandle.get(0), ledgerReplicaIndex);
-
+ LOG.info("Finished testLedgerMetadataContainsHostNameAsBookieID");
}
private int getReplicaIndexInLedger(LedgerHandle lh, BookieId
replicaToKill) {
@@ -634,13 +650,13 @@ public class BookieAutoRecoveryTest extends
BookKeeperClusterTestCase {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeDeleted) {
- LOG.info("Received Ledger rereplication completion event :"
- + event.getType());
+ LOG.info("Received Ledger replication completion. event :
{}, path: {}, latchCount: {}",
+ event.getType(), event.getPath(),
latch.getCount());
latch.countDown();
}
if (event.getType() == EventType.NodeCreated) {
- LOG.info("Received urLedger publishing event :"
- + event.getType());
+ LOG.info("Received urLedger publishing event: {}, path:
{}, latchCount: {}",
+ event.getType(), event.getPath(),
latch.getCount());
latch.countDown();
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java
new file mode 100644
index 0000000000..0b5c5a8e1c
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration tests that verify reads keep working after the whole original ensemble has been decommissioned.
+ */
+public class FullEnsembleDecommissionedTest extends BookKeeperClusterTestCase {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(FullEnsembleDecommissionedTest.class);
+ private static final byte[] PASSWD = "admin".getBytes();
+ private static final byte[] data = "TESTDATA".getBytes();
+ private static final String openLedgerRereplicationGracePeriod = "3000";
// milliseconds
+
+ private DigestType digestType;
+ private MetadataClientDriver metadataClientDriver;
+ private LedgerManagerFactory mFactory;
+ private LedgerUnderreplicationManager underReplicationManager;
+ private LedgerManager ledgerManager;
+ private OrderedScheduler scheduler;
+
+ public FullEnsembleDecommissionedTest() throws Exception {
+ super(2);
+
+ baseConf.setLedgerManagerFactoryClassName(
+ "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
+
baseConf.setOpenLedgerRereplicationGracePeriod(openLedgerRereplicationGracePeriod);
+ baseConf.setRwRereplicateBackoffMs(500);
+ baseClientConf.setLedgerManagerFactoryClassName(
+ "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
+ this.digestType = DigestType.MAC;
+ setAutoRecoveryEnabled(true);
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ baseConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ baseClientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+
+ scheduler = OrderedScheduler.newSchedulerBuilder()
+ .name("test-scheduler")
+ .numThreads(1)
+ .build();
+
+ metadataClientDriver = MetadataDrivers.getClientDriver(
+ URI.create(baseClientConf.getMetadataServiceUri()));
+ metadataClientDriver.initialize(
+ baseClientConf,
+ scheduler,
+ NullStatsLogger.INSTANCE,
+ Optional.empty());
+
+ // initialize urReplicationManager
+ mFactory = metadataClientDriver.getLedgerManagerFactory();
+ underReplicationManager = mFactory.newLedgerUnderreplicationManager();
+ ledgerManager = mFactory.newLedgerManager();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+
+ if (null != underReplicationManager) {
+ underReplicationManager.close();
+ underReplicationManager = null;
+ }
+ if (null != ledgerManager) {
+ ledgerManager.close();
+ ledgerManager = null;
+ }
+ if (null != metadataClientDriver) {
+ metadataClientDriver.close();
+ metadataClientDriver = null;
+ }
+ if (null != scheduler) {
+ scheduler.shutdown();
+ }
+ }
+
+ /**
+ * The purpose of this test:
+ * 1. A client service opens a read-only handle on a ledger that has already been closed.
+ * 2. All bookies in the ledger's ensemble are decommissioned.
+ * 3. The auto-recovery component moves the data to other, live bookies.
+ * 4. Verify: the ledger handle in client memory keeps updating its ensemble set, and new read
+ *    requests still succeed.
+ */
+ @Test
+ public void testOpenedLedgerHandleStillWorkAfterDecommissioning() throws
Exception {
+ LedgerHandle lh = bkc.createLedger(2, 2, digestType, PASSWD);
+ assertTrue(lh.getLedgerMetadata().getAllEnsembles().get(0L).size() ==
2);
+ lh.addEntry(data);
+ lh.close();
+ List<BookieId> originalEnsemble =
lh.getLedgerMetadata().getAllEnsembles().get(0L);
+ LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType,
PASSWD, true);
+ assertTrue(originalEnsemble.size() == 2);
+
+ startNewBookie();
+ BookieServer newBookieServer3 = serverByIndex(lastBookieIndex());
+ killBookie(originalEnsemble.get(0));
+ waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(0),
newBookieServer3.getBookieId());
+
+ startNewBookie();
+ int newBookieIndex4 = lastBookieIndex();
+ BookieServer newBookieServer4 = serverByIndex(newBookieIndex4);
+ killBookie(originalEnsemble.get(1));
+ waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(1),
newBookieServer4.getBookieId());
+
+ Awaitility.await().untilAsserted(() -> {
+ LedgerEntries ledgerEntries = readonlyLh.read(0, 0);
+ assertNotNull(ledgerEntries);
+ byte[] entryBytes = ledgerEntries.getEntry(0L).getEntryBytes();
+ assertEquals(new String(data), new String(entryBytes));
+ ledgerEntries.close();
+ });
+ readonlyLh.close();
+ }
+
+ /**
+ * The purpose of this test:
+ * 1. A client service opens a read-only handle, with recovery, on a ledger that has not been closed yet.
+ * 2. All bookies in the ledger's ensemble are decommissioned.
+ * 3. The auto-recovery component moves the data to other, live bookies.
+ * 4. Verify: the ledger handle in client memory keeps updating its ensemble set, and new read
+ *    requests still succeed.
+ */
+ @Test
+ public void testRecoverOpenLedgerHandleStillWorkAfterDecommissioning()
throws Exception {
+ LedgerHandle lh = bkc.createLedger(2, 2, digestType, PASSWD);
+ assertTrue(lh.getLedgerMetadata().getAllEnsembles().get(0L).size() ==
2);
+ lh.addEntry(data);
+ List<BookieId> originalEnsemble =
lh.getLedgerMetadata().getAllEnsembles().get(0L);
+ LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType,
PASSWD, true);
+ assertTrue(originalEnsemble.size() == 2);
+
+ startNewBookie();
+ BookieServer newBookieServer3 = serverByIndex(lastBookieIndex());
+ killBookie(originalEnsemble.get(0));
+ waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(0),
newBookieServer3.getBookieId());
+
+ startNewBookie();
+ int newBookieIndex4 = lastBookieIndex();
+ BookieServer newBookieServer4 = serverByIndex(newBookieIndex4);
+ killBookie(originalEnsemble.get(1));
+ waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(1),
newBookieServer4.getBookieId());
+
+ Awaitility.await().untilAsserted(() -> {
+ LedgerEntries ledgerEntries = readonlyLh.read(0, 0);
+ assertNotNull(ledgerEntries);
+ byte[] entryBytes = ledgerEntries.getEntry(0L).getEntryBytes();
+ assertEquals(new String(data), new String(entryBytes));
+ ledgerEntries.close();
+ });
+ readonlyLh.close();
+ }
+
+ private void waitAutoRecoveryFinished(long lId, BookieId originalBookie,
+ BookieId newBookie) throws Exception
{
+ Awaitility.await().untilAsserted(() -> {
+ LedgerHandle openLedger = bkc.openLedger(lId, digestType, PASSWD);
+ NavigableMap<Long, ? extends List<BookieId>> map =
openLedger.getLedgerMetadata().getAllEnsembles();
+ try {
+ for (Map.Entry<Long, ? extends List<BookieId>> entry :
map.entrySet()) {
+ assertFalse(entry.getValue().contains(originalBookie));
+ assertTrue(entry.getValue().contains(newBookie));
+ }
+ } finally {
+ openLedger.close();
+ }
+ });
+ }
+}
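
For completeness, the asynchronous overload added in BookKeeper.java above can be driven roughly as follows (a sketch only: the helper name asyncOpenAndWatch, the "bkc" instance, ledgerId and passwd are placeholders, and error handling is reduced to a single return-code check):

    // Assumed imports: org.apache.bookkeeper.client.BookKeeper, BookKeeper.DigestType
    // and org.apache.bookkeeper.client.BKException.
    static void asyncOpenAndWatch(BookKeeper bkc, long ledgerId, byte[] passwd) {
        bkc.asyncOpenLedger(ledgerId, DigestType.CRC32, passwd,
                (rc, handle, ctx) -> {
                    if (rc == BKException.Code.OK) {
                        // "handle" keeps watching the ledger metadata, so later reads
                        // follow ensemble changes made by auto-recovery.
                    } else {
                        // Open failed; rc carries the BKException error code.
                    }
                },
                null /* ctx */,
                true /* keepUpdateMetadata */);
    }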