eolivelli closed pull request #1317: BP-14 Force Ledger Bookie side implementation
URL: https://github.com/apache/bookkeeper/pull/1317
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of such a pull request once it is merged,
the diff is reproduced below for the sake of provenance:

Because this is a foreign pull request (from a fork), GitHub will not display its
diff after the merge, so the diff is supplied below:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 8b1665e00..3325b2087 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -70,6 +70,7 @@
     String BOOKIE_ADD_ENTRY = "BOOKIE_ADD_ENTRY";
     String BOOKIE_RECOVERY_ADD_ENTRY = "BOOKIE_RECOVERY_ADD_ENTRY";
     String BOOKIE_READ_ENTRY = "BOOKIE_READ_ENTRY";
+    String BOOKIE_FORCE_LEDGER = "BOOKIE_FORCE_LEDGER";
     String BOOKIE_READ_LAST_CONFIRMED = "BOOKIE_READ_LAST_CONFIRMED";
     String BOOKIE_ADD_ENTRY_BYTES = "BOOKIE_ADD_ENTRY_BYTES";
     String BOOKIE_READ_ENTRY_BYTES = "BOOKIE_READ_ENTRY_BYTES";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 73221731e..240568765 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -23,6 +23,7 @@
 
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY_BYTES;
+import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_FORCE_LEDGER;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY_BYTES;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_RECOVERY_ADD_ENTRY;
@@ -78,6 +79,7 @@
 import org.apache.bookkeeper.meta.exceptions.MetadataException;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNS;
+import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -119,6 +121,7 @@
 
     static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
     static final long METAENTRY_ID_FENCE_KEY  = -0x2000;
+    static final long METAENTRY_ID_FORCE_LEDGER  = -0x4000;
 
     private final LedgerDirsManager ledgerDirsManager;
     private LedgerDirsManager indexDirsManager;
@@ -143,6 +146,7 @@
     private final OpStatsLogger addEntryStats;
     private final OpStatsLogger recoveryAddEntryStats;
     private final OpStatsLogger readEntryStats;
+    private final OpStatsLogger forceLedgerStats;
     // Bookie Operation Bytes Stats
     private final OpStatsLogger addBytesStats;
     private final OpStatsLogger readBytesStats;
@@ -735,6 +739,7 @@ public void start() {
         writeBytes = statsLogger.getCounter(WRITE_BYTES);
         readBytes = statsLogger.getCounter(READ_BYTES);
         addEntryStats = statsLogger.getOpStatsLogger(BOOKIE_ADD_ENTRY);
+        forceLedgerStats = statsLogger.getOpStatsLogger(BOOKIE_FORCE_LEDGER);
         recoveryAddEntryStats = 
statsLogger.getOpStatsLogger(BOOKIE_RECOVERY_ADD_ENTRY);
         readEntryStats = statsLogger.getOpStatsLogger(BOOKIE_READ_ENTRY);
         addBytesStats = statsLogger.getOpStatsLogger(BOOKIE_ADD_ENTRY_BYTES);
@@ -1141,6 +1146,85 @@ private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry,
         getJournal(ledgerId).logAddEntry(entry, ackBeforeSync, cb, ctx);
     }
 
