This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.17
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit b6d8b0f809f4b6f7f716f122aba47d78215bd4c5
Author: fengyubiao <[email protected]>
AuthorDate: Wed Oct 29 18:26:59 2025 +0800

    [fix] Failed read entries after multiple decommissioning (#4613)
    
    * -
    
    * checkstyle
    
    * let all ledger handle enable watcher
    
    * let all ledger handle enable watcher
    
    * fix tests
    
    * fix tests
    
    * fix tests
    
    * add test logs for debug
    
    * add test logs for debug
    
    * add test logs for debug
    
    * -
    
    * add a new param keepUpdateMetadata when open a read-only ledger handle
    
    * address comments
    
    * address comment
    
    * address comment
    
    * test CI
    
    * test CI
    
    * test CI
    
    * test CI
    
    * test CI
    
    * test CI
    
    * test CI
    
    * remove logs for CI
    
    * test CI
    
    * remove logs for CI
    
    * address comment
    
    * fix test
    
    (cherry picked from commit bae9e496cce274398774a5cb52357ab34b07928b)
---
 .../org/apache/bookkeeper/client/BookKeeper.java   |  67 ++++++-
 .../org/apache/bookkeeper/client/LedgerOpenOp.java |  42 +++-
 .../bookkeeper/client/ReadOnlyLedgerHandle.java    |  10 +-
 .../replication/BookieAutoRecoveryTest.java        |  28 ++-
 .../FullEnsembleDecommissionedTest.java            | 218 +++++++++++++++++++++
 5 files changed, 352 insertions(+), 13 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 63a2aaee90..d3bc0f8f75 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -1212,14 +1212,52 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
      */
     public void asyncOpenLedger(final long lId, final DigestType digestType, 
final byte[] passwd,
                                 final OpenCallback cb, final Object ctx) {
+        asyncOpenLedger(lId, digestType, passwd, cb, ctx, false);
+    }
+
+    /**
+     * Open existing ledger asynchronously for reading.
+     *
+     * <p>Opening a ledger with this method invokes fencing and recovery on 
the ledger
+     * if the ledger has not been closed. Fencing will block all other clients 
from
+     * writing to the ledger. Recovery will make sure that the ledger is closed
+     * before reading from it.
+     *
+     * <p>Recovery also makes sure that any entries which reached one bookie, 
but not a
+     * quorum, will be replicated to a quorum of bookies. This occurs in cases 
where
+     * the writer of a ledger crashes after sending a write request to one 
bookie but
+     * before being able to send it to the rest of the bookies in the quorum.
+     *
+     * <p>If the ledger is already closed, neither fencing nor recovery will 
be applied.
+     *
+     * @see LedgerHandle#asyncClose
+     *
+     * @param lId
+     *          ledger identifier
+     * @param digestType
+     *          digest type, either MAC or CRC32
+     * @param passwd
+     *          password
+     * @param ctx
+     *          optional control object
+     * @param keepUpdateMetadata
+     *          Whether to keep updating the ledger metadata when the auto-recovery component 
modifies the ledger's ensemble.
+     */
+    public void asyncOpenLedger(final long lId, final DigestType digestType, 
final byte[] passwd,
+                                final OpenCallback cb, final Object ctx, 
boolean keepUpdateMetadata) {
         closeLock.readLock().lock();
         try {
             if (closed) {
                 cb.openComplete(BKException.Code.ClientClosedException, null, 
ctx);
                 return;
             }
-            new LedgerOpenOp(BookKeeper.this, clientStats,
-                             lId, digestType, passwd, cb, ctx).initiate();
+            LedgerOpenOp ledgerOpenOp = new LedgerOpenOp(BookKeeper.this, 
clientStats,
+                 lId, digestType, passwd, cb, ctx);
+            if (keepUpdateMetadata) {
+                ledgerOpenOp.initiateWithKeepUpdateMetadata();
+            } else {
+                ledgerOpenOp.initiate();
+            }
         } finally {
             closeLock.readLock().unlock();
         }
@@ -1287,13 +1325,36 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
      */
     public LedgerHandle openLedger(long lId, DigestType digestType, byte[] 
passwd)
             throws BKException, InterruptedException {
+        return openLedger(lId, digestType, passwd, false);
+    }
+
+
+    /**
+     * Synchronous open ledger call.
+     *
+     * @see #asyncOpenLedger
+     * @param lId
+     *          ledger identifier
+     * @param digestType
+     *          digest type, either MAC or CRC32
+     * @param passwd
+     *          password
+     *
+     * @param keepUpdateMetadata
+     *          Whether to keep updating the ledger metadata when the auto-recovery component 
modifies the ledger's ensemble.
+     * @return a handle to the open ledger
+     * @throws InterruptedException
+     * @throws BKException
+     */
+    public LedgerHandle openLedger(long lId, DigestType digestType, byte[] 
passwd, boolean keepUpdateMetadata)
+            throws BKException, InterruptedException {
         CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
         SyncOpenCallback result = new SyncOpenCallback(future);
 
         /*
          * Calls async open ledger
          */
-        asyncOpenLedger(lId, digestType, passwd, result, null);
+        asyncOpenLedger(lId, digestType, passwd, result, null, 
keepUpdateMetadata);
 
         return SyncCallbackUtils.waitForResult(future);
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
index 7278af7d42..ac16266859 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
@@ -57,6 +57,18 @@ class LedgerOpenOp {
     ReadOnlyLedgerHandle lh;
     final byte[] passwd;
     boolean doRecovery = true;
+    // The ledger metadata may be modified even after the ledger has been closed, because 
the auto-recovery component may rewrite
+    // it. Keep receiving notifications from ZK to avoid the following issue: an 
opened ledger
+    // handle in memory still accesses a BK instance that has been 
decommissioned. The issue being solved happens as
+    // follows:
+    // 1. The client service opens a read-only handle for a ledger that has been closed.
+    // 2. All BKs that relate to the ledger are decommissioned.
+    // 3. The auto-recovery component moves the data to other BK instances that 
are alive.
+    // 4. The ledger handle in client memory keeps connecting to the BKs 
in the original ensemble set, and the
+    //    connection will always fail.
+    // To keep the modification minimal, a new option named 
"keepUpdateMetadata" is added; users can use the
+    // new API to create a read-only ledger handle that automatically updates its 
metadata.
+    boolean keepUpdateMetadata = false;
     boolean administrativeOpen = false;
     long startTime;
     final OpStatsLogger openOpLogger;
@@ -126,6 +138,15 @@ class LedgerOpenOp {
         initiate();
     }
 
+    /**
+     * Unlike {@link #initiate()}, this method keeps updating the metadata 
whenever the auto-recovery component modifies
+     * the ensemble.
+     */
+    public void initiateWithKeepUpdateMetadata() {
+        this.keepUpdateMetadata = true;
+        initiate();
+    }
+
     private CompletableFuture<Void> closeLedgerHandleAsync() {
         if (lh != null) {
             return lh.closeAsync();
@@ -174,9 +195,25 @@ class LedgerOpenOp {
         }
 
         // get the ledger metadata back
+        // The cases that need to register the listener immediately are:
+        // 1. The ledger is not being opened with recovery, which is the original 
case.
+        // 2. The ledger is closed and needs to keep updating its metadata. There are 
other cases that do not need to
+        //   register the listener, e.g. the ledger is being opened by the Auto-Recovery 
component.
+        final boolean watchImmediately = !doRecovery || (keepUpdateMetadata && 
metadata.isClosed());
         try {
+            // The ledger metadata may be modified even after the ledger has been closed, 
because the auto-recovery component may
+            // rewrite it. Keep receiving notifications 
from ZK to avoid the following issue: an
+            // opened ledger handle in memory still accesses a BK instance 
that has been decommissioned. The issue
+            // being solved happens as follows:
+            // 1. The client service opens a read-only handle for a ledger that has been 
closed.
+            // 2. All BKs that relate to the ledger are decommissioned.
+            // 3. The auto-recovery component moves the data to other BK 
instances that are alive.
+            // 4. The ledger handle in client memory keeps connecting to the 
BKs in the original ensemble set,
+            //    and the connection will always fail.
+            // Therefore, a user who needs the metadata to be updated 
automatically sets
+            // "keepUpdateMetadata" to "true".
             lh = new ReadOnlyLedgerHandle(bk.getClientCtx(), ledgerId, 
versionedMetadata, digestType,
-                                          passwd, !doRecovery);
+                                          passwd, watchImmediately);
         } catch (GeneralSecurityException e) {
             LOG.error("Security exception while opening ledger: " + ledgerId, 
e);
             openComplete(BKException.Code.DigestNotInitializedException, null);
@@ -199,6 +236,9 @@ class LedgerOpenOp {
                 public void safeOperationComplete(int rc, Void result) {
                     if (rc == BKException.Code.OK) {
                         openComplete(BKException.Code.OK, lh);
+                        if (!watchImmediately && keepUpdateMetadata) {
+                            lh.registerLedgerMetadataListener();
+                        }
                     } else {
                         closeLedgerHandleAsync().whenComplete((ignore, ex) -> {
                             if (ex != null) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index 9e883a8246..6d50cee40c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -95,14 +95,18 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements 
LedgerMetadataListene
     ReadOnlyLedgerHandle(ClientContext clientCtx,
                          long ledgerId, Versioned<LedgerMetadata> metadata,
                          BookKeeper.DigestType digestType, byte[] password,
-                         boolean watch)
+                         boolean watchImmediately)
             throws GeneralSecurityException, NumberFormatException {
         super(clientCtx, ledgerId, metadata, digestType, password, 
WriteFlag.NONE);
-        if (watch) {
-            
clientCtx.getLedgerManager().registerLedgerMetadataListener(ledgerId, this);
+        if (watchImmediately) {
+            registerLedgerMetadataListener();
         }
     }
 
+    void registerLedgerMetadataListener() {
+        clientCtx.getLedgerManager().registerLedgerMetadataListener(ledgerId, 
this);
+    }
+
     @Override
     public void close()
             throws InterruptedException, BKException {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
index ccb262ed26..95e029271a 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
@@ -96,6 +96,7 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
 
     @Override
     public void setUp() throws Exception {
+        LOG.info("Start setUp");
         super.setUp();
         baseConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
         baseClientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
@@ -117,10 +118,12 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
         mFactory = metadataClientDriver.getLedgerManagerFactory();
         underReplicationManager = mFactory.newLedgerUnderreplicationManager();
         ledgerManager = mFactory.newLedgerManager();
+        LOG.info("Finished setUp");
     }
 
     @Override
     public void tearDown() throws Exception {
+        LOG.info("Start tearDown");
         super.tearDown();
 
         if (null != underReplicationManager) {
@@ -138,6 +141,7 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
         if (null != scheduler) {
             scheduler.shutdown();
         }
+        LOG.info("Finished tearDown");
     }
 
     /**
@@ -146,6 +150,7 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
      */
     @Test
     public void testOpenLedgers() throws Exception {
+        LOG.info("Start testOpenLedgers");
         List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(1, 
5);
         LedgerHandle lh = listOfLedgerHandle.get(0);
         int ledgerReplicaIndex = 0;
@@ -186,6 +191,7 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
 
         verifyLedgerEnsembleMetadataAfterReplication(newBookieServer,
                 listOfLedgerHandle.get(0), ledgerReplicaIndex);
+        LOG.info("Finished testOpenLedgers");
     }
 
     /**
@@ -194,6 +200,7 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
      */
     @Test
     public void testClosedLedgers() throws Exception {
+        LOG.info("Start testClosedLedgers");
         List<Integer> listOfReplicaIndex = new ArrayList<Integer>();
         List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(1, 
5);
         closeLedgers(listOfLedgerHandle);
@@ -247,6 +254,7 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
                     listOfLedgerHandle.get(index),
                     listOfReplicaIndex.get(index));
         }
+        LOG.info("Finished testClosedLedgers");
     }
 
     /**
@@ -256,6 +264,7 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
      */
     @Test
     public void testStopWhileReplicationInProgress() throws Exception {
+        LOG.info("Start testStopWhileReplicationInProgress");
         int numberOfLedgers = 2;
         List<Integer> listOfReplicaIndex = new ArrayList<Integer>();
         List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(
@@ -327,6 +336,7 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
                     listOfLedgerHandle.get(index),
                     listOfReplicaIndex.get(index));
         }
+        LOG.info("Finished testStopWhileReplicationInProgress");
     }
 
     /**
@@ -336,6 +346,7 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
      */
     @Test
     public void testNoSuchLedgerExists() throws Exception {
+        LOG.info("Start testNoSuchLedgerExists");
         List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(2, 
5);
         CountDownLatch latch = new CountDownLatch(listOfLedgerHandle.size());
         for (LedgerHandle lh : listOfLedgerHandle) {
@@ -372,6 +383,7 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
             assertNull("UrLedger still exists after rereplication",
                     watchUrLedgerNode(getUrLedgerZNode(lh), latch));
         }
+        LOG.info("Finished testNoSuchLedgerExists");
     }
 
     /**
@@ -380,6 +392,7 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
      */
     @Test
     public void testEmptyLedgerLosesQuorumEventually() throws Exception {
+        LOG.info("Start testEmptyLedgerLosesQuorumEventually");
         LedgerHandle lh = bkc.createLedger(3, 2, 2, DigestType.CRC32, PASSWD);
         CountDownLatch latch = new CountDownLatch(1);
         String urZNode = getUrLedgerZNode(lh);
@@ -420,6 +433,7 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
 
         // should be able to open ledger without issue
         bkc.openLedger(lh.getId(), DigestType.CRC32, PASSWD);
+        LOG.info("Finished testEmptyLedgerLosesQuorumEventually");
     }
 
     /**
@@ -429,6 +443,7 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
     @Test
     public void testLedgerMetadataContainsIpAddressAsBookieID()
             throws Exception {
+        LOG.info("Start testLedgerMetadataContainsIpAddressAsBookieID");
         stopBKCluster();
         bkc = new BookKeeperTestClient(baseClientConf);
         // start bookie with useHostNameAsBookieID=false, as old bookie
@@ -494,7 +509,7 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
 
         verifyLedgerEnsembleMetadataAfterReplication(newBookieServer,
                 listOfLedgerHandle.get(0), ledgerReplicaIndex);
-
+        LOG.info("Finished testLedgerMetadataContainsIpAddressAsBookieID");
     }
 
     /**
@@ -504,6 +519,7 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
     @Test
     public void testLedgerMetadataContainsHostNameAsBookieID()
             throws Exception {
+        LOG.info("Start testLedgerMetadataContainsHostNameAsBookieID");
         stopBKCluster();
 
         bkc = new BookKeeperTestClient(baseClientConf);
@@ -572,7 +588,7 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
 
         verifyLedgerEnsembleMetadataAfterReplication(newBookieServer,
                 listOfLedgerHandle.get(0), ledgerReplicaIndex);
-
+        LOG.info("Finished testLedgerMetadataContainsHostNameAsBookieID");
     }
 
     private int getReplicaIndexInLedger(LedgerHandle lh, BookieId 
replicaToKill) {
@@ -634,13 +650,13 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
             @Override
             public void process(WatchedEvent event) {
                 if (event.getType() == EventType.NodeDeleted) {
-                    LOG.info("Received Ledger rereplication completion event :"
-                            + event.getType());
+                    LOG.info("Received Ledger replication completion. event : 
{}, path: {}, latchCount: {}",
+                            event.getType(), event.getPath(), 
latch.getCount());
                     latch.countDown();
                 }
                 if (event.getType() == EventType.NodeCreated) {
-                    LOG.info("Received urLedger publishing event :"
-                            + event.getType());
+                    LOG.info("Received urLedger publishing event: {}, path: 
{}, latchCount: {}",
+                            event.getType(), event.getPath(), 
latch.getCount());
                     latch.countDown();
                 }
             }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java
new file mode 100644
index 0000000000..0b5c5a8e1c
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration tests that verify reads still work after a ledger's full ensemble is decommissioned.
+ */
+public class FullEnsembleDecommissionedTest extends BookKeeperClusterTestCase {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(FullEnsembleDecommissionedTest.class);
+    private static final byte[] PASSWD = "admin".getBytes();
+    private static final byte[] data = "TESTDATA".getBytes();
+    private static final String openLedgerRereplicationGracePeriod = "3000"; 
// milliseconds
+
+    private DigestType digestType;
+    private MetadataClientDriver metadataClientDriver;
+    private LedgerManagerFactory mFactory;
+    private LedgerUnderreplicationManager underReplicationManager;
+    private LedgerManager ledgerManager;
+    private OrderedScheduler scheduler;
+
+    public FullEnsembleDecommissionedTest() throws Exception{
+        super(2);
+
+        baseConf.setLedgerManagerFactoryClassName(
+                "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
+        
baseConf.setOpenLedgerRereplicationGracePeriod(openLedgerRereplicationGracePeriod);
+        baseConf.setRwRereplicateBackoffMs(500);
+        baseClientConf.setLedgerManagerFactoryClassName(
+                "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
+        this.digestType = DigestType.MAC;
+        setAutoRecoveryEnabled(true);
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        baseConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        baseClientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+
+        scheduler = OrderedScheduler.newSchedulerBuilder()
+            .name("test-scheduler")
+            .numThreads(1)
+            .build();
+
+        metadataClientDriver = MetadataDrivers.getClientDriver(
+            URI.create(baseClientConf.getMetadataServiceUri()));
+        metadataClientDriver.initialize(
+            baseClientConf,
+            scheduler,
+            NullStatsLogger.INSTANCE,
+            Optional.empty());
+
+        // initialize urReplicationManager
+        mFactory = metadataClientDriver.getLedgerManagerFactory();
+        underReplicationManager = mFactory.newLedgerUnderreplicationManager();
+        ledgerManager = mFactory.newLedgerManager();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+
+        if (null != underReplicationManager) {
+            underReplicationManager.close();
+            underReplicationManager = null;
+        }
+        if (null != ledgerManager) {
+            ledgerManager.close();
+            ledgerManager = null;
+        }
+        if (null != metadataClientDriver) {
+            metadataClientDriver.close();
+            metadataClientDriver = null;
+        }
+        if (null != scheduler) {
+            scheduler.shutdown();
+        }
+    }
+
+    /**
+     * The purpose of this test:
+     * 1. The client service opens a read-only handle for a ledger that has been closed.
+     * 2. All BKs that relate to the ledger are decommissioned.
+     * 3. The auto-recovery component moves the data to other BK instances that 
are alive.
+     * 4. Verify: the ledger handle in client memory keeps updating the 
ledger ensemble set, and a new read
+     *    request works.
+     */
+    @Test
+    public void testOpenedLedgerHandleStillWorkAfterDecommissioning() throws 
Exception {
+        LedgerHandle lh = bkc.createLedger(2, 2, digestType, PASSWD);
+        assertTrue(lh.getLedgerMetadata().getAllEnsembles().get(0L).size() == 
2);
+        lh.addEntry(data);
+        lh.close();
+        List<BookieId> originalEnsemble = 
lh.getLedgerMetadata().getAllEnsembles().get(0L);
+        LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, 
PASSWD, true);
+        assertTrue(originalEnsemble.size() == 2);
+
+        startNewBookie();
+        BookieServer newBookieServer3 = serverByIndex(lastBookieIndex());
+        killBookie(originalEnsemble.get(0));
+        waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(0), 
newBookieServer3.getBookieId());
+
+        startNewBookie();
+        int newBookieIndex4 = lastBookieIndex();
+        BookieServer newBookieServer4 = serverByIndex(newBookieIndex4);
+        killBookie(originalEnsemble.get(1));
+        waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(1), 
newBookieServer4.getBookieId());
+
+        Awaitility.await().untilAsserted(() -> {
+            LedgerEntries ledgerEntries = readonlyLh.read(0, 0);
+            assertNotNull(ledgerEntries);
+            byte[] entryBytes = ledgerEntries.getEntry(0L).getEntryBytes();
+            assertEquals(new String(data), new String(entryBytes));
+            ledgerEntries.close();
+        });
+        readonlyLh.close();
+    }
+
+    /**
+     * The purpose of this test:
+     * 1. The client service opens a read-only handle with recovery for a ledger that 
has not been closed yet.
+     * 2. All BKs that relate to the ledger are decommissioned.
+     * 3. The auto-recovery component moves the data to other BK instances that 
are alive.
+     * 4. Verify: the ledger handle in client memory keeps updating the 
ledger ensemble set, and a new read
+     *    request works.
+     */
+    @Test
+    public void testRecoverOpenLedgerHandleStillWorkAfterDecommissioning() 
throws Exception {
+        LedgerHandle lh = bkc.createLedger(2, 2, digestType, PASSWD);
+        assertTrue(lh.getLedgerMetadata().getAllEnsembles().get(0L).size() == 
2);
+        lh.addEntry(data);
+        List<BookieId> originalEnsemble = 
lh.getLedgerMetadata().getAllEnsembles().get(0L);
+        LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, 
PASSWD, true);
+        assertTrue(originalEnsemble.size() == 2);
+
+        startNewBookie();
+        BookieServer newBookieServer3 = serverByIndex(lastBookieIndex());
+        killBookie(originalEnsemble.get(0));
+        waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(0), 
newBookieServer3.getBookieId());
+
+        startNewBookie();
+        int newBookieIndex4 = lastBookieIndex();
+        BookieServer newBookieServer4 = serverByIndex(newBookieIndex4);
+        killBookie(originalEnsemble.get(1));
+        waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(1), 
newBookieServer4.getBookieId());
+
+        Awaitility.await().untilAsserted(() -> {
+            LedgerEntries ledgerEntries = readonlyLh.read(0, 0);
+            assertNotNull(ledgerEntries);
+            byte[] entryBytes = ledgerEntries.getEntry(0L).getEntryBytes();
+            assertEquals(new String(data), new String(entryBytes));
+            ledgerEntries.close();
+        });
+        readonlyLh.close();
+    }
+
+    private void waitAutoRecoveryFinished(long lId, BookieId originalBookie,
+                                          BookieId newBookie) throws Exception 
{
+        Awaitility.await().untilAsserted(() -> {
+            LedgerHandle openLedger = bkc.openLedger(lId, digestType, PASSWD);
+            NavigableMap<Long, ? extends List<BookieId>> map = 
openLedger.getLedgerMetadata().getAllEnsembles();
+            try {
+                for (Map.Entry<Long, ? extends List<BookieId>> entry : 
map.entrySet()) {
+                    assertFalse(entry.getValue().contains(originalBookie));
+                    assertTrue(entry.getValue().contains(newBookie));
+                }
+            } finally {
+                openLedger.close();
+            }
+        });
+    }
+}

Reply via email to