This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-4.12
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.12 by this push:
     new 1252571  Add fencing to recovery reads to avoid data loss issue
1252571 is described below

commit 12525717adf45f9c32c12cfd6228935e98b2b51b
Author: Jack Vanlightly <[email protected]>
AuthorDate: Thu Mar 11 23:20:31 2021 -0800

    Add fencing to recovery reads to avoid data loss issue
    
    Descriptions of the changes in this PR:
    
    ### Motivation
    
    Adding the fencing flag to recovery reads avoids a data loss scenario as 
described in [issue 2614](https://github.com/apache/bookkeeper/issues/2614)
    
    ### Changes
    
    Added the fencing flag to recovery reads. Refactored some mocking unit test 
code to introduce fencing and allow two writers to share the same bookie state. 
Added a new unit to verify the fix. You can recreate the initial problem by 
removing the fencing flag from recovery reads and running the new unit test.
    
    Master Issue: #2614
    
    Reviewers: Enrico Olivelli <[email protected]>, Andrey Yegorov 
<[email protected]>
    
    This closes #2616 from Vanlightly/fix-fencing
    
    (cherry picked from commit 017307bc67431a7616861ad09927c4e3327633d1)
    Signed-off-by: Enrico Olivelli <[email protected]>
---
 .../apache/bookkeeper/client/PendingReadOp.java    |  11 +-
 .../bookkeeper/client/LedgerRecovery2Test.java     | 234 ++++++++++++++++++++-
 .../bookkeeper/client/MockClientContext.java       |  30 +--
 .../apache/bookkeeper/proto/MockBookieClient.java  | 163 ++++++--------
 .../org/apache/bookkeeper/proto/MockBookies.java   | 131 ++++++++++++
 .../apache/bookkeeper/proto/MockLedgerData.java    |  64 ++++++
 6 files changed, 517 insertions(+), 116 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index e6c011e..b48761e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -568,9 +568,14 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
             lh.throttler.acquire();
         }
 
-        int flags = isRecoveryRead ? BookieProtocol.FLAG_HIGH_PRIORITY : 
BookieProtocol.FLAG_NONE;
-        clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
-                                              this, new 
ReadContext(bookieIndex, to, entry), flags);
+        if (isRecoveryRead) {
+            int flags = BookieProtocol.FLAG_HIGH_PRIORITY | 
BookieProtocol.FLAG_DO_FENCING;
+            clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
+                    this, new ReadContext(bookieIndex, to, entry), flags, 
lh.ledgerKey);
+        } else {
+            clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
+                    this, new ReadContext(bookieIndex, to, entry), 
BookieProtocol.FLAG_NONE);
+        }
     }
 
     @Override
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
index af6d980..38e4879 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
@@ -21,16 +21,23 @@
 package org.apache.bookkeeper.client;
 
 import com.google.common.collect.Lists;
+
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallbackFuture;
+import org.apache.bookkeeper.proto.MockBookies;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.junit.Assert;
 import org.junit.Test;
@@ -86,7 +93,7 @@ public class LedgerRecovery2Test {
 
         Versioned<LedgerMetadata> md = setupLedger(clientCtx, 1, 
Lists.newArrayList(b1, b2, b3));
 
-        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 
0L, -1L);
         clientCtx.getMockBookieClient().setPreWriteHook(
                 (bookie, ledgerId, entryId) -> FutureUtils.exception(new 
BKException.BKWriteException()));
 