+    /**
+     * Force write on the journal assigned to the given ledger and then return 
the id of the last persisted entry.
+     */
+    private void forceLedgerInternal(LedgerDescriptor handle, long 
maximumKnownEntryIdByWriter,
+                                     WriteCallback cb, Object ctx, byte[] 
masterKey)
+            throws IOException, BookieException {
+        long ledgerId = handle.getLedgerId();
+
+
+        if (masterKeyCache.get(ledgerId) == null) {
+            // Force the load into masterKey cache
+            byte[] oldValue = masterKeyCache.putIfAbsent(ledgerId, masterKey);
+            if (oldValue == null) {
+                // new handle, we should add the key to journal ensure we can 
rebuild
+                ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + 
masterKey.length);
+                bb.putLong(ledgerId);
+                bb.putLong(METAENTRY_ID_LEDGER_KEY);
+                bb.putInt(masterKey.length);
+                bb.put(masterKey);
+                bb.flip();
+
+                getJournal(ledgerId).logAddEntry(bb, false /* ackBeforeSync 
*/, new NopWriteCallback(), null);
+            }
+        }
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Forcing ledger {}", ledgerId);
+        }
+        Journal journal = getJournal(ledgerId);
+
+        /**
+         * This callback will receive the id of the maximum entry id which is 
known by the journal
+         * to be stored durably on this bookie.
+         *
+         * @see LastAddForcedCursor
+         */
+        WriteCallback recoverLastAddForced = (int rc, long ledgerId1, long 
entryId,
+                                                    BookieSocketAddress addr, 
Object ctx1) -> {
+            if (rc == BookieProtocol.EOK && entryId == 
BookieProtocol.INVALID_ENTRY_ID) {
+                entryId = 
recoverLastAddForcedFromLedgerStorage(maximumKnownEntryIdByWriter, ledgerId1, 
journal);
+            }
+            cb.writeComplete(rc, ledgerId1, entryId, addr, ctx1);
+        };
+        journal.forceLedger(ledgerId, recoverLastAddForced, ctx);
+    }
+
+    private long recoverLastAddForcedFromLedgerStorage(long 
maximumKnownEntryIdByWriter,
+                                                       long ledgerId, Journal 
journal) {
+        long entryId = BookieProtocol.INVALID_ENTRY_ID;
+        // journal does not have info about what has been persisted before 
last bookie restart
+        // so we are reconstructing this information from LedgerStorage
+        try {
+            // we are performing a scan explicitly because we have to 
reconstruct fully the LastAddForcedCursor
+            // even in presence of gaps.
+            // we cannot use LedgerStorage#getLastAddConfirmed or 
LedgerStorage#getLastEntry
+            // because gaps may be present (see WriteAdvHandle) and we must be 
sure
+            // to recover fully the cursor, so the client is telling the range 
of the possible entryId
+            // which may be present on this bookie, only the client knows 
exactly this value.
+            // currently we are supposing that a bookie will receive all the 
entries
+            // from 0 to maximumKnownEntryIdByWriter
+            // in the future it will be possible for a bookie to contain only 
a portion of entries of the ledger.
+            for (long eId = 0; eId <= maximumKnownEntryIdByWriter; eId++) {
+                try {
+                    ByteBuf entry = ledgerStorage.getEntry(ledgerId, eId);
+                    entry.release();
+                    journal.updateCursor(ledgerId, eId);
+                } catch (NoEntryException noEntryException) {
+                }
+            }
+            entryId = journal.getCursorForLedger(ledgerId).getLastAddForced();
+
+            LOG.info("recovered lastAddForced entryId {} for ledger {} from 
LedgerStorage", entryId, ledgerId);
+        } catch (IOException err) {
+            LOG.error("cannot recover lastAddForced entryId for ledger "
+                    + ledgerId + " from LedgerStorage", err);
+        }
+        return entryId;
+    }
+
     /**
      * Add entry to a ledger, even if the ledger has previous been fenced. 
This should only
      * happen in bookie recovery or ledger recovery cases, where entries are 
being replicates
@@ -1199,6 +1283,38 @@ public ByteBuf getExplicitLac(long ledgerId) throws IOException, Bookie.NoLedger
         return lac;
     }
 
+    /**
+     * Forces (sync) data on a ledger on the Journal. This is useful for 
ledgers with DEFERRED_SYNC write flag
+     * for which the client receives acknoledgement for writes without the 
guarantee that data has been sync'd to disk.
+     */
+    public void forceLedger(long ledgerId, long maximumKnownEntryIdByWriter,
+                            WriteCallback cb, Object ctx, byte[] masterKey)
+                throws IOException, BookieException {
+        long requestNanos = MathUtils.nowInNano();
+        boolean success = false;
+        try {
+            LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
+            synchronized (handle) {
+                if (handle.isFenced()) {
+                    throw BookieException
+                            
.create(BookieException.Code.LedgerFencedException);
+                }
+                forceLedgerInternal(handle, maximumKnownEntryIdByWriter, cb, 
ctx, masterKey);
+            }
+            success = true;
+        } catch (NoWritableLedgerDirException e) {
+            stateManager.transitionToReadOnlyMode();
+            throw new IOException(e);
+        } finally {
+            long elapsedNanos = MathUtils.elapsedNanos(requestNanos);
+            if (success) {
+                forceLedgerStats.registerSuccessfulEvent(elapsedNanos, 
TimeUnit.NANOSECONDS);
+            } else {
+                forceLedgerStats.registerFailedEvent(elapsedNanos, 
TimeUnit.NANOSECONDS);
+            }
+        }
+    }
+
     /**
      * Add entry to a ledger.
      * @throws BookieException.LedgerFencedException if the ledger is fenced
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 94aafad18..826188fe2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -23,6 +23,9 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.util.concurrent.MoreExecutors;
 
 import io.netty.buffer.ByteBuf;
@@ -40,6 +43,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -313,7 +317,9 @@ public void run() {
                 LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, 
entryId);
             }
             journalCbQueueSize.dec();
-            
journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime),
 TimeUnit.NANOSECONDS);
+            if (journalAddEntryStats != null) {
+                
journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime),
 TimeUnit.NANOSECONDS);
+            }
             cb.writeComplete(0, ledgerId, entryId, null, ctx);
             recycle();
         }
@@ -361,10 +367,24 @@ public int process(boolean shouldForceWrite) throws IOException {
                 }
                 lastLogMark.setCurLogMark(this.logId, 
this.lastFlushedPosition);
 
+                LedgerIdEntryIdPair forced = flushedNotForcedEntries.poll();
+                while (forced != null) {
+                   long ledgerId = forced.ledgerId;
+                   long entryId = forced.entryId;
+                   forced.recycle();
+                   updateCursor(ledgerId, entryId);
+                   forced = flushedNotForcedEntries.poll();
+                }
+
                 // Notify the waiters that the force write succeeded
                 for (int i = 0; i < forceWriteWaiters.size(); i++) {
                     QueueEntry qe = forceWriteWaiters.get(i);
                     if (qe != null) {
+                        if (qe.entryId == Bookie.METAENTRY_ID_FORCE_LEDGER) {
+                            LastAddForcedCursor cursor = 
getCursorForLedger(qe.ledgerId);
+                            // for these special entries we have to return the 
LastAddForced entry
+                            qe.entryId = cursor.getLastAddForced();
+                        }
                         cbThreadPool.execute(qe);
                     }
                     journalCbQueueSize.inc();
@@ -590,6 +610,23 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour
     final BlockingQueue<QueueEntry> queue = new 
GrowableArrayBlockingQueue<QueueEntry>();
     final BlockingQueue<ForceWriteRequest> forceWriteRequests = new 
GrowableArrayBlockingQueue<ForceWriteRequest>();
 
+    // entries written to the disk and acknowledged to the client before 
forcing writes (see ackBeforeSync)
+    final BlockingQueue<LedgerIdEntryIdPair> flushedNotForcedEntries = new 
GrowableArrayBlockingQueue<>();
+    final LoadingCache<Long, LastAddForcedCursor> syncCursors = CacheBuilder
+                                    .<Long, LastAddForcedCursor>newBuilder()
+                                    .maximumSize(10000)
+                                    .removalListener(notification -> {
+                                        LOG.info("evicted cursor for ledger 
{}" + notification.getKey());
+                                     })
+                                    .build(new CacheLoader<Long, 
LastAddForcedCursor>() {
+                                        @Override
+                                        public LastAddForcedCursor load(Long 
key) throws Exception {
+                                            // there is no need to rebuild the 
cursor
+                                            // in will be rebuild only on 
demand
+                                            return new LastAddForcedCursor();
+                                        }
+                                    });
+
     volatile boolean running = true;
     private final LedgerDirsManager ledgerDirsManager;
 
@@ -869,6 +906,14 @@ void logAddEntry(long ledgerId, long entryId, ByteBuf entry,
                 journalAddEntryStats, journalQueueSize));
     }
 
+    void forceLedger(long ledgerId, WriteCallback cb, Object ctx) {
+
+        journalQueueSize.inc();
+        queue.add(QueueEntry.create(
+                null, false,  ledgerId, Bookie.METAENTRY_ID_FORCE_LEDGER, cb, 
ctx, MathUtils.nowInNano(),
+                null, journalQueueSize));
+    }
+
     /**
      * Get the length of journal entries queue.
      *
@@ -1008,6 +1053,8 @@ public void run() {
                                 if (entry != null && (!syncData || 
entry.ackBeforeSync)) {
                                     toFlush.set(i, null);
                                     numEntriesToFlush--;
+                                    flushedNotForcedEntries.add(
+                                            
LedgerIdEntryIdPair.create(entry.ledgerId, entry.entryId));
                                     cbThreadPool.execute(entry);
                                 }
                             }
@@ -1072,22 +1119,23 @@ public void run() {
                 if (qe == null) { // no more queue entry
                     continue;
                 }
+                if (qe.entryId != Bookie.METAENTRY_ID_FORCE_LEDGER) {
+                    int entrySize = qe.entry.readableBytes();
+                    journalWriteBytes.add(entrySize);
+                    journalQueueSize.dec();
 
-                int entrySize = qe.entry.readableBytes();
-                journalWriteBytes.add(entrySize);
-                journalQueueSize.dec();
+                    batchSize += (4 + entrySize);
 
-                batchSize += (4 + entrySize);
+                    lenBuff.clear();
+                    lenBuff.writeInt(entrySize);
 
-                lenBuff.clear();
-                lenBuff.writeInt(entrySize);
-
-                // preAlloc based on size
-                logFile.preAllocIfNeeded(4 + entrySize);
+                    // preAlloc based on size
+                    logFile.preAllocIfNeeded(4 + entrySize);
 
-                bc.write(lenBuff);
-                bc.write(qe.entry);
-                qe.entry.release();
+                    bc.write(lenBuff);
+                    bc.write(qe.entry);
+                    qe.entry.release();
+                }
 
                 toFlush.add(qe);
                 numEntriesToFlush++;
@@ -1159,4 +1207,50 @@ private static int fullRead(JournalChannel fc, ByteBuffer bb) throws IOException
     public void joinThread() throws InterruptedException {
         join();
     }
+
+    LastAddForcedCursor getCursorForLedger(long ledgerId) {
+        try {
+            return syncCursors.get(ledgerId);
+        } catch (ExecutionException err) {
+            LOG.error("Unexpected error while creating a LastAddForcedCursor", 
err.getCause());
+            throw new RuntimeException(err.getCause());
+        }
+    }
+
+    void updateCursor(long ledgerId, long entryId) {
+        LastAddForcedCursor cursor = getCursorForLedger(ledgerId);
+        cursor.update(entryId);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("update cursor ledger {} entryId {}, now lastAddForced 
is {}",
+                    ledgerId, entryId, cursor.getLastAddForced());
+        }
+    }
+
+    private static final class LedgerIdEntryIdPair {
+        long ledgerId;
+        long entryId;
+
+        private final Handle<LedgerIdEntryIdPair> recyclerHandle;
+
+        static LedgerIdEntryIdPair create(long ledgerId, long entryId) {
+            LedgerIdEntryIdPair e = RECYCLER.get();
+            e.ledgerId = ledgerId;
+            e.entryId = entryId;
+            return e;
+        }
+
+        private LedgerIdEntryIdPair(Handle<LedgerIdEntryIdPair> 
recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        private static final Recycler<LedgerIdEntryIdPair> RECYCLER = new 
Recycler<LedgerIdEntryIdPair>() {
+            protected LedgerIdEntryIdPair 
newObject(Recycler.Handle<LedgerIdEntryIdPair> handle) {
+                return new LedgerIdEntryIdPair(handle);
+            }
+        };
+
+        private void recycle() {
+            recyclerHandle.recycle(this);
+        }
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddForcedCursor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddForcedCursor.java
new file mode 100644
index 000000000..2212ccb69
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddForcedCursor.java
@@ -0,0 +1,145 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import org.apache.bookkeeper.client.api.WriteAdvHandle;
+import org.apache.bookkeeper.proto.BookieProtocol;
+
+/**
+ * Handle the LastAddForced entryId. As entries do not arrive to the Journal 
in a known order we have to handle
+ * increments of this pointer. This is how it works: we keep a value for the 
current {@link #lastAddForced} entryId and
+ * a list of continuous ranges of ids which are known to have been persisted 
but are 'after' {@link #lastAddForced}.
+ * <p>
+ * Examples:
+ * <ul>
+ * <li>0, 1, 2, 3 we have maxAddForced = 3 (all entries from 0 to 3 are 
stored)</li>
+ * <li>0, 1, 3, 5 we have maxAddForced = 1 (all entries from 0 to 1 are 
stored, 2 and 4 are not stored)</li>
+ * <li>1, 2, 3 we have maxAddForced = -1 (not available, as 0 is missing, so 
there is an gap)</li>
+ * <li>&lt;empty sequence&gt; we have maxAddForced = -1 (not available)</li>
+ * </ul>
+ * </p>
+ * <p>
+ * When an gap is filled, like when we have a sequence of 0, 1, 2, 4, 5, 6, 8 
and we receive a 3, we can advance
+ * {@link #lastAddForced} until we find a continuos sequence of ids, so we 
will get to 6.
+ * </p>
+ * <p>
+ * This cursor is kept in memory and in order to handle the case of Bookie 
restart we are reconstructing this
+ * {@link #lastAddForced} value from {@link LedgerStorage} just be asking for 
the maximum written entryId (with a scan)
+ * </p>
+ *
+ * @since 4.8
+ */
+class LastAddForcedCursor {
+
+    private long lastAddForced = -1;
+    private final SortedMap<Long, Long> ranges = new TreeMap<>();
+
+    /**
+     * Returns the maximum entryId which is known to have been persisted 
durably on local Bookie, taking into account
+     * possible gaps in the sequence of entries arrived to the Bookie (think 
about {@link WriteAdvHandle}).
+     *
+     * @return an entryId if known or {@link BookieProtocol#INVALID_ENTRY_ID} 
if there is no availble information
+     */
+    public synchronized long getLastAddForced() {
+        return lastAddForced;
+    }
+
+    public synchronized void update(long entryId) {
+        checkArgument(entryId >= 0, "invalid entryId {}", entryId);
+        if (entryId <= lastAddForced) {
+            // cannot go backward
+            return;
+        }
+        if (lastAddForced < 0 && entryId == 0) {
+            // first entry, bookie could in theory receive only a subset of 
entries and not the whole range from 0..X,
+            // but as we are not supporting ensemble changes actually each 
bookie will receive every entry
+            lastAddForced = entryId;
+            if (!ranges.isEmpty()) {
+                long first = ranges.firstKey();
+                if (first == 1) {
+                    lastAddForced = ranges.get(first);
+                    ranges.remove(first);
+                }
+            }
+            return;
+        }
+
+        if (entryId == lastAddForced + 1) {
+            // this is the most common case in case of monotonic sequence of 
ids
+            lastAddForced = entryId;
+            if (!ranges.isEmpty()) {
+                // check if we are filling a gap
+                long first = ranges.firstKey();
+                if (lastAddForced + 1 == first) {
+                    Long endOfRange = ranges.remove(first);
+                    lastAddForced = endOfRange;
+                }
+            }
+            return;
+        }
+
+        // we are out of sequence, let's add the entryId to an existing range 
or allocate a new one
+        SortedMap<Long, Long> headMap = ranges.headMap(entryId);
+        if (headMap.isEmpty()) {
+            SortedMap<Long, Long> tailMap = ranges.tailMap(entryId);
+            if (tailMap.isEmpty()) {
+                // allocate new range
+                ranges.put(entryId, entryId);
+            } else {
+                long min = tailMap.firstKey();
+                if (min - 1 == entryId) {
+                    // move start of range
+                    long existingEndOfRange = ranges.remove(min);
+                    ranges.put(entryId, existingEndOfRange);
+                } else {
+                    // allocate new range
+                    ranges.put(entryId, entryId);
+                }
+            }
+        } else {
+            long expectedNewRangeStart = headMap.lastKey();
+            long endOfRange = headMap.get(expectedNewRangeStart);
+            if (entryId == endOfRange + 1) {
+                // add to existing range
+                ranges.put(expectedNewRangeStart, entryId);
+            } else {
+                // allocate new range
+                ranges.put(entryId, entryId);
+            }
+        }
+    }
+
+    @VisibleForTesting
+    synchronized int getNumRanges() {
+        return ranges.size();
+    }
+
+    @VisibleForTesting
+    synchronized SortedMap<Long, Long> getRanges() {
+        return new TreeMap<>(ranges);
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index dcfac3147..ce15a6927 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -27,6 +27,7 @@
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.stats.StatsLogger;
 
 /**
@@ -103,12 +104,19 @@ void initialize(ServerConfiguration conf,
     long addEntry(ByteBuf entry) throws IOException, BookieException;
 
     /**
-     * Read an entry from storage.
+     * Read an entry from storage. In case of {@link 
BookieProtocol#LAST_ADD_CONFIRMED} we are going to return the
+     * 'last' written entry.
+     *
+     * @throws Bookie.NoEntryException in case of missing entry
+     * @throws IOException in case of internal failure
+     *
+     * @return the entry, never null
      */
-    ByteBuf getEntry(long ledgerId, long entryId) throws IOException;
+    ByteBuf getEntry(long ledgerId, long entryId) throws IOException, 
Bookie.NoEntryException;
 
     /**
      * Get last add confirmed.
+     * In case of missing value ever sent from the client it is expected to 
return the <i>last</i> written entry.
      *
      * @param ledgerId ledger id.
      * @return last add confirmed.
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
index 0b022fedc..a3c90eb72 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
@@ -39,12 +39,14 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.Journal.ForceWriteRequest;
 import org.apache.bookkeeper.bookie.Journal.LastLogMark;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.test.TestStatsProvider;
@@ -314,4 +316,139 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddre
         return supportQueue;
     }
 
+    @Test
+    public void testForceLedger() throws Exception {
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+            .setZkServers(null);
+
+        JournalChannel jc = spy(new JournalChannel(journalDir, 1));
+        whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
+
+        LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
+        Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
+
+        // machinery to suspend ForceWriteThread
+        CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
+        LinkedBlockingQueue<ForceWriteRequest> supportQueue =
+                
enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal);
+        journal.start();
+
+        LogMark lastLogMarkBeforeWrite = 
journal.getLastLogMark().markLog().getCurMark();
+        CountDownLatch latch = new CountDownLatch(1);
+        long ledgerId = 1;
+        AtomicLong lastAddPersisted = new AtomicLong(Long.MIN_VALUE);
+        journal.forceLedger(ledgerId, new WriteCallback() {
+            @Override
+            public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx) {
+                lastAddPersisted.set(entryId);
+                latch.countDown();
+            }
+        }, null);
+
+        // forceLedger should not complete even if ForceWriteThread is 
suspended
+        // wait that an entry is written to the ForceWriteThread queue
+        while (supportQueue.isEmpty()) {
+            Thread.sleep(100);
+        }
+        assertEquals(1, latch.getCount());
+        assertEquals(1, supportQueue.size());
+
+        // in constructor of JournalChannel we are calling forceWrite(true) 
but it is not tracked by PowerMock
+        // because the 'spy' is applied only on return from the constructor
+        verify(jc, times(0)).forceWrite(true);
+
+        // let ForceWriteThread work
+        forceWriteThreadSuspendedLatch.countDown();
+
+        // callback should complete now
+        assertTrue(latch.await(20, TimeUnit.SECONDS));
+
+        assertEquals(BookieProtocol.INVALID_ENTRY_ID, lastAddPersisted.get());
+
+        verify(jc, atLeast(1)).forceWrite(false);
+
+        assertEquals(0, supportQueue.size());
+
+        // verify that log marker advanced
+        LastLogMark lastLogMarkAfterForceWrite = journal.getLastLogMark();
+        
assertTrue(lastLogMarkAfterForceWrite.getCurMark().compare(lastLogMarkBeforeWrite)
 > 0);
+
+        journal.shutdown();
+    }
+
+    @Test
+    public void testForceLedgerTrackLastAddPersisted() throws Exception {
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+            .setZkServers(null);
+
+        JournalChannel jc = spy(new JournalChannel(journalDir, 1));
+        whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
+
+        LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
+        Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
+
+        // machinery to suspend ForceWriteThread
+        CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
+        LinkedBlockingQueue<ForceWriteRequest> supportQueue =
+                
enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal);
+        journal.start();
+
+        LogMark lastLogMarkBeforeWrite = 
journal.getLastLogMark().markLog().getCurMark();
+        CountDownLatch latch = new CountDownLatch(1);
+        final int numEntries = 100;
+        CountDownLatch latchWriteEntries = new CountDownLatch(numEntries);
+        long ledgerId = 1;
+        AtomicLong lastAddPersistedAtForce = new AtomicLong(Long.MIN_VALUE);
+        for (long entryId = 0; entryId < numEntries; entryId++) {
+            journal.logAddEntry(ledgerId, entryId, DATA, true /* ackBeforeSync 
*/, new WriteCallback() {
+                @Override
+                public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx) {
+                    latchWriteEntries.countDown();
+                }
+            }, null);
+        }
+        journal.forceLedger(ledgerId, new WriteCallback() {
+            @Override
+            public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx) {
+                lastAddPersistedAtForce.set(entryId);
+                latch.countDown();
+            }
+        }, null);
+
+        // forceLedger should not complete even if ForceWriteThread is 
suspended, but writes will be able to be acked
+        latchWriteEntries.await(20, TimeUnit.SECONDS);
+        assertEquals(1, latch.getCount());
+        assertEquals(1, supportQueue.size());
+
+        // in constructor of JournalChannel we are calling forceWrite(true) 
but it is not tracked by PowerMock
+        // because the 'spy' is applied only on return from the constructor
+        verify(jc, times(0)).forceWrite(true);
+
+        // let ForceWriteThread work
+        forceWriteThreadSuspendedLatch.countDown();
+
+        // forceLedger callback should complete now
+        assertTrue(latch.await(20, TimeUnit.SECONDS));
+
+        assertEquals(numEntries - 1, lastAddPersistedAtForce.get());
+
+        verify(jc, atLeast(1)).forceWrite(false);
+
+        assertEquals(0, supportQueue.size());
+
+        // verify that log marker advanced
+        LastLogMark lastLogMarkAfterForceWrite = journal.getLastLogMark();
+        
assertTrue(lastLogMarkAfterForceWrite.getCurMark().compare(lastLogMarkBeforeWrite)
 > 0);