@@ -110,7 +117,7 @@ public class LedgerRecovery2Test {
 
         CompletableFuture<Void> writingBack = new CompletableFuture<>();
         CompletableFuture<Void> blocker = new CompletableFuture<>();
-        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 
0L, -1L);
         // will block recovery at the writeback phase
         clientCtx.getMockBookieClient().setPreWriteHook(
                 (bookie, ledgerId, entryId) -> {
@@ -149,7 +156,7 @@ public class LedgerRecovery2Test {
         CompletableFuture<Void> writingBack = new CompletableFuture<>();
         CompletableFuture<Void> blocker = new CompletableFuture<>();
         CompletableFuture<Void> failing = new CompletableFuture<>();
-        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 
0L, -1L);
         // will block recovery at the writeback phase
         clientCtx.getMockBookieClient().setPreWriteHook(
                 (bookie, ledgerId, entryId) -> {
@@ -195,7 +202,7 @@ public class LedgerRecovery2Test {
         CompletableFuture<Void> writingBack = new CompletableFuture<>();
         CompletableFuture<Void> blocker = new CompletableFuture<>();
         CompletableFuture<Void> failing = new CompletableFuture<>();
-        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 
0L, -1L);
         clientCtx.getMockBookieClient().errorBookies(b2);
 
         ReadOnlyLedgerHandle lh = new ReadOnlyLedgerHandle(
@@ -216,8 +223,8 @@ public class LedgerRecovery2Test {
         clientCtx.getMockRegistrationClient().addBookies(b4).get();
 
         Versioned<LedgerMetadata> md = setupLedger(clientCtx, 1, 
Lists.newArrayList(b1, b2, b3));
-        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
-        clientCtx.getMockBookieClient().seedEntries(b3, 1L, 1L, -1L);
+        clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 
0L, -1L);
+        clientCtx.getMockBookieClient().getMockBookies().seedEntries(b3, 1L, 
1L, -1L);
         clientCtx.getMockBookieClient().setPreWriteHook(
                 (bookie, ledgerId, entryId) -> {
                     if (bookie.equals(b2) && entryId == 1L) {
@@ -248,7 +255,7 @@ public class LedgerRecovery2Test {
         clientCtx.getMockRegistrationClient().addBookies(b4, b5).get();
 
         Versioned<LedgerMetadata> md = setupLedger(clientCtx, 1, 
Lists.newArrayList(b1, b2, b3));
-        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 
0L, -1L);
         clientCtx.getMockBookieClient().setPreWriteHook(
                 (bookie, ledgerId, entryId) -> {
                     if (bookie.equals(b1) || bookie.equals(b2)) {
@@ -271,4 +278,217 @@ public class LedgerRecovery2Test {
         
Assert.assertTrue(lh.getLedgerMetadata().getAllEnsembles().get(0L).contains(b5));
         Assert.assertEquals(lh.getLastAddConfirmed(), 0L);
     }
+
+    /**
+     * This test verifies the fix for the data loss scenario found by the TLA+ 
specfication, specifically
+     * the invariant violation that metadata and writer can diverge. The 
scenario is that the original writer
+     * can commit an entry e that will later be lost because a second writer 
can close the ledger at e-1.
+     * The cause is that fencing was originally only performed on LAC reads 
which is not enough to prevent
+     * the 1st writer from reaching Ack Quorum after the 2nd writer has closed 
the ledger. The fix has
+     * been to fence on recovery reads also.
+     */
+    @Test
+    public void testFirstWriterCannotCommitWriteAfter2ndWriterCloses() throws 
Exception {
+        /*
+            This test uses CompletableFutures to control the sequence of 
actions performed by
+            two writers. There are different sets of futures:
+             - block*: These futures block the various reads, writes and 
metadata updates until the
+                       test thread is ready for them to be executed. Thus 
ensuring the right sequence
+                       of events occur.
+             - reachedStepN: These futures block in the test thread to ensure 
that we only unblock
+                             an action when the prior one has been executed 
and we are already blocked
+                             on the next actionin the sequence.
+         */
+
+        //  Setup w1
+        CompletableFuture<Void> reachedStep1 = new CompletableFuture<>();
+        CompletableFuture<Void> reachedStep2 = new CompletableFuture<>();
+        CompletableFuture<Void> reachedStep3 = new CompletableFuture<>();
+        CompletableFuture<Void> reachedStep4 = new CompletableFuture<>();
+        CompletableFuture<Void> reachedStep5 = new CompletableFuture<>();
+        CompletableFuture<Void> reachedStep6 = new CompletableFuture<>();
+        CompletableFuture<Void> reachedStep7 = new CompletableFuture<>();
+        CompletableFuture<Void> reachedStep8 = new CompletableFuture<>();
+        CompletableFuture<Void> reachedStep9 = new CompletableFuture<>();
+
+        MockBookies mockBookies = new MockBookies();
+        MockClientContext clientCtx1 = MockClientContext.create(mockBookies);
+        Versioned<LedgerMetadata> md1 = setupLedger(clientCtx1, 1, 
Lists.newArrayList(b1, b2, b3));
+
+        CompletableFuture<Void> blockB1Write = new CompletableFuture<>();
+        CompletableFuture<Void> blockB2Write = new CompletableFuture<>();
+        CompletableFuture<Void> blockB3Write = new CompletableFuture<>();
+        clientCtx1.getMockBookieClient().setPreWriteHook(
+                (bookie, ledgerId, entryId) -> {
+                    // ignore seed entries e0 and e1
+                    if (entryId < 2) {
+                        return FutureUtils.value(null);
+                    }
+
+                    if (!reachedStep1.isDone()) {
+                        reachedStep1.complete(null);
+                    }
+
+                    if (bookie.equals(b1)) {
+                        return blockB1Write;
+                    } else if (bookie.equals(b2)) {
+                        reachedStep9.complete(null);
+                        return blockB2Write;
+                    } else if (bookie.equals(b3)) {
+                        reachedStep3.complete(null);
+                        return blockB3Write;
+                    }  else {
+                        return FutureUtils.value(null);
+                    }
+                });
+
+        LedgerHandle w1 = new LedgerHandle(clientCtx1, 1, md1,
+                BookKeeper.DigestType.CRC32C,
+                ClientUtil.PASSWD, WriteFlag.NONE);
+        w1.addEntry("e0".getBytes(StandardCharsets.UTF_8));
+        w1.addEntry("e1".getBytes(StandardCharsets.UTF_8));
+
+        //  Setup w2
+        MockClientContext clientCtx2 = MockClientContext.create(mockBookies);
+        Versioned<LedgerMetadata> md2 = setupLedger(clientCtx2, 1, 
Lists.newArrayList(b1, b2, b3));
+
+        CompletableFuture<Void> blockB1ReadLac = new CompletableFuture<>();
+        CompletableFuture<Void> blockB2ReadLac = new CompletableFuture<>();
+        CompletableFuture<Void> blockB3ReadLac = new CompletableFuture<>();
+
+        CompletableFuture<Void> blockB1ReadEntry0 = new CompletableFuture<>();
+        CompletableFuture<Void> blockB2ReadEntry0 = new CompletableFuture<>();
+        CompletableFuture<Void> blockB3ReadEntry0 = new CompletableFuture<>();
+
+        AtomicBoolean isB1LacRead = new AtomicBoolean(true);
+        AtomicBoolean isB2LacRead = new AtomicBoolean(true);
+        AtomicBoolean isB3LacRead = new AtomicBoolean(true);
+
+        clientCtx2.getMockBookieClient().setPreReadHook(
+                (bookie, ledgerId, entryId) -> {
+                    if (bookie.equals(b1)) {
+                        if (isB1LacRead.get()) {
+                            isB1LacRead.set(false);
+                            reachedStep2.complete(null);
+                            return blockB1ReadLac;
+                        } else {
+                            reachedStep6.complete(null);
+                            return blockB1ReadEntry0;
+                        }
+                    } else if (bookie.equals(b2)) {
+                        if (isB2LacRead.get()) {
+                            try {
+                                isB2LacRead.set(false);
+                                reachedStep4.complete(null);
+                                blockB2ReadLac.get(); // block this read - it 
does not succeed
+                            } catch (Throwable t){}
+                            return FutureUtils.exception(new 
BKException.BKWriteException());
+                        } else {
+                            reachedStep7.complete(null);
+                            return blockB2ReadEntry0;
+                        }
+                    } else if (bookie.equals(b3)) {
+                        if (isB3LacRead.get()) {
+                            isB3LacRead.set(false);
+                            reachedStep5.complete(null);
+                            return blockB3ReadLac;
+                        } else {
+                            return blockB3ReadEntry0;
+                        }
+                    }  else {
+                        return FutureUtils.value(null);
+                    }
+                });
+
+        AtomicInteger w2MetaUpdates = new AtomicInteger(0);
+        CompletableFuture<Void> blockW2StartingRecovery = new 
CompletableFuture<>();
+        CompletableFuture<Void> blockW2ClosingLedger = new 
CompletableFuture<>();
+        clientCtx2.getMockLedgerManager().setPreWriteHook((ledgerId, metadata) 
-> {
+            if (w2MetaUpdates.get() == 0) {
+                w2MetaUpdates.incrementAndGet();
+                return blockW2StartingRecovery;
+            } else {
+                reachedStep8.complete(null);
+                return blockW2ClosingLedger;
+            }
+        });
+
+        ReadOnlyLedgerHandle w2 = new ReadOnlyLedgerHandle(
+                clientCtx2, 1L, md2, BookKeeper.DigestType.CRC32C, PASSWD, 
false);
+
+        // Start an async add entry, blocked for now.
+        CompletableFuture<Object> w1WriteFuture = new CompletableFuture<>();
+        AtomicInteger writeResult = new AtomicInteger(0);
+        w1.asyncAddEntry("e2".getBytes(), (int rc, LedgerHandle lh1, long 
entryId, Object ctx) -> {
+            if (rc == BKException.Code.OK) {
+                writeResult.set(1);
+            } else {
+                writeResult.set(2);
+            }
+            SyncCallbackUtils.finish(rc, null, w1WriteFuture);
+        }, null);
+
+        // Step 1. w2 starts recovery
+        stepBlock(reachedStep1);
+        GenericCallbackFuture<Void> recoveryPromise = new 
GenericCallbackFuture<>();
+        w2.recover(recoveryPromise, null, false);
+        blockW2StartingRecovery.complete(null);
+
+        // Step 2. w2 fencing read LAC reaches B1
+        stepBlock(reachedStep2);
+        blockB1ReadLac.complete(null);
+
+        // Step 3. w1 add e0 reaches B3
+        stepBlock(reachedStep3);
+        blockB3Write.complete(null);
+
+        // Step 4. w2 fencing LAC read does not reach B2 or it fails
+        stepBlock(reachedStep4);
+        blockB2ReadLac.complete(null);
+
+        // Step 5. w2 fencing LAC read reaches B3
+        stepBlock(reachedStep5);
+        blockB3ReadLac.complete(null);
+
+        // Step 6. w2 sends read e0 to b1, gets NoSuchLedger
+        stepBlock(reachedStep6);
+        blockB1ReadEntry0.complete(null);
+
+        // Step 7. w2 send read e0 to b2, gets NoSuchLedger
+        stepBlock(reachedStep7);
+        blockB2ReadEntry0.complete(null);
+
+        // Step 8. w2 closes ledger because (Qw-Qa)+1 bookies confirmed they 
do not have it
+        // last entry id set to 0
+        stepBlock(reachedStep8);
+        blockW2ClosingLedger.complete(null);
+
+        // Step 9. w1 add e0 reaches b2 (which was fenced by a recovery read)
+        stepBlock(reachedStep9);
+        blockB2Write.complete(null);
+
+        // Step 10. w1 write fails to reach AckQuorum
+        try {
+            w1WriteFuture.get(200, TimeUnit.MILLISECONDS);
+            Assert.fail("The write to b2 should have failed as it was fenced 
by the recovery read of step 7");
+        } catch (ExecutionException e) {
+            Assert.assertTrue(e.getCause() instanceof 
BKException.BKLedgerFencedException);
+        }
+
+        // w1 received negative acknowledgement of e2 being written
+        Assert.assertEquals(1, 
w1.getLedgerMetadata().getAllEnsembles().size());
+        Assert.assertEquals(2, writeResult.get());
+        Assert.assertEquals(1L, w1.getLastAddConfirmed());
+
+        // w2 closed the ledger with only the original entries, not the third 
one
+        // i.e there is no divergence between w1m, w2 and metadata
+        Assert.assertEquals(1, 
w2.getLedgerMetadata().getAllEnsembles().size());
+        Assert.assertEquals(1L, w2.getLastAddConfirmed());
+    }
+
+    private void stepBlock(CompletableFuture<Void> reachedStepFuture) {
+        try {
+            reachedStepFuture.get();
+        } catch (Exception e) {}
+    }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
index 68d0502..ae3475e 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
@@ -35,6 +35,7 @@ import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.MockLedgerManager;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.MockBookieClient;
+import org.apache.bookkeeper.proto.MockBookies;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.mockito.Mockito;
 
@@ -55,7 +56,7 @@ public class MockClientContext implements ClientContext {
     private MockRegistrationClient regClient;
     private ByteBufAllocator allocator;
 
-    static MockClientContext create() throws Exception {
+    static MockClientContext create(MockBookies mockBookies) throws Exception {
         ClientConfiguration conf = new ClientConfiguration();
         OrderedScheduler scheduler = 
OrderedScheduler.newSchedulerBuilder().name("mock-executor").numThreads(1).build();
         MockRegistrationClient regClient = new MockRegistrationClient();
@@ -67,17 +68,22 @@ public class MockClientContext implements ClientContext {
         bookieWatcherImpl.initialBlockingBookieRead();
 
         return new MockClientContext()
-            .setConf(ClientInternalConf.fromConfig(conf))
-            .setLedgerManager(new MockLedgerManager())
-            .setBookieWatcher(bookieWatcherImpl)
-            .setPlacementPolicy(placementPolicy)
-            .setRegistrationClient(regClient)
-            .setBookieClient(new MockBookieClient(scheduler))
-            .setByteBufAllocator(UnpooledByteBufAllocator.DEFAULT)
-            .setMainWorkerPool(scheduler)
-            .setScheduler(scheduler)
-            
.setClientStats(BookKeeperClientStats.newInstance(NullStatsLogger.INSTANCE))
-            .setIsClientClosed(() -> false);
+                .setConf(ClientInternalConf.fromConfig(conf))
+                .setLedgerManager(new MockLedgerManager())
+                .setBookieWatcher(bookieWatcherImpl)
+                .setPlacementPolicy(placementPolicy)
+                .setRegistrationClient(regClient)
+                .setBookieClient(new MockBookieClient(scheduler, mockBookies))
+                .setByteBufAllocator(UnpooledByteBufAllocator.DEFAULT)
+                .setMainWorkerPool(scheduler)
+                .setScheduler(scheduler)
+                
.setClientStats(BookKeeperClientStats.newInstance(NullStatsLogger.INSTANCE))
+                .setIsClientClosed(() -> false);
+    }
+
+    static MockClientContext create() throws Exception {
+        MockBookies mockBookies = new MockBookies();
+        return create(mockBookies);
     }
 
     static MockClientContext copyOf(ClientContext other) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
index fa3d1ba..f655328 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
@@ -20,23 +20,20 @@
  */
 package org.apache.bookkeeper.proto;
 
+import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_RECOVERY_ADD;
 import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
-
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -49,14 +46,13 @@ import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
-import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
-import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
 import org.apache.bookkeeper.util.ByteBufList;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Mock implementation of BookieClient.
  */
@@ -64,9 +60,9 @@ public class MockBookieClient implements BookieClient {
     static final Logger LOG = LoggerFactory.getLogger(MockBookieClient.class);
 
     final OrderedExecutor executor;
-    final ConcurrentHashMap<BookieId, ConcurrentHashMap<Long, LedgerData>> 
data = new ConcurrentHashMap<>();
+    final MockBookies mockBookies;
     final Set<BookieId> errorBookies =
-        Collections.newSetFromMap(new ConcurrentHashMap<BookieId, Boolean>());
+            Collections.newSetFromMap(new ConcurrentHashMap<>());
 
     /**
      * Runs before or after an operation. Can stall the operation or error it.
@@ -82,6 +78,13 @@ public class MockBookieClient implements BookieClient {
 
     public MockBookieClient(OrderedExecutor executor) {
         this.executor = executor;
+        this.mockBookies = new MockBookies();
+    }
+
+    public MockBookieClient(OrderedExecutor executor,
+                            MockBookies mockBookies) {
+        this.executor = executor;
+        this.mockBookies = mockBookies;
     }
 
     public void setPreReadHook(Hook hook) {
@@ -110,14 +113,12 @@ public class MockBookieClient implements BookieClient {
         }
     }
 
-    public void seedEntries(BookieId bookie, long ledgerId, long entryId, long 
lac) throws Exception {
-        DigestManager digestManager = DigestManager.instantiate(ledgerId, new 
byte[0], DigestType.CRC32C,
-                UnpooledByteBufAllocator.DEFAULT, false);
-        ByteBuf entry = 
ByteBufList.coalesce(digestManager.computeDigestAndPackageForSending(
-                                                     entryId, lac, 0, 
Unpooled.buffer(10)));
+    public boolean isErrored(BookieId bookieId) {
+        return errorBookies.contains(bookieId);
+    }
 
-        LedgerData ledger = getBookieData(bookie).computeIfAbsent(ledgerId, 
LedgerData::new);
-        ledger.addEntry(entryId, entry);
+    public MockBookies getMockBookies() {
+        return mockBookies;
     }
 
     @Override
@@ -140,9 +141,9 @@ public class MockBookieClient implements BookieClient {
                             ForceLedgerCallback cb, Object ctx) {
         executor.executeOrdered(ledgerId,
                 safeRun(() -> {
-                        
cb.forceLedgerComplete(BKException.Code.IllegalOpException,
-                                               ledgerId, addr, ctx);
-                    }));
+                    cb.forceLedgerComplete(BKException.Code.IllegalOpException,
+                            ledgerId, addr, ctx);
+                }));
     }
 
     @Override
@@ -150,9 +151,9 @@ public class MockBookieClient implements BookieClient {
                          long lac, ByteBufList toSend, WriteLacCallback cb, 
Object ctx) {
         executor.executeOrdered(ledgerId,
                 safeRun(() -> {
-                        
cb.writeLacComplete(BKException.Code.IllegalOpException,
-                                               ledgerId, addr, ctx);
-                    }));
+                    cb.writeLacComplete(BKException.Code.IllegalOpException,
+                            ledgerId, addr, ctx);
+                }));
     }
 
     @Override
@@ -161,23 +162,33 @@ public class MockBookieClient implements BookieClient {
                          int options, boolean allowFastFail, 
EnumSet<WriteFlag> writeFlags) {
         toSend.retain();
         preWriteHook.runHook(addr, ledgerId, entryId)
-            .thenComposeAsync(
-                (ignore) -> {
-                    LOG.info("[{};L{}] write entry {}", addr, ledgerId, 
entryId);
-                    if (errorBookies.contains(addr)) {
-                        LOG.warn("[{};L{}] erroring write {}", addr, ledgerId, 
entryId);
-                        return FutureUtils.exception(new 
BKException.BKWriteException());
-                    }
-                    LedgerData ledger = 
getBookieData(addr).computeIfAbsent(ledgerId, LedgerData::new);
-                    ledger.addEntry(entryId, copyData(toSend));
-                    toSend.release();
-                    return FutureUtils.value(null);
-                }, executor.chooseThread(ledgerId))
-            .thenCompose((res) -> postWriteHook.runHook(addr, ledgerId, 
entryId))
-            .whenCompleteAsync((res, ex) -> {
+                .thenComposeAsync(
+                        (ignore) -> {
+                            LOG.info("[{};L{}] write entry {}", addr, 
ledgerId, entryId);
+                            if (isErrored(addr)) {
+                                LOG.warn("[{};L{}] erroring write {}", addr, 
ledgerId, entryId);
+                                return FutureUtils.exception(new 
BKException.BKWriteException());
+                            }
+
+                            try {
+                                if ((options & FLAG_RECOVERY_ADD) == 
FLAG_RECOVERY_ADD) {
+                                    mockBookies.recoveryAddEntry(addr, 
ledgerId, entryId, copyData(toSend));
+                                } else {
+                                    mockBookies.addEntry(addr, ledgerId, 
entryId, copyData(toSend));
+                                }
+                            } catch (BKException bke) {
+                                return FutureUtils.exception(bke);
+                            } finally {
+                                toSend.release();
+                            }
+
+                            return FutureUtils.value(null);
+                        }, executor.chooseThread(ledgerId))
+                .thenCompose((res) -> postWriteHook.runHook(addr, ledgerId, 
entryId))
+                .whenCompleteAsync((res, ex) -> {
                     if (ex != null) {
                         cb.writeComplete(BKException.getExceptionCode(ex, 
BKException.Code.WriteException),
-                                         ledgerId, entryId, addr, ctx);
+                                ledgerId, entryId, addr, ctx);
                     } else {
                         cb.writeComplete(BKException.Code.OK, ledgerId, 
entryId, addr, ctx);
                     }
@@ -188,9 +199,9 @@ public class MockBookieClient implements BookieClient {
     public void readLac(BookieId addr, long ledgerId, ReadLacCallback cb, 
Object ctx) {
         executor.executeOrdered(ledgerId,
                 safeRun(() -> {
-                        cb.readLacComplete(BKException.Code.IllegalOpException,
-                                           ledgerId, null, null, ctx);
-                    }));
+                    cb.readLacComplete(BKException.Code.IllegalOpException,
+                            ledgerId, null, null, ctx);
+                }));
     }
 
     @Override
@@ -198,35 +209,28 @@ public class MockBookieClient implements BookieClient {
                           ReadEntryCallback cb, Object ctx, int flags, byte[] 
masterKey,
                           boolean allowFastFail) {
         preReadHook.runHook(addr, ledgerId, entryId)
-            .thenComposeAsync((res) -> {
+                .thenComposeAsync((res) -> {
                     LOG.info("[{};L{}] read entry {}", addr, ledgerId, 
entryId);
-                    if (errorBookies.contains(addr)) {
+                    if (isErrored(addr)) {
                         LOG.warn("[{};L{}] erroring read {}", addr, ledgerId, 
entryId);
                         return FutureUtils.exception(new 
BKException.BKReadException());
                     }
 
-                    LedgerData ledger = getBookieData(addr).get(ledgerId);
-                    if (ledger == null) {
-                        LOG.warn("[{};L{}] ledger not found", addr, ledgerId);
-                        return FutureUtils.exception(new 
BKException.BKNoSuchLedgerExistsException());
-                    }
-
-                    ByteBuf entry = ledger.getEntry(entryId);
-                    if (entry == null) {
-                        LOG.warn("[{};L{}] entry({}) not found", addr, 
ledgerId, entryId);
-                        return FutureUtils.exception(new 
BKException.BKNoSuchEntryException());
+                    try {
+                        ByteBuf entry = mockBookies.readEntry(addr, flags, 
ledgerId, entryId);
+                        return FutureUtils.value(entry);
+                    } catch (BKException bke) {
+                        return FutureUtils.exception(bke);
                     }
-
-                    return FutureUtils.value(entry);
                 }, executor.chooseThread(ledgerId))
-            .thenCompose((buf) -> postReadHook.runHook(addr, ledgerId, 
entryId).thenApply((res) -> buf))
-            .whenCompleteAsync((res, ex) -> {
+                .thenCompose((buf) -> postReadHook.runHook(addr, ledgerId, 
entryId).thenApply((res) -> buf))
+                .whenCompleteAsync((res, ex) -> {
                     if (ex != null) {
                         cb.readEntryComplete(BKException.getExceptionCode(ex, 
BKException.Code.ReadException),
-                                             ledgerId, entryId, null, ctx);
+                                ledgerId, entryId, null, ctx);
                     } else {
                         cb.readEntryComplete(BKException.Code.OK,
-                                             ledgerId, entryId, res.slice(), 
ctx);
+                                ledgerId, entryId, res.slice(), ctx);
                     }
                 }, executor.chooseThread(ledgerId));
     }
@@ -242,9 +246,9 @@ public class MockBookieClient implements BookieClient {
                                           Object ctx) {
         executor.executeOrdered(ledgerId,
                 safeRun(() -> {
-                        
cb.readEntryComplete(BKException.Code.IllegalOpException,
-                                             ledgerId, entryId, null, ctx);
-                    }));
+                    cb.readEntryComplete(BKException.Code.IllegalOpException,
+                            ledgerId, entryId, null, ctx);
+                }));
     }
 
     @Override
@@ -252,14 +256,14 @@ public class MockBookieClient implements BookieClient {
                               GetBookieInfoCallback cb, Object ctx) {
         executor.executeOrdered(addr,
                 safeRun(() -> {
-                        
cb.getBookieInfoComplete(BKException.Code.IllegalOpException,
-                                                 null, ctx);
-                    }));
+                    
cb.getBookieInfoComplete(BKException.Code.IllegalOpException,
+                            null, ctx);
+                }));
     }
 
     @Override
     public CompletableFuture<AvailabilityOfEntriesOfLedger> 
getListOfEntriesOfLedger(BookieId address,
-            long ledgerId) {
+                                                                               
      long ledgerId) {
         FutureGetListOfEntriesOfLedger futureResult = new 
FutureGetListOfEntriesOfLedger(ledgerId);
         executor.executeOrdered(address, safeRun(() -> {
             futureResult
@@ -277,10 +281,6 @@ public class MockBookieClient implements BookieClient {
     public void close() {
     }
 
-    private ConcurrentHashMap<Long, LedgerData> getBookieData(BookieId addr) {
-        return data.computeIfAbsent(addr, (key) -> new ConcurrentHashMap<>());
-    }
-
     private static ByteBuf copyData(ByteBufList list) {
         ByteBuf buf = Unpooled.buffer(list.readableBytes());
         for (int i = 0; i < list.size(); i++) {
@@ -288,29 +288,4 @@ public class MockBookieClient implements BookieClient {
         }
         return buf;
     }
-
-    private static class LedgerData {
-        final long ledgerId;
-        private TreeMap<Long, ByteBuf> entries = new TreeMap<>();
-        LedgerData(long ledgerId) {
-            this.ledgerId = ledgerId;
-        }
-
-        void addEntry(long entryId, ByteBuf entry) {
-            entries.put(entryId, entry);
-        }
-
-        ByteBuf getEntry(long entryId) {
-            if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
-                Map.Entry<Long, ByteBuf> lastEntry = entries.lastEntry();
-                if (lastEntry != null) {
-                    return lastEntry.getValue();
-                } else {
-                    return null;
-                }
-            } else {
-                return entries.get(entryId);
-            }
-        }
-    }
-}
+}
\ No newline at end of file
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java
new file mode 100644
index 0000000..c670e87
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiPredicate;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.DistributionSchedule;
+import org.apache.bookkeeper.client.RoundRobinDistributionSchedule;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mocks an ensemble of bookies and can be shared between more than one 
MockBookieClient
+ * so that it can be used to check two writers accessing the same ledger.
+ */
+public class MockBookies {
+    static final Logger LOG = LoggerFactory.getLogger(MockBookies.class);
+    final ConcurrentHashMap<BookieId, ConcurrentHashMap<Long, MockLedgerData>> 
data = new ConcurrentHashMap<>();
+
+    public void seedLedgerForBookie(BookieId bookieId, long ledgerId,
+                                    LedgerMetadata metadata) throws Exception {
+        seedLedgerBase(ledgerId, metadata, (bookie, entry) -> 
bookie.equals(bookieId));
+    }
+
+    public void seedLedger(long ledgerId, LedgerMetadata metadata) throws 
Exception {
+        seedLedgerBase(ledgerId, metadata, (bookie, entry) -> true);
+    }
+
+    public void seedLedgerBase(long ledgerId, LedgerMetadata metadata,
+                               BiPredicate<BookieId, Long> shouldSeed) throws 
Exception {
+        DistributionSchedule schedule = new 
RoundRobinDistributionSchedule(metadata.getWriteQuorumSize(),
+                metadata.getAckQuorumSize(),
+                metadata.getEnsembleSize());
+        long lastEntry = metadata.isClosed()
+                ? metadata.getLastEntryId() : 
metadata.getAllEnsembles().lastEntry().getKey() - 1;
+        long lac = -1;
+        for (long e = 0; e <= lastEntry; e++) {
+            List<BookieId> ensemble = metadata.getEnsembleAt(e);
+            DistributionSchedule.WriteSet ws = schedule.getWriteSet(e);
+            for (int i = 0; i < ws.size(); i++) {
+                BookieId bookieId = ensemble.get(ws.get(i));
+                if (shouldSeed.test(bookieId, e)) {
+                    seedEntries(bookieId, ledgerId, e, lac);
+                }
+            }
+            lac = e;
+        }
+    }
+
+    public void seedEntries(BookieId bookieId, long ledgerId, long entryId, 
long lac) throws Exception {
+        ByteBuf entry = generateEntry(ledgerId, entryId, lac);
+        MockLedgerData ledger = 
getBookieData(bookieId).computeIfAbsent(ledgerId, MockLedgerData::new);
+        ledger.addEntry(entryId, entry);
+    }
+
+    public ByteBuf generateEntry(long ledgerId, long entryId, long lac) throws 
Exception {
+        DigestManager digestManager = DigestManager.instantiate(ledgerId, new 
byte[0],
+                DataFormats.LedgerMetadataFormat.DigestType.CRC32C,
+                UnpooledByteBufAllocator.DEFAULT, false);
+        return 
ByteBufList.coalesce(digestManager.computeDigestAndPackageForSending(
+                entryId, lac, 0, Unpooled.buffer(10)));
+
+    }
+
+    public void addEntry(BookieId bookieId, long ledgerId, long entryId, 
ByteBuf entry) throws BKException {
+        MockLedgerData ledger = 
getBookieData(bookieId).computeIfAbsent(ledgerId, MockLedgerData::new);
+        if (ledger.isFenced()) {
+            throw new BKException.BKLedgerFencedException();
+        }
+        ledger.addEntry(entryId, entry);
+    }
+
+    public void recoveryAddEntry(BookieId bookieId, long ledgerId, long 
entryId, ByteBuf entry) throws BKException {
+        MockLedgerData ledger = 
getBookieData(bookieId).computeIfAbsent(ledgerId, MockLedgerData::new);
+        ledger.addEntry(entryId, entry);
+    }
+
+    public ByteBuf readEntry(BookieId bookieId, int flags, long ledgerId, long 
entryId) throws BKException {
+        MockLedgerData ledger = getBookieData(bookieId).get(ledgerId);
+
+        if (ledger == null) {
+            LOG.warn("[{};L{}] ledger not found", bookieId, ledgerId);
+            throw new BKException.BKNoSuchLedgerExistsException();
+        }
+
+        if ((flags & BookieProtocol.FLAG_DO_FENCING) == 
BookieProtocol.FLAG_DO_FENCING) {
+            ledger.fence();
+        }
+
+        ByteBuf entry = ledger.getEntry(entryId);
+        if (entry == null) {
+            LOG.warn("[{};L{}] entry({}) not found", bookieId, ledgerId, 
entryId);
+            throw new BKException.BKNoSuchEntryException();
+        }
+
+        return entry;
+    }
+
+    public ConcurrentHashMap<Long, MockLedgerData> getBookieData(BookieId 
bookieId) {
+        return data.computeIfAbsent(bookieId, (key) -> new 
ConcurrentHashMap<>());
+    }
+
+
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockLedgerData.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockLedgerData.java
new file mode 100644
index 0000000..05447f0
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockLedgerData.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+    Mock ledger data.
+ */
+public class MockLedgerData {
+    final long ledgerId;
+    boolean isFenced;
+    private TreeMap<Long, ByteBuf> entries = new TreeMap<>();
+
+    MockLedgerData(long ledgerId) {
+        this.ledgerId = ledgerId;
+    }
+
+    boolean isFenced() {
+        return isFenced;
+    }
+
+    void fence() {
+        isFenced = true;
+    }
+
+    void addEntry(long entryId, ByteBuf entry) {
+        entries.put(entryId, entry);
+    }
+
+    ByteBuf getEntry(long entryId) {
+        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
+            Map.Entry<Long, ByteBuf> lastEntry = entries.lastEntry();
+            if (lastEntry != null) {
+                return lastEntry.getValue();
+            } else {
+                return null;
+            }
+        } else {
+            return entries.get(entryId);
+        }
+    }
+}

Reply via email to