+
+        journal.shutdown();
+    }
+
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
index 74e8447c9..ccdd927bd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
@@ -20,24 +20,34 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.api.BKException;
+import org.apache.bookkeeper.client.api.WriteAdvHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.junit.Rule;
@@ -48,6 +58,7 @@
 import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
 
 /**
  * Test the bookie journal.
@@ -108,9 +119,7 @@ public void testJournalLogAddEntryCalledCorrectly() throws 
Exception {
         byte[] masterKey = new byte[64];
         for (boolean ackBeforeSync : new boolean[]{true, false}) {
             CountDownLatch latch = new CountDownLatch(1);
-            final ByteBuf data = Unpooled.buffer();
-            data.writeLong(ledgerId);
-            data.writeLong(entryId);
+            final ByteBuf data = buildEntry(ledgerId, entryId, -1);
             final long expectedEntryId = entryId;
             b.addEntry(data, ackBeforeSync, (int rc, long ledgerId1, long 
entryId1,
                                              BookieSocketAddress addr, Object 
ctx) -> {
@@ -127,4 +136,365 @@ public void testJournalLogAddEntryCalledCorrectly() 
throws Exception {
         journalJoinLatch.countDown();
         b.shutdown();
     }
+
+    /**
+     * Test that Bookie correctly calls Journal.forceLedger and is able to return the correct
+     * LastAddPersisted entry id.
+     *
+     * <p>A forceLedger issued before any entry has been added is expected to complete with
+     * {@link BookieProtocol#INVALID_ENTRY_ID}; after entry 0 has been added (ackBeforeSync=true),
+     * a second forceLedger is expected to complete with entry id 0.
+     */
+    @Test
+    public void testForceLedger() throws Exception {
+
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+        File ledgerDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+            .setLedgerDirNames(new String[]{ledgerDir.getPath()})
+            .setZkServers(null);
+
+        long ledgerId = 1;
+        long entryId = 0;
+        Object expectedCtx = "foo";
+        byte[] masterKey = new byte[64];
+        final long expectedEntryId = entryId;
+
+        Bookie b = new Bookie(conf);
+        b.start();
+        // always stop the bookie, even when an assertion fails, so it does not leak into other tests
+        try {
+            CompletableFuture<Long> latchForceLedger1 = new CompletableFuture<>();
+            CompletableFuture<Long> latchForceLedger2 = new CompletableFuture<>();
+            CompletableFuture<Long> latchAddEntry = new CompletableFuture<>();
+            final ByteBuf data = buildEntry(ledgerId, entryId, -1);
+
+            // no entry has been added yet: forceLedger is expected to report INVALID_ENTRY_ID
+            b.forceLedger(ledgerId, expectedEntryId,
+                (int rc, long ledgerId1, long entryId1, BookieSocketAddress addr, Object ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        latchForceLedger1.completeExceptionally(
+                            org.apache.bookkeeper.client.BKException.create(rc));
+                        return;
+                    }
+                    latchForceLedger1.complete(entryId1);
+                }, expectedCtx, masterKey);
+            assertEquals(BookieProtocol.INVALID_ENTRY_ID, result(latchForceLedger1).longValue());
+
+            b.addEntry(data, true /* ackBeforeSync */,
+                (int rc, long ledgerId1, long entryId1, BookieSocketAddress addr, Object ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        latchAddEntry.completeExceptionally(
+                            org.apache.bookkeeper.client.BKException.create(rc));
+                        return;
+                    }
+                    // report the id the bookie acknowledged (not the local variable), otherwise the
+                    // assertion below would compare the test's own constant with itself
+                    latchAddEntry.complete(entryId1);
+                }, expectedCtx, masterKey);
+            assertEquals(expectedEntryId, result(latchAddEntry).longValue());
+
+            // issue a new "forceLedger", it should return the last persisted entry id
+            b.forceLedger(ledgerId, expectedEntryId,
+                (int rc, long ledgerId1, long entryId1, BookieSocketAddress addr, Object ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        latchForceLedger2.completeExceptionally(
+                            org.apache.bookkeeper.client.BKException.create(rc));
+                        return;
+                    }
+                    latchForceLedger2.complete(entryId1);
+                }, expectedCtx, masterKey);
+            assertEquals(entryId, result(latchForceLedger2).longValue());
+        } finally {
+            b.shutdown();
+        }
+    }
+
+    /**
+     * test that Bookie is able to return the recover LastAddPersisted entry 
id from LedgerStorage after a restart.
+     */
+    @Test
+    public void testForceLedgerRecoverFromLedgerStorage() throws Exception {
+
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+        File ledgerDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+            .setLedgerDirNames(new String[]{ledgerDir.getPath()})
+            .setZkServers(null);
+
+        Bookie b = new Bookie(conf);
+        b.start();
+
+        long ledgerId = 1;
+        long entryId = 0;
+        Object expectedCtx = "foo";
+        byte[] masterKey = new byte[64];
+
+        CompletableFuture<Long> latchForceLedger = new CompletableFuture<>();
+        CompletableFuture<Long> latchAddEntry = new CompletableFuture<>();
+        final ByteBuf data = buildEntry(ledgerId, entryId, -1);
+        final long expectedEntryId = entryId;
+
+        b.addEntry(data, true /* ackBeforesync */, (int rc, long ledgerId1, 
long entryId1,
+                                         BookieSocketAddress addr, Object ctx) 
-> {
+            if (rc != BKException.Code.OK) {
+                
latchAddEntry.completeExceptionally(org.apache.bookkeeper.client.BKException.create(rc));
+                return;
+            }
+            latchAddEntry.complete(entryId1);
+        }, expectedCtx, masterKey);
+        assertEquals(expectedEntryId, result(latchAddEntry).longValue());
+
+        // issue a new "forceLedger", it should return the last persisted 
entry id
+        b.forceLedger(ledgerId, expectedEntryId, (int rc, long ledgerId1, long 
entryId1,
+                                        BookieSocketAddress addr, Object ctx) 
-> {
+            if (rc != BKException.Code.OK) {
+                
latchForceLedger.completeExceptionally(org.apache.bookkeeper.client.BKException.create(rc));
+                return;
+            }
+            latchForceLedger.complete(entryId1);
+        }, expectedCtx, masterKey);
+        assertEquals(entryId, result(latchForceLedger).longValue());
+
+        b.shutdown();
+
+        // re-start the bookie
+        Bookie b2 = new Bookie(conf);
+        b2.start();
+
+        AtomicInteger recoverReadsCount = new AtomicInteger();
+        LedgerStorage ledgerStorage = b2.getLedgerStorage();
+        LedgerStorage ledgerStorageSpy = spy(ledgerStorage);
+        Whitebox.setInternalState(b2, "ledgerStorage", ledgerStorageSpy);
+        doAnswer((InvocationOnMock iom) -> {
+                    long lId = (Long) iom.getArgument(0);
+                    long eId = (Long) iom.getArgument(1);
+                    recoverReadsCount.incrementAndGet();
+                    return ledgerStorage.getEntry(lId, eId);
+        }).when(ledgerStorageSpy).getEntry(eq(ledgerId), anyLong());
+
+        // issue a new "forceLedger", lastAddPersisted will be recovered
+        // by reading the LastAddConfirmed entry from LedgerStorage
+        CompletableFuture<Long> latchForceLedger2 = new CompletableFuture<>();
+        b2.forceLedger(ledgerId, expectedEntryId, (int rc, long ledgerId1, 
long entryId1,
+                                        BookieSocketAddress addr, Object ctx) 
-> {
+            if (rc != BKException.Code.OK) {
+                
latchForceLedger2.completeExceptionally(org.apache.bookkeeper.client.BKException.create(rc));
+                return;
+            }
+            latchForceLedger2.complete(entryId1);
+        }, expectedCtx, masterKey);
+        assertEquals(entryId, result(latchForceLedger2).longValue());
+
+//        assertEquals(1, getLastAddConfirmedEntryCount.get());
+        assertEquals(1, recoverReadsCount.get());
+
+        // issue a new "forceLedger", there is no need to recover again
+        // from LedgerStorage
+        CompletableFuture<Long> latchForceLedger3 = new CompletableFuture<>();
+        b2.forceLedger(ledgerId, expectedEntryId, (int rc, long ledgerId1, 
long entryId1,
+                                        BookieSocketAddress addr, Object ctx) 
-> {
+            if (rc != BKException.Code.OK) {
+                
latchForceLedger3.completeExceptionally(org.apache.bookkeeper.client.BKException.create(rc));
+                return;
+            }
+            latchForceLedger3.complete(entryId1);
+        }, expectedCtx, masterKey);
+        assertEquals(entryId, result(latchForceLedger3).longValue());
+
+        assertEquals(1, recoverReadsCount.get());
+        b2.shutdown();
+
+    }
+
+    /**
+     * Test that Bookie is able to recover the LastAddPersisted entry id from LedgerStorage after a restart
+     * when the sequence of entries contains gaps, as it can when written by a {@link WriteAdvHandle}.
+     *
+     * <p>Entries 0, 1, 2, 4 and 5 are written while entry 3 is held back, so forceLedger is expected to
+     * report 2. After a restart the value is expected to be recovered from LedgerStorage with 6
+     * {@code getEntry} reads (entries 0..5). Once entry 3 is written the reported value becomes 5. A
+     * further restart is expected to recover 5, again with 6 reads.
+     */
+    @Test
+    public void testForceLedgerRecoverFromLedgerStorageWithGaps() throws Exception {
+
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+        File ledgerDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+            .setLedgerDirNames(new String[]{ledgerDir.getPath()})
+            .setZkServers(null);
+
+        long ledgerId = 1;
+        Object expectedCtx = "foo";
+        byte[] masterKey = new byte[64];
+        final long lastEntryBeforeGap = 2;
+        final long lastEntryInLedger = 5;
+        List<ByteBuf> entries = new ArrayList<>();
+        entries.add(buildEntry(ledgerId, 0, -1));
+        entries.add(buildEntry(ledgerId, 1, -1));
+        entries.add(buildEntry(ledgerId, lastEntryBeforeGap, -1));
+        // entry 3 is held back to create the gap; it is written only after the first restart
+        ByteBuf gapEntry = buildEntry(ledgerId, 3, -1);
+        entries.add(buildEntry(ledgerId, 4, -1));
+        entries.add(buildEntry(ledgerId, lastEntryInLedger, -1));
+
+        Bookie b = new Bookie(conf);
+        b.start();
+        try {
+            for (ByteBuf data : entries) {
+                // buildEntry writes ledgerId then entryId, so the entry id is the long at offset 8
+                final long expectedEntryId = data.getLong(8);
+                CompletableFuture<Long> latchAddEntry = new CompletableFuture<>();
+                b.addEntry(data, true /* ackBeforeSync */,
+                    (int rc, long ledgerId1, long entryId1, BookieSocketAddress addr, Object ctx) -> {
+                        if (rc != BKException.Code.OK) {
+                            latchAddEntry.completeExceptionally(
+                                org.apache.bookkeeper.client.BKException.create(rc));
+                            return;
+                        }
+                        latchAddEntry.complete(entryId1);
+                    }, expectedCtx, masterKey);
+                assertEquals(expectedEntryId, result(latchAddEntry).longValue());
+            }
+
+            // issue a new "forceLedger": because of the gap at entry 3, it should return the last
+            // entry before the gap
+            CompletableFuture<Long> latchForceLedger = new CompletableFuture<>();
+            b.forceLedger(ledgerId, lastEntryInLedger,
+                (int rc, long ledgerId1, long entryId1, BookieSocketAddress addr, Object ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        latchForceLedger.completeExceptionally(
+                            org.apache.bookkeeper.client.BKException.create(rc));
+                        return;
+                    }
+                    latchForceLedger.complete(entryId1);
+                }, expectedCtx, masterKey);
+            assertEquals(lastEntryBeforeGap, result(latchForceLedger).longValue());
+        } finally {
+            b.shutdown();
+        }
+
+        // re-start the bookie
+        Bookie b2 = new Bookie(conf);
+        b2.start();
+        try {
+            // replace the bookie's LedgerStorage with a spy that counts every getEntry(ledgerId, *)
+            // call and delegates it to the real storage
+            AtomicInteger recoverReadsCount = new AtomicInteger();
+            LedgerStorage ledgerStorage = b2.getLedgerStorage();
+            LedgerStorage ledgerStorageSpy = spy(ledgerStorage);
+            Whitebox.setInternalState(b2, "ledgerStorage", ledgerStorageSpy);
+            doAnswer((InvocationOnMock iom) -> {
+                long lId = (Long) iom.getArgument(0);
+                long eId = (Long) iom.getArgument(1);
+                recoverReadsCount.incrementAndGet();
+                return ledgerStorage.getEntry(lId, eId);
+            }).when(ledgerStorageSpy).getEntry(eq(ledgerId), anyLong());
+
+            // issue a new "forceLedger": after the restart lastAddPersisted is expected to be
+            // recovered by reading the ledger's entries back from LedgerStorage
+            CompletableFuture<Long> latchForceLedger2 = new CompletableFuture<>();
+            b2.forceLedger(ledgerId, lastEntryInLedger,
+                (int rc, long ledgerId1, long entryId1, BookieSocketAddress addr, Object ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        latchForceLedger2.completeExceptionally(
+                            org.apache.bookkeeper.client.BKException.create(rc));
+                        return;
+                    }
+                    latchForceLedger2.complete(entryId1);
+                }, expectedCtx, masterKey);
+            assertEquals(lastEntryBeforeGap, result(latchForceLedger2).longValue());
+
+            // we had to scan from 0 to 5 (lastEntryInLedger)
+            assertEquals(6, recoverReadsCount.get());
+            recoverReadsCount.set(0);
+
+            // issue a new "forceLedger", there is no need to recover again
+            // from LedgerStorage
+            CompletableFuture<Long> latchForceLedger3 = new CompletableFuture<>();
+            b2.forceLedger(ledgerId, lastEntryInLedger,
+                (int rc, long ledgerId1, long entryId1, BookieSocketAddress addr, Object ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        latchForceLedger3.completeExceptionally(
+                            org.apache.bookkeeper.client.BKException.create(rc));
+                        return;
+                    }
+                    latchForceLedger3.complete(entryId1);
+                }, expectedCtx, masterKey);
+            assertEquals(lastEntryBeforeGap, result(latchForceLedger3).longValue());
+
+            assertEquals(0, recoverReadsCount.get());
+
+            // fill the gap
+            final long expectedEntryId = gapEntry.getLong(8);
+            CompletableFuture<Long> latchAddEntry = new CompletableFuture<>();
+            b2.addEntry(gapEntry, true /* ackBeforeSync */,
+                (int rc, long ledgerId1, long entryId1, BookieSocketAddress addr, Object ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        latchAddEntry.completeExceptionally(
+                            org.apache.bookkeeper.client.BKException.create(rc));
+                        return;
+                    }
+                    latchAddEntry.complete(entryId1);
+                }, expectedCtx, masterKey);
+            assertEquals(expectedEntryId, result(latchAddEntry).longValue());
+
+            // issue a new "forceLedger", there is no need to recover again
+            // from LedgerStorage and now the gap is filled, so we will see lastAddForced = lastEntryInLedger
+            CompletableFuture<Long> latchForceLedger4 = new CompletableFuture<>();
+            b2.forceLedger(ledgerId, lastEntryInLedger,
+                (int rc, long ledgerId1, long entryId1, BookieSocketAddress addr, Object ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        latchForceLedger4.completeExceptionally(
+                            org.apache.bookkeeper.client.BKException.create(rc));
+                        return;
+                    }
+                    latchForceLedger4.complete(entryId1);
+                }, expectedCtx, masterKey);
+            assertEquals(lastEntryInLedger, result(latchForceLedger4).longValue());
+
+            assertEquals(0, recoverReadsCount.get());
+        } finally {
+            b2.shutdown();
+        }
+
+        // re-start the bookie again, now that the gap has been filled
+        Bookie b3 = new Bookie(conf);
+        b3.start();
+        try {
+            AtomicInteger recoverReadsCount3 = new AtomicInteger();
+            LedgerStorage ledgerStorage3 = b3.getLedgerStorage();
+            LedgerStorage ledgerStorageSpy3 = spy(ledgerStorage3);
+            Whitebox.setInternalState(b3, "ledgerStorage", ledgerStorageSpy3);
+            doAnswer((InvocationOnMock iom) -> {
+                long lId = (Long) iom.getArgument(0);
+                long eId = (Long) iom.getArgument(1);
+                recoverReadsCount3.incrementAndGet();
+                return ledgerStorage3.getEntry(lId, eId);
+            }).when(ledgerStorageSpy3).getEntry(eq(ledgerId), anyLong());
+
+            // issue a new "forceLedger": after the restart lastAddPersisted is expected to be
+            // recovered by reading the ledger's entries back from LedgerStorage
+            CompletableFuture<Long> latchForceLedger5 = new CompletableFuture<>();
+            b3.forceLedger(ledgerId, lastEntryInLedger,
+                (int rc, long ledgerId1, long entryId1, BookieSocketAddress addr, Object ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        latchForceLedger5.completeExceptionally(
+                            org.apache.bookkeeper.client.BKException.create(rc));
+                        return;
+                    }
+                    latchForceLedger5.complete(entryId1);
+                }, expectedCtx, masterKey);
+            assertEquals(lastEntryInLedger, result(latchForceLedger5).longValue());
+
+            // we had to scan from 0 to 5 (lastEntryInLedger)
+            assertEquals(6, recoverReadsCount3.get());
+            recoverReadsCount3.set(0);
+
+            // issue a new "forceLedger", there is no need to recover again
+            // from LedgerStorage
+            CompletableFuture<Long> latchForceLedger6 = new CompletableFuture<>();
+            b3.forceLedger(ledgerId, lastEntryInLedger,
+                (int rc, long ledgerId1, long entryId1, BookieSocketAddress addr, Object ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        latchForceLedger6.completeExceptionally(
+                            org.apache.bookkeeper.client.BKException.create(rc));
+                        return;
+                    }
+                    latchForceLedger6.complete(entryId1);
+                }, expectedCtx, masterKey);
+            assertEquals(lastEntryInLedger, result(latchForceLedger6).longValue());
+
+            assertEquals(0, recoverReadsCount3.get());
+        } finally {
+            b3.shutdown();
+        }
+    }
+
+    /**
+     * Builds a fresh heap buffer holding three longs, in order: ledger id, entry id and
+     * last-add-confirmed. Tests read the entry id back with {@code getLong(8)}.
+     */
+    private static ByteBuf buildEntry(long ledgerId, long entryId, long lastAddConfirmed) {
+        ByteBuf payload = Unpooled.buffer();
+        payload.writeLong(ledgerId)
+            .writeLong(entryId)
+            .writeLong(lastAddConfirmed);
+        return payload;
+    }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LastAddForcedCursorTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LastAddForcedCursorTest.java
new file mode 100644
index 000000000..dbba8d2f0
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LastAddForcedCursorTest.java
@@ -0,0 +1,117 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import org.junit.Test;
+
+/**
+ * Tests for {@link LastAddForcedCursor}.
+ */
+public class LastAddForcedCursorTest {
+
+    /**
+     * Negative entry ids are rejected. In-order updates advance lastAddForced one entry at a time
+     * and never leave a pending range behind.
+     */
+    @Test
+    public void testSimple() {
+        LastAddForcedCursor cursor = new LastAddForcedCursor();
+        try {
+            cursor.update(-12);
+            fail("should not allow negative entries");
+        } catch (IllegalArgumentException expected) {
+            // expected: a negative entry id must be refused
+        }
+
+        assertEquals(-1, cursor.getLastAddForced());
+        for (long i = 0; i < 100; i++) {
+            cursor.update(i);
+            assertEquals(i, cursor.getLastAddForced());
+            assertEquals(0, cursor.getNumRanges());
+        }
+    }
+
+    /**
+     * Entries 2 and 3 arriving before 1 do not advance lastAddForced until the gap is filled.
+     * Re-applying an already forced id leaves the cursor unchanged.
+     */
+    @Test
+    public void testWithGap() {
+        LastAddForcedCursor cursor = new LastAddForcedCursor();
+        assertEquals(-1, cursor.getLastAddForced());
+        cursor.update(0);
+        assertEquals(0, cursor.getLastAddForced());
+        cursor.update(2);
+        assertEquals(0, cursor.getLastAddForced());
+        cursor.update(3);
+        assertEquals(0, cursor.getLastAddForced());
+
+        cursor.update(1);
+        assertEquals(3, cursor.getLastAddForced());
+        cursor.update(1);
+        assertEquals(3, cursor.getLastAddForced());
+        assertEquals(0, cursor.getNumRanges());
+    }
+
+    /**
+     * Several disjoint pending ranges (2..4, 198..202) collapse into lastAddForced as the gaps
+     * before them are filled. No range remains once everything is contiguous.
+     */
+    @Test
+    public void testWithSparseRanges() {
+        LastAddForcedCursor cursor = new LastAddForcedCursor();
+
+        cursor.update(0);
+        assertEquals(0, cursor.getLastAddForced());
+        cursor.update(2);
+        cursor.update(3);
+        cursor.update(4);
+
+        cursor.update(200);
+        cursor.update(201);
+        cursor.update(202);
+        assertEquals(0, cursor.getLastAddForced());
+
+        cursor.update(1);
+        assertEquals(4, cursor.getLastAddForced());
+
+        cursor.update(199);
+        assertEquals(4, cursor.getLastAddForced());
+        cursor.update(198);
+
+        assertEquals(4, cursor.getLastAddForced());
+        for (int i = 5; i <= 198; i++) {
+            cursor.update(i);
+        }
+        assertEquals(202, cursor.getLastAddForced());
+
+        cursor.update(203);
+        assertEquals(203, cursor.getLastAddForced());
+        assertEquals(0, cursor.getNumRanges());
+    }
+
+    /**
+     * Updates in descending order (100 down to 1) keep lastAddForced at -1 with exactly one
+     * pending range, until entry 0 arrives and everything collapses to 100.
+     */
+    @Test
+    public void testReverse() {
+        LastAddForcedCursor cursor = new LastAddForcedCursor();
+
+        for (int i = 100; i >= 1; i--) {
+            cursor.update(i);
+            assertEquals(-1, cursor.getLastAddForced());
+            assertEquals(1, cursor.getNumRanges());
+        }
+        cursor.update(0);
+        assertEquals(100, cursor.getLastAddForced());
+
+        // assert all is clean
+        assertEquals(0, cursor.getNumRanges());
+    }
+}
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to